安装卸载鉴权可配置,鉴权保持
This commit is contained in:
parent
22c57952bb
commit
1b45cfa1e4
|
@ -60,6 +60,10 @@ class Sqlite3Server(object):
|
|||
self.deb_metadata.update({"caller":''})
|
||||
self.deb_metadata.update({"old_version":''})
|
||||
self.deb_metadata.update({"new_version":''})
|
||||
self.deb_policy_timestamp = 0
|
||||
self.deb_policy_keep = False
|
||||
self.purge_policy_timestamp = 0
|
||||
self.purge_policy_keep = False
|
||||
|
||||
# Initialize the connection database and modify it to connect when using
|
||||
def init_sqlit(self):
|
||||
|
|
|
@ -730,26 +730,29 @@ def get_proc_from_dbus_name(dbus_name, bus=None):
|
|||
def deb_verify(deb_path, _isinstall = False):
|
||||
logging.info("Verify pkg:%s.",deb_path)
|
||||
_deb_path = str(deb_path)
|
||||
_verify_status = False
|
||||
try:
|
||||
# # 加载验证签名库 , 验签接口暂时无法调用
|
||||
# 加载验证签名库 , 验签接口暂时无法调用
|
||||
if not os.path.isfile("/usr/bin/kylinsigntool"):
|
||||
logging.error("SOF_InitializeEx error!")
|
||||
return 1
|
||||
return _verify_status
|
||||
args = ["/usr/bin/kylinsigntool", "-v", _deb_path]
|
||||
ret = subprocess.run(args, stdout=subprocess.PIPE,stderr=subprocess.STDOUT,text=True)
|
||||
if "Signature Verified failed" in str(ret.stdout).strip() or "签名验证失败" in str(ret.stdout).strip():
|
||||
if "Signature Verified failed" in str(ret.stdout).strip() or "签名验证失败" in str(ret.stdout).strip() \
|
||||
or "Deb signature does not exist" in str(ret.stdout).strip() or "签名不存在" in str(ret.stdout).strip() \
|
||||
or "证书验证失败" in str(ret.stdout).strip():
|
||||
logging.info("Signature Verified failed!")
|
||||
elif "Signature Verified Ok" in str(ret.stdout).strip() or "签名验证成功" in str(ret.stdout).strip():
|
||||
elif "Signature Verified Ok" in str(ret.stdout).strip() or "签名验证成功" in str(ret.stdout).strip() \
|
||||
or "Certificate verification is successful" in str(ret.stdout).strip() or "证书验证成功" in str(ret.stdout).strip():
|
||||
logging.info("Signature Verified Ok!")
|
||||
return 0
|
||||
_verify_status = True
|
||||
else:
|
||||
logging.error("Signature Verified failed:%s.",ret)
|
||||
return 2
|
||||
except Exception as e:
|
||||
logging.error(e)
|
||||
return 3
|
||||
return _verify_status
|
||||
|
||||
def PolicyKit_Authority(details = '', sender = None, InstPolicy = False, source=''):
|
||||
def PolicyKit_Authority(details = '', sender = None, InstPolicy = False, authentication = False, source=''):
|
||||
_allow_kylinsign = False
|
||||
_verify_kylinsign = False
|
||||
try:
|
||||
|
@ -760,36 +763,48 @@ def PolicyKit_Authority(details = '', sender = None, InstPolicy = False, source=
|
|||
with open(inst_policies_path, "r") as f:
|
||||
lines = f.readlines()
|
||||
for line in lines:
|
||||
if "allow-kylinsign" in line:
|
||||
if "allow-kylinsign" in line and "#allow-kylinsign" not in line:
|
||||
_allow_kylinsign = True
|
||||
if "verify-kylinsign" in line:
|
||||
if "verify-kylinsign" in line and "#verify-kylinsign" not in line:
|
||||
_verify_kylinsign = True
|
||||
if _allow_kylinsign == True and _verify_kylinsign == False: #策略: 阻止
|
||||
logging.debug("unknown sources apply installation policies: deter")
|
||||
return False,_("The package is unsigned, refuses to install.")
|
||||
logging.info("unknown sources apply installation policies: deter")
|
||||
return True,"USI-policy-deter"
|
||||
elif _allow_kylinsign == True and _verify_kylinsign == True: #策略: 警告
|
||||
logging.debug("unknown sources apply installation policies: warning")
|
||||
logging.info("unknown sources apply installation policies: warning")
|
||||
elif _allow_kylinsign == False and _verify_kylinsign == False: #策略: 关闭
|
||||
logging.debug("unknown sources apply installation policies: close")
|
||||
logging.info("unknown sources apply installation policies: close")
|
||||
return True,"USI-policy-close"
|
||||
else:
|
||||
logging.warning("Unknown sources apply installation policies get failed.")
|
||||
|
||||
#用户鉴权
|
||||
logging.debug("Authentication via PolicyKit .")
|
||||
logging.info("Authentication via PolicyKit .")
|
||||
details = {'polkit.message':details}
|
||||
cancel_id = ''
|
||||
action = get_policykit_authority_action_enum(source)
|
||||
kit = dbus.SystemBus().get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority')
|
||||
kit = dbus.Interface(kit, 'org.freedesktop.PolicyKit1.Authority')
|
||||
(granted, notused , details) = kit.CheckAuthorization(
|
||||
('system-bus-name', {'name': sender}),
|
||||
action, details, dbus.UInt32(1),cancel_id, timeout=60*60*24*7)
|
||||
if granted:
|
||||
logging.info("Authentication success ...")
|
||||
return True,_("Authentication success.")
|
||||
|
||||
if False:
|
||||
source=source+'-self'
|
||||
|
||||
logging.info('authentication status: %r.',authentication)
|
||||
if True == authentication:
|
||||
action = get_policykit_authority_action_enum(source)
|
||||
logging.info("PolicyKit source: %s, action: %s.",source,action)
|
||||
|
||||
kit = dbus.SystemBus().get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority')
|
||||
kit = dbus.Interface(kit, 'org.freedesktop.PolicyKit1.Authority')
|
||||
(granted, notused , details) = kit.CheckAuthorization(
|
||||
('system-bus-name', {'name': sender}),
|
||||
action, details, dbus.UInt32(1),cancel_id, timeout=60*60*24*7)
|
||||
if granted:
|
||||
logging.info("Authentication success ...")
|
||||
return True,_("Authentication success.")
|
||||
else:
|
||||
logging.info("Cancel authentication ...")
|
||||
return False,_("Cancel authentication.")
|
||||
else:
|
||||
logging.info("Authentication failure ...")
|
||||
return False,_("Authentication failure.")
|
||||
return True,_("Authentication success.")
|
||||
|
||||
except Exception as e:
|
||||
logging.error(e)
|
||||
return False,str(e)
|
||||
|
|
|
@ -453,12 +453,43 @@ class UpdateManager():
|
|||
sender_name = get_proc_from_dbus_name(sender)
|
||||
caller = get_caller_from_enum(sender_name)
|
||||
caller_trans = get_source_name_from_enum(sender_name)
|
||||
if deb_verify(deb_path) != 0: #验签失败,提权
|
||||
(status,error_string) = PolicyKit_Authority(caller_trans+_(" requires authentication to install software packages."),
|
||||
sender,InstPolicy=True,source=source)
|
||||
if not status:
|
||||
self.dbusController.InstalldebFinished(False,error_string,'')
|
||||
return
|
||||
|
||||
if not deb_verify(deb_path): #验签失败,提权
|
||||
if not self.sqlite3_server.deb_policy_keep:
|
||||
(status,error_string) = PolicyKit_Authority(caller_trans+_(" requires authentication to install software packages."),
|
||||
sender = sender, InstPolicy = True,
|
||||
authentication = self.configs_uncover.getWithDefault("InstallAndPurge","install_authority",True),
|
||||
source=source)
|
||||
if not status:
|
||||
self.dbusController.InstalldebFinished(False,error_string,'')
|
||||
return
|
||||
else:
|
||||
logging.info("Start check deb policy timeout...")
|
||||
|
||||
if error_string == "USI-policy-close" or error_string == "USI-policy-deter":
|
||||
self.sqlite3_server.deb_policy_keep = False
|
||||
self.sqlite3_server.deb_policy_timestamp = 0
|
||||
else:
|
||||
self.sqlite3_server.deb_policy_keep = True
|
||||
def _check_deb_policy():
|
||||
if self.sqlite3_server.deb_policy_timestamp % 10 == 0:
|
||||
logging.info("Checking for deb policy timeout(%d)...",self.sqlite3_server.deb_policy_timestamp)
|
||||
if (self.sqlite3_server.deb_policy_timestamp <= 0):
|
||||
logging.warning("Deb policy timeout")
|
||||
self.sqlite3_server.deb_policy_keep = False
|
||||
return False
|
||||
else:
|
||||
self.sqlite3_server.deb_policy_timestamp = self.sqlite3_server.deb_policy_timestamp - 1
|
||||
return True
|
||||
|
||||
from gi.repository import GLib
|
||||
self.sqlite3_server.deb_policy_timestamp = 60 * 5
|
||||
GLib.timeout_add_seconds(1,_check_deb_policy)
|
||||
|
||||
else:
|
||||
self.sqlite3_server.deb_policy_timestamp = 60 * 5
|
||||
logging.info("Deb policy keep, ignore...")
|
||||
|
||||
self.deb_obj.update({"debname":str(debname)})
|
||||
self.deb_obj.update({"old_version":""})
|
||||
self.deb_obj.update({"source":str(caller)})
|
||||
|
|
|
@ -420,11 +420,35 @@ class UpdateManagerDbusController(dbus.service.Object):
|
|||
purge_list = [str(pkg) for pkg in _purge_list]
|
||||
sender_name = get_proc_from_dbus_name(sender)
|
||||
logging.info(COLORMETHOR_PREFIX+'Method'+COLORLOG_SUFFIX+' DistPurgePackages Sender:%s and purge list is:%s...',sender_name, purge_list)
|
||||
(status, details) = PolicyKit_Authority(get_source_name_from_enum(sender_name)+_(" requires authentication to uninstall software packages."),
|
||||
sender,source=sender_name)
|
||||
if not status:
|
||||
self.PurgePackagesFinished(False,details,'')
|
||||
return self.RETURN_UNKNOWN_CODE,details
|
||||
|
||||
if not self.parent.sqlite3_server.purge_policy_keep:
|
||||
(status, details) = PolicyKit_Authority(get_source_name_from_enum(sender_name)+_(" requires authentication to uninstall software packages."),
|
||||
sender = sender, InstPolicy = False,
|
||||
authentication = self.parent.configs_uncover.getWithDefault("InstallAndPurge","purge_authority",False),
|
||||
source=sender_name)
|
||||
if not status:
|
||||
self.PurgePackagesFinished(False,details,'')
|
||||
return self.RETURN_UNKNOWN_CODE,details
|
||||
else:
|
||||
logging.info("Start check purge policy timeout...")
|
||||
self.parent.sqlite3_server.purge_policy_keep = True
|
||||
def _check_purge_policy():
|
||||
if self.parent.sqlite3_server.purge_policy_timestamp % 10 == 0:
|
||||
logging.info("Checking for purge policy timeout(%d)...",self.parent.sqlite3_server.purge_policy_timestamp)
|
||||
if (self.parent.sqlite3_server.purge_policy_timestamp <= 0):
|
||||
logging.warning("Purge policy timeout")
|
||||
self.parent.sqlite3_server.purge_policy_keep = False
|
||||
return False
|
||||
else:
|
||||
self.parent.sqlite3_server.purge_policy_timestamp = self.parent.sqlite3_server.purge_policy_timestamp - 1
|
||||
return True
|
||||
|
||||
from gi.repository import GLib
|
||||
self.parent.sqlite3_server.purge_policy_timestamp = 60 * 5
|
||||
GLib.timeout_add_seconds(1,_check_purge_policy)
|
||||
else:
|
||||
self.parent.sqlite3_server.purge_policy_timestamp = 60 * 5
|
||||
logging.info("Purge policy keep, ignore...")
|
||||
|
||||
#目前只有360使用这个环境变量 当其他包也使用时 可以将这个权限放开
|
||||
if True:
|
||||
|
|
|
@ -7,4 +7,8 @@ upload_installer_log = False
|
|||
[InstallMode]
|
||||
shutdown_install = False
|
||||
manual_install = False
|
||||
auto_install = False
|
||||
auto_install = False
|
||||
|
||||
[InstallAndPurge]
|
||||
install_authority = True
|
||||
purge_authority = False
|
|
@ -199,6 +199,10 @@ msgstr "བདེན་དཔང་ར་སྤྲོད་ལེགས་འག
|
|||
msgid "Authentication failure."
|
||||
msgstr "བདེན་དཔང་ར་སྤྲོད་ཕམ་སོང་།"
|
||||
|
||||
#: ../SystemUpdater/Core/utils.py:753
|
||||
msgid "Cancel authentication."
|
||||
msgstr "ཕྱིར་འབུད་བྱ་རྒྱུ།"
|
||||
|
||||
#: ../SystemUpdater/Core/enums.py:101
|
||||
msgid "Deb format exception, read local deb file error."
|
||||
msgstr "མཉེན་ཆས་ཀྱི་ཁུག་མའི་རྣམ་གཞག་རྒྱུན་ལྡན་མིན་པས་ཕམ་ཁ་བླངས།"
|
||||
|
|
|
@ -2745,6 +2745,10 @@ msgstr "认证成功"
|
|||
msgid "Authentication failure."
|
||||
msgstr "认证失败"
|
||||
|
||||
#: ../SystemUpdater/Core/utils.py:753
|
||||
msgid "Cancel authentication."
|
||||
msgstr "取消认证"
|
||||
|
||||
#: ../SystemUpdater/Core/enums.py:101
|
||||
msgid "Deb format exception, read local deb file error."
|
||||
msgstr "软件包格式异常,读取失败。"
|
||||
|
|
|
@ -2675,6 +2675,10 @@ msgstr "認證成功。"
|
|||
msgid "Authentication failure."
|
||||
msgstr "認證失敗。"
|
||||
|
||||
#: ../SystemUpdater/Core/utils.py:753
|
||||
msgid "Cancel authentication."
|
||||
msgstr "取消認證"
|
||||
|
||||
#: ../SystemUpdater/Core/enums.py:101
|
||||
msgid "Deb format exception, read local deb file error."
|
||||
msgstr "軟體包格式異常,讀取失敗。"
|
||||
|
|
|
@ -2707,6 +2707,10 @@ msgstr "認證成功。"
|
|||
msgid "Authentication failure."
|
||||
msgstr "認證失敗。"
|
||||
|
||||
#: ../SystemUpdater/Core/utils.py:753
|
||||
msgid "Cancel authentication."
|
||||
msgstr "取消認證"
|
||||
|
||||
#: ../SystemUpdater/Core/enums.py:101
|
||||
msgid "Deb format exception, read local deb file error."
|
||||
msgstr "軟體包格式異常,讀取失敗。"
|
||||
|
|
Loading…
Reference in New Issue