目录

前言

架构设计

Agent Runtime 分层

模块结构

实现思路

代码实现

部分代码

agent_loop.py(主流程agent)

agent_tools.py(工具注册)

browser.py(数据采集工具实现)

主流程cli启动代码

有关项目的补充

模型层替换

工具层替换

上下文压缩(tools/compress.py)

可调参数

压缩范围

可调策略

结束语


前言

先在这里讨论讨论,什么叫做agent,一个闭环的ai调度?我认为不是,我认为的是ai为调度大脑,但是核心中心聚焦点是工具(tools),而不是ai。那么核心是什么?就是工具为主,集中统筹使用的ai大脑,一个framwork的调度为agent,例如codex或者说类似的crow工具。

本章不会讨论如何使用mcp第三方插件来实现,纯采用原生的python实现。这里面主要介绍和过渡如何设计一个底层内容来认识一个agent 以及思考什么样才能作为一个agent 而不是只是掉了个ai的api的生成工具。

架构设计

Agent Runtime 分层

+------------------+     +------------------+
|    模型层 (Model)  |     |  策略层 (Strategy) |
|------------------|     |------------------|
| ai_config.py     |     | ai_prompt.py     |
| 可换 OpenAI/Claude |     | 系统行为定义      |
| /Groq/通义千问...  |     | 不绑定代码        |
+--------+---------+     +--------+---------+
         |                       |
         v                       v
+------------------+     +------------------+
|   运行时 (Runtime) | <-- |   记忆 (Memory)   |
|------------------|     |------------------|
| agent_loop.py    |     | compress.py      |
| 主循环 + 错误恢复:  |     | 上下文压缩        |
| 输入→模型→工具→输出 |     | 策略可替换        |
| 异常 catch→喂回模型 |     |                  |
+--------+---------+     +------------------+
         |
         v
+------------------+
|   工具层 (Tool)    |
|------------------|
| ai_tools.py      | @register_tool 注册表
| tools/            |
|   websearch.py   | Bing 搜索
|   playwright/     | 浏览器自动化
|   resume.py      | 简历解析
+------------------+
职责 可替换性
Runtime 循环调度:收输入 -> 调模型 -> 执行工具 -> 回传结果 与模型和工具完全解耦
Model LLM API 客户端,当前 DeepSeek 任意 OpenAI 兼容 API,改 base_url + api_key 即可
Tool 装饰器注册(@register_tool),按名调度,工具各自独立 新增工具加文件 + 一行装饰器,零侵入 Runtime
Strategy System prompt 定义 Agent 行为边界 纯文本替换,不涉及代码改动
Memory 上下文压缩,超阈值自动生成摘要续传 压缩算法/阈值/保留轮数均可调

这里主要采用单循环的机制,不采取任何的中间态,例如sqlite,redis这些,没必要,轻量级的内容就是要建立在即用即弃的策略上。为什么不采用langchain,等一些框架来使用。我不否认这些框架的实用性,而是作为不学习这些框架,底层逻辑我们该如何操作,本质上框架也是别人封装好的内容,但是大部分工具也需要自己去手动写和注册使用,那么抛弃掉这些框架,如何轻量级创建一个个人的agent才是最重要的。

模块结构

main.py                     # CLI 统一入口
resume.py                   # 文档解析(.pdf / .txt / .md),独立工具

agent/                      # Agent Runtime 层
  agent_loop.py             # Runtime 主循环:输入 -> 模型 -> 工具 -> 输出 + 错误恢复
  ai_config.py              # 模型客户端初始化(DeepSeek 默认,可换)
  ai_prompt.py              # System prompt(业务策略,纯文本替换)
  ai_tools.py               # 工具注册表(@register_tool 装饰器 + get_tools + run_tool)

tools/                      # 工具层(各自独立,可拔插)
  websearch.py              # 浏览器搜索引擎(当前 Bing)
  compress.py               # 上下文压缩引擎

  playwright/               # Playwright 自动化工具
    browser.py              # 网页采集通用逻辑:打开 -> 操作 -> 抓取
    mouse.py                # 鼠标/键盘模拟(贝塞尔曲线)

当前工具集以职位采集为例演示完整链路,任意工具均可替换为其他业务场景的实现。

目前主要是以一个职位为主,后续可将内容封装,修改工具成为自己的特定业务的agent。而不是只是修改个提示词套壳注入就是一个生成式agent了。可以从模块机构上看到,这是一个极其简单的可复用升级的agent内容。

实现思路

我认为的agent 并不是可复用套壳使用的内容,市面上的绝大多数都是一些虚假的泡沫内容,大部分都是教你框架,然后套壳一个api就解决了一个agent实际的思路?有没有可能,agent是要以工具为中心,特定业务去做的一个流程工具,即减少重复,能自我验证,可拔插随时淘换升级的工具,优化调整力度的,任何的agent的设计要有冗余可升级和随时替换的地步,少任何一个模块且能保证工具流畅运行,也就是每个能独立调试运行测试,而不是动一发牵全身的。

代码实现

这里主要是以该项目的内容为主,以一个采集分析助手为主,这里只放一些关键代码。

源码可私信博主或参考github链接:https://github.com/haimikao0721-a11y/51JobRader

部分代码

agent_loop.py(主流程agent)

import json
import argparse
import sys
import os

# 确保 agent/ 目录下的模块能正常导入
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from ai_config import client
from ai_prompt import SYSTEM_PROMPT
from ai_tools import get_tools, run_tool
from tools.compress import check_and_compress


def _inject_resume(path: str) -> str:
    """读取简历文件,返回注入文本"""
    sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
    from resume import load_resume

    return load_resume(path)


# ── 主循环 ────────────────────────────────────────────────
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--resume", help="简历文件路径(.pdf / .txt / .md)")
    parser.add_argument(
        "--max-context", type=int, default=256000,
        help="最大上下文 token 数(默认 256000),压缩阈值自动设为 70%%",
    )
    args, _ = parser.parse_known_args()

    system_content = SYSTEM_PROMPT
    if args.resume:
        resume_text = _inject_resume(args.resume)
        system_content += f"\n\n## 用户简历\n{resume_text}"

    messages = [{"role": "system", "content": system_content}]
    warn_threshold = int(args.max_context * 0.7)
    print(f"职位采集助手已启动(最大上下文 {args.max_context:,} tokens,压缩阈值 {warn_threshold:,} tokens)")
    if args.resume:
        print(f"已加载简历: {args.resume}")
    print("=" * 50)
    while True:
        user_input = input("\n你: ").strip()
        if user_input.lower() in ("exit", "quit"):
            break
        if not user_input:
            continue

        messages.append({"role": "user", "content": user_input})

        while True:
            response = client.chat.completions.create(
                model="deepseek-v4-pro",
                messages=messages,
                tools=get_tools(),
                tool_choice="auto",
            )
            msg = response.choices[0].message

            if not msg.tool_calls:
                print("\n🤖 ", end="", flush=True)
                stream = client.chat.completions.create(
                    model="deepseek-v4-pro",
                    messages=messages,
                    stream=True,
                )
                reply = ""
                for chunk in stream:
                    delta = chunk.choices[0].delta
                    if delta and delta.content:
                        print(delta.content, end="", flush=True)
                        reply += delta.content
                print()
                messages.append({"role": "assistant", "content": reply})

                # 回复后检查 token 用量
                messages, report = check_and_compress(messages, warn_threshold=warn_threshold)
                if report:
                    print(f"\n[压缩] {report}")
                break

            messages.append(
                {
                    "role": msg.role,
                    "content": msg.content,
                    "tool_calls": [
                        {
                            "id": tc.id,
                            "function": {
                                "name": tc.function.name,
                                "arguments": tc.function.arguments,
                            },
                            "type": "function",
                        }
                        for tc in msg.tool_calls
                    ],
                }
            )

            for tc in msg.tool_calls:
                name = tc.function.name
                args = json.loads(tc.function.arguments)
                print(f"\n调用 {name}({json.dumps(args, ensure_ascii=False)})...")

                try:
                    result = run_tool(name, args)
                except Exception as e:
                    error_info = {
                        "error": f"{type(e).__name__}: {str(e)[:300]}",
                        "hint": "请根据错误信息决定:换参数重试、换其他工具、或告知用户当前不可用",
                    }
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": json.dumps(error_info, ensure_ascii=False),
                    })
                    print(f"工具异常: {type(e).__name__}")
                    continue

                print(f"返回 {len(result) if isinstance(result, list) else 1} 条结果")

                messages.append(
                    {
                        "role": "tool",
                        "tool_call_id": tc.id,
                        "content": json.dumps(result, ensure_ascii=False),
                    }
                )

            # 工具返回后检查 token 用量,接近阈值则压缩
            messages, report = check_and_compress(messages, warn_threshold=warn_threshold)
            if report:
                print(f"\n[压缩] {report}")


if __name__ == "__main__":
    main()

agent_tools.py(工具注册)

"""工具注册表 — 第三方通过 @register_tool 装饰器注册,无需改调度逻辑"""
import asyncio

_registry: dict[str, dict] = {}


def register_tool(name: str, description: str, parameters: dict):
    """装饰器:将函数注册为 AI 可调用的工具。注册后自动出现在 get_tools() 列表中。

    用法:
        @register_tool("my_search", "搜索互联网", {
            "type": "object",
            "properties": {"q": {"type": "string"}},
            "required": ["q"],
        })
        def _handle(q: str):
            ...
    """
    def decorator(func):
        _registry[name] = {
            "schema": {
                "type": "function",
                "function": {
                    "name": name,
                    "description": description,
                    "parameters": parameters,
                },
            },
            "handler": func,
        }
        return func
    return decorator


def get_tools() -> list[dict]:
    """返回所有已注册工具的 OpenAI tool schema 列表"""
    return [v["schema"] for v in _registry.values()]


def run_tool(name: str, args: dict):
    """按名调度工具。找不到抛 ValueError,工具内部异常向上抛(由 Runtime 层处理)"""
    entry = _registry.get(name)
    if not entry:
        raise ValueError(f"未知工具: {name}")
    return entry["handler"](**args)


# ── 内置工具注册 ──────────────────────────────────
# 第三方新增工具:仿照下方格式,在任意模块加 @register_tool 并确保被导入即可


@register_tool(
    name="collect_jobs",
    description="在招聘网站搜索职位,返回 title / salary / company / location / tags / jd",
    parameters={
        "type": "object",
        "properties": {
            "keyword": {"type": "string", "description": "搜索关键词,如 python开发、产品经理"},
            "city": {"type": "string", "description": "城市名,多个用逗号分隔(可选),如 深圳、深圳,上海"},
        },
        "required": ["keyword"],
    },
)
def _tool_collect_jobs(keyword: str, city: str = ""):
    from tools.playwright.browser import collect_jobs

    cit_list = (
        [c.strip() for c in city.replace(",", ",").split(",") if c.strip()]
        if city else []
    )
    return asyncio.run(collect_jobs(keyword, cit_list))


@register_tool(
    name="search_bing",
    description="搜索公司/事物信息,返回 title / url / snippet / content(含页面正文)",
    parameters={
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "搜索关键词,如 米哈游 公司背景"},
        },
        "required": ["query"],
    },
)
def _tool_search_bing(query: str):
    from tools.websearch import search_bing

    return asyncio.run(search_bing(query))

browser.py(数据采集工具实现)

# 51job 职位采集工具
# 用法: collect_jobs("关键词") → list[dict]
import asyncio
import json
import os
import sys
import random

# 确保项目根目录在 path 中(兼容直接运行和作为模块导入)
_project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if _project_root not in sys.path:
    sys.path.insert(0, _project_root)

from playwright.async_api import async_playwright
from tools.playwright.mouse import Human

COOKIE_FILE = os.path.join(os.path.dirname(__file__), "cookies.json")


async def save_cookies(page):
    cookies = await page.context.cookies()
    with open(COOKIE_FILE, "w") as f:
        json.dump(cookies, f)
    print(f"cookie 已保存 ({len(cookies)} 条)")

async def load_cookies(page):
    if not os.path.exists(COOKIE_FILE):
        return False
    with open(COOKIE_FILE) as f:
        cookies = json.load(f)
    await page.context.add_cookies(cookies)
    print(f"已注入 cookie ({len(cookies)} 条)")
    return True

async def collect_jobs(keyword: str, cit_list=list|None) -> list[dict]:
    """
    搜索 51job 并采集职位完整信息
    参数:
        keyword:   搜索关键词,如 "python开发"
        cit_list:  城市名列表,如 ["深圳"]、["深圳","上海"](可选)
                   通过 UI 弹窗筛选,不传则不限制
    返回:
        list[dict]: 每个职位包含 title, salary, company, location, tags, jd
    """
    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=False,
            args=['--disable-blink-features=AutomationControlled']
        )
        page = await browser.new_page()

        # 注入 cookie
        await load_cookies(page)

        # 去个人中心,检查登录状态
        await page.goto("https://we.51job.com/pc/my/myjob")
        await page.wait_for_timeout(3000)

        if "login" in page.url.lower() or "passport" in page.url.lower():
            print("请手动登录...")
            await page.wait_for_url("**/myjob**", timeout=0)
            await save_cookies(page)
        else:
            print("已登录")
            if not os.path.exists(COOKIE_FILE):
                await save_cookies(page)

        # 输入搜索关键词
        h = Human(page)
        await h.mouse_move_click(".searchInp input")
        await h.men_input(keyword)

        # 点击搜索 → 打开新标签页
        async with page.context.expect_page() as new_page_info:
            await h.mouse_move_click('[sensor_elementid="sensor_my_searchButton"]')
        search_page = await new_page_info.value
        await search_page.wait_for_load_state()
        print(f"搜索页 URL: {search_page.url}")

        # 等搜索结果渲染
        try:
            await search_page.wait_for_selector(".joblist-item-job", timeout=10000)
        except:
            await search_page.wait_for_timeout(5000)

        # 在页面筛选目标城市
        if cit_list:
            print(f"展开城市筛选: {cit_list}")
            await search_page.wait_for_timeout(2000)

            # 点"其他城市"打开城市选择弹窗
            await search_page.locator(".allcity").click()
            await search_page.wait_for_timeout(1500)

            # 等待城市选择弹窗出现(用弹窗内特有的元素判断)
            await search_page.wait_for_selector(".jbs_cascader_panel", timeout=5000)
            print("城市选择弹窗已打开")

            # 先清空所有已选中的城市标签
            clear_btns = search_page.locator(".selected_list_wrapper_tag .el-tag__close")
            clear_count = await clear_btns.count()
            if clear_count > 0:
                print(f"清空 {clear_count} 个已选城市...")
                for _ in range(clear_count):
                    await clear_btns.first.click()
                    await search_page.wait_for_timeout(300)
                print("已清空")

            # 在弹窗中选择目标城市(从 ABCD 字母分类列表中选)
            for city in cit_list:
                el = search_page.locator(f'.resumeDialog__right-city[title="{city}"]')
                if await el.count() > 0:
                    await el.click()
                    print(f"选择城市: {city}")
                else:
                    print(f"未找到城市: {city}")
                await search_page.wait_for_timeout(300)

            # 点"确定"(限定在"选择城市"弹窗内)
            dialog = search_page.get_by_role("dialog").filter(has_text="选择城市")
            await dialog.locator(".confirm_button").click()
            await search_page.wait_for_timeout(1000)
            print("城市筛选已确认")
            # 等搜索结果刷新
            await search_page.wait_for_timeout(2000)
            

        # 获取列表基本信息
        jobs_list = await search_page.evaluate("""() => {
            const items = document.querySelectorAll('.joblist-item-job');
            return Array.from(items).map(el => ({
                title: el.querySelector('.jname')?.innerText?.trim() || '',
                salary: el.querySelector('.sal')?.innerText?.trim() || '',
                company: el.querySelector('.cname')?.innerText?.trim() || '',
                location: el.querySelector('.location, .area, .city, .workplace')?.innerText?.trim() || '',
                tags: Array.from(el.querySelectorAll('.tag')).map(t => t.innerText.trim()),
            }));
        }""")
        print(f"列表共 {len(jobs_list)} 个职位")

        # 逐个进详情页拿 JD
        results = []
        for i, job in enumerate(jobs_list):
            print(f"  获取第 {i+1}/{len(jobs_list)}: {job['title']}")
            try:
                async with search_page.context.expect_page() as new_page_info:
                    await search_page.locator(".joblist-item-job").nth(i).locator(".jname").click()
                detail_page = await new_page_info.value

                # 等职位内容真正渲染出来
                try:
                    await detail_page.wait_for_selector(".jTitle, .job-detail", timeout=15000)
                except:
                    await detail_page.wait_for_timeout(5000)

                # 调试:看看页面结构
                structure = await detail_page.evaluate("""() =>
                    Array.from(document.querySelectorAll('[class*="detail"],[class*="content"],[class*="main"],[class*="primary"],[class*="job-detail"],[class*="desc"]'))
                        .slice(0,20).map(e => e.tagName+'.'+(e.className||'').split(' ').join('.'))
                """)
                if structure:
                    print(f"  页面容器: {structure}")

                jd_text = await detail_page.evaluate("""() => {
                    const sel = '.job-detail, .job-primary, .job-main, [class*="job-detail"], [class*="job-primary"], .j_jobmsg, .jobmsg, .job_content';
                    for (const s of sel.split(', ')) {
                        const el = document.querySelector(s);
                        if (el && el.innerText.trim().length > 50) return el.innerText.trim();
                    }
                    // 降级:body 去掉页脚导航
                    const body = document.body.cloneNode(true);
                    body.querySelectorAll('script, style, .header, .footer, .nav, [class*="footer"], [class*="copyright"], [class*="friend-link"]').forEach(el => el.remove());
                    return body.innerText.trim();
                }""")

                # 检查是否被反爬拦了
                if "请求时间" in jd_text and "TraceID" in jd_text:
                    print(f"触发反爬,等 5 秒重试...")
                    await asyncio.sleep(5)
                    try:
                        await detail_page.wait_for_selector(".jTitle, .job-detail", timeout=10000)
                    except:
                        pass
                    jd_text = await detail_page.evaluate("""() => {
                        const sel = '.job-detail, .job-primary, .job-main, [class*="job-detail"], [class*="job-primary"], .j_jobmsg, .jobmsg, .job_content';
                        for (const s of sel.split(', ')) {
                            const el = document.querySelector(s);
                            if (el && el.innerText.trim().length > 50) return el.innerText.trim();
                        }
                        const body = document.body.cloneNode(true);
                        body.querySelectorAll('script, style, .header, .footer, .nav, [class*="footer"], [class*="copyright"], [class*="friend-link"]').forEach(el => el.remove());
                        return body.innerText.trim();
                    }""")

                await detail_page.close()

                results.append({
                    "title": job["title"],
                    "salary": job["salary"],
                    "company": job["company"],
                    "location": job.get("location", ""),
                    "tags": job["tags"],
                    "jd": jd_text,
                })
            except Exception as e:
                print(f"失败: {e}")
            # 间隔一段时间,避免被限
            await asyncio.sleep(random.uniform(2, 4))

        await browser.close()
        return results


async def main():
    keyword = sys.argv[1] if len(sys.argv) > 1 else "python开发"
    city_arg = sys.argv[2] if len(sys.argv) > 2 else ""
    cit_list = city_arg.replace(",", ",").split(",") if city_arg else None
    results = await collect_jobs(keyword, cit_list)
    print("\n=== 采集完成 ===")
    print(json.dumps(results, ensure_ascii=False, indent=2))
    print(f"\n共 {len(results)} 个职位")


if __name__ == "__main__":
    asyncio.run(main())

主流程cli启动代码

"""JobRadar — CLI 统一入口"""

import sys
import os
import argparse

# 确保各模块目录能正常导入(只加根目录,避免 tools/playwright 与 real playwright 冲突)
_project_root = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, _project_root)


def cmd_chat(args):
    """启动交互式采集助手"""
    from agent.agent_loop import main as chat_main

    # 透传参数给 agent_loop
    pass_args = []
    if args.resume:
        pass_args += ["--resume", args.resume]
    if hasattr(args, "max_context") and args.max_context:
        pass_args += ["--max-context", str(args.max_context)]
    if pass_args:
        sys.argv = [sys.argv[0]] + pass_args
    chat_main()


def cmd_search(args):
    """直接搜索职位(跳过 AI 对话)"""
    from tools.playwright.browser import collect_jobs
    import asyncio

    cities = args.city.split(",") if args.city else []
    result = asyncio.run(collect_jobs(args.keyword, cities))
    for i, job in enumerate(result, 1):
        print(f"\n{'=' * 50}")
        print(f"{i}. {job['title']}")
        print(f"   公司: {job['company']}  |  薪资: {job['salary']}")
        print(f"   地点: {job['location']}")
        print(f"   标签: {', '.join(job['tags'])}")
        print(f"   JD: {job['jd'][:200]}...")


def cmd_web(args):
    """直接搜索 Bing(跳过 AI 对话)"""
    from tools.websearch import search_bing
    import asyncio

    result = asyncio.run(search_bing(args.query, args.count))
    for i, r in enumerate(result, 1):
        print(f"\n{i}. {r['title']}")
        print(f"   URL: {r['url']}")
        print(f"   摘要: {r['snippet'][:120]}...")


def cmd_resume(args):
    """解析简历文件并输出文本"""
    from resume import load_resume

    text = load_resume(args.file)
    print(text)


def main():
    parser = argparse.ArgumentParser(description="JobRadar — 职位采集分析工具")
    parser.add_argument("--resume", help="简历文件路径,启动时注入上下文")
    parser.add_argument(
        "--max-context", type=int, default=256000,
        help="最大上下文 token 数(默认 256000),压缩阈值自动设为 70%%",
    )

    sub = parser.add_subparsers(dest="command", help="可用命令")

    # python main.py search <keyword> [--city ...]
    p_search = sub.add_parser("search", help="搜索职位")
    p_search.add_argument("keyword", help="搜索关键词")
    p_search.add_argument("--city", default="", help="城市,多个用逗号分隔")

    # python main.py web <query> [--count 5]
    p_web = sub.add_parser("web", help="Bing 搜索")
    p_web.add_argument("query", help="搜索关键词")
    p_web.add_argument("--count", type=int, default=5, help="返回条数(默认 5)")

    # python main.py resume <file>
    p_resume = sub.add_parser("resume", help="解析简历文件")
    p_resume.add_argument("file", help="简历文件路径")

    args, _ = parser.parse_known_args()

    if args.command == "search":
        cmd_search(args)
    elif args.command == "web":
        cmd_web(args)
    elif args.command == "resume":
        cmd_resume(args)
    else:
        # 默认启动交互式对话
        cmd_chat(args)


if __name__ == "__main__":
    main()

有关项目的补充

模型层替换

当前 可换 改动
DeepSeek API OpenAI / Claude / Groq / 通义千问 / 本地模型(Ollama) 改 .env 中的 base_url + api_key,零代码

工具层替换

当前工具 业务场景 可替换为
browser.py 网页采集 任意平台数据抓取(电商、新闻、报表),只需保持函数签名
websearch.py 搜索引擎 Google / 百度 / Sogou / SearXNG,只需保持入参返回格式
resume.py 文档解析 任意文档类型:合同、发票、报告,换解析库即可
mouse.py 浏览器自动化 Selenium / pyppeteer / DrissionPage

上下文压缩(tools/compress.py)

当对话历史接近 token 上限时,自动将历史记录压缩为结构化 MD 摘要,注入到 system 和 user 之间作为上下文,避免长对话超出模型限制。

可调参数

参数 位置 默认值 说明
--max-context 启动参数 256000 模型最大上下文 token 数,压缩阈值自动设为 70%(如 256K → 180K 触发)
max_tokens compress_messages() 参数 2000 压缩后 MD 摘要的 token 上限
keep_last compress_messages() 参数 2 保留最近 N 轮完整对话不压缩

压缩范围

内容 压缩策略 是否压缩
System Prompt 始终保持不变 不压缩
用户输入 截取前 200 字 → > **用户**: ... 压缩
AI 文本回复 截取前 300 字 → > **AI**: ... 压缩
AI 工具调用 只保留工具名和参数 → > **AI 调用了 collect_jobs** — 参数: {...} 压缩
工具返回结果 截取前 200 字 → > **工具返回**: ... 压缩
最近 N 轮对话 保持完整,不做任何处理 不压缩

最近 N 轮默认 keep_last=2,即保留最近 2 轮用户 + AI 的完整对话。

可调策略

在 tools/compress.py 的 _to_md() 函数中可自定义:

策略 当前行为 可改为
摘要截断 用户输入保留前 200 字,AI 回复保留前 300 字 改为全文保留、或仅提取关键词、或按段落截取
工具结果处理 工具返回数据截取前 200 字 改为只保留工具名+关键字段,或丢弃工具返回只保留结论
保留轮数 保留最近 2 轮完整对话(keep_last=2 改为 3-5 轮,或按 token 数动态保留
压缩格式 结构化 MD(> **用户**: ... 格式) 改为纯文本、JSON、或表格
触发时机 每次工具调用和 AI 回复后检查 改为仅工具调用后、或仅用户输入后、或固定间隔

结束语

以上内容都是可以去github下载内容进行自己的更新和测试,这里只是放出一个引子,供大家参考和升级。

GitHub - haimikao0721-a11y/51JobRader · GitHub --- GitHub - haimikao0721-a11y/51JobRader · GitHub

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐