add unattendedoriginfilter

This commit is contained in:
shenyafeng 2022-10-21 17:23:57 +08:00
parent 60b501dd9b
commit 72d7430d6b
1 changed files with 214 additions and 2 deletions

View File

@ -101,12 +101,19 @@ import time
import subprocess
import json
SOURCESLIST = "/etc/apt/sources.list"
RELEASEOFFSET = 1
ORIGINOFFSET = 2
HTTPTYPE = "HTTP"
FTPTYPE = "FTP"
ARCHITECTUREMAP = ['arm64','amd64','armhf','i386','loongarch64','mips64el','sw64']
KYLIN_VERSION_FILE = "/etc/kylin-version/kylin-system-version.conf"
OTA_RESULT_FILE_PATH="/opt/apt_result/"
OTA_RESULT_FILE="/opt/apt_result/ota_result"
SYSTEM_UPDATER_CORE_LIB_PATH="/usr/share/kylin-system-updater/SystemUpdater/Core"
sys.path.append(SYSTEM_UPDATER_CORE_LIB_PATH)
from OriginFilter import UnattendUpgradeFilter
# sys.path.append(SYSTEM_UPDATER_CORE_LIB_PATH)
# from OriginFilter import UnattendUpgradeFilter
CONFIG_FILE_ROOT_PATH="/var/lib/unattended-upgrades"
UNATTENDED_UPGRADE_CONFIG_FILE_PATH="/var/lib/unattended-upgrades/unattended-upgrade.conf"
WHITE_LIST_FILE_PATH="/var/lib/kylin-system-updater/system-updater.conf"
@ -527,8 +534,213 @@ class ConfigFileManager:
else:
logging.error("no config file")
return True
def not_empty(s):
return s and s.strip()
class OriginProperty():
def __init__(self):
# 包含了本地所有源 http & ftp
self.local_sourcelist = {"http":[],"ftp":[]}
# 经过解析后的本地源,获取所有的分发属性
self.local_origin = {"http":[],"ftp":[]}
# 允许的源列表
self.allow_sources = []
# 允许的源+属性
self.allow_origin = {"http":[],"ftp":[]}
# 加载本地所有源
self.init_local_origin()
# 进行属性解析
self.analytic_properties(self.local_sourcelist)
def init_local_origin(self):
http_origin = {}
ftp_orgin = {}
#apt policy
sh_retval = os.popen("apt-cache policy").read().split("\n")
# policy = [ rv for rv in sh_retval if "http" in rv or "ftp" in rv or "release" in rv or "origin" in rv]
for rv in sh_retval:
if "http" in rv:
http_origin['sources'] = rv
http_origin['release'] = sh_retval[sh_retval.index(rv) + RELEASEOFFSET]
http_origin['origin'] = sh_retval[sh_retval.index(rv) + ORIGINOFFSET]
self.local_sourcelist['http'].append(http_origin.copy())
elif "ftp" in rv:
ftp_orgin['sources'] = rv
ftp_orgin['release'] = sh_retval[sh_retval.index(rv) + RELEASEOFFSET]
ftp_orgin['origin'] = sh_retval[sh_retval.index(rv) + ORIGINOFFSET]
self.local_sourcelist['ftp'].append(ftp_orgin.copy())
def merge_origin(self, source_type, source_origin):
is_append = True
if source_type == HTTPTYPE:
if self.local_origin['http']:
for lo in self.local_origin['http']:
if lo['origin_source'] == source_origin['origin_source'] and lo['dist'] == source_origin['dist']:
lo['component'] = list(set(lo['component']).union(set(source_origin['component'])))
is_append = False
if is_append:
self.local_origin['http'].append(source_origin.copy())
else:
self.local_origin['http'].append(source_origin.copy())
elif source_type == FTPTYPE:
if self.local_origin['ftp']:
for lo in self.local_origin['ftp']:
if lo['origin_source'] == source_origin['origin_source'] and lo['dist'] == source_origin['dist']:
lo['component'] = list(set(lo['component']).union(set(source_origin['component'])))
is_append = False
if is_append:
self.local_origin['ftp'].append(source_origin.copy())
else:
self.local_origin['ftp'].append(source_origin.copy())
def analytic_properties(self, local_sourcelist):
http_origin = {"component":[],"release":{}}
ftp_orgin = {"component":[],"release":{}}
dist_list = []
# 经过解析后的本地源,获取所有的分发属性
for ls in local_sourcelist['http']:
for item in filter(not_empty, ls['sources'].split(' ')):
if item.isdigit():
http_origin['policy_priority'] = item
elif "http" in item:
http_origin['origin_source'] = item
elif "/" in item:
dist_list = item.split("/")
dist_list.pop()
http_origin['dist'] = "/".join(dist_list)
http_origin['component'].append(item.split("/")[1])
elif item not in ARCHITECTUREMAP and item != "Packages":
http_origin['component'].append(item)
release_list = ls['release'].split(',')
release_list = [ rl.strip() for rl in release_list ]
if "release" in release_list[0]:
release_list[0] = release_list[0].lstrip("release").strip()
for rl in release_list:
if "=" in rl:
self.generate_dict(http_origin['release'], rl)
for item in filter(not_empty, ls['origin'].split(' ')):
if "origin" not in ls['origin']:
break
elif "origin" != item:
http_origin['origin'] = item
self.merge_origin(HTTPTYPE, http_origin)
http_origin = {"component":[],"release":{}}
for ls in local_sourcelist['ftp']:
for item in filter(not_empty, ls['sources'].split(' ')):
if item.isdigit():
ftp_orgin['policy_priority'] = item
elif "ftp" in item:
ftp_orgin['origin_source'] = item
elif "/" in item:
ftp_orgin['dist'] = item.split("/")[0]
ftp_orgin['component'].append(item.split("/")[1])
elif item not in ARCHITECTUREMAP and item != "Packages":
ftp_orgin['component'].append(item)
release_list = ls['release'].split(',')
if "release " in release_list[0]:
release_list[0] = release_list[0].lstrip("release ")
for rl in release_list:
if "=" in rl:
self.generate_dict(ftp_orgin['release'], rl)
for item in filter(not_empty, ls['origin'].split(' ')):
if "origin" not in ls['origin']:
break
elif "origin" != item:
ftp_orgin['origin'] = item
self.merge_origin(FTPTYPE, ftp_orgin)
ftp_orgin = {"component":[],"release":{}}
def generate_dict(self, dict, item):
item = item.strip()
if item == "":
logging.warning("empty match string matches nothing")
return False
(what, value) = [ s for s in item.split("=")]
if what in ('o', 'origin'):
dict['origin'] = value
elif what in ("l", "label"):
dict['label'] = value
elif what in ("a", "suite", "archive"):
dict['archive'] = value
elif what in ("c", "component"):
dict['component'] = value
elif what in ("site",):
dict['site'] = value
elif what in ("n", "codename",):
dict['codename'] = value
else:
dict[what] = value
# raise UnknownMatcherError(
# "Unknown whitelist entry for matcher %s (value %s)" % (
# what, value))
def get_allowed_sources(self):
# 源地址,在本地源列表中查找. 源服务器下发source.list为允许的源, 本模块屏蔽了sources.list.d下的源
# 获取允许的源
try:
old_sources_list = apt_pkg.config.find("Dir::Etc::sourcelist")
old_sources_list_d = apt_pkg.config.find("Dir::Etc::sourceparts")
old_cleanup = apt_pkg.config.find("APT::List-Cleanup")
apt_pkg.config.set("Dir::Etc::sourcelist",
os.path.abspath(SOURCESLIST))
apt_pkg.config.set("Dir::Etc::sourceparts", "xxx")
apt_pkg.config.set("APT::List-Cleanup", "0")
slist = apt_pkg.SourceList()
slist.read_main_list()
self.allow_sources = slist.list
except Exception as e:
logging.error(str(e))
finally:
apt_pkg.config.set("Dir::Etc::sourcelist",
old_sources_list)
apt_pkg.config.set("Dir::Etc::sourceparts",
old_sources_list_d)
apt_pkg.config.set("APT::List-Cleanup",
old_cleanup)
def get_allowed_origin(self):
# 获取允许的源
# 生成源与属性
self.local_origin
self.allow_sources
self.allow_origin
try:
for item in self.allow_sources:
for lo in self.local_origin['http']:
if item.uri.strip('/') == lo['origin_source'].strip('/') and item.dist == lo['dist']:
self.allow_origin['http'].append(lo)
for lo in self.local_origin['ftp']:
if item.uri.strip('/') == lo['origin_source'].strip('/') and item.dist == lo['dist']:
self.allow_origin['ftp'].append(lo)
except Exception as e:
logging.error(str(e))
def deleteDuplicatedElementFromList(list):
resultList = []
for item in list:
if not item in resultList:
resultList.append(item)
return resultList
class UnattendUpgradeFilter():
def __init__(self) -> None:
pass
def GetAllowOrigins(self):
# 获取源属性
self.origin_property = OriginProperty()
self.origin_property.get_allowed_sources()
self.origin_property.get_allowed_origin()
self.allowed_origins = get_allowed_origins(self.origin_property.allow_origin)
self.allowed_origins = deleteDuplicatedElementFromList(self.allowed_origins)
# logging.info(_("Allowed origins: %s"),
# self.allowed_origins)
return self.allowed_origins
class AcquireStatistics:
def __init__(self,fetcher) -> None: