diff --git a/backend/SystemUpdater/Core/utils.py b/backend/SystemUpdater/Core/utils.py index 93607f9..98c45d6 100644 --- a/backend/SystemUpdater/Core/utils.py +++ b/backend/SystemUpdater/Core/utils.py @@ -53,11 +53,15 @@ from urllib.parse import urlsplit from copy import copy import psutil +import ctypes +from ctypes import * +import struct # 禁止关机锁文件路径 FILELOCK_PATH = "/tmp/lock/" SHUTDOWN_BLOCK_FILELOCK = "kylin-update.lock" pidfile = None +VERIFY_SO = "libkylin_signtool.so" class ExecutionTime(object): """ 
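Review bot: The added `from ctypes import *` pulls every ctypes name into `utils.py` alongside the explicit `import ctypes`, which hides where `Structure` and `pointer` come from and can shadow existing module names. Only those two names are used in the visible hunks, so `from ctypes import Structure, pointer` would do. `import struct` is not referenced anywhere in the lines shown and looks removable, unless code outside the diff uses it.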
@@ -885,6 +889,71 @@ def get_proc_from_dbus_name(dbus_name, bus=None): return "root" return proc.name() + +def deb_verify(deb_path, _isinstall = False): + logging.info("Verify pkg:%s.",deb_path) + _deb_path = str(deb_path) + try: + # 加载验证签名库 + args = ["dpkg-architecture", "-qDEB_TARGET_MULTIARCH"] + ret = subprocess.run(args, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,text=True) + verifyso_path = os.path.join("/usr/lib/",str(ret.stdout).strip(),VERIFY_SO) + logging.info("Load verify interface:%s.",verifyso_path) + verifyso = ctypes.CDLL(verifyso_path) + ctx = StuStruct() + ctx_obj = pointer(ctx) + + #环境初始化 + ret = verifyso.SOF_Initialize(ctx_obj) + if (ret) : + logging.info("SOF_InitializeEx error!") + return 2 + + if os.path.isfile(_deb_path): + ret = verifyso.BJCA_dodebverify(None, bytes(_deb_path, encoding='utf8'), _isinstall) + if (ret == 0): + logging.info("Signature Verified Ok") + verifyso.SOF_Finalize(ctx_obj) + return 0 + else: + logging.info("Signature Verified failed") + verifyso.SOF_Finalize(ctx_obj) + return 4 + # verifyso.SOF_Finalize(ctx) + else: + return 3 + except Exception as e: + logging.error(e) + return 1 + +class StuStruct(Structure): + # _fields_是容纳每个结构体成员类型和值的列表,可以配合自动生成fields list和value list的函数使用 + # pass + # """ + # 也可以直接初始化,适用于结构体数量不多的情况 + _fields_ = [] + # """ + +def PolicyKit_Authority(details = '', sender = None): + try: + + details = {'polkit.message':details} + cancel_id = '' + action = "cn.kylinos.KylinSystemUpdater.action" + kit = dbus.SystemBus().get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority') + kit = dbus.Interface(kit, 'org.freedesktop.PolicyKit1.Authority') + (granted, notused , details) = kit.CheckAuthorization( + ('system-bus-name', {'name': sender}), + action, details, dbus.UInt32(1),cancel_id, timeout=600) + if granted: + logging.info("Authentication success ...") + return True,_("Authentication success.") + else: + logging.info("Authentication failure ...") + return 
False,_("Authentication failure.") + except Exception as e: + logging.error(e) + return False,str(e) if __name__ == "__main__": #print(mirror_from_sources_list()) diff --git a/backend/SystemUpdater/UpdateManager.py b/backend/SystemUpdater/UpdateManager.py index 7e76c6e..71c4264 100644 --- a/backend/SystemUpdater/UpdateManager.py +++ b/backend/SystemUpdater/UpdateManager.py @@ -26,7 +26,7 @@ from gettext import gettext as _ from SystemUpdater.Core.UpdaterConfigParser import UpgradeConfig from SystemUpdater.Core.utils import get_broken_details,get_lis_from_cache,KillProcessUU from SystemUpdater.Core.DpkgInstallProgress import LogInstallProgress -from SystemUpdater.Core.utils import inhibit_sleep,plymouth_splash +from SystemUpdater.Core.utils import inhibit_sleep,plymouth_splash,deb_verify,PolicyKit_Authority class UpdateManager(): BACKEND_PKG_NAME = 'kylin-system-updater' @@ -272,13 +272,19 @@ class UpdateManager(): return False # 进行本地deb包安装的操作 - def start_deb_install(self, deb_path = "", _check_local_dep = False, _auto_satisfy = False): + def start_deb_install(self, deb_path = "", _check_local_dep = False, _auto_satisfy = False, sender=None): # _check_local_dep : 是否查询本地依赖 # _auto_satisfy : 是否通过网络下载依赖 header = '' desc = '' absolute_path, debname = os.path.split(deb_path) try: + # 验签提权 + if deb_verify(deb_path) != 0: #验签失败,提权 + (status,error_string) = PolicyKit_Authority(_("Kylin Syetm Updater Will Install pkgs."),sender) + if not status: + self.dbusController.PurgePackagesFinished(False,error_string,'') + return deb_cache, ins = self._suit_install_mode(deb_path) if self._is_broken > 0 or not self.cacheSatisfy or self._need_downgrade: # 走 dpkg 安装流程,说明本地apt环境已经损坏,or dep not satisfied or need downgrade @@ -774,6 +780,7 @@ class UpdateManager(): def _suit_install_mode(self, deb_path): self._is_broken = False self.cacheSatisfy = False + _is_install = False absolute_path, debname = os.path.split(deb_path) # 检查本地破损 try: 
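Review bot: In `deb_verify`, the path handed to `ctypes.CDLL` is built from the `dpkg-architecture` output with `stderr=subprocess.STDOUT` and no exit-status check, so any warning or error text becomes part of a library path in what appears to be a privileged service. `SOF_Finalize` is also skipped on the `return 3` path and when an exception is raised after `SOF_Initialize` has succeeded. `StuStruct` declares `_fields_ = []`, so `pointer(ctx)` refers to a zero-size object, and none of the three native calls has `argtypes`/`restype` set. If the library writes through that context pointer this is memory corruption, which needs checking against the library's header (not visible here). The sketch below keeps the return codes and covers the first two points.

```python
def deb_verify(deb_path, _isinstall = False):
    # … unchanged: logging and _deb_path conversion …
    try:
        args = ["dpkg-architecture", "-qDEB_TARGET_MULTIARCH"]
        # Keep stderr out of the value that becomes a library path; a non-zero
        # exit raises and is mapped to return code 1 by the existing handler.
        ret = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
                             text=True, check=True)
        multiarch = ret.stdout.strip()
        # Accept a single path component only, so the result stays under /usr/lib.
        if not multiarch or "/" in multiarch or multiarch.startswith("."):
            logging.error("Unexpected multiarch value:%r.", multiarch)
            return 1
        verifyso_path = os.path.join("/usr/lib/", multiarch, VERIFY_SO)
        # … unchanged: CDLL load, StuStruct context, SOF_Initialize check returning 2 …
        try:
            if not os.path.isfile(_deb_path):
                return 3
            ret = verifyso.BJCA_dodebverify(None, bytes(_deb_path, encoding='utf8'), _isinstall)
            if ret == 0:
                logging.info("Signature Verified Ok")
                return 0
            logging.info("Signature Verified failed")
            return 4
        finally:
            # Release the native context on every exit path, exceptions included.
            verifyso.SOF_Finalize(ctx_obj)
    # … unchanged: except Exception handler returning 1 …
```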
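Review bot: The new check in `start_deb_install` verifies `deb_path` by name, and the install that follows opens the same path again, so a file that passes `deb_verify` may not be the file that gets installed if the caller can still write to that location. Verifying and installing from a copy in a root-owned directory would close that gap, assuming the path comes from the D-Bus caller as the `InstallDebFile` hunk suggests. Every non-zero code is handled the same way, so a missing file (3) or a library load failure (1) produces a polkit prompt instead of an error. The failure branch emits `PurgePackagesFinished`, which reports an install refusal as a purge result, and the prompt text has a typo (`Syetm`). The function body continues outside this hunk.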
@@ -812,7 +819,7 @@ class UpdateManager(): else: self.cacheSatisfy = False logging.info("Cache satisfy is %r.",self.cacheSatisfy) - return deb_cache, install + return deb_cache, install, _is_install def _gen_noSatisfyList(self, depends, deb_cache): _noSatisfyList = [] diff --git a/backend/SystemUpdater/UpdateManagerDbus.py b/backend/SystemUpdater/UpdateManagerDbus.py index 0e6f4f1..ae62ee9 100755 --- a/backend/SystemUpdater/UpdateManagerDbus.py +++ b/backend/SystemUpdater/UpdateManagerDbus.py @@ -10,7 +10,7 @@ from gettext import gettext as _ from .backend import InstallBackend from .Core.loop import mainloop from .Core.utils import humanize_size -from SystemUpdater.Core.utils import unLockedEnableShutdown,get_proc_from_dbus_name +from SystemUpdater.Core.utils import unLockedEnableShutdown,get_proc_from_dbus_name,PolicyKit_Authority import locale UPDATER_DBUS_INTERFACE = 'com.kylin.systemupgrade.interface' 
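Review bot: `_suit_install_mode` now returns three values, but the call in `start_deb_install`, visible as unchanged context in the earlier UpdateManager.py hunk, still reads `deb_cache, ins = self._suit_install_mode(deb_path)`. That raises `ValueError` (too many values to unpack) on every call. That call, and any other caller outside the visible hunks, needs to become `deb_cache, ins, _is_install = self._suit_install_mode(deb_path)`. In the lines shown `_is_install` is only ever assigned `False`, so if it is meant to feed the `_isinstall` argument of `deb_verify`, that wiring is still missing. Most of the function lies outside this hunk.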
@@ -413,20 +413,10 @@ class UpdateManagerDbusController(dbus.service.Object): @dbus.service.method(UPDATER_DBUS_INTERFACE,in_signature='ass',out_signature='bs',sender_keyword='sender') def PurgePackages(self,_purge_list,cur_user,sender=None): try: - details = {'polkit.message':_("Kylin Installer need to uninstall the package")} - cancel_id = '' - action = "cn.kylinos.KylinSystemUpdater.action" - kit = dbus.SystemBus().get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority') - kit = dbus.Interface(kit, 'org.freedesktop.PolicyKit1.Authority') - (granted, notused , details) = kit.CheckAuthorization( - ('system-bus-name', {'name': sender}), - action, details, dbus.UInt32(1),cancel_id, timeout=600) - if granted: - logging.info("Authentication success ...") - else: - logging.info("Authentication failure ...") - self.PurgePackagesFinished(False,_("Authentication failure."),'') - return False + (status, details) = PolicyKit_Authority(_("Kylin Installer need to uninstall the package"), sender) + if not status: + self.PurgePackagesFinished(False,details,'') + return False,details purge_list = [str(pkg) for pkg in _purge_list] @@ -491,7 +481,7 @@ class UpdateManagerDbusController(dbus.service.Object): logging.info(COLORMETHOR_PREFIX+'Method'+COLORLOG_SUFFIX+' InstallDebFile and check_local_dep:%r, auto_satisfy:%r.',\ check_local_dep,auto_satisfy) logging.info("Will install: %s.",path) - self.parent.start_deb_install(deb_path, _check_local_dep, _auto_satisfy) + self.parent.start_deb_install(deb_path, _check_local_dep, _auto_satisfy,sender) return True except Exception as e: logging.error(str(e)) diff --git a/debian/changelog b/debian/changelog index aa3dc5f..708a092 100644 --- a/debian/changelog +++ b/debian/changelog 
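Review bot: With `PurgePackages` now routed through the shared `PolicyKit_Authority` helper, package removal and the new unverified-package install path both authorize against the single hard-coded action `cn.kylinos.KylinSystemUpdater.action`. Policy therefore cannot treat the two differently, and if that action retains authorization (the policy file is not visible here), a grant obtained for a purge could also cover installing a package that failed signature verification. A backward-compatible option is to make the action a parameter of the helper in utils.py, `def PolicyKit_Authority(details = '', sender = None, action = "cn.kylinos.KylinSystemUpdater.action"):`, and pass a dedicated action id from `start_deb_install`. The helper's `str(e)` is also forwarded to the client through `PurgePackagesFinished`; a fixed message with the detail kept in the log would avoid exposing internal error text.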
@@ -1,3 +1,12 @@ +kylin-system-updater (2.0.1kord) v101; urgency=medium + + * BUG: 无 + * 需求号: 无 + * 其他改动说明: 增加开机定时下载、关机安装功能 + * 其他改动影响域:系统更新 + + -- luoxueyi Tue, 19 Apr 2022 17:40:34 +0800 + kylin-system-updater (2.0.0kord) v101; urgency=medium * BUG: 无