!272 增加ssh登录失败检测项

Merge pull request !272 from a-alpha/alpha-dev
This commit is contained in:
a-alpha 2023-07-20 03:25:15 +00:00 committed by Gitee
commit 7285cbfbba
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
5 changed files with 403 additions and 0 deletions

View File

@ -0,0 +1,106 @@
# [OK] 为验证通过
# [WARNING] 为风险提示
import redis
import os
import sys
################################
# 常量
# for get_env_lang()
STR_GET_ENV_LANG_ZH = "语言环境为中文"
STR_GET_ENV_LANG_EN = "语言环境为英文"
STR_GET_ENV_LANG_UNKNOW = "语言环境未知"
# for is_root()
STR_IS_ROOT_TRUE = "当前用户为root权限"
STR_IS_ROOT_FALSE = "当前用户没有root权限"
################################
# 环境检查函数
def get_env_lang():
# lang = os.getenv("LANG")
# if lang.startswith("zh"):
# return STR_GET_ENV_LANG_ZH
# elif lang.startswith("en"):
# return STR_GET_ENV_LANG_EN
# else:
# return STR_GET_ENV_LANG_UNKNOW
#
if arg_lang == "zh":
return STR_GET_ENV_LANG_ZH
elif arg_lang == "en":
return STR_GET_ENV_LANG_EN
else:
return STR_GET_ENV_LANG_UNKNOW
def is_root():
if os.geteuid() == 0:
print(STR_IS_ROOT_TRUE)
return True
else:
print(STR_IS_ROOT_FALSE)
return False
def is_root():
if os.geteuid() == 0:
print(STR_IS_ROOT_TRUE)
return True
else:
print(STR_IS_ROOT_FALSE)
return False
################################
# 辅助函数
def l_print(zh_str, en_str) :
if STR_GET_ENV_LANG_ZH == get_env_lang() :
print(zh_str);
else :
print(en_str);
################################
# 功能函数
# Redis弱密码检查
def check_redis_weak_password(ip, port, password):
try:
r = redis.StrictRedis(host=ip, port=int(port), db=0, password=password)
r.ping()
return False
except:
# if str(e) == "NOAUTH Authentication required.":
return True
# else:
# return 2
# l_print(f"[OK] Redis 错误: {ip}:{port}",
# f"[OK] Redis error for {ip}:{port}")
# except redis.exceptions.ConnectionError:
# return 3
# l_print(f"[OK] Redis 链接错误 {ip}.{port}",
# f"[OK] Redis connection error for {ip}:{port}")
if __name__ == '__main__':
arg_lang = sys.argv[1]
ip="localhost"
port=6379
const=0
# 读取字典中的弱口令
# 路径为口令字典路径按实际需求修改
with open('../../../../../data/dic/weakPassword', 'r',encoding="utf-8-sig") as f :
for line in f:
password = line.strip()
password = password.encode('latin1','ignore')
re=check_redis_weak_password(ip, port, password.decode('latin1'))
if re == False:
const=1
l_print(f"[WARNING] 检测到Redis弱密码: {ip}:{port}",
f"[WARNING] Redis weak password detected for {ip}:{port}")
break
if const==0:
l_print(f"[OK] Redis弱密码检测通过: {ip}:{port}",
f"[OK] Redis weak password check passed for {ip}:{port}")

View File

@ -0,0 +1,22 @@
FormatVer: 20230518
Id: check_redis_weak_password
Belong: baseline
SiteInfo:
Name: 本地服务检测 -- 检测redis弱口令
Power :
SiteRequests:
Implement:
ImArray:
- Inter : python3
InterArgs :
Exec : check_redis_weak_password.py
Args :
Inter:
- "[WARNING]"
Condition: None
RepairArgs:
- Inter :
InterArgs :
Exec : ''
Args :
RepairPower: # root权限或者普通用户权限

View File

@ -0,0 +1,143 @@
#!/usr/bin/env python3
# 导入所需库
import os
import sys
import crypt
from pathlib import Path
# 定义密码字典文件路径(请根据实际情况修改)
password_dict_path = '../../../../../data/dic/weakPassword'
# 定义shadow文件路径
shadow_file_path = "/etc/shadow"
arg_lang = ""
################################
# 常量
# for get_env_lang()
STR_GET_ENV_LANG_ZH = "语言环境为中文"
STR_GET_ENV_LANG_EN = "语言环境为英文"
STR_GET_ENV_LANG_UNKNOW = "语言环境未知"
# for is_root()
STR_IS_ROOT_TRUE = "当前用户为root权限"
STR_IS_ROOT_FALSE = "当前用户没有root权限"
# 最大密码过期天数
MAX_EXPIRE_DAYS = 90
################################
# 环境检查函数
def get_env_lang():
# lang = os.getenv("LANG")
# if lang.startswith("zh"):
# return STR_GET_ENV_LANG_ZH
# elif lang.startswith("en"):
# return STR_GET_ENV_LANG_EN
# else:
# return STR_GET_ENV_LANG_UNKNOW
#
if arg_lang == "zh":
return STR_GET_ENV_LANG_ZH
elif arg_lang == "en":
return STR_GET_ENV_LANG_EN
else:
return STR_GET_ENV_LANG_UNKNOW
def is_root():
if os.geteuid() == 0:
print(STR_IS_ROOT_TRUE)
return True
else:
print(STR_IS_ROOT_FALSE)
return False
################################
# 辅助函数
def l_print(zh_str, en_str) :
if STR_GET_ENV_LANG_ZH == get_env_lang() :
print(zh_str);
else :
print(en_str);
################################
# 功能函数
# 从密码字典文件中读取所有密码
def read_passwords_from_dict(file_path):
with open(file_path, 'r') as password_dict:
passwords = [line.strip() for line in password_dict]
return passwords
# 解析shadow文件提取用户名、加密盐值和密码哈希
def parse_shadow_file(file_path):
shadow_entries = []
with open(file_path, 'r') as shadow_file:
for line in shadow_file:
if not line.startswith("#"):
fields = line.strip().split(":")
username = fields[0]
encrypted_password = fields[1]
if encrypted_password != "*" and \
encrypted_password != "!" and \
encrypted_password != "!!" and \
encrypted_password != "!*" and \
encrypted_password != "*!" and \
encrypted_password != "" :
method, salt, hashed_pass = encrypted_password.split("$")[1:]
shadow_entries.append((username, method, salt, hashed_pass))
return shadow_entries
# 使用crypt库进行密码哈希比较与shadow文件中的哈希
def check_password(username, method, salt, hashed_pass, password):
salt_string = f"${method}${salt}$"
encrypted_password = crypt.crypt(password, salt_string)
if encrypted_password == f"${method}${salt}${hashed_pass}":
l_print(f"[WARNING] 匹配出用户名存在弱密码: {username} ",
f"[WARNING] Match username using week password: {username}")
return True
return False
if __name__ == "__main__":
arg_lang = sys.argv[1]
if False == is_root() :
l_print("[WARNING] 非root权限此程序需要root权限",
"[WARNING] Non root permission, this program requires root permission")
exit(1)
# 从密码字典文件中读取密码
passwords = read_passwords_from_dict(password_dict_path)
# 解析shadow文件获取用户名、加密盐值和密码哈希
shadow_entries = parse_shadow_file(shadow_file_path)
# 遍历shadow文件中的所有用户
is_got_match_pw = False
for username, method, salt, hashed_pass in shadow_entries:
for password in passwords:
# 检查密码是否匹配
if True == check_password(username, method, salt, hashed_pass, password) :
is_got_match_pw = True
#
break
if False == is_got_match_pw :
l_print(f"[OK] 没有发现弱密码",
f"[OK] No weak password found")
exit(0)
exit(1)

View File

@ -0,0 +1,22 @@
FormatVer: 20230518
Id: check_weak_password
Belong: baseline
SiteInfo:
Name: 账户风险 -- 检查是否存在弱密码
Power : "root"
SiteRequests:
Implement:
ImArray:
- Inter : python3
InterArgs :
Exec : check_weak_password.py
Args :
Inter:
- "[WARNING]"
Condition: None
RepairArgs:
- Inter :
InterArgs :
Exec : ''
Args :
RepairPower: # root # root权限或者普通用户权限

View File

@ -0,0 +1,110 @@
import re
import os
import sys
arg_lang = ""
################################
# 常量
# for get_env_lang()
STR_GET_ENV_LANG_ZH = "语言环境为中文"
STR_GET_ENV_LANG_EN = "语言环境为英文"
STR_GET_ENV_LANG_UNKNOW = "语言环境未知"
# for is_root()
STR_IS_ROOT_TRUE = "当前用户为root权限"
STR_IS_ROOT_FALSE = "当前用户没有root权限"
################################
# 环境检查函数
def get_env_lang():
# lang = os.getenv("LANG")
# if lang.startswith("zh"):
# return STR_GET_ENV_LANG_ZH
# elif lang.startswith("en"):
# return STR_GET_ENV_LANG_EN
# else:
# return STR_GET_ENV_LANG_UNKNOW
#
if arg_lang == "zh":
return STR_GET_ENV_LANG_ZH
elif arg_lang == "en":
return STR_GET_ENV_LANG_EN
else:
return STR_GET_ENV_LANG_UNKNOW
def is_root():
if os.geteuid() == 0:
print(STR_IS_ROOT_TRUE)
return True
else:
print(STR_IS_ROOT_FALSE)
return False
################################
# 辅助函数
def l_print(zh_str, en_str) :
if STR_GET_ENV_LANG_ZH == get_env_lang() :
print(zh_str);
else :
print(en_str);
################################
# 功能函数
def check_ssh_failed():
# 定义正则表达式用于匹配SSH登录失败日志行
regex = r'^(\S+).*authentication failure.*rhost=(\S+).*user=(\S+).*'
# 定义常量,指定最大失败次数
MAX_FAILD_NUM = 5
# 初始化计数器
counter = {}
# 打开SSH日志文件并逐行读取
with open('/var/log/auth.log', 'r') as f:
for line in f:
# 使用正则表达式匹配登录失败日志行
match = re.match(regex, line)
if match:
# 如果找到匹配项提取IP地址和用户名
ip_address = match.group(2)
username = match.group(3)
# 检查是否需要记录该IP地址和用户
if ip_address+":"+username in counter:
# 如果IP地址已存在于计数器中则增加计数器
counter[ip_address+":"+username] += 1
else:
# 如果IP地址不存在于计数器中则添加新的计数器
counter[ip_address+":"+username] = 1
has_faild = False
for key, cnt in counter.items():
if cnt >= MAX_FAILD_NUM:
has_faild = True
# 如果失败次数超过最大限制则记录IP地址和用户
l_print(f'[WARNING] IP地址{key.split(":")[0]},用户:{key.split(":")[1]}, 失败次数:{cnt}',
f'[WARNING] IP: {key.split(":")[0]}USER: {key.split(":")[1]}, Count of Failures{cnt}')
if False == has_faild :
l_print(f'[OK] 没有发现大量登录失败',
f'[OK] No significant login failures found')
################################
# main
if __name__ == "__main__":
arg_lang = sys.argv[1]
if False == is_root() :
l_print("[WARNING] 非root权限此程序需要root权限",
"[WARNING] Non root permission, this program requires root permission")
exit(1)
check_ssh_failed()
exit(0)