3.0.0.0-ok11

This commit is contained in:
wangsong 2024-01-24 11:15:22 +08:00
commit b85534051d
10 changed files with 142 additions and 39 deletions

View File

@ -60,6 +60,10 @@ class Sqlite3Server(object):
self.deb_metadata.update({"caller":''})
self.deb_metadata.update({"old_version":''})
self.deb_metadata.update({"new_version":''})
self.deb_policy_timestamp = 0
self.deb_policy_keep = False
self.purge_policy_timestamp = 0
self.purge_policy_keep = False
# Initialize the connection database and modify it to connect when using
def init_sqlit(self):

View File

@ -730,26 +730,29 @@ def get_proc_from_dbus_name(dbus_name, bus=None):
def deb_verify(deb_path, _isinstall = False):
logging.info("Verify pkg:%s.",deb_path)
_deb_path = str(deb_path)
_verify_status = False
try:
# # 加载验证签名库 , 验签接口暂时无法调用
# 加载验证签名库 , 验签接口暂时无法调用
if not os.path.isfile("/usr/bin/kylinsigntool"):
logging.error("SOF_InitializeEx error!")
return 1
return _verify_status
args = ["/usr/bin/kylinsigntool", "-v", _deb_path]
ret = subprocess.run(args, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,text=True)
if "Signature Verified failed" in str(ret.stdout).strip() or "签名验证失败" in str(ret.stdout).strip():
if "Signature Verified failed" in str(ret.stdout).strip() or "签名验证失败" in str(ret.stdout).strip() \
or "Deb signature does not exist" in str(ret.stdout).strip() or "签名不存在" in str(ret.stdout).strip() \
or "证书验证失败" in str(ret.stdout).strip():
logging.info("Signature Verified failed!")
elif "Signature Verified Ok" in str(ret.stdout).strip() or "签名验证成功" in str(ret.stdout).strip():
elif "Signature Verified Ok" in str(ret.stdout).strip() or "签名验证成功" in str(ret.stdout).strip() \
or "Certificate verification is successful" in str(ret.stdout).strip() or "证书验证成功" in str(ret.stdout).strip():
logging.info("Signature Verified Ok!")
return 0
_verify_status = True
else:
logging.error("Signature Verified failed:%s.",ret)
return 2
except Exception as e:
logging.error(e)
return 3
return _verify_status
def PolicyKit_Authority(details = '', sender = None, InstPolicy = False, source=''):
def PolicyKit_Authority(details = '', sender = None, InstPolicy = False, authentication = False, source=''):
_allow_kylinsign = False
_verify_kylinsign = False
try:
@ -760,36 +763,48 @@ def PolicyKit_Authority(details = '', sender = None, InstPolicy = False, source=
with open(inst_policies_path, "r") as f:
lines = f.readlines()
for line in lines:
if "allow-kylinsign" in line:
if "allow-kylinsign" in line and "#allow-kylinsign" not in line:
_allow_kylinsign = True
if "verify-kylinsign" in line:
if "verify-kylinsign" in line and "#verify-kylinsign" not in line:
_verify_kylinsign = True
if _allow_kylinsign == True and _verify_kylinsign == False: #策略: 阻止
logging.debug("unknown sources apply installation policies: deter")
return False,_("The package is unsigned, refuses to install.")
logging.info("unknown sources apply installation policies: deter")
return True,"USI-policy-deter"
elif _allow_kylinsign == True and _verify_kylinsign == True: #策略: 警告
logging.debug("unknown sources apply installation policies: warning")
logging.info("unknown sources apply installation policies: warning")
elif _allow_kylinsign == False and _verify_kylinsign == False: #策略: 关闭
logging.debug("unknown sources apply installation policies: close")
logging.info("unknown sources apply installation policies: close")
return True,"USI-policy-close"
else:
logging.warning("Unknown sources apply installation policies get failed.")
#用户鉴权
logging.debug("Authentication via PolicyKit .")
logging.info("Authentication via PolicyKit .")
details = {'polkit.message':details}
cancel_id = ''
action = get_policykit_authority_action_enum(source)
kit = dbus.SystemBus().get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority')
kit = dbus.Interface(kit, 'org.freedesktop.PolicyKit1.Authority')
(granted, notused , details) = kit.CheckAuthorization(
('system-bus-name', {'name': sender}),
action, details, dbus.UInt32(1),cancel_id, timeout=60*60*24*7)
if granted:
logging.info("Authentication success ...")
return True,_("Authentication success.")
if False:
source=source+'-self'
logging.info('authentication status: %r.',authentication)
if True == authentication:
action = get_policykit_authority_action_enum(source)
logging.info("PolicyKit source: %s, action: %s.",source,action)
kit = dbus.SystemBus().get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority')
kit = dbus.Interface(kit, 'org.freedesktop.PolicyKit1.Authority')
(granted, notused , details) = kit.CheckAuthorization(
('system-bus-name', {'name': sender}),
action, details, dbus.UInt32(1),cancel_id, timeout=60*60*24*7)
if granted:
logging.info("Authentication success ...")
return True,_("Authentication success.")
else:
logging.info("Cancel authentication ...")
return False,_("Cancel authentication.")
else:
logging.info("Authentication failure ...")
return False,_("Authentication failure.")
return True,_("Authentication success.")
except Exception as e:
logging.error(e)
return False,str(e)

View File

@ -453,12 +453,43 @@ class UpdateManager():
sender_name = get_proc_from_dbus_name(sender)
caller = get_caller_from_enum(sender_name)
caller_trans = get_source_name_from_enum(sender_name)
if deb_verify(deb_path) != 0: #验签失败,提权
(status,error_string) = PolicyKit_Authority(caller_trans+_(" requires authentication to install software packages."),
sender,InstPolicy=True,source=source)
if not status:
self.dbusController.InstalldebFinished(False,error_string,'')
return
if not deb_verify(deb_path): #验签失败,提权
if not self.sqlite3_server.deb_policy_keep:
(status,error_string) = PolicyKit_Authority(caller_trans+_(" requires authentication to install software packages."),
sender = sender, InstPolicy = True,
authentication = self.configs_uncover.getWithDefault("InstallAndPurge","install_authority",True),
source=source)
if not status:
self.dbusController.InstalldebFinished(False,error_string,'')
return
else:
logging.info("Start check deb policy timeout...")
if error_string == "USI-policy-close" or error_string == "USI-policy-deter":
self.sqlite3_server.deb_policy_keep = False
self.sqlite3_server.deb_policy_timestamp = 0
else:
self.sqlite3_server.deb_policy_keep = True
def _check_deb_policy():
if self.sqlite3_server.deb_policy_timestamp % 10 == 0:
logging.info("Checking for deb policy timeout(%d)...",self.sqlite3_server.deb_policy_timestamp)
if (self.sqlite3_server.deb_policy_timestamp <= 0):
logging.warning("Deb policy timeout")
self.sqlite3_server.deb_policy_keep = False
return False
else:
self.sqlite3_server.deb_policy_timestamp = self.sqlite3_server.deb_policy_timestamp - 1
return True
from gi.repository import GLib
self.sqlite3_server.deb_policy_timestamp = 60 * 5
GLib.timeout_add_seconds(1,_check_deb_policy)
else:
self.sqlite3_server.deb_policy_timestamp = 60 * 5
logging.info("Deb policy keep, ignore...")
self.deb_obj.update({"debname":str(debname)})
self.deb_obj.update({"old_version":""})
self.deb_obj.update({"source":str(caller)})

View File

@ -420,11 +420,35 @@ class UpdateManagerDbusController(dbus.service.Object):
purge_list = [str(pkg) for pkg in _purge_list]
sender_name = get_proc_from_dbus_name(sender)
logging.info(COLORMETHOR_PREFIX+'Method'+COLORLOG_SUFFIX+' DistPurgePackages Sender:%s and purge list is:%s...',sender_name, purge_list)
(status, details) = PolicyKit_Authority(get_source_name_from_enum(sender_name)+_(" requires authentication to uninstall software packages."),
sender,source=sender_name)
if not status:
self.PurgePackagesFinished(False,details,'')
return self.RETURN_UNKNOWN_CODE,details
if not self.parent.sqlite3_server.purge_policy_keep:
(status, details) = PolicyKit_Authority(get_source_name_from_enum(sender_name)+_(" requires authentication to uninstall software packages."),
sender = sender, InstPolicy = False,
authentication = self.parent.configs_uncover.getWithDefault("InstallAndPurge","purge_authority",False),
source=sender_name)
if not status:
self.PurgePackagesFinished(False,details,'')
return self.RETURN_UNKNOWN_CODE,details
else:
logging.info("Start check purge policy timeout...")
self.parent.sqlite3_server.purge_policy_keep = True
def _check_purge_policy():
if self.parent.sqlite3_server.purge_policy_timestamp % 10 == 0:
logging.info("Checking for purge policy timeout(%d)...",self.parent.sqlite3_server.purge_policy_timestamp)
if (self.parent.sqlite3_server.purge_policy_timestamp <= 0):
logging.warning("Purge policy timeout")
self.parent.sqlite3_server.purge_policy_keep = False
return False
else:
self.parent.sqlite3_server.purge_policy_timestamp = self.parent.sqlite3_server.purge_policy_timestamp - 1
return True
from gi.repository import GLib
self.parent.sqlite3_server.purge_policy_timestamp = 60 * 5
GLib.timeout_add_seconds(1,_check_purge_policy)
else:
self.parent.sqlite3_server.purge_policy_timestamp = 60 * 5
logging.info("Purge policy keep, ignore...")
#目前只有360使用这个环境变量 当其他包也使用时 可以将这个权限放开
if True:

View File

@ -7,4 +7,8 @@ upload_installer_log = False
[InstallMode]
shutdown_install = False
manual_install = False
auto_install = False
auto_install = False
[InstallAndPurge]
install_authority = True
purge_authority = False

View File

@ -199,6 +199,10 @@ msgstr "བདེན་དཔང་ར་སྤྲོད་ལེགས་འག
msgid "Authentication failure."
msgstr "བདེན་དཔང་ར་སྤྲོད་ཕམ་སོང་།"
#: ../SystemUpdater/Core/utils.py:753
msgid "Cancel authentication."
msgstr "ཕྱིར་འབུད་བྱ་རྒྱུ།"
#: ../SystemUpdater/Core/enums.py:101
msgid "Deb format exception, read local deb file error."
msgstr "མཉེན་ཆས་ཀྱི་ཁུག་མའི་རྣམ་གཞག་རྒྱུན་ལྡན་མིན་པས་ཕམ་ཁ་བླངས།"

View File

@ -2745,6 +2745,10 @@ msgstr "认证成功"
msgid "Authentication failure."
msgstr "认证失败"
#: ../SystemUpdater/Core/utils.py:753
msgid "Cancel authentication."
msgstr "取消认证"
#: ../SystemUpdater/Core/enums.py:101
msgid "Deb format exception, read local deb file error."
msgstr "软件包格式异常,读取失败。"

View File

@ -2675,6 +2675,10 @@ msgstr "認證成功。"
msgid "Authentication failure."
msgstr "認證失敗。"
#: ../SystemUpdater/Core/utils.py:753
msgid "Cancel authentication."
msgstr "取消認證"
#: ../SystemUpdater/Core/enums.py:101
msgid "Deb format exception, read local deb file error."
msgstr "軟體包格式異常,讀取失敗。"

View File

@ -2707,6 +2707,10 @@ msgstr "認證成功。"
msgid "Authentication failure."
msgstr "認證失敗。"
#: ../SystemUpdater/Core/utils.py:753
msgid "Cancel authentication."
msgstr "取消認證"
#: ../SystemUpdater/Core/enums.py:101
msgid "Deb format exception, read local deb file error."
msgstr "軟體包格式異常,讀取失敗。"

11
debian/changelog vendored
View File

@ -1,4 +1,4 @@
kylin-system-updater (3.0.0.0-ok11) nile; urgency=medium
kylin-system-updater (3.0.0.0-ok12) nile; urgency=medium
* BUG: 无
* 需求号: 无
@ -7,6 +7,15 @@ kylin-system-updater (3.0.0.0-ok11) nile; urgency=medium
-- wangsong <wangsong@kylinos.cn> Wed, 24 Jan 2024 11:13:16 +0800
kylin-system-updater (3.0.0.0-ok11) nile; urgency=medium
* BUG: 无
* 需求号: 无
* 其他改动说明: 安装卸载鉴权可配置,鉴权保持
* 其他改动影响域:可变系统更新
-- wangsong <wangsong@kylinos.cn> Wed, 17 Jan 2024 17:35:00 +0800
kylin-system-updater (3.0.0.0-ok10) nile; urgency=medium
* BUG: 无