获取更新推送,读取json文件生成白名单过滤

This commit is contained in:
luoxueyi 2021-09-07 17:22:59 +08:00
parent 93c5667247
commit 8e9175a775
2 changed files with 140 additions and 40 deletions

View File

@ -223,8 +223,8 @@ class UpdateList():
#FIXME: 最好将这个常量通过配置文件读
self.GROUPS_JSON_PKG = 'kylin-update-desktop-config'
self.input_config_path = '/home/x/share/outconfig'
self.output_config_path = '/home/x/share/inconfig'
self.input_config_path = '/home/kylin/wangsong/inconfig'
self.output_config_path = '/home/kylin/wangsong/outconfig'
# a stable machine uniq id
try:
@ -590,7 +590,7 @@ class UpdateList():
def update(self, cache,start_install_alone, eventloop_callback=None):
self.held_back = []
upgrade_pkgs = []
'''
dist-upgrade 标记在此处进行
来判断将要删除的包 如果存在要删除的break的包的话 会从dist-upgrade切换到upgrade 目前此功能不使用 默认使用dist-upgrade
@ -626,6 +626,7 @@ class UpdateList():
pkgname += "#auto"
self.pkgs_install.append(pkgname)
elif pkg.marked_upgrade:
upgrade_pkgs.append(pkg)
self.pkgs_upgrade.append(pkg.name)
elif pkg.marked_delete:
self.pkgs_remove.append(pkg.name)
@ -636,26 +637,22 @@ class UpdateList():
self._make_json(cache,self.pkgs_install,self.pkgs_upgrade,self.pkgs_remove)
return True
#FIXME: 目前此功能不使用 但是以此按应用进行分组是更好的展示升级列表的方式
# self.update_groups = self._make_groups(cache, self.pkgs_upgrade,
# eventloop_callback)
fu = filter.UpdateListFilterCache()
# fu = filter.UpdateListFilterCache("/")
#源过滤
allowed_origin_upgrade_pkgs = fu.check_in_allowed_origin(upgrade_pkgs)
logging.info("allowed_origin_upgrade_pkgs: %s"%" ".join([i.name for i in allowed_origin_upgrade_pkgs]))
logging.info("allowed_origin_upgrade_pkgs count: %s", len(allowed_origin_upgrade_pkgs))
pass
# upgrade_pkgs
# print("upgrade_pkgs: %s"%" ".join([i.name for i in upgrade_pkgs]))
#whitelist
whitelist_filter_upgrade_pkgs = fu.is_pkgname_in_whitelist(allowed_origin_upgrade_pkgs)
logging.info("whitelist_filter_upgrade_pkgs: %s"%" ".join([i.name for i in whitelist_filter_upgrade_pkgs]))
logging.info("whitelist_filter_upgrade_pkgs count: %s", len(whitelist_filter_upgrade_pkgs))
# #源过滤
# allowed_origin_upgrade_pkgs = fu.check_in_allowed_origin(upgrade_pkgs)
# print("allowed_origin_upgrade_pkgs: %s"%" ".join([i.name for i in allowed_origin_upgrade_pkgs]))
# #whitelist
# whitelist_filter_upgrade_pkgs = fu.is_pkgname_in_whitelist(allowed_origin_upgrade_pkgs)
# print("whitelist_filter_upgrade_pkgs: %s"%" ".join([i.name for i in whitelist_filter_upgrade_pkgs]))
# #blacklist_filter
# blacklist_filter_pkgs = fu.is_pkgname_in_blacklist(whitelist_filter_upgrade_pkgs)
# print("blacklist_filter_pkgs: %s"%" ".join([i.name for i in blacklist_filter_pkgs]))
return True

View File

@ -10,6 +10,9 @@ import os
import string
import subprocess
import sys
import json
import dbus
import threading
try:
from typing import AbstractSet, cast, DefaultDict, Dict, Iterable, List
@ -47,6 +50,8 @@ import apt_pkg
import distro_info
ImportantListPath="/var/lib/kylin-software-properties/template/important.list"
DesktopSystemPath="/usr/share/kylin-update-desktop-config/data/"
# no py3 lsb_release in debian :/
@ -60,41 +65,124 @@ DISTRO_ID = subprocess.check_output(
class UpdateListFilterCache(apt.Cache):
def __init__(self, rootdir):
print("\nUpdateListFilterCache\n")
def __init__(self):
# whitelist
self.upgradeList = []
# 必须升级的包
self.installList = []
# self._cached_candidate_pkgnames = set() # type: Set[str]
self.allowed_origins = get_allowed_origins()
self.allowed_origins = deleteDuplicatedElementFromList(self.allowed_origins)
print("Allowed origins are: ",
logging.info("Allowed origins are: %s",
self.allowed_origins)
self.blacklist = apt_pkg.config.value_list(
"Unattended-Upgrade::Package-Blacklist")
"Kylin-update-manager::Package-Blacklist")
self.blacklist = deleteDuplicatedElementFromList(self.blacklist)
print("Initial blacklist: ", " ".join(self.blacklist))
# print("Initial blacklist: ", " ".join(self.blacklist))
self.whitelist = apt_pkg.config.value_list(
"Unattended-Upgrade::Package-Whitelist")
"Kylin-update-manager::Package-Whitelist")
self.whitelist = deleteDuplicatedElementFromList(self.whitelist)
print("Initial whitelist: ", " ".join(self.whitelist))
# print("Initial whitelist: ", " ".join(self.whitelist))
self.strict_whitelist = apt_pkg.config.find_b(
"Unattended-Upgrade::Package-Whitelist-Strict", False)
print("Initial whitelist (%s): %s"%(
"strict" if self.strict_whitelist else "not strict",
" ".join(self.whitelist)))
# apt.Cache.__init__(self, rootdir=rootdir)
"Kylin-update-manager::Package-Whitelist-Strict", False)
# print("Initial whitelist (%s): %s"%(
# "strict" if self.strict_whitelist else "not strict",
# " ".join(self.whitelist)))
# update importantlist
# initUpdateImportantList()
# 获取list
self.initLocalPackagesList()
#除掉不在cache中的包
self.checkInCache()
def checkInCache(self):
print("start Check in cache")
tmplist = []
cache = apt.Cache()
for i in self.upgradeList:
try:
cache[i]
tmplist.append(i)
except Exception as e:
print("not found pkg: ", str(e))
pass
self.upgradeList = tmplist
def initLocalPackagesList(self):
jsonfiles = []
tmplist = []
# 获取importantlist 本次更新推送
with open(ImportantListPath, 'r') as f:
text = f.read()
importantList = text.split()
print("importantList: %s"%importantList)
f.close()
if not importantList:
logging.error("importantList is empty")
exit(-1)
# 获取/usr/share/kylin-update-desktop-config/data/下所有json文件
for root,dirs,files in os.walk(DesktopSystemPath):
pass
for i in files:
if ".json" in i:
jsonfiles.append(i.split('.')[0])
logging.info("all files: %s", jsonfiles)
# 找到importantlist中对应的json文件
for i in importantList:
if i not in jsonfiles:
# 说明这个是单独的包,不在分组中
# 加入更新列表
if i not in self.upgradeList:
self.upgradeList.append(i)
else:
# 在分组中
# 获取每个对应json文件中的upgrade_list
if i in jsonfiles:
filepath = os.path.join(DesktopSystemPath, i)
filepath = filepath+".json"
with open(filepath, 'r') as f:
pkgdict = f.read()
jsonfile = json.loads(pkgdict)
tmplist = jsonfile['install_list']
print("\ntmplist: ", tmplist)
for j in tmplist:
if j not in self.upgradeList:
self.upgradeList.append(j)
f.close()
logging.info("self.upgradeList: %s", self.upgradeList)
print("\n\n")
#
# print("jsonfile silent_install_list: ", jsonfile['silent_install_list'])
# 更新前检测,必须安装的包
# self.installList = jsonfile['install_list']
# print("jsonfile install_list: ", self.installList)
# # 可以更新的列表, 白名单
# self.upgradeList = jsonfile['upgrade_list']
# print("jsonfile upgrade_list: ", self.upgradeList)
def check_in_allowed_origin(self, pkgs):
new_upgrade_pkgs = []
i = 0
for pkg in pkgs:
i+=1
# print("checking %d pkgname: %s"%(i, pkg))
for v in pkg.versions:
if is_in_allowed_origin(v, self.allowed_origins):
if is_in_allowed_origin(v, self.allowed_origins) and not pkg in new_upgrade_pkgs:
new_upgrade_pkgs.append(pkg)
else:
pkg.mark_keep()
return new_upgrade_pkgs
def is_pkgname_in_blacklist(self, pkgs):
@ -111,9 +199,10 @@ class UpdateListFilterCache(apt.Cache):
def is_pkgname_in_whitelist(self, pkgs):
whitelist_filter_upgrade_pkgs = []
for pkg in pkgs:
if pkg.name in self.whitelist:
if pkg.name in self.upgradeList:
whitelist_filter_upgrade_pkgs.append(pkg)
else :
pkg.mark_keep()
pass
# print("skipping whitelist package %s" % pkg.name)
@ -127,7 +216,7 @@ def get_allowed_origins():
This will take substitutions (like distro_id) into account.
"""
allowed_origins = get_allowed_origins_legacy()
key = "Unattended-Upgrade::Origins-Pattern"
key = "Kylin-update-manager::Origins-Pattern"
try:
for s in apt_pkg.config.value_list(key):
allowed_origins.append(substitute(s))
@ -140,7 +229,7 @@ def get_allowed_origins_legacy():
# type: () -> List[str]
""" legacy support for old Allowed-Origins var """
allowed_origins = [] # type: List[str]
key = "Unattended-Upgrade::Allowed-Origins"
key = "Kylin-update-manager::Allowed-Origins"
try:
for s in apt_pkg.config.value_list(key):
# if there is a ":" use that as seperator, else use spaces
@ -194,9 +283,9 @@ def is_allowed_origin(origin, allowed_origins):
# type: (Union[apt.package.Origin, apt_pkg.PackageFile], List[str]) -> bool
# local origin is allowed by default
if origin.component == 'now' and origin.archive == 'now' and \
not origin.label and not origin.site:
return True
# if origin.component == 'now' and origin.archive == 'now' and \
# not origin.label and not origin.site:
# return True
for allowed in allowed_origins:
if match_whitelist_string(allowed, origin):
return True
@ -255,6 +344,20 @@ def deleteDuplicatedElementFromList(list):
resultList.append(item)
return resultList
def initUpdateImportantList():
lock = threading.Lock()
bus = dbus.SystemBus()
try:
obj = bus.get_object('com.kylin.software.properties', '/com/kylin/software/properties')
interface = dbus.Interface(obj, dbus_interface='com.kylin.software.properties.interface')
lock.acquire()
retval = interface.updateSourceTemplate()
lock.release()
except Exception as e:
logging.error("initUpdateImportantList: %s"%str(e))
if retval == False:
logging.info("updateSourceTemplate failed")
print("\nupdate SourceTemplate\n")
class UnknownMatcherError(ValueError):
pass