kylin-system-updater/unattended-upgrades/kylin-unattended-upgrade

217 lines
9.3 KiB
Python

#!/usr/bin/python3
from optparse import OptionParser
import dbus
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib
from threading import Event
from gettext import gettext as _
import gettext
import apt_pkg
#from apt.progress.base import AcquireProgress
from apt.progress.text import AcquireProgress
import apt
import os
import json
'''
class FetchProgress(AcquireProgress):
def __init__(self) -> None:
super().__init__()
def fetch(self, item: apt_pkg.AcquireItemDesc) -> None:
print("%s [%d%%]"%(item.description,self.current_bytes*100/self.total_bytes))
return super().fetch(item)
def fail(self, item: apt_pkg.AcquireItemDesc) -> None:
print("package fetch failed:%s"%item.description)
return super().fail(item)
def ims_hit(self, item: apt_pkg.AcquireItemDesc) -> None:
return super().ims_hit(item)
def media_change(self, media: str, drive: str) -> bool:
return super().media_change(media, drive)
def pulse(self, owner: apt_pkg.Acquire) -> bool:
return super().pulse(owner)
def start(self) -> None:
print("download start")
return super().start()
def stop(self) -> None:
print("download finished")
return super().stop()
'''
class SystemUpdater:
def __init__(self) -> None:
self.update_detect_status = False
self.update_detect_event = Event()
self.update_list = []
self.resolve_depend_status = False
self.resolve_depend_status_event = Event()
self.remove_pkgs = []
self.install_finish_status = False
self.install_finish_status_event = Event()
self.install_finish_group = []
DBusGMainLoop(set_as_default=True)
self.loop = GLib.MainLoop()
self.system_bus = dbus.SystemBus()
self.update_proxy = self.system_bus.get_object('com.kylin.systemupgrade','/com/kylin/systemupgrade',follow_name_owner_changes=True)
self.update_interface = dbus.Interface(self.update_proxy,dbus_interface='com.kylin.systemupgrade.interface')
self.update_proxy.connect_to_signal('UpdateDetectFinished',self.update_detect_finished_handler)
self.update_proxy.connect_to_signal('UpdateFixBrokenStatus',self.update_fix_broken_status)
self.update_proxy.connect_to_signal('UpdateDependResloveStatus',self.update_depend_resolve_status)
self.update_proxy.connect_to_signal('UpdateDloadAndInstStaChanged',self.update_download_install_status)
self.update_proxy.connect_to_signal('UpdateInstallFinished',self.update_install_finished)
def update_detect_finished_handler(self,success,updatelist,error_status,error_cause):
print(_("update detect finished:sucess:%s,updatelist:%s,error_status:%s,error_cause:%s")\
%(success,",".join(updatelist),error_status,error_cause))
self.update_detect_status = success
self.update_list = []
if success:
try:
for update_group in updatelist:
json_file_path = ("/var/lib/kylin-system-updater/json/%s.json"%(update_group))
if os.path.exists(json_file_path):
with open(json_file_path,'r') as f:
data = json.load(f)
for key in data['upgrade_list'].keys():
if key in ["total_download_size","total_install_size"]:
pass
else:
self.update_list.append(key)
for key in data['install_list'].keys():
if key in ["total_download_size","total_install_size"]:
pass
else:
self.update_list.append(key)
except Exception as e:
print(e)
self.update_detect_event.set()
self.QuitMainLoop()
def update_fix_broken_status(self,resolver_status,remove_status,remove_pkgs,pkg_raw_description,delete_desc,error_string,error_desc):
print(_("update fix broken status:resolver_status:%s,remove_status:%s,error_string:%s,error_desc:%s")%(resolver_status,remove_status,error_string,error_desc))
print(remove_pkgs,pkg_raw_description,delete_desc)
self.update_detect_status = False
self.update_list = []
self.update_detect_event.set()
self.QuitMainLoop()
def update_depend_resolve_status(self,resolver_status,remove_status,remove_pkgs,pkg_raw_description,delete_description,error_string,error_desc):
print(_("update depend resove status:%s,remove status:%s,remove pkgs:%s,pkg raw description:%s,delete_descrition:%s,error string:%s,error desc:%s")\
%(resolver_status,remove_status,",".join(remove_pkgs),",".join(pkg_raw_description),",".join(delete_description),error_string,error_desc))
self.resolve_depend_status = resolver_status
self.remove_pkgs = remove_pkgs
self.resolve_depend_status_event.set()
self.QuitMainLoop()
def update_download_install_status(self,group,progress,status,details):
print(_("%s update progress:%d,status:%s,details:%s")%(",".join(group),progress,status,details))
def update_install_finished(self,success,group,error_string,error_desc):
print(_("update install finisih success:%s,group:%s,error string:%s,error desc:%s")\
%(success,",".join(group),error_string,error_desc))
self.install_finish_status = success
self.install_finish_group=group
self.install_finish_status_event.set()
self.QuitMainLoop()
def download(self):
print(_("start download"))
self.UpdateDetect()
self.RunMainLoop()
print(_("update detect finish:%s,%s")%(self.update_detect_status,",".join(self.update_list)))
if self.update_detect_status and len(self.update_list)>0:
pass
# elif not self.update_detect_status and 'kylin-system-updater' in self.update_list:
# print(_("self update finished"))
else:
print(_("no pkgs to download"))
return False
cache = apt.Cache()
for pkg in self.update_list:
try:
package = cache[pkg]
if not package.installed:
package.mark_install()
else:
package.mark_upgrade()
except Exception as e:
print(e)
return False
list = apt_pkg.SourceList()
list.read_main_list()
recs = cache._records
pm = apt_pkg.PackageManager(cache._depcache)
fetcher = apt_pkg.Acquire(AcquireProgress())
try:
pm.get_archives(fetcher, list, recs)
except Exception as e:
print(e)
res = fetcher.run()
print("fetch.run() result: %d"%res)
if res == 0:
return True
else:
return False
def install(self):
print(_("start install"))
self.UpdateDetect()
self.RunMainLoop()
print(_("update detect finish:%s,%s")%(self.update_detect_status,",".join(self.update_list)))
if self.update_detect_status and len(self.update_list)>0:
pass
# elif not self.update_detect_status and 'kylin-system-updater' in self.update_list:
# print(_("self update finished"))
else:
return False
self.DistUpgradeAll(False)
self.RunMainLoop()
print(_("resolve dependency status:%s,%s")%(self.resolve_depend_status,",".join(self.remove_pkgs)))
if self.resolve_depend_status and len(self.remove_pkgs)==0:
pass
else:
return False
self.DistUpgradeAll(True)
self.RunMainLoop()
print(_("install finish status:%s,%s")%(self.install_finish_status,",".join(self.install_finish_group)))
if self.install_finish_status and len(self.install_finish_group)>0:
pass
else:
return False
return True
def UpdateDetect(self):
return self.update_interface.UpdateDetect()
def DistUpgradeAll(self,is_install):
return self.update_interface.DistUpgradeAll(is_install)
def RunMainLoop(self):
self.loop.run()
def QuitMainLoop(self):
self.loop.quit()
if __name__ == "__main__":
gettext.bindtextdomain("unattended-upgrades","/usr/share/locale")
gettext.textdomain("unattended-upgrades")
parser = OptionParser()
parser.add_option("", "--download-only",
action="store_true", dest="download_only",
default=False,help="only download without install")
parser.add_option("", "--install-only",
action="store_true", dest="install_only",
default=False,help="only install without download")
(options, args) = parser.parse_args()
print(options,args)
systemupdater = SystemUpdater()
if options.download_only:
systemupdater.download()
elif options.install_only:
systemupdater.install()
else:
pass