From 50b77f005674054faeb50b8e40e5468c7722bb46 Mon Sep 17 00:00:00 2001 From: wangsong Date: Thu, 16 Sep 2021 20:36:37 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=BB=E9=99=A4=E6=97=A0=E6=95=88=E6=96=87?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 + .pybuild/cpython3_3.8/.pydistutils.cfg | 12 - .../build/UpdateManager/Core/AlertWatcher.py | 101 --- .../build/UpdateManager/Core/MyCache.py | 434 ----------- .../build/UpdateManager/Core/UpdateList.py | 676 ------------------ .../build/UpdateManager/Core/__init__.py | 0 .../build/UpdateManager/Core/filter.py | 333 --------- .../build/UpdateManager/Core/roam.py | 212 ------ .../build/UpdateManager/Core/utils.py | 557 --------------- .../build/UpdateManager/UpdateManager.py | 278 ------- .../build/UpdateManager/UpdateManagerDbus.py | 167 ----- .../UpdateManager/UpdateManagerVersion.py | 1 - .../build/UpdateManager/__init__.py | 0 .../backend/InstallBackendAptdaemon.py | 165 ----- .../build/UpdateManager/backend/__init__.py | 133 ---- .vscode/launch.json | 16 - kylin-system-updater | 3 +- 17 files changed, 5 insertions(+), 3086 deletions(-) delete mode 100644 .pybuild/cpython3_3.8/.pydistutils.cfg delete mode 100644 .pybuild/cpython3_3.8/build/UpdateManager/Core/AlertWatcher.py delete mode 100644 .pybuild/cpython3_3.8/build/UpdateManager/Core/MyCache.py delete mode 100644 .pybuild/cpython3_3.8/build/UpdateManager/Core/UpdateList.py delete mode 100644 .pybuild/cpython3_3.8/build/UpdateManager/Core/__init__.py delete mode 100644 .pybuild/cpython3_3.8/build/UpdateManager/Core/filter.py delete mode 100644 .pybuild/cpython3_3.8/build/UpdateManager/Core/roam.py delete mode 100644 .pybuild/cpython3_3.8/build/UpdateManager/Core/utils.py delete mode 100644 .pybuild/cpython3_3.8/build/UpdateManager/UpdateManager.py delete mode 100644 .pybuild/cpython3_3.8/build/UpdateManager/UpdateManagerDbus.py delete mode 100644 .pybuild/cpython3_3.8/build/UpdateManager/UpdateManagerVersion.py delete mode 100644 .pybuild/cpython3_3.8/build/UpdateManager/__init__.py delete mode 100644 .pybuild/cpython3_3.8/build/UpdateManager/backend/InstallBackendAptdaemon.py delete mode 100644 .pybuild/cpython3_3.8/build/UpdateManager/backend/__init__.py delete mode 100644 .vscode/launch.json diff --git a/.gitignore b/.gitignore index ca9f52d..c8a5046 100755 --- a/.gitignore +++ b/.gitignore @@ -2,5 +2,8 @@ *venv* __pycache__ +.vscode +.pybuild + # Misc .*cache diff --git a/.pybuild/cpython3_3.8/.pydistutils.cfg b/.pybuild/cpython3_3.8/.pydistutils.cfg deleted file mode 100644 index 4733393..0000000 --- a/.pybuild/cpython3_3.8/.pydistutils.cfg +++ /dev/null @@ -1,12 +0,0 @@ -[clean] -all=1 -[build] -build-lib=/home/kylin/new_kylin-system-updater/kylin-update-manager_dist/.pybuild/cpython3_3.8/build -[install] -force=1 -install-layout=deb -install-scripts=$base/bin -install-lib=/usr/lib/python3.8/dist-packages -prefix=/usr -[easy_install] -allow_hosts=None diff --git a/.pybuild/cpython3_3.8/build/UpdateManager/Core/AlertWatcher.py b/.pybuild/cpython3_3.8/build/UpdateManager/Core/AlertWatcher.py deleted file mode 100644 index 83e8ba7..0000000 --- a/.pybuild/cpython3_3.8/build/UpdateManager/Core/AlertWatcher.py +++ /dev/null @@ -1,101 +0,0 @@ -# AlertWatcher.py -# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*- -# -# Copyright (c) 2010 Mohamed Amine IL Idrissi -# -# Author: Mohamed Amine IL Idrissi -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 2 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 -# USA - -from __future__ import absolute_import - -from gi.repository import GObject -import dbus -from dbus.mainloop.glib import DBusGMainLoop - - -class AlertWatcher(GObject.GObject): - """ a class that checks for alerts and reports them, like a battery - or network warning """ - - __gsignals__ = {"network-alert": (GObject.SignalFlags.RUN_FIRST, - None, - (GObject.TYPE_INT,)), - "battery-alert": (GObject.SignalFlags.RUN_FIRST, - None, - (GObject.TYPE_BOOLEAN,)), - "network-3g-alert": (GObject.SignalFlags.RUN_FIRST, - None, - (GObject.TYPE_BOOLEAN, - GObject.TYPE_BOOLEAN,)), - } - - def __init__(self): - GObject.GObject.__init__(self) - DBusGMainLoop(set_as_default=True) - self.bus = dbus.Bus(dbus.Bus.TYPE_SYSTEM) - # make it always connected if NM isn't available - self.network_state = 3 - - def check_alert_state(self): - try: - #network - obj = self.bus.get_object("org.freedesktop.NetworkManager", - "/org/freedesktop/NetworkManager") - obj.connect_to_signal( - "StateChanged", - self._on_network_state_changed, - dbus_interface="org.freedesktop.NetworkManager") - interface = dbus.Interface(obj, "org.freedesktop.DBus.Properties") - self.network_state = interface.Get( - "org.freedesktop.NetworkManager", "State") - self._network_alert(self.network_state) - - # power - # obj = self.bus.get_object('org.freedesktop.UPower', - # '/org/freedesktop/UPower') - # obj.connect_to_signal("Changed", self._power_changed, - # dbus_interface="org.freedesktop.UPower") - # self._power_changed() - # 3g - # self._update_3g_state() - except dbus.exceptions.DBusException: - pass - - def _on_network_state_changed(self, state): - self._network_alert(state) - # self._update_3g_state() - - # def _update_3g_state(self): - # from .roam import NetworkManagerHelper - # nm = NetworkManagerHelper() - # on_3g = nm.is_active_connection_gsm_or_cdma() - # is_roaming = nm.is_active_connection_gsm_or_cdma_roaming() - # self._network_3g_alert(on_3g, is_roaming) - - # def _network_3g_alert(self, on_3g, is_roaming): - # self.emit("network-3g-alert", on_3g, is_roaming) - - def _network_alert(self, state): - self.network_state = state - self.emit("network-alert", state) - - # def _power_changed(self): - # obj = self.bus.get_object("org.freedesktop.UPower", - # "/org/freedesktop/UPower") - # interface = dbus.Interface(obj, "org.freedesktop.DBus.Properties") - # on_battery = interface.Get("org.freedesktop.UPower", "OnBattery") - # self.emit("battery-alert", on_battery) diff --git a/.pybuild/cpython3_3.8/build/UpdateManager/Core/MyCache.py b/.pybuild/cpython3_3.8/build/UpdateManager/Core/MyCache.py deleted file mode 100644 index e0a9922..0000000 --- a/.pybuild/cpython3_3.8/build/UpdateManager/Core/MyCache.py +++ /dev/null @@ -1,434 +0,0 @@ -# MyCache.py -# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*- -# -# Copyright (c) 2004-2008 Canonical -# -# Author: Michael Vogt -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 2 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 -# USA - -from __future__ import absolute_import, print_function - -import warnings -warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning) -import apt -import apt_pkg -import logging -import os -from urllib.error import HTTPError -from urllib.request import urlopen -from urllib.parse import urlsplit -from http.client import BadStatusLine -import socket -import subprocess -import re -import DistUpgrade.DistUpgradeCache -from gettext import gettext as _ -try: - from launchpadlib.launchpad import Launchpad -except ImportError: - Launchpad = None - -CHANGELOGS_POOL = "https://changelogs.ubuntu.com/changelogs/pool/" -CHANGELOGS_URI = CHANGELOGS_POOL + "%s/%s/%s/%s_%s/%s" - - -class HttpsChangelogsUnsupportedError(Exception): - """ https changelogs with credentials are unsupported because of the - lack of certitifcation validation in urllib2 which allows MITM - attacks to steal the credentials - """ - pass - - -class MyCache(DistUpgrade.DistUpgradeCache.MyCache): - - CHANGELOG_ORIGIN = "Ubuntu" - - def __init__(self, progress, rootdir=None): - apt.Cache.__init__(self, progress, rootdir) - # save for later - self.rootdir = rootdir - # raise if we have packages in reqreinst state - # and let the caller deal with that (runs partial upgrade) - assert len(self.req_reinstall_pkgs) == 0 - # check if the dpkg journal is ok (we need to do that here - # too because libapt will only do it when it tries to lock - # the packaging system) - assert(not self._dpkgJournalDirty()) - # init the regular cache - self._initDepCache() - self.all_changes = {} - self.all_news = {} - # on broken packages, try to fix via saveDistUpgrade() - if self._depcache.broken_count > 0: - self.saveDistUpgrade() - assert (self._depcache.broken_count == 0 - and self._depcache.del_count == 0) - self.launchpad = None - - def _dpkgJournalDirty(self): - """ - test if the dpkg journal is dirty - (similar to debSystem::CheckUpdates) - """ - d = os.path.dirname( - apt_pkg.config.find_file("Dir::State::status")) + "/updates" - for f in os.listdir(d): - if re.match("[0-9]+", f): - return True - return False - - def _initDepCache(self): - self._depcache.read_pinfile() - self._depcache.init() - - def clear(self): - self._initDepCache() - - @property - def required_download(self): - """ get the size of the packages that are required to download """ - pm = apt_pkg.PackageManager(self._depcache) - fetcher = apt_pkg.Acquire() - pm.get_archives(fetcher, self._list, self._records) - return fetcher.fetch_needed - - @property - def install_count(self): - return self._depcache.inst_count - - def keep_count(self): - return self._depcache.keep_count - - @property - def del_count(self): - return self._depcache.del_count - - def _check_dependencies(self, target, deps): - """Return True if any of the dependencies in deps match target.""" - # TODO: handle virtual packages - for dep_or in deps: - if not dep_or: - continue - match = True - for base_dep in dep_or: - if (base_dep.name != target.package.shortname - or not apt_pkg.check_dep( - target.version, base_dep.relation, base_dep.version)): - match = False - if match: - return True - return False - - def find_removal_justification(self, pkg): - target = pkg.installed - if not target: - return False - for cpkg in self: - candidate = cpkg.candidate - if candidate is not None: - if (self._check_dependencies( - target, candidate.get_dependencies("Conflicts")) - and self._check_dependencies( - target, candidate.get_dependencies("Replaces"))): - logging.info( - "%s Conflicts/Replaces %s; allowing removal" % ( - candidate.package.shortname, pkg.shortname)) - return True - return False - - def saveDistUpgrade(self): - """ this functions mimics a upgrade but will never remove anything """ - #upgrade(True) 为True时使用dist-upgrade进行升级 - self._depcache.upgrade(True) - wouldDelete = self._depcache.del_count - wouldDelete = 0 - if wouldDelete > 0: - deleted_pkgs = [pkg for pkg in self if pkg.marked_delete] - assert wouldDelete == len(deleted_pkgs) - for pkg in deleted_pkgs: - if self.find_removal_justification(pkg): - wouldDelete -= 1 - if wouldDelete > 0: - self.clear() - assert (self._depcache.broken_count == 0 - and self._depcache.del_count == 0) - # else: - # assert self._depcache.broken_count == 0 - self._depcache.upgrade() - return wouldDelete - - def _strip_epoch(self, verstr): - " strip of the epoch " - vers_no_epoch = verstr.split(":") - if len(vers_no_epoch) > 1: - verstr = "".join(vers_no_epoch[1:]) - return verstr - - def _get_changelog_or_news(self, name, fname, strict_versioning=False, - changelogs_uri=None): - " helper that fetches the file in question " - # don't touch the gui in this function, it needs to be thread-safe - pkg = self[name] - - # get the src package name - srcpkg = pkg.candidate.source_name - - # assume "main" section - src_section = "main" - # use the section of the candidate as a starting point - section = pkg._pcache._depcache.get_candidate_ver(pkg._pkg).section - - # get the source version - srcver_epoch = pkg.candidate.source_version - srcver = self._strip_epoch(srcver_epoch) - - split_section = section.split("/") - if len(split_section) > 1: - src_section = split_section[0] - - # lib is handled special - prefix = srcpkg[0] - if srcpkg.startswith("lib"): - prefix = "lib" + srcpkg[3] - - # the changelogs_uri argument overrides the default changelogs_uri, - # this is useful for e.g. PPAs where we construct the changelogs - # path differently - if changelogs_uri: - uri = changelogs_uri - else: - uri = CHANGELOGS_URI % (src_section, prefix, srcpkg, srcpkg, - srcver, fname) - - # https uris are not supported when they contain a username/password - # because the urllib2 https implementation will not check certificates - # and so its possible to do a man-in-the-middle attack to steal the - # credentials - res = urlsplit(uri) - if res.scheme == "https" and res.username: - raise HttpsChangelogsUnsupportedError( - "https locations with username/password are not" - "supported to fetch changelogs") - - # print("Trying: %s " % uri) - changelog = urlopen(uri) - #print(changelog.read()) - # do only get the lines that are new - alllines = "" - regexp = "^%s \\((.*)\\)(.*)$" % (re.escape(srcpkg)) - - while True: - line = changelog.readline().decode("UTF-8", "replace") - if line == "": - break - match = re.match(regexp, line) - if match: - # strip epoch from installed version - # and from changelog too - installed = getattr(pkg.installed, "version", None) - if installed and ":" in installed: - installed = installed.split(":", 1)[1] - changelogver = match.group(1) - if changelogver and ":" in changelogver: - changelogver = changelogver.split(":", 1)[1] - # we test for "==" here for changelogs - # to ensure that the version - # is actually really in the changelog - if not - # just display it all, this catches cases like: - # gcc-defaults with "binver=4.3.1" and srcver=1.76 - # - # for NEWS.Debian we do require the changelogver > installed - if strict_versioning: - if (installed - and apt_pkg.version_compare(changelogver, - installed) < 0): - break - else: - if (installed - and apt_pkg.version_compare(changelogver, - installed) == 0): - break - alllines = alllines + line - return alllines - - def _extract_ppa_changelog_uri(self, name): - """Return the changelog URI from the Launchpad API - - Return None in case of an error. - """ - if not Launchpad: - logging.warning("Launchpadlib not available, cannot retrieve PPA " - "changelog") - return None - - cdt = self[name].candidate - for uri in cdt.uris: - if urlsplit(uri).hostname != 'ppa.launchpad.net': - continue - match = re.search('http.*/(.*)/(.*)/ubuntu/.*', uri) - if match is not None: - user, ppa = match.group(1), match.group(2) - break - else: - logging.error("Unable to find a valid PPA candidate URL.") - return - - # Login on launchpad if we are not already - if self.launchpad is None: - self.launchpad = Launchpad.login_anonymously('update-manager', - 'production', - version='devel') - - archive = self.launchpad.archives.getByReference( - reference='~%s/ubuntu/%s' % (user, ppa) - ) - if archive is None: - logging.error("Unable to retrieve the archive from the Launchpad " - "API.") - return - - spphs = archive.getPublishedSources(source_name=cdt.source_name, - exact_match=True, - version=cdt.source_version) - if not spphs: - logging.error("No published sources were retrieved from the " - "Launchpad API.") - return - - return spphs[0].changelogUrl() - - def _guess_third_party_changelogs_uri_by_source(self, name): - pkg = self[name] - deb_uri = pkg.candidate.uri - if deb_uri is None: - return None - srcrec = pkg.candidate.record.get("Source") - if not srcrec: - return None - # srcpkg can be "apt" or "gcc-default (1.0)" - srcpkg = srcrec.split("(")[0].strip() - if "(" in srcrec: - srcver = srcrec.split("(")[1].rstrip(")") - else: - srcver = pkg.candidate.source_version - base_uri = deb_uri.rpartition("/")[0] - return base_uri + "/%s_%s.changelog" % (srcpkg, srcver) - - def _guess_third_party_changelogs_uri_by_binary(self, name): - """ guess changelogs uri based on ArchiveURI by replacing .deb - with .changelog - """ - # there is always a pkg and a pkg.candidate, no need to add - # check here - pkg = self[name] - deb_uri = pkg.candidate.uri - if deb_uri: - return "%s.changelog" % deb_uri.rsplit(".", 1)[0] - return None - - def get_news_and_changelog(self, name, lock): - self.get_news(name) - self.get_changelog(name) - try: - lock.release() - except Exception: - pass - - def get_news(self, name): - " get the NEWS.Debian file from the changelogs location " - try: - news = self._get_changelog_or_news(name, "NEWS.Debian", True) - except Exception: - return - if news: - self.all_news[name] = news - - def _fetch_changelog_for_third_party_package(self, name, origins): - # Special case for PPAs - changelogs_uri_ppa = None - for origin in origins: - if origin.origin.startswith('LP-PPA-'): - try: - changelogs_uri_ppa = self._extract_ppa_changelog_uri(name) - break - except Exception: - logging.exception("Unable to connect to the Launchpad " - "API.") - # Try non official changelog location - changelogs_uri_binary = \ - self._guess_third_party_changelogs_uri_by_binary(name) - changelogs_uri_source = \ - self._guess_third_party_changelogs_uri_by_source(name) - error_message = "" - for changelogs_uri in [changelogs_uri_ppa, - changelogs_uri_binary, - changelogs_uri_source]: - if changelogs_uri: - try: - changelog = self._get_changelog_or_news( - name, "changelog", False, changelogs_uri) - self.all_changes[name] += changelog - except (HTTPError, HttpsChangelogsUnsupportedError): - # no changelogs_uri or 404 - error_message = _( - "This update does not come from a " - "source that supports changelogs.") - except (IOError, BadStatusLine, socket.error): - # network errors and others - logging.exception("error on changelog fetching") - error_message = _( - "Failed to download the list of changes. \n" - "Please check your Internet connection.") - self.all_changes[name] += error_message - - def get_changelog(self, name): - " get the changelog file from the changelog location " - origins = self[name].candidate.origins - self.all_changes[name] = _("Changes for %s versions:\n" - "Installed version: %s\n" - "Available version: %s\n\n") % \ - (name, getattr(self[name].installed, "version", None), - self[name].candidate.version) - if self.CHANGELOG_ORIGIN not in [o.origin for o in origins]: - self._fetch_changelog_for_third_party_package(name, origins) - return - # fixup epoch handling version - srcpkg = self[name].candidate.source_name - srcver_epoch = self[name].candidate.source_version.replace(':', '%3A') - try: - changelog = self._get_changelog_or_news(name, "changelog") - if len(changelog) == 0: - changelog = _("The changelog does not contain any relevant " - "changes.\n\n" - "Please use http://launchpad.net/ubuntu/+source/" - "%s/%s/+changelog\n" - "until the changes become available or try " - "again later.") % (srcpkg, srcver_epoch) - except HTTPError: - changelog = _("The list of changes is not available yet.\n\n" - "Please use http://launchpad.net/ubuntu/+source/" - "%s/%s/+changelog\n" - "until the changes become available or try again " - "later.") % (srcpkg, srcver_epoch) - except (IOError, BadStatusLine, socket.error) as e: - print("caught exception: ", e) - changelog = _("Failed to download the list " - "of changes. \nPlease " - "check your Internet " - "connection.") - self.all_changes[name] += changelog diff --git a/.pybuild/cpython3_3.8/build/UpdateManager/Core/UpdateList.py b/.pybuild/cpython3_3.8/build/UpdateManager/Core/UpdateList.py deleted file mode 100644 index 99fd282..0000000 --- a/.pybuild/cpython3_3.8/build/UpdateManager/Core/UpdateList.py +++ /dev/null @@ -1,676 +0,0 @@ -# UpdateList.py -# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*- -# -# Copyright (c) 2004-2013 Canonical -# -# Author: Michael Vogt -# Dylan McCall -# Michael Terry -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 2 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 -# USA - -from __future__ import print_function - -import warnings -warnings.filterwarnings("ignore", "Accessed deprecated property", - DeprecationWarning) - -from gettext import gettext as _, install -import apt -import logging -import itertools -import os -import random -import glob -import json -from gi.repository import Gio - -import shutil -from .utils import humanize_size -from UpdateManager.Core import utils -from UpdateManager.Core import filter - -class UpdateItem(): - def __init__(self, pkg, name, icon, to_remove): - self.icon = icon - self.name = name - self.pkg = pkg - self.to_remove = to_remove - - def is_selected(self): - if not self.to_remove: - return self.pkg.marked_install or self.pkg.marked_upgrade - else: - return self.pkg.marked_delete - - -class UpdateGroup(UpdateItem): - _depcache = {} - - def __init__(self, pkg, name, icon, to_remove): - UpdateItem.__init__(self, pkg, name, icon, to_remove) - self._items = set() - self._deps = set() - self.core_item = None - if pkg is not None: - self.core_item = UpdateItem(pkg, name, icon, to_remove) - self._items.add(self.core_item) - - @property - def items(self): - all_items = [] - all_items.extend(self._items) - return sorted(all_items, key=lambda a: a.name.lower()) - - def add(self, pkg, cache=None, eventloop_callback=None, to_remove=False): - name = utils.get_package_label(pkg) - icon = Gio.ThemedIcon.new("package") - self._items.add(UpdateItem(pkg, name, icon, to_remove)) - # If the pkg is in self._deps, stop here. We have already calculated - # the recursive dependencies for this package, no need to do it again. - if cache and pkg.name in cache and pkg.name not in self._deps: - if not self._deps: - # Initial deps haven't been calculated. As we're checking - # whether _deps is empty in is_dependency, we must init now or - # it won't be done at all. - self._init_deps(cache, eventloop_callback) - self._add_deps(pkg, cache, eventloop_callback) - - def contains(self, item): - return item in self._items - - def _init_deps(self, cache, eventloop_callback): - for item in self._items: - if item.pkg and item.pkg.name not in self._deps: - self._add_deps(item.pkg, cache, eventloop_callback) - - def _add_deps(self, pkg, cache, eventloop_callback): - """Adds pkg and dependencies of pkg to the dependency list.""" - if pkg is None or pkg.candidate is None or pkg.name in self._deps: - # This shouldn't really happen. If we land here often, it's a sign - # that something has gone wrong. Unless all pkgs are None it's not - # a critical issue - a hit to the performance at most. - reason = ((not pkg or not pkg.candidate) - and "Package was None or didn't have a candidate." - or "%s already in _deps." % pkg.name) - logging.debug("Useless call to _add_deps. %s" % reason) - return - if len(self._deps) % 200 == 0 and callable(eventloop_callback): - # Don't spin the loop every time _add_deps is called. - eventloop_callback() - - self._deps.add(pkg.name) - - if pkg.name in self._depcache: - for dep in self._depcache[pkg.name]: - if dep not in self._deps and dep in cache: - self._add_deps(cache[dep], cache, eventloop_callback) - else: - candidate = pkg.candidate - dependencies = candidate.get_dependencies('Depends', 'Recommends') - for dependency_pkg in itertools.chain.from_iterable(dependencies): - name = dependency_pkg.name - if name not in self._deps and name in cache: - self._depcache.setdefault(pkg.name, []).append(name) - self._add_deps(cache[name], cache, eventloop_callback) - - def is_dependency(self, maybe_dep, cache=None, eventloop_callback=None): - if not self._deps and cache: - self._init_deps(cache, eventloop_callback) - - return maybe_dep.name in self._deps - - def packages_are_selected(self): - for item in self.items: - if item.is_selected(): - return True - return False - - def selection_is_inconsistent(self): - pkgs_installing = [item for item in self.items if item.is_selected()] - return (len(pkgs_installing) > 0 - and len(pkgs_installing) < len(self.items)) - - def get_total_size(self): - if self.to_remove: - return 0 - size = 0 - for item in self.items: - size += getattr(item.pkg.candidate, "size", 0) - return size - - -class UpdateApplicationGroup(UpdateGroup): - def __init__(self, pkg, application, to_remove): - name = application.get_display_name() - icon = application.get_icon() - super(UpdateApplicationGroup, self).__init__(pkg, name, icon, - to_remove) - - -class UpdatePackageGroup(UpdateGroup): - def __init__(self, pkg, to_remove): - name = utils.get_package_label(pkg) - icon = Gio.ThemedIcon.new("package") - super(UpdatePackageGroup, self).__init__(pkg, name, icon, to_remove) - - -class UpdateSystemGroup(UpdateGroup): - def __init__(self, cache, to_remove): - # Translators: the %s is a distro name, like 'Ubuntu' and 'base' as in - # the core components and packages. - name = _("%s base") % utils.get_ubuntu_flavor_name(cache=cache) - icon = Gio.ThemedIcon.new("distributor-logo") - super(UpdateSystemGroup, self).__init__(None, name, icon, to_remove) - - -class UpdateOrigin(): - def __init__(self, desc, importance): - self.packages = [] - self.importance = importance - self.description = desc - - -class UpdateList(): - """ - class that contains the list of available updates in - self.pkgs[origin] where origin is the user readable string - """ - - # the key in the debian/control file used to add the phased - # updates percentage - PHASED_UPDATES_KEY = "Phased-Update-Percentage" - - # the file that contains the uniq machine id - UNIQ_MACHINE_ID_FILE = "/etc/machine-id" - # use the dbus one as a fallback - UNIQ_MACHINE_ID_FILE_FALLBACK = "/var/lib/dbus/machine-id" - - APP_INSTALL_PATTERN = "/usr/share/app-install/desktop/%s:*.desktop" - - # the configuration key to turn phased-updates always on - ALWAYS_INCLUDE_PHASED_UPDATES = ( - "Update-Manager::Always-Include-Phased-Updates") - # ... or always off - NEVER_INCLUDE_PHASED_UPDATES = ( - "Update-Manager::Never-Include-Phased-Updates") - - def __init__(self, parent, dist=None): - self.dist = (dist if dist else utils.get_dist()) - self.distUpgradeWouldDelete = 0 - self.update_groups = [] - self.random = random.Random() - - #FIXME: 最好将这个常量通过配置文件读 - self.GROUPS_JSON_PKG = 'kylin-update-desktop-config' - - self.INPUT_CONFIG_PATH = '/usr/share/kylin-update-desktop-config/data' - self.OUTPUT_CONFIG_PATH = os.getenv('HOME') + '/.config' +'/update_manager_config' - - self.IMPORTANT_LIST_PATH="/var/lib/kylin-software-properties/template/important.list" - - # important推送列表 - self.important_list = [] - - #所有的组升级安装列表 - self.local_upgrade_list = {} - # a stable machine uniq id - try: - with open(self.UNIQ_MACHINE_ID_FILE) as f: - self.machine_uniq_id = f.read() - except FileNotFoundError: - with open(self.UNIQ_MACHINE_ID_FILE_FALLBACK) as f: - self.machine_uniq_id = f.read() - - if 'XDG_DATA_DIRS' in os.environ and os.environ['XDG_DATA_DIRS']: - data_dirs = os.environ['XDG_DATA_DIRS'] - else: - data_dirs = '/usr/local/share/:/usr/share/' - - #FIX 此处需要修复 application_dirs 包含正确的desktop文件的目录 - self.application_dirs = [os.path.join(base, 'applications') - for base in data_dirs.split(':')] - - if 'XDG_CURRENT_DESKTOP' in os.environ: - self.current_desktop = os.environ.get('XDG_CURRENT_DESKTOP') - else: - self.current_desktop = '' - self.desktop_cache = {} - - def _file_is_application(self, file_path): - # WARNING: This is called often if there's a lot of updates. A poor - # performing call here has a huge impact on the overall performance! - #通过判断包的配置文件是否存在.desktop - if not file_path.endswith(".desktop"): - # First the obvious case: If the path doesn't end in a .desktop - # extension, this isn't a desktop file. - return False - - #通过判断.desktop 文件是否在/usr/share/applications 里面 表示属于应用 - file_path = os.path.abspath(file_path) - for app_dir in self.application_dirs: - if file_path.startswith(app_dir): - return True - return False - - def _rate_application_for_package(self, application, pkg): - score = 0 - desktop_file = os.path.basename(application.get_filename()) - application_id = os.path.splitext(desktop_file)[0] - - if application.should_show(): - score += 1 - - if application_id == pkg.name: - score += 5 - - return score - - def _get_application_for_package(self, pkg): - desktop_files = [] - rated_applications = [] - - #拿到应用的desktop文件判断为一个应用 - for installed_file in pkg.installed_files: - if self._file_is_application(installed_file): - desktop_files.append(installed_file) - #此部分强制进行添加应用 - if pkg.name in self.desktop_cache: - desktop_files += self.desktop_cache[pkg.name] - - for desktop_file in desktop_files: - try: - application = Gio.DesktopAppInfo.new_from_filename( - desktop_file) - application.set_desktop_env(self.current_desktop) - except Exception as e: - logging.warning("Error loading .desktop file %s: %s" % - (desktop_file, e)) - continue - score = self._rate_application_for_package(application, pkg) - if score > 0: - rated_applications.append((score, application)) - - rated_applications.sort(key=lambda app: app[0], reverse=True) - if len(rated_applications) > 0: - return rated_applications[0][1] - else: - return None - - def _populate_desktop_cache(self, pkg_names): - if not pkg_names: - # No updates; This shouldn't have happened. - logging.warning("_populate_desktop_cache called with empty list " - "of packages.") - return - elif len(pkg_names) == 1: - # One update; Let glob do the matching. - pattern = self.APP_INSTALL_PATTERN % pkg_names[0] - else: - # More than one update available. Glob all desktop files and store - # those that match an upgradeable package. - pattern = self.APP_INSTALL_PATTERN % "*" - - for desktop_file in glob.iglob(pattern): - try: - pkg = desktop_file.split('/')[-1].split(":")[0] - except IndexError: - # app-install-data desktop file had an unexpected naming - # convention. As we can't extract the package name from - # the path, just ignore it. - logging.error("Could not extract package name from '%s'. " - "File ignored." % desktop_file) - continue - - if pkg in pkg_names: - self.desktop_cache.setdefault(pkg, []).append(desktop_file) - logging.debug("App candidate for %s: %s" % - (pkg, desktop_file)) - - def _is_security_update(self, pkg): - """ This will test if the pkg is a security update. - This includes if there is a newer version in -updates, but also - an older update available in -security. For example, if - installed pkg A v1.0 is available in both -updates (as v1.2) and - -security (v1.1). we want to display it as a security update. - - :return: True if the update comes from the security pocket - """ - if not self.dist: - return False - inst_ver = pkg._pkg.current_ver - for ver in pkg._pkg.version_list: - # discard is < than installed ver - if (inst_ver - and apt.apt_pkg.version_compare(ver.ver_str, - inst_ver.ver_str) <= 0): - continue - # check if we have a match - for (verFileIter, index) in ver.file_list: - if verFileIter.archive == "%s-security" % self.dist and \ - verFileIter.origin == "Ubuntu": - indexfile = pkg._pcache._list.find_index(verFileIter) - if indexfile: # and indexfile.IsTrusted: - return True - return False - - def _is_ignored_phased_update(self, pkg): - """ This will test if the pkg is a phased update and if - it needs to get installed or ignored. - - :return: True if the updates should be ignored - """ - # allow the admin to override this - if apt.apt_pkg.config.find_b( - self.ALWAYS_INCLUDE_PHASED_UPDATES, False): - return False - - if self.PHASED_UPDATES_KEY in pkg.candidate.record: - if apt.apt_pkg.config.find_b( - self.NEVER_INCLUDE_PHASED_UPDATES, False): - logging.info("holding back phased update per configuration") - return True - - # its important that we always get the same result on - # multiple runs of the update-manager, so we need to - # feed a seed that is a combination of the pkg/ver/machine - self.random.seed("%s-%s-%s" % ( - pkg.candidate.source_name, pkg.candidate.version, - self.machine_uniq_id)) - threshold = pkg.candidate.record[self.PHASED_UPDATES_KEY] - percentage = self.random.randint(0, 100) - if percentage > int(threshold): - logging.info("holding back phased update %s (%s < %s)" % ( - pkg.name, threshold, percentage)) - return True - return False - - def _get_linux_packages(self): - "Return all binary packages made by the linux-meta source package" - # Hard code this rather than generate from source info in cache because - # that might only be available if we have deb-src lines. I think we - # could also generate it by iterating over all the binary package info - # we have, but that is costly. These don't change often. - return ['linux', - 'linux-cloud-tools-generic', - 'linux-cloud-tools-lowlatency', - 'linux-cloud-tools-virtual', - 'linux-crashdump', - 'linux-generic', - 'linux-generic-lpae', - 'linux-headers-generic', - 'linux-headers-generic-lpae', - 'linux-headers-lowlatency', - 'linux-headers-lowlatency-lpae', - 'linux-headers-server', - 'linux-headers-virtual', - 'linux-image', - 'linux-image-extra-virtual', - 'linux-image-generic', - 'linux-image-generic-lpae', - 'linux-image-lowlatency', - 'linux-image-virtual', - 'linux-lowlatency', - 'linux-signed-generic', - 'linux-signed-image-generic', - 'linux-signed-image-lowlatency', - 'linux-signed-lowlatency', - 'linux-source', - 'linux-tools-generic', - 'linux-tools-generic-lpae', - 'linux-tools-lowlatency', - 'linux-tools-virtual', - 'linux-virtual'] - - def _make_groups(self, cache, pkgs, eventloop_callback, to_remove=False): - if not pkgs: - return [] - ungrouped_pkgs = [] - app_groups = [] - pkg_groups = [] - - for pkg in pkgs: - #查看这个包属于那个应用的 并获取这个应用的一些属性 例如 应用名称和图标的等等 - app = self._get_application_for_package(pkg) - if app is not None: - #包含 应用名称和图标 - app_group = UpdateApplicationGroup(pkg, app, to_remove) - app_groups.append(app_group) - else: - ungrouped_pkgs.append(pkg) - - # Stick together applications and their immediate dependencies - #将应用和它们的依赖关系 结合在一起 - for pkg in list(ungrouped_pkgs): - dep_groups = [] - for group in app_groups: - if group.is_dependency(pkg, cache, eventloop_callback): - dep_groups.append(group) - if len(dep_groups) > 1: - break - #此来决定是这一个包所独有的依赖 如果一个依赖存在2个以上的就不 - if len(dep_groups) == 1: - dep_groups[0].add(pkg, cache, eventloop_callback, to_remove) - ungrouped_pkgs.remove(pkg) - - system_group = None - if ungrouped_pkgs: - # Separate out system base packages. If we have already found an - # application for all updates, don't bother. - meta_group = UpdateGroup(None, None, None, to_remove) - flavor_package = utils.get_ubuntu_flavor_package(cache=cache) - meta_pkgs = [flavor_package, "ubuntu-standard", "ubuntu-minimal"] - meta_pkgs.extend(self._get_linux_packages()) - for pkg in meta_pkgs: - if pkg in cache: - meta_group.add(cache[pkg]) - for pkg in ungrouped_pkgs: - if meta_group.is_dependency(pkg, cache, eventloop_callback): - if system_group is None: - system_group = UpdateSystemGroup(cache, to_remove) - system_group.add(pkg) - else: - pkg_groups.append(UpdatePackageGroup(pkg, to_remove)) - - app_groups.sort(key=lambda a: a.name.lower()) - pkg_groups.sort(key=lambda a: a.name.lower()) - if system_group: - pkg_groups.append(system_group) - - return app_groups + pkg_groups - - def _read_important_list(self): - header = '' - desc = '' - # 获取importantlist 本次更新推送 - try: - with open(self.IMPORTANT_LIST_PATH, 'r') as f: - data = f.read() - self.important_list = data.split() - logging.info("importantList: %s",self.important_list) - return True - except Exception as e: - header = _("read important list failed") - desc = ("%s",str(e)) - logging.info(header + desc) - return False,header,desc - - def _make_pkg_info_json(self,cache,pkgs_list): - size = 0 - total_size = 0 - pkgs_info_json = {} - for pkg_name in pkgs_list: - try: - pkg = cache[pkg_name] - - #获取下载大小 - size = getattr(pkg.candidate, "size", 0) - total_size = total_size + size - pkgs_info_json.update({pkg_name:{"size":size}}) - except Exception as e: - logging.info("this package(%s) not in list and error mes:%s",pkg_name,e) - pass - pkgs_info_json.update({"total_size":humanize_size(total_size)}) - return pkgs_info_json - - #检查包是否在cache中 返回新得列表 - def _check_pkg_in_cache(self,cache,pkgs_list): - new_pkgs_list = [] - for pkg_name in pkgs_list: - #检查是否在cache 以及 是否安装检查 - if pkg_name in cache and not cache[pkg_name].is_installed: - new_pkgs_list.append(pkg_name) - else: - pass - # logging.info("this package(%s) not in list ",pkg_name) - return new_pkgs_list - - def _make_output_json(self,data,upgrade_pkgs_json,install_pkgs_json,hold_pkgs_list,remove_pkgs_list): - groups_base_info = {} - output_json = {} - - #FIXME: 确定输出文件的文件名 以及放置位置 - output_config_name = self.OUTPUT_CONFIG_PATH + '/' + data['package'] + '_output.json' - - #4、添加一些基础信息 - groups_base_info.update({"package":data['package']}) - groups_base_info.update({"version":data['version']}) - groups_base_info.update({"name":data['name']}) - groups_base_info.update({"description":data['description']}) - groups_base_info.update({"icon":data['icon']}) - - #5、添加升级的内容 - output_json.update(groups_base_info) - output_json.update({"upgrade_list":upgrade_pkgs_json}) - output_json.update({"install_list":install_pkgs_json}) - output_json.update({"hold_list":hold_pkgs_list}) - output_json.update({"remove_list":remove_pkgs_list}) - - #6 产生JSON文件 - with open(output_config_name, 'w', encoding='utf-8') as f: - json.dump(output_json, f, ensure_ascii=False, indent=4) - logging.info("Generate Jsonfile(%s) to complete... ",output_config_name) - - def _make_groups_upgrade(self,cache,pkgs_install = [], pkgs_upgrade = [], pkgs_remove = []): - try: - files = os.listdir(self.INPUT_CONFIG_PATH) #获得文件夹中所有文件的名称列表 - upgrade_groups_list = [] - - for file in files: - #判是否是目录以及是否以JSON结尾 - if file.endswith('.json'): - with open(self.INPUT_CONFIG_PATH+"/"+file,'r') as f: - data = json.load(f) - group_name = data['package'] - - #过滤没有推送的配置文件 - if not group_name in self.important_list: - continue - - upgrade_pkgs_list = data['upgrade_list'] - hold_pkgs_list = data['hold_list'] - - #这个安装升级列表中包含当前系统的cache中没有的包 需要过滤 - remove_pkgs_list = data['remove_list'] - - #检查包是否在cache中 以及是否已经安装 - new_install_pkgs_list = self._check_pkg_in_cache(cache,data['install_list']) - - #进行交集 升级列表 - upgrade_intersection_pkgs = list(set(pkgs_upgrade) & set(upgrade_pkgs_list)) - - #判断当前是否可升级或者新装的包 - if len(new_install_pkgs_list) == 0 and len(upgrade_intersection_pkgs) == 0: - continue - - #在总升级列表中移除这些包 - for pkg in upgrade_intersection_pkgs: - pkgs_upgrade.remove(pkg) - - #3、生成升级的包列表JSON - upgrade_pkgs_json = self._make_pkg_info_json(cache,upgrade_intersection_pkgs) - - #2、生成安装的软件列表 - install_pkgs_json = self._make_pkg_info_json(cache,new_install_pkgs_list) - - #输出JSON配置文件 - self._make_output_json(data,upgrade_pkgs_json,install_pkgs_json,hold_pkgs_list,remove_pkgs_list) - - upgrade_groups_list.append(group_name) - - #添加到字典维护的升级列表 - self.local_upgrade_list.update({group_name:{"pkgs_upgrade":upgrade_intersection_pkgs,"pkgs_install":new_install_pkgs_list}}) - logging.info("group(%s) upgrade:%d install:%d",group_name,len(upgrade_intersection_pkgs),len(new_install_pkgs_list)) - else: - pass - #添加所有可升级的组列表 - self.local_upgrade_list.update({"upgrade_groups_list":upgrade_groups_list}) - except Exception as e: - logging.warning("Generate Jsonfile to failed... ") - logging.error(e) - - - def update(self, cache, eventloop_callback=None): - pkgs_install = [] - pkgs_upgrade = [] - pkgs_remove = [] - - header = '' - desc = '' - - self._read_important_list() - - #important_list 为空时此次不需要升级 - if not self.important_list: - #不需要升级 全部的软件都是新的 - header = _("No software updates are available.") - desc = _('important_list is Empty') - return True,header,desc - - #查找所有可升级的包 - for pkg in cache: - try: - if pkg.marked_install: - pkgs_install.append(pkg) - elif pkg.marked_upgrade: - pkgs_upgrade.append(pkg) - elif pkg.marked_delete: - pkgs_remove.append(pkg) - except Exception as e: - logging.error(e) - - logging.info("System all upgradeable packages:new_install:%d upgrade:%d remove:%d",len(pkgs_install),len(pkgs_upgrade),len(pkgs_remove)) - - #源过滤 - # fu = filter.UpdateListFilterCache() - # allowed_origin_upgrade_pkgs = fu.check_in_allowed_origin(pkgs_upgrade) - - self._make_groups_upgrade(cache,pkgs_upgrade = ([pkg.name for pkg in pkgs_upgrade])) - - #是否存在可升级的组 - if self.local_upgrade_list.get('upgrade_groups_list',[]): - #增加需要移除的包列表 - self.local_upgrade_list.update({"pkgs_remove":[pkg.name for pkg in pkgs_remove]}) - return True,header,desc - else: - #不需要升级 全部的软件都是新的 - header = _("The software on this computer is up to date.") - desc = '' - return True,header,desc - - #FIXME: 目前此功能不使用 但是以此按应用进行分组是更好的展示升级列表的方式 - # self.update_groups = self._make_groups(cache, self.pkgs_upgrade, - # eventloop_callback) \ No newline at end of file diff --git a/.pybuild/cpython3_3.8/build/UpdateManager/Core/__init__.py b/.pybuild/cpython3_3.8/build/UpdateManager/Core/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/.pybuild/cpython3_3.8/build/UpdateManager/Core/filter.py b/.pybuild/cpython3_3.8/build/UpdateManager/Core/filter.py deleted file mode 100644 index d800f34..0000000 --- a/.pybuild/cpython3_3.8/build/UpdateManager/Core/filter.py +++ /dev/null @@ -1,333 +0,0 @@ -#!/usr/bin/python3 - -import apt -import apt_pkg -import fnmatch -import logging -import logging.handlers -import re -import os -import string -import subprocess -import sys -import json -import dbus -import threading - -try: - from typing import AbstractSet, cast, DefaultDict, Dict, Iterable, List - AbstractSet # pyflakes - DefaultDict # pyflakes - Dict # pyflakes - Iterable # pyflakes - List # pyflakes - from typing import Set, Tuple, Union - Set # pyflakes - Tuple # pyflakes - Union # pyflakes -except ImportError: - pass - -from email.message import Message -from gettext import gettext as _ - -import apt -import apt_pkg - - -ImportantListPath="/var/lib/kylin-software-properties/template/important.list" -DesktopSystemPath="/usr/share/kylin-update-desktop-config/data/" - - -# no py3 lsb_release in debian :/ -DISTRO_CODENAME = subprocess.check_output( - ["lsb_release", "-c", "-s"], universal_newlines=True).strip() # type: str -DISTRO_DESC = subprocess.check_output( - ["lsb_release", "-d", "-s"], universal_newlines=True).strip() # type: str -DISTRO_ID = subprocess.check_output( - ["lsb_release", "-i", "-s"], universal_newlines=True).strip() # type: str - - -class UpdateListFilterCache(apt.Cache): - - def __init__(self): - # whitelist - self.upgradeList = [] - # 必须升级的包 - self.installList = [] - # self._cached_candidate_pkgnames = set() # type: Set[str] - - self.allowed_origins = get_allowed_origins() - self.allowed_origins = deleteDuplicatedElementFromList(self.allowed_origins) - logging.info("Allowed origins are: %s", - self.allowed_origins) - - self.blacklist = apt_pkg.config.value_list( - "Kylin-update-manager::Package-Blacklist") - self.blacklist = deleteDuplicatedElementFromList(self.blacklist) - # print("Initial blacklist: ", " ".join(self.blacklist)) - - self.whitelist = apt_pkg.config.value_list( - "Kylin-update-manager::Package-Whitelist") - self.whitelist = deleteDuplicatedElementFromList(self.whitelist) - # print("Initial whitelist: ", " ".join(self.whitelist)) - - self.strict_whitelist = apt_pkg.config.find_b( - "Kylin-update-manager::Package-Whitelist-Strict", False) - # print("Initial whitelist (%s): %s"%( - # "strict" if self.strict_whitelist else "not strict", - # " ".join(self.whitelist))) - - # update importantlist - # initUpdateImportantList() - - # 获取list - # self.initLocalPackagesList() - - #除掉不在cache中的包 - # self.checkInCache() - - def checkInCache(self): - logging.info("start Check in cache") - tmplist = [] - cache = apt.Cache() - for i in self.upgradeList: - try: - cache[i] - tmplist.append(i) - except Exception as e: - # print("not found pkg: ", str(e)) - pass - self.upgradeList = tmplist - - def initLocalPackagesList(self): - jsonfiles = [] - tmplist = [] - - # 获取importantlist 本次更新推送 - with open(ImportantListPath, 'r') as f: - text = f.read() - importantList = text.split() - logging.info("importantList: %s",importantList) - f.close() - - if not importantList: - logging.error("importantList is empty") - exit(-1) - - # 获取/usr/share/kylin-update-desktop-config/data/下所有json文件 - for root,dirs,files in os.walk(DesktopSystemPath): - pass - for i in files: - if ".json" in i: - jsonfiles.append(i.split('.')[0]) - # logging.info("all files: %s", jsonfiles) - - # 找到importantlist中对应的json文件 - for i in importantList: - if i not in jsonfiles: - # 说明这个是单独的包,不在分组中 - # 加入更新列表 - if i not in self.upgradeList: - self.upgradeList.append(i) - else: - # 在分组中 - # 获取每个对应json文件中的upgrade_list - if i in jsonfiles: - filepath = os.path.join(DesktopSystemPath, i) - filepath = filepath+".json" - with open(filepath, 'r') as f: - pkgdict = f.read() - jsonfile = json.loads(pkgdict) - tmplist = jsonfile['install_list'] - # print("\ntmplist: ", tmplist) - for j in tmplist: - if j not in self.upgradeList: - self.upgradeList.append(j) - f.close() - # logging.info("self.upgradeList: %s", self.upgradeList) - - # - # print("jsonfile silent_install_list: ", jsonfile['silent_install_list']) - - # 更新前检测,必须安装的包 - # self.installList = jsonfile['install_list'] - # print("jsonfile install_list: ", self.installList) - - # # 可以更新的列表, 白名单 - # self.upgradeList = jsonfile['upgrade_list'] - # print("jsonfile upgrade_list: ", self.upgradeList) - - def check_in_allowed_origin(self, pkgs): - new_upgrade_pkgs = [] - for pkg in pkgs: - # print("checking %d pkgname: %s"%(i, pkg)) - for v in pkg.versions: - if is_in_allowed_origin(v, self.allowed_origins) and not pkg in new_upgrade_pkgs: - new_upgrade_pkgs.append(pkg) - # else: - # pkg.mark_keep() - return new_upgrade_pkgs - - def is_pkgname_in_blacklist(self, pkgs): - blacklist_filter_pkgs = [] - for pkg in pkgs: - if pkg.name in self.blacklist: - pass - # print("skipping blacklisted package %s" % pkg.name) - else : - blacklist_filter_pkgs.append(pkg) - - return blacklist_filter_pkgs - - def is_pkgname_in_whitelist(self, pkgs): - whitelist_filter_upgrade_pkgs = [] - for pkg in pkgs: - if pkg.name in self.upgradeList: - whitelist_filter_upgrade_pkgs.append(pkg) - else : - pkg.mark_keep() - pass - # print("skipping whitelist package %s" % pkg.name) - - return whitelist_filter_upgrade_pkgs - - -def get_allowed_origins(): - # type: () -> List[str] - """ return a list of allowed origins from apt.conf - - This will take substitutions (like distro_id) into account. - """ - allowed_origins = get_allowed_origins_legacy() - key = "Kylin-update-manager::Origins-Pattern" - try: - for s in apt_pkg.config.value_list(key): - allowed_origins.append(substitute(s)) - except ValueError: - print("Unable to parse %s." % key) - raise - return allowed_origins - -def get_allowed_origins_legacy(): - # type: () -> List[str] - """ legacy support for old Allowed-Origins var """ - allowed_origins = [] # type: List[str] - key = "Kylin-update-manager::Allowed-Origins" - try: - for s in apt_pkg.config.value_list(key): - # if there is a ":" use that as seperator, else use spaces - if re.findall(r'(? str - """ substitude known mappings and return a new string - - Currently supported ${distro-release} - """ - mapping = {"distro_codename": get_distro_codename(), - "distro_id": get_distro_id()} - return string.Template(line).substitute(mapping) - - -def get_distro_codename(): - # type: () -> str - return DISTRO_CODENAME - - -def get_distro_id(): - # type: () -> str - return DISTRO_ID - -def is_in_allowed_origin(ver, allowed_origins): - # type: (apt.package.Version, List[str]) -> bool - if not ver: - return False - for origin in ver.origins: - if is_allowed_origin(origin, allowed_origins): - return True - return False - -def is_allowed_origin(origin, allowed_origins): - # type: (Union[apt.package.Origin, apt_pkg.PackageFile], List[str]) -> bool - - # local origin is allowed by default - # if origin.component == 'now' and origin.archive == 'now' and \ - # not origin.label and not origin.site: - # return True - for allowed in allowed_origins: - if match_whitelist_string(allowed, origin): - return True - return False - -def match_whitelist_string(whitelist, origin): - # type: (str, Union[apt.package.Origin, apt_pkg.PackageFile]) -> bool - """ - take a whitelist string in the form "origin=Debian,label=Debian-Security" - and match against the given python-apt origin. A empty whitelist string - never matches anything. - """ - whitelist = whitelist.strip() - if whitelist == "": - logging.warning("empty match string matches nothing") - return False - res = True - # make "\," the html quote equivalent - whitelist = whitelist.replace("\\,", "%2C") - for token in whitelist.split(","): - # strip and unquote the "," back - (what, value) = [s.strip().replace("%2C", ",") - for s in token.split("=")] - # logging.debug("matching %s=%s against %s" % ( - # what, value, origin)) - # support substitution here as well - value = substitute(value) - # first char is apt-cache policy output, send is the name - # in the Release file - if what in ("o", "origin"): - match = fnmatch.fnmatch(origin.origin, value) - elif what in ("l", "label"): - match = fnmatch.fnmatch(origin.label, value) - elif what in ("a", "suite", "archive"): - match = fnmatch.fnmatch(origin.archive, value) - elif what in ("c", "component"): - match = fnmatch.fnmatch(origin.component, value) - elif what in ("site",): - match = fnmatch.fnmatch(origin.site, value) - elif what in ("n", "codename",): - match = fnmatch.fnmatch(origin.codename, value) - else: - raise UnknownMatcherError( - "Unknown whitelist entry for matcher %s (token %s)" % ( - what, token)) - # update res - res = res and match - # logging.debug("matching %s=%s against %s" % ( - # what, value, origin)) - return res - -def deleteDuplicatedElementFromList(list): - resultList = [] - for item in list: - if not item in resultList: - resultList.append(item) - return resultList - - -class UnknownMatcherError(ValueError): - pass \ No newline at end of file diff --git a/.pybuild/cpython3_3.8/build/UpdateManager/Core/roam.py b/.pybuild/cpython3_3.8/build/UpdateManager/Core/roam.py deleted file mode 100644 index 0714d89..0000000 --- a/.pybuild/cpython3_3.8/build/UpdateManager/Core/roam.py +++ /dev/null @@ -1,212 +0,0 @@ -# utils.py -# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*- -# -# Copyright (c) 2011 Canonical -# -# Author: Alex Chiang -# Michael Vogt -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 2 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 -# USA - - -from __future__ import print_function - -import dbus -import sys - - -class ModemManagerHelper(object): - - # data taken from - # http://projects.gnome.org/NetworkManager/developers/mm-spec-04.html - MM_DBUS_IFACE = "org.freedesktop.ModemManager" - MM_DBUS_IFACE_MODEM = MM_DBUS_IFACE + ".Modem" - - # MM_MODEM_TYPE - MM_MODEM_TYPE_GSM = 1 - MM_MODEM_TYPE_CDMA = 2 - - # GSM - # Not registered, not searching for new operator to register. - MM_MODEM_GSM_NETWORK_REG_STATUS_IDLE = 0 - # Registered on home network. - MM_MODEM_GSM_NETWORK_REG_STATUS_HOME = 1 - # Not registered, searching for new operator to register with. - MM_MODEM_GSM_NETWORK_REG_STATUS_SEARCHING = 2 - # Registration denied. - MM_MODEM_GSM_NETWORK_REG_STATUS_DENIED = 3 - # Unknown registration status. - MM_MODEM_GSM_NETWORK_REG_STATUS_UNKNOWN = 4 - # Registered on a roaming network. - MM_MODEM_GSM_NETWORK_REG_STATUS_ROAMING = 5 - - # CDMA - # Registration status is unknown or the device is not registered. - MM_MODEM_CDMA_REGISTRATION_STATE_UNKNOWN = 0 - # Registered, but roaming status is unknown or cannot be provided - # by the device. The device may or may not be roaming. - MM_MODEM_CDMA_REGISTRATION_STATE_REGISTERED = 1 - # Currently registered on the home network. - MM_MODEM_CDMA_REGISTRATION_STATE_HOME = 2 - # Currently registered on a roaming network. - MM_MODEM_CDMA_REGISTRATION_STATE_ROAMING = 3 - - def __init__(self): - self.bus = dbus.SystemBus() - self.proxy = self.bus.get_object("org.freedesktop.ModemManager", - "/org/freedesktop/ModemManager") - modem_manager = dbus.Interface(self.proxy, self.MM_DBUS_IFACE) - self.modems = modem_manager.EnumerateDevices() - - @staticmethod - def get_dbus_property(proxy, interface, property): - props = dbus.Interface(proxy, "org.freedesktop.DBus.Properties") - property = props.Get(interface, property) - return property - - def is_gsm_roaming(self): - for m in self.modems: - dev = self.bus.get_object(self.MM_DBUS_IFACE, m) - type = self.get_dbus_property(dev, self.MM_DBUS_IFACE_MODEM, - "Type") - if type != self.MM_MODEM_TYPE_GSM: - continue - net = dbus.Interface(dev, - self.MM_DBUS_IFACE_MODEM + ".Gsm.Network") - reg = net.GetRegistrationInfo() - # Be conservative about roaming. If registration unknown, - # assume yes. - # MM_MODEM_GSM_NETWORK_REG_STATUS - if reg[0] in (self.MM_MODEM_GSM_NETWORK_REG_STATUS_UNKNOWN, - self.MM_MODEM_GSM_NETWORK_REG_STATUS_ROAMING): - return True - return False - - def is_cdma_roaming(self): - for m in self.modems: - dev = self.bus.get_object(self.MM_DBUS_IFACE, m) - type = self.get_dbus_property(dev, self.MM_DBUS_IFACE_MODEM, - "Type") - if type != self.MM_MODEM_TYPE_CDMA: - continue - cdma = dbus.Interface(dev, self.MM_DBUS_IFACE_MODEM + ".Cdma") - (cmda_1x, evdo) = cdma.GetRegistrationState() - # Be conservative about roaming. If registration unknown, - # assume yes. - # MM_MODEM_CDMA_REGISTRATION_STATE - roaming_states = (self.MM_MODEM_CDMA_REGISTRATION_STATE_REGISTERED, - self.MM_MODEM_CDMA_REGISTRATION_STATE_ROAMING) - # evdo trumps cmda_1x (thanks to Mathieu Trudel-Lapierre) - if evdo in roaming_states: - return True - elif cmda_1x in roaming_states: - return True - return False - - -class NetworkManagerHelper(object): - NM_DBUS_IFACE = "org.freedesktop.NetworkManager" - - # connection states - # Old enum values are for NM 0.7 - - # The NetworkManager daemon is in an unknown state. - NM_STATE_UNKNOWN = 0 - # The NetworkManager daemon is connecting a device. - NM_STATE_CONNECTING_OLD = 2 - NM_STATE_CONNECTING = 40 - NM_STATE_CONNECTING_LIST = [NM_STATE_CONNECTING_OLD, - NM_STATE_CONNECTING] - # The NetworkManager daemon is connected. - NM_STATE_CONNECTED_OLD = 3 - NM_STATE_CONNECTED_LOCAL = 50 - NM_STATE_CONNECTED_SITE = 60 - NM_STATE_CONNECTED_GLOBAL = 70 - NM_STATE_CONNECTED_LIST = [NM_STATE_CONNECTED_OLD, - NM_STATE_CONNECTED_LOCAL, - NM_STATE_CONNECTED_SITE, - NM_STATE_CONNECTED_GLOBAL] - - # The device type is unknown. - NM_DEVICE_TYPE_UNKNOWN = 0 - # The device is wired Ethernet device. - NM_DEVICE_TYPE_ETHERNET = 1 - # The device is an 802.11 WiFi device. - NM_DEVICE_TYPE_WIFI = 2 - # The device is a GSM-based cellular WAN device. - NM_DEVICE_TYPE_GSM = 3 - # The device is a CDMA/IS-95-based cellular WAN device. - NM_DEVICE_TYPE_CDMA = 4 - - def __init__(self): - self.bus = dbus.SystemBus() - self.proxy = self.bus.get_object("org.freedesktop.NetworkManager", - "/org/freedesktop/NetworkManager") - - @staticmethod - def get_dbus_property(proxy, interface, property): - props = dbus.Interface(proxy, "org.freedesktop.DBus.Properties") - property = props.Get(interface, property) - return property - - def is_active_connection_gsm_or_cdma(self): - res = False - actives = self.get_dbus_property( - self.proxy, self.NM_DBUS_IFACE, 'ActiveConnections') - for a in actives: - active = self.bus.get_object(self.NM_DBUS_IFACE, a) - default_route = self.get_dbus_property( - active, self.NM_DBUS_IFACE + ".Connection.Active", 'Default') - if not default_route: - continue - devs = self.get_dbus_property( - active, self.NM_DBUS_IFACE + ".Connection.Active", 'Devices') - for d in devs: - dev = self.bus.get_object(self.NM_DBUS_IFACE, d) - type = self.get_dbus_property( - dev, self.NM_DBUS_IFACE + ".Device", 'DeviceType') - if type == self.NM_DEVICE_TYPE_GSM: - return True - elif type == self.NM_DEVICE_TYPE_CDMA: - return True - else: - continue - return res - - def is_active_connection_gsm_or_cdma_roaming(self): - res = False - if self.is_active_connection_gsm_or_cdma(): - mmhelper = ModemManagerHelper() - res |= mmhelper.is_gsm_roaming() - res |= mmhelper.is_cdma_roaming() - return res - - -if __name__ == "__main__": - - # test code - if sys.argv[1:] and sys.argv[1] == "--test": - mmhelper = ModemManagerHelper() - print("is_gsm_roaming", mmhelper.is_gsm_roaming()) - print("is_cdma_romaing", mmhelper.is_cdma_roaming()) - - # roaming? - nmhelper = NetworkManagerHelper() - is_roaming = nmhelper.is_active_connection_gsm_or_cdma_roaming() - print("roam: ", is_roaming) - if is_roaming: - sys.exit(1) - sys.exit(0) diff --git a/.pybuild/cpython3_3.8/build/UpdateManager/Core/utils.py b/.pybuild/cpython3_3.8/build/UpdateManager/Core/utils.py deleted file mode 100644 index 103f8bf..0000000 --- a/.pybuild/cpython3_3.8/build/UpdateManager/Core/utils.py +++ /dev/null @@ -1,557 +0,0 @@ -# utils.py -# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*- -# -# Copyright (c) 2004-2013 Canonical -# -# Authors: Michael Vogt -# Michael Terry -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 2 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 -# USA - -from __future__ import print_function - -from gettext import gettext as _ -from gettext import ngettext -from stat import (S_IMODE, ST_MODE, S_IXUSR) -from math import ceil - -import apt -import apt_pkg -apt_pkg.init_config() - -import locale -import logging -import re -import os -import subprocess -import sys -import time -from urllib.request import ( - ProxyHandler, - Request, - build_opener, - install_opener, - urlopen, -) -from urllib.parse import urlsplit - -from copy import copy - - -class ExecutionTime(object): - """ - Helper that can be used in with statements to have a simple - measure of the timing of a particular block of code, e.g. - with ExecutionTime("db flush"): - db.flush() - """ - def __init__(self, info=""): - self.info = info - - def __enter__(self): - self.now = time.time() - - def __exit__(self, type, value, stack): - print("%s: %s" % (self.info, time.time() - self.now)) - - -def get_string_with_no_auth_from_source_entry(entry): - tmp = copy(entry) - url_parts = urlsplit(tmp.uri) - if url_parts.username: - tmp.uri = tmp.uri.replace(url_parts.username, "hidden-u") - if url_parts.password: - tmp.uri = tmp.uri.replace(url_parts.password, "hidden-p") - return str(tmp) - - -def is_unity_running(): - """ return True if Unity is currently running """ - unity_running = False - try: - import dbus - bus = dbus.SessionBus() - unity_running = bus.name_has_owner("com.canonical.Unity") - except Exception: - logging.exception("could not check for Unity dbus service") - return unity_running - - -def is_child_of_process_name(processname, pid=None): - if not pid: - pid = os.getpid() - while pid > 0: - stat_file = "/proc/%s/stat" % pid - with open(stat_file) as stat_f: - stat = stat_f.read() - # extract command (inside ()) - command = stat.partition("(")[2].rpartition(")")[0] - if command == processname: - return True - # get parent (second to the right of command) and check that next - pid = int(stat.rpartition(")")[2].split()[1]) - return False - - -def inside_chroot(): - """ returns True if we are inside a chroot - """ - # if there is no proc or no pid 1 we are very likely inside a chroot - if not os.path.exists("/proc") or not os.path.exists("/proc/1"): - return True - # if the inode is differnt for pid 1 "/" and our "/" - return os.stat("/") != os.stat("/proc/1/root") - - -def wrap(t, width=70, subsequent_indent=""): - """ helpers inspired after textwrap - unfortunately - we can not use textwrap directly because it break - packagenames with "-" in them into new lines - """ - out = "" - for s in t.split(): - if (len(out) - out.rfind("\n")) + len(s) > width: - out += "\n" + subsequent_indent - out += s + " " - return out - - -def twrap(s, **kwargs): - msg = "" - paras = s.split("\n") - for par in paras: - s = wrap(par, **kwargs) - msg += s + "\n" - return msg - - -def lsmod(): - " return list of loaded modules (or [] if lsmod is not found) " - modules = [] - # FIXME raise? - if not os.path.exists("/sbin/lsmod"): - return [] - p = subprocess.Popen(["/sbin/lsmod"], stdout=subprocess.PIPE, - universal_newlines=True) - lines = p.communicate()[0].split("\n") - # remove heading line: "Modules Size Used by" - del lines[0] - # add lines to list, skip empty lines - for line in lines: - if line: - modules.append(line.split()[0]) - return modules - - -def check_and_fix_xbit(path): - " check if a given binary has the executable bit and if not, add it" - if not os.path.exists(path): - return - mode = S_IMODE(os.stat(path)[ST_MODE]) - if not ((mode & S_IXUSR) == S_IXUSR): - os.chmod(path, mode | S_IXUSR) - - -def country_mirror(): - " helper to get the country mirror from the current locale " - # special cases go here - lang_mirror = {'c': ''} - # no lang, no mirror - if 'LANG' not in os.environ: - return '' - lang = os.environ['LANG'].lower() - # check if it is a special case - if lang[:5] in lang_mirror: - return lang_mirror[lang[:5]] - # now check for the most comon form (en_US.UTF-8) - if "_" in lang: - country = lang.split(".")[0].split("_")[1] - if "@" in country: - country = country.split("@")[0] - return country + "." - else: - return lang[:2] + "." - return '' - - -def get_dist(): - " return the codename of the current runing distro " - # support debug overwrite - dist = os.environ.get("META_RELEASE_FAKE_CODENAME") - if dist: - logging.warning("using fake release name '%s' (because of " - "META_RELEASE_FAKE_CODENAME environment) " % dist) - return dist - # then check the real one - from subprocess import Popen, PIPE - p = Popen(["lsb_release", "-c", "-s"], stdout=PIPE, - universal_newlines=True) - res = p.wait() - if res != 0: - sys.stderr.write("lsb_release returned exitcode: %i\n" % res) - return "unknown distribution" - dist = p.stdout.readline().strip() - p.stdout.close() - return dist - - -def get_dist_version(): - " return the version of the current running distro " - # support debug overwrite - desc = os.environ.get("META_RELEASE_FAKE_VERSION") - if desc: - logging.warning("using fake release version '%s' (because of " - "META_RELEASE_FAKE_VERSION environment) " % desc) - return desc - # then check the real one - from subprocess import Popen, PIPE - p = Popen(["lsb_release", "-r", "-s"], stdout=PIPE, - universal_newlines=True) - res = p.wait() - if res != 0: - sys.stderr.write("lsb_release returned exitcode: %i\n" % res) - return "unknown distribution" - desc = p.stdout.readline().strip() - p.stdout.close() - return desc - - -class HeadRequest(Request): - def get_method(self): - return "HEAD" - - -def url_downloadable(uri, debug_func=None): - """ - helper that checks if the given uri exists and is downloadable - (supports optional debug_func function handler to support - e.g. logging) - - Supports http (via HEAD) and ftp (via size request) - """ - if not debug_func: - lambda x: True - debug_func("url_downloadable: %s" % uri) - (scheme, netloc, path, querry, fragment) = urlsplit(uri) - debug_func("s='%s' n='%s' p='%s' q='%s' f='%s'" % (scheme, netloc, path, - querry, fragment)) - if scheme in ("http", "https"): - try: - http_file = urlopen(HeadRequest(uri)) - http_file.close() - if http_file.code == 200: - return True - return False - except Exception as e: - debug_func("error from httplib: '%s'" % e) - return False - elif scheme == "ftp": - import ftplib - try: - f = ftplib.FTP(netloc) - f.login() - f.cwd(os.path.dirname(path)) - size = f.size(os.path.basename(path)) - f.quit() - if debug_func: - debug_func("ftplib.size() returned: %s" % size) - if size != 0: - return True - except Exception as e: - if debug_func: - debug_func("error from ftplib: '%s'" % e) - return False - return False - - -def init_proxy(gsettings=None): - """ init proxy settings - - * use apt.conf http proxy if present, - * otherwise look into synaptics config file, - * otherwise the default behavior will use http_proxy environment - if present - """ - SYNAPTIC_CONF_FILE = "/root/.synaptic/synaptic.conf" - proxies = {} - # generic apt config wins - if apt_pkg.config.find("Acquire::http::Proxy") != '': - proxies["http"] = apt_pkg.config.find("Acquire::http::Proxy") - # then synaptic - elif os.path.exists(SYNAPTIC_CONF_FILE): - cnf = apt_pkg.Configuration() - apt_pkg.read_config_file(cnf, SYNAPTIC_CONF_FILE) - use_proxy = cnf.find_b("Synaptic::useProxy", False) - if use_proxy: - proxy_host = cnf.find("Synaptic::httpProxy") - proxy_port = str(cnf.find_i("Synaptic::httpProxyPort")) - if proxy_host and proxy_port: - proxies["http"] = "http://%s:%s/" % (proxy_host, proxy_port) - if apt_pkg.config.find("Acquire::https::Proxy") != '': - proxies["https"] = apt_pkg.config.find("Acquire::https::Proxy") - elif "http" in proxies: - proxies["https"] = proxies["http"] - # if we have a proxy, set it - if proxies: - # basic verification - for proxy in proxies.values(): - if not re.match("https?://\\w+", proxy): - print("proxy '%s' looks invalid" % proxy, file=sys.stderr) - return - proxy_support = ProxyHandler(proxies) - opener = build_opener(proxy_support) - install_opener(opener) - if "http" in proxies: - os.putenv("http_proxy", proxies["http"]) - if "https" in proxies: - os.putenv("https_proxy", proxies["https"]) - return proxies - - -def on_battery(): - """ - Check via dbus if the system is running on battery. - This function is using UPower per default, if UPower is not - available it falls-back to DeviceKit.Power. - """ - try: - import dbus - bus = dbus.Bus(dbus.Bus.TYPE_SYSTEM) - try: - devobj = bus.get_object('org.freedesktop.UPower', - '/org/freedesktop/UPower') - dev = dbus.Interface(devobj, 'org.freedesktop.DBus.Properties') - return dev.Get('org.freedesktop.UPower', 'OnBattery') - except dbus.exceptions.DBusException as e: - error_unknown = 'org.freedesktop.DBus.Error.ServiceUnknown' - if e._dbus_error_name != error_unknown: - raise - devobj = bus.get_object('org.freedesktop.DeviceKit.Power', - '/org/freedesktop/DeviceKit/Power') - dev = dbus.Interface(devobj, "org.freedesktop.DBus.Properties") - return dev.Get("org.freedesktop.DeviceKit.Power", "on_battery") - except Exception: - #import sys - #print("on_battery returned error: ", e, file=sys.stderr) - return False - - -def inhibit_sleep(): - """ - Send a dbus signal to logind to not suspend the system, it will be - released when the return value drops out of scope - """ - try: - from gi.repository import Gio, GLib - connection = Gio.bus_get_sync(Gio.BusType.SYSTEM) - - var, fdlist = connection.call_with_unix_fd_list_sync( - 'org.freedesktop.login1', '/org/freedesktop/login1', - 'org.freedesktop.login1.Manager', 'Inhibit', - GLib.Variant('(ssss)', - ('shutdown:sleep', - 'UpdateManager', 'Updating System', - 'block')), - None, 0, -1, None, None) - inhibitor = Gio.UnixInputStream(fd=fdlist.steal_fds()[var[0]]) - - return inhibitor - except Exception: - #print("could not send the dbus Inhibit signal: %s" % e) - return False - - -def str_to_bool(str): - if str == "0" or str.upper() == "FALSE": - return False - return True - - -def get_lang(): - import logging - try: - (locale_s, encoding) = locale.getdefaultlocale() - return locale_s - except Exception: - logging.exception("gedefaultlocale() failed") - return None - - -def get_ubuntu_flavor(cache=None): - """ try to guess the flavor based on the running desktop """ - # this will (of course) not work in a server environment, - # but the main use case for this is to show the right - # release notes. - pkg = get_ubuntu_flavor_package(cache=cache) - return pkg.split('-', 1)[0] - - -def _load_meta_pkg_list(): - # This could potentially introduce a circular dependency, but the config - # parser logic is simple, and doesn't rely on any UpdateManager code. - from DistUpgrade.DistUpgradeConfigParser import DistUpgradeConfig - parser = DistUpgradeConfig('/usr/share/ubuntu-release-upgrader') - return parser.getlist('Distro', 'MetaPkgs') - - -def get_ubuntu_flavor_package(cache=None): - """ try to guess the flavor metapackage based on the running desktop """ - # From spec, first if ubuntu-desktop is installed, use that. - # Second, grab first installed one from DistUpgrade.cfg. - # Lastly, fallback to ubuntu-desktop again. - meta_pkgs = ['ubuntu-desktop'] - - try: - meta_pkgs.extend(sorted(_load_meta_pkg_list())) - except Exception as e: - print('Could not load list of meta packages:', e) - - if cache is None: - cache = apt.Cache() - for meta_pkg in meta_pkgs: - cache_pkg = cache[meta_pkg] if meta_pkg in cache else None - if cache_pkg and cache_pkg.is_installed: - return meta_pkg - return 'ubuntu-desktop' - - -def get_ubuntu_flavor_name(cache=None): - """ try to guess the flavor name based on the running desktop """ - pkg = get_ubuntu_flavor_package(cache=cache) - lookup = {'ubuntustudio-desktop': 'Ubuntu Studio'} - if pkg in lookup: - return lookup[pkg] - elif pkg.endswith('-desktop'): - return capitalize_first_word(pkg.rsplit('-desktop', 1)[0]) - elif pkg.endswith('-netbook'): - return capitalize_first_word(pkg.rsplit('-netbook', 1)[0]) - else: - return 'Ubuntu' - - -# Unused by update-manager, but still used by ubuntu-release-upgrader -def error(parent, summary, message): - import gi - gi.require_version("Gtk", "3.0") - from gi.repository import Gtk, Gdk - d = Gtk.MessageDialog(parent=parent, - flags=Gtk.DialogFlags.MODAL, - type=Gtk.MessageType.ERROR, - buttons=Gtk.ButtonsType.CLOSE) - d.set_markup("%s\n\n%s" % (summary, message)) - d.realize() - d.get_window().set_functions(Gdk.WMFunction.MOVE) - d.set_title("") - d.run() - d.destroy() - return False - - -def humanize_size(bytes): - """ - Convert a given size in bytes to a nicer better readable unit - """ - - if bytes < 1000 * 1000: - # to have 0 for 0 bytes, 1 for 0-1000 bytes and for 1 and above - # round up - size_in_kb = int(ceil(bytes / float(1000))) - # TRANSLATORS: download size of small updates, e.g. "250 kB" - return ngettext("%(size).0f kB", "%(size).0f kB", size_in_kb) % { - "size": size_in_kb} - else: - # TRANSLATORS: download size of updates, e.g. "2.3 MB" - return locale.format_string(_("%.1f MB"), bytes / 1000.0 / 1000.0) - - -def get_arch(): - return apt_pkg.config.find("APT::Architecture") - - -def is_port_already_listening(port): - """ check if the current system is listening on the given tcp port """ - # index in the line - INDEX_LOCAL_ADDR = 1 - #INDEX_REMOTE_ADDR = 2 - INDEX_STATE = 3 - # state (st) that we care about - STATE_LISTENING = '0A' - # read the data - with open("/proc/net/tcp") as net_tcp: - for line in net_tcp.readlines(): - line = line.strip() - if not line: - continue - # split, values are: - # sl local_address rem_address st tx_queue rx_queue tr - # tm->when retrnsmt uid timeout inode - values = line.split() - state = values[INDEX_STATE] - if state != STATE_LISTENING: - continue - local_port_str = values[INDEX_LOCAL_ADDR].split(":")[1] - local_port = int(local_port_str, 16) - if local_port == port: - return True - return False - - -def iptables_active(): - """ Return True if iptables is active """ - # FIXME: is there a better way? - iptables_empty = """Chain INPUT (policy ACCEPT) -target prot opt source destination - -Chain FORWARD (policy ACCEPT) -target prot opt source destination - -Chain OUTPUT (policy ACCEPT) -target prot opt source destination -""" - if os.getuid() != 0: - raise OSError("Need root to check the iptables state") - if not os.path.exists("/sbin/iptables"): - return False - out = subprocess.Popen(["iptables", "-nL"], - stdout=subprocess.PIPE, - universal_newlines=True).communicate()[0] - if out == iptables_empty: - return False - return True - - -def capitalize_first_word(string): - """ this uppercases the first word's first letter - """ - if len(string) > 1 and string[0].isalpha() and not string[0].isupper(): - return string[0].capitalize() + string[1:] - return string - - -def get_package_label(pkg): - """ this takes a package synopsis and uppercases the first word's - first letter - """ - name = getattr(pkg.candidate, "summary", "") - return capitalize_first_word(name) - - -if __name__ == "__main__": - #print(mirror_from_sources_list()) - #print(on_battery()) - #print(inside_chroot()) - #print(iptables_active()) - error(None, "bar", "baz") diff --git a/.pybuild/cpython3_3.8/build/UpdateManager/UpdateManager.py b/.pybuild/cpython3_3.8/build/UpdateManager/UpdateManager.py deleted file mode 100644 index 53b94cd..0000000 --- a/.pybuild/cpython3_3.8/build/UpdateManager/UpdateManager.py +++ /dev/null @@ -1,278 +0,0 @@ -# UpdateManager.py -# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*- - -from __future__ import absolute_import, print_function - - -import warnings -warnings.filterwarnings("ignore", "Accessed deprecated property", - DeprecationWarning) -import logging -import sys -from gettext import gettext as _ - -import os -import apt_pkg -import dbus -import shutil -import dbus.service -from dbus.mainloop.glib import DBusGMainLoop -DBusGMainLoop(set_as_default=True) - -from DistUpgrade.DistUpgradeCache import NotEnoughFreeSpaceError -from .Core.MyCache import MyCache -from .UpdateManagerDbus import UpdateManagerDbusController -from .Core.UpdateList import UpdateList -from .backend import (InstallBackend, - get_backend) - -#安装完成之后是否有请求需要重启 -REBOOT_REQUIRED_FILE = "/var/run/reboot-required" - -INSTALL_ALONE_PROGRESS = "alone" - -class UpdateManager(): - - def __init__(self,options): - self.options = options - self.cache = None - self.update_list = None - - #表示是否处于更新和安装的状态 - self.is_updating = False - self.is_upgrading = False - - #建立dbus - self.dbusController = self._setup_dbus() - - # # FIXME: 目前此功能没有进行测试不知道是否可以进行检查 安装的时进行检查磁盘空间 - def check_free_space(self,cache): - err_sum = _("Not enough free disk space") - err_msg = _("The upgrade needs a total of %s free space on " - "disk '%s'. " - "Please free at least an additional %s of disk " - "space on '%s'. %s") - # specific ways to resolve lack of free space - remedy_archivedir = _("Remove temporary packages of former " - "installations using 'sudo apt clean'.") - remedy_boot = _("You can remove old kernels using " - "'sudo apt autoremove', and you could also " - "set COMPRESS=xz in " - "/etc/initramfs-tools/initramfs.conf to " - "reduce the size of your initramfs.") - remedy_root = _("Empty your trash and remove temporary " - "packages of former installations using " - "'sudo apt clean'.") - remedy_tmp = _("Reboot to clean up files in /tmp.") - remedy_usr = _("") - # check free space and error if its not enough - try: - cache.checkFreeSpace() - except NotEnoughFreeSpaceError as e: - # CheckFreeSpace examines where packages are cached - archivedir = apt_pkg.config.find_dir("Dir::Cache::archives") - err_long = "" - for req in e.free_space_required_list: - if err_long != "": - err_long += " " - if req.dir == archivedir: - err_long += err_msg % (req.size_total, req.dir, - req.size_needed, req.dir, - remedy_archivedir) - elif req.dir == "/boot": - err_long += err_msg % (req.size_total, req.dir, - req.size_needed, req.dir, - remedy_boot) - elif req.dir == "/": - err_long += err_msg % (req.size_total, req.dir, - req.size_needed, req.dir, - remedy_root) - elif req.dir == "/tmp": - err_long += err_msg % (req.size_total, req.dir, - req.size_needed, req.dir, - remedy_tmp) - elif req.dir == "/usr": - err_long += err_msg % (req.size_total, req.dir, - req.size_needed, req.dir, - remedy_usr) - #在此抛出异常 - # self.window_main.start_error(False, err_sum, err_long) - return False - except SystemError: - logging.exception("free space check failed") - - return True - - #进行更新的操作 - def start_update(self): - try: - self.is_updating = True - update_backend = get_backend(self, InstallBackend.ACTION_UPDATE) - update_backend.start() - except Exception as e: - logging.error(e) - - #进行升级的操作 - def start_install(self,partial_upgrade_list = []): - #检查磁盘的状态 - if self.check_free_space(self.cache) == False: - return - logging.info("Disk Check finished...") - try: - self.is_upgrading = True - install_backend = get_backend(self, InstallBackend.ACTION_INSTALL) - install_backend.start(partial_upgrade_list=partial_upgrade_list) - except Exception as e: - logging.error(e) - - #进行升级的操作-传入包列表 - def start_install_alone(self,pkgs_install = [], pkgs_upgrade = [], pkgs_remove = [],pkgs_purge = []): - #检查磁盘的状态 - if self.check_free_space(self.cache) == False: - return - logging.info("Disk Check finished...") - self.is_upgrading = True - install_backend = get_backend(self, InstallBackend.ACTION_INSTALL) - install_backend.start_alone(pkgs_install,pkgs_upgrade,pkgs_remove,pkgs_purge) - - #更新结束之后会调到此获取要升级的列表 and 更新cache 生成升级组列表JSON - def start_available(self, cancelled_update=False): - _success,header,desc = self.refresh_cache() - - #特殊情况的处理 单独安装包需要直接退出 安装or卸载执行完毕后 还会调到start_available - if _success == True and header == INSTALL_ALONE_PROGRESS: - return - else: - #发送更新升级列表完成的标志 - self.dbusController.UpdateDetectFinished(_success,self.update_list.local_upgrade_list.get('upgrade_groups_list',[]),header,desc) - - #检查安装完成之后需要重启吗 - if os.path.exists(REBOOT_REQUIRED_FILE): - logging.error("REBOOT_REQUIRED_FILE") - - def refresh_cache(self): - _success = True - header = None - desc = None - try: - #第一次进入 之后update不进入 - if self.cache is None: - self.cache = MyCache(None) - else: - self.cache.open(None) - self.cache._initDepCache() - except AssertionError: - header = _("Software index is broken") - desc = _("It is impossible to install or remove any software. " - "Please use the package manager \"Synaptic\" or run " - "\"sudo apt-get install -f\" in a terminal to fix " - "this issue at first.") - _success = False - return _success,header,desc - - except SystemError as e: - header = _("Could not initialize the package information") - desc = _("An unresolvable problem occurred while " - "initializing the package information.\n\n" - "Please report this bug against the 'update-manager' " - "package and include the following error " - "message:\n") + str(e) - _success = False - return _success,header,desc - - self.update_list = UpdateList(self) - - '''1、 - dist-upgrade 标记在此处进行 - 来判断将要删除的包 如果存在要删除的break的包的话 会从dist-upgrade切换到upgrade 目前此功能不使用 默认使用dist-upgrade - ''' - self.distUpgradeWouldDelete = self.cache.saveDistUpgrade() - - #2、 安装JSON分组配置文件包 安装完毕会重新调start_available --> here 安装失败就直接退出不会进行下面的操作 - try: - pkg_json = self.cache[self.update_list.GROUPS_JSON_PKG] - #是否安装 - if pkg_json.is_installed: - #是否可升级 - if pkg_json.is_upgradable: - logging.info("groups JSON ConfigPkgs(%s) start upgrading...",self.GROUPS_JSON_PKG) - self.start_install_alone(pkgs_upgrade = [self.update_list.GROUPS_JSON_PKG]) - #直接退出 - _success = True - header = INSTALL_ALONE_PROGRESS - return _success,header,desc - else: - logging.info("ConfigPkgs(%s) No need to upgrade...",self.update_list.GROUPS_JSON_PKG) - else: - logging.info("groups JSON ConfigPkgs(%s) start new installing...",self.update_list.GROUPS_JSON_PKG) - self.start_install_alone(pkgs_install = [self.update_list.GROUPS_JSON_PKG]) - #直接退出 - _success = True - header = INSTALL_ALONE_PROGRESS - return _success,header,desc - - #FIXME: 错误处理未做 报告到控制面板 - except Exception as e: - logging.warning("groups JSON ConfigPkgs(%s) install failed...",self.update_list.GROUPS_JSON_PKG) - logging.error(e) - _success = False - - #3、 判断目录是JSON配置文件夹是否缺失 缺失后进行修复 卸载重新安装步骤 - if not os.path.exists(self.update_list.INPUT_CONFIG_PATH): - logging.info("groups JSON Config Path(%s) Missing and Trying to fix...",self.update_list.INPUT_CONFIG_PATH) - #将软件包卸载 之后进行重新安装here --> purge --> start_available 进行判断是否安装未安装重新安装 - self.start_install_alone(pkgs_purge = [self.update_list.GROUPS_JSON_PKG]) - #直接退出 - _success = True - header = INSTALL_ALONE_PROGRESS - return _success,header,desc - - #4、 清空上次输出的分组JSON文件 - try: - if not os.path.exists(self.update_list.OUTPUT_CONFIG_PATH): - os.makedirs(self.update_list.OUTPUT_CONFIG_PATH) - logging.info('making the configuration file is complete...') - else: - shutil.rmtree(self.update_list.OUTPUT_CONFIG_PATH) - os.makedirs(self.update_list.OUTPUT_CONFIG_PATH) - logging.info('Emptying the configuration file is complete...') - except Exception as e: - logging.warning(e) - - #FIXME: 5、 待开发功能 根据监测存在配置文件 不存在进行重新安装包 再检测还是未存在的话 就判断此次没有可升级的 - - #FIXME: 6、 出错后未进行处理 更新important.list 文件错误的话 - # self.dbusController._on_update_important_list() - - try: - _success,header,desc = self.update_list.update(self.cache) - except SystemError as e: - header = _("Could not calculate the upgrade") - desc = _("An unresolvable problem occurred while " - "calculating the upgrade.\n\n" - "Please report this bug against the 'update-manager' " - "package and include the following error " - "message:\n") + str(e) - _success = False - - return _success,header,desc - - def _setup_dbus(self): - # check if there is another g-a-i already and if not setup one - # listening on dbus - try: - bus = dbus.SystemBus() - except Exception: - logging.error("warning: could not initiate dbus") - return - try: - proxy_obj = bus.get_object('com.kylin.systemupgrade', - '/com/kylin/systemupgrade') - - logging.warning("kylin-update-manager have started...") - sys.exit(0) - except dbus.DBusException: - bus_name = dbus.service.BusName('com.kylin.systemupgrade', - bus) - logging.info('initiate dbus success ...') - return UpdateManagerDbusController(self, bus_name) \ No newline at end of file diff --git a/.pybuild/cpython3_3.8/build/UpdateManager/UpdateManagerDbus.py b/.pybuild/cpython3_3.8/build/UpdateManager/UpdateManagerDbus.py deleted file mode 100644 index 9193003..0000000 --- a/.pybuild/cpython3_3.8/build/UpdateManager/UpdateManagerDbus.py +++ /dev/null @@ -1,167 +0,0 @@ -#!/usr/bin/python3 -import dbus -import dbus.service -import logging -import threading - -from .Core.AlertWatcher import AlertWatcher -from .Core.roam import NetworkManagerHelper - -#dbus 建立 -class UpdateManagerDbusController(dbus.service.Object): - """ this is a helper to provide the UpdateManagerIFace """ - - INTERFACE = 'com.kylin.systemupgrade.interface' - - def __init__(self, parent, bus_name, - object_path='/com/kylin/systemupgrade'): - dbus.service.Object.__init__(self, bus_name, object_path) - self.parent = parent - - #网络检测 电池检测等等的启动检查 - self.alert_watcher = AlertWatcher() - self.alert_watcher.check_alert_state() - self.alert_watcher.connect("network-alert", self._on_network_alert) - self.connected = False - self.transaction = None - - #更新important.list的本次升级的列表 - def _on_update_important_list(self): - lock = threading.Lock() - bus = dbus.SystemBus() - try: - obj = bus.get_object('com.kylin.software.properties', '/com/kylin/software/properties') - interface = dbus.Interface(obj, dbus_interface='com.kylin.software.properties.interface') - lock.acquire() - retval = interface.updateSourceTemplate() - lock.release() - except Exception as e: - logging.error("update sourceTemplate Failed and Error mes:%s"%str(e)) - return False - - if retval == False: - logging.warning("update SourceTemplate failed") - return False - else: - logging.info("update sourceTemplate successed...") - return True - - #检测网络的状态 - def _on_network_alert(self, watcher, state): - if state in NetworkManagerHelper.NM_STATE_CONNECTED_LIST: - self.connected = True - logging.info('Network Connected ...') - else: - self.connected = False - logging.info('Network Disconnected ...') - - #更新的dbus - @dbus.service.method(INTERFACE,out_signature='b') - def UpdateDetect(self): - try: - #处于更新和升级中的话 不进行更新 - if self.parent.is_updating or self.parent.is_upgrading: - logging.info('In the process of updating or Upgrading...') - return False,'In the process of updating or Upgrading...' - else: - self.parent.start_update() - logging.info('method dbus updating ...') - return True - except Exception: - return False - - #全部升级 - @dbus.service.method(INTERFACE,out_signature='b') - def DistUpgradeSystem(self): - try: - #处于更新和升级中的话 不进行升级 - if self.parent.is_updating or self.parent.is_upgrading: - logging.info('In the process of updating or Upgrading...') - return False,'In the process of updating or Upgrading...' - else: - upgrade_groups_list = self.parent.update_list.local_upgrade_list.get('upgrade_groups_list',[]) - if upgrade_groups_list: - logging.info('method dbus upgrading ...') - self.parent.start_install() - return True - else: - return False - except Exception: - return False - - #部分升级 - @dbus.service.method(INTERFACE,in_signature='as',out_signature='bs') - def DistUpgradePartial(self,_partial_upgrade_list): - try: - #处于更新和升级中的话 不进行升级 - if self.parent.is_updating or self.parent.is_upgrading: - logging.info('In the process of updating or Upgrading...') - return False,'In the process of updating or Upgrading...' - else: - partial_upgrade_list = [str(i) for i in _partial_upgrade_list] - #本地维护的可升级的组 - upgrade_groups_list = self.parent.update_list.local_upgrade_list.get('upgrade_groups_list',[]) - upgrade_list = list(set(partial_upgrade_list) & set(upgrade_groups_list)) - - if upgrade_list: - logging.info('dbus partial_upgrade(%s)',upgrade_list) - self.parent.start_install(upgrade_list) - return True,'dbus upgrading' - else: - logging.info('input upgrade list(%s) not in local upgrade_list(%s)',partial_upgrade_list,upgrade_groups_list) - return False,'upgrade_list is empty' - except Exception as e: - return False,e - - # 取消transaction - @dbus.service.method(INTERFACE, out_signature='bs') - def CancelDownload(self): - status = False - message = "" - try: - if self.transaction.cancellable == True: - self.transaction.cancel() - status = True - message = "Success" - logging.info("dbus-mothod cancel task Success") - elif self.transaction == None or self.transaction.cancellable == False: - message = "Can not Cancel" - except Exception as e: - return (status,str(e)) - return (status, message) - - #更新进度信息 0~100 进度信息 101为非预期的信号 - @dbus.service.signal(INTERFACE,signature='is') - def UpdateDetectStatusChanged(self,progress,status): - logging.info("emit progress = %d , status = %s",progress,status) - - #更新完成的信号 - @dbus.service.signal(INTERFACE,signature='basss') - def UpdateDetectFinished(self, success, upgrade_group,error_string='',error_desc='',): - logging.info("emit update success = %r , upgrade_group = %a, error_string = %s , error_desc = %s ",\ - success,upgrade_group, error_string,error_desc) - - #升级的进度信息 0~100 进度信息 101为非预期的信号 - @dbus.service.signal(INTERFACE,signature='asis') - def UpdateDloadAndInstStaChanged(self,groups_list,progress,status): - logging.info("emit upgrade groups_list = %s progress = %d , status = %s",groups_list,progress,status) - - #升级完成的信号 - @dbus.service.signal(INTERFACE,signature='basss') - def UpdateDownloadFinished(self, success, upgrade_group,error_string='',error_desc='',): - logging.info("emit success = %r , upgrade_group = %a, error_string = %s , error_desc = %s ",\ - success,upgrade_group, error_string,error_desc) - - #发送下载包信息 - @dbus.service.signal(INTERFACE, signature='iiiii') - def UpdateDownloadInfo(self, current_items, total_items, currenty_bytes, total_bytes, current_cps): - logging.info("emit current_items = %d, total_items = %d, currenty_bytes = %d, total_bytes = %d, current_cps = %d .",\ - current_items, total_items, \ - currenty_bytes, total_bytes,\ - current_cps) - - # 信号是否可取消 - @dbus.service.signal(INTERFACE, signature='b') - def Cancelable(self, Cancelable): - logging.info("emit Cancelable: %r",\ - Cancelable) diff --git a/.pybuild/cpython3_3.8/build/UpdateManager/UpdateManagerVersion.py b/.pybuild/cpython3_3.8/build/UpdateManager/UpdateManagerVersion.py deleted file mode 100644 index 12253b5..0000000 --- a/.pybuild/cpython3_3.8/build/UpdateManager/UpdateManagerVersion.py +++ /dev/null @@ -1 +0,0 @@ -VERSION = '1:20.04.9' diff --git a/.pybuild/cpython3_3.8/build/UpdateManager/__init__.py b/.pybuild/cpython3_3.8/build/UpdateManager/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/.pybuild/cpython3_3.8/build/UpdateManager/backend/InstallBackendAptdaemon.py b/.pybuild/cpython3_3.8/build/UpdateManager/backend/InstallBackendAptdaemon.py deleted file mode 100644 index c8a40e0..0000000 --- a/.pybuild/cpython3_3.8/build/UpdateManager/backend/InstallBackendAptdaemon.py +++ /dev/null @@ -1,165 +0,0 @@ -#!/usr/bin/env python - -from __future__ import print_function - -from aptdaemon import client, errors -from defer import inline_callbacks -from aptdaemon.enums import (EXIT_SUCCESS, - EXIT_FAILED, - get_error_description_from_enum, - get_error_string_from_enum, - get_status_string_from_enum - ) - -from UpdateManager.backend import InstallBackend -import logging -from gettext import gettext as _ -import dbus - -class InstallBackendAptdaemon(InstallBackend): - """Makes use of aptdaemon to refresh the cache and to install updates.""" - - def __init__(self, window_main, action): - InstallBackend.__init__(self, window_main, action) - self.window_main = window_main - #客户端连接aptdeamon的dbus接口 - self.client = client.AptClient() - self.trans_failed_msg = None - - self.trans_progress = 0 - self.trans_status = '' - - @inline_callbacks - def update(self): - """刷新包cache""" - try: - trans = yield self.client.update_cache(defer=True) - self.window_main.dbusController.transaction = trans - #注册回调函数 接收更新的状态 - yield self._show_transaction(trans, self.ACTION_UPDATE, - _("Checking for updates…"), False) - except errors.NotAuthorizedError: - self._action_done(self.ACTION_UPDATE, - authorized=False, success=False, - error_string=None, error_desc=None) - except Exception: - self._action_done(self.ACTION_UPDATE, - authorized=True, success=False, - error_string=None, error_desc=None) - raise - - @inline_callbacks - def commit(self, pkgs_install, pkgs_upgrade, pkgs_remove,pkgs_purge): - """Commit a list of package adds and removes""" - try: - reinstall = downgrade = [] - trans = yield self.client.commit_packages( - pkgs_install, reinstall, pkgs_remove, purge = pkgs_purge, upgrade = pkgs_upgrade, - downgrade = downgrade, defer=True) - self.window_main.dbusController.transaction = trans - - yield self._show_transaction(trans, self.ACTION_INSTALL, - _("Installing updates…"), True) - except errors.NotAuthorizedError: - self._action_done(self.ACTION_INSTALL, - authorized=False, success=False, - error_string=None, error_desc=None) - except errors.TransactionFailed as e: - self.trans_failed_msg = str(e) - except dbus.DBusException as e: - if e.get_dbus_name() != "org.freedesktop.DBus.Error.NoReply": - raise - self._action_done(self.ACTION_INSTALL, - authorized=False, success=False, - error_string=None, error_desc=None) - except Exception: - self._action_done(self.ACTION_INSTALL, - authorized=True, success=False, - error_string=None, error_desc=None) - raise - - #进度回调 - def _on_progress_changed(self, trans,progress,action): - self.trans_progress = progress - if action == self.ACTION_UPDATE: - self.window_main.dbusController.UpdateDetectStatusChanged(self.trans_progress,self.trans_status) - else: - self.window_main.dbusController.UpdateDloadAndInstStaChanged(self.upgrade_groups_list,self.trans_progress,self.trans_status) - - #同步状态回调 - def _on_status_changed(self, trans, status,action): - #转化词条 - self.trans_status = get_status_string_from_enum(status) - - if action == self.ACTION_UPDATE: - self.window_main.dbusController.UpdateDetectStatusChanged(self.trans_progress,self.trans_status) - else: - #升级的时候发送状态信号时需要上传更新组信息self.upgrade_groups_list - self.window_main.dbusController.UpdateDloadAndInstStaChanged(self.upgrade_groups_list,self.trans_progress,self.trans_status) - - def _on_details_changed(self, trans, details): - logging.info(details) - - def _on_download_changed(self, trans, details): - logging.info(details) - - # eta 下载速度不正确,取消掉 - def _on_progress_download_changed(self,trans,current_items, total_items, currenty_bytes, total_bytes, current_cps, eta): - if self.action == self.ACTION_INSTALL: - self.window_main.dbusController.UpdateDownloadInfo(\ - current_items, total_items, \ - currenty_bytes, total_bytes, \ - current_cps) - - def _on_cancellable_changed(self, trans, Cancelable): - self.window_main.dbusController.Cancelable(Cancelable) - - @inline_callbacks - def _show_transaction(self, trans, action, header, show_details): - - #更新和升级最后完成和失败都会走此在此进行完成之后的处理 - trans.connect("finished", self._on_finished, action) - - #升级和更新的状态信息和进度 - trans.connect("status-changed", self._on_status_changed,action) - trans.connect("progress-changed", self._on_progress_changed,action) - - #取消升级 - trans.connect("cancellable-changed", self._on_cancellable_changed) - - #下载的进度信息 - trans.connect("progress-details-changed", self._on_progress_download_changed) - - # trans.connect("medium-required", self._on_medium_required) - # trans.connect("status-details-changed", self._on_details_changed) - #状态改变的时候的回调函数 - # trans.connect("download-changed", self._on_download_changed) - # trans.connect("config-file-conflict", self._on_config_file_conflict) - # yield trans.set_debconf_frontend("ukui") - yield trans.run() - - def _on_finished(self, trans, status, action): - error_string = '' - error_desc = '' - trans_failed = False - - if status == EXIT_FAILED: - error_string = get_error_string_from_enum(trans.error.code) - error_desc = get_error_description_from_enum(trans.error.code) - if self.trans_failed_msg: - trans_failed = True - error_desc = error_desc + "\n" + self.trans_failed_msg - is_success = (status == EXIT_SUCCESS) - - try: - self._action_done(action, - authorized=True, success=is_success, - error_string=error_string, error_desc=error_desc, - trans_failed=trans_failed) - except TypeError: - # this module used to be be lazily imported and in older code - # trans_failed= is not accepted - # TODO: this workaround can be dropped in Ubuntu 20.10 - self._action_done(action, - authorized=True, success=is_success, - error_string=error_string, error_desc=error_desc) diff --git a/.pybuild/cpython3_3.8/build/UpdateManager/backend/__init__.py b/.pybuild/cpython3_3.8/build/UpdateManager/backend/__init__.py deleted file mode 100644 index 49cc4a7..0000000 --- a/.pybuild/cpython3_3.8/build/UpdateManager/backend/__init__.py +++ /dev/null @@ -1,133 +0,0 @@ -#!/usr/bin/env python -# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*- - -"""Integration of package managers into UpdateManager""" -# (c) 2005-2009 Canonical, GPL - -from __future__ import absolute_import - -from apt import Cache -import logging -import os -from gettext import gettext as _ - - -class InstallBackend(): - ACTION_UPDATE = 0 - ACTION_INSTALL = 1 - - def __init__(self, window_main, action): - self.window_main = window_main - self.action = action - self.upgrade_groups_list = [] - - def start(self,partial_upgrade_list = []): - - #FIXME: 在下载升级的能抑制系统关闭或者睡眠 参考ubuntu此部分代码 - - if self.action == self.ACTION_INSTALL: - pkgs_install = [] - pkgs_upgrade = [] - pkgs_remove = [] - pkgs_purge = [] - try: - #可选升级不为空 - if partial_upgrade_list: - self.upgrade_groups_list = partial_upgrade_list - #全部升级列表 - else: - self.upgrade_groups_list = self.window_main.update_list.local_upgrade_list.get('upgrade_groups_list',[]) - - #遍历升级组列表 - if self.upgrade_groups_list: - for group_name in self.upgrade_groups_list: - pkgs_install += self.window_main.update_list.local_upgrade_list.get(group_name,[]).get('pkgs_install',[]) - pkgs_upgrade += self.window_main.update_list.local_upgrade_list.get(group_name,[]).get('pkgs_upgrade',[]) - - pkgs_remove = self.window_main.update_list.local_upgrade_list.get("pkgs_remove",[]) - else: - logging.info("no upgradeable packages") - return - - #目前不确定#auto含义 大概是自动安装新包 类似加-y ubuntu这样做 - new_pkgs_install = [] - for pkgname in pkgs_install: - pkgname += "#auto" - new_pkgs_install.append(pkgname) - - logging.info("commit install:%d , upgrade:%d remove:%d",len(pkgs_install),len(pkgs_upgrade),len(pkgs_remove)) - self.commit(new_pkgs_install, pkgs_upgrade, pkgs_remove,pkgs_purge) - except Exception as e: - logging.error(e) - else: - self.update() - - def start_alone(self,pkgs_install = [], pkgs_upgrade = [], pkgs_remove = [],pkgs_purge = []): - os.environ["APT_LISTCHANGES_FRONTEND"] = "none" - - if self.action == self.ACTION_INSTALL: - # Get the packages which should be installed and update - self.commit(pkgs_install, pkgs_upgrade, pkgs_remove,pkgs_purge) - else: - self.update() - - def update(self): - """Run a update to refresh the package list""" - raise NotImplementedError - - def commit(self, pkgs_install, pkgs_upgrade, pkgs_remove,pkgs_purge): - """Commit the cache changes """ - raise NotImplementedError - - #出现错误和更新升级完成都会调到此方法 进行处理 - def _action_done(self, action, authorized, success, error_string, - error_desc, trans_failed=False): - - #升级完成后走的分支 - if action == self.ACTION_INSTALL: - self.window_main.is_upgrading = False - if success: - #当组列表为空时 表示现在的单独进行安装某些包或卸载,不发信号到控制面板 - if self.upgrade_groups_list: - self.window_main.dbusController.UpdateDownloadFinished(success,self.upgrade_groups_list,'','') - else: - self.window_main.start_available() - elif error_string or error_desc: - logging.warning(error_string + error_desc) - self.window_main.dbusController.UpdateDownloadFinished(success,self.upgrade_groups_list,error_string,error_desc) - else: - self.window_main.dbusController.UpdateDownloadFinished(success,self.upgrade_groups_list,'','') - else: - self.window_main.is_updating = False - if success: - self.window_main.start_available() - elif error_string or error_desc: - logging.warning(error_string + error_desc) - self.window_main.dbusController.UpdateDetectFinished(success,[],error_string,error_desc) - else: - self.window_main.dbusController.UpdateDetectFinished(success,[],'','') - -# try aptdaemon -if os.path.exists("/usr/sbin/aptd") \ - and "UPDATE_MANAGER_FORCE_BACKEND_SYNAPTIC" not in os.environ: - # check if the gtkwidgets are installed as well - try: - from .InstallBackendAptdaemon import InstallBackendAptdaemon - except ImportError: - logging.exception("importing aptdaemon") - - -def get_backend(*args, **kwargs): - """Select and return a package manager backend.""" - # try aptdaemon - if (os.path.exists("/usr/sbin/aptd") - and "UPDATE_MANAGER_FORCE_BACKEND_SYNAPTIC" not in os.environ): - # check if the gtkwidgets are installed as well - try: - return InstallBackendAptdaemon(*args, **kwargs) - except NameError: - logging.exception("using aptdaemon failed") - - # nothing found, raise - raise Exception("No working backend found, please try installing " - "aptdaemon or synaptic") diff --git a/.vscode/launch.json b/.vscode/launch.json deleted file mode 100644 index 374f003..0000000 --- a/.vscode/launch.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - // 使用 IntelliSense 了解相关属性。 - // 悬停以查看现有属性的描述。 - // 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387 - "version": "0.2.0", - "configurations": [ - { - "name": "Python: 当前文件", - "type": "python", - "request": "launch", - "program": "${file}", - "console": "integratedTerminal", - "sudo":true - } - ] -} \ No newline at end of file diff --git a/kylin-system-updater b/kylin-system-updater index d09d70b..a68ade4 100755 --- a/kylin-system-updater +++ b/kylin-system-updater @@ -39,4 +39,5 @@ if __name__ == "__main__": app.start_update() loop = GLib.MainLoop() - loop.run() \ No newline at end of file + loop.run() + \ No newline at end of file