Merge branch 'backend_manual' of gitlab2.kylin.com:kylin-desktop/update-manager-group/kylin-system-updater into backend_manual

This commit is contained in:
Xueyi Luo 2022-04-20 10:11:22 +08:00
commit c615486626
4 changed files with 89 additions and 19 deletions

4
Makefile Normal file
View File

@ -0,0 +1,4 @@
all:updater
updater:
cd backend && ./setup.py build

View File

@ -53,11 +53,15 @@ from urllib.parse import urlsplit
from copy import copy
import psutil
import ctypes
from ctypes import *
import struct
# 禁止关机锁文件路径
FILELOCK_PATH = "/tmp/lock/"
SHUTDOWN_BLOCK_FILELOCK = "kylin-update.lock"
pidfile = None
VERIFY_SO = "libkylin_signtool.so"
class ExecutionTime(object):
"""
@ -885,6 +889,71 @@ def get_proc_from_dbus_name(dbus_name, bus=None):
return "root"
return proc.name()
def deb_verify(deb_path, _isinstall = False):
logging.info("Verify pkg:%s.",deb_path)
_deb_path = str(deb_path)
try:
# 加载验证签名库
args = ["dpkg-architecture", "-qDEB_TARGET_MULTIARCH"]
ret = subprocess.run(args, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,text=True)
verifyso_path = os.path.join("/usr/lib/",str(ret.stdout).strip(),VERIFY_SO)
logging.info("Load verify interface:%s.",verifyso_path)
verifyso = ctypes.CDLL(verifyso_path)
ctx = StuStruct()
ctx_obj = pointer(ctx)
#环境初始化
ret = verifyso.SOF_Initialize(ctx_obj)
if (ret) :
logging.info("SOF_InitializeEx error!")
return 2
if os.path.isfile(_deb_path):
ret = verifyso.BJCA_dodebverify(None, bytes(_deb_path, encoding='utf8'), _isinstall)
if (ret == 0):
logging.info("Signature Verified Ok")
verifyso.SOF_Finalize(ctx_obj)
return 0
else:
logging.info("Signature Verified failed")
verifyso.SOF_Finalize(ctx_obj)
return 4
# verifyso.SOF_Finalize(ctx)
else:
return 3
except Exception as e:
logging.error(e)
return 1
class StuStruct(Structure):
# _fields_是容纳每个结构体成员类型和值的列表可以配合自动生成fields list和value list的函数使用
# pass
# """
# 也可以直接初始化,适用于结构体数量不多的情况
_fields_ = []
# """
def PolicyKit_Authority(details = '', sender = None):
try:
details = {'polkit.message':details}
cancel_id = ''
action = "cn.kylinos.KylinSystemUpdater.action"
kit = dbus.SystemBus().get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority')
kit = dbus.Interface(kit, 'org.freedesktop.PolicyKit1.Authority')
(granted, notused , details) = kit.CheckAuthorization(
('system-bus-name', {'name': sender}),
action, details, dbus.UInt32(1),cancel_id, timeout=600)
if granted:
logging.info("Authentication success ...")
return True,_("Authentication success.")
else:
logging.info("Authentication failure ...")
return False,_("Authentication failure.")
except Exception as e:
logging.error(e)
return False,str(e)
if __name__ == "__main__":
#print(mirror_from_sources_list())

View File

@ -26,7 +26,7 @@ from gettext import gettext as _
from SystemUpdater.Core.UpdaterConfigParser import UpgradeConfig
from SystemUpdater.Core.utils import get_broken_details,get_lis_from_cache,KillProcessUU
from SystemUpdater.Core.DpkgInstallProgress import LogInstallProgress
from SystemUpdater.Core.utils import inhibit_sleep,plymouth_splash
from SystemUpdater.Core.utils import inhibit_sleep,plymouth_splash,deb_verify,PolicyKit_Authority
class UpdateManager():
BACKEND_PKG_NAME = 'kylin-system-updater'
@ -272,13 +272,19 @@ class UpdateManager():
return False
# 进行本地deb包安装的操作
def start_deb_install(self, deb_path = "", _check_local_dep = False, _auto_satisfy = False):
def start_deb_install(self, deb_path = "", _check_local_dep = False, _auto_satisfy = False, sender=None):
# _check_local_dep : 是否查询本地依赖
# _auto_satisfy : 是否通过网络下载依赖
header = ''
desc = ''
absolute_path, debname = os.path.split(deb_path)
try:
# 验签提权
if deb_verify(deb_path) != 0: #验签失败,提权
(status,error_string) = PolicyKit_Authority(_("Kylin Syetm Updater Will Install pkgs."),sender)
if not status:
self.dbusController.PurgePackagesFinished(False,error_string,'')
return
deb_cache, ins = self._suit_install_mode(deb_path)
if self._is_broken > 0 or not self.cacheSatisfy or self._need_downgrade:
# 走 dpkg 安装流程说明本地apt环境已经损坏,or dep not satisfied or need downgrade
@ -774,6 +780,7 @@ class UpdateManager():
def _suit_install_mode(self, deb_path):
self._is_broken = False
self.cacheSatisfy = False
_is_install = False
absolute_path, debname = os.path.split(deb_path)
# 检查本地破损
try:
@ -812,7 +819,7 @@ class UpdateManager():
else:
self.cacheSatisfy = False
logging.info("Cache satisfy is %r.",self.cacheSatisfy)
return deb_cache, install
return deb_cache, install, _is_install
def _gen_noSatisfyList(self, depends, deb_cache):
_noSatisfyList = []

View File

@ -10,7 +10,7 @@ from gettext import gettext as _
from .backend import InstallBackend
from .Core.loop import mainloop
from .Core.utils import humanize_size
from SystemUpdater.Core.utils import unLockedEnableShutdown,get_proc_from_dbus_name
from SystemUpdater.Core.utils import unLockedEnableShutdown,get_proc_from_dbus_name,PolicyKit_Authority
import locale
UPDATER_DBUS_INTERFACE = 'com.kylin.systemupgrade.interface'
@ -413,20 +413,10 @@ class UpdateManagerDbusController(dbus.service.Object):
@dbus.service.method(UPDATER_DBUS_INTERFACE,in_signature='ass',out_signature='bs',sender_keyword='sender')
def PurgePackages(self,_purge_list,cur_user,sender=None):
try:
details = {'polkit.message':_("Kylin Installer need to uninstall the package")}
cancel_id = ''
action = "cn.kylinos.KylinSystemUpdater.action"
kit = dbus.SystemBus().get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority')
kit = dbus.Interface(kit, 'org.freedesktop.PolicyKit1.Authority')
(granted, notused , details) = kit.CheckAuthorization(
('system-bus-name', {'name': sender}),
action, details, dbus.UInt32(1),cancel_id, timeout=600)
if granted:
logging.info("Authentication success ...")
else:
logging.info("Authentication failure ...")
self.PurgePackagesFinished(False,_("Authentication failure."),'')
return False
(status, details) = PolicyKit_Authority(_("Kylin Installer need to uninstall the package"), sender)
if not status:
self.PurgePackagesFinished(False,details,'')
return False,details
purge_list = [str(pkg) for pkg in _purge_list]
@ -491,7 +481,7 @@ class UpdateManagerDbusController(dbus.service.Object):
logging.info(COLORMETHOR_PREFIX+'Method'+COLORLOG_SUFFIX+' InstallDebFile and check_local_dep:%r, auto_satisfy:%r.',\
check_local_dep,auto_satisfy)
logging.info("Will install: %s.",path)
self.parent.start_deb_install(deb_path, _check_local_dep, _auto_satisfy)
self.parent.start_deb_install(deb_path, _check_local_dep, _auto_satisfy,sender)
return True
except Exception as e:
logging.error(str(e))