去除无效文件
This commit is contained in:
parent
c75e253386
commit
50b77f0056
|
@ -2,5 +2,8 @@
|
|||
*venv*
|
||||
__pycache__
|
||||
|
||||
.vscode
|
||||
.pybuild
|
||||
|
||||
# Misc
|
||||
.*cache
|
||||
|
|
|
@ -1,12 +0,0 @@
|
|||
[clean]
|
||||
all=1
|
||||
[build]
|
||||
build-lib=/home/kylin/new_kylin-system-updater/kylin-update-manager_dist/.pybuild/cpython3_3.8/build
|
||||
[install]
|
||||
force=1
|
||||
install-layout=deb
|
||||
install-scripts=$base/bin
|
||||
install-lib=/usr/lib/python3.8/dist-packages
|
||||
prefix=/usr
|
||||
[easy_install]
|
||||
allow_hosts=None
|
|
@ -1,101 +0,0 @@
|
|||
# AlertWatcher.py
|
||||
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (c) 2010 Mohamed Amine IL Idrissi
|
||||
#
|
||||
# Author: Mohamed Amine IL Idrissi <ilidrissiamine@gmail.com>
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License as
|
||||
# published by the Free Software Foundation; either version 2 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
|
||||
# USA
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
from gi.repository import GObject
|
||||
import dbus
|
||||
from dbus.mainloop.glib import DBusGMainLoop
|
||||
|
||||
|
||||
class AlertWatcher(GObject.GObject):
|
||||
""" a class that checks for alerts and reports them, like a battery
|
||||
or network warning """
|
||||
|
||||
__gsignals__ = {"network-alert": (GObject.SignalFlags.RUN_FIRST,
|
||||
None,
|
||||
(GObject.TYPE_INT,)),
|
||||
"battery-alert": (GObject.SignalFlags.RUN_FIRST,
|
||||
None,
|
||||
(GObject.TYPE_BOOLEAN,)),
|
||||
"network-3g-alert": (GObject.SignalFlags.RUN_FIRST,
|
||||
None,
|
||||
(GObject.TYPE_BOOLEAN,
|
||||
GObject.TYPE_BOOLEAN,)),
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
GObject.GObject.__init__(self)
|
||||
DBusGMainLoop(set_as_default=True)
|
||||
self.bus = dbus.Bus(dbus.Bus.TYPE_SYSTEM)
|
||||
# make it always connected if NM isn't available
|
||||
self.network_state = 3
|
||||
|
||||
def check_alert_state(self):
|
||||
try:
|
||||
#network
|
||||
obj = self.bus.get_object("org.freedesktop.NetworkManager",
|
||||
"/org/freedesktop/NetworkManager")
|
||||
obj.connect_to_signal(
|
||||
"StateChanged",
|
||||
self._on_network_state_changed,
|
||||
dbus_interface="org.freedesktop.NetworkManager")
|
||||
interface = dbus.Interface(obj, "org.freedesktop.DBus.Properties")
|
||||
self.network_state = interface.Get(
|
||||
"org.freedesktop.NetworkManager", "State")
|
||||
self._network_alert(self.network_state)
|
||||
|
||||
# power
|
||||
# obj = self.bus.get_object('org.freedesktop.UPower',
|
||||
# '/org/freedesktop/UPower')
|
||||
# obj.connect_to_signal("Changed", self._power_changed,
|
||||
# dbus_interface="org.freedesktop.UPower")
|
||||
# self._power_changed()
|
||||
# 3g
|
||||
# self._update_3g_state()
|
||||
except dbus.exceptions.DBusException:
|
||||
pass
|
||||
|
||||
def _on_network_state_changed(self, state):
|
||||
self._network_alert(state)
|
||||
# self._update_3g_state()
|
||||
|
||||
# def _update_3g_state(self):
|
||||
# from .roam import NetworkManagerHelper
|
||||
# nm = NetworkManagerHelper()
|
||||
# on_3g = nm.is_active_connection_gsm_or_cdma()
|
||||
# is_roaming = nm.is_active_connection_gsm_or_cdma_roaming()
|
||||
# self._network_3g_alert(on_3g, is_roaming)
|
||||
|
||||
# def _network_3g_alert(self, on_3g, is_roaming):
|
||||
# self.emit("network-3g-alert", on_3g, is_roaming)
|
||||
|
||||
def _network_alert(self, state):
|
||||
self.network_state = state
|
||||
self.emit("network-alert", state)
|
||||
|
||||
# def _power_changed(self):
|
||||
# obj = self.bus.get_object("org.freedesktop.UPower",
|
||||
# "/org/freedesktop/UPower")
|
||||
# interface = dbus.Interface(obj, "org.freedesktop.DBus.Properties")
|
||||
# on_battery = interface.Get("org.freedesktop.UPower", "OnBattery")
|
||||
# self.emit("battery-alert", on_battery)
|
|
@ -1,434 +0,0 @@
|
|||
# MyCache.py
|
||||
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (c) 2004-2008 Canonical
|
||||
#
|
||||
# Author: Michael Vogt <mvo@debian.org>
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License as
|
||||
# published by the Free Software Foundation; either version 2 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
|
||||
# USA
|
||||
|
||||
from __future__ import absolute_import, print_function
|
||||
|
||||
import warnings
|
||||
warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning)
|
||||
import apt
|
||||
import apt_pkg
|
||||
import logging
|
||||
import os
|
||||
from urllib.error import HTTPError
|
||||
from urllib.request import urlopen
|
||||
from urllib.parse import urlsplit
|
||||
from http.client import BadStatusLine
|
||||
import socket
|
||||
import subprocess
|
||||
import re
|
||||
import DistUpgrade.DistUpgradeCache
|
||||
from gettext import gettext as _
|
||||
try:
|
||||
from launchpadlib.launchpad import Launchpad
|
||||
except ImportError:
|
||||
Launchpad = None
|
||||
|
||||
CHANGELOGS_POOL = "https://changelogs.ubuntu.com/changelogs/pool/"
|
||||
CHANGELOGS_URI = CHANGELOGS_POOL + "%s/%s/%s/%s_%s/%s"
|
||||
|
||||
|
||||
class HttpsChangelogsUnsupportedError(Exception):
|
||||
""" https changelogs with credentials are unsupported because of the
|
||||
lack of certitifcation validation in urllib2 which allows MITM
|
||||
attacks to steal the credentials
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class MyCache(DistUpgrade.DistUpgradeCache.MyCache):
|
||||
|
||||
CHANGELOG_ORIGIN = "Ubuntu"
|
||||
|
||||
def __init__(self, progress, rootdir=None):
|
||||
apt.Cache.__init__(self, progress, rootdir)
|
||||
# save for later
|
||||
self.rootdir = rootdir
|
||||
# raise if we have packages in reqreinst state
|
||||
# and let the caller deal with that (runs partial upgrade)
|
||||
assert len(self.req_reinstall_pkgs) == 0
|
||||
# check if the dpkg journal is ok (we need to do that here
|
||||
# too because libapt will only do it when it tries to lock
|
||||
# the packaging system)
|
||||
assert(not self._dpkgJournalDirty())
|
||||
# init the regular cache
|
||||
self._initDepCache()
|
||||
self.all_changes = {}
|
||||
self.all_news = {}
|
||||
# on broken packages, try to fix via saveDistUpgrade()
|
||||
if self._depcache.broken_count > 0:
|
||||
self.saveDistUpgrade()
|
||||
assert (self._depcache.broken_count == 0
|
||||
and self._depcache.del_count == 0)
|
||||
self.launchpad = None
|
||||
|
||||
def _dpkgJournalDirty(self):
|
||||
"""
|
||||
test if the dpkg journal is dirty
|
||||
(similar to debSystem::CheckUpdates)
|
||||
"""
|
||||
d = os.path.dirname(
|
||||
apt_pkg.config.find_file("Dir::State::status")) + "/updates"
|
||||
for f in os.listdir(d):
|
||||
if re.match("[0-9]+", f):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _initDepCache(self):
|
||||
self._depcache.read_pinfile()
|
||||
self._depcache.init()
|
||||
|
||||
def clear(self):
|
||||
self._initDepCache()
|
||||
|
||||
@property
|
||||
def required_download(self):
|
||||
""" get the size of the packages that are required to download """
|
||||
pm = apt_pkg.PackageManager(self._depcache)
|
||||
fetcher = apt_pkg.Acquire()
|
||||
pm.get_archives(fetcher, self._list, self._records)
|
||||
return fetcher.fetch_needed
|
||||
|
||||
@property
|
||||
def install_count(self):
|
||||
return self._depcache.inst_count
|
||||
|
||||
def keep_count(self):
|
||||
return self._depcache.keep_count
|
||||
|
||||
@property
|
||||
def del_count(self):
|
||||
return self._depcache.del_count
|
||||
|
||||
def _check_dependencies(self, target, deps):
|
||||
"""Return True if any of the dependencies in deps match target."""
|
||||
# TODO: handle virtual packages
|
||||
for dep_or in deps:
|
||||
if not dep_or:
|
||||
continue
|
||||
match = True
|
||||
for base_dep in dep_or:
|
||||
if (base_dep.name != target.package.shortname
|
||||
or not apt_pkg.check_dep(
|
||||
target.version, base_dep.relation, base_dep.version)):
|
||||
match = False
|
||||
if match:
|
||||
return True
|
||||
return False
|
||||
|
||||
def find_removal_justification(self, pkg):
|
||||
target = pkg.installed
|
||||
if not target:
|
||||
return False
|
||||
for cpkg in self:
|
||||
candidate = cpkg.candidate
|
||||
if candidate is not None:
|
||||
if (self._check_dependencies(
|
||||
target, candidate.get_dependencies("Conflicts"))
|
||||
and self._check_dependencies(
|
||||
target, candidate.get_dependencies("Replaces"))):
|
||||
logging.info(
|
||||
"%s Conflicts/Replaces %s; allowing removal" % (
|
||||
candidate.package.shortname, pkg.shortname))
|
||||
return True
|
||||
return False
|
||||
|
||||
def saveDistUpgrade(self):
|
||||
""" this functions mimics a upgrade but will never remove anything """
|
||||
#upgrade(True) 为True时使用dist-upgrade进行升级
|
||||
self._depcache.upgrade(True)
|
||||
wouldDelete = self._depcache.del_count
|
||||
wouldDelete = 0
|
||||
if wouldDelete > 0:
|
||||
deleted_pkgs = [pkg for pkg in self if pkg.marked_delete]
|
||||
assert wouldDelete == len(deleted_pkgs)
|
||||
for pkg in deleted_pkgs:
|
||||
if self.find_removal_justification(pkg):
|
||||
wouldDelete -= 1
|
||||
if wouldDelete > 0:
|
||||
self.clear()
|
||||
assert (self._depcache.broken_count == 0
|
||||
and self._depcache.del_count == 0)
|
||||
# else:
|
||||
# assert self._depcache.broken_count == 0
|
||||
self._depcache.upgrade()
|
||||
return wouldDelete
|
||||
|
||||
def _strip_epoch(self, verstr):
|
||||
" strip of the epoch "
|
||||
vers_no_epoch = verstr.split(":")
|
||||
if len(vers_no_epoch) > 1:
|
||||
verstr = "".join(vers_no_epoch[1:])
|
||||
return verstr
|
||||
|
||||
def _get_changelog_or_news(self, name, fname, strict_versioning=False,
|
||||
changelogs_uri=None):
|
||||
" helper that fetches the file in question "
|
||||
# don't touch the gui in this function, it needs to be thread-safe
|
||||
pkg = self[name]
|
||||
|
||||
# get the src package name
|
||||
srcpkg = pkg.candidate.source_name
|
||||
|
||||
# assume "main" section
|
||||
src_section = "main"
|
||||
# use the section of the candidate as a starting point
|
||||
section = pkg._pcache._depcache.get_candidate_ver(pkg._pkg).section
|
||||
|
||||
# get the source version
|
||||
srcver_epoch = pkg.candidate.source_version
|
||||
srcver = self._strip_epoch(srcver_epoch)
|
||||
|
||||
split_section = section.split("/")
|
||||
if len(split_section) > 1:
|
||||
src_section = split_section[0]
|
||||
|
||||
# lib is handled special
|
||||
prefix = srcpkg[0]
|
||||
if srcpkg.startswith("lib"):
|
||||
prefix = "lib" + srcpkg[3]
|
||||
|
||||
# the changelogs_uri argument overrides the default changelogs_uri,
|
||||
# this is useful for e.g. PPAs where we construct the changelogs
|
||||
# path differently
|
||||
if changelogs_uri:
|
||||
uri = changelogs_uri
|
||||
else:
|
||||
uri = CHANGELOGS_URI % (src_section, prefix, srcpkg, srcpkg,
|
||||
srcver, fname)
|
||||
|
||||
# https uris are not supported when they contain a username/password
|
||||
# because the urllib2 https implementation will not check certificates
|
||||
# and so its possible to do a man-in-the-middle attack to steal the
|
||||
# credentials
|
||||
res = urlsplit(uri)
|
||||
if res.scheme == "https" and res.username:
|
||||
raise HttpsChangelogsUnsupportedError(
|
||||
"https locations with username/password are not"
|
||||
"supported to fetch changelogs")
|
||||
|
||||
# print("Trying: %s " % uri)
|
||||
changelog = urlopen(uri)
|
||||
#print(changelog.read())
|
||||
# do only get the lines that are new
|
||||
alllines = ""
|
||||
regexp = "^%s \\((.*)\\)(.*)$" % (re.escape(srcpkg))
|
||||
|
||||
while True:
|
||||
line = changelog.readline().decode("UTF-8", "replace")
|
||||
if line == "":
|
||||
break
|
||||
match = re.match(regexp, line)
|
||||
if match:
|
||||
# strip epoch from installed version
|
||||
# and from changelog too
|
||||
installed = getattr(pkg.installed, "version", None)
|
||||
if installed and ":" in installed:
|
||||
installed = installed.split(":", 1)[1]
|
||||
changelogver = match.group(1)
|
||||
if changelogver and ":" in changelogver:
|
||||
changelogver = changelogver.split(":", 1)[1]
|
||||
# we test for "==" here for changelogs
|
||||
# to ensure that the version
|
||||
# is actually really in the changelog - if not
|
||||
# just display it all, this catches cases like:
|
||||
# gcc-defaults with "binver=4.3.1" and srcver=1.76
|
||||
#
|
||||
# for NEWS.Debian we do require the changelogver > installed
|
||||
if strict_versioning:
|
||||
if (installed
|
||||
and apt_pkg.version_compare(changelogver,
|
||||
installed) < 0):
|
||||
break
|
||||
else:
|
||||
if (installed
|
||||
and apt_pkg.version_compare(changelogver,
|
||||
installed) == 0):
|
||||
break
|
||||
alllines = alllines + line
|
||||
return alllines
|
||||
|
||||
def _extract_ppa_changelog_uri(self, name):
|
||||
"""Return the changelog URI from the Launchpad API
|
||||
|
||||
Return None in case of an error.
|
||||
"""
|
||||
if not Launchpad:
|
||||
logging.warning("Launchpadlib not available, cannot retrieve PPA "
|
||||
"changelog")
|
||||
return None
|
||||
|
||||
cdt = self[name].candidate
|
||||
for uri in cdt.uris:
|
||||
if urlsplit(uri).hostname != 'ppa.launchpad.net':
|
||||
continue
|
||||
match = re.search('http.*/(.*)/(.*)/ubuntu/.*', uri)
|
||||
if match is not None:
|
||||
user, ppa = match.group(1), match.group(2)
|
||||
break
|
||||
else:
|
||||
logging.error("Unable to find a valid PPA candidate URL.")
|
||||
return
|
||||
|
||||
# Login on launchpad if we are not already
|
||||
if self.launchpad is None:
|
||||
self.launchpad = Launchpad.login_anonymously('update-manager',
|
||||
'production',
|
||||
version='devel')
|
||||
|
||||
archive = self.launchpad.archives.getByReference(
|
||||
reference='~%s/ubuntu/%s' % (user, ppa)
|
||||
)
|
||||
if archive is None:
|
||||
logging.error("Unable to retrieve the archive from the Launchpad "
|
||||
"API.")
|
||||
return
|
||||
|
||||
spphs = archive.getPublishedSources(source_name=cdt.source_name,
|
||||
exact_match=True,
|
||||
version=cdt.source_version)
|
||||
if not spphs:
|
||||
logging.error("No published sources were retrieved from the "
|
||||
"Launchpad API.")
|
||||
return
|
||||
|
||||
return spphs[0].changelogUrl()
|
||||
|
||||
def _guess_third_party_changelogs_uri_by_source(self, name):
|
||||
pkg = self[name]
|
||||
deb_uri = pkg.candidate.uri
|
||||
if deb_uri is None:
|
||||
return None
|
||||
srcrec = pkg.candidate.record.get("Source")
|
||||
if not srcrec:
|
||||
return None
|
||||
# srcpkg can be "apt" or "gcc-default (1.0)"
|
||||
srcpkg = srcrec.split("(")[0].strip()
|
||||
if "(" in srcrec:
|
||||
srcver = srcrec.split("(")[1].rstrip(")")
|
||||
else:
|
||||
srcver = pkg.candidate.source_version
|
||||
base_uri = deb_uri.rpartition("/")[0]
|
||||
return base_uri + "/%s_%s.changelog" % (srcpkg, srcver)
|
||||
|
||||
def _guess_third_party_changelogs_uri_by_binary(self, name):
|
||||
""" guess changelogs uri based on ArchiveURI by replacing .deb
|
||||
with .changelog
|
||||
"""
|
||||
# there is always a pkg and a pkg.candidate, no need to add
|
||||
# check here
|
||||
pkg = self[name]
|
||||
deb_uri = pkg.candidate.uri
|
||||
if deb_uri:
|
||||
return "%s.changelog" % deb_uri.rsplit(".", 1)[0]
|
||||
return None
|
||||
|
||||
def get_news_and_changelog(self, name, lock):
|
||||
self.get_news(name)
|
||||
self.get_changelog(name)
|
||||
try:
|
||||
lock.release()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def get_news(self, name):
|
||||
" get the NEWS.Debian file from the changelogs location "
|
||||
try:
|
||||
news = self._get_changelog_or_news(name, "NEWS.Debian", True)
|
||||
except Exception:
|
||||
return
|
||||
if news:
|
||||
self.all_news[name] = news
|
||||
|
||||
def _fetch_changelog_for_third_party_package(self, name, origins):
|
||||
# Special case for PPAs
|
||||
changelogs_uri_ppa = None
|
||||
for origin in origins:
|
||||
if origin.origin.startswith('LP-PPA-'):
|
||||
try:
|
||||
changelogs_uri_ppa = self._extract_ppa_changelog_uri(name)
|
||||
break
|
||||
except Exception:
|
||||
logging.exception("Unable to connect to the Launchpad "
|
||||
"API.")
|
||||
# Try non official changelog location
|
||||
changelogs_uri_binary = \
|
||||
self._guess_third_party_changelogs_uri_by_binary(name)
|
||||
changelogs_uri_source = \
|
||||
self._guess_third_party_changelogs_uri_by_source(name)
|
||||
error_message = ""
|
||||
for changelogs_uri in [changelogs_uri_ppa,
|
||||
changelogs_uri_binary,
|
||||
changelogs_uri_source]:
|
||||
if changelogs_uri:
|
||||
try:
|
||||
changelog = self._get_changelog_or_news(
|
||||
name, "changelog", False, changelogs_uri)
|
||||
self.all_changes[name] += changelog
|
||||
except (HTTPError, HttpsChangelogsUnsupportedError):
|
||||
# no changelogs_uri or 404
|
||||
error_message = _(
|
||||
"This update does not come from a "
|
||||
"source that supports changelogs.")
|
||||
except (IOError, BadStatusLine, socket.error):
|
||||
# network errors and others
|
||||
logging.exception("error on changelog fetching")
|
||||
error_message = _(
|
||||
"Failed to download the list of changes. \n"
|
||||
"Please check your Internet connection.")
|
||||
self.all_changes[name] += error_message
|
||||
|
||||
def get_changelog(self, name):
|
||||
" get the changelog file from the changelog location "
|
||||
origins = self[name].candidate.origins
|
||||
self.all_changes[name] = _("Changes for %s versions:\n"
|
||||
"Installed version: %s\n"
|
||||
"Available version: %s\n\n") % \
|
||||
(name, getattr(self[name].installed, "version", None),
|
||||
self[name].candidate.version)
|
||||
if self.CHANGELOG_ORIGIN not in [o.origin for o in origins]:
|
||||
self._fetch_changelog_for_third_party_package(name, origins)
|
||||
return
|
||||
# fixup epoch handling version
|
||||
srcpkg = self[name].candidate.source_name
|
||||
srcver_epoch = self[name].candidate.source_version.replace(':', '%3A')
|
||||
try:
|
||||
changelog = self._get_changelog_or_news(name, "changelog")
|
||||
if len(changelog) == 0:
|
||||
changelog = _("The changelog does not contain any relevant "
|
||||
"changes.\n\n"
|
||||
"Please use http://launchpad.net/ubuntu/+source/"
|
||||
"%s/%s/+changelog\n"
|
||||
"until the changes become available or try "
|
||||
"again later.") % (srcpkg, srcver_epoch)
|
||||
except HTTPError:
|
||||
changelog = _("The list of changes is not available yet.\n\n"
|
||||
"Please use http://launchpad.net/ubuntu/+source/"
|
||||
"%s/%s/+changelog\n"
|
||||
"until the changes become available or try again "
|
||||
"later.") % (srcpkg, srcver_epoch)
|
||||
except (IOError, BadStatusLine, socket.error) as e:
|
||||
print("caught exception: ", e)
|
||||
changelog = _("Failed to download the list "
|
||||
"of changes. \nPlease "
|
||||
"check your Internet "
|
||||
"connection.")
|
||||
self.all_changes[name] += changelog
|
|
@ -1,676 +0,0 @@
|
|||
# UpdateList.py
|
||||
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (c) 2004-2013 Canonical
|
||||
#
|
||||
# Author: Michael Vogt <mvo@debian.org>
|
||||
# Dylan McCall <dylanmccall@ubuntu.com>
|
||||
# Michael Terry <michael.terry@canonical.com>
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License as
|
||||
# published by the Free Software Foundation; either version 2 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
|
||||
# USA
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import warnings
|
||||
warnings.filterwarnings("ignore", "Accessed deprecated property",
|
||||
DeprecationWarning)
|
||||
|
||||
from gettext import gettext as _, install
|
||||
import apt
|
||||
import logging
|
||||
import itertools
|
||||
import os
|
||||
import random
|
||||
import glob
|
||||
import json
|
||||
from gi.repository import Gio
|
||||
|
||||
import shutil
|
||||
from .utils import humanize_size
|
||||
from UpdateManager.Core import utils
|
||||
from UpdateManager.Core import filter
|
||||
|
||||
class UpdateItem():
|
||||
def __init__(self, pkg, name, icon, to_remove):
|
||||
self.icon = icon
|
||||
self.name = name
|
||||
self.pkg = pkg
|
||||
self.to_remove = to_remove
|
||||
|
||||
def is_selected(self):
|
||||
if not self.to_remove:
|
||||
return self.pkg.marked_install or self.pkg.marked_upgrade
|
||||
else:
|
||||
return self.pkg.marked_delete
|
||||
|
||||
|
||||
class UpdateGroup(UpdateItem):
|
||||
_depcache = {}
|
||||
|
||||
def __init__(self, pkg, name, icon, to_remove):
|
||||
UpdateItem.__init__(self, pkg, name, icon, to_remove)
|
||||
self._items = set()
|
||||
self._deps = set()
|
||||
self.core_item = None
|
||||
if pkg is not None:
|
||||
self.core_item = UpdateItem(pkg, name, icon, to_remove)
|
||||
self._items.add(self.core_item)
|
||||
|
||||
@property
|
||||
def items(self):
|
||||
all_items = []
|
||||
all_items.extend(self._items)
|
||||
return sorted(all_items, key=lambda a: a.name.lower())
|
||||
|
||||
def add(self, pkg, cache=None, eventloop_callback=None, to_remove=False):
|
||||
name = utils.get_package_label(pkg)
|
||||
icon = Gio.ThemedIcon.new("package")
|
||||
self._items.add(UpdateItem(pkg, name, icon, to_remove))
|
||||
# If the pkg is in self._deps, stop here. We have already calculated
|
||||
# the recursive dependencies for this package, no need to do it again.
|
||||
if cache and pkg.name in cache and pkg.name not in self._deps:
|
||||
if not self._deps:
|
||||
# Initial deps haven't been calculated. As we're checking
|
||||
# whether _deps is empty in is_dependency, we must init now or
|
||||
# it won't be done at all.
|
||||
self._init_deps(cache, eventloop_callback)
|
||||
self._add_deps(pkg, cache, eventloop_callback)
|
||||
|
||||
def contains(self, item):
|
||||
return item in self._items
|
||||
|
||||
def _init_deps(self, cache, eventloop_callback):
|
||||
for item in self._items:
|
||||
if item.pkg and item.pkg.name not in self._deps:
|
||||
self._add_deps(item.pkg, cache, eventloop_callback)
|
||||
|
||||
def _add_deps(self, pkg, cache, eventloop_callback):
|
||||
"""Adds pkg and dependencies of pkg to the dependency list."""
|
||||
if pkg is None or pkg.candidate is None or pkg.name in self._deps:
|
||||
# This shouldn't really happen. If we land here often, it's a sign
|
||||
# that something has gone wrong. Unless all pkgs are None it's not
|
||||
# a critical issue - a hit to the performance at most.
|
||||
reason = ((not pkg or not pkg.candidate)
|
||||
and "Package was None or didn't have a candidate."
|
||||
or "%s already in _deps." % pkg.name)
|
||||
logging.debug("Useless call to _add_deps. %s" % reason)
|
||||
return
|
||||
if len(self._deps) % 200 == 0 and callable(eventloop_callback):
|
||||
# Don't spin the loop every time _add_deps is called.
|
||||
eventloop_callback()
|
||||
|
||||
self._deps.add(pkg.name)
|
||||
|
||||
if pkg.name in self._depcache:
|
||||
for dep in self._depcache[pkg.name]:
|
||||
if dep not in self._deps and dep in cache:
|
||||
self._add_deps(cache[dep], cache, eventloop_callback)
|
||||
else:
|
||||
candidate = pkg.candidate
|
||||
dependencies = candidate.get_dependencies('Depends', 'Recommends')
|
||||
for dependency_pkg in itertools.chain.from_iterable(dependencies):
|
||||
name = dependency_pkg.name
|
||||
if name not in self._deps and name in cache:
|
||||
self._depcache.setdefault(pkg.name, []).append(name)
|
||||
self._add_deps(cache[name], cache, eventloop_callback)
|
||||
|
||||
def is_dependency(self, maybe_dep, cache=None, eventloop_callback=None):
|
||||
if not self._deps and cache:
|
||||
self._init_deps(cache, eventloop_callback)
|
||||
|
||||
return maybe_dep.name in self._deps
|
||||
|
||||
def packages_are_selected(self):
|
||||
for item in self.items:
|
||||
if item.is_selected():
|
||||
return True
|
||||
return False
|
||||
|
||||
def selection_is_inconsistent(self):
|
||||
pkgs_installing = [item for item in self.items if item.is_selected()]
|
||||
return (len(pkgs_installing) > 0
|
||||
and len(pkgs_installing) < len(self.items))
|
||||
|
||||
def get_total_size(self):
|
||||
if self.to_remove:
|
||||
return 0
|
||||
size = 0
|
||||
for item in self.items:
|
||||
size += getattr(item.pkg.candidate, "size", 0)
|
||||
return size
|
||||
|
||||
|
||||
class UpdateApplicationGroup(UpdateGroup):
|
||||
def __init__(self, pkg, application, to_remove):
|
||||
name = application.get_display_name()
|
||||
icon = application.get_icon()
|
||||
super(UpdateApplicationGroup, self).__init__(pkg, name, icon,
|
||||
to_remove)
|
||||
|
||||
|
||||
class UpdatePackageGroup(UpdateGroup):
|
||||
def __init__(self, pkg, to_remove):
|
||||
name = utils.get_package_label(pkg)
|
||||
icon = Gio.ThemedIcon.new("package")
|
||||
super(UpdatePackageGroup, self).__init__(pkg, name, icon, to_remove)
|
||||
|
||||
|
||||
class UpdateSystemGroup(UpdateGroup):
|
||||
def __init__(self, cache, to_remove):
|
||||
# Translators: the %s is a distro name, like 'Ubuntu' and 'base' as in
|
||||
# the core components and packages.
|
||||
name = _("%s base") % utils.get_ubuntu_flavor_name(cache=cache)
|
||||
icon = Gio.ThemedIcon.new("distributor-logo")
|
||||
super(UpdateSystemGroup, self).__init__(None, name, icon, to_remove)
|
||||
|
||||
|
||||
class UpdateOrigin():
|
||||
def __init__(self, desc, importance):
|
||||
self.packages = []
|
||||
self.importance = importance
|
||||
self.description = desc
|
||||
|
||||
|
||||
class UpdateList():
|
||||
"""
|
||||
class that contains the list of available updates in
|
||||
self.pkgs[origin] where origin is the user readable string
|
||||
"""
|
||||
|
||||
# the key in the debian/control file used to add the phased
|
||||
# updates percentage
|
||||
PHASED_UPDATES_KEY = "Phased-Update-Percentage"
|
||||
|
||||
# the file that contains the uniq machine id
|
||||
UNIQ_MACHINE_ID_FILE = "/etc/machine-id"
|
||||
# use the dbus one as a fallback
|
||||
UNIQ_MACHINE_ID_FILE_FALLBACK = "/var/lib/dbus/machine-id"
|
||||
|
||||
APP_INSTALL_PATTERN = "/usr/share/app-install/desktop/%s:*.desktop"
|
||||
|
||||
# the configuration key to turn phased-updates always on
|
||||
ALWAYS_INCLUDE_PHASED_UPDATES = (
|
||||
"Update-Manager::Always-Include-Phased-Updates")
|
||||
# ... or always off
|
||||
NEVER_INCLUDE_PHASED_UPDATES = (
|
||||
"Update-Manager::Never-Include-Phased-Updates")
|
||||
|
||||
def __init__(self, parent, dist=None):
|
||||
self.dist = (dist if dist else utils.get_dist())
|
||||
self.distUpgradeWouldDelete = 0
|
||||
self.update_groups = []
|
||||
self.random = random.Random()
|
||||
|
||||
#FIXME: 最好将这个常量通过配置文件读
|
||||
self.GROUPS_JSON_PKG = 'kylin-update-desktop-config'
|
||||
|
||||
self.INPUT_CONFIG_PATH = '/usr/share/kylin-update-desktop-config/data'
|
||||
self.OUTPUT_CONFIG_PATH = os.getenv('HOME') + '/.config' +'/update_manager_config'
|
||||
|
||||
self.IMPORTANT_LIST_PATH="/var/lib/kylin-software-properties/template/important.list"
|
||||
|
||||
# important推送列表
|
||||
self.important_list = []
|
||||
|
||||
#所有的组升级安装列表
|
||||
self.local_upgrade_list = {}
|
||||
# a stable machine uniq id
|
||||
try:
|
||||
with open(self.UNIQ_MACHINE_ID_FILE) as f:
|
||||
self.machine_uniq_id = f.read()
|
||||
except FileNotFoundError:
|
||||
with open(self.UNIQ_MACHINE_ID_FILE_FALLBACK) as f:
|
||||
self.machine_uniq_id = f.read()
|
||||
|
||||
if 'XDG_DATA_DIRS' in os.environ and os.environ['XDG_DATA_DIRS']:
|
||||
data_dirs = os.environ['XDG_DATA_DIRS']
|
||||
else:
|
||||
data_dirs = '/usr/local/share/:/usr/share/'
|
||||
|
||||
#FIX 此处需要修复 application_dirs 包含正确的desktop文件的目录
|
||||
self.application_dirs = [os.path.join(base, 'applications')
|
||||
for base in data_dirs.split(':')]
|
||||
|
||||
if 'XDG_CURRENT_DESKTOP' in os.environ:
|
||||
self.current_desktop = os.environ.get('XDG_CURRENT_DESKTOP')
|
||||
else:
|
||||
self.current_desktop = ''
|
||||
self.desktop_cache = {}
|
||||
|
||||
def _file_is_application(self, file_path):
|
||||
# WARNING: This is called often if there's a lot of updates. A poor
|
||||
# performing call here has a huge impact on the overall performance!
|
||||
#通过判断包的配置文件是否存在.desktop
|
||||
if not file_path.endswith(".desktop"):
|
||||
# First the obvious case: If the path doesn't end in a .desktop
|
||||
# extension, this isn't a desktop file.
|
||||
return False
|
||||
|
||||
#通过判断.desktop 文件是否在/usr/share/applications 里面 表示属于应用
|
||||
file_path = os.path.abspath(file_path)
|
||||
for app_dir in self.application_dirs:
|
||||
if file_path.startswith(app_dir):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _rate_application_for_package(self, application, pkg):
|
||||
score = 0
|
||||
desktop_file = os.path.basename(application.get_filename())
|
||||
application_id = os.path.splitext(desktop_file)[0]
|
||||
|
||||
if application.should_show():
|
||||
score += 1
|
||||
|
||||
if application_id == pkg.name:
|
||||
score += 5
|
||||
|
||||
return score
|
||||
|
||||
def _get_application_for_package(self, pkg):
|
||||
desktop_files = []
|
||||
rated_applications = []
|
||||
|
||||
#拿到应用的desktop文件判断为一个应用
|
||||
for installed_file in pkg.installed_files:
|
||||
if self._file_is_application(installed_file):
|
||||
desktop_files.append(installed_file)
|
||||
#此部分强制进行添加应用
|
||||
if pkg.name in self.desktop_cache:
|
||||
desktop_files += self.desktop_cache[pkg.name]
|
||||
|
||||
for desktop_file in desktop_files:
|
||||
try:
|
||||
application = Gio.DesktopAppInfo.new_from_filename(
|
||||
desktop_file)
|
||||
application.set_desktop_env(self.current_desktop)
|
||||
except Exception as e:
|
||||
logging.warning("Error loading .desktop file %s: %s" %
|
||||
(desktop_file, e))
|
||||
continue
|
||||
score = self._rate_application_for_package(application, pkg)
|
||||
if score > 0:
|
||||
rated_applications.append((score, application))
|
||||
|
||||
rated_applications.sort(key=lambda app: app[0], reverse=True)
|
||||
if len(rated_applications) > 0:
|
||||
return rated_applications[0][1]
|
||||
else:
|
||||
return None
|
||||
|
||||
def _populate_desktop_cache(self, pkg_names):
|
||||
if not pkg_names:
|
||||
# No updates; This shouldn't have happened.
|
||||
logging.warning("_populate_desktop_cache called with empty list "
|
||||
"of packages.")
|
||||
return
|
||||
elif len(pkg_names) == 1:
|
||||
# One update; Let glob do the matching.
|
||||
pattern = self.APP_INSTALL_PATTERN % pkg_names[0]
|
||||
else:
|
||||
# More than one update available. Glob all desktop files and store
|
||||
# those that match an upgradeable package.
|
||||
pattern = self.APP_INSTALL_PATTERN % "*"
|
||||
|
||||
for desktop_file in glob.iglob(pattern):
|
||||
try:
|
||||
pkg = desktop_file.split('/')[-1].split(":")[0]
|
||||
except IndexError:
|
||||
# app-install-data desktop file had an unexpected naming
|
||||
# convention. As we can't extract the package name from
|
||||
# the path, just ignore it.
|
||||
logging.error("Could not extract package name from '%s'. "
|
||||
"File ignored." % desktop_file)
|
||||
continue
|
||||
|
||||
if pkg in pkg_names:
|
||||
self.desktop_cache.setdefault(pkg, []).append(desktop_file)
|
||||
logging.debug("App candidate for %s: %s" %
|
||||
(pkg, desktop_file))
|
||||
|
||||
def _is_security_update(self, pkg):
|
||||
""" This will test if the pkg is a security update.
|
||||
This includes if there is a newer version in -updates, but also
|
||||
an older update available in -security. For example, if
|
||||
installed pkg A v1.0 is available in both -updates (as v1.2) and
|
||||
-security (v1.1). we want to display it as a security update.
|
||||
|
||||
:return: True if the update comes from the security pocket
|
||||
"""
|
||||
if not self.dist:
|
||||
return False
|
||||
inst_ver = pkg._pkg.current_ver
|
||||
for ver in pkg._pkg.version_list:
|
||||
# discard is < than installed ver
|
||||
if (inst_ver
|
||||
and apt.apt_pkg.version_compare(ver.ver_str,
|
||||
inst_ver.ver_str) <= 0):
|
||||
continue
|
||||
# check if we have a match
|
||||
for (verFileIter, index) in ver.file_list:
|
||||
if verFileIter.archive == "%s-security" % self.dist and \
|
||||
verFileIter.origin == "Ubuntu":
|
||||
indexfile = pkg._pcache._list.find_index(verFileIter)
|
||||
if indexfile: # and indexfile.IsTrusted:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _is_ignored_phased_update(self, pkg):
|
||||
""" This will test if the pkg is a phased update and if
|
||||
it needs to get installed or ignored.
|
||||
|
||||
:return: True if the updates should be ignored
|
||||
"""
|
||||
# allow the admin to override this
|
||||
if apt.apt_pkg.config.find_b(
|
||||
self.ALWAYS_INCLUDE_PHASED_UPDATES, False):
|
||||
return False
|
||||
|
||||
if self.PHASED_UPDATES_KEY in pkg.candidate.record:
|
||||
if apt.apt_pkg.config.find_b(
|
||||
self.NEVER_INCLUDE_PHASED_UPDATES, False):
|
||||
logging.info("holding back phased update per configuration")
|
||||
return True
|
||||
|
||||
# its important that we always get the same result on
|
||||
# multiple runs of the update-manager, so we need to
|
||||
# feed a seed that is a combination of the pkg/ver/machine
|
||||
self.random.seed("%s-%s-%s" % (
|
||||
pkg.candidate.source_name, pkg.candidate.version,
|
||||
self.machine_uniq_id))
|
||||
threshold = pkg.candidate.record[self.PHASED_UPDATES_KEY]
|
||||
percentage = self.random.randint(0, 100)
|
||||
if percentage > int(threshold):
|
||||
logging.info("holding back phased update %s (%s < %s)" % (
|
||||
pkg.name, threshold, percentage))
|
||||
return True
|
||||
return False
|
||||
|
||||
def _get_linux_packages(self):
|
||||
"Return all binary packages made by the linux-meta source package"
|
||||
# Hard code this rather than generate from source info in cache because
|
||||
# that might only be available if we have deb-src lines. I think we
|
||||
# could also generate it by iterating over all the binary package info
|
||||
# we have, but that is costly. These don't change often.
|
||||
return ['linux',
|
||||
'linux-cloud-tools-generic',
|
||||
'linux-cloud-tools-lowlatency',
|
||||
'linux-cloud-tools-virtual',
|
||||
'linux-crashdump',
|
||||
'linux-generic',
|
||||
'linux-generic-lpae',
|
||||
'linux-headers-generic',
|
||||
'linux-headers-generic-lpae',
|
||||
'linux-headers-lowlatency',
|
||||
'linux-headers-lowlatency-lpae',
|
||||
'linux-headers-server',
|
||||
'linux-headers-virtual',
|
||||
'linux-image',
|
||||
'linux-image-extra-virtual',
|
||||
'linux-image-generic',
|
||||
'linux-image-generic-lpae',
|
||||
'linux-image-lowlatency',
|
||||
'linux-image-virtual',
|
||||
'linux-lowlatency',
|
||||
'linux-signed-generic',
|
||||
'linux-signed-image-generic',
|
||||
'linux-signed-image-lowlatency',
|
||||
'linux-signed-lowlatency',
|
||||
'linux-source',
|
||||
'linux-tools-generic',
|
||||
'linux-tools-generic-lpae',
|
||||
'linux-tools-lowlatency',
|
||||
'linux-tools-virtual',
|
||||
'linux-virtual']
|
||||
|
||||
def _make_groups(self, cache, pkgs, eventloop_callback, to_remove=False):
|
||||
if not pkgs:
|
||||
return []
|
||||
ungrouped_pkgs = []
|
||||
app_groups = []
|
||||
pkg_groups = []
|
||||
|
||||
for pkg in pkgs:
|
||||
#查看这个包属于那个应用的 并获取这个应用的一些属性 例如 应用名称和图标的等等
|
||||
app = self._get_application_for_package(pkg)
|
||||
if app is not None:
|
||||
#包含 应用名称和图标
|
||||
app_group = UpdateApplicationGroup(pkg, app, to_remove)
|
||||
app_groups.append(app_group)
|
||||
else:
|
||||
ungrouped_pkgs.append(pkg)
|
||||
|
||||
# Stick together applications and their immediate dependencies
|
||||
#将应用和它们的依赖关系 结合在一起
|
||||
for pkg in list(ungrouped_pkgs):
|
||||
dep_groups = []
|
||||
for group in app_groups:
|
||||
if group.is_dependency(pkg, cache, eventloop_callback):
|
||||
dep_groups.append(group)
|
||||
if len(dep_groups) > 1:
|
||||
break
|
||||
#此来决定是这一个包所独有的依赖 如果一个依赖存在2个以上的就不
|
||||
if len(dep_groups) == 1:
|
||||
dep_groups[0].add(pkg, cache, eventloop_callback, to_remove)
|
||||
ungrouped_pkgs.remove(pkg)
|
||||
|
||||
system_group = None
|
||||
if ungrouped_pkgs:
|
||||
# Separate out system base packages. If we have already found an
|
||||
# application for all updates, don't bother.
|
||||
meta_group = UpdateGroup(None, None, None, to_remove)
|
||||
flavor_package = utils.get_ubuntu_flavor_package(cache=cache)
|
||||
meta_pkgs = [flavor_package, "ubuntu-standard", "ubuntu-minimal"]
|
||||
meta_pkgs.extend(self._get_linux_packages())
|
||||
for pkg in meta_pkgs:
|
||||
if pkg in cache:
|
||||
meta_group.add(cache[pkg])
|
||||
for pkg in ungrouped_pkgs:
|
||||
if meta_group.is_dependency(pkg, cache, eventloop_callback):
|
||||
if system_group is None:
|
||||
system_group = UpdateSystemGroup(cache, to_remove)
|
||||
system_group.add(pkg)
|
||||
else:
|
||||
pkg_groups.append(UpdatePackageGroup(pkg, to_remove))
|
||||
|
||||
app_groups.sort(key=lambda a: a.name.lower())
|
||||
pkg_groups.sort(key=lambda a: a.name.lower())
|
||||
if system_group:
|
||||
pkg_groups.append(system_group)
|
||||
|
||||
return app_groups + pkg_groups
|
||||
|
||||
def _read_important_list(self):
|
||||
header = ''
|
||||
desc = ''
|
||||
# 获取importantlist 本次更新推送
|
||||
try:
|
||||
with open(self.IMPORTANT_LIST_PATH, 'r') as f:
|
||||
data = f.read()
|
||||
self.important_list = data.split()
|
||||
logging.info("importantList: %s",self.important_list)
|
||||
return True
|
||||
except Exception as e:
|
||||
header = _("read important list failed")
|
||||
desc = ("%s",str(e))
|
||||
logging.info(header + desc)
|
||||
return False,header,desc
|
||||
|
||||
def _make_pkg_info_json(self,cache,pkgs_list):
|
||||
size = 0
|
||||
total_size = 0
|
||||
pkgs_info_json = {}
|
||||
for pkg_name in pkgs_list:
|
||||
try:
|
||||
pkg = cache[pkg_name]
|
||||
|
||||
#获取下载大小
|
||||
size = getattr(pkg.candidate, "size", 0)
|
||||
total_size = total_size + size
|
||||
pkgs_info_json.update({pkg_name:{"size":size}})
|
||||
except Exception as e:
|
||||
logging.info("this package(%s) not in list and error mes:%s",pkg_name,e)
|
||||
pass
|
||||
pkgs_info_json.update({"total_size":humanize_size(total_size)})
|
||||
return pkgs_info_json
|
||||
|
||||
#检查包是否在cache中 返回新得列表
|
||||
def _check_pkg_in_cache(self,cache,pkgs_list):
|
||||
new_pkgs_list = []
|
||||
for pkg_name in pkgs_list:
|
||||
#检查是否在cache 以及 是否安装检查
|
||||
if pkg_name in cache and not cache[pkg_name].is_installed:
|
||||
new_pkgs_list.append(pkg_name)
|
||||
else:
|
||||
pass
|
||||
# logging.info("this package(%s) not in list ",pkg_name)
|
||||
return new_pkgs_list
|
||||
|
||||
def _make_output_json(self,data,upgrade_pkgs_json,install_pkgs_json,hold_pkgs_list,remove_pkgs_list):
|
||||
groups_base_info = {}
|
||||
output_json = {}
|
||||
|
||||
#FIXME: 确定输出文件的文件名 以及放置位置
|
||||
output_config_name = self.OUTPUT_CONFIG_PATH + '/' + data['package'] + '_output.json'
|
||||
|
||||
#4、添加一些基础信息
|
||||
groups_base_info.update({"package":data['package']})
|
||||
groups_base_info.update({"version":data['version']})
|
||||
groups_base_info.update({"name":data['name']})
|
||||
groups_base_info.update({"description":data['description']})
|
||||
groups_base_info.update({"icon":data['icon']})
|
||||
|
||||
#5、添加升级的内容
|
||||
output_json.update(groups_base_info)
|
||||
output_json.update({"upgrade_list":upgrade_pkgs_json})
|
||||
output_json.update({"install_list":install_pkgs_json})
|
||||
output_json.update({"hold_list":hold_pkgs_list})
|
||||
output_json.update({"remove_list":remove_pkgs_list})
|
||||
|
||||
#6 产生JSON文件
|
||||
with open(output_config_name, 'w', encoding='utf-8') as f:
|
||||
json.dump(output_json, f, ensure_ascii=False, indent=4)
|
||||
logging.info("Generate Jsonfile(%s) to complete... ",output_config_name)
|
||||
|
||||
def _make_groups_upgrade(self,cache,pkgs_install = [], pkgs_upgrade = [], pkgs_remove = []):
|
||||
try:
|
||||
files = os.listdir(self.INPUT_CONFIG_PATH) #获得文件夹中所有文件的名称列表
|
||||
upgrade_groups_list = []
|
||||
|
||||
for file in files:
|
||||
#判是否是目录以及是否以JSON结尾
|
||||
if file.endswith('.json'):
|
||||
with open(self.INPUT_CONFIG_PATH+"/"+file,'r') as f:
|
||||
data = json.load(f)
|
||||
group_name = data['package']
|
||||
|
||||
#过滤没有推送的配置文件
|
||||
if not group_name in self.important_list:
|
||||
continue
|
||||
|
||||
upgrade_pkgs_list = data['upgrade_list']
|
||||
hold_pkgs_list = data['hold_list']
|
||||
|
||||
#这个安装升级列表中包含当前系统的cache中没有的包 需要过滤
|
||||
remove_pkgs_list = data['remove_list']
|
||||
|
||||
#检查包是否在cache中 以及是否已经安装
|
||||
new_install_pkgs_list = self._check_pkg_in_cache(cache,data['install_list'])
|
||||
|
||||
#进行交集 升级列表
|
||||
upgrade_intersection_pkgs = list(set(pkgs_upgrade) & set(upgrade_pkgs_list))
|
||||
|
||||
#判断当前是否可升级或者新装的包
|
||||
if len(new_install_pkgs_list) == 0 and len(upgrade_intersection_pkgs) == 0:
|
||||
continue
|
||||
|
||||
#在总升级列表中移除这些包
|
||||
for pkg in upgrade_intersection_pkgs:
|
||||
pkgs_upgrade.remove(pkg)
|
||||
|
||||
#3、生成升级的包列表JSON
|
||||
upgrade_pkgs_json = self._make_pkg_info_json(cache,upgrade_intersection_pkgs)
|
||||
|
||||
#2、生成安装的软件列表
|
||||
install_pkgs_json = self._make_pkg_info_json(cache,new_install_pkgs_list)
|
||||
|
||||
#输出JSON配置文件
|
||||
self._make_output_json(data,upgrade_pkgs_json,install_pkgs_json,hold_pkgs_list,remove_pkgs_list)
|
||||
|
||||
upgrade_groups_list.append(group_name)
|
||||
|
||||
#添加到字典维护的升级列表
|
||||
self.local_upgrade_list.update({group_name:{"pkgs_upgrade":upgrade_intersection_pkgs,"pkgs_install":new_install_pkgs_list}})
|
||||
logging.info("group(%s) upgrade:%d install:%d",group_name,len(upgrade_intersection_pkgs),len(new_install_pkgs_list))
|
||||
else:
|
||||
pass
|
||||
#添加所有可升级的组列表
|
||||
self.local_upgrade_list.update({"upgrade_groups_list":upgrade_groups_list})
|
||||
except Exception as e:
|
||||
logging.warning("Generate Jsonfile to failed... ")
|
||||
logging.error(e)
|
||||
|
||||
|
||||
def update(self, cache, eventloop_callback=None):
|
||||
pkgs_install = []
|
||||
pkgs_upgrade = []
|
||||
pkgs_remove = []
|
||||
|
||||
header = ''
|
||||
desc = ''
|
||||
|
||||
self._read_important_list()
|
||||
|
||||
#important_list 为空时此次不需要升级
|
||||
if not self.important_list:
|
||||
#不需要升级 全部的软件都是新的
|
||||
header = _("No software updates are available.")
|
||||
desc = _('important_list is Empty')
|
||||
return True,header,desc
|
||||
|
||||
#查找所有可升级的包
|
||||
for pkg in cache:
|
||||
try:
|
||||
if pkg.marked_install:
|
||||
pkgs_install.append(pkg)
|
||||
elif pkg.marked_upgrade:
|
||||
pkgs_upgrade.append(pkg)
|
||||
elif pkg.marked_delete:
|
||||
pkgs_remove.append(pkg)
|
||||
except Exception as e:
|
||||
logging.error(e)
|
||||
|
||||
logging.info("System all upgradeable packages:new_install:%d upgrade:%d remove:%d",len(pkgs_install),len(pkgs_upgrade),len(pkgs_remove))
|
||||
|
||||
#源过滤
|
||||
# fu = filter.UpdateListFilterCache()
|
||||
# allowed_origin_upgrade_pkgs = fu.check_in_allowed_origin(pkgs_upgrade)
|
||||
|
||||
self._make_groups_upgrade(cache,pkgs_upgrade = ([pkg.name for pkg in pkgs_upgrade]))
|
||||
|
||||
#是否存在可升级的组
|
||||
if self.local_upgrade_list.get('upgrade_groups_list',[]):
|
||||
#增加需要移除的包列表
|
||||
self.local_upgrade_list.update({"pkgs_remove":[pkg.name for pkg in pkgs_remove]})
|
||||
return True,header,desc
|
||||
else:
|
||||
#不需要升级 全部的软件都是新的
|
||||
header = _("The software on this computer is up to date.")
|
||||
desc = ''
|
||||
return True,header,desc
|
||||
|
||||
#FIXME: 目前此功能不使用 但是以此按应用进行分组是更好的展示升级列表的方式
|
||||
# self.update_groups = self._make_groups(cache, self.pkgs_upgrade,
|
||||
# eventloop_callback)
|
|
@ -1,333 +0,0 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import apt
|
||||
import apt_pkg
|
||||
import fnmatch
|
||||
import logging
|
||||
import logging.handlers
|
||||
import re
|
||||
import os
|
||||
import string
|
||||
import subprocess
|
||||
import sys
|
||||
import json
|
||||
import dbus
|
||||
import threading
|
||||
|
||||
try:
|
||||
from typing import AbstractSet, cast, DefaultDict, Dict, Iterable, List
|
||||
AbstractSet # pyflakes
|
||||
DefaultDict # pyflakes
|
||||
Dict # pyflakes
|
||||
Iterable # pyflakes
|
||||
List # pyflakes
|
||||
from typing import Set, Tuple, Union
|
||||
Set # pyflakes
|
||||
Tuple # pyflakes
|
||||
Union # pyflakes
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
from email.message import Message
|
||||
from gettext import gettext as _
|
||||
|
||||
import apt
|
||||
import apt_pkg
|
||||
|
||||
|
||||
ImportantListPath="/var/lib/kylin-software-properties/template/important.list"
|
||||
DesktopSystemPath="/usr/share/kylin-update-desktop-config/data/"
|
||||
|
||||
|
||||
# no py3 lsb_release in debian :/
|
||||
DISTRO_CODENAME = subprocess.check_output(
|
||||
["lsb_release", "-c", "-s"], universal_newlines=True).strip() # type: str
|
||||
DISTRO_DESC = subprocess.check_output(
|
||||
["lsb_release", "-d", "-s"], universal_newlines=True).strip() # type: str
|
||||
DISTRO_ID = subprocess.check_output(
|
||||
["lsb_release", "-i", "-s"], universal_newlines=True).strip() # type: str
|
||||
|
||||
|
||||
class UpdateListFilterCache(apt.Cache):
|
||||
|
||||
def __init__(self):
|
||||
# whitelist
|
||||
self.upgradeList = []
|
||||
# 必须升级的包
|
||||
self.installList = []
|
||||
# self._cached_candidate_pkgnames = set() # type: Set[str]
|
||||
|
||||
self.allowed_origins = get_allowed_origins()
|
||||
self.allowed_origins = deleteDuplicatedElementFromList(self.allowed_origins)
|
||||
logging.info("Allowed origins are: %s",
|
||||
self.allowed_origins)
|
||||
|
||||
self.blacklist = apt_pkg.config.value_list(
|
||||
"Kylin-update-manager::Package-Blacklist")
|
||||
self.blacklist = deleteDuplicatedElementFromList(self.blacklist)
|
||||
# print("Initial blacklist: ", " ".join(self.blacklist))
|
||||
|
||||
self.whitelist = apt_pkg.config.value_list(
|
||||
"Kylin-update-manager::Package-Whitelist")
|
||||
self.whitelist = deleteDuplicatedElementFromList(self.whitelist)
|
||||
# print("Initial whitelist: ", " ".join(self.whitelist))
|
||||
|
||||
self.strict_whitelist = apt_pkg.config.find_b(
|
||||
"Kylin-update-manager::Package-Whitelist-Strict", False)
|
||||
# print("Initial whitelist (%s): %s"%(
|
||||
# "strict" if self.strict_whitelist else "not strict",
|
||||
# " ".join(self.whitelist)))
|
||||
|
||||
# update importantlist
|
||||
# initUpdateImportantList()
|
||||
|
||||
# 获取list
|
||||
# self.initLocalPackagesList()
|
||||
|
||||
#除掉不在cache中的包
|
||||
# self.checkInCache()
|
||||
|
||||
def checkInCache(self):
|
||||
logging.info("start Check in cache")
|
||||
tmplist = []
|
||||
cache = apt.Cache()
|
||||
for i in self.upgradeList:
|
||||
try:
|
||||
cache[i]
|
||||
tmplist.append(i)
|
||||
except Exception as e:
|
||||
# print("not found pkg: ", str(e))
|
||||
pass
|
||||
self.upgradeList = tmplist
|
||||
|
||||
def initLocalPackagesList(self):
|
||||
jsonfiles = []
|
||||
tmplist = []
|
||||
|
||||
# 获取importantlist 本次更新推送
|
||||
with open(ImportantListPath, 'r') as f:
|
||||
text = f.read()
|
||||
importantList = text.split()
|
||||
logging.info("importantList: %s",importantList)
|
||||
f.close()
|
||||
|
||||
if not importantList:
|
||||
logging.error("importantList is empty")
|
||||
exit(-1)
|
||||
|
||||
# 获取/usr/share/kylin-update-desktop-config/data/下所有json文件
|
||||
for root,dirs,files in os.walk(DesktopSystemPath):
|
||||
pass
|
||||
for i in files:
|
||||
if ".json" in i:
|
||||
jsonfiles.append(i.split('.')[0])
|
||||
# logging.info("all files: %s", jsonfiles)
|
||||
|
||||
# 找到importantlist中对应的json文件
|
||||
for i in importantList:
|
||||
if i not in jsonfiles:
|
||||
# 说明这个是单独的包,不在分组中
|
||||
# 加入更新列表
|
||||
if i not in self.upgradeList:
|
||||
self.upgradeList.append(i)
|
||||
else:
|
||||
# 在分组中
|
||||
# 获取每个对应json文件中的upgrade_list
|
||||
if i in jsonfiles:
|
||||
filepath = os.path.join(DesktopSystemPath, i)
|
||||
filepath = filepath+".json"
|
||||
with open(filepath, 'r') as f:
|
||||
pkgdict = f.read()
|
||||
jsonfile = json.loads(pkgdict)
|
||||
tmplist = jsonfile['install_list']
|
||||
# print("\ntmplist: ", tmplist)
|
||||
for j in tmplist:
|
||||
if j not in self.upgradeList:
|
||||
self.upgradeList.append(j)
|
||||
f.close()
|
||||
# logging.info("self.upgradeList: %s", self.upgradeList)
|
||||
|
||||
#
|
||||
# print("jsonfile silent_install_list: ", jsonfile['silent_install_list'])
|
||||
|
||||
# 更新前检测,必须安装的包
|
||||
# self.installList = jsonfile['install_list']
|
||||
# print("jsonfile install_list: ", self.installList)
|
||||
|
||||
# # 可以更新的列表, 白名单
|
||||
# self.upgradeList = jsonfile['upgrade_list']
|
||||
# print("jsonfile upgrade_list: ", self.upgradeList)
|
||||
|
||||
def check_in_allowed_origin(self, pkgs):
|
||||
new_upgrade_pkgs = []
|
||||
for pkg in pkgs:
|
||||
# print("checking %d pkgname: %s"%(i, pkg))
|
||||
for v in pkg.versions:
|
||||
if is_in_allowed_origin(v, self.allowed_origins) and not pkg in new_upgrade_pkgs:
|
||||
new_upgrade_pkgs.append(pkg)
|
||||
# else:
|
||||
# pkg.mark_keep()
|
||||
return new_upgrade_pkgs
|
||||
|
||||
def is_pkgname_in_blacklist(self, pkgs):
|
||||
blacklist_filter_pkgs = []
|
||||
for pkg in pkgs:
|
||||
if pkg.name in self.blacklist:
|
||||
pass
|
||||
# print("skipping blacklisted package %s" % pkg.name)
|
||||
else :
|
||||
blacklist_filter_pkgs.append(pkg)
|
||||
|
||||
return blacklist_filter_pkgs
|
||||
|
||||
def is_pkgname_in_whitelist(self, pkgs):
|
||||
whitelist_filter_upgrade_pkgs = []
|
||||
for pkg in pkgs:
|
||||
if pkg.name in self.upgradeList:
|
||||
whitelist_filter_upgrade_pkgs.append(pkg)
|
||||
else :
|
||||
pkg.mark_keep()
|
||||
pass
|
||||
# print("skipping whitelist package %s" % pkg.name)
|
||||
|
||||
return whitelist_filter_upgrade_pkgs
|
||||
|
||||
|
||||
def get_allowed_origins():
|
||||
# type: () -> List[str]
|
||||
""" return a list of allowed origins from apt.conf
|
||||
|
||||
This will take substitutions (like distro_id) into account.
|
||||
"""
|
||||
allowed_origins = get_allowed_origins_legacy()
|
||||
key = "Kylin-update-manager::Origins-Pattern"
|
||||
try:
|
||||
for s in apt_pkg.config.value_list(key):
|
||||
allowed_origins.append(substitute(s))
|
||||
except ValueError:
|
||||
print("Unable to parse %s." % key)
|
||||
raise
|
||||
return allowed_origins
|
||||
|
||||
def get_allowed_origins_legacy():
|
||||
# type: () -> List[str]
|
||||
""" legacy support for old Allowed-Origins var """
|
||||
allowed_origins = [] # type: List[str]
|
||||
key = "Kylin-update-manager::Allowed-Origins"
|
||||
try:
|
||||
for s in apt_pkg.config.value_list(key):
|
||||
# if there is a ":" use that as seperator, else use spaces
|
||||
if re.findall(r'(?<!\\):', s):
|
||||
(distro_id, distro_codename) = re.split(r'(?<!\\):', s)
|
||||
else:
|
||||
(distro_id, distro_codename) = s.split()
|
||||
# unescape "\:" back to ":"
|
||||
distro_id = re.sub(r'\\:', ':', distro_id)
|
||||
# escape "," (see LP: #824856) - can this be simpler?
|
||||
distro_id = re.sub(r'([^\\]),', r'\1\\,', distro_id)
|
||||
distro_codename = re.sub(r'([^\\]),', r'\1\\,', distro_codename)
|
||||
# convert to new format
|
||||
allowed_origins.append("o=%s,a=%s" % (substitute(distro_id),
|
||||
substitute(distro_codename)))
|
||||
except ValueError:
|
||||
logging.error(_("Unable to parse %s." % key))
|
||||
raise
|
||||
return allowed_origins
|
||||
|
||||
def substitute(line):
|
||||
# type: (str) -> str
|
||||
""" substitude known mappings and return a new string
|
||||
|
||||
Currently supported ${distro-release}
|
||||
"""
|
||||
mapping = {"distro_codename": get_distro_codename(),
|
||||
"distro_id": get_distro_id()}
|
||||
return string.Template(line).substitute(mapping)
|
||||
|
||||
|
||||
def get_distro_codename():
|
||||
# type: () -> str
|
||||
return DISTRO_CODENAME
|
||||
|
||||
|
||||
def get_distro_id():
|
||||
# type: () -> str
|
||||
return DISTRO_ID
|
||||
|
||||
def is_in_allowed_origin(ver, allowed_origins):
|
||||
# type: (apt.package.Version, List[str]) -> bool
|
||||
if not ver:
|
||||
return False
|
||||
for origin in ver.origins:
|
||||
if is_allowed_origin(origin, allowed_origins):
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_allowed_origin(origin, allowed_origins):
|
||||
# type: (Union[apt.package.Origin, apt_pkg.PackageFile], List[str]) -> bool
|
||||
|
||||
# local origin is allowed by default
|
||||
# if origin.component == 'now' and origin.archive == 'now' and \
|
||||
# not origin.label and not origin.site:
|
||||
# return True
|
||||
for allowed in allowed_origins:
|
||||
if match_whitelist_string(allowed, origin):
|
||||
return True
|
||||
return False
|
||||
|
||||
def match_whitelist_string(whitelist, origin):
|
||||
# type: (str, Union[apt.package.Origin, apt_pkg.PackageFile]) -> bool
|
||||
"""
|
||||
take a whitelist string in the form "origin=Debian,label=Debian-Security"
|
||||
and match against the given python-apt origin. A empty whitelist string
|
||||
never matches anything.
|
||||
"""
|
||||
whitelist = whitelist.strip()
|
||||
if whitelist == "":
|
||||
logging.warning("empty match string matches nothing")
|
||||
return False
|
||||
res = True
|
||||
# make "\," the html quote equivalent
|
||||
whitelist = whitelist.replace("\\,", "%2C")
|
||||
for token in whitelist.split(","):
|
||||
# strip and unquote the "," back
|
||||
(what, value) = [s.strip().replace("%2C", ",")
|
||||
for s in token.split("=")]
|
||||
# logging.debug("matching %s=%s against %s" % (
|
||||
# what, value, origin))
|
||||
# support substitution here as well
|
||||
value = substitute(value)
|
||||
# first char is apt-cache policy output, send is the name
|
||||
# in the Release file
|
||||
if what in ("o", "origin"):
|
||||
match = fnmatch.fnmatch(origin.origin, value)
|
||||
elif what in ("l", "label"):
|
||||
match = fnmatch.fnmatch(origin.label, value)
|
||||
elif what in ("a", "suite", "archive"):
|
||||
match = fnmatch.fnmatch(origin.archive, value)
|
||||
elif what in ("c", "component"):
|
||||
match = fnmatch.fnmatch(origin.component, value)
|
||||
elif what in ("site",):
|
||||
match = fnmatch.fnmatch(origin.site, value)
|
||||
elif what in ("n", "codename",):
|
||||
match = fnmatch.fnmatch(origin.codename, value)
|
||||
else:
|
||||
raise UnknownMatcherError(
|
||||
"Unknown whitelist entry for matcher %s (token %s)" % (
|
||||
what, token))
|
||||
# update res
|
||||
res = res and match
|
||||
# logging.debug("matching %s=%s against %s" % (
|
||||
# what, value, origin))
|
||||
return res
|
||||
|
||||
def deleteDuplicatedElementFromList(list):
|
||||
resultList = []
|
||||
for item in list:
|
||||
if not item in resultList:
|
||||
resultList.append(item)
|
||||
return resultList
|
||||
|
||||
|
||||
class UnknownMatcherError(ValueError):
|
||||
pass
|
|
@ -1,212 +0,0 @@
|
|||
# utils.py
|
||||
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (c) 2011 Canonical
|
||||
#
|
||||
# Author: Alex Chiang <achiang@canonical.com>
|
||||
# Michael Vogt <michael.vogt@ubuntu.com>
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License as
|
||||
# published by the Free Software Foundation; either version 2 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
|
||||
# USA
|
||||
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import dbus
|
||||
import sys
|
||||
|
||||
|
||||
class ModemManagerHelper(object):
|
||||
|
||||
# data taken from
|
||||
# http://projects.gnome.org/NetworkManager/developers/mm-spec-04.html
|
||||
MM_DBUS_IFACE = "org.freedesktop.ModemManager"
|
||||
MM_DBUS_IFACE_MODEM = MM_DBUS_IFACE + ".Modem"
|
||||
|
||||
# MM_MODEM_TYPE
|
||||
MM_MODEM_TYPE_GSM = 1
|
||||
MM_MODEM_TYPE_CDMA = 2
|
||||
|
||||
# GSM
|
||||
# Not registered, not searching for new operator to register.
|
||||
MM_MODEM_GSM_NETWORK_REG_STATUS_IDLE = 0
|
||||
# Registered on home network.
|
||||
MM_MODEM_GSM_NETWORK_REG_STATUS_HOME = 1
|
||||
# Not registered, searching for new operator to register with.
|
||||
MM_MODEM_GSM_NETWORK_REG_STATUS_SEARCHING = 2
|
||||
# Registration denied.
|
||||
MM_MODEM_GSM_NETWORK_REG_STATUS_DENIED = 3
|
||||
# Unknown registration status.
|
||||
MM_MODEM_GSM_NETWORK_REG_STATUS_UNKNOWN = 4
|
||||
# Registered on a roaming network.
|
||||
MM_MODEM_GSM_NETWORK_REG_STATUS_ROAMING = 5
|
||||
|
||||
# CDMA
|
||||
# Registration status is unknown or the device is not registered.
|
||||
MM_MODEM_CDMA_REGISTRATION_STATE_UNKNOWN = 0
|
||||
# Registered, but roaming status is unknown or cannot be provided
|
||||
# by the device. The device may or may not be roaming.
|
||||
MM_MODEM_CDMA_REGISTRATION_STATE_REGISTERED = 1
|
||||
# Currently registered on the home network.
|
||||
MM_MODEM_CDMA_REGISTRATION_STATE_HOME = 2
|
||||
# Currently registered on a roaming network.
|
||||
MM_MODEM_CDMA_REGISTRATION_STATE_ROAMING = 3
|
||||
|
||||
def __init__(self):
|
||||
self.bus = dbus.SystemBus()
|
||||
self.proxy = self.bus.get_object("org.freedesktop.ModemManager",
|
||||
"/org/freedesktop/ModemManager")
|
||||
modem_manager = dbus.Interface(self.proxy, self.MM_DBUS_IFACE)
|
||||
self.modems = modem_manager.EnumerateDevices()
|
||||
|
||||
@staticmethod
|
||||
def get_dbus_property(proxy, interface, property):
|
||||
props = dbus.Interface(proxy, "org.freedesktop.DBus.Properties")
|
||||
property = props.Get(interface, property)
|
||||
return property
|
||||
|
||||
def is_gsm_roaming(self):
|
||||
for m in self.modems:
|
||||
dev = self.bus.get_object(self.MM_DBUS_IFACE, m)
|
||||
type = self.get_dbus_property(dev, self.MM_DBUS_IFACE_MODEM,
|
||||
"Type")
|
||||
if type != self.MM_MODEM_TYPE_GSM:
|
||||
continue
|
||||
net = dbus.Interface(dev,
|
||||
self.MM_DBUS_IFACE_MODEM + ".Gsm.Network")
|
||||
reg = net.GetRegistrationInfo()
|
||||
# Be conservative about roaming. If registration unknown,
|
||||
# assume yes.
|
||||
# MM_MODEM_GSM_NETWORK_REG_STATUS
|
||||
if reg[0] in (self.MM_MODEM_GSM_NETWORK_REG_STATUS_UNKNOWN,
|
||||
self.MM_MODEM_GSM_NETWORK_REG_STATUS_ROAMING):
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_cdma_roaming(self):
|
||||
for m in self.modems:
|
||||
dev = self.bus.get_object(self.MM_DBUS_IFACE, m)
|
||||
type = self.get_dbus_property(dev, self.MM_DBUS_IFACE_MODEM,
|
||||
"Type")
|
||||
if type != self.MM_MODEM_TYPE_CDMA:
|
||||
continue
|
||||
cdma = dbus.Interface(dev, self.MM_DBUS_IFACE_MODEM + ".Cdma")
|
||||
(cmda_1x, evdo) = cdma.GetRegistrationState()
|
||||
# Be conservative about roaming. If registration unknown,
|
||||
# assume yes.
|
||||
# MM_MODEM_CDMA_REGISTRATION_STATE
|
||||
roaming_states = (self.MM_MODEM_CDMA_REGISTRATION_STATE_REGISTERED,
|
||||
self.MM_MODEM_CDMA_REGISTRATION_STATE_ROAMING)
|
||||
# evdo trumps cmda_1x (thanks to Mathieu Trudel-Lapierre)
|
||||
if evdo in roaming_states:
|
||||
return True
|
||||
elif cmda_1x in roaming_states:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class NetworkManagerHelper(object):
|
||||
NM_DBUS_IFACE = "org.freedesktop.NetworkManager"
|
||||
|
||||
# connection states
|
||||
# Old enum values are for NM 0.7
|
||||
|
||||
# The NetworkManager daemon is in an unknown state.
|
||||
NM_STATE_UNKNOWN = 0
|
||||
# The NetworkManager daemon is connecting a device.
|
||||
NM_STATE_CONNECTING_OLD = 2
|
||||
NM_STATE_CONNECTING = 40
|
||||
NM_STATE_CONNECTING_LIST = [NM_STATE_CONNECTING_OLD,
|
||||
NM_STATE_CONNECTING]
|
||||
# The NetworkManager daemon is connected.
|
||||
NM_STATE_CONNECTED_OLD = 3
|
||||
NM_STATE_CONNECTED_LOCAL = 50
|
||||
NM_STATE_CONNECTED_SITE = 60
|
||||
NM_STATE_CONNECTED_GLOBAL = 70
|
||||
NM_STATE_CONNECTED_LIST = [NM_STATE_CONNECTED_OLD,
|
||||
NM_STATE_CONNECTED_LOCAL,
|
||||
NM_STATE_CONNECTED_SITE,
|
||||
NM_STATE_CONNECTED_GLOBAL]
|
||||
|
||||
# The device type is unknown.
|
||||
NM_DEVICE_TYPE_UNKNOWN = 0
|
||||
# The device is wired Ethernet device.
|
||||
NM_DEVICE_TYPE_ETHERNET = 1
|
||||
# The device is an 802.11 WiFi device.
|
||||
NM_DEVICE_TYPE_WIFI = 2
|
||||
# The device is a GSM-based cellular WAN device.
|
||||
NM_DEVICE_TYPE_GSM = 3
|
||||
# The device is a CDMA/IS-95-based cellular WAN device.
|
||||
NM_DEVICE_TYPE_CDMA = 4
|
||||
|
||||
def __init__(self):
|
||||
self.bus = dbus.SystemBus()
|
||||
self.proxy = self.bus.get_object("org.freedesktop.NetworkManager",
|
||||
"/org/freedesktop/NetworkManager")
|
||||
|
||||
@staticmethod
|
||||
def get_dbus_property(proxy, interface, property):
|
||||
props = dbus.Interface(proxy, "org.freedesktop.DBus.Properties")
|
||||
property = props.Get(interface, property)
|
||||
return property
|
||||
|
||||
def is_active_connection_gsm_or_cdma(self):
|
||||
res = False
|
||||
actives = self.get_dbus_property(
|
||||
self.proxy, self.NM_DBUS_IFACE, 'ActiveConnections')
|
||||
for a in actives:
|
||||
active = self.bus.get_object(self.NM_DBUS_IFACE, a)
|
||||
default_route = self.get_dbus_property(
|
||||
active, self.NM_DBUS_IFACE + ".Connection.Active", 'Default')
|
||||
if not default_route:
|
||||
continue
|
||||
devs = self.get_dbus_property(
|
||||
active, self.NM_DBUS_IFACE + ".Connection.Active", 'Devices')
|
||||
for d in devs:
|
||||
dev = self.bus.get_object(self.NM_DBUS_IFACE, d)
|
||||
type = self.get_dbus_property(
|
||||
dev, self.NM_DBUS_IFACE + ".Device", 'DeviceType')
|
||||
if type == self.NM_DEVICE_TYPE_GSM:
|
||||
return True
|
||||
elif type == self.NM_DEVICE_TYPE_CDMA:
|
||||
return True
|
||||
else:
|
||||
continue
|
||||
return res
|
||||
|
||||
def is_active_connection_gsm_or_cdma_roaming(self):
|
||||
res = False
|
||||
if self.is_active_connection_gsm_or_cdma():
|
||||
mmhelper = ModemManagerHelper()
|
||||
res |= mmhelper.is_gsm_roaming()
|
||||
res |= mmhelper.is_cdma_roaming()
|
||||
return res
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
# test code
|
||||
if sys.argv[1:] and sys.argv[1] == "--test":
|
||||
mmhelper = ModemManagerHelper()
|
||||
print("is_gsm_roaming", mmhelper.is_gsm_roaming())
|
||||
print("is_cdma_roaming", mmhelper.is_cdma_roaming())
|
||||
|
||||
# roaming?
|
||||
nmhelper = NetworkManagerHelper()
|
||||
is_roaming = nmhelper.is_active_connection_gsm_or_cdma_roaming()
|
||||
print("roam: ", is_roaming)
|
||||
if is_roaming:
|
||||
sys.exit(1)
|
||||
sys.exit(0)
|
|
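A hypothetical usage sketch for the helpers above (it assumes this module is importable as UpdateManager.Core.roam, which may differ): callers can use the roaming check to defer large downloads on a metered mobile link.

from UpdateManager.Core.roam import NetworkManagerHelper   # assumed import path

nm_helper = NetworkManagerHelper()
if nm_helper.is_active_connection_gsm_or_cdma_roaming():
    print("roaming mobile connection - deferring the download")
else:
    print("not roaming - safe to download")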
@ -1,557 +0,0 @@
|
|||
# utils.py
|
||||
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (c) 2004-2013 Canonical
|
||||
#
|
||||
# Authors: Michael Vogt <mvo@debian.org>
|
||||
# Michael Terry <michael.terry@canonical.com>
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License as
|
||||
# published by the Free Software Foundation; either version 2 of the
|
||||
# License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
|
||||
# USA
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
from gettext import gettext as _
|
||||
from gettext import ngettext
|
||||
from stat import (S_IMODE, ST_MODE, S_IXUSR)
|
||||
from math import ceil
|
||||
|
||||
import apt
|
||||
import apt_pkg
|
||||
apt_pkg.init_config()
|
||||
|
||||
import locale
|
||||
import logging
|
||||
import re
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from urllib.request import (
|
||||
ProxyHandler,
|
||||
Request,
|
||||
build_opener,
|
||||
install_opener,
|
||||
urlopen,
|
||||
)
|
||||
from urllib.parse import urlsplit
|
||||
|
||||
from copy import copy
|
||||
|
||||
|
||||
class ExecutionTime(object):
|
||||
"""
|
||||
Helper that can be used in with statements to have a simple
|
||||
measure of the timing of a particular block of code, e.g.
|
||||
with ExecutionTime("db flush"):
|
||||
db.flush()
|
||||
"""
|
||||
def __init__(self, info=""):
|
||||
self.info = info
|
||||
|
||||
def __enter__(self):
|
||||
self.now = time.time()
|
||||
|
||||
def __exit__(self, type, value, stack):
|
||||
print("%s: %s" % (self.info, time.time() - self.now))
|
||||
|
||||
|
||||
def get_string_with_no_auth_from_source_entry(entry):
|
||||
tmp = copy(entry)
|
||||
url_parts = urlsplit(tmp.uri)
|
||||
if url_parts.username:
|
||||
tmp.uri = tmp.uri.replace(url_parts.username, "hidden-u")
|
||||
if url_parts.password:
|
||||
tmp.uri = tmp.uri.replace(url_parts.password, "hidden-p")
|
||||
return str(tmp)
|
||||
|
||||
|
||||
def is_unity_running():
|
||||
""" return True if Unity is currently running """
|
||||
unity_running = False
|
||||
try:
|
||||
import dbus
|
||||
bus = dbus.SessionBus()
|
||||
unity_running = bus.name_has_owner("com.canonical.Unity")
|
||||
except Exception:
|
||||
logging.exception("could not check for Unity dbus service")
|
||||
return unity_running
|
||||
|
||||
|
||||
def is_child_of_process_name(processname, pid=None):
|
||||
if not pid:
|
||||
pid = os.getpid()
|
||||
while pid > 0:
|
||||
stat_file = "/proc/%s/stat" % pid
|
||||
with open(stat_file) as stat_f:
|
||||
stat = stat_f.read()
|
||||
# extract command (inside ())
|
||||
command = stat.partition("(")[2].rpartition(")")[0]
|
||||
if command == processname:
|
||||
return True
|
||||
# get parent (second to the right of command) and check that next
|
||||
pid = int(stat.rpartition(")")[2].split()[1])
|
||||
return False
|
||||
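For clarity, this is what the /proc/&lt;pid&gt;/stat parsing above does on a sample line (the sample values are made up): the command name sits inside parentheses and may itself contain spaces, and the parent pid is the second field after the closing parenthesis.

sample = "1234 (my process) S 987 1234 1234 0 -1 4194560"   # made-up stat line
command = sample.partition("(")[2].rpartition(")")[0]
parent_pid = int(sample.rpartition(")")[2].split()[1])
print(command)      # "my process"
print(parent_pid)   # 987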
|
||||
|
||||
def inside_chroot():
|
||||
""" returns True if we are inside a chroot
|
||||
"""
|
||||
# if there is no proc or no pid 1 we are very likely inside a chroot
|
||||
if not os.path.exists("/proc") or not os.path.exists("/proc/1"):
|
||||
return True
|
||||
# if the inode is different for pid 1 "/" and our "/"
|
||||
return os.stat("/") != os.stat("/proc/1/root")
|
||||
|
||||
|
||||
def wrap(t, width=70, subsequent_indent=""):
|
||||
""" helpers inspired after textwrap - unfortunately
|
||||
we cannot use textwrap directly because it breaks
package names with "-" in them into new lines
|
||||
"""
|
||||
out = ""
|
||||
for s in t.split():
|
||||
if (len(out) - out.rfind("\n")) + len(s) > width:
|
||||
out += "\n" + subsequent_indent
|
||||
out += s + " "
|
||||
return out
|
||||
|
||||
|
||||
def twrap(s, **kwargs):
|
||||
msg = ""
|
||||
paras = s.split("\n")
|
||||
for par in paras:
|
||||
s = wrap(par, **kwargs)
|
||||
msg += s + "\n"
|
||||
return msg
|
||||
|
||||
|
||||
def lsmod():
|
||||
" return list of loaded modules (or [] if lsmod is not found) "
|
||||
modules = []
|
||||
# FIXME raise?
|
||||
if not os.path.exists("/sbin/lsmod"):
|
||||
return []
|
||||
p = subprocess.Popen(["/sbin/lsmod"], stdout=subprocess.PIPE,
|
||||
universal_newlines=True)
|
||||
lines = p.communicate()[0].split("\n")
|
||||
# remove heading line: "Modules Size Used by"
|
||||
del lines[0]
|
||||
# add lines to list, skip empty lines
|
||||
for line in lines:
|
||||
if line:
|
||||
modules.append(line.split()[0])
|
||||
return modules
|
||||
|
||||
|
||||
def check_and_fix_xbit(path):
|
||||
" check if a given binary has the executable bit and if not, add it"
|
||||
if not os.path.exists(path):
|
||||
return
|
||||
mode = S_IMODE(os.stat(path)[ST_MODE])
|
||||
if not ((mode & S_IXUSR) == S_IXUSR):
|
||||
os.chmod(path, mode | S_IXUSR)
|
||||
|
||||
|
||||
def country_mirror():
|
||||
" helper to get the country mirror from the current locale "
|
||||
# special cases go here
|
||||
lang_mirror = {'c': ''}
|
||||
# no lang, no mirror
|
||||
if 'LANG' not in os.environ:
|
||||
return ''
|
||||
lang = os.environ['LANG'].lower()
|
||||
# check if it is a special case
|
||||
if lang[:5] in lang_mirror:
|
||||
return lang_mirror[lang[:5]]
|
||||
# now check for the most common form (en_US.UTF-8)
|
||||
if "_" in lang:
|
||||
country = lang.split(".")[0].split("_")[1]
|
||||
if "@" in country:
|
||||
country = country.split("@")[0]
|
||||
return country + "."
|
||||
else:
|
||||
return lang[:2] + "."
|
||||
return ''
|
||||
|
||||
|
||||
def get_dist():
|
||||
" return the codename of the current runing distro "
|
||||
# support debug overwrite
|
||||
dist = os.environ.get("META_RELEASE_FAKE_CODENAME")
|
||||
if dist:
|
||||
logging.warning("using fake release name '%s' (because of "
|
||||
"META_RELEASE_FAKE_CODENAME environment) " % dist)
|
||||
return dist
|
||||
# then check the real one
|
||||
from subprocess import Popen, PIPE
|
||||
p = Popen(["lsb_release", "-c", "-s"], stdout=PIPE,
|
||||
universal_newlines=True)
|
||||
res = p.wait()
|
||||
if res != 0:
|
||||
sys.stderr.write("lsb_release returned exitcode: %i\n" % res)
|
||||
return "unknown distribution"
|
||||
dist = p.stdout.readline().strip()
|
||||
p.stdout.close()
|
||||
return dist
|
||||
|
||||
|
||||
def get_dist_version():
|
||||
" return the version of the current running distro "
|
||||
# support debug overwrite
|
||||
desc = os.environ.get("META_RELEASE_FAKE_VERSION")
|
||||
if desc:
|
||||
logging.warning("using fake release version '%s' (because of "
|
||||
"META_RELEASE_FAKE_VERSION environment) " % desc)
|
||||
return desc
|
||||
# then check the real one
|
||||
from subprocess import Popen, PIPE
|
||||
p = Popen(["lsb_release", "-r", "-s"], stdout=PIPE,
|
||||
universal_newlines=True)
|
||||
res = p.wait()
|
||||
if res != 0:
|
||||
sys.stderr.write("lsb_release returned exitcode: %i\n" % res)
|
||||
return "unknown distribution"
|
||||
desc = p.stdout.readline().strip()
|
||||
p.stdout.close()
|
||||
return desc
|
||||
|
||||
|
||||
class HeadRequest(Request):
|
||||
def get_method(self):
|
||||
return "HEAD"
|
||||
|
||||
|
||||
def url_downloadable(uri, debug_func=None):
|
||||
"""
|
||||
helper that checks if the given uri exists and is downloadable
|
||||
(supports optional debug_func function handler to support
|
||||
e.g. logging)
|
||||
|
||||
Supports http (via HEAD) and ftp (via size request)
|
||||
"""
|
||||
if not debug_func:
debug_func = lambda x: True
|
||||
debug_func("url_downloadable: %s" % uri)
|
||||
(scheme, netloc, path, querry, fragment) = urlsplit(uri)
|
||||
debug_func("s='%s' n='%s' p='%s' q='%s' f='%s'" % (scheme, netloc, path,
|
||||
querry, fragment))
|
||||
if scheme in ("http", "https"):
|
||||
try:
|
||||
http_file = urlopen(HeadRequest(uri))
|
||||
http_file.close()
|
||||
if http_file.code == 200:
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
debug_func("error from httplib: '%s'" % e)
|
||||
return False
|
||||
elif scheme == "ftp":
|
||||
import ftplib
|
||||
try:
|
||||
f = ftplib.FTP(netloc)
|
||||
f.login()
|
||||
f.cwd(os.path.dirname(path))
|
||||
size = f.size(os.path.basename(path))
|
||||
f.quit()
|
||||
if debug_func:
|
||||
debug_func("ftplib.size() returned: %s" % size)
|
||||
if size != 0:
|
||||
return True
|
||||
except Exception as e:
|
||||
if debug_func:
|
||||
debug_func("error from ftplib: '%s'" % e)
|
||||
return False
|
||||
return False
|
||||
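A hypothetical call of the helper above (assuming this module is importable as UpdateManager.Core.utils; the URL is only an example):

import logging
from UpdateManager.Core.utils import url_downloadable   # assumed import path

logging.basicConfig(level=logging.DEBUG)
reachable = url_downloadable(
    "http://archive.ubuntu.com/ubuntu/dists/focal/Release",   # example URL
    logging.debug)
print("reachable:", reachable)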
|
||||
|
||||
def init_proxy(gsettings=None):
|
||||
""" init proxy settings
|
||||
|
||||
* use apt.conf http proxy if present,
|
||||
* otherwise look into synaptics config file,
|
||||
* otherwise the default behavior will use http_proxy environment
|
||||
if present
|
||||
"""
|
||||
SYNAPTIC_CONF_FILE = "/root/.synaptic/synaptic.conf"
|
||||
proxies = {}
|
||||
# generic apt config wins
|
||||
if apt_pkg.config.find("Acquire::http::Proxy") != '':
|
||||
proxies["http"] = apt_pkg.config.find("Acquire::http::Proxy")
|
||||
# then synaptic
|
||||
elif os.path.exists(SYNAPTIC_CONF_FILE):
|
||||
cnf = apt_pkg.Configuration()
|
||||
apt_pkg.read_config_file(cnf, SYNAPTIC_CONF_FILE)
|
||||
use_proxy = cnf.find_b("Synaptic::useProxy", False)
|
||||
if use_proxy:
|
||||
proxy_host = cnf.find("Synaptic::httpProxy")
|
||||
proxy_port = str(cnf.find_i("Synaptic::httpProxyPort"))
|
||||
if proxy_host and proxy_port:
|
||||
proxies["http"] = "http://%s:%s/" % (proxy_host, proxy_port)
|
||||
if apt_pkg.config.find("Acquire::https::Proxy") != '':
|
||||
proxies["https"] = apt_pkg.config.find("Acquire::https::Proxy")
|
||||
elif "http" in proxies:
|
||||
proxies["https"] = proxies["http"]
|
||||
# if we have a proxy, set it
|
||||
if proxies:
|
||||
# basic verification
|
||||
for proxy in proxies.values():
|
||||
if not re.match("https?://\\w+", proxy):
|
||||
print("proxy '%s' looks invalid" % proxy, file=sys.stderr)
|
||||
return
|
||||
proxy_support = ProxyHandler(proxies)
|
||||
opener = build_opener(proxy_support)
|
||||
install_opener(opener)
|
||||
if "http" in proxies:
|
||||
os.putenv("http_proxy", proxies["http"])
|
||||
if "https" in proxies:
|
||||
os.putenv("https_proxy", proxies["https"])
|
||||
return proxies
|
||||
|
||||
|
||||
def on_battery():
|
||||
"""
|
||||
Check via dbus if the system is running on battery.
|
||||
This function is using UPower per default, if UPower is not
|
||||
available it falls-back to DeviceKit.Power.
|
||||
"""
|
||||
try:
|
||||
import dbus
|
||||
bus = dbus.Bus(dbus.Bus.TYPE_SYSTEM)
|
||||
try:
|
||||
devobj = bus.get_object('org.freedesktop.UPower',
|
||||
'/org/freedesktop/UPower')
|
||||
dev = dbus.Interface(devobj, 'org.freedesktop.DBus.Properties')
|
||||
return dev.Get('org.freedesktop.UPower', 'OnBattery')
|
||||
except dbus.exceptions.DBusException as e:
|
||||
error_unknown = 'org.freedesktop.DBus.Error.ServiceUnknown'
|
||||
if e._dbus_error_name != error_unknown:
|
||||
raise
|
||||
devobj = bus.get_object('org.freedesktop.DeviceKit.Power',
|
||||
'/org/freedesktop/DeviceKit/Power')
|
||||
dev = dbus.Interface(devobj, "org.freedesktop.DBus.Properties")
|
||||
return dev.Get("org.freedesktop.DeviceKit.Power", "on_battery")
|
||||
except Exception:
|
||||
#import sys
|
||||
#print("on_battery returned error: ", e, file=sys.stderr)
|
||||
return False
|
||||
|
||||
|
||||
def inhibit_sleep():
|
||||
"""
|
||||
Send a dbus signal to logind to not suspend the system, it will be
|
||||
released when the return value drops out of scope
|
||||
"""
|
||||
try:
|
||||
from gi.repository import Gio, GLib
|
||||
connection = Gio.bus_get_sync(Gio.BusType.SYSTEM)
|
||||
|
||||
var, fdlist = connection.call_with_unix_fd_list_sync(
|
||||
'org.freedesktop.login1', '/org/freedesktop/login1',
|
||||
'org.freedesktop.login1.Manager', 'Inhibit',
|
||||
GLib.Variant('(ssss)',
|
||||
('shutdown:sleep',
|
||||
'UpdateManager', 'Updating System',
|
||||
'block')),
|
||||
None, 0, -1, None, None)
|
||||
inhibitor = Gio.UnixInputStream(fd=fdlist.steal_fds()[var[0]])
|
||||
|
||||
return inhibitor
|
||||
except Exception:
|
||||
#print("could not send the dbus Inhibit signal: %s" % e)
|
||||
return False
|
||||
|
||||
|
||||
def str_to_bool(str):
|
||||
if str == "0" or str.upper() == "FALSE":
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def get_lang():
|
||||
import logging
|
||||
try:
|
||||
(locale_s, encoding) = locale.getdefaultlocale()
|
||||
return locale_s
|
||||
except Exception:
|
||||
logging.exception("getdefaultlocale() failed")
|
||||
return None
|
||||
|
||||
|
||||
def get_ubuntu_flavor(cache=None):
|
||||
""" try to guess the flavor based on the running desktop """
|
||||
# this will (of course) not work in a server environment,
|
||||
# but the main use case for this is to show the right
|
||||
# release notes.
|
||||
pkg = get_ubuntu_flavor_package(cache=cache)
|
||||
return pkg.split('-', 1)[0]
|
||||
|
||||
|
||||
def _load_meta_pkg_list():
|
||||
# This could potentially introduce a circular dependency, but the config
|
||||
# parser logic is simple, and doesn't rely on any UpdateManager code.
|
||||
from DistUpgrade.DistUpgradeConfigParser import DistUpgradeConfig
|
||||
parser = DistUpgradeConfig('/usr/share/ubuntu-release-upgrader')
|
||||
return parser.getlist('Distro', 'MetaPkgs')
|
||||
|
||||
|
||||
def get_ubuntu_flavor_package(cache=None):
|
||||
""" try to guess the flavor metapackage based on the running desktop """
|
||||
# From spec, first if ubuntu-desktop is installed, use that.
|
||||
# Second, grab first installed one from DistUpgrade.cfg.
|
||||
# Lastly, fallback to ubuntu-desktop again.
|
||||
meta_pkgs = ['ubuntu-desktop']
|
||||
|
||||
try:
|
||||
meta_pkgs.extend(sorted(_load_meta_pkg_list()))
|
||||
except Exception as e:
|
||||
print('Could not load list of meta packages:', e)
|
||||
|
||||
if cache is None:
|
||||
cache = apt.Cache()
|
||||
for meta_pkg in meta_pkgs:
|
||||
cache_pkg = cache[meta_pkg] if meta_pkg in cache else None
|
||||
if cache_pkg and cache_pkg.is_installed:
|
||||
return meta_pkg
|
||||
return 'ubuntu-desktop'
|
||||
|
||||
|
||||
def get_ubuntu_flavor_name(cache=None):
|
||||
""" try to guess the flavor name based on the running desktop """
|
||||
pkg = get_ubuntu_flavor_package(cache=cache)
|
||||
lookup = {'ubuntustudio-desktop': 'Ubuntu Studio'}
|
||||
if pkg in lookup:
|
||||
return lookup[pkg]
|
||||
elif pkg.endswith('-desktop'):
|
||||
return capitalize_first_word(pkg.rsplit('-desktop', 1)[0])
|
||||
elif pkg.endswith('-netbook'):
|
||||
return capitalize_first_word(pkg.rsplit('-netbook', 1)[0])
|
||||
else:
|
||||
return 'Ubuntu'
|
||||
|
||||
|
||||
# Unused by update-manager, but still used by ubuntu-release-upgrader
|
||||
def error(parent, summary, message):
|
||||
import gi
|
||||
gi.require_version("Gtk", "3.0")
|
||||
from gi.repository import Gtk, Gdk
|
||||
d = Gtk.MessageDialog(parent=parent,
|
||||
flags=Gtk.DialogFlags.MODAL,
|
||||
type=Gtk.MessageType.ERROR,
|
||||
buttons=Gtk.ButtonsType.CLOSE)
|
||||
d.set_markup("<big><b>%s</b></big>\n\n%s" % (summary, message))
|
||||
d.realize()
|
||||
d.get_window().set_functions(Gdk.WMFunction.MOVE)
|
||||
d.set_title("")
|
||||
d.run()
|
||||
d.destroy()
|
||||
return False
|
||||
|
||||
|
||||
def humanize_size(bytes):
|
||||
"""
|
||||
Convert a given size in bytes to a nicer better readable unit
|
||||
"""
|
||||
|
||||
if bytes < 1000 * 1000:
|
||||
# to have 0 for 0 bytes, 1 for 0-1000 bytes and for 1 and above
|
||||
# round up
|
||||
size_in_kb = int(ceil(bytes / float(1000)))
|
||||
# TRANSLATORS: download size of small updates, e.g. "250 kB"
|
||||
return ngettext("%(size).0f kB", "%(size).0f kB", size_in_kb) % {
|
||||
"size": size_in_kb}
|
||||
else:
|
||||
# TRANSLATORS: download size of updates, e.g. "2.3 MB"
|
||||
return locale.format_string(_("%.1f MB"), bytes / 1000.0 / 1000.0)
|
||||
|
||||
|
||||
def get_arch():
|
||||
return apt_pkg.config.find("APT::Architecture")
|
||||
|
||||
|
||||
def is_port_already_listening(port):
|
||||
""" check if the current system is listening on the given tcp port """
|
||||
# index in the line
|
||||
INDEX_LOCAL_ADDR = 1
|
||||
#INDEX_REMOTE_ADDR = 2
|
||||
INDEX_STATE = 3
|
||||
# state (st) that we care about
|
||||
STATE_LISTENING = '0A'
|
||||
# read the data
|
||||
with open("/proc/net/tcp") as net_tcp:
|
||||
for line in net_tcp.readlines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
# split, values are:
|
||||
# sl local_address rem_address st tx_queue rx_queue tr
|
||||
# tm->when retrnsmt uid timeout inode
|
||||
values = line.split()
|
||||
state = values[INDEX_STATE]
|
||||
if state != STATE_LISTENING:
|
||||
continue
|
||||
local_port_str = values[INDEX_LOCAL_ADDR].split(":")[1]
|
||||
local_port = int(local_port_str, 16)
|
||||
if local_port == port:
|
||||
return True
|
||||
return False
|
||||
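As a reminder of the format parsed above, the local_address column of /proc/net/tcp is "HEXIP:HEXPORT", so the port has to be converted from hexadecimal:

local_address = "0100007F:0016"              # sample column value (127.0.0.1:22)
port = int(local_address.split(":")[1], 16)
print(port)                                  # 22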
|
||||
|
||||
def iptables_active():
|
||||
""" Return True if iptables is active """
|
||||
# FIXME: is there a better way?
|
||||
iptables_empty = """Chain INPUT (policy ACCEPT)
|
||||
target prot opt source destination
|
||||
|
||||
Chain FORWARD (policy ACCEPT)
|
||||
target prot opt source destination
|
||||
|
||||
Chain OUTPUT (policy ACCEPT)
|
||||
target prot opt source destination
|
||||
"""
|
||||
if os.getuid() != 0:
|
||||
raise OSError("Need root to check the iptables state")
|
||||
if not os.path.exists("/sbin/iptables"):
|
||||
return False
|
||||
out = subprocess.Popen(["iptables", "-nL"],
|
||||
stdout=subprocess.PIPE,
|
||||
universal_newlines=True).communicate()[0]
|
||||
if out == iptables_empty:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def capitalize_first_word(string):
|
||||
""" this uppercases the first word's first letter
|
||||
"""
|
||||
if len(string) > 1 and string[0].isalpha() and not string[0].isupper():
|
||||
return string[0].capitalize() + string[1:]
|
||||
return string
|
||||
|
||||
|
||||
def get_package_label(pkg):
|
||||
""" this takes a package synopsis and uppercases the first word's
|
||||
first letter
|
||||
"""
|
||||
name = getattr(pkg.candidate, "summary", "")
|
||||
return capitalize_first_word(name)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
#print(mirror_from_sources_list())
|
||||
#print(on_battery())
|
||||
#print(inside_chroot())
|
||||
#print(iptables_active())
|
||||
error(None, "bar", "baz")
|
|
@ -1,278 +0,0 @@
|
|||
# UpdateManager.py
|
||||
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
|
||||
|
||||
from __future__ import absolute_import, print_function
|
||||
|
||||
|
||||
import warnings
|
||||
warnings.filterwarnings("ignore", "Accessed deprecated property",
|
||||
DeprecationWarning)
|
||||
import logging
|
||||
import sys
|
||||
from gettext import gettext as _
|
||||
|
||||
import os
|
||||
import apt_pkg
|
||||
import dbus
|
||||
import shutil
|
||||
import dbus.service
|
||||
from dbus.mainloop.glib import DBusGMainLoop
|
||||
DBusGMainLoop(set_as_default=True)
|
||||
|
||||
from DistUpgrade.DistUpgradeCache import NotEnoughFreeSpaceError
|
||||
from .Core.MyCache import MyCache
|
||||
from .UpdateManagerDbus import UpdateManagerDbusController
|
||||
from .Core.UpdateList import UpdateList
|
||||
from .backend import (InstallBackend,
|
||||
get_backend)
|
||||
|
||||
# file that signals a reboot is required after installation finishes
|
||||
REBOOT_REQUIRED_FILE = "/var/run/reboot-required"
|
||||
|
||||
INSTALL_ALONE_PROGRESS = "alone"
|
||||
|
||||
class UpdateManager():
|
||||
|
||||
def __init__(self,options):
|
||||
self.options = options
|
||||
self.cache = None
|
||||
self.update_list = None
|
||||
|
||||
# whether an update / an upgrade is currently in progress
|
||||
self.is_updating = False
|
||||
self.is_upgrading = False
|
||||
|
||||
#建立dbus
|
||||
self.dbusController = self._setup_dbus()
|
||||
|
||||
# FIXME: this disk-space check before installing has not been tested yet
|
||||
def check_free_space(self,cache):
|
||||
err_sum = _("Not enough free disk space")
|
||||
err_msg = _("The upgrade needs a total of %s free space on "
|
||||
"disk '%s'. "
|
||||
"Please free at least an additional %s of disk "
|
||||
"space on '%s'. %s")
|
||||
# specific ways to resolve lack of free space
|
||||
remedy_archivedir = _("Remove temporary packages of former "
|
||||
"installations using 'sudo apt clean'.")
|
||||
remedy_boot = _("You can remove old kernels using "
|
||||
"'sudo apt autoremove', and you could also "
|
||||
"set COMPRESS=xz in "
|
||||
"/etc/initramfs-tools/initramfs.conf to "
|
||||
"reduce the size of your initramfs.")
|
||||
remedy_root = _("Empty your trash and remove temporary "
|
||||
"packages of former installations using "
|
||||
"'sudo apt clean'.")
|
||||
remedy_tmp = _("Reboot to clean up files in /tmp.")
|
||||
remedy_usr = ""  # no specific remedy for /usr
|
||||
# check free space and error if its not enough
|
||||
try:
|
||||
cache.checkFreeSpace()
|
||||
except NotEnoughFreeSpaceError as e:
|
||||
# CheckFreeSpace examines where packages are cached
|
||||
archivedir = apt_pkg.config.find_dir("Dir::Cache::archives")
|
||||
err_long = ""
|
||||
for req in e.free_space_required_list:
|
||||
if err_long != "":
|
||||
err_long += " "
|
||||
if req.dir == archivedir:
|
||||
err_long += err_msg % (req.size_total, req.dir,
|
||||
req.size_needed, req.dir,
|
||||
remedy_archivedir)
|
||||
elif req.dir == "/boot":
|
||||
err_long += err_msg % (req.size_total, req.dir,
|
||||
req.size_needed, req.dir,
|
||||
remedy_boot)
|
||||
elif req.dir == "/":
|
||||
err_long += err_msg % (req.size_total, req.dir,
|
||||
req.size_needed, req.dir,
|
||||
remedy_root)
|
||||
elif req.dir == "/tmp":
|
||||
err_long += err_msg % (req.size_total, req.dir,
|
||||
req.size_needed, req.dir,
|
||||
remedy_tmp)
|
||||
elif req.dir == "/usr":
|
||||
err_long += err_msg % (req.size_total, req.dir,
|
||||
req.size_needed, req.dir,
|
||||
remedy_usr)
|
||||
# the error should be reported here
|
||||
# self.window_main.start_error(False, err_sum, err_long)
|
||||
return False
|
||||
except SystemError:
|
||||
logging.exception("free space check failed")
|
||||
|
||||
return True
|
||||
|
||||
# perform the update (refresh the package lists)
|
||||
def start_update(self):
|
||||
try:
|
||||
self.is_updating = True
|
||||
update_backend = get_backend(self, InstallBackend.ACTION_UPDATE)
|
||||
update_backend.start()
|
||||
except Exception as e:
|
||||
logging.error(e)
|
||||
|
||||
# perform the upgrade (install)
|
||||
def start_install(self,partial_upgrade_list = []):
|
||||
#检查磁盘的状态
|
||||
if self.check_free_space(self.cache) == False:
|
||||
return
|
||||
logging.info("Disk Check finished...")
|
||||
try:
|
||||
self.is_upgrading = True
|
||||
install_backend = get_backend(self, InstallBackend.ACTION_INSTALL)
|
||||
install_backend.start(partial_upgrade_list=partial_upgrade_list)
|
||||
except Exception as e:
|
||||
logging.error(e)
|
||||
|
||||
# perform an install/upgrade with explicit package lists
|
||||
def start_install_alone(self,pkgs_install = [], pkgs_upgrade = [], pkgs_remove = [],pkgs_purge = []):
|
||||
#检查磁盘的状态
|
||||
if self.check_free_space(self.cache) == False:
|
||||
return
|
||||
logging.info("Disk Check finished...")
|
||||
self.is_upgrading = True
|
||||
install_backend = get_backend(self, InstallBackend.ACTION_INSTALL)
|
||||
install_backend.start_alone(pkgs_install,pkgs_upgrade,pkgs_remove,pkgs_purge)
|
||||
|
||||
# called after an update finishes: reopen the cache and build the JSON list of upgrade groups
|
||||
def start_available(self, cancelled_update=False):
|
||||
_success,header,desc = self.refresh_cache()
|
||||
|
||||
# special case: a standalone package install returns directly; start_available runs again once the install or removal finishes
|
||||
if _success == True and header == INSTALL_ALONE_PROGRESS:
|
||||
return
|
||||
else:
|
||||
# emit the signal that the upgrade-list detection has finished
|
||||
self.dbusController.UpdateDetectFinished(_success,self.update_list.local_upgrade_list.get('upgrade_groups_list',[]),header,desc)
|
||||
|
||||
# check whether a reboot is required after installation
|
||||
if os.path.exists(REBOOT_REQUIRED_FILE):
|
||||
logging.error("REBOOT_REQUIRED_FILE")
|
||||
|
||||
def refresh_cache(self):
|
||||
_success = True
|
||||
header = None
|
||||
desc = None
|
||||
try:
|
||||
# first entry builds the cache; later updates reopen the existing one
|
||||
if self.cache is None:
|
||||
self.cache = MyCache(None)
|
||||
else:
|
||||
self.cache.open(None)
|
||||
self.cache._initDepCache()
|
||||
except AssertionError:
|
||||
header = _("Software index is broken")
|
||||
desc = _("It is impossible to install or remove any software. "
|
||||
"Please use the package manager \"Synaptic\" or run "
|
||||
"\"sudo apt-get install -f\" in a terminal to fix "
|
||||
"this issue at first.")
|
||||
_success = False
|
||||
return _success,header,desc
|
||||
|
||||
except SystemError as e:
|
||||
header = _("Could not initialize the package information")
|
||||
desc = _("An unresolvable problem occurred while "
|
||||
"initializing the package information.\n\n"
|
||||
"Please report this bug against the 'update-manager' "
|
||||
"package and include the following error "
|
||||
"message:\n") + str(e)
|
||||
_success = False
|
||||
return _success,header,desc
|
||||
|
||||
self.update_list = UpdateList(self)
|
||||
|
||||
'''1. The dist-upgrade marking is done here to find the packages that
would be removed. If removing them would break packages, this could fall
back from dist-upgrade to upgrade; that switch is currently unused and
dist-upgrade is the default.
'''
|
||||
self.distUpgradeWouldDelete = self.cache.saveDistUpgrade()
|
||||
|
||||
# 2. Install the JSON grouping config package; when it finishes, start_available is called again (back to here). If the install fails, return without running the steps below.
|
||||
try:
|
||||
pkg_json = self.cache[self.update_list.GROUPS_JSON_PKG]
|
||||
#是否安装
|
||||
if pkg_json.is_installed:
|
||||
#是否可升级
|
||||
if pkg_json.is_upgradable:
|
||||
logging.info("groups JSON ConfigPkgs(%s) start upgrading...",self.update_list.GROUPS_JSON_PKG)
|
||||
self.start_install_alone(pkgs_upgrade = [self.update_list.GROUPS_JSON_PKG])
|
||||
#直接退出
|
||||
_success = True
|
||||
header = INSTALL_ALONE_PROGRESS
|
||||
return _success,header,desc
|
||||
else:
|
||||
logging.info("ConfigPkgs(%s) No need to upgrade...",self.update_list.GROUPS_JSON_PKG)
|
||||
else:
|
||||
logging.info("groups JSON ConfigPkgs(%s) start new installing...",self.update_list.GROUPS_JSON_PKG)
|
||||
self.start_install_alone(pkgs_install = [self.update_list.GROUPS_JSON_PKG])
|
||||
#直接退出
|
||||
_success = True
|
||||
header = INSTALL_ALONE_PROGRESS
|
||||
return _success,header,desc
|
||||
|
||||
# FIXME: error handling not implemented; should be reported to the control panel
|
||||
except Exception as e:
|
||||
logging.warning("groups JSON ConfigPkgs(%s) install failed...",self.update_list.GROUPS_JSON_PKG)
|
||||
logging.error(e)
|
||||
_success = False
|
||||
|
||||
# 3. If the JSON config directory is missing, repair it by purging and reinstalling the package.
|
||||
if not os.path.exists(self.update_list.INPUT_CONFIG_PATH):
|
||||
logging.info("groups JSON Config Path(%s) Missing and Trying to fix...",self.update_list.INPUT_CONFIG_PATH)
|
||||
# purge the package; start_available will run again and reinstall it if it is missing (here --> purge --> start_available)
|
||||
self.start_install_alone(pkgs_purge = [self.update_list.GROUPS_JSON_PKG])
|
||||
#直接退出
|
||||
_success = True
|
||||
header = INSTALL_ALONE_PROGRESS
|
||||
return _success,header,desc
|
||||
|
||||
# 4. Clear the group JSON files written by the previous run.
|
||||
try:
|
||||
if not os.path.exists(self.update_list.OUTPUT_CONFIG_PATH):
|
||||
os.makedirs(self.update_list.OUTPUT_CONFIG_PATH)
|
||||
logging.info('making the configuration file is complete...')
|
||||
else:
|
||||
shutil.rmtree(self.update_list.OUTPUT_CONFIG_PATH)
|
||||
os.makedirs(self.update_list.OUTPUT_CONFIG_PATH)
|
||||
logging.info('Emptying the configuration file is complete...')
|
||||
except Exception as e:
|
||||
logging.warning(e)
|
||||
|
||||
# FIXME: 5. To be implemented: if the config file is still missing after reinstalling the package, treat this run as having nothing to upgrade.
|
||||
|
||||
# FIXME: 6. Errors from updating important.list are not handled yet.
|
||||
# self.dbusController._on_update_important_list()
|
||||
|
||||
try:
|
||||
_success,header,desc = self.update_list.update(self.cache)
|
||||
except SystemError as e:
|
||||
header = _("Could not calculate the upgrade")
|
||||
desc = _("An unresolvable problem occurred while "
|
||||
"calculating the upgrade.\n\n"
|
||||
"Please report this bug against the 'update-manager' "
|
||||
"package and include the following error "
|
||||
"message:\n") + str(e)
|
||||
_success = False
|
||||
|
||||
return _success,header,desc
|
||||
|
||||
def _setup_dbus(self):
|
||||
# check if there is another g-a-i already and if not setup one
|
||||
# listening on dbus
|
||||
try:
|
||||
bus = dbus.SystemBus()
|
||||
except Exception:
|
||||
logging.error("warning: could not initiate dbus")
|
||||
return
|
||||
try:
|
||||
proxy_obj = bus.get_object('com.kylin.systemupgrade',
|
||||
'/com/kylin/systemupgrade')
|
||||
|
||||
logging.warning("kylin-update-manager has already been started...")
|
||||
sys.exit(0)
|
||||
except dbus.DBusException:
|
||||
bus_name = dbus.service.BusName('com.kylin.systemupgrade',
|
||||
bus)
|
||||
logging.info('initiate dbus success ...')
|
||||
return UpdateManagerDbusController(self, bus_name)
|
|
@ -1,167 +0,0 @@
#!/usr/bin/python3
import dbus
import dbus.service
import logging
import threading

from .Core.AlertWatcher import AlertWatcher
from .Core.roam import NetworkManagerHelper


# dbus service setup
class UpdateManagerDbusController(dbus.service.Object):
    """ this is a helper to provide the UpdateManagerIFace """

    INTERFACE = 'com.kylin.systemupgrade.interface'

    def __init__(self, parent, bus_name,
                 object_path='/com/kylin/systemupgrade'):
        dbus.service.Object.__init__(self, bus_name, object_path)
        self.parent = parent

        # start the checks for network state, battery, etc.
        self.alert_watcher = AlertWatcher()
        self.alert_watcher.check_alert_state()
        self.alert_watcher.connect("network-alert", self._on_network_alert)
        self.connected = False
        self.transaction = None

    # refresh important.list with the list for this upgrade
    def _on_update_important_list(self):
        lock = threading.Lock()
        bus = dbus.SystemBus()
        try:
            obj = bus.get_object('com.kylin.software.properties', '/com/kylin/software/properties')
            interface = dbus.Interface(obj, dbus_interface='com.kylin.software.properties.interface')
            lock.acquire()
            retval = interface.updateSourceTemplate()
            lock.release()
        except Exception as e:
            logging.error("update sourceTemplate Failed and Error mes:%s" % str(e))
            return False

        if retval == False:
            logging.warning("update SourceTemplate failed")
            return False
        else:
            logging.info("update sourceTemplate succeeded...")
            return True

    # track the network state
    def _on_network_alert(self, watcher, state):
        if state in NetworkManagerHelper.NM_STATE_CONNECTED_LIST:
            self.connected = True
            logging.info('Network Connected ...')
        else:
            self.connected = False
            logging.info('Network Disconnected ...')

    # dbus method: check for updates
    @dbus.service.method(INTERFACE, out_signature='b')
    def UpdateDetect(self):
        try:
            # do not start a new update while updating or upgrading
            if self.parent.is_updating or self.parent.is_upgrading:
                logging.info('In the process of updating or Upgrading...')
                return False, 'In the process of updating or Upgrading...'
            else:
                self.parent.start_update()
                logging.info('method dbus updating ...')
                return True
        except Exception:
            return False

    # full upgrade
    @dbus.service.method(INTERFACE, out_signature='b')
    def DistUpgradeSystem(self):
        try:
            # do not upgrade while updating or upgrading
            if self.parent.is_updating or self.parent.is_upgrading:
                logging.info('In the process of updating or Upgrading...')
                return False, 'In the process of updating or Upgrading...'
            else:
                upgrade_groups_list = self.parent.update_list.local_upgrade_list.get('upgrade_groups_list', [])
                if upgrade_groups_list:
                    logging.info('method dbus upgrading ...')
                    self.parent.start_install()
                    return True
                else:
                    return False
        except Exception:
            return False

    # partial upgrade
    @dbus.service.method(INTERFACE, in_signature='as', out_signature='bs')
    def DistUpgradePartial(self, _partial_upgrade_list):
        try:
            # do not upgrade while updating or upgrading
            if self.parent.is_updating or self.parent.is_upgrading:
                logging.info('In the process of updating or Upgrading...')
                return False, 'In the process of updating or Upgrading...'
            else:
                partial_upgrade_list = [str(i) for i in _partial_upgrade_list]
                # locally maintained list of upgradable groups
                upgrade_groups_list = self.parent.update_list.local_upgrade_list.get('upgrade_groups_list', [])
                upgrade_list = list(set(partial_upgrade_list) & set(upgrade_groups_list))

                if upgrade_list:
                    logging.info('dbus partial_upgrade(%s)', upgrade_list)
                    self.parent.start_install(upgrade_list)
                    return True, 'dbus upgrading'
                else:
                    logging.info('input upgrade list(%s) not in local upgrade_list(%s)', partial_upgrade_list, upgrade_groups_list)
                    return False, 'upgrade_list is empty'
        except Exception as e:
            return False, e

    # cancel the transaction
    @dbus.service.method(INTERFACE, out_signature='bs')
    def CancelDownload(self):
        status = False
        message = ""
        try:
            if self.transaction.cancellable == True:
                self.transaction.cancel()
                status = True
                message = "Success"
                logging.info("dbus-method cancel task Success")
            elif self.transaction == None or self.transaction.cancellable == False:
                message = "Can not Cancel"
        except Exception as e:
            return (status, str(e))
        return (status, message)

    # update progress: 0-100 is normal progress, 101 signals an unexpected state
    @dbus.service.signal(INTERFACE, signature='is')
    def UpdateDetectStatusChanged(self, progress, status):
        logging.info("emit progress = %d , status = %s", progress, status)

    # signal: update detection finished
    @dbus.service.signal(INTERFACE, signature='basss')
    def UpdateDetectFinished(self, success, upgrade_group, error_string='', error_desc=''):
        logging.info("emit update success = %r , upgrade_group = %a, error_string = %s , error_desc = %s ",
                     success, upgrade_group, error_string, error_desc)

    # upgrade progress: 0-100 is normal progress, 101 signals an unexpected state
    @dbus.service.signal(INTERFACE, signature='asis')
    def UpdateDloadAndInstStaChanged(self, groups_list, progress, status):
        logging.info("emit upgrade groups_list = %s progress = %d , status = %s", groups_list, progress, status)

    # signal: upgrade download/install finished
    @dbus.service.signal(INTERFACE, signature='basss')
    def UpdateDownloadFinished(self, success, upgrade_group, error_string='', error_desc=''):
        logging.info("emit success = %r , upgrade_group = %a, error_string = %s , error_desc = %s ",
                     success, upgrade_group, error_string, error_desc)

    # emit package download information
    @dbus.service.signal(INTERFACE, signature='iiiii')
    def UpdateDownloadInfo(self, current_items, total_items, currenty_bytes, total_bytes, current_cps):
        logging.info("emit current_items = %d, total_items = %d, currenty_bytes = %d, total_bytes = %d, current_cps = %d .",
                     current_items, total_items,
                     currenty_bytes, total_bytes,
                     current_cps)

    # signal: whether the transaction can be cancelled
    @dbus.service.signal(INTERFACE, signature='b')
    def Cancelable(self, Cancelable):
        logging.info("emit Cancelable: %r", Cancelable)

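A sketch of a client calling the interface exported above; the bus name, object path and interface string are taken from this file, while 'some-group' is a made-up group name used only for illustration.

import dbus

bus = dbus.SystemBus()
proxy = bus.get_object('com.kylin.systemupgrade', '/com/kylin/systemupgrade')
iface = dbus.Interface(proxy, 'com.kylin.systemupgrade.interface')

iface.UpdateDetect()                                  # start a refresh
ok, msg = iface.DistUpgradePartial(['some-group'])    # made-up group name
print(ok, msg)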
@ -1 +0,0 @@
VERSION = '1:20.04.9'
@ -1,165 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
from aptdaemon import client, errors
|
||||
from defer import inline_callbacks
|
||||
from aptdaemon.enums import (EXIT_SUCCESS,
|
||||
EXIT_FAILED,
|
||||
get_error_description_from_enum,
|
||||
get_error_string_from_enum,
|
||||
get_status_string_from_enum
|
||||
)
|
||||
|
||||
from UpdateManager.backend import InstallBackend
|
||||
import logging
|
||||
from gettext import gettext as _
|
||||
import dbus
|
||||
|
||||
class InstallBackendAptdaemon(InstallBackend):
|
||||
"""Makes use of aptdaemon to refresh the cache and to install updates."""
|
||||
|
||||
def __init__(self, window_main, action):
|
||||
InstallBackend.__init__(self, window_main, action)
|
||||
self.window_main = window_main
|
||||
# client connection to the aptdaemon dbus interface
|
||||
self.client = client.AptClient()
|
||||
self.trans_failed_msg = None
|
||||
|
||||
self.trans_progress = 0
|
||||
self.trans_status = ''
|
||||
|
||||
@inline_callbacks
|
||||
def update(self):
|
||||
"""刷新包cache"""
|
||||
try:
|
||||
trans = yield self.client.update_cache(defer=True)
|
||||
self.window_main.dbusController.transaction = trans
|
||||
# register callbacks to receive the update status
|
||||
yield self._show_transaction(trans, self.ACTION_UPDATE,
|
||||
_("Checking for updates…"), False)
|
||||
except errors.NotAuthorizedError:
|
||||
self._action_done(self.ACTION_UPDATE,
|
||||
authorized=False, success=False,
|
||||
error_string=None, error_desc=None)
|
||||
except Exception:
|
||||
self._action_done(self.ACTION_UPDATE,
|
||||
authorized=True, success=False,
|
||||
error_string=None, error_desc=None)
|
||||
raise
|
||||
|
||||
@inline_callbacks
|
||||
def commit(self, pkgs_install, pkgs_upgrade, pkgs_remove,pkgs_purge):
|
||||
"""Commit a list of package adds and removes"""
|
||||
try:
|
||||
reinstall = downgrade = []
|
||||
trans = yield self.client.commit_packages(
|
||||
pkgs_install, reinstall, pkgs_remove, purge = pkgs_purge, upgrade = pkgs_upgrade,
|
||||
downgrade = downgrade, defer=True)
|
||||
self.window_main.dbusController.transaction = trans
|
||||
|
||||
yield self._show_transaction(trans, self.ACTION_INSTALL,
|
||||
_("Installing updates…"), True)
|
||||
except errors.NotAuthorizedError:
|
||||
self._action_done(self.ACTION_INSTALL,
|
||||
authorized=False, success=False,
|
||||
error_string=None, error_desc=None)
|
||||
except errors.TransactionFailed as e:
|
||||
self.trans_failed_msg = str(e)
|
||||
except dbus.DBusException as e:
|
||||
if e.get_dbus_name() != "org.freedesktop.DBus.Error.NoReply":
|
||||
raise
|
||||
self._action_done(self.ACTION_INSTALL,
|
||||
authorized=False, success=False,
|
||||
error_string=None, error_desc=None)
|
||||
except Exception:
|
||||
self._action_done(self.ACTION_INSTALL,
|
||||
authorized=True, success=False,
|
||||
error_string=None, error_desc=None)
|
||||
raise
|
||||
|
||||
#进度回调
|
||||
def _on_progress_changed(self, trans,progress,action):
|
||||
self.trans_progress = progress
|
||||
if action == self.ACTION_UPDATE:
|
||||
self.window_main.dbusController.UpdateDetectStatusChanged(self.trans_progress,self.trans_status)
|
||||
else:
|
||||
self.window_main.dbusController.UpdateDloadAndInstStaChanged(self.upgrade_groups_list,self.trans_progress,self.trans_status)
|
||||
|
||||
#同步状态回调
|
||||
def _on_status_changed(self, trans, status,action):
|
||||
#转化词条
|
||||
self.trans_status = get_status_string_from_enum(status)
|
||||
|
||||
if action == self.ACTION_UPDATE:
|
||||
self.window_main.dbusController.UpdateDetectStatusChanged(self.trans_progress,self.trans_status)
|
||||
else:
|
||||
# when upgrading, status signals also carry the group list (self.upgrade_groups_list)
|
||||
self.window_main.dbusController.UpdateDloadAndInstStaChanged(self.upgrade_groups_list,self.trans_progress,self.trans_status)
|
||||
|
||||
def _on_details_changed(self, trans, details):
|
||||
logging.info(details)
|
||||
|
||||
def _on_download_changed(self, trans, details):
|
||||
logging.info(details)
|
||||
|
||||
# eta / download speed is not reliable, so it is dropped
|
||||
def _on_progress_download_changed(self,trans,current_items, total_items, currenty_bytes, total_bytes, current_cps, eta):
|
||||
if self.action == self.ACTION_INSTALL:
|
||||
self.window_main.dbusController.UpdateDownloadInfo(\
|
||||
current_items, total_items, \
|
||||
currenty_bytes, total_bytes, \
|
||||
current_cps)
|
||||
|
||||
def _on_cancellable_changed(self, trans, Cancelable):
|
||||
self.window_main.dbusController.Cancelable(Cancelable)
|
||||
|
||||
@inline_callbacks
|
||||
def _show_transaction(self, trans, action, header, show_details):
|
||||
|
||||
# both update and upgrade end up here on success or failure; post-completion handling happens in _on_finished
|
||||
trans.connect("finished", self._on_finished, action)
|
||||
|
||||
# status and progress information for updates and upgrades
|
||||
trans.connect("status-changed", self._on_status_changed,action)
|
||||
trans.connect("progress-changed", self._on_progress_changed,action)
|
||||
|
||||
#取消升级
|
||||
trans.connect("cancellable-changed", self._on_cancellable_changed)
|
||||
|
||||
#下载的进度信息
|
||||
trans.connect("progress-details-changed", self._on_progress_download_changed)
|
||||
|
||||
# trans.connect("medium-required", self._on_medium_required)
|
||||
# trans.connect("status-details-changed", self._on_details_changed)
|
||||
#状态改变的时候的回调函数
|
||||
# trans.connect("download-changed", self._on_download_changed)
|
||||
# trans.connect("config-file-conflict", self._on_config_file_conflict)
|
||||
# yield trans.set_debconf_frontend("ukui")
|
||||
yield trans.run()
|
||||
|
||||
def _on_finished(self, trans, status, action):
|
||||
error_string = ''
|
||||
error_desc = ''
|
||||
trans_failed = False
|
||||
|
||||
if status == EXIT_FAILED:
|
||||
error_string = get_error_string_from_enum(trans.error.code)
|
||||
error_desc = get_error_description_from_enum(trans.error.code)
|
||||
if self.trans_failed_msg:
|
||||
trans_failed = True
|
||||
error_desc = error_desc + "\n" + self.trans_failed_msg
|
||||
is_success = (status == EXIT_SUCCESS)
|
||||
|
||||
try:
|
||||
self._action_done(action,
|
||||
authorized=True, success=is_success,
|
||||
error_string=error_string, error_desc=error_desc,
|
||||
trans_failed=trans_failed)
|
||||
except TypeError:
|
||||
# this module used to be lazily imported and in older code
|
||||
# trans_failed= is not accepted
|
||||
# TODO: this workaround can be dropped in Ubuntu 20.10
|
||||
self._action_done(action,
|
||||
authorized=True, success=is_success,
|
||||
error_string=error_string, error_desc=error_desc)
|
|
@ -1,133 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
|
||||
|
||||
"""Integration of package managers into UpdateManager"""
|
||||
# (c) 2005-2009 Canonical, GPL
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
from apt import Cache
|
||||
import logging
|
||||
import os
|
||||
from gettext import gettext as _
|
||||
|
||||
|
||||
class InstallBackend():
|
||||
ACTION_UPDATE = 0
|
||||
ACTION_INSTALL = 1
|
||||
|
||||
def __init__(self, window_main, action):
|
||||
self.window_main = window_main
|
||||
self.action = action
|
||||
self.upgrade_groups_list = []
|
||||
|
||||
def start(self,partial_upgrade_list = []):
|
||||
|
||||
# FIXME: inhibit shutdown/sleep while downloading upgrades (see the corresponding Ubuntu code)
|
||||
|
||||
if self.action == self.ACTION_INSTALL:
|
||||
pkgs_install = []
|
||||
pkgs_upgrade = []
|
||||
pkgs_remove = []
|
||||
pkgs_purge = []
|
||||
try:
|
||||
#可选升级不为空
|
||||
if partial_upgrade_list:
|
||||
self.upgrade_groups_list = partial_upgrade_list
|
||||
#全部升级列表
|
||||
else:
|
||||
self.upgrade_groups_list = self.window_main.update_list.local_upgrade_list.get('upgrade_groups_list',[])
|
||||
|
||||
#遍历升级组列表
|
||||
if self.upgrade_groups_list:
|
||||
for group_name in self.upgrade_groups_list:
|
||||
pkgs_install += self.window_main.update_list.local_upgrade_list.get(group_name,[]).get('pkgs_install',[])
|
||||
pkgs_upgrade += self.window_main.update_list.local_upgrade_list.get(group_name,[]).get('pkgs_upgrade',[])
|
||||
|
||||
pkgs_remove = self.window_main.update_list.local_upgrade_list.get("pkgs_remove",[])
|
||||
else:
|
||||
logging.info("no upgradeable packages")
|
||||
return
|
||||
|
||||
# the "#auto" suffix most likely marks new packages as automatically installed, similar to what Ubuntu does
|
||||
new_pkgs_install = []
|
||||
for pkgname in pkgs_install:
|
||||
pkgname += "#auto"
|
||||
new_pkgs_install.append(pkgname)
|
||||
|
||||
logging.info("commit install:%d , upgrade:%d remove:%d",len(pkgs_install),len(pkgs_upgrade),len(pkgs_remove))
|
||||
self.commit(new_pkgs_install, pkgs_upgrade, pkgs_remove,pkgs_purge)
|
||||
except Exception as e:
|
||||
logging.error(e)
|
||||
else:
|
||||
self.update()
|
||||
|
||||
def start_alone(self,pkgs_install = [], pkgs_upgrade = [], pkgs_remove = [],pkgs_purge = []):
|
||||
os.environ["APT_LISTCHANGES_FRONTEND"] = "none"
|
||||
|
||||
if self.action == self.ACTION_INSTALL:
|
||||
# Get the packages which should be installed and update
|
||||
self.commit(pkgs_install, pkgs_upgrade, pkgs_remove,pkgs_purge)
|
||||
else:
|
||||
self.update()
|
||||
|
||||
def update(self):
|
||||
"""Run a update to refresh the package list"""
|
||||
raise NotImplementedError
|
||||
|
||||
def commit(self, pkgs_install, pkgs_upgrade, pkgs_remove,pkgs_purge):
|
||||
"""Commit the cache changes """
|
||||
raise NotImplementedError
|
||||
|
||||
# called both on errors and when an update/upgrade completes
|
||||
def _action_done(self, action, authorized, success, error_string,
|
||||
error_desc, trans_failed=False):
|
||||
|
||||
#升级完成后走的分支
|
||||
if action == self.ACTION_INSTALL:
|
||||
self.window_main.is_upgrading = False
|
||||
if success:
|
||||
# an empty group list means a standalone install/remove; do not signal the control panel
|
||||
if self.upgrade_groups_list:
|
||||
self.window_main.dbusController.UpdateDownloadFinished(success,self.upgrade_groups_list,'','')
|
||||
else:
|
||||
self.window_main.start_available()
|
||||
elif error_string or error_desc:
|
||||
logging.warning(error_string + error_desc)
|
||||
self.window_main.dbusController.UpdateDownloadFinished(success,self.upgrade_groups_list,error_string,error_desc)
|
||||
else:
|
||||
self.window_main.dbusController.UpdateDownloadFinished(success,self.upgrade_groups_list,'','')
|
||||
else:
|
||||
self.window_main.is_updating = False
|
||||
if success:
|
||||
self.window_main.start_available()
|
||||
elif error_string or error_desc:
|
||||
logging.warning(error_string + error_desc)
|
||||
self.window_main.dbusController.UpdateDetectFinished(success,[],error_string,error_desc)
|
||||
else:
|
||||
self.window_main.dbusController.UpdateDetectFinished(success,[],'','')
|
||||
|
||||
# try aptdaemon
|
||||
if os.path.exists("/usr/sbin/aptd") \
|
||||
and "UPDATE_MANAGER_FORCE_BACKEND_SYNAPTIC" not in os.environ:
|
||||
# check if the gtkwidgets are installed as well
|
||||
try:
|
||||
from .InstallBackendAptdaemon import InstallBackendAptdaemon
|
||||
except ImportError:
|
||||
logging.exception("importing aptdaemon")
|
||||
|
||||
|
||||
def get_backend(*args, **kwargs):
|
||||
"""Select and return a package manager backend."""
|
||||
# try aptdaemon
|
||||
if (os.path.exists("/usr/sbin/aptd")
|
||||
and "UPDATE_MANAGER_FORCE_BACKEND_SYNAPTIC" not in os.environ):
|
||||
# check if the gtkwidgets are installed as well
|
||||
try:
|
||||
return InstallBackendAptdaemon(*args, **kwargs)
|
||||
except NameError:
|
||||
logging.exception("using aptdaemon failed")
|
||||
|
||||
# nothing found, raise
|
||||
raise Exception("No working backend found, please try installing "
|
||||
"aptdaemon or synaptic")
|
|
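For reference, this is roughly how the selection helper above is used elsewhere in this codebase (see UpdateManager.start_update); `app` stands for the UpdateManager instance and is not defined here.

from UpdateManager.backend import InstallBackend, get_backend   # assumed import path

def refresh(app):
    # app is the UpdateManager instance created by the application
    update_backend = get_backend(app, InstallBackend.ACTION_UPDATE)
    update_backend.start()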
@ -1,16 +0,0 @@
{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "console": "integratedTerminal",
            "sudo": true
        }
    ]
}
@ -39,4 +39,5 @@ if __name__ == "__main__":
    app.start_update()

    loop = GLib.MainLoop()
    loop.run()
    loop.run()