kylin-system-updater/unattended-upgrades/kylin-unattended-upgrade

4176 lines
168 KiB
Plaintext
Raw Normal View History

2022-11-03 19:10:26 +08:00
#!/usr/bin/python3
# Copyright (c) 2005-2018 Canonical Ltd
#
# AUTHOR:
# Michael Vogt <mvo@ubuntu.com>
# Balint Reczey <rbalint@ubuntu.com>
# This file is part of unattended-upgrades
#
# unattended-upgrades is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or (at
# your option) any later version.
#
# unattended-upgrades is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with unattended-upgrades; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#from backend.build.lib.SystemUpdater.backend.DownloadBackend import FetchProgress
# from email.policy import default
import stat
import atexit
import copy
import datetime
import errno
# import email.charset
import fcntl
import fnmatch
import gettext
# from backend.SystemUpdater.Core.utils import error
#from zoneinfo import ZoneInfoNotFoundError
'''
try:
from gi.repository.Gio import NetworkMonitor
except ImportError:
pass
'''
import grp
import io
import locale
import logging
2023-05-29 14:47:39 +08:00
2022-11-03 19:10:26 +08:00
import re
import os
import select
import signal
import socket
import string
import subprocess
import sys
import syslog
import shutil
try:
from typing import AbstractSet, cast, DefaultDict, Dict, Iterable, List
AbstractSet # pyflakes
DefaultDict # pyflakes
Dict # pyflakes
Iterable # pyflakes
List # pyflakes
from typing import Set, Tuple, Union
Set # pyflakes
Tuple # pyflakes
Union # pyflakes
except ImportError:
pass
from collections import defaultdict, namedtuple
from datetime import date
from email.message import Message
import email
from gettext import gettext as _
from io import StringIO
from optparse import (
OptionParser,
SUPPRESS_HELP,
)
from subprocess import (
Popen,
PIPE,
)
from textwrap import wrap
import apt
import apt_inst
import apt_pkg
import dbus
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib
#import distro_info
from apt.progress.base import InstallProgress
import configparser
import fcntl
import time
import subprocess
import json
# ---------------------------------------------------------------------------
# apt source parsing
# ---------------------------------------------------------------------------
SOURCESLIST = "/etc/apt/sources.list"
# In "apt-cache policy" output the "release" and "origin" lines are looked up
# this many lines after the line naming the source.
RELEASEOFFSET = 1
ORIGINOFFSET = 2
HTTPTYPE = "HTTP"
FTPTYPE = "FTP"
ARCHITECTUREMAP = [
    'arm64', 'amd64', 'armhf', 'i386', 'loongarch64', 'mips64el', 'sw64',
]

# ---------------------------------------------------------------------------
# state, result and configuration files
# ---------------------------------------------------------------------------
VERSION_FILE = '/etc/kylin-version/kylin-system-version.conf'
OTA_RESULT_FILE_PATH = "/opt/apt_result/"
OTA_RESULT_FILE = "/opt/apt_result/ota_result"
KYLIN_VERSION_FILE = "/etc/kylin-version/kylin-system-version.conf"
CONFIG_FILE_ROOT_PATH = "/var/lib/unattended-upgrades"
UNATTENDED_UPGRADE_CONFIG_FILE_PATH = "/var/lib/unattended-upgrades/unattended-upgrade.conf"
UNATTENDED_UPGRADE_POLICY_FILE_PATH = "/var/lib/unattended-upgrades/unattended-upgrades-policy.conf"
WHITE_LIST_FILE_PATH = "/var/lib/kylin-system-updater/system-updater.conf"
TIMESTAMP_PATH = "/var/lib/kylin-software-properties/template/kylin-source-status"
CONTROL_PANEL_LOCK_FILE = "/tmp/auto-upgrade/ukui-control-center.lock"
# the reboot required flag file used by packages
REBOOT_REQUIRED_FILE = "/var/run/reboot-required"
# NOTE(review): relative path (no leading slash) -- presumably joined with a
# root directory by its user; confirm before changing.
KEPT_PACKAGES_FILE = "var/lib/unattended-upgrades/kept-back"
MAIL_BINARY = "/usr/bin/mail"
SENDMAIL_BINARY = "/usr/sbin/sendmail"
USERS = "/usr/bin/users"


def _lsb_release(flag):
    # type: (str) -> str
    """Return the stripped output of ``lsb_release <flag> -s``."""
    return subprocess.check_output(
        ["lsb_release", flag, "-s"], universal_newlines=True).strip()


# no py3 lsb_release in debian :/
DISTRO_CODENAME = _lsb_release("-c")  # type: str
DISTRO_DESC = _lsb_release("-d")  # type: str
DISTRO_ID = _lsb_release("-i")  # type: str

# Number of days before release of devel where we enable unattended
# upgrades.
DEVEL_UNTIL_RELEASE = datetime.timedelta(days=21)

# ---------------------------------------------------------------------------
# runtime files
# ---------------------------------------------------------------------------
# progress information is written here
PROGRESS_LOG = "/var/run/unattended-upgrades.progress"
PID_FILE = "/var/run/unattended-upgrades.pid"
LOCK_FILE = "/var/run/kylin-unattended-upgrade.lock"
NOTIFICATION_PIPE = '/tmp/notification.pipe'
TIME_STAMP = "/var/lib/unattended-upgrades/unattended-upgrades-timestamp"
UNATTENDED_UPGRADE_PKG_LIST_FILE_PATH = "/var/lib/kylin-system-updater/json/auto-upgrade-list.json"
OTA_PKGS_TO_INSTALL_LIST = "/var/lib/unattended-upgrades/ota_pkgs_to_install_list"

# Directory and file name of the lock file used to block shutdown.
FILELOCK_PATH = "/tmp/lock/"
SHUTDOWN_BLOCK_FILELOCK = "kylin-update.lock"
pidfile = None

# set from the sigint signal handler
SIGNAL_STOP_REQUEST = False
2023-05-29 14:47:39 +08:00
def kysec_pre_upgrade():
    """Run the kysec pre-upgrade hook script with /bin/sh, if it exists.

    Nothing happens when the script is absent.  The script's exit status
    is not inspected.
    """
    hook_script = "/usr/share/kysec-maintain/sys-upgrade-pre.sh"
    if not os.path.exists(hook_script):
        return
    logging.debug("kysec pre-upgrade settings...")
    subprocess.run(["/bin/sh", hook_script])
2022-11-03 19:10:26 +08:00
2023-05-29 14:47:39 +08:00
def kysec_post_upgrade():
    """Run the kysec post-upgrade hook script with /bin/sh, if it exists.

    Nothing happens when the script is absent.  The script's exit status
    is not inspected.
    """
    hook_script = "/usr/share/kysec-maintain/sys-upgrade-post.sh"
    if not os.path.exists(hook_script):
        return
    logging.debug("kysec post-upgrade settings...")
    subprocess.run(["/bin/sh", hook_script])
2022-11-03 19:10:26 +08:00
def reload_options_config():
    """Normalise the apt/dpkg configuration and re-initialise apt.

    * forces ``--force-confold`` into ``DPkg::Options`` and drops
      ``--force-confnew`` / ``--force-confdef`` from it,
    * clears ``APT::Install-Recommends`` / ``APT::Install-Suggests`` when
      they are enabled,
    * empties ``Dir::Etc::sourceparts`` when it is set,
    * finally calls ``apt_pkg.init_system()``.
    """
    conflicting = ("--force-confnew", "--force-confdef")

    # Keep old configuration files by default.
    apt_pkg.config["DPkg::Options::"] = "--force-confold"
    kept_options = [
        opt for opt in set(apt_pkg.config.value_list("DPkg::Options"))
        if opt not in conflicting
    ]

    # Wipe the option list and load the de-duplicated, filtered one.
    apt_pkg.config.clear("DPkg::Options")
    for opt in kept_options:
        apt_pkg.config["DPkg::Options::"] = opt

    # Do not pull in recommended or suggested packages.
    for key in ("APT::Install-Recommends", "APT::Install-Suggests"):
        if apt_pkg.config.find_b(key, False):
            apt_pkg.config.clear(key)

    if apt_pkg.config.find("Dir::Etc::sourceparts", "") != "":
        apt_pkg.config["Dir::Etc::sourceparts"] = ""

    # NOTE(review): assumed to be unconditional; the original indentation
    # was lost -- confirm it is not meant to sit under the check above.
    apt_pkg.init_system()
def get_default_version(config_path=None):
    """Return the ``version`` value of the desktop update configuration.

    Args:
        config_path: JSON file to read.  Defaults to
            ``/usr/share/kylin-update-desktop-config/config/kylin-update-desktop-system.json``.

    Returns:
        The value stored under the top-level ``version`` key, or ``""``
        when the file does not exist, cannot be read, is not valid JSON,
        is not a JSON object, or has no ``version`` key.  Problems other
        than a missing file are logged as errors.
    """
    if config_path is None:
        config_path = '/usr/share/kylin-update-desktop-config/config/kylin-update-desktop-system.json'
    if not os.path.isfile(config_path):
        return ""
    try:
        with open(config_path, "r") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        logging.error(str(e))
        return ""
    if not isinstance(data, dict) or 'version' not in data:
        # Previously an uncaught KeyError/TypeError escaped from here.
        logging.error("no 'version' entry in %s", config_path)
        return ""
    return data['version']
def ReadOsRelease(file):
    """Parse a ``KEY=value`` file (os-release style) into a dict.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    Previously the first such line aborted parsing and silently dropped
    every key after it.  Surrounding double quotes are removed from values.

    Args:
        file: path of the file to read.

    Returns:
        A dict of the parsed keys.  ``PROJECT_CODENAME`` and
        ``SUB_PROJECT_CODENAME`` are always present (``''`` when the file
        does not define them).  If the file cannot be read, only those two
        defaults are returned.
    """
    osreleasedict = {}
    try:
        with open(file) as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#'):
                    continue
                key, sep, value = line.partition('=')
                if not sep:
                    continue
                osreleasedict[key] = value.strip('"')
    except Exception as e:
        # Deliberately best-effort: callers only need the defaults below.
        logging.debug("could not read %s: %s", file, e)
    osreleasedict.setdefault('PROJECT_CODENAME', '')
    osreleasedict.setdefault('SUB_PROJECT_CODENAME', '')
    return osreleasedict
2023-05-29 14:47:39 +08:00
'''
2022-11-03 19:10:26 +08:00
#安装时禁止关机 进行加锁
def LockedPreventShutdown():
global pidfile
#不为空是表示以及被锁
if pidfile != None:
logging.error("pidfile file disc not is None,Has been locked...")
return False
if not os.path.exists(FILELOCK_PATH):
#不存在创建
logging.info("File(%s) is not exists and will be create",FILELOCK_PATH)
os.makedirs(FILELOCK_PATH)
else:
#当目录存在时进行删除 不删除进行创建文件的话会报错
# file cannot be locked.[Errno 11] Resource temporarily unavailable
# 资源被占用报错
shutil.rmtree(FILELOCK_PATH)
logging.info("File(%s) is exists and will be delete and create",FILELOCK_PATH)
os.makedirs(FILELOCK_PATH)
try:
pidfile = open(os.path.join(FILELOCK_PATH, SHUTDOWN_BLOCK_FILELOCK), "w+")
fcntl.flock(pidfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
logging.info("Shutdown Has been locked...")
return True
except Exception as e:
logging.error("file cannot be locked." + str(e))
pidfile.close()
pidfile = None
return False
#解锁禁止关机
def unLockedEnableShutdown():
global pidfile
#未加锁退出
if not pidfile:
logging.info("Not locked and Quitting ...")
return False
try:
fcntl.flock(pidfile, fcntl.LOCK_UN)
logging.info("Shutdown Has been unlocked...")
pidfile.close()
pidfile = None
# Fix 修复权限问题 当普通用户无法使用 所以直接删除目录
if os.path.exists(FILELOCK_PATH):
shutil.rmtree(FILELOCK_PATH)
logging.info('Emptying the lockPath(%s) is complete...',FILELOCK_PATH)
else:
logging.info("Emptying the lockPath(%s) is Failed...",FILELOCK_PATH)
return True
except Exception as e:
logging.error("unlock failed." + str(e))
pidfile.close()
pidfile = None
return False
2023-05-29 14:47:39 +08:00
'''
2022-11-03 19:10:26 +08:00
def is_dpkg_journal_dirty(updates_dir="/var/lib/dpkg/updates"):
    # type: (str) -> bool
    """
    Return True if the dpkg journal is dirty
    (similar to debSystem::CheckUpdates)

    The journal counts as dirty when ``updates_dir`` contains an entry
    whose name starts with a digit or with the literal ``tmp.i``.  A
    missing directory is reported as not dirty.
    """
    logging.debug("checking whether dpkg journal is dirty")
    try:
        entries = os.listdir(updates_dir)
    except FileNotFoundError:
        # No journal directory means there is nothing pending in it.
        return False
    # The dot is escaped: the original pattern "tmp.i" matched any
    # character in that position.
    journal_entry = re.compile(r"[0-9]+|tmp\.i")
    return any(journal_entry.match(name) for name in entries)
def get_abnormally_installed_pkg_count():
    """Count ``dpkg -l`` lines starting with ``i`` followed by a non-``i``.

    This replaces the former ``dpkg -l|grep ^i[^i]|wc -l`` shell pipeline.
    The unquoted pattern there could be glob-expanded by the shell against
    files in the current directory.

    Returns:
        The count as a decimal string (the same shape the pipeline
        produced).  ``"0"`` when ``dpkg`` cannot be started.
    """
    try:
        proc = subprocess.run(["dpkg", "-l"], stdout=subprocess.PIPE)
    except OSError as e:
        logging.error("unable to run dpkg: %s", e)
        return "0"
    # Match on bytes so package descriptions in any encoding cannot fail.
    pattern = re.compile(rb"i[^i]")
    count = sum(1 for line in proc.stdout.splitlines() if pattern.match(line))
    return str(count)
def get_white_list_with_version(srclist, list, namelist):
    """Split ``name=version`` entries and append them to the output lists.

    Entries without ``=`` or with an empty name or version are skipped.
    An entry without ``=`` used to raise IndexError.

    Args:
        srclist: iterable of ``"name=version"`` strings.
        list: output list; receives one ``[name, version]`` pair per valid
            entry (the parameter name is part of the existing interface).
        namelist: output list; receives the name of every valid entry.
    """
    for name_with_version in srclist:
        name, sep, version = name_with_version.strip().partition('=')
        if not sep or name == '' or version == '':
            continue
        list.append([name, version])
        namelist.append(name)
#global timeStamp
def get_timestamp():
    """Build the backup label from the software source's update time.

    Reads ``UpdateTime`` from the ``Server`` section of TIMESTAMP_PATH,
    interprets it as seconds since the epoch (local time) and returns a
    label containing both the formatted time and the raw value.  The
    result is also stored in the module-level ``timeStamp``.
    """
    global timeStamp
    parser = configparser.ConfigParser(allow_no_value=True)
    parser.optionxform = str  # keep option names case-sensitive
    parser.read(TIMESTAMP_PATH)
    raw_update_time = parser.get("Server", "UpdateTime")
    time_value = time.localtime(int(raw_update_time))
    logging.debug(("获取软件源时间戳:%s"), time_value)
    formatted = time.strftime("%Y-%m-%d %H:%M:%S", time_value)
    timeStamp = "".join(["自动备份:", formatted, " ", raw_update_time])
    return timeStamp
def WriteValueToFile(file, section, option, value):
    """Overwrite ``file`` with a single ``[section]`` holding ``option = value``.

    The file is not merged with its previous content: anything it held
    before is replaced.  Option names are written case-sensitively.

    Args:
        file: path of the INI file to (re)create.
        section: section name.
        option: option name.
        value: option value (a string, or None for a value-less option).
    """
    config = configparser.ConfigParser(allow_no_value=True)
    config.optionxform = str  # keep option names case-sensitive
    config.add_section(section)
    config.set(section, option, value)
    # Close the handle deterministically.  Callers invoke this right before
    # os._exit(), so the data must be flushed on return.
    with open(file, "w") as f:
        config.write(f)
def _stop_on_signal(signal_name):
    # type: (str) -> None
    """Log the signal, mark the run status as idle and exit with status 1."""
    logging.warning("%s received, will stop" % signal_name)
    WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH,
                     "UNATTENDED_UPGRADE", "autoupdate_run_status", "idle")
    # os._exit terminates immediately, without running cleanup handlers.
    os._exit(1)


def signal_handler_int(signal, frame):
    # type: (int, object) -> None
    """SIGINT handler: record the idle status and terminate the process."""
    _stop_on_signal("SIGINT")


def signal_handler_usr1(signal, frame):
    # type: (int, object) -> None
    """SIGUSR1 handler: record the idle status and terminate the process."""
    _stop_on_signal("SIGUSR1")


def signal_handler_term(signal, frame):
    # type: (int, object) -> None
    """SIGTERM handler: record the idle status and terminate the process."""
    _stop_on_signal("SIGTERM")
# messages to be logged only once
logged_msgs = set() # type: AbstractSet[str]
NEVER_PIN = -32768
2023-05-29 14:47:39 +08:00
2022-11-03 19:10:26 +08:00
class InhibitShutdownLock():
    """Hold a logind "shutdown" inhibitor while packages are installed."""

    def __init__(self):
        # Stream wrapping the inhibitor file descriptor handed out by
        # logind, or None while no inhibitor is held.
        self.inhibit_lock = None

    def lock(self, caller='Kylin System Updater'):
        """
        Ask logind (over the system D-Bus) for a blocking "shutdown"
        inhibitor on behalf of ``caller``.  The inhibitor stays in place
        until unlock() closes the returned file descriptor.  Failures are
        logged and otherwise ignored.
        """
        try:
            from gi.repository import Gio, GLib
            connection = Gio.bus_get_sync(Gio.BusType.SYSTEM)
            var, fdlist = connection.call_with_unix_fd_list_sync(
                'org.freedesktop.login1', '/org/freedesktop/login1',
                'org.freedesktop.login1.Manager', 'Inhibit',
                GLib.Variant('(ssss)',
                             ('shutdown',
                              caller, 'Installing Packages',
                              'block')),
                None, 0, -1, None, None)
            # The reply carries an index into the fd list passed with it.
            self.inhibit_lock = Gio.UnixInputStream(fd=fdlist.steal_fds()[var[0]])
            logging.info("Shutdown Has been locked...")
        except Exception as e:
            logging.error(e)

    def unlock(self):
        """Release the inhibitor, if one is held, and forget it."""
        try:
            if self.inhibit_lock is not None:
                self.inhibit_lock.close()
                # Was "== None" (a no-op comparison): the closed stream was
                # kept and closed again on the next unlock().
                self.inhibit_lock = None
                logging.info("Shutdown Has been unlocked...")
            else:
                logging.info("Not locked and Quitting ...")
        except Exception as e:
            logging.error("unlock failed." + str(e))
class LoggingDateTime:
    """Conversions between datetime objects and dpkg log timestamps."""

    # strftime/strptime layout shared by both conversions
    LOG_DATE_TIME_FMT = "%Y-%m-%d %H:%M:%S"

    @classmethod
    def as_string(cls):
        # type: () -> str
        """Format the current local date and time with LOG_DATE_TIME_FMT"""
        now = datetime.datetime.now()
        return now.strftime(cls.LOG_DATE_TIME_FMT)

    @classmethod
    def from_string(cls, logstr):
        # type: (str) -> datetime.datetime
        """Parse a LOG_DATE_TIME_FMT string into a datetime object"""
        layout = cls.LOG_DATE_TIME_FMT
        parsed = datetime.datetime.strptime(logstr, layout)
        return parsed
class UnknownMatcherError(ValueError):
    """ValueError subclass for an unrecognised origin matcher."""
class NoAllowedOriginError(ValueError):
    """ValueError subclass for the case where no origin is allowed."""
class FILE_LOCK(object):
    """Non-blocking exclusive ``flock`` on a lock file.

    The file is opened for writing (created or truncated) when the object
    is constructed.  Closing it in unlock() releases the lock.
    """

    def __init__(self, name):
        self.fobj = open(name, 'w')
        self.fd = self.fobj.fileno()

    def get_lock(self):
        """Try to take the lock without blocking.

        Returns:
            True when the lock was acquired.  False when another holder
            has it or the file was already closed by unlock().
        """
        try:
            # Lock through the file object, not the cached fd number.
            # After unlock() the number may belong to an unrelated file.
            fcntl.flock(self.fobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except (OSError, ValueError):
            # OSError: lock is held elsewhere; ValueError: file is closed.
            return False
        return True

    def unlock(self):
        """Release the lock by closing the lock file."""
        self.fobj.close()
class ConfigFileManager:
    """Track and manipulate small state files below one root directory."""

    def __init__(self, rootdir):
        """Use ``rootdir`` as root directory, creating it when missing."""
        self.filenamelist = []
        self.SetRootDir(rootdir)

    def SetRootDir(self, rootdir):
        """Switch to ``rootdir``, creating it (and parents) when missing."""
        if not os.path.exists(rootdir):
            # os.mkdirs() does not exist; the original call raised
            # AttributeError whenever the directory had to be created.
            os.makedirs(rootdir)
        self.rootdir = rootdir

    def AddFileName(self, filename):
        """Register ``filename`` and create it as an empty file if absent."""
        self.filenamelist.append(filename)
        path = os.path.join(self.rootdir, filename)
        if not os.path.exists(path):
            with open(path, 'w+'):
                pass

    def RemoveFileName(self, filename):
        """Delete ``filename`` from disk and from the registered names."""
        path = os.path.join(self.rootdir, filename)
        if os.path.exists(path):
            os.remove(path)
        if filename in self.filenamelist:
            self.filenamelist.remove(filename)

    def CheckFileNameExistence(self, filename):
        """Return True when ``filename`` was registered via AddFileName."""
        return filename in self.filenamelist

    def WriteListToFile(self, list, filename):
        """Write the items of ``list`` space-separated into ``filename``.

        Returns:
            0 on success, 1 when the file does not exist below the root
            directory (it is not created here).
        """
        path = os.path.join(self.rootdir, filename)
        if not os.path.exists(path):
            return 1
        with open(path, 'w+') as f:
            f.write(" ".join(list))
        return 0

    def ReadListFromFile(self, file, section, option):
        """Read a comma-separated option from an INI file.

        Returns:
            The list of values on success; False when the section or
            option cannot be read (the error is logged); True when
            ``file`` does not exist.  These sentinels are kept as-is
            because callers may depend on them.
        """
        if not os.path.exists(file):
            logging.error("no config file")
            return True
        config = configparser.ConfigParser(allow_no_value=True)
        config.read(file)
        try:
            raw_value = config[section][option]
            return raw_value.strip().split(",")
        except Exception as e:
            logging.error(e)
            return False
def not_empty(s):
    """Filter predicate: truthy only for strings with non-blank content.

    Returns ``s`` itself when it is falsy, otherwise ``s.strip()``.
    """
    if not s:
        return s
    return s.strip()
class OriginProperty():
    """Collect the properties of the locally configured apt sources.

    Construction runs ``apt-cache policy``, records every http/ftp source
    line with its following "release" and "origin" lines, and parses them
    into per-source property dicts (``local_origin``).  Afterwards
    get_allowed_sources() and get_allowed_origin() narrow those down to
    the sources listed in SOURCESLIST.
    """

    def __init__(self):
        # Every local source, raw text: {'sources', 'release', 'origin'}.
        self.local_sourcelist = {"http": [], "ftp": []}
        # Parsed local sources with their distribution properties.
        self.local_origin = {"http": [], "ftp": []}
        # Entries of the allowed sources list.
        self.allow_sources = []
        # Allowed sources together with their properties.
        self.allow_origin = {"http": [], "ftp": []}
        # Load all local sources, then parse their properties.
        self.init_local_origin()
        self.analytic_properties(self.local_sourcelist)

    def init_local_origin(self):
        """Fill ``local_sourcelist`` from the output of ``apt-cache policy``."""
        lines = os.popen("apt-cache policy").read().split("\n")

        def line_at(index):
            # A source on the last lines has no release/origin line after
            # it; treat those as empty rather than raising IndexError.
            return lines[index] if index < len(lines) else ""

        # enumerate() supplies the real position of each line.  The former
        # list.index(line) lookup returned the first identical line, and so
        # the wrong neighbours for duplicated source lines.
        for index, line in enumerate(lines):
            if "http" in line:
                kind = 'http'
            elif "ftp" in line:
                kind = 'ftp'
            else:
                continue
            self.local_sourcelist[kind].append({
                'sources': line,
                'release': line_at(index + RELEASEOFFSET),
                'origin': line_at(index + ORIGINOFFSET),
            })

    def merge_origin(self, source_type, source_origin):
        """Add ``source_origin`` to ``local_origin``.

        When an entry with the same ``origin_source`` and ``dist`` already
        exists, the component lists are unioned instead of appending a
        second entry.  ``source_type`` is HTTPTYPE or FTPTYPE; any other
        value is ignored.
        """
        if source_type == HTTPTYPE:
            known = self.local_origin['http']
        elif source_type == FTPTYPE:
            known = self.local_origin['ftp']
        else:
            return
        merged = False
        for lo in known:
            # .get(): a malformed policy line may lack either key.
            if (lo.get('origin_source') == source_origin.get('origin_source')
                    and lo.get('dist') == source_origin.get('dist')):
                lo['component'] = list(
                    set(lo['component']).union(set(source_origin['component'])))
                merged = True
        if not merged:
            known.append(source_origin.copy())

    def analytic_properties(self, local_sourcelist):
        """Parse every raw source entry and merge it into ``local_origin``."""
        for source_type, kind in ((HTTPTYPE, 'http'), (FTPTYPE, 'ftp')):
            for ls in local_sourcelist[kind]:
                self.merge_origin(source_type,
                                  self._parse_source_entry(ls, kind))

    def _parse_source_entry(self, ls, scheme):
        """Turn one raw {'sources','release','origin'} entry into a dict.

        ``scheme`` ("http" or "ftp") identifies the URI token on the
        source line.
        """
        origin = {"component": [], "release": {}}
        for item in filter(not_empty, ls['sources'].split(' ')):
            if item.isdigit():
                origin['policy_priority'] = item
            elif scheme in item:
                origin['origin_source'] = item
            elif "/" in item:
                # "<dist>/<component>": the dist may itself contain
                # slashes, so the component is the last path element.
                parts = item.split("/")
                origin['dist'] = "/".join(parts[:-1])
                origin['component'].append(parts[-1])
            elif item not in ARCHITECTUREMAP and item != "Packages":
                origin['component'].append(item)

        release_list = [rl.strip() for rl in ls['release'].split(',')]
        if release_list[0].startswith("release"):
            # Remove the leading keyword as a prefix.  lstrip("release ")
            # strips a character set and also ate the key letter of
            # fields such as "a=..." or "l=...".
            release_list[0] = release_list[0][len("release"):].strip()
        for rl in release_list:
            if "=" in rl:
                self.generate_dict(origin['release'], rl)

        if "origin" in ls['origin']:
            for item in filter(not_empty, ls['origin'].split(' ')):
                if item != "origin":
                    origin['origin'] = item
        return origin

    def generate_dict(self, dict, item):
        """Store one ``key=value`` release field in ``dict``.

        Short keys are stored under their long name (``o`` -> ``origin``,
        ``l`` -> ``label``, ``a``/``suite`` -> ``archive``, ``c`` ->
        ``component``, ``n`` -> ``codename``); unknown keys are stored
        unchanged.  Only the first ``=`` separates key and value.

        Returns:
            False when ``item`` is blank or contains no ``=``; otherwise
            None.
        """
        item = item.strip()
        if item == "":
            logging.warning("empty match string matches nothing")
            return False
        what, sep, value = item.partition("=")
        if not sep:
            logging.warning("ignoring release field without '=': %s", item)
            return False
        aliases = {
            'o': 'origin',
            'l': 'label',
            'a': 'archive', 'suite': 'archive',
            'c': 'component',
            'n': 'codename',
        }
        dict[aliases.get(what, what)] = value

    def get_allowed_sources(self):
        """Load the entries of SOURCESLIST into ``allow_sources``.

        The apt configuration is temporarily pointed at SOURCESLIST with
        source-parts set to a dummy value, then restored.
        """
        # Read the old values before the try block so the finally clause
        # can never hit an unbound name.
        old_sources_list = apt_pkg.config.find("Dir::Etc::sourcelist")
        old_sources_list_d = apt_pkg.config.find("Dir::Etc::sourceparts")
        old_cleanup = apt_pkg.config.find("APT::List-Cleanup")
        try:
            apt_pkg.config.set("Dir::Etc::sourcelist",
                               os.path.abspath(SOURCESLIST))
            apt_pkg.config.set("Dir::Etc::sourceparts", "xxx")
            apt_pkg.config.set("APT::List-Cleanup", "0")
            slist = apt_pkg.SourceList()
            slist.read_main_list()
            self.allow_sources = slist.list
        except Exception as e:
            logging.error(str(e))
        finally:
            apt_pkg.config.set("Dir::Etc::sourcelist",
                               old_sources_list)
            apt_pkg.config.set("Dir::Etc::sourceparts",
                               old_sources_list_d)
            apt_pkg.config.set("APT::List-Cleanup",
                               old_cleanup)

    def get_allowed_origin(self):
        """Fill ``allow_origin`` with local origins matching allowed sources.

        A local origin matches when its URI (ignoring leading/trailing
        slashes) and its dist equal those of an allowed source entry.
        """
        try:
            for item in self.allow_sources:
                allowed_uri = item.uri.strip('/')
                for kind in ('http', 'ftp'):
                    for lo in self.local_origin[kind]:
                        if (allowed_uri == lo.get('origin_source', '').strip('/')
                                and item.dist == lo.get('dist')):
                            self.allow_origin[kind].append(lo)
        except Exception as e:
            logging.error(str(e))
def deleteDuplicatedElementFromList(list):
    """Return a new list with duplicates dropped, keeping first occurrences.

    Items are compared by equality, so unhashable elements are supported.
    The input list is left untouched.
    """
    unique_items = []
    for candidate in list:
        if candidate in unique_items:
            continue
        unique_items.append(candidate)
    return unique_items
class UnattendUpgradeFilter():
    """Compute the list of origins unattended upgrades may install from."""

    def __init__(self) -> None:
        """Nothing is set up until GetAllowOrigins() is called."""

    def GetAllowOrigins(self):
        """Return the de-duplicated allowed origins.

        Builds a fresh OriginProperty, lets it work out the allowed
        sources and origins, converts them with get_allowed_origins() and
        removes duplicates.  The result is also kept in
        ``self.allowed_origins``.
        """
        # Collect the properties of the configured sources.
        properties = OriginProperty()
        self.origin_property = properties
        properties.get_allowed_sources()
        properties.get_allowed_origin()
        self.allowed_origins = get_allowed_origins(properties.allow_origin)
        self.allowed_origins = deleteDuplicatedElementFromList(
            self.allowed_origins)
        return self.allowed_origins
class AcquireStatistics:
    """Tally the items of a fetcher: local, remote and incomplete ones."""

    def __init__(self, fetcher) -> None:
        """Start with zeroed counters for ``fetcher``."""
        self.ResetFetcher(fetcher)

    def GetAquireStatisticsOfPkgs(self):
        """Add every item of the fetcher to the counters and path list.

        Counters are not cleared first; call ResetFetcher() to start over.
        """
        for entry in self.fetcher.items:
            self.local_pkg_paths.append(entry.destfile)
            if not entry.complete:
                self.incomplete_pkg_amount += 1
            # NOTE(review): assumed to be independent of the "complete"
            # check above; the original indentation was lost -- confirm.
            if entry.local:
                self.local_pkg_amount += 1
            else:
                self.remote_pkg_amount += 1

    def ResetFetcher(self, fetcher):
        """Replace the fetcher and clear all counters and recorded paths."""
        self.fetcher = fetcher
        self.local_pkg_amount = 0
        self.remote_pkg_amount = 0
        self.incomplete_pkg_amount = 0
        self.local_pkg_paths = []
class KylinSystemUpdater:
    """D-Bus client for the ``com.kylin.systemupgrade`` service.

    Wraps the update interface and the data-collection interface, runs a
    GLib main loop while waiting for the ``UpdateDetectFinished`` signal
    and accumulates the packages to upgrade/install with their candidate
    versions.
    """

    def __init__(self) -> None:
        DBusGMainLoop(set_as_default=True)
        self.loop = GLib.MainLoop()
        self.system_bus = dbus.SystemBus()
        service = 'com.kylin.systemupgrade'
        interface_name = 'com.kylin.systemupgrade.interface'
        self.update_proxy = self.system_bus.get_object(
            service, '/com/kylin/systemupgrade')
        self.data_collect_proxy = self.system_bus.get_object(
            service, '/com/kylin/systemupgrade/utils')
        self.data_collect_interface = dbus.Interface(
            self.data_collect_proxy, dbus_interface=interface_name)
        self.update_interface = dbus.Interface(
            self.update_proxy, dbus_interface=interface_name)
        self.success = False
        # (package name, candidate version) tuples gathered on detection
        self.whitelist_with_candidate_version = []
        # group names delivered by the UpdateDetectFinished signal
        self.update_group = []
        # package name -> install error message
        self.errdict = {}

    def AddPackageInstallErrorRecord(self, pkg, errmsg):
        """Remember ``errmsg`` for ``pkg`` (replacing any earlier message)."""
        self.errdict[pkg] = errmsg

    def DumpInstallErrorRecord(self):
        """Return the recorded errors as ``"package,message"`` strings."""
        return ["%s,%s" % (pkg, self.errdict[pkg]) for pkg in self.errdict]

    def DataBackendCollect(self, updateinfo, json_file):
        """Forward to ``DataBackendCollect`` on the data-collection interface."""
        self.data_collect_interface.DataBackendCollect(updateinfo, json_file)

    def InsertUpgradeHistory(self, history):
        """Forward to ``InsertUpgradeHistory``; returns the service's reply."""
        return self.data_collect_interface.InsertUpgradeHistory(history)

    def GetConfigValue(self, section, value):
        """Forward to ``GetConfigValue``; returns the service's reply."""
        return self.update_interface.GetConfigValue(section, value)

    def SetConfigValue(self, section, option, value):
        """Forward to ``SetConfigValue``; returns the service's reply."""
        return self.update_interface.SetConfigValue(section, option, value)

    def UpdateDetect(self):
        """Forward to ``UpdateDetect``; returns the service's reply."""
        reply = self.update_interface.UpdateDetect()
        return reply

    def GetUnattendedUpgradeValue(self):
        """Return the first element of ``UnattendedUpgradeValue('get', '')``."""
        reply = self.update_interface.UnattendedUpgradeValue('get', '')
        return reply[0]

    def CheckRebootRequired(self, msg):
        """Forward to ``CheckRebootRequired``; the reply is discarded."""
        self.update_interface.CheckRebootRequired(msg)

    def GetDatabaseInfo(self, section, value):
        """Forward to ``GetSetDatabaseInfo`` with 1 as its first argument."""
        return self.update_interface.GetSetDatabaseInfo(1, section, value)

    def ConnectToSignals(self):
        """Subscribe to ``UpdateDetectFinished``.

        On success the handler reads the JSON file of every reported
        group, records the (name, new_version) of each entry of its
        ``upgrade_list`` and ``install_list`` and quits the main loop.  On
        failure it marks the run status idle and exits the process.
        """
        size_keys = ("total_download_size", "total_install_size")

        def collect_versions(entries):
            # Size totals are stored next to the package entries; skip them.
            for name in entries.keys():
                if name in size_keys:
                    continue
                self.whitelist_with_candidate_version.append(
                    (name, entries[name]['new_version']))

        def update_detect_finished_handler(success, updatelist, error_status, error_cause):
            if not success:
                WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH, "UNATTENDED_UPGRADE", "autoupdate_run_status", "idle")
                logging.error("update detect failed:%s,%s" % (error_status, error_cause))
                os._exit(0)
                return success
            logging.info("update detect success,quiting main loop")
            self.update_group = updatelist
            try:
                for update_group in self.update_group:
                    json_file_path = ("/var/lib/kylin-system-updater/json/%s.json" % (update_group))
                    if not os.path.exists(json_file_path):
                        continue
                    with open(json_file_path, 'r') as f:
                        data = json.load(f)
                    collect_versions(data['upgrade_list'])
                    collect_versions(data['install_list'])
            except Exception as e:
                logging.error(e)
            self.loop.quit()
            return success

        self.update_proxy.connect_to_signal(
            'UpdateDetectFinished', update_detect_finished_handler,
            dbus_interface='com.kylin.systemupgrade.interface')
        return

    def RunMainloop(self):
        """Run the GLib main loop until it is quit."""
        logging.info("update manager:running mainloop")
        self.loop.run()

    def QuitMainloop(self):
        """Ask the GLib main loop to stop."""
        logging.info("update manager:quiting mainloop")
        self.loop.quit()
class LoginManager:
    """D-Bus client for the ``org.freedesktop.login1`` manager interface."""

    def __init__(self) -> None:
        DBusGMainLoop(set_as_default=True)
        self.loop = GLib.MainLoop()
        self.system_bus = dbus.SystemBus()
        self.login_proxy = self.system_bus.get_object(
            'org.freedesktop.login1', '/org/freedesktop/login1')
        self.login_interface = dbus.Interface(
            self.login_proxy,
            dbus_interface='org.freedesktop.login1.Manager')

    def SetExtraInhibitShutdownDelaySec(self, time):
        """Call ``SetExtraInhibitShutdownDelaySec(time)`` on the manager.

        Any failure is logged as an error and otherwise ignored.
        """
        manager = self.login_interface
        try:
            manager.SetExtraInhibitShutdownDelaySec(time)
        except Exception as err:
            logging.error(err)
class KylinBackupManager:
    """D-Bus client for the ``com.kylin.backup`` service.

    Starts a system backup before an upgrade and runs a GLib main loop
    until the service reports completion.  A failed backup terminates the
    process.
    """

    def __init__(self) -> None:
        DBusGMainLoop(set_as_default=True)
        self.loop = GLib.MainLoop()
        self.system_bus = dbus.SystemBus()
        self.backup_proxy = self.system_bus.get_object('com.kylin.backup', '/')
        self.backup_interface = dbus.Interface(
            self.backup_proxy, dbus_interface='com.kylin.backup.manager')
        self.success = False

    def mount_backup_partition(self):
        """Forward to ``Mount_backup_partition``; returns the reply."""
        return self.backup_interface.Mount_backup_partition()

    def get_backup_state(self):
        """Forward to ``getBackupState``; returns the reply."""
        return self.backup_interface.getBackupState()

    def get_backup_comment_for_systemupdate(self):
        """Forward to ``getBackupCommentForSystemUpdate``; returns the reply."""
        return self.backup_interface.getBackupCommentForSystemUpdate()

    def auto_backup_for_system_update_noreturn(self, timeStamp, create_note, inc_note, userName, uid):
        """Forward to ``autoBackUpForSystemUpdate_noreturn``; returns None."""
        self.backup_interface.autoBackUpForSystemUpdate_noreturn(
            timeStamp, create_note, inc_note, userName, uid)
        return

    def ConnectToSignals(self):
        """Subscribe to the start, result and progress signals of the service.

        A failed start or a failed backup is reported through the global
        ``kylin_system_updater``, the run status is set to idle and the
        process exits with status 1.  A successful result, or a progress
        value of 100, quits the main loop.
        """
        signal_interface = 'com.kylin.backup.manager'

        def report_failure(error_code):
            # Report the failure to the updater's data backend and mark
            # the run as idle.
            UpdateInfos = {
                "packageName": "kylin-unattended-upgrade",
                "source": "kylin unattended upgrade",
                "status": 0,
                "errorCode": error_code,
            }
            json_file = json.dumps(UpdateInfos)
            kylin_system_updater.DataBackendCollect("UpdateInfos", json_file)
            WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH, "UNATTENDED_UPGRADE", "autoupdate_run_status", "idle")

        def backup_start_handler(result):
            logging.debug("backup start result:%d" % result)
            if result in (31, 30):
                logging.debug("backup start success")
                return
            logging.error("backup start failed")
            report_failure("backup start failed")
            os._exit(1)

        def backup_result_handler(result):
            if result:
                logging.debug("backup success,quiting main loop")
                self.loop.quit()
                return
            logging.error("backup failed")
            report_failure("backup failed")
            # Broadcast the failure on the system bus before exiting.
            subprocess.Popen('dbus-send --system --type=signal / com.kylin.install.notification.BackupFailure', shell=True)
            os._exit(1)

        def send_rate_handler(sta, pro):
            logging.debug(("receive backup_rate_signal_handler 状态:%d 进度:%d"), sta, pro)
            if pro == 100:
                logging.debug("backup finished, quiting mainloop")
                self.loop.quit()

        subscriptions = (
            ('sendStartBackupResult', backup_start_handler),
            ('sendBackupResult', backup_result_handler),
            ('sendRate', send_rate_handler),
        )
        for signal_name, handler in subscriptions:
            self.backup_proxy.connect_to_signal(
                signal_name, handler, dbus_interface=signal_interface)
        return

    def RunMainloop(self):
        """Run the GLib main loop until it is quit."""
        logging.info("backup manager:running mainloop")
        self.loop.run()

    def QuitMainloop(self):
        """Ask the GLib main loop to stop."""
        logging.info("backup manager:quiting mainloop")
        self.loop.quit()
def ReadValueFromFile(file, section, option):
    """Look up ``option`` in ``[section]`` of the INI file ``file``.

    Option names are matched case-sensitively.  Any problem (missing
    file, section or option, unparsable content) yields ``''``.
    """
    parser = configparser.ConfigParser(allow_no_value=True)
    parser.optionxform = str  # keep option names case-sensitive
    try:
        parser.read(file)
        return parser[section][option]
    except Exception:
        return ''
def Backup():
    """Run a system backup through the backup service before upgrading.

    Mounts the backup partition, checks the backup service state, then
    starts the backup and blocks in the manager's main loop until the
    service signals completion.

    Returns:
        True once the main loop has returned; False when the partition
        status or the service state does not allow a backup.  A backup
        that fails after it was started does not return: the manager's
        signal handlers terminate the process.
    """
    kylin_backup_manager = KylinBackupManager()
    backup_partition_status = kylin_backup_manager.mount_backup_partition()
    logging.info("backup partition status:%d", backup_partition_status)
    # Only statuses 0 and 5 are accepted.
    # NOTE(review): their meaning is defined by the backup service -- confirm.
    if backup_partition_status not in [0, 5]:
        logging.error("backup partition error:%d", backup_partition_status)
        return False

    status_code, result = kylin_backup_manager.get_backup_state()
    if not (result == 0 and status_code == 99):
        # The original call passed two arguments to a format string without
        # placeholders, so logging reported an internal formatting error
        # instead of this message.
        logging.error("backup state error:%s,%s", status_code, result)
        return False

    ts = get_timestamp()
    kylin_backup_manager.ConnectToSignals()
    create_note = "系统升级新建备份"
    inc_note = "系统升级增量备份"
    userName = "root"
    uid = os.getuid()
    WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH, "UNATTENDED_UPGRADE", "autoupdate_run_status", "backup")
    kylin_backup_manager.auto_backup_for_system_update_noreturn(ts, create_note, inc_note, userName, uid)
    kylin_backup_manager.RunMainloop()
    return True
2022-11-03 19:10:26 +08:00
'''
if node_name != timeStamp:
logging.info("need backup")
#do actual backup
kylin_backup_manager.ConnectToSignals()
create_note = "系统升级新建备份"
inc_note="系统升级增量备份"
userName="root"
uid=os.getuid()
kylin_backup_manager.auto_backup_for_system_update_noreturn(timeStamp,create_note,inc_note,userName,uid)
WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH,"UNATTENDED_UPGRADE","autoupdate_run_status","backup")
kylin_backup_manager.RunMainloop()
'''
PkgPin = namedtuple('PkgPin', ['pkg', 'priority'])
PkgFilePin = namedtuple('PkgFilePin', ['id', 'priority'])
class UnattendedUpgradesCache(apt.Cache):
    """apt.Cache carrying the origin, blacklist and whitelist policy of an
    unattended upgrade run.
    """

    def __init__(self, rootdir, whitelist_with_version, blacklist):
        """Collect the upgrade policy, then open the apt cache.

        Args:
            rootdir: root directory handed to apt.Cache.
            whitelist_with_version: sequence of entries whose first item is
                a package name (the remaining items are presumably the
                wanted version -- confirm with the caller).
            blacklist: package-name regexps that must not be changed.
        """
        unattended_upgrade_filter = UnattendUpgradeFilter()
        self.allowed_origins = unattended_upgrade_filter.GetAllowOrigins()
        logging.info(_("Allowed origins are: %s"),
                     ", ".join(self.allowed_origins))
        self.blacklist = blacklist
        self.whitelist_with_version = whitelist_with_version
        self.whitelist = []
        self.get_white_list()
        # Strict whitelisting is hard-wired off here; the lookup of
        # Unattended-Upgrade::Package-Whitelist-Strict is disabled.
        self.strict_whitelist = False
        # The policy attributes are set before the base class constructor
        # runs, as in the original statement order.
        apt.Cache.__init__(self, rootdir=rootdir)

    def get_white_list(self):
        """Append the name (first item) of every versioned whitelist entry
        to self.whitelist."""
        self.whitelist.extend(
            entry[0] for entry in self.whitelist_with_version)

    def find_better_version(self, pkg):
        """Return a version newer than the installed one that comes from an
        allowed origin and is not pinned below it, or None."""
        if pkg.is_installed and pkg.versions[0] > pkg.installed:
            logging.debug(
                "Package %s has a higher version available, checking if it is "
                "from an allowed origin and is not pinned down.", pkg.name)
            for v in pkg.versions:
                if pkg.installed < v \
                        and pkg.installed.policy_priority <= v.policy_priority \
                        and is_in_allowed_origin(v, self.allowed_origins):
                    return v
        return None

    def find_kept_packages(self, dry_run):
        """Find kept packages not collected already.

        Returns an empty KeptPkgs in dry-run mode; otherwise every package
        with a better version is logged with its excuse and recorded.
        """
        kept_packages = KeptPkgs(set)
        if dry_run:
            logging.info(_("The list of kept packages can't be calculated in "
                           "dry-run mode."))
            return kept_packages
        for pkg in self:
            better_version = self.find_better_version(pkg)
            if better_version:
                logging.info(self.kept_package_excuse(pkg._pkg,
                                                      self.blacklist,
                                                      self.whitelist,
                                                      self.strict_whitelist,
                                                      better_version))
                kept_packages.add(pkg, better_version, self)
        return kept_packages

    def kept_package_excuse(self, pkg, blacklist, whitelist,
                            strict_whitelist, better_version):
        """Return the human-readable reason the package is kept back.

        Args:
            pkg: low-level package object (callers pass apt.Package._pkg).
            blacklist: list of blacklist regexps.
            whitelist: list of whitelist regexps.
            strict_whitelist: whether the whitelist is strict.
            better_version: the newer apt.package.Version that was found.
        """
        if pkg.selected_state == apt_pkg.SELSTATE_HOLD:
            return _("Package %s is marked to be held back.") % pkg.name
        elif is_pkgname_in_blacklist(pkg.name, blacklist):
            return _("Package %s is blacklisted.") % pkg.name
        elif whitelist:
            if strict_whitelist:
                if not is_pkgname_in_whitelist(pkg.name, whitelist):
                    return (_(
                        "Package %s is not on the strict whitelist.")
                        % pkg.name)
            else:
                if not is_pkgname_in_whitelist(pkg.name, whitelist):
                    return (_(
                        "Package %s is not whitelisted and it is not a"
                        " dependency of a whitelisted package.")
                        % pkg.name)
        elif not any([o.trusted for o in better_version.origins]):
            return _("Package %s's origin is not trusted.") % pkg.name
        return (_("Package %s is kept back because a related package"
                  " is kept back or due to local apt_preferences(5).")
                % pkg.name)

    def pinning_from_regex_list(self, regexps, priority):
        """Represent a list of Python regexps as a list of PkgPin entries."""
        pins = []
        for regex in regexps:
            if python_regex_is_posix(regex):
                pins.append(PkgPin('/^' + regex + '/', priority))
            else:
                # The Python regex has no equivalent POSIX form. Go through
                # all the package names and pin the matching ones.
                for pkg in self._cache.packages:
                    if re.match(regex, pkg.name):
                        pins.append(PkgPin(pkg.name, priority))
        return pins

    def pinning_from_config(self):
        """Represent the configuration as a list of pinnings.

        Assumes self.allowed_origins to be already set.
        """
        pins = []
        # mark not allowed origins with the 'never' pin
        for pkg_file in self._cache.file_list:
            if not is_allowed_origin(pkg_file, self.allowed_origins):
                logging.debug("Marking not allowed %s with %s pin", pkg_file,
                              NEVER_PIN)
                pins.append(PkgFilePin(pkg_file.id, NEVER_PIN))
            elif self.strict_whitelist:
                # set even allowed origins to -1 and set individual package
                # priorities up later
                pins.append(PkgFilePin(pkg_file.id, -1))
        # mark blacklisted packages with the 'never' pin
        pins.extend(self.pinning_from_regex_list(self.blacklist, NEVER_PIN))
        # set priority of whitelisted packages to high
        pins.extend(self.pinning_from_regex_list(self.whitelist, 900))
        if self.strict_whitelist:
            policy = self._depcache.policy
            # pin down already pinned packages which are not on the whitelist
            # to not install locally pinned up packages accidentally
            for pkg in self._cache.packages:
                if pkg.has_versions:
                    pkg_ver = policy.get_candidate_ver(pkg)
                    if pkg_ver is not None \
                            and policy.get_priority(pkg_ver) > -1:
                        # the pin is higher than set for allowed origins,
                        # thus there is extra pinning configuration
                        if not is_pkgname_in_whitelist(pkg.name,
                                                       self.whitelist):
                            pins.append(PkgPin(pkg.name, NEVER_PIN))
        return pins

    def apply_pinning(self, pins):
        """Apply a list of PkgPin / PkgFilePin entries to the apt policy."""
        policy = self._depcache.policy
        pkg_files = {f.id: f for f in self._cache.file_list}
        for pin in pins:
            logging.debug("Applying pinning: %s" % str(pin))
            if isinstance(pin, PkgPin):
                policy.create_pin('Version', pin.pkg, '*', pin.priority)
            elif isinstance(pin, PkgFilePin):
                logging.debug("Applying pin %s to package_file: %s"
                              % (pin.priority, str(pkg_files[pin.id])))
                policy.set_priority(pkg_files[pin.id], pin.priority)

    def open(self, progress=None):
        """Open the cache. Applying the configured pinning is disabled."""
        apt.Cache.open(self, progress)

    def adjust_candidate_with_version(self, pkg, version):
        """Make `version` the candidate of `pkg` when possible.

        Returns:
            True when the candidate already is, or was switched to,
            `version`; False when no version with that version string comes
            from an allowed origin.
        """
        # pkg.candidate is compared against None elsewhere in this class, so
        # guard the attribute access instead of raising AttributeError.
        if pkg.candidate is not None and pkg.candidate.version == version:
            return True
        for v in pkg.versions:
            if v.version == version:
                logging.debug("pkg %s adjusting candidate version: %s"
                              % (pkg.name, v))
                if is_in_allowed_origin(v, self.allowed_origins):
                    pkg.candidate = v
                    return True
        return False

    def adjust_candidate(self, pkg):
        """Point the candidate of `pkg` at the first version found in an
        allowed origin and return True if adjustment took place.

        The former "only adjust to lower versions" condition is commented
        out in this file, so every package that has a candidate is adjusted.
        Returns False when the package has no candidate or no version comes
        from an allowed origin.
        """
        try:
            new_cand = ver_in_allowed_origin(pkg, self.allowed_origins)
            if pkg.candidate is not None:
                logging.debug("adjusting candidate version: %s" % new_cand)
                pkg.candidate = new_cand
                return True
            else:
                return False
        except NoAllowedOriginError:
            return False

    def call_checked(self, function, pkg, **kwargs):
        """Call the marking function and check if the package is in the
        wanted state.

        Returns False (after clearing the cache) when the call raises
        SystemError; otherwise whether `function` is mark_upgrade or
        mark_install and the package is now marked for upgrade/install.
        """
        try:
            function(pkg, **kwargs)
        except SystemError as e:
            logging.warning(
                _("package %s upgradable but fails to "
                  "be marked for upgrade (%s)"), pkg.name, e)
            self.clear()
            return False
        return ((function == apt.package.Package.mark_upgrade
                 or function == apt.package.Package.mark_install)
                and (pkg.marked_upgrade or pkg.marked_install))

    def call_adjusted(self, function, pkg, **kwargs):
        """Mark `pkg` with `function` and report whether marking succeeded.

        The fallback that re-adjusted the candidates of dependencies is
        disabled in this file; only the checked call remains.
        """
        marking_succeeded = self.call_checked(function, pkg, **kwargs)
        if not marking_succeeded:
            logging.error("%s mark failed" % pkg.name)
        return marking_succeeded

    def mark_upgrade_adjusted(self, pkg, **kwargs):
        """Mark `pkg` for upgrade; returns the result of call_adjusted."""
        # The result used to be dropped, so callers could not tell a failed
        # marking from a successful one.
        return self.call_adjusted(
            apt.package.Package.mark_upgrade, pkg, **kwargs)

    def mark_install_adjusted(self, pkg, **kwargs):
        """Mark `pkg` for install; returns the result of call_adjusted."""
        return self.call_adjusted(
            apt.package.Package.mark_install, pkg, **kwargs)
class LogInstallProgress(apt.progress.base.InstallProgress):
    """Install progress that writes to self.progress_log
    (PROGRESS_LOG by default) and to the dpkg log file.
    """

    def __init__(self, logfile_dpkg, verbose=False,
                 progress_log=PROGRESS_LOG):
        """
        Args:
            logfile_dpkg: path of the file receiving dpkg's output.
            verbose: when True, new log content is echoed to fd 1.
            progress_log: path of the file receiving the current percentage.
        """
        apt.progress.base.InstallProgress.__init__(self)
        self.logfile_dpkg = logfile_dpkg
        self.progress_log = progress_log
        self.verbose = verbose
        self.output_logfd = None  # opened lazily in verbose mode
        self.start_time = None
        self.max_delay = 3600

    def status_change(self, pkg, percent, status):
        """Overwrite the progress log with the integer percentage and log
        the status line."""
        with open(self.progress_log, "w") as f:
            per = str(int(float(percent)))
            f.write("%s" % per)
        logging.info("%s:%s:%s" % (pkg, percent, status))

    def error(self, pkg, errormsg):
        """Log the install error and record it through
        kylin_system_updater."""
        logging.error("%s:%s" % (pkg, errormsg))
        kylin_system_updater.AddPackageInstallErrorRecord(
            str(pkg), str(errormsg))

    def _fixup_fds(self):
        """Close every open fd except stdio and the status/write pipes."""
        required_fds = [0, 1, 2,  # stdin, stdout, stderr
                        self.writefd,
                        self.write_stream.fileno(),
                        self.statusfd,
                        self.status_stream.fileno()
                        ]
        # ensure that our required fds close on exec
        for fd in required_fds[3:]:
            old_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
            fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)
        # close all other fds
        proc_fd = "/proc/self/fd"
        if os.path.exists(proc_fd):
            error_count = 0
            for fdname in os.listdir(proc_fd):
                try:
                    fd = int(fdname)
                except ValueError:
                    print("ERROR: can not get fd for %s" % fdname)
                    # Skip the entry: falling through would act on the fd of
                    # the previous iteration (or on an unbound name).
                    continue
                if fd in required_fds:
                    continue
                try:
                    os.close(fd)
                except OSError as e:
                    # there will be one fd that can not be closed
                    # as it is the fd from the directory listing itself,
                    # so it is ok to ignore one close error
                    error_count += 1
                    if error_count > 1:
                        print("ERROR: os.close(%s): %s" % (fd, e))

    def _redirect_stdin(self):
        """Point fd 0 at os.devnull."""
        REDIRECT_INPUT = os.devnull
        fd = os.open(REDIRECT_INPUT, os.O_RDWR)
        os.dup2(fd, 0)

    def _redirect_output(self):
        """Send fd 1 and fd 2 to the dpkg log unless Debug::pkgDPkgPM is
        set."""
        # do not create log in dry-run mode, just output to stdout/stderr
        if not apt_pkg.config.find_b("Debug::pkgDPkgPM", False):
            logfd = self._get_logfile_dpkg_fd()
            os.dup2(logfd, 1)
            os.dup2(logfd, 2)

    def _get_logfile_dpkg_fd(self):
        """Open (creating if needed, mode 0o640) the dpkg log for appending
        and return its fd; chown to root:adm is attempted best-effort."""
        logfd = os.open(
            self.logfile_dpkg, os.O_RDWR | os.O_APPEND | os.O_CREAT, 0o640)
        try:
            adm_gid = grp.getgrnam("adm").gr_gid
            os.fchown(logfd, 0, adm_gid)
        except (KeyError, OSError):
            pass
        return logfd

    def update_interface(self):
        """Run the base class update, then the verbose echo."""
        apt.progress.base.InstallProgress.update_interface(self)
        self._do_verbose_output_if_needed()

    def _do_verbose_output_if_needed(self):
        """In verbose mode copy up to 1024 new bytes of the dpkg log to
        fd 1."""
        # if we are in debug mode, nothing to be more verbose about
        if apt_pkg.config.find_b("Debug::pkgDPkgPM", False):
            return
        if self.verbose:
            if self.output_logfd is None:
                self.output_logfd = os.open(self.logfile_dpkg, os.O_RDONLY)
                # start at the end so only new content is echoed
                os.lseek(self.output_logfd, 0, os.SEEK_END)
            try:
                select.select([self.output_logfd], [], [], 0)
            except select.error as e:
                if e.errno != errno.EINTR:
                    logging.exception("select failed")
            # output to stdout in verbose mode only
            os.write(1, os.read(self.output_logfd, 1024))

    def _log_in_dpkg_log(self, msg):
        """Append `msg` (UTF-8 encoded) to the dpkg log."""
        logfd = self._get_logfile_dpkg_fd()
        try:
            os.write(logfd, msg.encode("utf-8"))
        finally:
            # release the descriptor even when the write fails
            os.close(logfd)

    def finish_update(self):
        """Write the "Log ended" marker to the dpkg log."""
        self._log_in_dpkg_log("Log ended: %s\n\n"
                              % LoggingDateTime.as_string())

    def fork(self):
        """Write the "Log started" marker, fork, and prepare the child's
        file descriptors. Returns the pid from os.fork()."""
        self._log_in_dpkg_log("Log started: %s\n"
                              % LoggingDateTime.as_string())
        pid = os.fork()
        if pid == 0:
            self._fixup_fds()
            self._redirect_stdin()
            self._redirect_output()
        return pid
class Unlocked:
    """Context manager that releases the inner apt lock on entry and
    takes it again on exit (meant to wrap cache.commit())."""

    def __enter__(self):
        """Release the inner lock; a failure is logged and ignored."""
        try:
            apt_pkg.pkgsystem_unlock_inner()
        except Exception:
            # earlier python-apt used to leak the lock
            message = ("apt_pkg.pkgsystem_unlock() failed due to not "
                       "holding the lock but trying to continue")
            logging.warning(message)

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Re-acquire the inner lock; exceptions are not suppressed."""
        apt_pkg.pkgsystem_lock_inner()
class KeptPkgs(defaultdict):
    """Kept package names, keyed by "<origin> <archive>" of the first
    allowed origin of the better version."""

    def add(self, pkg, version, cache):
        """Record pkg.name under the first origin of `version` that is
        allowed by cache.allowed_origins; do nothing if none is allowed.

        Args:
            pkg: package whose name is recorded.
            version: version whose origins are inspected.
            cache: object providing `allowed_origins`.
        """
        first_allowed = next(
            (candidate for candidate in version.origins
             if is_allowed_origin(candidate, cache.allowed_origins)),
            None)
        if first_allowed is None:
            return
        key = first_allowed.origin + " " + first_allowed.archive
        self[key].add(pkg.name)
class UnattendedUpgradesResult:
    """
    Represent the (potentially partial) results of an unattended-upgrades
    run
    """

    def __init__(self,
                 success,
                 result_str="",
                 pkgs=None,
                 pkgs_kept_back=None,
                 pkgs_removed=None,
                 pkgs_kept_installed=None,
                 update_stamp=False
                 ):
        """
        Args:
            success: whether the run succeeded.
            result_str: human-readable result text.
            pkgs: list of package names; a new empty list when omitted.
            pkgs_kept_back: KeptPkgs mapping; a new KeptPkgs(set) when
                omitted.
            pkgs_removed: list of package names; a new empty list when
                omitted.
            pkgs_kept_installed: list of package names; a new empty list
                when omitted.
            update_stamp: stored as given.
        """
        # The containers are created per instance: literal mutable defaults
        # would be shared by every result object built without arguments.
        self.success = success
        self.result_str = result_str
        self.pkgs = [] if pkgs is None else pkgs
        self.pkgs_kept_back = (KeptPkgs(set) if pkgs_kept_back is None
                               else pkgs_kept_back)
        self.pkgs_removed = [] if pkgs_removed is None else pkgs_removed
        self.pkgs_kept_installed = ([] if pkgs_kept_installed is None
                                    else pkgs_kept_installed)
        self.update_stamp = update_stamp
def is_dpkg_journal_dirty(updates_dir="/var/lib/dpkg/updates"):
    """
    Return True if the dpkg journal is dirty
    (similar to debSystem::CheckUpdates)

    Args:
        updates_dir: directory holding dpkg's journal entries; defaults to
            the path that was previously hard-coded.

    Returns:
        bool: True when an entry name starts with digits or with the
        literal "tmp.i".
    """
    logging.debug("checking whether dpkg journal is dirty")
    # The dot is escaped so that only a literal "tmp.i" prefix matches; it
    # used to match any character in that position.
    journal_entry = re.compile(r"[0-9]+|tmp\.i")
    return any(journal_entry.match(name) for name in os.listdir(updates_dir))
def get_abnormally_installed_pkg_count():
    """Count the `dpkg -l` lines that start with "i" followed by any
    character other than "i".

    Returns:
        str: the count as a decimal string ("0" when dpkg cannot be run or
        prints nothing).
    """
    # dpkg is run directly and filtered here. The former shell pipeline
    # passed the unquoted pattern ^i[^i] to grep, which the shell could
    # glob-expand against the working directory.
    try:
        listing = subprocess.run(["dpkg", "-l"], stdout=subprocess.PIPE,
                                 check=False).stdout
    except OSError as e:
        # the pipeline reported a count of 0 when dpkg was unavailable
        logging.warning("unable to run dpkg -l: %s", e)
        listing = b""
    abnormal_state = re.compile(rb"i[^i]")
    count = sum(1 for line in listing.splitlines()
                if abnormal_state.match(line))
    return str(count)
def signal_handler(signal, frame):
    """Signal handler that only raises the module-level stop flag.

    Both arguments are accepted for the handler signature and ignored.
    """
    global SIGNAL_STOP_REQUEST
    logging.warning("SIGTERM received, will stop")
    SIGNAL_STOP_REQUEST = True
def log_once(msg):
    """Log `msg` at info level unless it was already logged through this
    function; seen messages are kept in the module-level `logged_msgs`."""
    global logged_msgs
    if msg in logged_msgs:
        return
    logging.info(msg)
    logged_msgs.add(msg)
def should_stop():
    """Return True if the run has to stop because a stop signal arrived.

    Only the SIGNAL_STOP_REQUEST flag is consulted; the battery and
    metered-connection checks are disabled in this file.
    """
    if not SIGNAL_STOP_REQUEST:
        return False
    logging.warning("SIGNAL received, stopping")
    return True
def substitute(line):
    """Substitute the known mappings and return a new string.

    Supported placeholders: ${distro_codename} and ${distro_id}.
    """
    known = dict(distro_codename=get_distro_codename(),
                 distro_id=get_distro_id())
    template = string.Template(line)
    return template.substitute(known)
def get_distro_codename():
    """Return the module-level DISTRO_CODENAME value."""
    codename = DISTRO_CODENAME
    return codename
def get_distro_id():
    """Return the module-level DISTRO_ID value."""
    distro = DISTRO_ID
    return distro
def allow_marking_fallback():
    """Return the Unattended-Upgrade::Allow-APT-Mark-Fallback setting.

    When the option is unset the default is True unless the distro
    codename is "sid".
    """
    default = get_distro_codename() != "sid"
    option = "Unattended-Upgrade::Allow-APT-Mark-Fallback"
    return apt_pkg.config.find_b(option, default)
def versioned_kernel_pkgs_regexp():
    """Compile a regexp matching versioned kernel package names.

    Every prefix from APT::VersionedKernelPackages is combined with a
    "-<a>.<b>.<c>-<n>[-flavor]" suffix. Returns None when the option is
    empty.
    """
    prefixes = apt_pkg.config.value_list("APT::VersionedKernelPackages")
    if not prefixes:
        return None
    version_suffix = "-[1-9][0-9]*\\.[0-9]+\\.[0-9]+-[0-9]+(-.+)?$"
    alternatives = ["^" + prefix + version_suffix for prefix in prefixes]
    return re.compile("(" + "|".join(alternatives) + ")")
def running_kernel_pkgs_regexp():
    """Compile a regexp matching the packages of the running kernel.

    Prefixes come from APT::VersionedKernelPackages and the kernel version
    from `uname -r`. Each prefix is matched with the full version and, when
    the version starts with "<a>.<b>.<c>-<n>", also with that flavor-less
    part. Returns None when the option is empty.
    """
    prefixes = apt_pkg.config.value_list("APT::VersionedKernelPackages")
    if not prefixes:
        return None
    running = subprocess.check_output(
        ["uname", "-r"], universal_newlines=True).rstrip()
    kernel_escaped = re.escape(running)
    try:
        # subscripting a failed match (None) raises TypeError
        kernel_noflavor_escaped = re.escape(
            re.match("[1-9][0-9]*\\.[0-9]+\\.[0-9]+-[0-9]+", running)[0])
    except TypeError:
        # flavor could not be cut from version
        alternatives = ["^" + prefix + "-" + kernel_escaped + "$"
                        for prefix in prefixes]
    else:
        alternatives = ["^" + prefix + "-" + kernel_escaped + "$|^"
                        + prefix + "-" + kernel_noflavor_escaped + "$"
                        for prefix in prefixes]
    return re.compile("(" + "|".join(alternatives) + ")")
def get_allowed_origins_legacy():
    """Legacy support for the old Allowed-Origins variable.

    Each Unattended-Upgrade::Allowed-Origins entry ("id:codename" or
    "id codename") is converted to the "o=<id>,a=<codename>" form.

    Returns:
        list of str: the converted origin patterns.

    Raises:
        ValueError: when an entry does not split into exactly two parts
            (logged, then re-raised).
    """
    allowed_origins = []
    key = "Unattended-Upgrade::Allowed-Origins"
    try:
        for s in apt_pkg.config.value_list(key):
            # if there is an unescaped ":" use that as separator, else use
            # whitespace
            if re.findall(r'(?<!\\):', s):
                (distro_id, distro_codename) = re.split(r'(?<!\\):', s)
            else:
                (distro_id, distro_codename) = s.split()
            # unescape "\:" back to ":"
            distro_id = re.sub(r'\\:', ':', distro_id)
            # escape "," (see LP: #824856)
            distro_id = re.sub(r'([^\\]),', r'\1\\,', distro_id)
            distro_codename = re.sub(r'([^\\]),', r'\1\\,', distro_codename)
            # convert to new format
            allowed_origins.append("o=%s,a=%s" % (substitute(distro_id),
                                                  substitute(distro_codename)))
    except ValueError:
        # Interpolate after the translation lookup; formatting inside _()
        # produced a msgid that no catalog entry could match.
        logging.error(_("Unable to parse %s.") % key)
        raise
    return allowed_origins
2023-05-29 14:47:39 +08:00
def get_allowed_origins(allow_origin):
    """Return a list of allowed origin patterns.

    Every entry of allow_origin['http'] followed by allow_origin['ftp']
    becomes one "o=<origin>,a=<archive>,l=<label>,uri=<origin_source>"
    string; a missing field leaves the part after "=" empty.
    """
    def tagged(prefix, mapping, field):
        # prefix plus the value, or the bare prefix when the field is absent
        if field in mapping:
            return prefix + mapping[field]
        return prefix

    patterns = []
    for entry in allow_origin['http'] + allow_origin['ftp']:
        release = entry['release']
        parts = [
            tagged('o=', release, 'origin'),
            tagged('a=', release, 'archive'),
            tagged('l=', release, 'label'),
            tagged('uri=', entry, 'origin_source'),
        ]
        patterns.append(",".join(parts))
    return patterns
2023-05-29 14:47:39 +08:00
2022-11-03 19:10:26 +08:00
def match_whitelist_string(whitelist, origin):
    """
    Take a whitelist string in the form "origin=Debian,label=Debian-Security"
    and match it against the given python-apt origin. An empty whitelist
    string never matches anything.

    Args:
        whitelist: comma separated "matcher=value" tokens; "\\," stands for
            a literal comma inside a value. Values are fnmatch patterns.
        origin: object exposing origin, label, archive, component, site and
            codename attributes.

    Returns:
        bool: True when every token matches. "uri" tokens are accepted and
        ignored.

    Raises:
        UnknownMatcherError: for an unknown matcher name.
        ValueError: for a token without "=".
    """
    whitelist = whitelist.strip()
    if whitelist == "":
        logging.warning("empty match string matches nothing")
        return False
    res = True
    # make "\," the html quote equivalent
    whitelist = whitelist.replace("\\,", "%2C")
    for token in whitelist.split(","):
        # Split on the first "=" only, so values containing "=" (e.g. a URI
        # with a query string) stay intact; strip and unquote the ","
        (what, value) = [s.strip().replace("%2C", ",")
                         for s in token.split("=", 1)]
        if what == "uri":
            # The uri is informational only. The old test `what in ("uri")`
            # was a substring check on a plain string and the branch left
            # `match` unset or stale.
            continue
        # support substitution here as well
        value = substitute(value)
        # the short name is the one shown in apt-cache policy output, the
        # long one is the name in the Release file
        if what in ("o", "origin"):
            match = fnmatch.fnmatch(origin.origin, value)
        elif what in ("l", "label"):
            match = fnmatch.fnmatch(origin.label, value)
        elif what in ("a", "suite", "archive"):
            match = fnmatch.fnmatch(origin.archive, value)
        elif what in ("c", "component"):
            match = fnmatch.fnmatch(origin.component, value)
        elif what in ("site",):
            match = fnmatch.fnmatch(origin.site, value)
        elif what in ("n", "codename",):
            match = fnmatch.fnmatch(origin.codename, value)
        else:
            raise UnknownMatcherError(
                "Unknown whitelist entry for matcher %s (token %s)" % (
                    what, token))
        # every token is still evaluated so that an unknown matcher is
        # reported even after an earlier mismatch
        res = res and match
    return res
def python_regex_is_posix(expression):
    """Tell whether the Python regex is also an equivalent POSIX regex.

    True when the expression consists only of letters, digits and the
    characters - ^ $ + . :
    """
    found = re.match("^[-a-zA-Z0-9\\^\\$\\+\\.:]*$", expression)
    return bool(found)
def cache_commit(cache, logfile_dpkg, verbose, iprogress=None):
    """Commit the changes from the given cache to the system.

    Args:
        cache: cache object providing commit() and clear().
        logfile_dpkg: dpkg log path, used for the default progress object.
        verbose: log the traceback when the commit raises SystemError.
        iprogress: install progress; a LogInstallProgress is created when
            omitted.

    Returns:
        tuple: (result of cache.commit(), None) on success, or
        (False, SystemError instance) after clearing the cache.
    """
    progress = iprogress
    if progress is None:
        progress = LogInstallProgress(logfile_dpkg, verbose,
                                      progress_log=PROGRESS_LOG)
    try:
        outcome = cache.commit(fetch_progress=None,
                               install_progress=progress,
                               allow_unauthenticated=True)
    except SystemError as exc:
        if verbose:
            logging.exception("Exception happened during upgrade.")
        cache.clear()
        return False, exc
    return outcome, None
def upgrade_normal(cache, logfile_dpkg, verbose):
    """Commit all marked changes in one go and log the outcome.

    Returns the result reported by cache_commit().
    """
    res, error = cache_commit(cache, logfile_dpkg, verbose)
    if not res:
        logging.error(_("Installing the upgrades failed!"))
        logging.error(_("error message: %s"), error)
        logging.error(_("dpkg returned a error! See %s for details"),
                      logfile_dpkg)
        return res
    logging.info(_("All upgrades installed"))
    return res
def upgrade_in_minimal_steps(cache, pkgs_to_upgrade, logfile_dpkg="",
                             verbose=False):
    """Upgrade the packages in small sets, committing after every set.

    Args:
        cache: UnattendedUpgradesCache used for marking and committing.
        pkgs_to_upgrade: list of package names.
        logfile_dpkg: path of the dpkg log file.
        verbose: passed to the install progress and cache_commit().

    Returns:
        bool: False when a stop was requested or a commit reported an
        error; otherwise the result of the last commit (or of the last
        failed marking when nothing was committed afterwards).
    """
    install_log = LogInstallProgress(logfile_dpkg, verbose)
    res = True
    # to_upgrade contains the package names still to be handled
    to_upgrade = set(pkgs_to_upgrade)
    for pkgname in upgrade_order(to_upgrade, cache):
        # upgrade packages and dependencies in increasing expected size of
        # package sets to upgrade/install together
        if pkgname not in to_upgrade:
            # pkg was upgraded in a previous set
            continue
        if should_stop():
            return False
        try:
            pkg = cache[pkgname]
        except KeyError:
            continue
        try:
            if pkg.is_upgradable \
                    or candidate_version_changed(pkg):
                cache.mark_upgrade_adjusted(
                    pkg, from_user=not pkg.is_auto_installed)
            elif not pkg.is_installed:
                cache.mark_install_adjusted(pkg, from_user=False)
            else:
                continue
        except Exception as e:
            logging.warning(
                _("package %s upgradable but fails to "
                  "be marked for upgrade (%s)"), pkgname, e)
            cache.clear()
            res = False
            continue
        # double check that we are not running into side effects like
        # what could have caused LP: #1020680
        if not check_changes_for_sanity(cache):
            logging.info("While building minimal partition: "
                         "cache has not allowed changes")
            cache.clear()
            continue
        changes = [p.name for p in cache.get_changes()]
        if not changes:
            continue
        # write progress log information
        if len(pkgs_to_upgrade) > 0:
            all_count = len(pkgs_to_upgrade)
            done_count = all_count - len(to_upgrade)
            # The ratio is scaled to a percentage; the old expression
            # divided by (all_count * 100) and stayed at or below 0.01.
            percent = done_count / float(all_count) * 100.0
        else:
            percent = 100.0
        install_log.status_change(pkg=",".join(changes),
                                  percent=percent,
                                  status="")
        # apply changes
        logging.debug("applying set %s" % changes)
        res, error = cache_commit(cache, logfile_dpkg, verbose, install_log)
        if error:
            if verbose:
                # No exception is being handled at this point, so the error
                # returned by cache_commit() is attached explicitly.
                logging.error("Exception happened during upgrade.",
                              exc_info=error)
            logging.error(_("Installing the upgrades failed!"))
            logging.error(_("error message: %s"), error)
            logging.error(_("dpkg returned a error! See %s for details"),
                          logfile_dpkg)
            return False
        to_upgrade = to_upgrade - set(changes)
        logging.debug("left to upgrade %s" % to_upgrade)
        if len(to_upgrade) == 0:
            logging.info(_("All upgrades installed"))
            break
    return res
def is_allowed_origin(origin, allowed_origins):
    """Tell whether `origin` matches any pattern in `allowed_origins`.

    The local origin (component and archive both 'now', no label and no
    site) is always allowed.
    """
    is_local = (origin.component == 'now' and origin.archive == 'now'
                and not origin.label and not origin.site)
    if is_local:
        return True
    return any(match_whitelist_string(pattern, origin)
               for pattern in allowed_origins)
def is_in_allowed_origin(ver, allowed_origins):
    """Tell whether at least one origin of `ver` is allowed.

    A falsy `ver` (e.g. None) is never allowed.
    """
    if not ver:
        return False
    return any(is_allowed_origin(source, allowed_origins)
               for source in ver.origins)
def ver_in_allowed_origin(pkg, allowed_origins):
    """Return the first entry of pkg.versions that comes from an allowed
    origin.

    Raises:
        NoAllowedOriginError: when no version qualifies.
    """
    found = next((candidate for candidate in pkg.versions
                  if is_in_allowed_origin(candidate, allowed_origins)),
                 None)
    if found is None:
        raise NoAllowedOriginError()
    # the first hit is returned, so the scan stops as early as possible
    return found
def is_pkgname_in_blacklist(pkgname, blacklist):
    """Tell whether `pkgname` matches (re.match) any blacklist regexp."""
    blacklisted = any(re.match(pattern, pkgname) for pattern in blacklist)
    if blacklisted:
        logging.debug("skipping blacklisted package %s" % pkgname)
    return blacklisted
def is_pkgname_in_whitelist(pkgname, whitelist):
    """Tell whether `pkgname` matches (re.match) any whitelist regexp.

    An empty whitelist means the feature is unused, so every package is
    accepted.
    """
    if not whitelist:
        return True
    whitelisted = any(re.match(pattern, pkgname) for pattern in whitelist)
    if whitelisted:
        logging.debug("only upgrading the following package %s" %
                      pkgname)
    return whitelisted
def is_pkg_change_allowed(pkg, blacklist, whitelist, strict_whitelist):
    """Tell whether the package may be changed.

    A change is refused when the package is blacklisted, when the
    whitelist is strict and the package is not on it, or when the package
    is on hold. A relaxed whitelist does not restrict changes here.
    """
    if is_pkgname_in_blacklist(pkg.name, blacklist):
        logging.debug("pkg %s package has been blacklisted" % pkg.name)
        return False
    # only a strict whitelist rejects packages that are not listed
    outside_strict_list = (strict_whitelist and
                           not is_pkgname_in_whitelist(pkg.name, whitelist))
    if outside_strict_list:
        logging.debug("pkg %s package is not whitelisted" %
                      pkg.name)
        return False
    on_hold = pkg._pkg.selected_state == apt_pkg.SELSTATE_HOLD
    if on_hold:
        logging.debug("pkg %s is on hold" % pkg.name)
        return False
    return True
def transitive_dependencies(pkg, cache, acc=None, valid_types=None,
                            level=None):
    """Collect the names of all (transitive) dependencies of the package.

    Note that alternative (|) dependencies are collected, too.

    Args:
        pkg: package whose candidate's dependencies are walked.
        cache: mapping from package name to package; unknown names are
            skipped.
        acc: set receiving the names; a new set is created when omitted.
        valid_types: when given, only dependencies whose rawtype is in this
            set are followed.
        level: maximum recursion depth, or None for no limit.

    Returns:
        set of str: `acc`, holding the collected names.
    """
    # A fresh accumulator per top-level call: the former `acc=set()` default
    # was one shared object, so results leaked between unrelated calls.
    if acc is None:
        acc = set()
    if not pkg.candidate or (level is not None and level < 1):
        return acc
    next_level = level - 1 if level is not None else None
    for dep in pkg.candidate.dependencies:
        for base_dep in dep:
            if base_dep.name in acc:
                continue
            if valid_types and base_dep.rawtype not in valid_types:
                continue
            acc.add(base_dep.name)
            try:
                transitive_dependencies(
                    cache[base_dep.name], cache, acc, valid_types,
                    level=next_level)
            except KeyError:
                # dependency is not known to the cache
                pass
    return acc
def upgrade_order(to_upgrade, cache):
    """Sort pkg names by the expected number of other packages to be
    upgraded with it. The calculation is not 100% accurate, it is an
    approximation.

    Args:
        to_upgrade: set of package names.
        cache: mapping from package name to package; names missing from it
            are left out of the result.

    Returns:
        list of str: names ordered by increasing upgrade-set size.
    """
    upgrade_set_sizes = {}
    # calculate upgrade sets
    follow_deps = {'Depends', 'PreDepends', 'Recommends'}
    for pkgname in to_upgrade:
        try:
            pkg = cache[pkgname]
        except KeyError:
            continue
        # A fresh accumulator is passed for every package, so the size
        # counts only that package's own dependency closure and not the
        # dependencies collected for previously visited packages.
        closure = transitive_dependencies(
            pkg, cache, acc=set(), valid_types=follow_deps)
        upgrade_set_sizes[pkgname] = len(closure.intersection(to_upgrade))
    return sorted(upgrade_set_sizes, key=upgrade_set_sizes.get)
def check_changes_for_sanity(cache, desired_pkg=None):
    """Tell whether the changes marked in the cache pass the sanity check.

    Args:
        cache: UnattendedUpgradesCache whose marked changes are checked.
        desired_pkg: optional package that is expected to be among the
            changes.

    Returns:
        bool: True when sanity_problem() returns None; False for any other
        result, which is logged at debug level.
    """
    sanity_check_result = sanity_problem(cache, desired_pkg)
    if sanity_check_result is None:
        return True
    # desired_pkg defaults to None; reading .name from it unconditionally
    # raised AttributeError exactly when a problem had to be reported.
    desired_name = desired_pkg.name if desired_pkg is not None else None
    marked_candidates = {str(p.candidate) for p in cache.get_changes()}
    logging.debug("sanity check failed for: %s:%s : %s"
                  % (desired_name, str(marked_candidates),
                     sanity_check_result))
    return False
def sanity_problem(cache, desired_pkg):
    # type: (UnattendedUpgradesCache, apt.Package) -> bool
    """Check the changes currently pending in the cache.

    Returns False when desired_pkg is given but is not part of the
    changes, or when any changed package is marked for deletion;
    returns True otherwise.

    Only these two conditions are checked here; no origin, blacklist /
    whitelist or "Upgrade-Requires" checks are performed.
    """
    pending = cache.get_changes()
    if desired_pkg and desired_pkg not in pending:
        logging.warning("pkg %s to be marked for upgrade/install is not marked accordingly" % desired_pkg.name)
        return False
    doomed = []
    for changed in pending:
        if not changed.marked_delete:
            continue
        logging.warning("pkg %s is marked to be deleted" % changed.name)
        doomed.append(changed.name)
    if doomed:
        logging.debug("pkgs marked to delete:%s"%",".join(doomed))
        return False
    return True
def is_deb(file):
    # type: (str) -> bool
    """Return True if the file name ends with ".deb", False otherwise."""
    return file.endswith(".deb")
def pkgname_from_deb(debfile):
    # type: (str) -> str
    """Return the package name of a .deb file.

    The name is read from the "Package" field of the control file. If the
    file can not be read (IOError / SystemError) the error is logged and
    the name is guessed from the file name, i.e. the part before the first
    "_" of its basename.
    """
    # FIXME: add error checking here
    try:
        control = apt_inst.DebFile(debfile).control.extractdata("control")
        sections = apt_pkg.TagSection(control)
        return sections["Package"]
    except (IOError, SystemError) as e:
        logging.error("failed to read deb file %s (%s)" % (debfile, e))
        # dumb fallback; strip the directory first, otherwise a path like
        # /dir/foo_1.0_amd64.deb would give "/dir/foo" instead of "foo"
        return os.path.basename(debfile).split("_")[0]
def get_md5sum_for_file_in_deb(deb_file, conf_file):
    # type: (str, str) -> str
    """Return the md5sum of conf_file as shipped inside deb_file.

    Runs the pipeline
        dpkg-deb --fsys-tarfile <deb> | tar -x -O -f - .<conf_file> | md5sum
    and returns the first whitespace separated field of its output.
    """
    extract = Popen(["dpkg-deb", "--fsys-tarfile", deb_file], stdout=PIPE)
    untar = Popen(["tar", "-x", "-O", "-f", "-", "." + conf_file],
                  stdin=extract.stdout, stdout=PIPE,
                  universal_newlines=True)
    digest = Popen(["md5sum"], stdin=untar.stdout, stdout=PIPE,
                   universal_newlines=True)
    pkg_md5sum = digest.communicate()[0].split()[0]
    # close the pipes and reap every stage of the pipeline
    for stage in (extract, untar, digest):
        proc = cast(Popen, stage)
        proc.stdout.close()
        proc.wait()
    return pkg_md5sum
def get_md5sum_for_file_installed(conf_file, prefix):
    # type: (str, str) -> str
    """Return the MD5 sum of the installed file at prefix + conf_file.

    Returns the value of the 'MD5Sum' entry computed by apt_pkg.Hashes,
    None if there is no such entry, "dir" if the path is a directory and
    "" if the file does not exist.
    """
    installed_path = prefix + conf_file
    try:
        with open(installed_path, 'rb') as handle:
            md5_values = (
                entry.hashvalue
                for entry in apt_pkg.Hashes(handle).hashes  # type: ignore
                if entry.hashtype == 'MD5Sum')
            return next(md5_values, None)
    except IsADirectoryError:
        # the package replaces a directory with a configuration file
        #
        # if the package changed this way it is safe to assume that
        # the transition happens without showing a prompt but if the admin
        # created the directory the admin will need to resolve it after
        # being notified about the unexpected prompt
        logging.debug("found conffile %s is a directory on the system "
                      % conf_file)
        return "dir"
    except FileNotFoundError:
        # if the local file got deleted by the admin that is ok but it may
        # still trigger a conffile prompt (see debian #788049)
        logging.debug("conffile %s in missing on the system" % conf_file)
        return ""
def map_conf_file(conf_file, conffiles):
    # type: (str, Union[AbstractSet[str], Dict[str, str]]) -> str
    """Find the entry of conffiles that corresponds to conf_file.

    The candidates are tried in this order and the first one contained in
    conffiles is returned, None if none of them is:
      1. conf_file itself
      2. conf_file/<basename>: new /etc/foo may be old /etc/foo/foo,
         like in LP: #1822745
      3. the directory of conf_file: new /etc/foo/foo may be old
         /etc/foo, probably by accident
    """
    # TODO: peek into package's dpkg-maintscript-helper mv_conffile usage
    nested = os.path.join(conf_file, os.path.basename(conf_file))
    parent = os.path.dirname(conf_file)
    for candidate in (conf_file, nested, parent):
        if candidate in conffiles:
            return candidate
    return None
# prefix is *only* needed for the built-in tests
def conffile_prompt(destFile, prefix=""):
    # type: (str, str) -> bool
    """Return True if installing the deb destFile may show a conffile prompt.

    The conffile md5sums recorded for the package in the dpkg status file
    are compared with the files found below prefix and with the files
    shipped in the deb.

    :param destFile: path of the .deb file to check
    :param prefix: prepended to the conffile paths when the installed
        files are hashed
    :return: True if a conffile differs both on disk and in the deb from
        the recorded md5sum, or if the deb ships a conffile that is not
        recorded but exists on disk with different content; else False
    :raises SystemError: re-raised when the deb can not be read
    """
    logging.debug("check_conffile_prompt(%s)" % destFile)
    pkgname = pkgname_from_deb(destFile)
    # get the conffiles for the /var/lib/dpkg/status file
    status_file = apt_pkg.config.find("Dir::State::status")
    with open(status_file, "r") as f:
        tagfile = apt_pkg.TagFile(f)
        conffiles = ""
        for section in tagfile:
            if section.get("Package") == pkgname:
                logging.debug("found pkg: %s" % pkgname)
                if "Conffiles" in section:
                    conffiles = section.get("Conffiles")
                    break
    # get conffile value from pkg, it is ok if the new version
    # does not have conffiles anymore
    pkg_conffiles = set()  # type: AbstractSet[str]
    try:
        deb = apt_inst.DebFile(destFile)
        pkg_conffiles = set(deb.control.extractdata(
            "conffiles").strip().decode("utf-8").split("\n"))
    except SystemError as e:
        print(_("Apt returned an error, exiting"))
        print(_("error message: %s") % e)
        logging.error(_("Apt returned an error, exiting"))
        logging.error(_("error message: %s"), e)
        raise
    except LookupError as e:
        logging.debug("No conffiles in deb %s (%s)" % (destFile, e))
    if not pkg_conffiles:
        # nothing shipped as conffile, nothing to prompt about
        return False
    # Format of the Conffiles field, one "<path> <md5sum> [<state>]" per line:
    # Conffiles:
    #  /etc/bash_completion.d/m-a c7780fab6b14d75ca54e11e992a6c11c
    dpkg_status_conffiles = {}
    for line in conffiles.splitlines():
        # ignore empty lines
        line = line.strip()
        if not line:
            continue
        # show what we do
        logging.debug("conffile line: %s", line)
        li = line.split()
        conf_file = li[0]
        md5 = li[1]
        if len(li) > 2:
            obs = li[2]
        else:
            obs = None
        # ignore if conffile is obsolete
        if obs == "obsolete":
            continue
        # ignore state "newconffile" until it is clearer if there
        # might be a dpkg prompt (LP: #936870)
        if md5 == "newconffile":
            continue
        new_conf_file = map_conf_file(conf_file, pkg_conffiles)
        if not new_conf_file:
            logging.debug("%s not in package conffiles %s" % (
                conf_file, pkg_conffiles))
            continue
        # record for later
        dpkg_status_conffiles[conf_file] = md5
        # test against the installed file, if the local file got deleted
        # by the admin that is ok but it may still trigger a conffile prompt
        # (see debian #788049)
        current_md5 = get_md5sum_for_file_installed(conf_file, prefix)
        logging.debug("current md5: %s" % current_md5)
        # hashes are the same, no conffile prompt
        if current_md5 == md5:
            continue
        # calculate md5sum from the deb (may take a bit)
        pkg_md5sum = get_md5sum_for_file_in_deb(destFile, new_conf_file)
        logging.debug("pkg_md5sum: %s" % pkg_md5sum)
        # the md5sum in the deb is unchanged, this will not
        # trigger a conffile prompt
        if pkg_md5sum == md5:
            continue
        # if we made it to this point:
        #  current_md5 != pkg_md5sum != md5
        # and that will trigger a conffile prompt, we can
        # stop processing at this point and just return True
        return True
    # now check if there are conffiles in the pkg that were not there
    # in the previous version in the dpkg status file
    if pkg_conffiles:
        for conf_file in pkg_conffiles:
            old_conf_file = map_conf_file(conf_file, dpkg_status_conffiles)
            if not old_conf_file:
                pkg_md5sum = get_md5sum_for_file_in_deb(destFile, conf_file)
                current_md5 = get_md5sum_for_file_installed(conf_file, prefix)
                # "" means the file does not exist locally
                if current_md5 != "" and pkg_md5sum != current_md5:
                    return True
    return False
def dpkg_conffile_prompt():
    # type: () -> bool
    """Return False if DPkg::Options forces a conffile decision.

    True is returned when DPkg::Options is not configured or when it
    contains neither --force-confold nor --force-confnew.
    """
    if "DPkg::Options" not in apt_pkg.config:
        return True
    forcing = ("--force-confold", "--force-confnew")
    configured = apt_pkg.config.value_list("DPkg::Options")
    return not any(opt.strip() in forcing for opt in configured)
def rewind_cache(cache, pkgs_to_upgrade):
    # type: (UnattendedUpgradesCache, List[apt.Package]) -> None
    """Reset the cache so that only pkgs_to_upgrade are marked.

    The cache is cleared and every package of pkgs_to_upgrade is marked
    for installation again. Raises AssertionError if the cache reports
    broken packages afterwards.
    """
    cache.clear()
    for package in pkgs_to_upgrade:
        user_requested = not package.is_auto_installed
        cache.mark_install_adjusted(package, from_user=user_requested)
    if cache.broken_count > 0:
        raise AssertionError("rewind_cache created a broken cache")
def host():
    # type: () -> str
    """Return the host name as reported by socket.getfqdn()."""
    fqdn = socket.getfqdn()
    return fqdn
def wrap_indent(t, subsequent_indent=" "):
    # type: (str, str) -> str
    """Wrap t with textwrap's wrap() and join the lines with newlines.

    Every line but the first starts with subsequent_indent; words are not
    broken on hyphens.
    """
    wrapped_lines = wrap(t, subsequent_indent=subsequent_indent,
                         break_on_hyphens=False)
    return "\n".join(wrapped_lines)
def setup_apt_listchanges(conf="/etc/apt/listchanges.conf"):
    # type: (str) -> None
    """Select the apt-listchanges frontend through the environment.

    APT_LISTCHANGES_FRONTEND is set to "mail" if SENDMAIL_BINARY exists
    and to "none" otherwise. The conf argument is not used.
    """
    # apt-listchanges will always send a mail if there is a mail address
    # set in the config regardless of the frontend used, so set it to
    # mail if we have a sendmail and to none if not (as it appears to
    # not check if sendmail is there or not), debian bug #579733
    frontend = "mail" if os.path.exists(SENDMAIL_BINARY) else "none"
    os.environ["APT_LISTCHANGES_FRONTEND"] = frontend
def _send_mail_using_mailx(from_address, to_address, subject, body):
    # type: (str, str, str, str) -> int
    """Send a mail by piping body to MAIL_BINARY, return its exit status."""
    # encode the body ourselves with errors="replace" so that characters
    # the preferred encoding can not represent do not raise (the default
    # error mode is "strict")
    encoding = locale.getpreferredencoding(False)
    payload = body.encode(encoding, errors="replace")
    # the pipe to stdin is binary so we do not break on unicode encoding
    # errors (e.g. because the user is running an ascii only system like
    # the buildds)
    argv = [MAIL_BINARY, "-r", from_address, "-s", subject, to_address]
    mailer = subprocess.Popen(
        argv, stdin=subprocess.PIPE, universal_newlines=False)
    mailer.stdin.write(payload)
    mailer.stdin.close()
    return mailer.wait()
def _send_mail_using_sendmail(from_address, to_address, subject, body):
    # type: (str, str, str, str) -> int
    """Send a mail by piping a formatted message to SENDMAIL_BINARY.

    The message gets Subject, From, To and Auto-Submitted headers and a
    quoted-printable encoded utf-8 body.

    :return: exit status of the sendmail process
    """
    # The module level "import email.charset" is commented out in this
    # file; import what is needed here so the names are always bound.
    import email.charset
    from email.message import Message
    # format as a proper mail
    msg = Message()
    msg['Subject'] = subject
    msg['From'] = from_address
    msg['To'] = to_address
    msg['Auto-Submitted'] = "auto-generated"
    # order is important here, Message() first, then Charset()
    # then msg.set_charset()
    charset = email.charset.Charset("utf-8")
    charset.body_encoding = email.charset.QP  # type: ignore
    msg.set_payload(body, charset)
    # and send it away
    sendmail = subprocess.Popen(
        [SENDMAIL_BINARY, "-oi", "-t"],
        stdin=subprocess.PIPE, universal_newlines=True)
    sendmail.stdin.write(msg.as_string())
    sendmail.stdin.close()
    ret = sendmail.wait()
    return ret
def send_summary_mail(pkgs,                 # type: List[str]
                      res,                  # type: bool
                      result_str,           # type: str
                      pkgs_kept_back,       # type: KeptPkgs
                      pkgs_removed,         # type: List[str]
                      pkgs_kept_installed,  # type: List[str]
                      mem_log,              # type: StringIO
                      dpkg_log_content,     # type: str
                      ):
    # type: (...) -> None
    """Send the summary mail (if configured in Unattended-Upgrade::Mail).

    Nothing is sent when no recipient is configured, when neither
    MAIL_BINARY nor SENDMAIL_BINARY exists, or when the report mode
    filters the run out. The mode is Unattended-Upgrade::MailReport
    ("always", "only-on-error" or "on-change"); if it is unset it is
    derived from the legacy Unattended-Upgrade::MailOnlyOnError boolean
    (read with default False).
    """
    recipient = apt_pkg.config.find("Unattended-Upgrade::Mail", "")
    if not recipient:
        return
    if not (os.path.exists(MAIL_BINARY) or os.path.exists(SENDMAIL_BINARY)):
        logging.error(_("No /usr/bin/mail or /usr/sbin/sendmail, "
                        "can not send mail. "
                        "You probably want to install the mailx package."))
        return
    report_mode = apt_pkg.config.find("Unattended-Upgrade::MailReport")
    if report_mode == "":
        # none set - map from the legacy value
        errors_only = apt_pkg.config.find_b(
            "Unattended-Upgrade::MailOnlyOnError", False)
        report_mode = "only-on-error" if errors_only else "on-change"
    # successful run, but mails were requested for errors only
    if res and report_mode == "only-on-error":
        return
    # successful run without anything done: skip the mail unless the
    # admin wants it anyway
    nothing_done = not pkgs and not pkgs_kept_back and not pkgs_removed
    if report_mode != "always" and res and nothing_done:
        return
    # flags shown at the start of the subject
    if os.path.isfile(REBOOT_REQUIRED_FILE):
        reboot_flag_str = _("[reboot required]")
    else:
        reboot_flag_str = ""
    if pkgs_kept_back or pkgs_kept_installed:
        hold_flag_str = _("[package on hold]")
    else:
        hold_flag_str = ""
    logging.debug("Sending mail to %s" % recipient)
    subject_template = _(
        "{hold_flag}{reboot_flag} unattended-upgrades result for "
        "{machine}: {result}")
    subject = subject_template.format(
        hold_flag=hold_flag_str, reboot_flag=reboot_flag_str,
        machine=host(), result="SUCCESS" if res else "FAILURE").strip()
    # the body is collected piece by piece and joined at the end
    pieces = [wrap_indent(_("Unattended upgrade result: %s") % result_str),
              "\n\n"]
    if os.path.isfile(REBOOT_REQUIRED_FILE):
        pieces.append(_(
            "Warning: A reboot is required to complete this upgrade, "
            "or a previous one.\n\n"))
    if pkgs:
        if res:
            pieces.append(_("Packages that were upgraded:\n"))
        else:
            pieces.append(_("Packages that attempted to upgrade:\n"))
        pieces.append(" " + wrap_indent(" ".join(pkgs)))
        pieces.append("\n\n")
    if pkgs_kept_back:
        pieces.append(_("Packages with upgradable origin but kept back:\n"))
        for origin, origin_pkgs in pkgs_kept_back.items():
            pieces.append(" " + origin + ":\n")
            pieces.append(" " + wrap_indent(" ".join(origin_pkgs),
                                            subsequent_indent=" ") + "\n")
        pieces.append("\n")
    if pkgs_removed:
        pieces.append(_("Packages that were auto-removed:\n"))
        pieces.append(" " + wrap_indent(" ".join(pkgs_removed)))
        pieces.append("\n\n")
    if pkgs_kept_installed:
        pieces.append(_("Packages that were kept from being auto-removed:\n"))
        pieces.append(" " + wrap_indent(" ".join(pkgs_kept_installed)))
        pieces.append("\n\n")
    if dpkg_log_content:
        pieces.append(_("Package installation log:") + "\n")
        pieces.append(dpkg_log_content)
        pieces.append("\n\n")
    pieces.append(_("Unattended-upgrades log:\n"))
    pieces.append(mem_log.getvalue())
    body = "".join(pieces)
    sender = apt_pkg.config.find("Unattended-Upgrade::Sender", "root")
    # sendmail is preferred over mailx when both are available
    if os.path.exists(SENDMAIL_BINARY):
        ret = _send_mail_using_sendmail(sender, recipient, subject, body)
    elif os.path.exists(MAIL_BINARY):
        ret = _send_mail_using_mailx(sender, recipient, subject, body)
    else:
        raise AssertionError(
            "This should never be reached as we previously validated that we "
            "either have sendmail or mailx. Maybe they've been removed in "
            "this right moment?")
    logging.debug("mail returned: %s", ret)
def do_install(cache,            # type: UnattendedUpgradesCache
               pkgs_to_upgrade,  # type: List[str]
               options,          # type: Options
               logfile_dpkg,     # type: str
               ):
    # type: (...) -> bool
    """Commit the changes marked in the cache and return the result.

    Returns whatever cache.commit() returns, or False if it raised; the
    exception is logged. pkgs_to_upgrade and options are not used here.
    """
    logging.info(_("Writing dpkg log to %s"), logfile_dpkg)
    install_progress = LogInstallProgress(
        logfile_dpkg, verbose=True, progress_log=PROGRESS_LOG)
    succeeded = False
    try:
        # fetch progress is written to the module level logfile_fd
        fetch_progress = apt.progress.text.AcquireProgress(outfile=logfile_fd)
        # NOTE(review): unauthenticated packages are explicitly allowed
        succeeded = cache.commit(fetch_progress=fetch_progress,
                                 install_progress=install_progress,
                                 allow_unauthenticated=True)
    except Exception as e:
        logging.error("cache commit error:%s"%e)
    return succeeded
def _setup_alternative_rootdir(rootdir):
    # type: (str) -> None
    """Load the apt configuration found below rootdir.

    The system "Unattended-Upgrade" settings are cleared, apt.conf and
    apt.conf.d below rootdir are read if they exist and
    Unattended-Upgrade::LogDir is pointed to a directory below rootdir,
    which is created when missing.
    """
    # clear system unattended-upgrade stuff
    apt_pkg.config.clear("Unattended-Upgrade")
    # read rootdir (taken from apt.Cache, but we need to run it
    # here before the cache gets initialized)
    conf_file = rootdir + "/etc/apt/apt.conf"
    if os.path.exists(conf_file):
        apt_pkg.read_config_file(apt_pkg.config, conf_file)
    conf_dir = rootdir + "/etc/apt/apt.conf.d"
    if os.path.isdir(conf_dir):
        apt_pkg.read_config_dir(apt_pkg.config, conf_dir)
    logdir = os.path.join(rootdir, "var", "log", "unattended-upgrades")
    if not os.path.exists(logdir):
        os.makedirs(logdir)
    apt.apt_pkg.config.set("Unattended-Upgrade::LogDir", logdir)
def _get_logdir():
    # type: () -> str
    """Return the configured log directory.

    Unattended-Upgrade::LogDir is used if set, then the COMPAT only
    APT::UnattendedUpgrades::LogDir, then /var/log/unattended-upgrades/.
    """
    # COMPAT only
    compat_logdir = apt_pkg.config.find_dir(
        "APT::UnattendedUpgrades::LogDir", "/var/log/unattended-upgrades/")
    return apt_pkg.config.find_dir("Unattended-Upgrade::LogDir",
                                   compat_logdir)
def _setup_logging(options,logfile):
    """Configure the root logger to write to logfile.

    Does nothing if the root logger already has handlers, so the setup is
    done only once. The level is INFO unless options.debug is set (then
    DEBUG); with options.verbose (and without options.debug) messages are
    additionally written to stdout. Always returns None.
    """
    # ensure this is run only once
    if logging.root.handlers:
        return None
    # init the logging
    logging.basicConfig(filename=logfile,
                        level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s')
    # additional logging
    root_logger = logging.getLogger()
    if options.debug:
        root_logger.setLevel(logging.DEBUG)
    elif options.verbose:
        root_logger.setLevel(logging.INFO)
        root_logger.addHandler(logging.StreamHandler(sys.stdout))
def logged_in_users():
    # type: () -> AbstractSet[str]
    """Return the set of logged in users as printed by the USERS command."""
    # the "users" command always returns a single line with:
    # "user1, user1, user2"
    output = subprocess.check_output(USERS, universal_newlines=True)
    return set(output.rstrip('\n').split())
def reboot_if_needed():
    # type: () -> None
    """Schedule a reboot if one is required and the policy allows it.

    Nothing happens if REBOOT_REQUIRED_FILE does not exist or if
    "automaticReboot" in the [autoUpgradePolicy] section of the policy
    file is 'off'. Otherwise "/sbin/shutdown -r <automaticRebootTime>" is
    run; a failure to run it is logged, not raised.
    """
    if not os.path.exists(REBOOT_REQUIRED_FILE):
        return
    reboot_policy = ReadValueFromFile(UNATTENDED_UPGRADE_POLICY_FILE_PATH,"autoUpgradePolicy","automaticReboot")
    if reboot_policy == 'off':
        return
    # reboot at the specified time
    reboot_time = ReadValueFromFile(UNATTENDED_UPGRADE_POLICY_FILE_PATH,"autoUpgradePolicy","automaticRebootTime")
    logging.warning("Found %s, rebooting" % REBOOT_REQUIRED_FILE)
    try:
        output = subprocess.check_output(
            ["/sbin/shutdown", "-r", reboot_time], stderr=subprocess.STDOUT)
        if output.strip():
            logging.warning("Shutdown msg: %s", output.strip())
    except Exception as e:
        logging.error("Failed to issue shutdown: %s", e)
def reboot_if_requested_and_needed():
    # type: () -> None
    """auto-reboot (if required and the config for this is set)

    Nothing happens unless REBOOT_REQUIRED_FILE exists and
    Unattended-Upgrade::Automatic-Reboot is true. If
    Unattended-Upgrade::Automatic-Reboot-WithUsers is false the reboot is
    skipped (with a warning) while users are logged in. The reboot is
    scheduled with "/sbin/shutdown -r <time>" where <time> is
    Unattended-Upgrade::Automatic-Reboot-Time (default "now"); a failure
    to run shutdown is logged, not raised.
    """
    if not os.path.exists(REBOOT_REQUIRED_FILE):
        return
    if not apt_pkg.config.find_b(
            "Unattended-Upgrade::Automatic-Reboot", False):
        return
    # see if we need to check for logged in users
    if not apt_pkg.config.find_b(
            "Unattended-Upgrade::Automatic-Reboot-WithUsers", True):
        users = logged_in_users()
        if users:
            # Translate the template first and fill in the values
            # afterwards; a pre-formatted string is never found in the
            # message catalogue.
            msg = gettext.ngettext(
                "Found %s, but not rebooting because %s is logged in.",
                "Found %s, but not rebooting because %s are logged in.",
                len(users)) % (REBOOT_REQUIRED_FILE, users)
            logging.warning(msg)
            return
    # reboot at the specified time
    when = apt_pkg.config.find(
        "Unattended-Upgrade::Automatic-Reboot-Time", "now")
    logging.warning("Found %s, rebooting" % REBOOT_REQUIRED_FILE)
    cmd = ["/sbin/shutdown", "-r", when]
    try:
        shutdown_msg = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        if shutdown_msg.strip():
            logging.warning("Shutdown msg: %s", shutdown_msg.strip())
    except Exception as e:
        logging.error("Failed to issue shutdown: %s", e)
def write_stamp_file():
    # type: () -> None
    """Create or truncate <Dir::State>/periodic/unattended-upgrades-stamp.

    The "periodic" directory is created first if it does not exist.
    """
    statedir = os.path.join(apt_pkg.config.find_dir("Dir::State"), "periodic")
    if not os.path.exists(statedir):
        os.makedirs(statedir)
    stamp_path = os.path.join(statedir, "unattended-upgrades-stamp")
    # opening for writing is enough to create / truncate the stamp
    open(stamp_path, "w").close()
def try_to_upgrade(pkg,              # type: apt.Package
                   pkgs_to_upgrade,  # type: List[apt.Package]
                   cache,            # type: UnattendedUpgradesCache
                   version):
    # type: (...) -> None
    """Try to mark pkg for install/upgrade to the given version.

    On success pkg is appended to pkgs_to_upgrade. If the resulting
    changes do not pass check_changes_for_sanity(), or marking raises
    SystemError / NoAllowedOriginError, the cache is rewound to
    pkgs_to_upgrade. If adjusting the candidate raises
    NoAllowedOriginError the function returns without doing anything else.
    """
    try:
        try:
            # try to adjust pkg itself first, if that throws an exception
            # it can't be upgraded on its own
            cache.adjust_candidate_with_version(pkg,version)
        except NoAllowedOriginError:
            return
        if pkg.installed:
            cache.mark_upgrade_adjusted(
                pkg, from_user=not pkg.is_auto_installed)
        else:
            cache.mark_install_adjusted(pkg,from_user=True)
        if not check_changes_for_sanity(cache, pkg):
            rewind_cache(cache, pkgs_to_upgrade)
        else:
            # add to packages to upgrade
            pkgs_to_upgrade.append(pkg)
    except (SystemError, NoAllowedOriginError) as e:
        # can't upgrade
        logging.warning(
            _("package %s upgradable but fails to "
              "be marked for upgrade (%s)"), pkg.name, e)
        rewind_cache(cache, pkgs_to_upgrade)
def candidate_version_changed(pkg):
    """Tell whether an installed package has a different candidate version.

    pkg is an apt.Package. The falsy value of pkg.is_installed or
    pkg.candidate is returned as is when one of them is falsy, otherwise
    the result of comparing the candidate and the installed version.
    """
    if not pkg.is_installed:
        return pkg.is_installed
    candidate = pkg.candidate
    if not candidate:
        return candidate
    return candidate.version != pkg.installed.version
def calculate_upgradable_pkgs(cache,    # type: UnattendedUpgradesCache
                              options,  # type: Options
                              whitelist):
    # type: (...) -> List[apt.Package]
    """Return the whitelisted packages that can be marked for upgrade.

    Every whitelist entry is indexed as (package name, version). The
    candidate of the package is adjusted to that version, the package is
    marked for install (not installed yet) or upgrade (upgradable) and it
    is collected if sanity_problem() returns a true value. Entries that
    raise are logged and skipped. Pending changes are cleared from the
    cache before returning. options is not used.
    """
    upgradable = []  # type: List[apt.Package]
    for entry in whitelist:
        try:
            name = entry[0]
            pkg = cache[name]
            wanted_version = entry[1]
            if not cache.adjust_candidate_with_version(pkg, wanted_version):
                logging.warning("%s-%s :can not adjust candidate version"%(name,wanted_version))
                continue
            if not pkg.installed:
                cache.mark_install_adjusted(pkg,from_user=True)
            elif pkg.is_upgradable:
                cache.mark_upgrade_adjusted(
                    pkg, from_user=not pkg.is_auto_installed)
            if sanity_problem(cache,pkg):
                upgradable.append(pkg)
        except Exception as e:
            logging.error("error checking pkg:%s"%e)
            continue
    # leave the cache without pending changes
    if cache.get_changes():
        cache.clear()
    return upgradable
def get_dpkg_log_content(logfile_dpkg, install_start_time):
    # type: (str, datetime.datetime) -> str
    """Return the part of the dpkg log written since install_start_time.

    Lines are collected from the first "Log started: " stanza whose time
    stamp is not older than install_start_time; lines matching the
    progress indicator pattern are left out. An empty string is returned
    if the log file does not exist.
    """
    logging.debug("Extracting content from %s since %s" % (
        logfile_dpkg, install_start_time))
    stanza_marker = "Log started: "
    collected = []
    collecting = False
    try:
        with io.open(logfile_dpkg, encoding='utf-8', errors='replace') as fp:
            for line in fp.readlines():
                # scan for the first entry we need (minimal-step mode
                # creates a new stanza for each individual install)
                if not collecting and line.startswith(stanza_marker):
                    stanza_start = LoggingDateTime.from_string(
                        line[len(stanza_marker):-1])
                    if stanza_start >= install_start_time:
                        collecting = True
                if not collecting:
                    continue
                # skip progress indicator until #860931 is fixed in apt
                # and dpkg
                if re.match(
                        "^\\(Reading database \\.\\.\\. ()|([0-9]+%)$",
                        line):
                    continue
                collected.append(line)
        return "".join(collected)
    except FileNotFoundError:
        return ""
def get_auto_removable(cache):
    # type: (apt.Cache) -> AbstractSet[str]
    """Return the names of all packages in cache that are auto-removable."""
    names = set()
    for pkg in cache:
        if pkg.is_auto_removable:
            names.add(pkg.name)
    return names
def is_autoremove_valid(cache,           # type: UnattendedUpgradesCache
                        pkgname,         # type: str
                        auto_removable,  # type: AbstractSet[str]
                        ):
    # type: (...) -> bool
    """Check whether the changes marked in the cache are a valid autoremoval.

    True is returned when there are no changes at all. Otherwise every
    changed package has to be allowed to change, has to be one of
    auto_removable and must not match the running kernel pattern of the
    cache, and nothing may be marked for installation. The reason for a
    False result is logged. pkgname is only used in the log message, ""
    stands for a removal of all auto_removable packages in one step.
    """
    changes = cache.get_changes()
    if not changes:
        # package is already removed
        return True
    changed_names = {pkg.name for pkg in changes}
    for pkg in changes:
        if is_pkg_change_allowed(pkg, cache.blacklist, cache.whitelist,
                                 cache.strict_whitelist):
            continue
        logging.warning(
            _("Keeping the following auto-removable package(s) because "
              "they include %s which is set to be kept unmodified: %s"),
            pkg.name, " ".join(sorted(changed_names)))
        return False
    if not changed_names.issubset(auto_removable):
        collateral = " ".join(sorted(changed_names - auto_removable))
        if pkgname != "":
            logging.warning(
                _("Keeping auto-removable %s package(s) because it would"
                  " also remove the following packages which should "
                  "be kept in this step: %s"), pkgname, collateral)
        else:
            logging.warning(
                _("Keeping %s auto-removable package(s) because it would"
                  " also remove the following packages which should "
                  "be kept in this step: %s"), len(auto_removable),
                collateral)
        return False
    kernel_regexp = cache.running_kernel_pkgs_regexp
    if kernel_regexp:
        for name in changed_names:
            if kernel_regexp.match(name):
                logging.warning(
                    _("Keeping the following auto-removable package(s) because "
                      "they include %s which package is related to the running "
                      "kernel: %s"), name, " ".join(sorted(changed_names)))
                return False
    if cache.install_count > 0:
        logging.error(
            "The following packages are marked for installation or upgrade "
            "which is not allowed when performing autoremovals: %s",
            " ".join([pkg.name for pkg in changes if not pkg.marked_delete]))
        return False
    return True
def do_auto_remove(cache,           # type: UnattendedUpgradesCache
                   auto_removable,  # type: AbstractSet[str]
                   logfile_dpkg,    # type: str
                   minimal_steps,   # type: bool
                   verbose=False,   # type: bool
                   dry_run=False    # type: bool
                   ):
    # type: (...) -> Tuple[bool, List[str], List[str]]
    """Remove the auto_removable packages.

    :param cache: cache the packages are marked and committed in
    :param auto_removable: names of the packages to remove
    :param logfile_dpkg: passed on to cache_commit()
    :param minimal_steps: remove the packages one by one and commit after
        each of them instead of committing once
    :param verbose: passed on to cache_commit()
    :param dry_run: only check the removals, clear the marks instead of
        committing
    :return: (success, names of removed packages, names of packages kept
        installed)
    """
    res = True
    if not auto_removable:
        return (res, [], [])
    pkgs_removed = []  # type: List[str]
    pkgs_kept_installed = []  # type: List[str]
    if minimal_steps:
        for pkgname in auto_removable:
            if should_stop():
                # interrupted: everything not removed yet is kept
                pkgs_kept_installed = list(auto_removable - set(pkgs_removed))
                return (False, pkgs_removed, pkgs_kept_installed)
            logging.debug("marking %s for removal" % pkgname)
            # may already have been removed together with an earlier package
            if pkgname in pkgs_removed:
                continue
            try:
                pkg = cache[pkgname]
            except KeyError:
                continue
            pkg.mark_delete()
            if not is_autoremove_valid(cache, pkgname, auto_removable):
                # this situation can occur when removing newly unused packages
                # would also remove old unused packages which are not set
                # for removal, thus getting there is not handled as an error
                pkgs_kept_installed.append(pkgname)
                cache.clear()
                continue
            if not dry_run:
                changes = cache.get_changes()
                # remember the names before the commit is attempted
                pkgnames = {pkg.name for pkg in changes}
                res, error = cache_commit(cache, logfile_dpkg, verbose)
                if not res:
                    break
                pkgs_removed.extend(pkgnames)
            else:
                cache.clear()
    else:
        for pkgname in auto_removable:
            try:
                pkg = cache[pkgname]
            except KeyError:
                continue
            pkg.mark_delete()
        if is_autoremove_valid(cache, "", auto_removable):
            # do it in one step
            # NOTE(review): pkgs_removed is not filled on this path, so the
            # returned list of removed packages stays empty - confirm this
            # is intended
            if not dry_run:
                res, error = cache_commit(cache, logfile_dpkg, verbose)
            else:
                cache.clear()
        else:
            cache.clear()
    if res:
        logging.info(_("Packages that were successfully auto-removed: %s"),
                     " ".join(sorted(pkgs_removed)))
        logging.info(_("Packages that are kept back: %s"),
                     " ".join(sorted(pkgs_kept_installed)))
    if not res:
        # res is only set to False by a cache_commit() call above, which
        # also binds error
        cache.clear()
        logging.error(_("Auto-removing the packages failed!"))
        logging.error(_("Error message: %s"), error)
        logging.error(_("dpkg returned an error! See %s for details"),
                      logfile_dpkg)
    return (res, pkgs_removed, pkgs_kept_installed)
def clean_downloaded_packages(fetcher):
    # type: (apt_pkg.Acquire) -> None
    """Delete the destination file of every item of the fetcher.

    Files that can not be deleted (OSError) are silently skipped.
    """
    for fetched in fetcher.items:
        target = fetched.destfile
        try:
            os.unlink(target)
        except OSError:
            # best effort, e.g. the file is already gone
            pass
def is_update_day():
    # type: () -> bool
    """Return True if today is one of the configured update days.

    Unattended-Upgrade::Update-Days may list days by abbreviated or full
    localized name or by number (Sun: 0, Mon: 1, ...). Every day is an
    update day if the list is empty.
    """
    # check if patch days are configured
    patch_days = apt_pkg.config.value_list("Unattended-Upgrade::Update-Days")
    if not patch_days:
        return True
    # validate patch days
    today = date.today()
    # abbreviated localized dayname, full localized dayname, number
    for day_format in ("%a", "%A", "%w"):
        if today.strftime(day_format) in patch_days:
            return True
    # today is not a patch day
    logging.info(
        "Skipping update check: today is %s,%s,%s but patch days are %s",
        today.strftime("%w"), today.strftime("%a"), today.strftime("%A"),
        ", ".join(patch_days))
    return False
def update_kept_pkgs_file(kept_pkgs, kept_file):
    # type: (DefaultDict[str, List[str]], str) -> None
    """Write the names of the kept back packages to kept_file.

    :param kept_pkgs: mapping of origin to the packages kept back from it
    :param kept_file: path of the file to write; the names of all origins
        are written sorted, without duplicates and separated by spaces

    If kept_pkgs is empty the file is removed (if it exists). A failure to
    open the file for writing because of a missing path is logged, not
    raised.
    """
    if kept_pkgs:
        pkgs_all_origins = set()
        for origin_pkgs in kept_pkgs.values():
            pkgs_all_origins.update(origin_pkgs)
        try:
            with open(kept_file, "w") as kf:
                kf.write(" ".join(sorted(pkgs_all_origins)))
        except FileNotFoundError:
            # translate the template, let logging fill in the file name;
            # an already formatted message is never found in the catalogue
            logging.error(_("Could not open %s for saving list of packages "
                            "kept back."), kept_file)
    else:
        if os.path.exists(kept_file):
            os.remove(kept_file)
def main(options, rootdir="/"):
# type: (Options, str) -> int
# Drives one unattended-upgrade pass: calls run() between
# kysec_pre_upgrade() and kysec_post_upgrade(), reports the outcome through
# kylin_system_updater, and for PROJECT_CODENAME 'V10SP1-edu' with
# SUB_PROJECT_CODENAME 'mavis' writes an OTA result file.
# Returns 0 when res.success is true and 1 otherwise.
# NOTE(review): the exception path below contains no return statement, so
# the function yields None there although the type comment promises int.
# NOTE(review): this copy of the file has lost its indentation; any nesting
# implied by the comments below is inferred and must be confirmed against
# the original source.
# useful for testing
# if not rootdir == "/":
# _setup_alternative_rootdir(rootdir)
# see debian #776752
# install_start_time = datetime.datetime.now().replace(microsecond=0)
# logging.info("unattended-upgrades start time:%s"%install_start_time)
# get log
'''
dpkg_journal_dirty = is_dpkg_journal_dirty()
abnormal_pkg_count = get_abnormally_installed_pkg_count()
logging.info("abnormal pkg count:%s,dpkg dirty:%s"%(abnormal_pkg_count,dpkg_journal_dirty))
if os_release_info['PROJECT_CODENAME'] == 'V10SP1-edu' and os_release_info['SUB_PROJECT_CODENAME']=='mavis':
if dpkg_journal_dirty or abnormal_pkg_count != '0':
ret = subprocess.run("dpkg --configure -a",shell=True,stdout=open(logfile,'a+'),stderr=open(logfile,'a+'))
logging.info("dpkg fix return :%s"%ret.returncode)
'''
# lock for the shutdown check
# uu_lock = apt_pkg.get_lock(LOCK_FILE)
# if uu_lock < 0:
# logging.error("Lock file is already taken, exiting")
# WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH,"UNATTENDED_UPGRADE","autoupdate_run_status","idle")
# return 1
try:
2023-05-29 14:47:39 +08:00
# the actual upgrade work happens in run(); the kysec hooks bracket it
kysec_pre_upgrade()
2022-11-03 19:10:26 +08:00
res = run(options, rootdir, logfile_dpkg)
2023-05-29 14:47:39 +08:00
kysec_post_upgrade()
2022-11-03 19:10:26 +08:00
logging.info("result:%s,%s"%(res.success,res.result_str))
# release comes from KYLIN_RELEASE_ID in /etc/os-release (empty string
# when the key is absent); version comes from get_default_version()
release = ''
version = ''
os_release_info = ReadOsRelease('/etc/os-release')
if 'KYLIN_RELEASE_ID' in os_release_info:
release = os_release_info['KYLIN_RELEASE_ID']
2023-05-29 14:47:39 +08:00
#version = ReadValueFromFile(VERSION_FILE,'SYSTEM','version')
2022-11-03 19:10:26 +08:00
version = get_default_version()
logging.debug("release:%s,version:%s"%(release,version))
2023-05-29 14:47:39 +08:00
# reporting is only done for runs that install packages
if options.install_only or options.download_and_install:
2022-11-03 19:10:26 +08:00
#history record
# defaults describe a successful run; the failure branch below overwrites
# status, status_cn and errorcode
# NOTE(review): the default record pairs status "success" with errorcode
# "cache commit error", and the success branch never clears it -- confirm
# this is what the history consumer expects.
history = {}
date = time.strftime("%Y-%m-%d %H:%M:%S")
history.update({"date":date})
history.update({"appname":"kylin-unattended-upgrade"})
history.update({"appname_cn":"自动更新"})
history.update({"version":""})
history.update({"description":"download and install security upgrades automatically"})
history.update({"keyword":"1"})
history.update({"changelog":""})
history.update({"status":"success"})
history.update({"errorcode":"cache commit error"})
history.update({"status_cn":"成功"})
#data collect info
# payload later serialized with json.dumps() for DataBackendCollect();
# status 1 is the default and is replaced by 0 in the failure branch
UpdateInfos = {}
UpdateInfos.update({"packageName":"kylin-unattended-upgrade"})
UpdateInfos.update({"appname":"kylin-unattended-upgrade"})
UpdateInfos.update({"source":"kylin unattended upgrade"})
UpdateInfos.update({"status":1})
UpdateInfos.update({"errorCode":"cache commit error"})
# success with at least one package: store versions, then report
if res.success and len(res.pkgs) > 0 :
2023-05-29 14:47:39 +08:00
#if res.result_str == "total_install":
2022-11-03 19:10:26 +08:00
# with open(TIME_STAMP,'w') as f:
# f.write(time.time())
2023-05-29 14:47:39 +08:00
# rewrite KYLIN_VERSION_FILE with the new os_version/update_version
# NOTE(review): configparser's set() raises NoSectionError when the file
# has no [SYSTEM] section (or could not be read) -- confirm the file is
# always present and well-formed.
config=configparser.ConfigParser(allow_no_value=True)
config.read(KYLIN_VERSION_FILE)
config.set("SYSTEM","os_version",release)
config.set("SYSTEM","update_version",version)
with open(KYLIN_VERSION_FILE,'w') as f:
config.write(f)
# kylin_system_updater.SetConfigValue("SYSTEM","os_version",release)
# kylin_system_updater.SetConfigValue("SYSTEM","update_version",original_version)
kylin_system_updater.InsertUpgradeHistory(history)
json_file = json.dumps(UpdateInfos.copy())
kylin_system_updater.DataBackendCollect("UpdateInfos",json_file)
2022-11-03 19:10:26 +08:00
# failure: append "cache commit error" to the recorded install errors and
# report them, newline-joined, as the error code
elif not res.success:
errorlist = kylin_system_updater.DumpInstallErrorRecord()
errorlist.append("cache commit error")
errcode = "\n".join(errorlist)
# NOTE(review): which of the following statements are guarded by
# options.install_only cannot be told from this copy -- confirm.
if options.install_only:
history.update({"status":"failed"})
history.update({"status_cn":"失败"})
history.update({"errorcode":errcode})
kylin_system_updater.InsertUpgradeHistory(history)
UpdateInfos.update({"status":0})
UpdateInfos.update({"errorCode":errcode})
json_file = json.dumps(UpdateInfos.copy())
kylin_system_updater.DataBackendCollect("UpdateInfos",json_file)
else:
logging.info("no pkgs to install")
2023-05-29 14:47:39 +08:00
# OTA result file, written only for V10SP1-edu / mavis; each key is checked
# for presence before it is read
if 'PROJECT_CODENAME' in os_release_info:
if os_release_info['PROJECT_CODENAME']=='V10SP1-edu':
if 'SUB_PROJECT_CODENAME' in os_release_info:
if os_release_info['SUB_PROJECT_CODENAME']=='mavis':
localtime = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))
# defaults: upgrade "0", status "failed"; adjusted below from res/options
config_to_result = configparser.ConfigParser(allow_no_value=True)
config_to_result.add_section("OTA")
config_to_result.set("OTA","time",localtime)
config_to_result.set("OTA","version","1.0")
config_to_result.set("OTA","upgrade","0")
config_to_result.set("OTA","status","failed")
# NOTE(review): whether the len(res.pkgs) check sits inside the
# mode == 'shutdown' check is not recoverable from this copy.
if res.success:
if options.mode == 'shutdown':
config_to_result.set("OTA","status","success")
if len(res.pkgs) > 0 :
config_to_result.set("OTA","upgrade","1")
if not os.path.exists(OTA_RESULT_FILE_PATH):
os.makedirs(OTA_RESULT_FILE_PATH)
# os.chmod(OTA_RESULT_FILE_PATH,stat.S_IRUSR|stat.S_IWUSR|stat.S_IWGRP|stat.S_IRGRP|stat.S_IWOTH|stat.S_IROTH)
# NOTE(review): the file is created empty here and then reopened with
# "w+", which creates/truncates on its own -- the pre-creation looks
# redundant.
if not os.path.exists(OTA_RESULT_FILE):
f = open(OTA_RESULT_FILE,'w')
f.close()
with open(OTA_RESULT_FILE,"w+") as f:
config_to_result.write(f)
# NOTE(review): chmod runs through a shell, is not waited for, and makes
# the whole result directory world-writable (777) -- confirm intended.
subprocess.Popen("chmod -R 777 %s"%(OTA_RESULT_FILE_PATH),shell=True)
# os.chmod(OTA_RESULT_FILE,stat.S_IRUSR|stat.S_IWUSR|stat.S_IWGRP|stat.S_IRGRP|stat.S_IWOTH|stat.S_IROTH)
# os.chmod(OTA_RESULT_FILE,stat.S_IRWXU|stat.S_IRWXG|stat.S_IRWXO)
2022-11-03 19:10:26 +08:00
# WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH,"UNATTENDED_UPGRADE","autoupdate_run_status","idle")
'''
if res.success and res.result_str:
# complete, successful run
update_kept_pkgs_file(res.pkgs_kept_back,
os.path.join(rootdir, KEPT_PACKAGES_FILE))
if res.result_str and not options.dry_run:
# there is some meaningful result which is worth an email
log_content = get_dpkg_log_content(logfile_dpkg,
install_start_time)
send_summary_mail(res.pkgs, res.success, res.result_str,
res.pkgs_kept_back, res.pkgs_removed,
res.pkgs_kept_installed, mem_log,
log_content)
if res.update_stamp:
# write timestamp file
write_stamp_file()
if not options.dry_run:
# check if the user wants a reboot
reboot_if_requested_and_needed()
'''
# close the descriptor held in the module-level name shutdown_lock, mark the
# run status as idle and translate res.success into the return code
os.close(shutdown_lock)
WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH,"UNATTENDED_UPGRADE","autoupdate_run_status","idle")
if res.success:
return 0
else:
return 1
# any exception from the code above: reset the status to idle and log the
# exception object (message only, no traceback); nothing is re-raised and
# shutdown_lock is not closed on this path
except Exception as e:
WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH,"UNATTENDED_UPGRADE","autoupdate_run_status","idle")
logging.error(e)
2023-05-29 14:47:39 +08:00
if options.install_only:
reboot_if_needed()
2022-11-03 19:10:26 +08:00
# logger = logging.getLogger()
# logger.exception(_("An error occurred: %s"), e)
# log_content = get_dpkg_log_content(logfile_dpkg,
# install_start_time)
# if not options.dry_run:
# send_summary_mail(["<unknown>"], False, _("An error occurred"),
# None, [], [], mem_log, log_content)
# Re-raise exceptions for apport
# raise
def mark_pkgs_to_upgrade(cache, pkgs_to_upgrade):
    # type: (apt.Cache, List[str]) -> None
    """Mark each named package in *cache* for upgrade or installation.

    Names that the cache does not know (KeyError on lookup) are skipped.
    A package is marked for upgrade when it reports is_upgradable, or when
    it is installed and its candidate version differs from the installed
    one; from_user is then the negation of is_auto_installed.  A package
    that is not installed is marked for installation with from_user=True.
    Installed packages whose candidate equals the installed version are
    left alone.
    """
    for name in pkgs_to_upgrade:
        try:
            package = cache[name]
        except KeyError:
            continue

        # the version comparison is only consulted when is_upgradable is
        # false and the package is installed
        wants_upgrade = package.is_upgradable
        if not wants_upgrade and package.is_installed:
            wants_upgrade = (package.candidate.version
                             != package.installed.version)

        if wants_upgrade:
            cache.mark_upgrade_adjusted(
                package, from_user=not package.is_auto_installed)
        elif not package.is_installed:
            cache.mark_install_adjusted(package, from_user=True)
def adjust_candidate_with_version(cache,namelistwithversion):
    """Pin package candidates to the versions requested in the list.

    Each entry of namelistwithversion is indexed as entry[0] (package name)
    and entry[1] (version string).  Names missing from the cache are
    skipped.  For a known package, every available version whose version
    string equals entry[1] and which passes
    is_in_allowed_origin(version, cache.allowed_origins) is logged and
    assigned as the package's candidate; when several versions qualify,
    the last one in pkg.versions stays selected.
    """
    for entry in namelistwithversion:
        try:
            package = cache[entry[0]]
        except KeyError:
            continue
        for available in package.versions:
            # cheap string comparison first; the origin check only runs for
            # versions that match the requested one
            if available.version != entry[1]:
                continue
            if not is_in_allowed_origin(available, cache.allowed_origins):
                continue
            logging.info("package:%s , candidate version:%s"%(entry[0],entry[1]))
            package.candidate = available
'''
dep_list = []
dep = pkg.candidate.get_dependencies("PreDepends")
for d in dep:
dep_list.append(d.or_dependencies[0].name)
dep = pkg.candidate.get_dependencies("Depends")
for d in dep:
dep_list.append(d.or_dependencies[0].name)
package_deps.update({pkg:dep_list})
'''
def run(options, # type: Options
rootdir, # type: str
# mem_log, # type: StringIO
logfile_dpkg, # type: str
# install_start_time, # type: datetime.datetime
):
# type: (...) -> UnattendedUpgradesResult
# Performs one pass in the mode selected by options.install_only,
# options.download_only or options.download_and_install and returns an
# UnattendedUpgradesResult describing the outcome.
# Reads the module-level names kylin_system_updater, os_release_info,
# inhibitshutdownlock and login_manager (none is assigned in this function).
# NOTE(review): this copy of the file has lost its indentation; any nesting
# implied by the comments below is inferred and must be confirmed against
# the original source.
# NOTE(review): the apt system lock taken below is released with
# pkgsystem_unlock() before most returns, but the returns for
# "sanity check failed", "Upgrade was interrupted", "backup failed",
# "docker fetch failed" and "there're no pkgs to install" have no unlock
# call in front of them -- confirm whether that matters to callers.
# check if today is a patch day
# if not is_update_day():
# return UnattendedUpgradesResult(True)
logging.info(_("Starting unattended upgrades script"))
reload_options_config()
# check if u-u should be stopped already
if should_stop():
return UnattendedUpgradesResult(False)
#global os_release_info
# check to see if want to auto-upgrade the devel release
'''
if apt_pkg.config.find("Unattended-Upgrade::DevRelease") == "auto":
try:
if DISTRO_ID.lower() == 'ubuntu':
devel = (distro_info.UbuntuDistroInfo() .
devel(result="object"))
elif DISTRO_ID.lower() == 'debian':
devel = (distro_info.DebianDistroInfo() .
devel(result="object"))
else:
devel = (distro_info.DistroInfo(DISTRO_ID) .
devel(result="object"))
except Exception as e:
logging.warning("Could not figure out development release: %s" % e)
else:
if ((devel.series == DISTRO_CODENAME
and devel.release is not None
and devel.release - date.today() > DEVEL_UNTIL_RELEASE)):
syslog.syslog((_("Not running on this development "
"release before %s") %
(devel.release - DEVEL_UNTIL_RELEASE
- datetime.timedelta(days=1))))
logging.warning(_("Not running on this development "
"release before %s") %
(devel.release - DEVEL_UNTIL_RELEASE
- datetime.timedelta(days=1)))
return UnattendedUpgradesResult(True)
logging.debug("Running on the development release")
elif "(development branch)" in DISTRO_DESC and not\
apt_pkg.config.find_b("Unattended-Upgrade::DevRelease", True):
syslog.syslog(_("Not running on the development release."))
logging.info(_("Not running on the development release."))
return UnattendedUpgradesResult(True)
'''
#kylin_system_updater = KylinSystemUpdater()
'''
if kylin_system_updater.GetUnattendedUpgradeValue:
pass
else:
return UnattendedUpgradesResult(False)
kylin_system_updater.ConnectToSignals()
kylin_system_updater.GetWhiteList()
kylin_system_updater.RunMainloop()
'''
# check and get lock
try:
apt_pkg.pkgsystem_lock()
except SystemError:
logging.error(_("Lock could not be acquired (another package "
"manager running?)"))
#print(_("Cache lock can not be acquired, exiting"))
return UnattendedUpgradesResult(
False, _("Lock could not be acquired"))
# check if the journal is dirty and if so, take emergceny action
# the alternative is to leave the system potentially unsecure until
# the user comes in and fixes
'''
if is_dpkg_journal_dirty() and \
apt_pkg.config.find_b("Unattended-Upgrade::AutoFixInterruptedDpkg",
False):
logging.warning(
_("Unclean dpkg state detected, trying to correct"))
print(_("Unclean dpkg state detected, trying to correct"))
env = copy.copy(os.environ)
env["DPKG_FRONTEND_LOCKED"] = "1"
try:
with Unlocked():
output = subprocess.check_output(
["dpkg", "--force-confold", "--configure", "-a"],
env=env,
universal_newlines=True)
except subprocess.CalledProcessError as e:
output = e.output
logging.warning(_("dpkg --configure -a output:\n%s"), output)
'''
# the list of packages to handle comes from kylin_system_updater; entries
# are later indexed as [0] (name) and [1] (version) by
# adjust_candidate_with_version()
white_list_with_version = kylin_system_updater.whitelist_with_candidate_version#config_manager.ReadListFromFile(WHITE_LIST_FILE_PATH,'AutoUpgrade','upgradelist')
logging.info("upgrade list from kylin system updater:")
logging.debug(white_list_with_version)
'''
for w in white_list_with_version:
whitelistwithversion.append('-'.join(w))
logging.debug("whitelist from kylin system updater:%s"%("\n".join(whitelistwithversion)))
'''
# namelist = []
# namelist_with_version = []
# get_white_list_with_version(white_list_with_version,namelist_with_version,namelist)
# get a cache
try:
cache = UnattendedUpgradesCache(rootdir=rootdir,whitelist_with_version=white_list_with_version,blacklist=[])
#cache.whitelist=white_list
except SystemError as error:
# print(_("Apt returned an error, exiting"))
# print(_("error message: %s") % error)
logging.error(_("Apt returned an error, exiting"))
logging.error(_("error message: %s"), error)
return UnattendedUpgradesResult(
False, _("Apt returned an error, exiting"))
'''
2022-11-03 19:10:26 +08:00
if cache._depcache.broken_count > 0:
2023-05-29 14:47:39 +08:00
print(_("Cache has broken packages, exiting"))
2022-11-03 19:10:26 +08:00
logging.error(_("Cache has broken packages, exiting"))
return UnattendedUpgradesResult(
False, _("Cache has broken packages, exiting"))
'''
2022-11-03 19:10:26 +08:00
# FIXME: make this into a ContextManager
# be nice when calculating the upgrade as its pretty CPU intensive
'''
old_priority = os.nice(0)
try:
# Check that we will be able to restore the priority
os.nice(-1)
os.nice(20)
except OSError as e:
if e.errno in (errno.EPERM, errno.EACCES):
pass
else:
raise
2023-05-29 14:47:39 +08:00
'''
2022-11-03 19:10:26 +08:00
#auto_removable = get_auto_removable(cache)
# find out about the packages that are upgradable (in an allowed_origin)
pkgs_to_upgrade = calculate_upgradable_pkgs(cache, options,white_list_with_version)
2023-05-29 14:47:39 +08:00
# for installing modes, fewer upgradable packages than whitelist entries is
# treated as a failed sanity check
if options.install_only or options.download_and_install:
if (len(pkgs_to_upgrade)<len(white_list_with_version)):
logging.warning("some pkgs failed in sanity check")
return UnattendedUpgradesResult(False,"sanity check failed")
2022-11-03 19:10:26 +08:00
pkgs_to_upgrade.sort(key=lambda p: p.name)
pkgs = [pkg.name for pkg in pkgs_to_upgrade]
2023-05-29 14:47:39 +08:00
logging.debug("pkgs that look like they should be upgraded or installed: %s"
% "\n".join(pkgs))
2022-11-03 19:10:26 +08:00
# FIXME: make this into a ContextManager
# stop being nice
#os.nice(old_priority - os.nice(0))
#adjust candidate versions
logging.info("adjusting candidate from kylin update manager...")
adjust_candidate_with_version(cache,white_list_with_version)
#sanity check
# download what looks good
mark_pkgs_to_upgrade(cache, pkgs)
# with options.debug the fetcher reports progress through
# apt.progress.text.AcquireProgress; otherwise it is created without one
if options.debug:
fetcher = apt_pkg.Acquire(apt.progress.text.AcquireProgress())
else:
fetcher = apt_pkg.Acquire()
# NOTE(review): the local name `list` shadows the builtin for the rest of
# this function.
list = apt_pkg.SourceList()
list.read_main_list()
recs = cache._records
pm = apt_pkg.PackageManager(cache._depcache)
configfilemanager = ConfigFileManager("/var/lib/unattended-upgrades")
# don't start downloading during shutdown
# TODO: download files one by one and check for stop request after each of
# them
if should_stop():
return UnattendedUpgradesResult(False, _("Upgrade was interrupted"))
# queue the archives of the marked changes on the fetcher; a SystemError is
# only logged and execution continues with whatever was queued
try:
pm.get_archives(fetcher, list, recs)
except SystemError as e:
logging.error(_("GetArchives() failed: %s"), e)
# record the destination paths of the queued items, space-separated, in
# OTA_PKGS_TO_INSTALL_LIST -- only when the abnormal-package count equals
# the string '0' and the dpkg journal is not dirty
if get_abnormally_installed_pkg_count() == '0' and not is_dpkg_journal_dirty():
local_pkgs_to_install = []
for item in fetcher.items:
local_pkgs_to_install.append(item.destfile)
with open(OTA_PKGS_TO_INSTALL_LIST,'w+') as f:
f.write(" ".join(local_pkgs_to_install))
fetcher_statistics = AcquireStatistics(fetcher=fetcher)
fetcher_statistics.GetAquireStatisticsOfPkgs()
logging.debug("%d local,%d remote"%(fetcher_statistics.local_pkg_amount,fetcher_statistics.remote_pkg_amount))
# ---- install_only mode ----
if options.install_only:
2022-11-03 19:10:26 +08:00
# in shutdown mode the install only proceeds when the
# InstallMode/shutdown_install config value is truthy; otherwise the lock is
# released and the run ends with a failure result
if options.mode == 'shutdown':
if kylin_system_updater.GetConfigValue('InstallMode','shutdown_install'):
pass
else:
try:
apt_pkg.pkgsystem_unlock()
except SystemError:
logging.error(_("lock release failed"))
logging.info("system updater need to run shutdown install quiting...")
return UnattendedUpgradesResult(False,_("system updater install override"))
# install_only needs at least one local package and no remote ones;
# both violations release the lock and return a failure result
if fetcher_statistics.local_pkg_amount == 0:
logging.warning("no local pkgs to install")
try:
apt_pkg.pkgsystem_unlock()
except SystemError:
logging.error(_("lock release failed"))
2023-05-29 14:47:39 +08:00
return UnattendedUpgradesResult(False,_("no local pkgs to install"))
2022-11-03 19:10:26 +08:00
elif fetcher_statistics.remote_pkg_amount >0:
logging.warning("there're pkgs to download")
try:
apt_pkg.pkgsystem_unlock()
except SystemError:
logging.error(_("lock release failed"))
return UnattendedUpgradesResult(False,_("there're pkgs to download"))
else:
#only write the pkg list when dpkg journal is clean
# if not is_dpkg_journal_dirty():
# configfilemanager.WriteListToFile(pkgs,"OTA_PKGS_TO_INSTALL_LIST")
try:
res = fetcher.run()
logging.debug("fetch.run() result: %s", res)
except SystemError as e:
logging.error("fetch.run() result: %s", e)
# if cache.get_changes():
# cache.clear()
pkg_install_success = True
install_result = ''
if len(pkgs_to_upgrade) > 0:
# a backup is taken before installing except on V10SP1-edu / mavis
# NOTE(review): which `if` the `else:` below pairs with cannot be told from
# this copy -- confirm when Backup() actually runs.
if 'PROJECT_CODENAME' in os_release_info:
if os_release_info['PROJECT_CODENAME']=='V10SP1-edu':
if 'SUB_PROJECT_CODENAME' in os_release_info:
if os_release_info['SUB_PROJECT_CODENAME']=='mavis':
pass
else:
2023-05-29 14:47:39 +08:00
logging.info("need backup")
backup_result = False
backup_result = Backup()
if (backup_result):
pass
else:
logging.debug("backup failed...")
return UnattendedUpgradesResult(False,"backup failed")
2022-11-03 19:10:26 +08:00
# do install
WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH,"UNATTENDED_UPGRADE","autoupdate_run_status","install")
#send install start msg to notify
# if os.path.exists(NOTIFICATION_PIPE):
# with open(NOTIFICATION_PIPE,'w') as p:
# p.write('install start')
# reset the progress log to '0' and broadcast InstallStart on the system bus
with open(PROGRESS_LOG,'w+') as f:
f.write('0')
subprocess.Popen('dbus-send --system --type=signal / com.kylin.install.notification.InstallStart',shell=True)
# inhibitshutdownlock is held around do_install()
# NOTE(review): unlock() is not in a finally clause, so an exception from
# do_install() would skip it -- confirm acceptable.
inhibitshutdownlock.lock()
# if LockedPreventShutdown():
# pass
# else:
# logging.error("cannot get shutdown lock,exiting...")
# WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH,"UNATTENDED_UPGRADE","autoupdate_run_status","idle")
# sys.exit(1)
logging.debug("InstCount=%i DelCount=%i BrokenCount=%i"
% (cache._depcache.inst_count,
cache._depcache.del_count,
cache._depcache.broken_count))
2023-05-29 14:47:39 +08:00
logging.info("shutdown safe manager")
2022-11-03 19:10:26 +08:00
pkg_install_success = do_install(cache,
pkgs,
options,
logfile_dpkg)
2023-05-29 14:47:39 +08:00
logging.info("reset safe manager")
2022-11-03 19:10:26 +08:00
# unLockedEnableShutdown()
inhibitshutdownlock.unlock()
subprocess.Popen('dbus-send --system --type=signal / com.kylin.install.notification.InstallFinish',shell=True)
if pkg_install_success:
clean_downloaded_packages(fetcher)
kylin_system_updater.CheckRebootRequired("unattended-upgrades")
# "total_install" when as many packages were handled as the whitelist has
# entries, "partial_install" otherwise
logging.debug("pkg number:%d,pkg in whitelist number:%d"%(len(pkgs),len(white_list_with_version)))
if len(pkgs) == len(white_list_with_version):
install_result = "total_install"
else:
install_result = "partial_install"
logging.debug("install result:%s"%install_result)
try:
apt_pkg.pkgsystem_unlock()
except SystemError:
logging.error(_("lock release failed"))
return UnattendedUpgradesResult(pkg_install_success,
install_result,
pkgs)
# ---- download_only mode ----
elif options.download_only:
if fetcher_statistics.remote_pkg_amount>0:
pass
else:
logging.info("no pkgs need to download")
#return UnattendedUpgradesResult(True,_("there're no pkgs to download"))
# fetch with up to 10 attempts; after each attempt the statistics are
# refreshed and the loop stops once no incomplete package remains
# NOTE(review): retry_times is the literal 10, so the `<0` check below can
# never be true.
retry_times=10
if retry_times<0:
retry_times = 1
while retry_times >0:
try:
res = fetcher.run()
logging.debug("fetch.run() result: %s", res)
except SystemError as e:
logging.error("fetch.run() result: %s", e)
fetcher_statistics.ResetFetcher(fetcher)
fetcher_statistics.GetAquireStatisticsOfPkgs()
logging.debug("incomplete download pkg number:%d"%fetcher_statistics.incomplete_pkg_amount)
retry_times-=1
if fetcher_statistics.incomplete_pkg_amount >0:
# NOTE(review): the format string has two %d placeholders but no arguments
# are passed, so logging emits the template text unformatted.
logging.debug("%d incomplete pkgs,%d try times left")
fetcher.shutdown()
try:
pm.get_archives(fetcher, list, recs)
except SystemError as e:
logging.error(_("GetArchives() failed: %s"), e)
else:
break
#fetcher_statistics.ResetFetcher(fetcher)
#fetcher_statistics.GetAquireStatisticsOfPkgs()
2023-05-29 14:47:39 +08:00
# install mode configured in the policy file ('bshutdown' and 'timing' are
# the values tested below)
insmod = ReadValueFromFile(UNATTENDED_UPGRADE_POLICY_FILE_PATH,"autoUpgradePolicy","installmode")
2022-11-03 19:10:26 +08:00
# everything downloaded and something to upgrade
if fetcher_statistics.incomplete_pkg_amount == 0 and len(pkgs_to_upgrade) > 0:
# NOTE(review): os_release_info is indexed directly here and raises KeyError
# when a key is missing, unlike the `in` checks used in the install_only
# branch above.
if os_release_info['PROJECT_CODENAME'] == 'V10SP1-edu' and os_release_info['SUB_PROJECT_CODENAME']=='mavis':
# NOTE(review): docker_image_fetch_result starts as the int 0 and only
# becomes a CompletedProcess when the script exists; depending on the lost
# nesting, `.returncode` may be read from the int -- confirm.
# NOTE(review): subprocess.run() is given a list together with shell=True.
docker_image_fetch_result = 0
#docker image fetch for mavis and laika
if os.path.exists("/usr/bin/service_runtime_ota.sh"):
docker_image_fetch_result = subprocess.run(["/usr/bin/service_runtime_ota.sh"], shell=True)
if docker_image_fetch_result.returncode == 0:
logging.info("all pkgs downloaded")
else:
return UnattendedUpgradesResult(False,_("docker fetch failed"))
logging.info("pkg number:%d"%(len(pkgs)))
login_manager.SetExtraInhibitShutdownDelaySec(1800)
configfilemanager.AddFileName("OTA_PKGS_TO_INSTALL")
subprocess.Popen('dbus-send --system --type=signal / com.kylin.update.notification.DownloadFinish', shell=True)
kylin_system_updater.SetConfigValue('InstallMode','auto_install','True')
elif insmod == 'bshutdown':
logging.info("pkg number:%d"%(len(pkgs)))
login_manager.SetExtraInhibitShutdownDelaySec(1800)
configfilemanager.AddFileName("OTA_PKGS_TO_INSTALL")
kylin_system_updater.SetConfigValue('InstallMode','auto_install','True')
elif insmod == 'timing':
pass
else:
pass
try:
apt_pkg.pkgsystem_unlock()
except SystemError:
logging.error(_("lock release failed"))
return UnattendedUpgradesResult(True,_("all pkgs downloaded"))
2023-05-29 14:47:39 +08:00
# some packages are still incomplete after all attempts
elif fetcher_statistics.incomplete_pkg_amount > 0 and len(pkgs_to_upgrade) > 0:
2022-11-03 19:10:26 +08:00
try:
apt_pkg.pkgsystem_unlock()
except SystemError:
logging.error(_("lock release failed"))
return UnattendedUpgradesResult(False,_("some pkgs incompletely fetched"))
2023-05-29 14:47:39 +08:00
# nothing to upgrade: reported as a successful download
else:
try:
apt_pkg.pkgsystem_unlock()
except SystemError:
logging.error(_("lock release failed"))
return UnattendedUpgradesResult(True,_("all pkgs downloaded"))
# ---- download_and_install mode ----
elif options.download_and_install:
if len(pkgs)==0:
logging.info("no pkgs to install")
return UnattendedUpgradesResult(True,_("there're no pkgs to install"))
if fetcher_statistics.remote_pkg_amount>0:
pass
else:
logging.info("no pkgs need to download")
#return UnattendedUpgradesResult(True,_("there're no pkgs to download"))
# same retry loop as in the download_only branch (including the dead `<0`
# check and the unformatted debug message)
retry_times=10
if retry_times<0:
retry_times = 1
while retry_times >0:
try:
res = fetcher.run()
logging.debug("fetch.run() result: %s", res)
except SystemError as e:
logging.error("fetch.run() result: %s", e)
fetcher_statistics.ResetFetcher(fetcher)
fetcher_statistics.GetAquireStatisticsOfPkgs()
logging.debug("incomplete download pkg number:%d"%fetcher_statistics.incomplete_pkg_amount)
retry_times-=1
if fetcher_statistics.incomplete_pkg_amount >0:
logging.debug("%d incomplete pkgs,%d try times left")
fetcher.shutdown()
try:
pm.get_archives(fetcher, list, recs)
except SystemError as e:
logging.error(_("GetArchives() failed: %s"), e)
else:
break
# NOTE(review): unlike the download_only branch, incomplete_pkg_amount is
# not checked again before installing -- confirm intended.
pkg_install_success = True
install_result = ''
# a backup is always attempted in this mode; a falsy result aborts the run
backup_result = False
backup_result = Backup()
if (backup_result):
pass
else:
logging.debug("backup failed...")
return UnattendedUpgradesResult(False,"backup failed")
WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH,"UNATTENDED_UPGRADE","autoupdate_run_status","install")
inhibitshutdownlock.lock()
logging.debug("InstCount=%i DelCount=%i BrokenCount=%i"
% (cache._depcache.inst_count,
cache._depcache.del_count,
cache._depcache.broken_count))
logging.info("shutdown safe manager")
pkg_install_success = do_install(cache,
pkgs,
options,
logfile_dpkg)
logging.info("reset safe manager")
# unLockedEnableShutdown()
inhibitshutdownlock.unlock()
subprocess.Popen('dbus-send --system --type=signal / com.kylin.install.notification.InstallFinish',shell=True)
if pkg_install_success:
clean_downloaded_packages(fetcher)
kylin_system_updater.CheckRebootRequired("unattended-upgrades")
logging.debug("pkg number:%d,pkg in whitelist number:%d"%(len(pkgs),len(white_list_with_version)))
if len(pkgs) == len(white_list_with_version):
install_result = "total_install"
else:
install_result = "partial_install"
logging.debug("install result:%s"%install_result)
try:
apt_pkg.pkgsystem_unlock()
except SystemError:
logging.error(_("lock release failed"))
return UnattendedUpgradesResult(pkg_install_success,install_result,pkgs)
2022-11-03 19:10:26 +08:00
# ---- no mode selected ----
# the lock is released first, the fetcher is still run once, and a failure
# result is returned
else:
try:
apt_pkg.pkgsystem_unlock()
except SystemError:
logging.error(_("lock release failed"))
logging.debug(_("option is not install-only or download-only"))
try:
res = fetcher.run()
logging.debug("fetch.run() result: %s", res)
except SystemError as e:
logging.error("fetch.run() result: %s", e)
return UnattendedUpgradesResult(False,_("option is not install-only or download-only"))
'''
pkg_conffile_prompt = False
if dpkg_conffile_prompt():
# now check the downloaded debs for conffile conflicts and build
# a blacklist
conffile_blacklist = [] # type: List[str]
for item in fetcher.items:
logging.debug("%s" % item)
if item.status == item.STAT_ERROR:
print(_("An error occurred: %s") % item.error_text)
logging.error(_("An error occurred: %s"), item.error_text)
if not item.complete:
print(_("The URI %s failed to download, aborting") %
item.desc_uri)
logging.error(_("The URI %s failed to download, aborting"),
item.desc_uri)
return UnattendedUpgradesResult(
False, (_("The URI %s failed to download, aborting") %
item.desc_uri))
if not os.path.exists(item.destfile):
print(_("Download finished, but file %s not there?!?") %
item.destfile)
logging.error("Download finished, but file %s not "
"there?!?", item.destfile)
return UnattendedUpgradesResult(
False, (_("Download finished, but file %s not there?!?") %
item.destfile))
if not item.is_trusted and not apt_pkg.config.find_b(
"APT::Get::AllowUnauthenticated", False):
logging.debug("%s is blacklisted because it is not trusted")
pkg_name = pkgname_from_deb(item.destfile)
if not is_pkgname_in_blacklist(pkg_name, cache.blacklist):
conffile_blacklist.append("%s$" % re.escape(pkg_name))
if not is_deb(item.destfile):
logging.debug("%s is not a .deb file" % item)
continue
if conffile_prompt(item.destfile):
# skip package (means to re-run the whole marking again
# and making sure that the package will not be pulled in by
# some other package again!)
#
# print to stdout to ensure that this message is part of
# the cron mail (only if no summary mail is requested)
email = apt_pkg.config.find("Unattended-Upgrade::Mail", "")
if not email:
print(_("Package %s has conffile prompt and needs "
"to be upgraded manually") %
pkgname_from_deb(item.destfile))
# log to the logfile
logging.warning(_("Package %s has conffile prompt and "
"needs to be upgraded manually"),
pkgname_from_deb(item.destfile))
pkg_name = pkgname_from_deb(item.destfile)
if not is_pkgname_in_blacklist(pkg_name, cache.blacklist):
conffile_blacklist.append("%s$" % re.escape(pkg_name))
pkg_conffile_prompt = True
# redo the selection about the packages to upgrade based on the new
# blacklist
logging.debug("Packages blacklist due to conffile prompts: %s"
% conffile_blacklist)
# find out about the packages that are upgradable (in a allowed_origin)
if len(conffile_blacklist) > 0:
for regex in conffile_blacklist:
cache.blacklist.append(regex)
cache.apply_pinning(cache.pinning_from_regex_list(
conffile_blacklist, NEVER_PIN)) # type: ignore
old_pkgs_to_upgrade = pkgs_to_upgrade[:]
pkgs_to_upgrade = []
for pkg in old_pkgs_to_upgrade:
logging.debug("Checking the black and whitelist: %s" %
(pkg.name))
cache.mark_upgrade_adjusted(
pkg, from_user=not pkg.is_auto_installed)
if check_changes_for_sanity(cache):
pkgs_to_upgrade.append(pkg)
else:
logging.info(_("package %s not upgraded"), pkg.name)
cache.clear()
for pkg2 in pkgs_to_upgrade:
cache.call_adjusted(
apt.package.Package.mark_upgrade, pkg2,
from_user=not pkg2.is_auto_installed)
if cache.get_changes():
cache.clear()
else:
logging.debug("dpkg is configured not to cause conffile prompts")
# auto-removals
kernel_pkgs_remove_success = True # type: bool
kernel_pkgs_removed = [] # type: List[str]
kernel_pkgs_kept_installed = [] # type: List[str]
if (auto_removable and apt_pkg.config.find_b(
"Unattended-Upgrade::Remove-Unused-Kernel-Packages", True)):
# remove unused kernels before installing new ones because the newly
# installed ones may fill up /boot and break the system right before
# removing old ones could take place
#
# this step may also remove _auto-removable_ reverse dependencies
# of kernel packages
auto_removable_kernel_pkgs = {
p for p in auto_removable
if (cache.versioned_kernel_pkgs_regexp
and cache.versioned_kernel_pkgs_regexp.match(p)
and not cache.running_kernel_pkgs_regexp.match(p))}
if auto_removable_kernel_pkgs:
logging.info(_("Removing unused kernel packages: %s"),
" ".join(auto_removable_kernel_pkgs))
(kernel_pkgs_remove_success,
kernel_pkgs_removed,
kernel_pkgs_kept_installed) = do_auto_remove(
cache, auto_removable_kernel_pkgs, logfile_dpkg,
options.minimal_upgrade_steps,
options.verbose or options.debug, options.dry_run)
auto_removable = get_auto_removable(cache)
previous_autoremovals = auto_removable
if apt_pkg.config.find_b(
"Unattended-Upgrade::Remove-Unused-Dependencies", False):
pending_autoremovals = previous_autoremovals
else:
pending_autoremovals = set()
# exit if there is nothing to do and nothing to report
if (len(pending_autoremovals) == 0
and len(pkgs_to_upgrade) == 0):
logging.info(_("No packages found that can be upgraded unattended "
"and no pending auto-removals"))
pkgs_kept_back = cache.find_kept_packages(options.dry_run)
return UnattendedUpgradesResult(
kernel_pkgs_remove_success,
_("No packages found that can be upgraded unattended and no "
"pending auto-removals"),
pkgs_removed=kernel_pkgs_removed,
pkgs_kept_back=pkgs_kept_back,
pkgs_kept_installed=kernel_pkgs_kept_installed,
update_stamp=True)
# check if its configured for install on shutdown, if so, the
# environment UNATTENDED_UPGRADES_FORCE_INSTALL_ON_SHUTDOWN will
# be set by the unatteded-upgrades-shutdown script
if ("UNATTENDED_UPGRADES_FORCE_INSTALL_ON_SHUTDOWN" not in os.environ
and apt_pkg.config.find_b(
"Unattended-Upgrade::InstallOnShutdown", False)):
logger = logging.getLogger()
logger.debug("Configured to install on shutdown, so exiting now")
return UnattendedUpgradesResult(True)
# check if we are in dry-run mode
if options.dry_run:
logging.info("Option --dry-run given, *not* performing real actions")
apt_pkg.config.set("Debug::pkgDPkgPM", "1")
# do the install based on the new list of pkgs
pkgs = [pkg.name for pkg in pkgs_to_upgrade]
logging.info(_("Packages that will be upgraded: %s"), " ".join(pkgs))
# only perform install step if we actually have packages to install
pkg_install_success = True
if len(pkgs_to_upgrade) > 0:
if 'PROJECT_CODENAME' in os_release_info:
if os_release_info['PROJECT_CODENAME']=='V10SP1-edu':
if 'SUB_PROJECT_CODENAME' in os_release_info:
if os_release_info['SUB_PROJECT_CODENAME']=='mavis':
pass
else:
Backup()
# do install
WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH,"UNATTENDED_UPGRADE","autoupdate_run_status","install")
#send install start msg to notify
# if os.path.exists(NOTIFICATION_PIPE):
# with open(NOTIFICATION_PIPE,'w') as p:
# p.write('install start')
with open(PROGRESS_LOG,'w+') as f:
f.write('0')
subprocess.Popen('dbus-send --system --type=signal / com.kylin.install.notification.InstallStart',shell=True)
if LockedPreventShutdown():
pass
else:
logging.error("cannot get shutdown lock,exiting...")
WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH,"UNATTENDED_UPGRADE","autoupdate_run_status","idle")
sys.exit(1)
logging.debug("InstCount=%i DelCount=%i BrokenCount=%i"
% (cache._depcache.inst_count,
cache._depcache.del_count,
cache._depcache.broken_count))
pkg_install_success = do_install(cache,
pkgs,
options,
logfile_dpkg)
unLockedEnableShutdown()
subprocess.Popen('dbus-send --system --type=signal / com.kylin.install.notification.InstallFinish',shell=True)
if pkg_install_success:
kylin_system_updater.CheckRebootRequired("unattended-upgrades")
# Was the overall run succesful: only if everything installed
# fine and nothing was held back because of a conffile prompt.
# successful_run = (kernel_pkgs_remove_success and pkg_install_success
# and not pkg_conffile_prompt)
#send install finish msg to notify
# if successful_run and os.path.exists(NOTIFICATION_PIPE):
# with open(NOTIFICATION_PIPE,'w') as p:
# p.write('install finish')
# now check if any auto-removing needs to be done
if cache._depcache.broken_count > 0:
print(_("Cache has broken packages, exiting"))
logging.error(_("Cache has broken packages, exiting"))
return UnattendedUpgradesResult(
False, _("Cache has broken packages, exiting"), pkgs=pkgs)
# make sure we start autoremovals with a clear cache
# if cache.get_changes():
# cache.clear()
# the user wants *all* auto-removals to be removed
# (unless u-u got signalled to stop gracefully quickly)
pkgs_removed = [] # type: List[str]
pkgs_kept_installed = [] # type: List[str]
if ((apt_pkg.config.find_b(
"Unattended-Upgrade::Remove-Unused-Dependencies", False)
and not SIGNAL_STOP_REQUEST)):
auto_removals = get_auto_removable(cache)
(pkg_remove_success,
pkgs_removed,
pkgs_kept_installed) = do_auto_remove(
cache, auto_removals, logfile_dpkg, options.minimal_upgrade_steps,
options.verbose or options.debug,
options.dry_run)
successful_run = successful_run and pkg_remove_success
# the user wants *only new* auto-removals to be removed
elif apt_pkg.config.find_b(
"Unattended-Upgrade::Remove-New-Unused-Dependencies", False):
# calculate the new auto-removals
new_pending_autoremovals = get_auto_removable(cache)
auto_removals = new_pending_autoremovals - previous_autoremovals
(pkg_remove_success,
pkgs_removed,
pkgs_kept_installed) = do_auto_remove(
cache, auto_removals, logfile_dpkg, options.minimal_upgrade_steps,
options.verbose or options.debug,
options.dry_run)
successful_run = successful_run and pkg_remove_success
logging.debug("InstCount=%i DelCount=%i BrokenCount=%i"
% (cache._depcache.inst_count,
cache._depcache.del_count,
cache._depcache.broken_count))
clean after success install (if needed)
keep_key = "Unattended-Upgrade::Keep-Debs-After-Install"
if (not apt_pkg.config.find_b(keep_key, False)
and not options.dry_run
and pkg_install_success):
clean_downloaded_packages(fetcher)
pkgs_kept_back = cache.find_kept_packages(options.dry_run)
return UnattendedUpgradesResult(
successful_run, _("All upgrades installed"), pkgs,
pkgs_kept_back,
kernel_pkgs_removed + pkgs_removed,
kernel_pkgs_kept_installed + pkgs_kept_installed,
update_stamp=True)
install_result = ''
logging.debug("pkg number:%d,pkg in whitelist number:%d"%(len(pkgs),len(namelist_with_version)))
if len(pkgs) == len(namelist_with_version):
install_result = "total_install"
else:
install_result = "partial_install"
logging.debug("install result:%s"%install_result)
return UnattendedUpgradesResult(pkg_install_success,
install_result,
pkgs)
'''
class Options:
    """Plain attribute container describing the command-line options.

    The script casts the values returned by the option parser to this
    class, so the attribute names mirror the option destinations.  Every
    flag starts out disabled and ``mode`` starts out unset.
    """

    def __init__(self):
        # phase selection: which of download / install should be performed
        self.download_only = False
        self.install_only = False
        self.download_and_install = False
        # simulation and diagnostics switches
        self.dry_run = False
        self.debug = False
        self.apt_debug = False
        self.verbose = False
        # upgrade packages in small sets instead of all together
        self.minimal_upgrade_steps = False
        # value of the --mode option, None when not given
        self.mode = None
# Result of the shutdown-check lock request; a negative value means the lock
# is not held.
shutdown_lock = -1

if __name__ == "__main__":
    # Script entry point.  Every name assigned below is a module-level
    # global, so the names are kept exactly as they are: code elsewhere in
    # this file may read them.
    #
    # NOTE(review): the run status is reset to "idle" and the lock is
    # requested before the root check further down -- confirm this order is
    # intended.
    WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH,
                     "UNATTENDED_UPGRADE", "autoupdate_run_status", "idle")

    # lock for the shutdown check
    shutdown_lock = apt_pkg.get_lock(LOCK_FILE)
    if shutdown_lock < 0:
        logging.error("Lock file is already taken, exiting")
        # WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH,"UNATTENDED_UPGRADE","autoupdate_run_status","idle")
        sys.exit(1)

    # bind the translation catalogue used for user-visible messages
    localesApp = "unattended-upgrades"
    localesDir = "/usr/share/locale"
    gettext.bindtextdomain(localesApp, localesDir)
    gettext.textdomain(localesApp)

    # set debconf to NON_INTERACTIVE
    os.environ["DEBIAN_FRONTEND"] = "noninteractive"
    apt_pkg.init_config()
    # remove sources in sources.list.d from indexes search
    apt_pkg.config.set("Dir::Etc::sourceparts", "")
    # this ensures the commandline is logged in /var/log/apt/history.log
    apt_pkg.config.set("Commandline::AsString", " ".join(sys.argv))

    # COMPAT with the mispelling
    # minimal_steps_default = (
    #     apt_pkg.config.find_b("Unattended-Upgrades::MinimalSteps", False)
    #     and apt_pkg.config.find_b("Unattended-Upgrade::MinimalSteps", False))
    # The apt configuration lookup above is disabled; minimal steps are off
    # unless requested on the command line.
    minimal_steps_default = False

    # init the options
    parser = OptionParser()
    # debug defaults to True here; the apt configuration lookup is disabled
    parser.add_option("-d", "--debug",
                      action="store_true",
                      default=True,
                      # default=apt_pkg.config.find_b(
                      #     "Unattended-Upgrade::Debug", False),
                      help=_("print debug messages"))
    parser.add_option("", "--apt-debug",
                      action="store_true", default=False,
                      help=_("make apt/libapt print verbose debug messages"))
    parser.add_option("-v", "--verbose",
                      action="store_true",
                      default=apt_pkg.config.find_b(
                          "Unattended-Upgrade::Verbose", False),
                      help=_("print info messages"))
    parser.add_option("", "--dry-run",
                      action="store_true", default=False,
                      help=_("Simulation, download but do not install"))
    parser.add_option("", "--download-only",
                      action="store_true", default=False,
                      help=_("Only download, do not even try to install."))
    parser.add_option("", "--install-only",
                      action="store_true", default=False,
                      help=_("Only install, do not even try to download."))
    parser.add_option("", "--download-and-install",
                      action="store_true", default=False,
                      help=_("Download and Install."))
    # NOTE(review): the help text below says "(default)" although
    # minimal_steps_default is False in this script; the string is left
    # untouched because it is a translated message id.
    parser.add_option("", "--minimal-upgrade-steps",
                      action="store_true", default=minimal_steps_default,
                      help=_("Upgrade in minimal steps (and allow "
                             "interrupting with SIGTERM) (default)"))
    parser.add_option("", "--no-minimal-upgrade-steps",
                      action="store_false", default=minimal_steps_default,
                      dest="minimal_upgrade_steps",
                      help=_("Upgrade all packages together instead of in "
                             "smaller sets"))
    # underscore spelling of the same switch, hidden from --help
    parser.add_option("", "--minimal_upgrade_steps",
                      action="store_true",
                      help=SUPPRESS_HELP,
                      default=minimal_steps_default)
    parser.add_option("", "--mode", action="store", type="string",
                      dest="mode", help="start mode.")
    # parse_args() returns (values, positional args); only the values are used
    options = cast(Options, (parser.parse_args())[0])

    if os.getuid() != 0:
        print(_("You need to be root to run this application"))
        sys.exit(1)

    # ensure that we are not killed when the terminal goes away e.g. on
    # shutdown
    signal.signal(signal.SIGHUP, signal.SIG_IGN)
    # setup signal handler for graceful stopping
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler_int)
    signal.signal(signal.SIGUSR1, signal_handler_usr1)

    # write pid to let other processes find this one
    # pidf = os.path.join(apt_pkg.config.find_dir("Dir"),
    #                     "var", "run", "unattended-upgrades.pid")
    with open(PID_FILE, "w") as fp:
        fp.write("%s" % os.getpid())
    # clean up pid file on exit
    atexit.register(os.remove, PID_FILE)

    # setup log dir
    # logdir = os.path.join("var", "log", "kylin-unattended-upgrades")
    logdir = "/var/log/kylin-unattended-upgrades"
    # exist_ok avoids the check-then-create race of a separate exists() test
    os.makedirs(logdir, exist_ok=True)

    # setup log
    logfile = os.path.join(logdir, 'unattended-upgrades.log')
    if not os.path.exists(logfile):
        with open(logfile, 'w'):
            pass
    # NOTE(review): this handle is never closed in this block; presumably it
    # is used elsewhere in the file -- verify before changing.
    logfile_fd = open(logfile, 'a+')

    # setup dpkg log
    logfile_dpkg = os.path.join(logdir, 'unattended-upgrades-dpkg.log')
    if not os.path.exists(logfile_dpkg):
        with open(logfile_dpkg, 'w'):
            pass

    # setup logging: DEBUG and above go to the log file, INFO and above are
    # additionally echoed to stdout
    # _setup_logging(options,logfile)
    logging.basicConfig(
        format='%(asctime)s-%(name)s-%(levelname)s-%(message)s',
        level=logging.DEBUG, filename=logfile)
    # logging.basicConfig()
    # file_handler = logging.FileHandler(filename=logfile)
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.INFO)
    # formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    # file_handler.setFormatter(formatter)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # logger.addHandler(file_handler)
    logger.addHandler(stdout_handler)

    # get os release info
    os_release_info = ReadOsRelease('/etc/os-release')
    # print(os_release_info)
    config_manager = ConfigFileManager(CONFIG_FILE_ROOT_PATH)
    login_manager = LoginManager()
    kylin_system_updater = KylinSystemUpdater()

    # Disabled check (was a no-op string literal): exit when the updater
    # database does not allow auto update.
    # if os_release_info['PROJECT_CODENAME'] == 'V10SP1-edu' and os_release_info['SUB_PROJECT_CODENAME']=='mavis':
    #     pass
    # else:
    #     allow_autoupdate = kylin_system_updater.GetDatabaseInfo("display","autoupdate_allow")
    #     if allow_autoupdate == "true":
    #         pass
    #     else:
    #         logging.info("auto upgrade not allow, exit")
    #         sys.exit(0)

    # check control center lock
    # Disabled check (was a no-op string literal):
    # if os.path.exists(CONTROL_PANEL_LOCK_FILE):
    #     file_lock = FILE_LOCK(CONTROL_PANEL_LOCK_FILE)
    #     if file_lock.get_lock():
    #         logging.debug("control center not running")
    #         file_lock.unlock()
    #     else:
    #         logging.warning("control center running ,exiting...")
    #         sys.exit(1)

    # package_deps = {}
    kylin_system_updater.ConnectToSignals()
    kylin_system_updater.UpdateDetect()
    kylin_system_updater.RunMainloop()
    inhibitshutdownlock = InhibitShutdownLock()

    # publish the phase that is about to run; install-only and
    # download-and-install both report "preinstall"
    if options.download_only:
        WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH,
                         "UNATTENDED_UPGRADE", "autoupdate_run_status",
                         "download")
    elif options.install_only or options.download_and_install:
        WriteValueToFile(UNATTENDED_UPGRADE_CONFIG_FILE_PATH,
                         "UNATTENDED_UPGRADE", "autoupdate_run_status",
                         "preinstall")

    # run the main code
    install_start_time = datetime.datetime.now().replace(microsecond=0)
    logging.info("unattended-upgrades start time:%s", install_start_time)

    # get log
    # Disabled dpkg repair step (was a no-op string literal):
    # dpkg_journal_dirty = is_dpkg_journal_dirty()
    # abnormal_pkg_count = get_abnormally_installed_pkg_count()
    # logging.info("abnormal pkg count:%s,dpkg dirty:%s"%(abnormal_pkg_count,dpkg_journal_dirty))
    # if os_release_info['PROJECT_CODENAME'] == 'V10SP1-edu' and os_release_info['SUB_PROJECT_CODENAME']=='mavis':
    #     if dpkg_journal_dirty or abnormal_pkg_count != '0':
    #         ret = subprocess.run("dpkg --configure -a",shell=True,stdout=open(logfile,'a+'),stderr=open(logfile,'a+'))
    #         logging.info("dpkg fix return :%s"%ret.returncode)

    # the return value of main() becomes the process exit status
    sys.exit(main(options))