This commit is contained in:
luoxueyi 2021-11-26 14:31:48 +08:00
parent a4eaab143f
commit c3900cf5af
1 changed files with 229 additions and 24 deletions

View File

@ -31,6 +31,7 @@ import apt_pkg
ImportantListPath="/var/lib/kylin-software-properties/template/important.list"
DesktopSystemPath="/usr/share/kylin-update-desktop-config/data/"
SOURCESLIST = "/etc/apt/sources.list"
# no py3 lsb_release in debian :/
DISTRO_CODENAME = subprocess.check_output(
@ -40,6 +41,13 @@ DISTRO_DESC = subprocess.check_output(
DISTRO_ID = subprocess.check_output(
["lsb_release", "-i", "-s"], universal_newlines=True).strip() # type: str
ARCHITECTUREMAP = ['arm64','amd64','armhf','i386','loongarch64','mips64el','sw64']
RELEASEOFFSET = 1
ORIGINOFFSET = 2
HTTPTYPE = "HTTP"
FTPTYPE = "FTP"
class UpdateListFilterCache(apt.Cache):
def __init__(self, window_main):
@ -49,21 +57,27 @@ class UpdateListFilterCache(apt.Cache):
# 必须升级的包
self.installList = []
self.allowed_origins = get_allowed_origins()
# 获取源属性
self.origin_property = OriginProperty()
self.origin_property.get_allowed_sources()
self.origin_property.get_allowed_origin()
self.allowed_origins = get_allowed_origins(self.origin_property.allow_origin)
self.allowed_origins = deleteDuplicatedElementFromList(self.allowed_origins)
logging.info(_("Allowed origins are: %s"),
self.allowed_origins)
self.blacklist = apt_pkg.config.value_list(
"Kylin-system-updater::Package-Blacklist")
self.blacklist = deleteDuplicatedElementFromList(self.blacklist)
# self.blacklist = apt_pkg.config.value_list(
# "Kylin-system-updater::Package-Blacklist")
# self.blacklist = deleteDuplicatedElementFromList(self.blacklist)
self.whitelist = apt_pkg.config.value_list(
"Kylin-system-updater::Package-Whitelist")
self.whitelist = deleteDuplicatedElementFromList(self.whitelist)
# self.whitelist = apt_pkg.config.value_list(
# "Kylin-system-updater::Package-Whitelist")
# self.whitelist = deleteDuplicatedElementFromList(self.whitelist)
self.strict_whitelist = apt_pkg.config.find_b(
"Kylin-system-updater::Package-Whitelist-Strict", False)
# self.strict_whitelist = apt_pkg.config.find_b(
# "Kylin-system-updater::Package-Whitelist-Strict", False)
def checkInCache(self):
logging.info("start Check in cache")
@ -178,12 +192,188 @@ class UpdateListFilterCache(apt.Cache):
pkg.mark_keep()
return whitelist_filter_upgrade_pkgs
class OriginProperty():
def __init__(self):
# 包含了本地所有源 http & ftp
self.local_sourcelist = {"http":[],"ftp":[]}
# 经过解析后的本地源,获取所有的分发属性
self.local_origin = {"http":[],"ftp":[]}
# 允许的源列表
self.allow_sources = []
# 允许的源+属性
self.allow_origin = {"http":[],"ftp":[]}
# 加载本地所有源
self.init_local_origin()
# 进行属性解析
self.analytic_properties(self.local_sourcelist)
def init_local_origin(self):
http_origin = {}
ftp_orgin = {}
#apt policy
sh_retval = os.popen("apt-cache policy").read().split("\n")
# policy = [ rv for rv in sh_retval if "http" in rv or "ftp" in rv or "release" in rv or "origin" in rv]
for rv in sh_retval:
if "http" in rv:
http_origin['sources'] = rv
http_origin['release'] = sh_retval[sh_retval.index(rv) + RELEASEOFFSET]
http_origin['origin'] = sh_retval[sh_retval.index(rv) + ORIGINOFFSET]
self.local_sourcelist['http'].append(http_origin.copy())
elif "ftp" in rv:
ftp_orgin['sources'] = rv
ftp_orgin['release'] = sh_retval[sh_retval.index(rv) + RELEASEOFFSET]
ftp_orgin['origin'] = sh_retval[sh_retval.index(rv) + ORIGINOFFSET]
self.local_sourcelist['ftp'].append(ftp_orgin.copy())
def merge_origin(self, source_type, source_origin):
is_append = True
if source_type == HTTPTYPE:
if self.local_origin['http']:
for lo in self.local_origin['http']:
if lo['origin_source'] == source_origin['origin_source'] and lo['dist'] == source_origin['dist']:
lo['component'] = list(set(lo['component']).union(set(source_origin['component'])))
is_append = False
if is_append:
self.local_origin['http'].append(source_origin.copy())
else:
self.local_origin['http'].append(source_origin.copy())
elif source_type == FTPTYPE:
if self.local_origin['ftp']:
for lo in self.local_origin['ftp']:
if lo['origin_source'] == source_origin['origin_source'] and lo['dist'] == source_origin['dist']:
lo['component'] = list(set(lo['component']).union(set(source_origin['component'])))
is_append = False
if is_append:
self.local_origin['ftp'].append(source_origin.copy())
else:
self.local_origin['ftp'].append(source_origin.copy())
def analytic_properties(self, local_sourcelist):
http_origin = {"component":[],"release":{}}
ftp_orgin = {"component":[],"release":{}}
# 经过解析后的本地源,获取所有的分发属性
for ls in local_sourcelist['http']:
for item in filter(not_empty, ls['sources'].split(' ')):
if item.isdigit():
http_origin['policy_priority'] = item
elif "http" in item:
http_origin['origin_source'] = item
elif "/" in item:
http_origin['dist'] = item.split("/")[0]
http_origin['component'].append(item.split("/")[1])
elif item not in ARCHITECTUREMAP and item != "Packages":
http_origin['component'].append(item)
for item in filter(not_empty, re.split(r'[,:\s]\s*', ls['release'])):
if "release" not in ls['release']:
break
elif "=" in item:
self.generate_dict(http_origin['release'], item)
for item in filter(not_empty, ls['origin'].split(' ')):
if "origin" not in ls['origin']:
break
elif "origin" != item:
http_origin['origin'] = item
self.merge_origin(HTTPTYPE, http_origin)
http_origin = {"component":[],"release":{}}
for ls in local_sourcelist['ftp']:
for item in filter(not_empty, ls['sources'].split(' ')):
if item.isdigit():
ftp_orgin['policy_priority'] = item
elif "ftp" in item:
ftp_orgin['origin_source'] = item
elif "/" in item:
ftp_orgin['dist'] = item.split("/")[0]
ftp_orgin['component'].append(item.split("/")[1])
elif item not in ARCHITECTUREMAP and item != "Packages":
ftp_orgin['component'].append(item)
for item in filter(not_empty, re.split(r'[.,:\s]\s*', ls['release'])):
if "release" not in ls['release']:
break
elif "=" in item:
self.generate_dict(ftp_orgin['release'], item)
for item in filter(not_empty, ls['origin'].split(' ')):
if "origin" not in ls['origin']:
break
elif "origin" != item:
ftp_orgin['origin'] = item
self.merge_origin(FTPTYPE, ftp_orgin)
ftp_orgin = {"component":[],"release":{}}
def generate_dict(self, dict, item):
item = item.strip()
if item == "":
logging.warning("empty match string matches nothing")
return False
(what, value) = [ s for s in item.split("=")]
if what in ('o', 'origin'):
dict['origin'] = value
elif what in ("l", "label"):
dict['label'] = value
elif what in ("a", "suite", "archive"):
dict['archive'] = value
elif what in ("c", "component"):
dict['component'] = value
elif what in ("site",):
dict['site'] = value
elif what in ("n", "codename",):
dict['codename'] = value
else:
dict[what] = value
# raise UnknownMatcherError(
# "Unknown whitelist entry for matcher %s (value %s)" % (
# what, value))
def get_allowed_sources(self):
# 源地址,在本地源列表中查找. 源服务器下发source.list为允许的源, 本模块屏蔽了sources.list.d下的源
# 获取允许的源
try:
old_sources_list = apt_pkg.config.find("Dir::Etc::sourcelist")
old_sources_list_d = apt_pkg.config.find("Dir::Etc::sourceparts")
old_cleanup = apt_pkg.config.find("APT::List-Cleanup")
apt_pkg.config.set("Dir::Etc::sourcelist",
os.path.abspath(SOURCESLIST))
apt_pkg.config.set("Dir::Etc::sourceparts", "xxx")
apt_pkg.config.set("APT::List-Cleanup", "0")
slist = apt_pkg.SourceList()
slist.read_main_list()
self.allow_sources = slist.list
except Exception as e:
logging.error(str(e))
finally:
apt_pkg.config.set("Dir::Etc::sourcelist",
old_sources_list)
apt_pkg.config.set("Dir::Etc::sourceparts",
old_sources_list_d)
apt_pkg.config.set("APT::List-Cleanup",
old_cleanup)
def get_allowed_origin(self):
# 获取允许的源
# 生成源与属性
self.local_origin
self.allow_sources
self.allow_origin
try:
for item in self.allow_sources:
for lo in self.local_origin['http']:
if item.uri.strip('/') == lo['origin_source'].strip('/') and item.dist == lo['dist']:
self.allow_origin['http'].append(lo)
for lo in self.local_origin['ftp']:
if item.uri.strip('/') == lo['origin_source'].strip('/') and item.dist == lo['dist']:
self.allow_origin['ftp'].append(lo)
except Exception as e:
logging.error(str(e))
def ver_in_allowed_origin(pkg, allow_origin):
# type: (apt.Package, List[str]) -> apt.package.Version
allown_versions = []
versions = _get_priority_order(pkg)
# 获取每个优先级别中 允许源的最高版本
allown_versions = _get_allow_list(versions, allow_origin)
allown_versions = _get_allowed_list(versions, allow_origin)
return allown_versions
@ -206,7 +396,7 @@ def _get_priority_order(pkg):
versions.append(ver)
return versions
def _get_allow_list(versions, allow_origin):
def _get_allowed_list(versions, allow_origin):
current_priority = -100
allown_versions = []
for ver in versions:
@ -218,21 +408,32 @@ def _get_allow_list(versions, allow_origin):
continue
return allown_versions
def get_allowed_origins():
# type: () -> List[str]
""" return a list of allowed origins from apt.conf
This will take substitutions (like distro_id) into account.
def get_allowed_origins(allow_origin):
""" return a list of allowed origins
"""
allowed_origins = []
allowed_origins = get_allowed_origins_legacy()
key = "Kylin-system-updater::Origins-Pattern"
try:
for s in apt_pkg.config.value_list(key):
allowed_origins.append(substitute(s))
except ValueError:
print("Unable to parse %s." % key)
raise
origin = ''
archive = ''
uri = ''
label = ''
for ao in (allow_origin['http']+allow_origin['ftp']):
if 'origin' in ao['release']:
origin = 'o='+ao['release']['origin']
else:
origin = 'o='
if 'archive' in ao['release']:
archive = 'a='+ao['release']['archive']
else:
archive = 'a='
if 'label' in ao['release']:
label = 'l='+ao['release']['label']
else:
label = 'l='
if 'origin_source' in ao:
uri = 'uri='+ao['origin_source']
else:
uri = 'uri='
allowed_origins.append(origin+","+archive+","+label+","+uri)
return allowed_origins
def get_allowed_origins_legacy():
@ -332,6 +533,8 @@ def match_whitelist_string(whitelist, origin):
match = fnmatch.fnmatch(origin.site, value)
elif what in ("n", "codename",):
match = fnmatch.fnmatch(origin.codename, value)
elif what in ("uri",):
match = True
else:
raise UnknownMatcherError(
"Unknown whitelist entry for matcher %s (token %s)" % (
@ -349,6 +552,8 @@ def deleteDuplicatedElementFromList(list):
resultList.append(item)
return resultList
def not_empty(s):
return s and s.strip()
class UnknownMatcherError(ValueError):
pass