新增大数据采集接口

This commit is contained in:
luoxueyi 2021-12-10 15:16:42 +08:00
parent 424e798534
commit 7ccbdfb1ae
5 changed files with 250 additions and 9 deletions

View File

@ -2,14 +2,223 @@
# supervisory control and data acquisition
#!/usr/bin/python3
class DataCollector():
def __init__(self):
pass
# def
import json
import dbus
import base64
import hashlib
import logging
import datetime
import configparser
from binascii import a2b_hex
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from json.decoder import JSONDecodeError
from dbus.exceptions import DBusException
LOCALTIDPATH = "/var/lib/kylin-system-updater/system-updater.conf"
class UpdateMsgCollector():
def __init__(self, manager):
self.UploadMessage = {}
self.PackageInfo = {}
self.updateManager = manager
#秘钥
self.publickey = UniqueKey()
# 转换 & 加密
self.convertor = FormatConvert()
# 发送器
self.sender = MessageSend()
def GenUploadMessage(self, dict_message):
UploadMessage = {}
# 获取将要上传的数据,获取东八区时间
UploadMessage['createTimeStamp'] = get_east_8_time()
try:
for key in dict_message.keys():
UploadMessage[key] = dict_message[key]
except Exception as e:
logging.error(str(e))
json_UploadMessage = self.convertor.convertJson(UploadMessage)
logging.info('Generate UploadMessage: %s.',json_UploadMessage)
self.UploadMessage = UploadMessage.copy()
UploadMessage.clear()
def GenPackageInfo(self, messageType, packageName):
PackageInfo = {}
PackageInfo['messageType'] = str(messageType)
PackageInfo['packageName'] = str(packageName)
# 获取本地tid
localtid = self.updateManager.configs.getWithDefault("TID", "localtid", "")
PackageInfo["tid"] = str(localtid)
# logging.info("Get local tid:(%s).",localtid)
json_PackageInfo = self.convertor.convertJson(PackageInfo)
logging.info('Generate PackageInfo: %s.',json_PackageInfo)
self.PackageInfo = PackageInfo.copy()
PackageInfo.clear()
def UpdateMsg(self, messageType, dict_message):
# para: messageType(数据类型): "update-infos"、 "install-infos"、 "remove-info"
# para: dict_message(数据内容): 必须包含 "packageName", 采集器会进行检测
if messageType == "":
messageType = "System-update"
if type(dict_message) != dict and "packageName" not in dict_message.keys():
raise AttributeError("'%s' object has no attribute '%s'" % ("dict_message", "packageName"))
return False
# 生成UploadMessage与PackageInfo
try:
self.GenPackageInfo(messageType, dict_message["packageName"])
self.GenUploadMessage(dict_message)
except Exception as e:
logging.error(str(e))
# sha256
json_UploadMessage = self.convertor.dictConvertJson(self.UploadMessage)
json_PackageInfo = self.convertor.dictConvertJson(self.PackageInfo)
shaValue = self.convertor.Sha256Value(json_UploadMessage)
encodeMsg = self.convertor.EncodeRSAtoBase64(shaValue)
# dbus发送
self.sender.MsgSendToServer(json_UploadMessage, json_PackageInfo, encodeMsg)
class FormatConvert():
pass
def dictConvertJson(self, dict_msg):
#字典转换为json格式字符串
try:
json_file = json.dumps(dict_msg)
except JSONDecodeError as e:
logging.error(str(e))
return json_file
def JsonConvertDict(self, json_file):
# json格式字符串转换为字典
try:
dict_file = json.loads(json_file)
except JSONDecodeError as e:
logging.error(str(e))
return dict_file
def Sha256Value(self, json_file):
# 计算sha256值
hsobj = hashlib.sha256()
try:
hsobj.update(json_file.encode("utf-8"))
except ValueError as e:
logging.error("SHA256 value error: %s.",str(e))
except Exception as e:
logging.error(str(e))
return hsobj.hexdigest()
def EncodeRSAtoBase64(self, value):
# 将value进行RSA加密并base64转码
try:
# 计算hex值
value_hex = a2b_hex(value)
# 加载公钥,填充格式OAEP
uniqueKey = self.publickey.keyvalue.encode('utf-8')
uniqueKeyorig = base64.b64decode(uniqueKey) # 公钥文件
rsa_pubkey = RSA.importKey(uniqueKeyorig) # RSA公钥
oaep_pub = PKCS1_OAEP.new(rsa_pubkey) # OAEP填充
# 加密数据
encodemsg = oaep_pub.encrypt(value_hex)
# 加密数据Base64转码
enMsg = base64.b64encode(encodemsg)
except ValueError:
logging.error("Value error: %s.", value)
except TypeError:
logging.error("RSA key has no private half.")
return enMsg
class MessageSend():
pass
ERR_PARA_FROMAT = 1
ERR_NO_LOACLTID = 2
ERR_ABNORMAL_SHA = 3
ERR_UPLOADMSG_SHA = 4
ERR_UPLOADMSG_CTS = 5
def MsgSendToServer(self, UploadMessage, PackageInfo, encodeMsg):
daqbus = dbus.SystemBus()
try:
daqobj = daqbus.get_object('com.kylin.daq', '/com/kylin/daq')
daqinterface = dbus.Interface(daqobj, dbus_interface='com.kylin.daq.interface')
except DBusException as e:
logging.error(str(e))
# 调用发送接口
try:
retval,retid = daqinterface.UploadMessage(PackageInfo, UploadMessage, encodeMsg)
except AttributeError:
logging.error("Call UploadMessage: Attribute Error.")
self.Send_finally(retval, retid, PackageInfo, UploadMessage)
def Send_finally(self, retval, retid, json_PackageInfo, json_UploadMessage, encodeMsg):
# 根据发送结果进行处理
result = ''
PackageInfo = FormatConvert.JsonConvertDict(json_PackageInfo)
if retval != 0:
if retval == self.ERR_PARA_FROMAT:
result = "Parameter format error"
logging.info("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
elif retval == self.ERR_NO_LOACLTID:
result = "The tid value in packageInfo is abnormal, but the message is saved successfully"
logging.info("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
# 将返回的tid保存到本地
elif retval == self.ERR_ABNORMAL_SHA:
result = "Abnormal UploadedMessage Sha256"
logging.info("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
elif retval == self.ERR_UPLOADMSG_SHA:
result = "Description The UploadedMessageSha256 was decrypted incorrectly"
logging.info("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
elif retval == self.ERR_UPLOADMSG_CTS:
result = "The createTimeStamp field of UploadedMessage is abnormal"
logging.info("Sent Status: false - packageName: %s : result: %s.", PackageInfo['packageName'], result)
# 上传失败写入本地json
if retval != self.ERR_NO_LOACLTID:
# cv.WriteToJson(udi.isi)
pass
def WriteToJson(self, ):
#发送失败时写入本地json中定时发送
pass
class UniqueKey():
keyvalue = "LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FR\
OEFNSUlCQ2dLQ0FRRUFzdW1NTFJEdlFNb0tEQkRJODRqSgpqc1A0Mk55V0pWVEZob2Jra3ZiT05j\
dExYTXVzRmo2TzJUblZYU3Z6VlVLSjRqZkpwT2l2WEphOVB5Z2wzYTRnClBzSU40enNCMEdOY0tr\
R3VsS2RrV2x6S3lWQ2xlTzhiQnN6SjkwbTc3cWF6YWg3a1A0TUl0WTVFczBpSkpiR0oKN1MxcERj\
MlJkNnVFQWJLaXJyRTFlNzlFTEd4am5VN2V5NWkyRDE2WWJoZEQwZ2lNa2RHR3piQXBKTWZWRVJR\
TQo1NXorMFVqdS8zSFJhNFY3b3p2TGRPRE5HUURaeWNJU0l3VHBLbFR3RjBxazdCNjVhTUlJenQ1\
dnhOK1lxYU1GClppZFRLNzcxNjdqNEExZ3F3MG45bjlybWVXUGRWZ3dudnRtVXp4Q1krNk05SXpK\
TDI3eWpRUTV1WGQ3RVdMT3IKbndJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg=="
def get_east_8_time():
import time
# UTC时间
utc_time = datetime.utcnow()
# 转时间字符串
utc_time = utc_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
time_suffix = utc_time.split(".")[1]
# 字符串转时间元祖
utc_time = time.strptime(utc_time, "%Y-%m-%d %H:%M:%S.%f")
# 时间元祖转时间戳
utc_time = time.mktime(utc_time)
# 生成东八区时间时间戳
now_time = utc_time + 8*60*60
# 时间戳转时间元祖
now_time = time.localtime(now_time)
# 时间元祖转字符串
now_time = time.strftime("%Y-%m-%d %H:%M:%S",now_time)
now_time = now_time + "." +time_suffix
return now_time

View File

@ -18,6 +18,7 @@ from .Core.UpdateList import UpdateList
from .backend import InstallBackend,get_backend
from .Core.Database import Sqlite3Server
from .Core.loop import mainloop
from .Core.DataAcquisition import UpdateMsgCollector
import time
from gettext import gettext as _
@ -53,6 +54,10 @@ class UpdateManager():
#配置文件
self.configs = UpgradeConfig("/var/lib/kylin-system-updater/")
#数据采集器
self.collector = UpdateMsgCollector(self)
#检查是否需要重新启动aptdeamon 目前需要重启的有限速功能
def restart_aptdeamon(self):
if self.is_restart_aptdeamon == True:

View File

@ -11,7 +11,7 @@ from .Core.utils import humanize_size
from SystemUpdater.Core.utils import (
unLockedEnableShutdown,
)
from .Core.DataAcquisition import MessageSend
from .Core.DataAcquisition import UpdateMsgCollector
UPDATER_DBUS_INTERFACE = 'com.kylin.systemupgrade.interface'
UPDATER_DBUS_PATH = '/com/kylin/systemupgrade'
@ -407,6 +407,17 @@ class UpdateManagerDbusController(dbus.service.Object):
return True, str(value[1][1:-2])
except:
return False, 0
# # dbus接口: 后端大数据采集
@dbus.service.method(UPDATER_DBUS_INTERFACE, in_signature='ss', out_signature='b')
def DataBackendCollect(self, messageType, uploadMessage):
logging.info(COLORMETHOR_PREFIX+'method'+COLORLOG_SUFFIX+' DataBackendCollect, messageType is %s ...',messageType)
try:
self.parent.collector.UpdateMsg(messageType, uploadMessage)
except AttributeError:
pass
except Exception as e:
logging.error(str(e))
#更新进度信息 0~100 进度信息 101为非预期的信号
@dbus.service.signal(UPDATER_DBUS_INTERFACE,signature='is')

View File

@ -8,3 +8,5 @@ isclosefilter = False
[ConfigPkgStatus]
mustexistconfigpkg = True
[TID]
localtid =

View File

@ -39,7 +39,7 @@
| UnattendedUpgradeValue | ss | bs | 获取是否允许关机前更新 | |
| PurgePackages | as | b | 卸载软件包 | |
| InstalldebFile | ssbb | b | 安装本地deb包 | |
| | | | | |
| DataBackendCollect | ss | b | | |
| | | | | |
### Method分析
@ -160,6 +160,20 @@
#### DataBackendCollect
- `介绍:` 后端数据采集
- `入参`: `messageType: ` 消息埋点(string) `uploadMessage: ` 上传数据(json格式字符串),必须包含的字段: "packageName"
- `示例:`
```sh
messageType = "update-infos", uploadMessage = "{\"packageName\":\"kylin-system-updater\",\"author\":\"Xueyi Luo\" ...}"
```