替换加密库cryptography

This commit is contained in:
luoxueyi 2024-04-11 11:02:59 +08:00
parent 78ade6a5e9
commit dfe76e92c8
2 changed files with 34 additions and 26 deletions

View File

@ -16,19 +16,20 @@ import tarfile
import requests
import datetime
import threading
import subprocess
from email import message
from datetime import datetime
from binascii import a2b_hex
from Crypto.PublicKey import RSA
from urllib import parse, request
from SystemUpdater.Core import enums
from Crypto.Cipher import PKCS1_OAEP
from json.decoder import JSONDecodeError
from dbus.exceptions import DBusException
from SystemUpdater.Core.UpdaterConfigParser import UpgradeConfig
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
LOCALTIDDIR = "/var/lib/kylin-system-updater/"
LOCALTIDFILE = "tidfile.conf"
MSGSNDDIR = "/var/lib/kylin-system-updater/sendinfos/"
@ -286,22 +287,29 @@ class FormatConvert():
def EncodeRSAtoBase64(self, value):
# 将value进行RSA加密并base64转码
enMsg = ""
try:
# 计算hex值
value_hex = a2b_hex(value)
# 加载公钥,填充格式OAEP
uniqueKey = self.publickey.keyvalue.encode('utf-8')
uniqueKeyorig = base64.b64decode(uniqueKey) # 公钥文件
rsa_pubkey = RSA.importKey(uniqueKeyorig) # RSA公钥
oaep_pub = PKCS1_OAEP.new(rsa_pubkey) # OAEP填充
# 加载公钥
public_key = serialization.load_pem_public_key(
base64.b64decode(self.publickey.keyvalue.encode('utf-8')),
backend=default_backend()
)
# 加密数据
encodemsg = oaep_pub.encrypt(value_hex)
encodemsg = public_key.encrypt(
a2b_hex(value),
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None
)
)
# 加密数据Base64转码
enMsg = base64.b64encode(encodemsg)
except ValueError:
logging.error("Value error: %s.", value)
except TypeError:
logging.error("RSA key has no private half.")
enMsg = base64.b64encode(encodemsg)
except Exception as e:
logging.error("EncodeRSAtoBase64 error: %s.", e)
return enMsg
@ -340,33 +348,33 @@ class MessageSend():
if retval != 0:
if retval == self.ERR_PARA_FROMAT:
result = "Parameter format error"
logging.debug("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
logging.error("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
elif retval == self.ERR_NO_LOACLTID:
result = "The tid value in packageInfo is abnormal, but the message is saved successfully"
logging.debug("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
logging.info("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
# 将返回的tid保存到本地
key = PackageInfo['packageName']+'_'+PackageInfo['messageType']
self.SaveTid(key, retid)
elif retval == self.ERR_ABNORMAL_SHA:
result = "Abnormal UploadedMessage Sha256"
logging.debug("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
logging.error("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
elif retval == self.ERR_UPLOADMSG_SHA:
result = "Description The UploadedMessageSha256 was decrypted incorrectly"
logging.debug("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
logging.error("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
elif retval == self.ERR_UPLOADMSG_CTS:
result = "The createTimeStamp field of UploadedMessage is abnormal"
logging.debug("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
logging.error("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
elif retval == self.ERR_UPLOADMSG_CTS:
result = "Invalid key included in \"uploadedMessage\" or \"packageInfo\": <@timestamp>,<_id>,<_index>,<_type>,<createTime>,<highlight>,<sn>,<sort>, check upload field"
logging.debug("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
logging.error("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
else:
logging.debug("Sent Status: false - packageName: %s : retval: %s.", PackageInfo['packageName'], retval)
logging.error("Sent Status: false - packageName: %s : retval: %s.", PackageInfo['packageName'], retval)
# 上传失败写入本地json
if retval != self.ERR_NO_LOACLTID or retval == self.ERR_NO_LOACLTID:
self.WriteToJson(PackageInfo['messageType'], json_PackageInfo, json_UploadMessage, encodeMsg)
elif retval == 0:
result = "Send to server success"
logging.debug("Sent Status: True - packageName: %s : result: %s.", PackageInfo['packageName'], result)
logging.info("Sent Status: True - packageName: %s : result: %s.", PackageInfo['packageName'], result)
def GetLocalTid(self, key):
# 试图获取本地tid

2
debian/control vendored
View File

@ -40,7 +40,7 @@ Depends: ${python3:Depends},
aptdaemon (>=1.1.1+bzr982-0kylin32.3k5.2),
python3-distro-info,
python3-apscheduler,
python3-crypto,
python3-cryptography,
sqlite3,
kylin-update-frontend
Recommends: python3-launchpadlib