完成system bus的验证检测功能,编写yaml文件

This commit is contained in:
chenxinquan 2024-06-18 11:31:35 +08:00
parent 8a955b3958
commit 6b47f1953a
3 changed files with 832 additions and 0 deletions

Binary file not shown.

View File

@ -0,0 +1,812 @@
# [OK] 测试完成
# [EXCEP] 测试异常
import subprocess
import os
import sys
import dbus
import yaml
import json
import xml.etree.ElementTree as ET
import inspect
import array
import concurrent.futures
import threading
import time
import shutil
import platform
import logging
import datetime
import csv
arg_lang = ""
DEBUG_LOG= True
################################
# 常量
logger = None
def setup_logger(log_file):
# 确保日志文件所在的目录存在
os.makedirs(os.path.dirname(log_file), exist_ok=True)
# 创建一个logger
global logger
logger = logging.getLogger('dbus')
logger.setLevel(logging.INFO)
# 创建一个handler用于写入日志文件
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.INFO)
# 定义handler的输出格式
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
# 给logger添加handler
logger.addHandler(file_handler)
def debug_debug(*args):
if DEBUG_LOG:
# stack = inspect.stack()
# frame = stack[1].frame
# function_name = frame.f_code.co_name
# line = frame.f_lineno
# 使用全局的logger
global logger
# 将信息写入到日志中
message = ' '.join(map(str, args)) # 将args转换为字符串并用空格连接
logger.debug(message)
def debug_info(*args):
if DEBUG_LOG:
# 使用全局的logger
global logger
# 将信息写入到日志中
message = ' '.join(map(str, args)) # 将args转换为字符串并用空格连接
logger.info(message)
def debug_warn(*args):
if DEBUG_LOG:
# 使用全局的logger
global logger
# 将信息写入到日志中
message = ' '.join(map(str, args)) # 将args转换为字符串并用空格连接
logger.warn(message)
def debug_error(*args):
if DEBUG_LOG:
# 使用全局的logger
global logger
# 将信息写入到日志中
message = ' '.join(map(str, args)) # 将args转换为字符串并用空格连接
logger.error(message)
################################
# 功能函数
# 系统总线命名规范检查
method_skip_list=[
'reboot',
'Reboot',
'Suspend',
'PowerOff',
'Hibernate',
]
service_skip_list=[
'com.ksc.virus',
'com.kylin.backup',
'com.kylin.backupserver',
]
method_special_list=[
'redirection_logFileProcess',
'unlink_logFileProcess',
]
method_error_list=[
"More items found in D-Bus signature than in Python arguments",
"More items found in struct's D-Bus signature than in Python arguments",
"module 'dbus' has no attribute 'Variant'",
"Corrupt type signature",
"Fewer items found in struct's D-Bus signature than in Python arguments",
"an integer is required (got type str)",
"No such method",
"unknown input type",
]
kylin_strings = [".ukui.",".ksc.","kysec","kysdk","kylin"]
except_interface = [ 'org.freedesktop.DBus.Peer','org.freedesktop.DBus.Properties', 'org.freedesktop.DBus.Introspectable']
def check_service_control(bus,bus_name):
try:
service_object = bus.get_object(bus_name,"/")
introspection_data = dbus.Interface(service_object,"org.freedesktop.DBus.Introspectable")
xml_data = introspection_data.Introspect()
result ='failed'
errorinfo = ''
return result,errorinfo
except Exception as e:
debug_error(f"[EXCEP][{bus_name}] check_service_control 功能异常:{e}")
errorStr = str(e).replace(" ", "")
if ("dbuslimitcontrol,notsupported" in errorStr) or ("dbuslimitcontrol,envforbidden" in errorStr) or ("dbuslimitcontrol,operationnotpermitted" in errorStr):
result='passed'
errorinfo=''
else:
result='error'
errorinfo=str(e)
return result,errorinfo
def find_dbus_interface_recursive(bus,bus_name,object_path):
service_object = bus.get_object(bus_name,object_path)
introspection_data = dbus.Interface(service_object,"org.freedesktop.DBus.Introspectable")
xml_data = introspection_data.Introspect()
root = ET.fromstring(xml_data)
#debug_print(xml_data)
interface_list=[node.attrib["name"] for node in root.findall(".//interface")]
for interface in interface_list:
if interface not in except_interface:
yield interface
sub_paths=[node.attrib["name"] for node in root.findall(".//node")]
for path in sub_paths:
if object_path.endswith("/"):
new_path = object_path+path
else:
new_path = object_path+'/'+path
yield from find_dbus_interface_recursive(bus,bus_name,new_path)
def check_interface(bus,bus_name):
try:
interface_list=list(find_dbus_interface_recursive(bus,bus_name,"/"))
#debug_print(interface_list)
result='failed'
for item in interface_list:
interface=item
if interface.startswith("com.kylin."):
result='passed'
break
errorinfo=""
return interface,result,errorinfo
except Exception as e:
debug_error(f"[EXCEP][{bus_name}] check_interface 功能异常:{e}")
result='error'
interface=""
errorinfo=str(e)
return interface,result,errorinfo
def find_dbus_path_recursive(bus,bus_name,object_path):
service_object = bus.get_object(bus_name,object_path)
yield object_path
introspection_data = dbus.Interface(service_object,"org.freedesktop.DBus.Introspectable")
xml_data = introspection_data.Introspect()
root = ET.fromstring(xml_data)
sub_paths=[node.attrib["name"] for node in root.findall(".//node")]
for path in sub_paths:
if object_path.endswith("/"):
new_path = object_path+path
else:
new_path = object_path+'/'+path
yield from find_dbus_path_recursive(bus,bus_name,new_path)
def check_path(bus,bus_name):
try:
path_list=list(find_dbus_path_recursive(bus,bus_name,"/"))
#debug_print(path_list)
result='failed'
for item in path_list:
path=item
if path == "/com/kylin":
result='passed'
break
errorinfo=""
return path,result,errorinfo
except Exception as e:
debug_error(f"[EXCEP][{bus_name}] check_path 功能异常:{e}")
result='error'
path=""
errorinfo=str(e)
return path,result,errorinfo
def check_conf(bus_name):
try:
result="passed"
miss_conf=""
limit_file="/etc/dbus-1/conf/"+bus_name+".limit"
verify_file=limit_file+".verify"
bakup_file="/usr/share/dbus-1/conf/"+bus_name+".limit"
bakup_verify_file=bakup_file+".verify"
if not os.path.isfile(limit_file):
miss_conf+=limit_file+";"
result="failed"
if not os.path.isfile(verify_file):
miss_conf+=verify_file+";"
result="failed"
if not os.path.isfile(bakup_file):
miss_conf+=bakup_file+";"
result="failed"
if not os.path.isfile(bakup_verify_file):
miss_conf+=bakup_verify_file+";"
result="failed"
if result == "failed":
miss_conf="miss files: "+miss_conf
return result,miss_conf
except Exception as e:
debug_error(f"[EXCEP][{bus_name}] check_conf 功能异常:{e}")
result='error'
return result,miss_conf
def check_name(bus_name):
if bus_name.startswith("com.kylin."):
return bus_name,'passed'
else:
return bus_name,'failed'
def generate_input(char):
if char == 's':
return "genmai_dbus_test"
elif char == 'g':
return 'a'
elif char == 'i' or char == 'u' or char == 'x' or char == 't' or char == 'y'or char == 'n'or char == 'q':
return 0
elif char == 'd' or char == 'f':
return 0.0
elif char == 'b':
return False
elif char == 'v':
return "genmai_dbus_test"
elif char == 'o':
return "/"
else:
raise Exception(f'unknown input type:{char}')
def parse_input(input):
debug_debug(f"parse_input:{input}")
result = []
stack = [] # 用于跟踪未闭合的括号
i = 0
while i < len(input):
if input[i] == 'a':
i += 1
if "a{sv}" in input:
params = {'keytest1': 'valuetest1', 'keytest2': 'valuetest2'}
result.append(params)
i += 3
elif input[i] not in ['{', '(']:
# a后面没有左括号单个类型的数组直接生成input
if input[i] in ['i','u','x','t','y','n','q']:
byte_sequence = [1, 2, 3, 4]
byte_array = array.array('B', byte_sequence).tobytes()
result.append(byte_array)
else:
result.append(generate_input(input[i]))
else:
start_index=i+1
stack=[]
while i < len(input):
debug_debug(f"{i}:{input[i]}")
if input[i] in ['{', '(']:
# 遇到左括号,压栈
stack.append(input[i])
elif input[i] in ['}', ')']:
# 遇到右括号,出栈
top_bracket = stack.pop()
if stack:
# 栈不为空,说明这不是匹配第一个左括号的右括号,跳过
i += 1
continue
# 栈为空,说明已经匹配第一个左括号,递归解析
if (top_bracket == '{' and input[i] == '}') or (top_bracket == '(' and input[i] == ')'):
# 括号类型匹配成功
inner_string = input[start_index:i]
debug_debug(f"inner_string:{inner_string}")
inner_result=parse_input(inner_string)
inner_array=[]
inner_array.append(tuple(inner_result))
result.append(inner_array)
# 递归解析完毕
else:
raise ValueError(f"Unmatched brackets in the input string,i={i},top_bracket={top_bracket},input[i]={input[i]}")
break
i += 1
if stack:
raise ValueError("Unmatched brackets in the input string")
elif input[i] == '(':
start_index=i+1
stack=[]
while i < len(input):
if input[i] in ['(']:
# 遇到左括号,压栈
stack.append(input[i])
elif input[i] in [')']:
# 遇到右括号,出栈
top_bracket = stack.pop()
if stack:
# 栈不为空,说明这不是匹配第一个左括号的右括号,跳过
i += 1
continue
# 栈为空,说明已经匹配第一个左括号,递归解析
if (top_bracket == '(' and input[i] == ')'):
# 括号类型匹配成功
inner_string = input[start_index:i]
debug_debug(f"inner_string:{inner_string}")
inner_result=parse_input(inner_string)
debug_debug(f"inner_result:{tuple(inner_result)}")
result.append(tuple(inner_result))
# 递归解析完毕
else:
raise ValueError(f"Unmatched brackets in the input string,i={i},top_bracket={top_bracket},input[i]={input[i]}")
break
i += 1
if stack:
raise ValueError("Unmatched brackets in the input string")
else:
# 非 'a' 字符直接生成input
result.append(generate_input(input[i]))
i += 1
return result
def parse_arg_list(method_node):
arg_string=""
arg_list=[]
for node in method_node.findall(".//arg"):
if node.attrib['direction'] == 'in':
arg_string+=node.attrib['type']+','
debug_debug(f'testarg: {arg_string}')
arg_list.extend(parse_input(node.attrib['type']))
return tuple(arg_list) ,arg_string
def parse_arg_list1(method_node):
arg_string=""
arg_list=[]
for node in method_node.findall(".//arg"):
if node.attrib['direction'] == 'in':
arg_string+=node.attrib['type']
if node.attrib['type'] == 's':
arg_list.append("")
elif node.attrib['type'] == 'g':
arg_list.append('a')
elif node.attrib['type'] == 'i' or node.attrib['type'] == 'u' or node.attrib['type'] == 'x' or node.attrib['type'] == 't' or node.attrib['type'] == 'y':
arg_list.append(0)
elif node.attrib['type'] == 'd' or node.attrib['type'] == 'f':
arg_list.append(0.0)
elif node.attrib['type'] == 'b':
arg_list.append(False)
elif node.attrib['type'] == 'as':
arg_list.append(dbus.Array(['genmai_dbus_test',],signature='s'))
elif node.attrib['type'] == 'ass' or node.attrib['type'] == 'a(ss)':
#arg_list.append(dbus.Array(['genmai_dbus_test','genmai_dbus_test'],signature='a(ss)'))
arg_list=[('genmai_dbus_test','genmai_dbus_test')]
elif node.attrib['type'] == 'ag':
arg_list.append(dbus.Array(['a',],signature='g'))
elif node.attrib['type'] == 'ai':
arg_list.append(dbus.Array([0,],signature='i'))
elif node.attrib['type'] == 'au':
arg_list.append(dbus.Array([0,],signature='u'))
elif node.attrib['type'] == 'ax':
arg_list.append(dbus.Array([0,],signature='x'))
elif node.attrib['type'] == 'at':
arg_list.append(dbus.Array([0,],signature='t'))
elif node.attrib['type'] == 'ay':
arg_list.append(dbus.Array([0,],signature='y'))
elif node.attrib['type'] == 'ad':
arg_list.append(dbus.Array([0.0,],signature='d'))
elif node.attrib['type'] == 'af':
arg_list.append(dbus.Array([0.0,],signature='f'))
elif node.attrib['type'] == 'av':
param_av=[dbus.Dictionary({'genmai_dbus_test':'genmai_dbus_test'},signature='ss')]
arg_list.append(dbus.Array(param_av,signature='a(ss)'))
elif node.attrib['type'] == 'v':
arg_list.append(dbus.Variant('genmai_dbus_test',signature='s'))
elif node.attrib['type'] == 'a(i)':
param_a_i = [(0,0)]
arg_list.append(dbus.Array(param_a_i,signature='(ii)'))
#debug_print(arg_list)
return tuple(arg_list),arg_string
def is_all_tuple(array):
for item in array:
if not isinstance(item, tuple):
return False
if len(array) == 1:
return False
return True
def method_exec(bus_name,object_path,interface_name,method_name,args):
bus = dbus.SystemBus()
service_object = bus.get_object(bus_name,object_path)
interface = dbus.Interface(service_object,interface_name)
method = getattr(interface,method_name)
if len(args) == 0 :
result=method()
elif is_all_tuple(args):
debug_debug(f"is_all_tuple:{args}")
result=method(args)
else:
debug_debug("not is_all_tuple:",*args)
result=method(*args)
return 'failed'
def special_method_exec(bus_name,object_path,interface_name,method_name,args):
bus = dbus.SystemBus()
service_object = bus.get_object(bus_name,object_path)
interface = dbus.Interface(service_object,interface_name)
method = getattr(interface,method_name)
result=method(args)
return 'failed'
def method_exec_timeout(method_exec,bus_name,object_path,interface_name,method_name,args):
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(method_exec,bus_name,object_path,interface_name,method_name,args)
try:
# 等待future完成最多等待timeout秒
timeout=10
result = future.result(timeout=timeout)
return result
except concurrent.futures.TimeoutError:
# 如果超时则打印错误并返回None或抛出异常
debug_info(f"DBus method {method_name} timed out after {timeout} seconds.")
raise Exception("timeout")
def special_args(method_name):
if method_name == "redirection_logFileProcess" or method_name == "unlink_logFileProcess":
signature = dbus.Signature('sissis')
stLogParm = dbus.Struct(dbus.String("test"),dbus.String("test"),dbus.Int32(1),dbus.String("test"),dbus.String("test"),dbus.Int32(1),signature)
return stLogParm,"stLogParm"
return "invalid method name"
def parse_method_and_exec(bus,bus_name,object_path,service_object,interface_name,interface_node):
method_check_list={}
try:
interface = dbus.Interface(service_object,interface_name)
for node in interface_node.findall(".//method"):
try:
start_time = time.time()
method_check={}
method_name=node.attrib["name"]
if method_name in method_skip_list:
continue
debug_debug(bus_name,ET.tostring(node,encoding='unicode'))
debug_info(f'bus_name: {bus_name} , method_name: {method_name}')
if method_name in method_special_list:
args,args_string=special_args(method_name)
debug_debug(f'args: {args} , args_string: {args_string}')
method_check['input']=args_string
method_check['result']=method_exec_timeout(special_method_exec,bus_name,object_path,interface_name,method_name,args)
else:
args,args_string=parse_arg_list(node)
debug_debug(f'args: {args} , args_string: {args_string}')
method_check['input']=args_string
method_check['result']=method_exec_timeout(method_exec,bus_name,object_path,interface_name,method_name,args)
method_check['result']='failed'
method_check['errorinfo']=''
end_time = time.time()
execution_time = end_time - start_time
if execution_time > 1:
method_check['exec_time']=f'{execution_time:.4f}'
method_check_list[method_name]=method_check
except Exception as e:
debug_error(f"[EXCEP][{method_name}] parse_method_and_exec 功能异常:{e}")
errorStr = str(e).replace(" ", "")
if ("dbusmethodcontrol,operationnotpermitted" in errorStr) or ("dbusmethodcontrol,envforbidden" in errorStr):
method_check['result']='passed'
method_check['errorinfo']=''
elif any (substr in str(e) for substr in method_error_list):
method_check['result']='error'
method_check['errorinfo']=str(e)
else:
method_check['result']='failed'
method_check['errorinfo']=str(e)
end_time = time.time()
execution_time = end_time - start_time
if execution_time > 1:
method_check['exec_time']=f'{execution_time:.4f}'
method_check_list[method_name]=method_check
yield method_check_list
except Exception as e:
debug_error(f"[EXCEP][{method_name}] parse_method_and_exec dbus.Interface 功能异常:{e}")
yield method_check_list
def check_dbus_method_recursive(bus,bus_name,object_path):
try:
service_object = bus.get_object(bus_name,object_path)
introspection_data = dbus.Interface(service_object,"org.freedesktop.DBus.Introspectable")
xml_data = introspection_data.Introspect()
#debug_print(xml_data)
root = ET.fromstring(xml_data)
for node in root.findall(".//interface"):
if node.attrib["name"] not in except_interface:
interface_name=node.attrib["name"]
#debug_print(ET.tostring(node,encoding='unicode'))
yield from parse_method_and_exec(bus,bus_name,object_path,service_object,interface_name,node)
sub_paths=[node.attrib["name"] for node in root.findall(".//node")]
for path in sub_paths:
if object_path.endswith("/"):
new_path = object_path+path
else:
new_path = object_path+'/'+path
yield from check_dbus_method_recursive(bus,bus_name,new_path)
except Exception as e:
debug_error(f"[EXCEP][{bus_name}] check_dbus_method_recursive 功能异常:{e}")
def check_method_control(bus,bus_name):
#debug_print(bus_name)
result=list(check_dbus_method_recursive(bus,bus_name,"/"))
return result
def launch_dbus_service(bus,bus_name):
try:
service_object = bus.get_object(bus_name,"/")
introspection_data = dbus.Interface(service_object,"org.freedesktop.DBus.Introspectable")
except Exception as e:
debug_error(f"[EXCEP][{bus_name}] launch_dbus_service 功能异常:{e}")
def is_systembus(bus,bus_name):
launch_dbus_service(bus,bus_name)
try:
service = bus.get_object('org.freedesktop.DBus','/org/freedesktop/DBus')
uid = service.GetConnectionUnixUser(bus_name)
#debug_print(bus_name,uid)
if uid == 0:
return True
else:
return False
except Exception as e:
debug_error(f"[EXCEP][{bus_name}] is_systembus 功能异常:{e}")
#debug_print(bus_name,e)
if 'Could not get UID of name' in str(e):
return False
else:
return True
def get_command_result(command):
try:
state = subprocess.run(command, shell=True, capture_output=True, text=True)
suc_result = state.stdout.strip()
err_result = state.stderr.strip()
result = suc_result + err_result
except Exception as e:
result = 'None'
return result
def run_with_root(passwd, command):
try:
run_command = f'echo "{passwd}" | su -c "{command}"'
debug_debug(f"run_command: {run_command}")
result = get_command_result(run_command)
except Exception as e:
result = "Error!"
finally:
return result
def get_cpu_architecture():
machine = platform.machine()
if machine == 'x86_64' or machine == 'AMD64' :
return 'amd64'
elif machine == 'aarch64':
return 'arm64'
elif machine == 'mips64':
return 'mips64le'
elif machine == 'riscv64':
return 'riscv64'
else:
return 'unknown-arch'
def bakup_limit(passwd,dbusname):
try:
command=f'../data/SandboxViPyEnv/{get_cpu_architecture()}/myenv/bin/python3 ../data/DBus/SystemBus/dbus_limit.py bakup {dbusname} {get_cpu_architecture()}'
run_with_root(passwd,command)
except Exception as e:
debug_error(f"[EXCEP] bakup_limit 功能异常:{e}")
def restore_limit(passwd,dbusname):
try:
command=f'../data/SandboxViPyEnv/{get_cpu_architecture()}/myenv/bin/python3 ../data/DBus/SystemBus/dbus_limit.py restore {dbusname} {get_cpu_architecture()}'
run_with_root(passwd,command)
except Exception as e:
debug_error(f"[EXCEP] restore_limit 功能异常:{e}")
def chmod_calc_verify_tool():
get_command_result(f"chmod 755 {os.getcwd()}/../data/DBus/SystemBus/calc_verify_amd64")
get_command_result(f"chmod 755 {os.getcwd()}/../data/DBus/SystemBus/calc_verify_arm64")
def write_to_csv(data):
with open('../data/Report/output.csv', 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = ['dbus_name', 'service_control','method_name', 'method_control', 'errorinfo']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for dbus_name, info in data.items():
print(dbus_name)
print(info)
for method_check in info['MethodControlCheck']:
for method, details in method_check.items():
writer.writerow({
'dbus_name': info['DBusName'],
'service_control': info['ServiceControlCheck'],
'method_name': method,
'method_control': details['result'],
'errorinfo': details['errorinfo']
})
def check_system_bus_serial(passwd):
try:
start_time = time.time()
report={}
chmod_calc_verify_tool()
with open('../data/DBus/SystemBus/kylin_system_bus_list.yaml') as file:
data = yaml.safe_load(file)
kylinlist=data['ListNames']
bus = dbus.SystemBus()
bus_obj = bus.get_object('org.freedesktop.DBus','/org/freedesktop/DBus')
dbus_if=dbus.Interface(bus_obj,'org.freedesktop.DBus')
activated_list = dbus_if.ListNames()
activatable_list = dbus_if.ListActivatableNames()
bus_list=set(activated_list+activatable_list)
for bus_name in bus_list:
#if ((bus_name in kylinlist) or (any (substr in bus_name for substr in kylin_strings))) and is_systembus(bus,bus_name):
if (bus_name in kylinlist) and is_systembus(bus,bus_name):
if bus_name in service_skip_list:
continue
bus_result={}
report[str(bus_name)]=bus_result
debug_info(f"{bus_name} check start")
bus_result['DBusName'],bus_result['DBusNameCheck']=check_name(str(bus_name))
bus_result['DBusConfCheck'],bus_result['DBusConfErrInfo']=check_conf(str(bus_name))
bus_result['ServiceControlCheck'],bus_result['ServiceControlErrInfo']=check_service_control(bus,str(bus_name))
if bus_result['ServiceControlCheck'] == 'passed':
bakup_limit(passwd,bus_name)
bus_result['Path'],bus_result['PathCheck'],bus_result['PathErrInfo']=check_path(bus,str(bus_name))
bus_result['Interface'],bus_result['InterfaceCheck'],bus_result['InterfaceErrInfo']=check_interface(bus,str(bus_name))
bus_result['MethodControlCheck']=check_method_control(bus,str(bus_name))
if bus_result['ServiceControlCheck'] == 'passed':
restore_limit(passwd,bus_name)
debug_info(f"{bus_name} check finish")
#write_to_csv(report)
print("[report_start]")
json_data = json.dumps(report,indent=4)
#json_data = json.dumps(report)
print(json_data)
print("[report_end]")
end_time = time.time()
execution_time = end_time - start_time
print(f"总执行时间: {execution_time:.6f}")
except Exception as e:
debug_error(f"[EXCEP] check_system_bus 功能异常:{e}")
def check_single_dbus_service(passwd,bus_name):
try:
start_time = time.time()
report={}
chmod_calc_verify_tool()
bus = dbus.SystemBus()
bus_result={}
report[str(bus_name)]=bus_result
bus_result['DBusName'],bus_result['DBusNameCheck']=check_name(str(bus_name))
bus_result['DBusConfCheck'],bus_result['DBusConfErrInfo']=check_conf(str(bus_name))
bus_result['ServiceControlCheck'],bus_result['ServiceControlErrInfo']=check_service_control(bus,str(bus_name))
if bus_result['ServiceControlCheck'] == 'passed':
bakup_limit(passwd,bus_name)
bus_result['Path'],bus_result['PathCheck'],bus_result['PathErrInfo']=check_path(bus,str(bus_name))
bus_result['Interface'],bus_result['InterfaceCheck'],bus_result['InterfaceErrInfo']=check_interface(bus,str(bus_name))
bus_result['MethodControlCheck']=check_method_control(bus,str(bus_name))
if bus_result['ServiceControlCheck'] == 'passed':
restore_limit(passwd,bus_name)
print("[report_start]")
json_data = json.dumps(report,indent=4)
#json_data = json.dumps(report)
print(json_data)
print("[report_end]")
end_time = time.time()
execution_time = end_time - start_time
print(f"总执行时间: {execution_time:.6f}")
except Exception as e:
debug_error(f"[EXCEP] check_system_bus 功能异常:{e}")
report={}
lock = threading.Lock()
def check_thread(bus_name):
debug_debug(f"check_thread:{bus_name}")
start_time = time.time()
bus_result={}
with lock:
report[str(bus_name)]=bus_result
try:
bus = dbus.SystemBus()
bus_result['DBusName'],bus_result['DBusNameCheck']=check_name(str(bus_name))
bus_result['ServiceControlCheck'],bus_result['ServiceControlErrInfo']=check_service_control(bus,str(bus_name))
if bus_result['ServiceControlCheck'] != 'passed':
#debug_print("ServiceControlCheck:",bus_name,bus_result['ServiceControlCheck'])
bus_result['Path'],bus_result['PathCheck'],bus_result['PathErrInfo']=check_path(bus,str(bus_name))
bus_result['Interface'],bus_result['InterfaceCheck'],bus_result['InterfaceErrInfo']=check_interface(bus,str(bus_name))
bus_result['MethodControlCheck']=check_method_control(bus,str(bus_name))
#debug_print("MethodControlCheck finish:",bus_name)
end_time = time.time()
execution_time = end_time - start_time
debug_debug(f"{bus_name} 执行时间: {execution_time:.6f}")
except Exception as e:
debug_error(f"[EXCEP] check_thread 功能异常:{e}")
end_time = time.time()
execution_time = end_time - start_time
debug_debug(f"{bus_name} 执行时间: {execution_time:.6f}")
def check_system_bus_parallel():
try:
start_time = time.time()
with open('../data/DBus/SystemBus/kylin_system_bus_list.yaml') as file:
data = yaml.safe_load(file)
kylinlist=data['ListNames']
bus = dbus.SystemBus()
bus_obj = bus.get_object('org.freedesktop.DBus','/org/freedesktop/DBus')
dbus_if=dbus.Interface(bus_obj,'org.freedesktop.DBus')
activated_list = dbus_if.ListNames()
activatable_list = dbus_if.ListActivatableNames()
bus_list=set(activated_list+activatable_list)
check_list=[]
for bus_name in bus_list:
if ((bus_name in kylinlist) or (any (substr in bus_name for substr in kylin_strings))) and is_systembus(bus,bus_name):
#if bus_name == 'com.kysec.auth':
check_list.append(bus_name)
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
# 使用线程池执行dbus_operation函数
futures = [executor.submit(check_thread, name) for name in check_list]
# 等待所有线程完成
concurrent.futures.wait(futures)
print("[report_start]")
json_data = json.dumps(report,indent=4)
#json_data = json.dumps(report)
print(json_data)
print("[report_end]")
end_time = time.time()
execution_time = end_time - start_time
print(f"总执行时间: {execution_time:.6f}")
except Exception as e:
debug_error(f"[EXCEP] check_system_bus 功能异常:{e}")
def method_test():
try:
arg_list=[]
bus = dbus.SystemBus()
bus_name = "com.kylin.kysdk.ListControl"
root_path = "/com/kylin/kysdk/ListControl"
service_object = bus.get_object(bus_name,root_path)
interface = dbus.Interface(service_object,"com.kylin.kysdk.ListControl")
method = getattr(interface,"ClearBlackList")
arg=tuple(arg_list)
#debug_print(method_name,args)
result=method(*arg)
debug_info(f"[success]:{result}")
except Exception as e:
debug_error(f"[abnormal]:{e}")
while True:
time.sleep(1)
if __name__ == '__main__':
if len(sys.argv) <= 2:
print('need rootpasswd and input' )
exit(0)
now = datetime.datetime.now()
timestamp = now.strftime('%Y-%m-%d_%H-%M-%S')
setup_logger(f'../data/log/genmai_dbus_{timestamp}.log')
if sys.argv[2] != "null":
check_single_dbus_service(sys.argv[1],sys.argv[2])
else:
check_system_bus_serial(sys.argv[1])
#method_test()

View File

@ -0,0 +1,20 @@
FormatVer: 20240308
Id: system_bus
Belong: dbus
Power :
SiteInfo:
Severity: high
Name: dbus系统总线检测 -- 系统总线服务安全检查
SiteRequests:
Implement:
ImArray:
- Inter : python3
InterArgs :
Exec : system_bus_check.py
Args :
- ${ROOTPASSWORD}
- ${INPUT}
ExpireTime: 600 #second
Inter:
- ">#:[report_start]%s[report_end]"
Condition: None