add remove white list in sanity check

This commit is contained in:
yafeng shen 2023-06-13 19:02:25 +08:00
parent a5e86bdf96
commit d5348fdd33
1 changed files with 68 additions and 47 deletions

View File

@ -769,7 +769,7 @@ class KylinSystemUpdater:
def ConnectToSignals(self):
def update_detect_finished_handler(success,updatelist,error_status,error_cause):
if success:
logging.info("update detect success,quiting main loop")
logging.info("update detect success,quiting main loop:%s,%s"%(success,",".join(updatelist)))
self.update_group = updatelist
try:
for update_group in self.update_group:
@ -958,7 +958,8 @@ def Backup():
# return UnattendedUpgradesResult(False,"backup state error")
return False
#node_name,node_status = kylin_backup_manager.get_backup_comment_for_systemupdate()
ts = get_timestamp()
# ts = get_timestamp()
ts = "自动备份"
kylin_backup_manager.ConnectToSignals()
create_note = "系统升级新建备份"
inc_note="系统升级增量备份"
@ -2051,24 +2052,42 @@ def check_changes_for_sanity(cache, desired_pkg=None):
return False
def sanity_problem(cache, desired_pkg):
def sanity_problem(cache, desired_pkg=None):
# type: (UnattendedUpgradesCache, apt.Package) -> str
# if cache._depcache.broken_count != 0:
# return ("there are broken packages in the cache")
# If there are no packages to be installed they were kept back
# if cache.install_count == 0:
# return ("no package is selected to be upgraded or installed")
NOW_UPDATE_CONFIG = '/usr/share/kylin-update-desktop-config/config/'
OLD_UPDATE_CONFIG = '/usr/share/kylin-update-desktop-config/data/'
read_path = NOW_UPDATE_CONFIG
if os.path.exists(NOW_UPDATE_CONFIG):
read_path = NOW_UPDATE_CONFIG
elif os.path.exists(OLD_UPDATE_CONFIG):
read_path = NOW_UPDATE_CONFIG
else:
pass
remove_white=[]
if os.path.exists(read_path):
with open(os.path.join(read_path,'kylin-update-desktop-system.json'),'r') as f:
try:
data = json.load(f)
remove_white=data['remove_white_list']
except Exception as exc:
logging.warning(exc)
logging.debug("remove white list:%s"%",".join(remove_white))
changes = cache.get_changes()
if desired_pkg and desired_pkg not in changes:
logging.warning("pkg %s to be marked for upgrade/install is not marked accordingly" % desired_pkg.name)
return False
# if desired_pkg and desired_pkg not in changes:
# logging.warning("pkg %s to be marked for upgrade/install is not marked accordingly" % desired_pkg.name)
# return False
pkgs_to_remove = []
for pkg in changes:
if pkg.marked_delete:
logging.warning("pkg %s is marked to be deleted" % pkg.name)
pkgs_to_remove.append(pkg.name)
# logging.warning("pkg %s is marked to be deleted" % pkg.name)
if pkg.name not in remove_white:
pkgs_to_remove.append(pkg.name)
'''
if os_release_info['PROJECT_CODENAME'] == 'V10SP1-edu' and os_release_info['SUB_PROJECT_CODENAME']=='mavis':
pass
@ -2109,9 +2128,10 @@ def sanity_problem(cache, desired_pkg):
"accordingly" % desired_pkg.name)
return False
'''
if len(pkgs_to_remove) > 0:
logging.debug("pkgs marked to delete:%s"%",".join(pkgs_to_remove))
return False
logging.debug("remove list not in whitelist:%s"%",".join(pkgs_to_remove))
if len(pkgs_to_remove)>0:
return False
return True
@ -2763,8 +2783,8 @@ def calculate_upgradable_pkgs(cache, # type: UnattendedUpgradesCache
cache.mark_upgrade_adjusted(pkg, from_user=not pkg.is_auto_installed)
else:
pass
if sanity_problem(cache,pkg):
pkgs_to_upgrade.append(pkg)
# if sanity_problem(cache,pkg):
# pkgs_to_upgrade.append(pkg)
except Exception as e:
logging.error("error checking pkg:%s"%e)
continue
@ -2798,10 +2818,10 @@ def calculate_upgradable_pkgs(cache, # type: UnattendedUpgradesCache
# logging.debug("Checking: %s (%s)" % (
# pkg.name, getattr(pkg.candidate, "origins", [])))
#pkgs_to_upgrade.append(pkg)
if cache.get_changes():
cache.clear()
# if cache.get_changes():
# cache.clear()
return pkgs_to_upgrade
return cache.get_changes()
def get_dpkg_log_content(logfile_dpkg, install_start_time):
@ -3385,7 +3405,8 @@ def run(options, # type: Options
# find out about the packages that are upgradable (in an allowed_origin)
pkgs_to_upgrade = calculate_upgradable_pkgs(cache, options,white_list_with_version)
if options.install_only or options.download_and_install:
if (len(pkgs_to_upgrade)<len(white_list_with_version)):
# if (len(pkgs_to_upgrade)<len(white_list_with_version)):
if not sanity_problem(cache):
logging.warning("some pkgs failed in sanity check")
return UnattendedUpgradesResult(False,"sanity check failed")
pkgs_to_upgrade.sort(key=lambda p: p.name)
@ -3397,12 +3418,12 @@ def run(options, # type: Options
# stop being nice
#os.nice(old_priority - os.nice(0))
#adjust candidate versions
logging.info("adjusting candidate from kylin update manager...")
adjust_candidate_with_version(cache,white_list_with_version)
# logging.info("adjusting candidate from kylin update manager...")
# adjust_candidate_with_version(cache,white_list_with_version)
#sanity check
# download what looks good
mark_pkgs_to_upgrade(cache, pkgs)
# mark_pkgs_to_upgrade(cache, pkgs)
if options.debug:
fetcher = apt_pkg.Acquire(apt.progress.text.AcquireProgress())
@ -3423,12 +3444,12 @@ def run(options, # type: Options
except SystemError as e:
logging.error(_("GetArchives() failed: %s"), e)
if get_abnormally_installed_pkg_count() == '0' and not is_dpkg_journal_dirty():
local_pkgs_to_install = []
for item in fetcher.items:
local_pkgs_to_install.append(item.destfile)
with open(OTA_PKGS_TO_INSTALL_LIST,'w+') as f:
f.write(" ".join(local_pkgs_to_install))
# if get_abnormally_installed_pkg_count() == '0' and not is_dpkg_journal_dirty():
# local_pkgs_to_install = []
# for item in fetcher.items:
# local_pkgs_to_install.append(item.destfile)
# with open(OTA_PKGS_TO_INSTALL_LIST,'w+') as f:
# f.write(" ".join(local_pkgs_to_install))
fetcher_statistics = AcquireStatistics(fetcher=fetcher)
fetcher_statistics.GetAquireStatisticsOfPkgs()
@ -3445,29 +3466,29 @@ def run(options, # type: Options
logging.error(_("lock release failed"))
logging.info("system updater need to run shutdown install quiting...")
return UnattendedUpgradesResult(False,_("system updater install override"))
if fetcher_statistics.local_pkg_amount == 0:
logging.warning("no local pkgs to install")
try:
apt_pkg.pkgsystem_unlock()
except SystemError:
logging.error(_("lock release failed"))
return UnattendedUpgradesResult(False,_("no local pkgs to install"))
elif fetcher_statistics.remote_pkg_amount >0:
logging.warning("there're pkgs to download")
try:
apt_pkg.pkgsystem_unlock()
except SystemError:
logging.error(_("lock release failed"))
return UnattendedUpgradesResult(False,_("there're pkgs to download"))
else:
# if fetcher_statistics.local_pkg_amount == 0:
# logging.warning("no local pkgs to install")
# try:
# apt_pkg.pkgsystem_unlock()
# except SystemError:
# logging.error(_("lock release failed"))
# return UnattendedUpgradesResult(False,_("no local pkgs to install"))
# elif fetcher_statistics.remote_pkg_amount >0:
# logging.warning("there're pkgs to download")
# try:
# apt_pkg.pkgsystem_unlock()
# except SystemError:
# logging.error(_("lock release failed"))
# return UnattendedUpgradesResult(False,_("there're pkgs to download"))
# else:
#only write the pkg list when dpkg journal is clean
# if not is_dpkg_journal_dirty():
# configfilemanager.WriteListToFile(pkgs,"OTA_PKGS_TO_INSTALL_LIST")
try:
res = fetcher.run()
logging.debug("fetch.run() result: %s", res)
except SystemError as e:
logging.error("fetch.run() result: %s", e)
try:
res = fetcher.run()
logging.debug("fetch.run() result: %s", res)
except SystemError as e:
logging.error("fetch.run() result: %s", e)
# if cache.get_changes():
# cache.clear()