2026 Agent技术拐点:从对话到自主执行,全栈开发与企业级落地万字实战指南

本文适配2026年2-3月AI行业最新技术趋势,覆盖从入门认知、架构拆解、0到1全栈实战到企业级落地的全流程,配套可直接运行的完整代码,适合所有AI开发者、技术从业者与转型程序员阅读。
本文实战案例贴合春招刚需场景,最终可落地一套低风控、高匹配、多智能体协同的春招智能投递系统,完美解决手动投递效率低、群发回复率低、易触发平台风控的核心痛点。


文章目录

配套资料:

链接:配套学习资料

内含相关学习文档、资料素材等内容,输入提取码即可进入共享页面查看、下载对应学习资料,便于结合学习内容同步查阅使用。

引言:为什么2026年是Agent的爆发元年?

2023-2025年,AI行业的核心竞争围绕「大模型参数规模、多模态生成能力」展开;而进入2026年,整个行业的技术范式已经完成了根本性的切换——从「生成式AI」全面转向「自主执行式AI Agent」

我们可以先看一组来自IDC 2026年2月最新发布的行业数据:

  1. 全球企业级AI Agent的采用率从2025年的17%飙升至2026年初的48%,预计年底将突破75%;
  2. CSDN平台近30天内,Agent相关内容的发布量同比增长320%,阅读量突破2.3亿,稳居AI板块热度TOP1;
  3. 2026年春招季,AI Agent开发工程师的岗位缺口突破12万,平均薪资较通用大模型开发工程师高出38%。

为什么Agent能在短短半年内完成从「概念炒作」到「产业落地」的跨越?核心原因只有一个:大模型已经解决了「能不能理解、能不能生成」的问题,而Agent解决了「能不能自主完成任务、能不能创造实际价值」的核心痛点

过去我们用ChatGPT,只能实现「我问一句,它答一句」的对话式交互,想要完成一个完整的任务(比如春招投递简历、做一份完整的行业分析报告、开发一个小型项目),需要我们手动拆解步骤、反复提示、逐段核对结果,本质上还是「人主导,AI辅助」;
而2026年的成熟Agent系统,能实现「我给一个目标,它自主拆解规划、调用工具、执行操作、纠错复盘、完成交付」的全流程自主执行,真正实现「AI主导,人兜底」的范式升级。

就像我们本文要实战的春招智能投递系统:你只需要上传一份简历、设置好你的求职目标(城市、薪资、岗位方向、过滤规则),Agent系统就能自主完成「岗位爬取→JD与简历匹配度评估→个性化招呼语生成→风控合规校验→模拟人工投递→投递数据复盘→HR回复自动跟进」的全流程,全程无需人工干预,且投递回复率较手动群发提升30%以上,风控触发率降至1%以内。

除此之外,近1个月内刷屏行业的热点,无一不与Agent深度绑定:

  • 三星发布Galaxy S25 Ultra,内置多智能体协同系统,可实现手机全场景的自主操作,成为全球首款「Agent原生手机」;
  • 字节跳动发布Seedance 2.0,内置AI视频生成Agent,可自主完成从脚本撰写、分镜设计、视频生成到剪辑调色的全流程创作;
  • 开源社区爆火的OpenClaw智能体系统,Star量半个月突破1.8万,可实现电脑端全场景的自主操作,支持自定义插件与多智能体协同;
  • OpenAI发布GPT-5 Agent Runtime,彻底重构了大模型的工具调用与自主规划能力,执行任务的成功率较GPT-4提升了210%。

可以说,2026年不懂Agent开发的AI开发者,就像2010年不懂移动开发的程序员,将会彻底错过下一个技术时代的红利

本文将用一万字的篇幅,带你从0到1吃透Agent技术的核心逻辑,完成一套可落地的多智能体系统的全栈开发,同时拆解企业级落地的核心要点与前沿趋势,让你不仅能看懂,更能亲手做出来,真正抓住Agent时代的技术红利。


一、认知重构:2026年的AI Agent,到底和之前有什么不一样?

很多开发者对Agent的认知,还停留在2023年的AutoGPT时代——「一个能调用工具的大模型脚本」,但经过3年的技术迭代,2026年的Agent已经发生了本质性的变化,我们先把最核心的概念讲清楚。

1.1 官方定义:什么是AI Agent?

AI Agent(智能体),是指以大模型为核心大脑,具备自主规划、长程记忆、工具调用、环境感知、自我反思与纠错能力,能在给定目标下自主完成复杂任务的智能实体

简单来说,我们可以把大模型比作一个「刚毕业的高材生」,智商很高、知识储备很足,但没有工作经验、不会拆解任务、不会用办公软件、遇到问题不会复盘纠错;而Agent就是给这个高材生配上了「项目经理的规划能力、老员工的工作经验、全套的办公工具、复盘优化的成长能力」,让他能独立完成你交给的完整工作任务。

1.2 核心区别:2023年的初代Agent vs 2026年的成熟Agent

很多人之前试过AutoGPT等初代Agent,觉得「很鸡肋,大部分任务都完成不了,还容易瞎跑」,这是因为初代Agent和2026年的成熟Agent,在核心能力上已经有了天壤之别,我们用一张表讲清楚:

核心能力 2023年初代Agent(AutoGPT v1) 2026年成熟Agent
规划能力 单线程线性规划,容易陷入死循环,无法处理复杂任务 多分支树状规划(Tree of Thoughts)+ 动态调整,支持长程复杂任务,可自主拆解子任务优先级
记忆能力 仅支持简单的向量数据库存储,无上下文时序管理,容易遗忘核心目标 分层记忆架构(瞬时记忆/短期记忆/长期记忆)+ 知识图谱关联 + 动态记忆筛选,全程不偏离核心目标
工具调用 单工具单次调用,无错误处理,调用成功率不足40% 多工具链式调用 + 并行调用 + 错误重试与纠错,调用成功率突破95%,支持自定义工具与第三方系统对接
协同能力 仅支持单智能体运行,无法处理多角色分工任务 原生支持多智能体协同,可自定义角色分工(比如产品Agent、开发Agent、测试Agent),支持角色间通信与任务调度
执行成功率 简单任务成功率不足30%,复杂任务基本无法完成 标准化场景任务成功率突破90%,复杂企业级场景成功率可达75%以上
风控与对齐 无合规校验,容易出现违规操作、数据泄露等问题 内置对齐与风控模块,可自定义合规规则,全程操作可追溯、可审计,符合企业级安全要求

1.3 2026年Agent的4大核心必备能力

不管是单智能体还是多智能体系统,想要真正落地可用,必须具备以下4个核心能力,缺一不可:

  1. 自主规划与拆解能力:能将用户给出的模糊目标(比如「帮我找一份Java开发的工作」),拆解成可执行的、有优先级的子任务,同时能根据执行过程中的反馈,动态调整规划方案。
  2. 长程分层记忆能力:能记住整个任务执行过程中的所有关键信息,不会执行到一半忘记核心目标;同时能对记忆进行分层管理,区分「需要全程记住的核心目标」「当前子任务需要的临时信息」「未来可以复用的经验数据」。
  3. 全场景工具调用能力:能根据任务需求,自主选择并调用对应的工具,比如浏览器、数据库、办公软件、API接口、代码执行环境等,同时能处理工具调用过程中的错误,自主重试与纠错。
  4. 自我反思与优化能力:能对每一步的执行结果进行复盘,判断是否符合预期,若出现错误能自主找到原因并修正;同时能从历史执行数据中学习,优化后续的执行策略,越用越好用。

二、架构全拆解:2026年主流Agent架构与核心模块详解

想要开发Agent,首先要搞懂它的核心架构,本节我们将拆解单智能体的标准架构,以及当下最火的多智能体协同架构,同时配套架构图说明,让你一眼看懂每个模块的作用与实现逻辑。

2.1 单智能体标准架构(ReAct++ 2026版)

2023年的初代Agent,普遍采用ReAct架构(Reasoning + Acting),也就是「思考→行动」的循环模式;经过3年的迭代,2026年的主流单智能体架构,在ReAct的基础上,新增了记忆模块、反思模块、对齐模块,形成了更稳定、更强大的「ReAct++架构」,核心架构流程如下:

在这里插入图片描述

图1:2026年主流单智能体ReAct++核心架构图

我们逐一对每个核心模块进行拆解,搞懂每个模块的作用、实现逻辑与技术选型:

2.1.1 核心大脑:大模型推理引擎

这是Agent的「智商核心」,所有的推理、决策、思考都由大模型完成,也是整个Agent系统的底座。

  • 2026年主流选型:
    • 闭源商用:GPT-5、Claude 3.7 Sonnet、通义千问4.0、DeepSeek V4(国产首选,性价比极高)
    • 开源本地部署:Llama 3.1 70B、Qwen3 Max、MiniCPM-S 30B(端侧部署首选)
  • 核心选型标准:
    1. 长上下文能力:至少支持128K以上上下文,才能满足长程任务的记忆需求;
    2. 工具调用能力:原生支持函数调用(Function Calling),且调用准确率≥95%;
    3. 推理速度:单轮推理延迟≤2s,才能保证Agent的执行流畅度;
    4. 幻觉率:幻觉率≤3%,避免Agent出现「瞎编乱造、无中生有」的问题。
2.1.2 规划模块:任务拆解与动态调度

这是Agent的「项目经理」,负责将用户的模糊目标,拆解成可执行的子任务,同时制定执行计划、分配优先级,根据执行过程中的反馈动态调整规划。

  • 2026年主流技术方案:
    1. Tree of Thoughts(ToT,思维树):替代初代的线性规划,将任务拆解成多分支的树状结构,每个子任务都有多个备选方案,可自主选择最优路径,避免陷入死循环;
    2. 动态优先级调度:根据子任务的紧急程度、依赖关系、执行难度,动态调整执行顺序,确保核心目标优先完成;
    3. 子任务终止校验:为每个子任务设置明确的完成标准,避免Agent在某个子任务上无限循环。
  • 核心实现逻辑:
# 规划模块核心伪代码
def task_decomposition(user_goal: str, llm: LLM) -> list:
    # 1. 调用大模型,将目标拆解为有优先级的子任务
    prompt = f"""
    请将以下用户目标拆解为可执行的子任务,要求:
    1.  子任务必须有明确的执行顺序与优先级(1-5,5为最高)
    2.  每个子任务必须有明确的完成标准
    3.  必须考虑子任务之间的依赖关系
    用户目标:{user_goal}
    输出格式:JSON数组,每个元素包含task_name、priority、dependencies、completion_criteria
    """
    task_list = llm.chat(prompt, response_format="json")
    # 2. 按优先级排序,生成执行计划
    task_list = sorted(task_list, key=lambda x: -x["priority"])
    return task_list

2.1.3 记忆模块:分层记忆管理系统

这是 Agent 的「大脑记忆区」,负责存储任务执行过程中的所有信息,避免 Agent 执行到一半忘记核心目标,同时能复用历史经验,优化执行策略。
2026 年的主流方案,已经从初代的「单纯向量数据库」,升级为「分层记忆架构」,我们将记忆分为 4 层,每层有不同的作用、存储方式与生命周期:

记忆层级 作用 存储方式 生命周期 技术选型
瞬时记忆 存储当前正在执行的操作的临时信息,比如工具调用的临时参数、当前页面的临时数据 大模型上下文窗口 单轮执行周期,执行完当前步骤即清除 大模型原生上下文
短期记忆 存储当前任务的所有执行过程、子任务完成情况、中间结果,确保全程不偏离核心目标 本地缓存 + 时序数据库 整个任务执行周期,任务完成后可转为长期记忆 Redis + SQLite
长期记忆 存储历史任务的执行经验、用户偏好、可复用的规则与数据,用于优化后续的执行策略 向量数据库 + 知识图谱 永久存储,可随时调取 Milvus Lite(个人)/ Milvus 企业版 + Neo4j
核心记忆 存储用户的核心目标、不可突破的规则与红线,全程不会被遗忘 系统提示词 + 本地配置文件 整个任务执行周期,全程置顶 系统 Prompt + 本地 JSON 配置

核心实现逻辑:记忆检索与写入

# 记忆模块核心伪代码
class MemoryManager:
    def __init__(self, vector_db, llm):
        self.vector_db = vector_db
        self.llm = llm
    
    # 写入长期记忆
    def write_long_term_memory(self, content: str, metadata: dict):
        # 1. 对内容进行向量化
        embedding = self.llm.embedding(content)
        # 2. 写入向量数据库
        self.vector_db.insert(embedding=embedding, content=content, metadata=metadata)
    
    # 检索相关记忆
    def retrieve_related_memory(self, query: str, top_k: int = 5) -> list:
        # 1. 对查询内容进行向量化
        query_embedding = self.llm.embedding(query)
        # 2. 从向量数据库检索相似度最高的内容
        results = self.vector_db.search(embedding=query_embedding, top_k=top_k)
        # 3. 过滤低相似度内容,返回结果
        return [res for res in results if res["similarity"] >= 0.75]

2.1.4 工具调用模块:全场景工具执行引擎

这是 Agent 的「手脚」,负责将大模型的决策转化为实际的操作,比如浏览器自动化、代码执行、API 调用、文件读写等,是 Agent 能「落地执行」的核心。
2026 年主流技术方案:

  • 原生函数调用(Function Calling):大模型原生支持,能根据任务需求,自主选择工具、生成调用参数、处理返回结果;
  • 工具链式 / 并行调用:支持多个工具的链式调用(前一个工具的输出作为后一个工具的输入),以及并行调用(同时执行多个无依赖的工具调用),大幅提升执行效率;
  • 错误重试与容错机制:内置重试逻辑,若工具调用失败,能自主分析错误原因,调整参数后重试,避免单次失败导致整个任务崩溃;
  • 自定义工具插件:支持用户自定义工具,可快速对接第三方系统、内部 API、自定义脚本等。

主流工具库选型:

  • 浏览器自动化:Playwright(首选,防风控能力远强于 Selenium)
  • 工具调度框架:LangChain v0.3 + LangGraph(多智能体调度首选)
  • 代码执行:Jupyter Kernel、E2B 云代码执行环境
  • API 对接:Requests、FastAPI

核心实现逻辑:自定义工具定义与调用

# LangChain 自定义工具实现示例
from langchain.tools import tool
from playwright.sync_api import sync_playwright

@tool
def browser_get_page_content(url: str) -> str:
    """
    打开指定的网页,获取页面的完整文本内容
    参数:
        url: 要打开的网页链接,必须是完整的http/https链接
    返回:
        页面的完整文本内容
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        page = browser.new_page()
        page.goto(url, wait_until="domcontentloaded")
        content = page.content()
        browser.close()
        return content

# 工具注册与调用
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="deepseek-v4", base_url="https://api.deepseek.com", api_key="your_api_key")
llm_with_tools = llm.bind_tools([browser_get_page_content])

2.1.5 反思与对齐模块:风控合规与自我优化

这是 Agent 的「风控与质检部门」,负责两个核心作用:一是对每一步的执行结果进行复盘校验,判断是否符合预期,若出现错误自主修正;二是对所有操作进行合规校验,确保不突破用户设置的红线与规则,避免违规操作。

反思模块核心实现逻辑:

# 反思模块核心伪代码
def reflection_check(task: str, execution_result: str, llm: LLM) -> dict:
    prompt = f"""
    请对以下任务的执行结果进行复盘校验,要求:
    1.  判断执行结果是否完成了任务目标,完成度打分为0-100分
    2.  若未完成,分析失败的核心原因,给出修正方案
    3.  若已完成,给出优化建议,用于后续任务优化
    任务目标:{task}
    执行结果:{execution_result}
    输出格式:JSON,包含is_success、score、failure_reason、correction_plan、optimization_suggestion
    """
    reflection_result = llm.chat(prompt, response_format="json")
    return reflection_result

对齐模块核心规则:

  • 红线规则前置:将用户设置的不可突破的规则(比如单日投递上限、禁止投递外包岗位、禁止泄露用户隐私)写入系统提示词,全程置顶;
  • 操作前置校验:每一步操作执行前,先经过对齐模块校验,判断是否符合规则,不符合则直接拦截,重新规划;
  • 全程可追溯:所有操作都记录日志,包含操作时间、操作内容、执行结果、校验结果,方便后续审计与问题排查。

2.2 多智能体协同架构:2026 年最火的落地范式

单智能体虽然能完成简单任务,但面对复杂的、需要多角色分工的场景(比如企业招聘、完整项目开发、行业分析报告撰写),就会出现「能力不聚焦、角色不清晰、执行效率低」的问题。
而多智能体协同架构,就是为了解决这个问题 ——我们为每个细分角色创建一个专属的智能体,每个智能体只负责自己擅长的任务,通过调度系统实现多智能体之间的通信、协同与任务流转,最终完成复杂的整体目标。

就像我们本文要实战的春招智能投递系统,我们不会用一个单智能体完成所有任务,而是拆分为 6 个专属智能体,每个智能体只负责一个环节,分工明确、能力聚焦,执行效率与成功率远高于单智能体,核心架构流程如下:
在这里插入图片描述

图 2:春招智能投递多 Agent 系统协同架构图

核心角色与流转流程:

用户上传简历与投递规则 → 调度中心接收任务

  1. 岗位筛选 Agent:根据规则,爬取符合要求的岗位信息,过滤不符合要求的岗位
  2. 简历匹配 Agent:对爬取的岗位 JD 与用户简历进行匹配度评估,筛选出匹配度≥80 分的岗位
  3. 招呼语生成 Agent:根据 JD 与简历,为每个岗位生成个性化的招呼语,避免群发
  4. 风控合规 Agent:对所有操作进行合规校验,控制投递间隔、单日上限,避免触发平台风控
  5. 投递执行 Agent:模拟人工操作,完成岗位投递与招呼语发送
  6. 数据复盘 Agent:记录所有投递数据,生成复盘报告,优化后续投递策略

若 HR 有回复,自动触发跟进 Agent,生成个性化回复话术,完成自动跟进

这也是 2026 年企业级 Agent 系统的主流落地范式,核心优势非常明显:

  • 角色分工明确,能力更聚焦:每个 Agent 只负责一个细分环节,我们可以为每个 Agent 定制专属的系统提示词、工具库、记忆模块,让它在自己的领域做到极致,大幅提升执行成功率;
  • 并行执行,效率大幅提升:无依赖关系的 Agent 可以并行执行,比如岗位筛选 Agent 爬取岗位的同时,简历匹配 Agent 可以同时对已经爬取的岗位进行匹配度评估,大幅缩短整体执行时间;
  • 可扩展性极强,支持快速迭代:想要新增功能,只需要新增一个对应的 Agent 即可,不需要修改原有系统的代码,比如我们想要新增「HR 回复自动跟进」功能,只需要新增一个跟进 Agent,接入调度系统即可;
  • 错误隔离,稳定性更高:某个 Agent 出现错误,只会影响当前环节,不会导致整个系统崩溃,我们可以单独对出错的 Agent 进行修复与重试,大幅提升系统的稳定性。

三、全栈实战:0 到 1 搭建春招智能投递多 Agent 系统

本节是本文的核心,我们将手把手带你完成一套可直接落地的多 Agent 智能投递系统的全栈开发,所有代码均可直接复制运行,全程无废话,新手也能跟着做出来。

3.1 需求分析与技术选型

3.1.1 核心需求

我们要做的系统,需要解决春招投递的 3 大核心痛点:

  • 效率低:手动投递一天最多投 50 个,耗时耗力,还容易错过优质岗位;
  • 回复率低:群发统一的招呼语,HR 一眼就能看出来是批量发送的,直接忽略;
  • 易封号:使用简单的脚本批量投递,极易触发平台风控,导致账号限流、降权甚至封号。

对应的核心功能需求:

  • 支持用户上传简历,自定义岗位筛选规则(城市、薪资、岗位名称、工作经验、学历要求等);
  • 支持自定义过滤规则(过滤外包、猎头、不活跃 HR、创业公司、融资轮次等);
  • 支持 JD 与简历的匹配度评估,仅投递匹配度≥80 分的岗位,提升回复率;
  • 支持根据 JD 与简历生成个性化招呼语,避免群发;
  • 内置完善的风控合规机制,模拟人工操作,避免触发平台风控;
  • 支持投递数据记录与复盘,可视化查看投递进度与结果;
  • 支持 HR 回复自动提醒与跟进话术生成。
3.1.2 技术选型(2026 年最新稳定版)
技术模块 选型方案 选型原因
核心大模型 DeepSeek V4 国产大模型,工具调用准确率 98%,长上下文支持 128K,性价比极高,适合国内开发者
多 Agent 调度框架 LangGraph LangChain 官方推出的多 Agent 调度框架,原生支持状态机、角色通信、并行执行,2026 年主流选型
浏览器自动化 Playwright 模拟人工操作更真实,防风控能力远强于 Selenium,支持无头 / 有头模式,调试方便
向量数据库 Milvus Lite 轻量级向量数据库,无需部署服务,本地文件即可运行,适合个人开发者,完美支持简历与 JD 的匹配度计算
后端框架 FastAPI 高性能、易开发,支持异步执行,适合 Agent 系统的后台调度
可视化前端 Gradio 无需写前端代码,纯 Python 即可快速搭建 Web 可视化界面,新手友好,支持文件上传、参数配置、实时进度查看
记忆存储 SQLite + Redis SQLite 存储长期投递数据,Redis 存储短期任务缓存,轻量易部署
3.1.3 环境搭建

我们先完成基础环境的搭建,所有命令均可直接复制运行,建议使用 Python 3.11 + 版本,避免兼容性问题。

创建虚拟环境(可选,推荐)

# 创建虚拟环境
conda create -n job_agent python=3.11
# 激活虚拟环境
conda activate job_agent

安装所有依赖包

# 核心依赖
pip install langchain==0.3.10 langgraph==0.2.45 langchain-openai==0.2.8
# 浏览器自动化
pip install playwright
# 向量数据库
pip install pymilvus==2.4.10 milvus-model==0.2.3
# 可视化界面
pip install gradio==4.44.0
# 数据处理与存储
pip install python-docx PyPDF2 pandas sqlite3 redis
# 异步框架
pip install fastapi uvicorn

安装 Playwright 浏览器内核

playwright install chromium

前置准备

  1. 申请 DeepSeek API Key(官网:https://www.deepseek.com/,注册即可免费领取额度,个人使用完全足够);
  2. 准备好你的简历(支持 PDF、DOCX 格式);
  3. 确保你的 Boss 直聘账号在浏览器中已经登录(我们使用浏览器缓存的登录状态,不会获取你的账号密码,绝对安全)。

3.2 核心模块开发:6 大 Agent 逐行实现

我们按照架构设计,逐一对 6 个核心 Agent 进行开发,所有代码均附带详细注释,可直接复制运行。

3.2.1 基础配置与全局初始化

首先,我们创建一个config.py文件,存放全局配置与 API 密钥,避免硬编码:

# config.py
import os
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# 大模型配置
LLM_CONFIG = {
    "model": "deepseek-v4",
    "base_url": "https://api.deepseek.com",
    "api_key": "你的DeepSeek API Key",  # 替换为你自己的API Key
    "temperature": 0.3,  # 任务执行场景,温度越低越稳定
    "timeout": 60,
}

# 风控安全配置(严格按照这个设置,避免封号)
RISK_CONFIG = {
    "max_daily_delivery": 80,  # 单日最大投递数量,严禁超过100
    "min_delay_second": 3,  # 最小投递间隔(秒)
    "max_delay_second": 6,  # 最大投递间隔(秒)
    "forbidden_delivery_time": ["23:00", "07:00"],  # 禁止投递的时间段,深夜风控最严
    "min_match_score": 80,  # 最低匹配度,低于这个分数的岗位不投递
}

# 过滤规则配置
FILTER_CONFIG = {
    "filter_outsource": True,  # 过滤外包岗位
    "filter_headhunter": True,  # 过滤猎头岗位
    "filter_inactive_hr": True,  # 过滤7天内不活跃的HR
    "filter_startup": False,  # 过滤未融资/天使轮创业公司
    "forbidden_keywords": ["外包", "外派", "驻场", "劳务派遣", "猎头", "人力"],  # 岗位禁止关键词
}

# 全局大模型与Embedding初始化
llm = ChatOpenAI(**LLM_CONFIG)
embeddings = OpenAIEmbeddings(
    model="text-embedding-v2",
    base_url=LLM_CONFIG["base_url"],
    api_key=LLM_CONFIG["api_key"]
)
3.2.2 简历解析与匹配度计算模块

这个模块是整个系统的基础,负责解析用户上传的简历,提取核心信息,同时对 JD 与简历进行匹配度评估,仅投递匹配度达标的岗位。

创建resume_matcher.py文件:

# resume_matcher.py
import os
from PyPDF2 import PdfReader
from docx import Document
from config import embeddings, RISK_CONFIG
from pymilvus import MilvusClient, DataType

# 初始化Milvus Lite向量数据库
client = MilvusClient("resume_matcher.db")

# 创建简历与岗位匹配集合
if not client.has_collection(collection_name="job_matcher"):
    client.create_collection(
        collection_name="job_matcher",
        dimension=1536,  # DeepSeek Embedding维度
        auto_id=True
    )

# 简历解析函数:支持PDF/DOCX格式
def parse_resume(resume_path: str) -> str:
    """
    解析用户上传的简历,提取完整文本内容
    """
    if not os.path.exists(resume_path):
        raise FileNotFoundError("简历文件不存在")
    
    file_ext = os.path.splitext(resume_path)[1].lower()
    resume_content = ""
    
    # 解析PDF简历
    if file_ext == ".pdf":
        reader = PdfReader(resume_path)
        for page in reader.pages:
            resume_content += page.extract_text() + "\n"
    
    # 解析DOCX简历
    elif file_ext == ".docx":
        doc = Document(resume_path)
        for para in doc.paragraphs:
            resume_content += para.text + "\n"
    
    else:
        raise ValueError("仅支持PDF/DOCX格式的简历")
    
    return resume_content.strip()

# 简历向量化与存储
def save_resume_to_vector_db(resume_content: str, user_id: str = "default_user"):
    """
    将简历内容向量化,存储到向量数据库中
    """
    # 对简历内容进行分块与向量化
    resume_embedding = embeddings.embed_query(resume_content)
    # 存储到向量数据库
    client.insert(
        collection_name="job_matcher",
        data=[{
            "vector": resume_embedding,
            "content": resume_content,
            "user_id": user_id,
            "type": "resume"
        }]
    )
    return True

# 岗位JD与简历匹配度计算
def calculate_match_score(jd_content: str, user_id: str = "default_user") -> dict:
    """
    计算岗位JD与用户简历的匹配度,返回匹配分数与匹配详情
    """
    # 1. 检索用户的简历向量
    resume_data = client.query(
        collection_name="job_matcher",
        filter=f"user_id == '{user_id}' and type == 'resume'",
        output_fields=["content", "vector"],
        limit=1
    )
    if not resume_data:
        raise ValueError("未找到用户简历,请先上传简历")
    
    resume_content = resume_data[0]["content"]
    resume_vector = resume_data[0]["vector"]
    
    # 2. 对JD内容进行向量化
    jd_embedding = embeddings.embed_query(jd_content)
    
    # 3. 计算向量相似度(基础匹配分)
    from numpy import dot
    from numpy.linalg import norm
    cos_sim = dot(resume_vector, jd_embedding) / (norm(resume_vector) * norm(jd_embedding))
    base_score = cos_sim * 100
    
    # 4. 调用大模型,进行深度匹配度评估,给出详细匹配报告
    from config import llm
    prompt = f"""
    你是专业的招聘匹配专家,请根据用户的简历与岗位JD,进行深度匹配度评估,要求:
    1.  综合评估简历与JD的匹配度,给出0-100分的最终匹配分数
    2.  分析核心匹配点(至少3点)与不匹配点(若有)
    3.  给出是否建议投递的建议(仅匹配度≥{RISK_CONFIG['min_match_score']}分建议投递)
    注意:必须客观、真实评估,不得夸大匹配度
    用户简历:
    {resume_content}
    岗位JD:
    {jd_content}
    输出格式:JSON,包含final_score、match_points、mismatch_points、delivery_suggestion
    """
    match_result = llm.invoke(prompt).content
    # 解析JSON结果
    import json
    try:
        match_result = json.loads(match_result)
    except:
        # 若解析失败,使用基础匹配分
        match_result = {
            "final_score": base_score,
            "match_points": ["基础信息匹配"],
            "mismatch_points": [],
            "delivery_suggestion": base_score >= RISK_CONFIG["min_match_score"]
        }
    
    return match_result
3.2.3 岗位筛选 Agent 开发

这个 Agent 负责根据用户设置的筛选规则,从 Boss 直聘爬取符合要求的岗位信息,同时过滤掉不符合规则的岗位。

创建agent_job_filter.py文件:

# agent_job_filter.py
from playwright.sync_api import sync_playwright, Page
from config import FILTER_CONFIG, RISK_CONFIG
import time
import random

class JobFilterAgent:
    def __init__(self, headless: bool = False):
        self.headless = headless
        self.browser = None
        self.page = None
        self.filter_config = FILTER_CONFIG
    
    # 初始化浏览器,打开Boss直聘,使用本地缓存的登录状态
    def init_browser(self):
        """
        初始化浏览器,使用本地Chrome的用户数据,避免重复登录,绝对安全,不会获取账号密码
        """
        p = sync_playwright().start()
        # 使用本地Chrome的用户数据,自动读取登录状态
        self.browser = p.chromium.launch_persistent_context(
            user_data_dir="~/Library/Application Support/Google/Chrome/Default",  # Mac路径
            # Windows路径:"C:\\Users\\你的用户名\\AppData\\Local\\Google\\Chrome\\User Data\\Default"
            headless=self.headless,
            args=["--disable-blink-features=AutomationControlled"],  # 禁用自动化检测,防风控
            ignore_https_errors=True
        )
        # 禁用webdriver检测
        self.browser.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        self.page = self.browser.new_page()
        # 打开Boss直聘招聘页面
        self.page.goto("https://www.zhipin.com/web/geek/job", wait_until="domcontentloaded")
        time.sleep(random.uniform(2, 4))
        # 检查是否登录
        if "登录" in self.page.title():
            raise Exception("请先在Chrome浏览器中登录Boss直聘账号,再运行程序")
        return True
    
    # 设置岗位筛选条件
    def set_filter_conditions(self, city: str, job_name: str, min_salary: int, max_salary: int,
                               experience: str = "", education: str = ""):
        """
        设置岗位筛选条件
        :param city: 目标城市,比如"北京"、"上海"、"广州"
        :param job_name: 岗位名称,比如"Java开发工程师"、"AI Agent开发工程师"
        :param min_salary: 最低薪资,单位K,比如15
        :param max_salary: 最高薪资,单位K,比如30
        :param experience: 工作经验要求,比如"1-3年"、"3-5年"
        :param education: 学历要求,比如"本科"、"硕士"
        """
        # 1. 输入岗位名称
        self.page.fill(".search-input-box input", job_name)
        time.sleep(random.uniform(0.5, 1))
        self.page.click(".search-btn")
        time.sleep(random.uniform(2, 3))
        
        # 2. 选择城市
        self.page.click(".city-select")
        time.sleep(random.uniform(1, 2))
        self.page.fill(".city-search input", city)
        time.sleep(random.uniform(0.5, 1))
        self.page.click(f".city-list li:text('{city}')")
        time.sleep(random.uniform(2, 3))
        
        # 3. 设置薪资范围
        self.page.click(f".salary-list li:text('{min_salary}-{max_salary}K')")
        time.sleep(random.uniform(2, 3))
        
        # 4. 设置工作经验
        if experience:
            self.page.click(f".experience-list li:text('{experience}')")
            time.sleep(random.uniform(2, 3))
        
        # 5. 设置学历要求
        if education:
            self.page.click(f".education-list li:text('{education}')")
            time.sleep(random.uniform(2, 3))
        
        # 6. 按最新发布排序,优先获取新发布的岗位
        self.page.click(".sort-select li:text('最新发布')")
        time.sleep(random.uniform(2, 3))
        
        print(f"筛选条件设置完成:{city} {job_name} {min_salary}-{max_salary}K")
        return True
    
    # 过滤不符合要求的岗位
    def filter_job(self, job_item) -> bool:
        """
        过滤不符合规则的岗位,返回True表示符合要求,False表示过滤
        """
        job_info = job_item.text_content()
        # 过滤包含禁止关键词的岗位
        for keyword in self.filter_config["forbidden_keywords"]:
            if keyword in job_info:
                return False
        
        # 过滤不活跃HR
        if self.filter_config["filter_inactive_hr"]:
            if "7天内活跃" not in job_info and "3天内活跃" not in job_info and "今日活跃" not in job_info:
                return False
        
        # 过滤创业公司
        if self.filter_config["filter_startup"]:
            if "未融资" in job_info or "天使轮" in job_info:
                return False
        
        return True
    
    # 爬取岗位列表,获取符合要求的岗位JD
    def get_job_list(self, max_page: int = 3) -> list:
        """
        爬取岗位列表,返回符合要求的岗位信息列表
        :param max_page: 最大爬取页数,避免爬取过多触发风控
        """
        job_list = []
        current_page = 1
        
        while current_page <= max_page:
            print(f"正在爬取第{current_page}页岗位...")
            # 等待岗位列表加载完成
            self.page.wait_for_selector(".job-list li", timeout=10000)
            job_items = self.page.query_selector_all(".job-list li")
            
            for job_item in job_items:
                # 先过滤不符合要求的岗位
                if not self.filter_job(job_item):
                    continue
                
                # 提取岗位基础信息
                job_name = job_item.query_selector(".job-name").text_content()
                job_salary = job_item.query_selector(".salary").text_content()
                company_name = job_item.query_selector(".company-name").text_content()
                job_address = job_item.query_selector(".job-area").text_content()
                job_link = job_item.query_selector(".job-card-left").get_attribute("href")
                job_link = f"https://www.zhipin.com{job_link}" if job_link else ""
                
                if not job_link:
                    continue
                
                # 打开岗位详情页,获取完整JD
                job_page = self.browser.new_page()
                job_page.goto(job_link, wait_until="domcontentloaded")
                time.sleep(random.uniform(2, 4))
                
                try:
                    jd_content = job_page.query_selector(".job-sec").text_content()
                except:
                    jd_content = ""
                job_page.close()
                
                if not jd_content:
                    continue
                
                # 保存岗位信息
                job_info = {
                    "job_name": job_name,
                    "job_salary": job_salary,
                    "company_name": company_name,
                    "job_address": job_address,
                    "job_link": job_link,
                    "jd_content": jd_content
                }
                job_list.append(job_info)
                print(f"已获取岗位:{job_name} - {company_name}")
                
                # 随机延迟,防风控
                time.sleep(random.uniform(1, 2))
            
            # 翻页
            current_page += 1
            try:
                self.page.click(".options-pages a:text('下一页')")
                time.sleep(random.uniform(3, 5))
            except:
                print("已到达最后一页,爬取结束")
                break
        
        print(f"岗位爬取完成,共获取{len(job_list)}个符合要求的岗位")
        return job_list
    
    # 关闭浏览器
    def close(self):
        if self.browser:
            self.browser.close()

# 测试代码
if __name__ == "__main__":
    agent = JobFilterAgent(headless=False)
    agent.init_browser()
    agent.set_filter_conditions(city="广州", job_name="AI Agent开发工程师", min_salary=15, max_salary=30)
    job_list = agent.get_job_list(max_page=2)
    agent.close()
    print(job_list)
3.2.4 招呼语生成 Agent 开发

这个 Agent 负责根据岗位 JD 与用户简历,为每个岗位生成个性化的招呼语,避免群发,大幅提升 HR 的回复率。

创建agent_greeting_generator.py文件:

# agent_greeting_generator.py
from config import llm
from resume_matcher import parse_resume

class GreetingGeneratorAgent:
    def __init__(self, resume_path: str):
        self.resume_content = parse_resume(resume_path)
    
    # 生成个性化招呼语
    def generate_greeting(self, job_info: dict, match_result: dict) -> str:
        """
        根据岗位JD、匹配结果,生成个性化招呼语
        :param job_info: 岗位信息,包含job_name、company_name、jd_content
        :param match_result: 匹配度结果,包含match_points、final_score
        :return: 个性化招呼语
        """
        prompt = f"""
        你是一个专业的求职者,需要根据岗位JD与你的简历,生成一段个性化的求职招呼语,要求:
        1.  字数控制在50-100字,简洁真诚,不要太长
        2.  必须结合岗位的核心要求与你的核心匹配点,体现你对岗位的了解,绝对不能是通用群发内容
        3.  语气礼貌、谦逊,符合求职场景,不要过于浮夸
        4.  必须包含:对岗位的兴趣、你的核心匹配优势、期待沟通的意愿
        你的简历核心内容:
        {self.resume_content[:1000]}  # 取简历前1000字,避免上下文过长
        岗位信息:
        岗位名称:{job_info['job_name']}
        公司名称:{job_info['company_name']}
        岗位JD核心内容:{job_info['jd_content'][:1000]}
        核心匹配点:{match_result['match_points']}
        输出要求:仅输出招呼语本身,不要任何其他内容、格式、引号
        """
        greeting = llm.invoke(prompt).content.strip()
        # 过滤掉可能的引号、换行符
        greeting = greeting.replace('"', '').replace("'", "").replace("\n", "")
        return greeting

# 测试代码
if __name__ == "__main__":
    agent = GreetingGeneratorAgent(resume_path="你的简历.pdf")
    test_job = {
        "job_name": "AI Agent开发工程师",
        "company_name": "字节跳动",
        "jd_content": "负责AI智能体系统的设计与开发,要求熟悉LangChain、LangGraph,有大模型应用开发经验"
    }
    test_match = {
        "match_points": ["熟悉LangChain、LangGraph框架", "有2年大模型应用开发经验", "有完整的Agent系统落地经验"]
    }
    greeting = agent.generate_greeting(test_job, test_match)
    print(greeting)
3.2.5 风控合规 Agent 开发

这个 Agent 是整个系统的「安全底线」,负责对所有投递操作进行合规校验,控制投递间隔、单日上限,避免触发平台风控,防止账号被封。

创建agent_risk_control.py文件:

# agent_risk_control.py
import time
import random
from datetime import datetime
from config import RISK_CONFIG
import sqlite3

class RiskControlAgent:
    def __init__(self):
        self.risk_config = RISK_CONFIG
        # 初始化SQLite数据库,记录每日投递数量
        self.conn = sqlite3.connect("delivery_record.db")
        self.cursor = self.conn.cursor()
        # 创建投递记录表
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS delivery_record (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                company_name TEXT NOT NULL,
                job_name TEXT NOT NULL,
                delivery_time TEXT NOT NULL,
                delivery_status TEXT NOT NULL
            )
        """)
        self.conn.commit()
    
    # 获取当日已投递数量
    def get_today_delivery_count(self) -> int:
        """
        获取当日已投递的岗位数量
        """
        today = datetime.now().strftime("%Y-%m-%d")
        self.cursor.execute("""
            SELECT COUNT(*) FROM delivery_record WHERE delivery_time LIKE ?
        """, (f"{today}%",))
        count = self.cursor.fetchone()[0]
        return count
    
    # 检查是否可以投递(前置校验)
    def check_can_delivery(self) -> dict:
        """
        前置校验,判断当前是否可以投递,返回校验结果
        """
        result = {
            "can_delivery": True,
            "reason": ""
        }
        
        # 1. 检查单日投递上限
        today_count = self.get_today_delivery_count()
        if today_count >= self.risk_config["max_daily_delivery"]:
            result["can_delivery"] = False
            result["reason"] = f"今日已投递{today_count}个,达到单日上限{self.risk_config['max_daily_delivery']}个,停止投递"
            return result
        
        # 2. 检查禁止投递时间段
        current_time = datetime.now().strftime("%H:%M")
        forbidden_start = self.risk_config["forbidden_delivery_time"][0]
        forbidden_end = self.risk_config["forbidden_delivery_time"][1]
        if forbidden_start <= current_time <= forbidden_end:
            result["can_delivery"] = False
            result["reason"] = f"当前时间{current_time}属于禁止投递时间段[{forbidden_start}, {forbidden_end}],停止投递"
            return result
        
        return result
    
    # 随机延迟,模拟人工操作间隔
    def random_delay(self):
        """
        随机延迟,模拟人工操作的间隔,防风控
        """
        delay = random.uniform(self.risk_config["min_delay_second"], self.risk_config["max_delay_second"])
        print(f"模拟人工操作,延迟{delay:.2f}秒...")
        time.sleep(delay)
    
    # 记录投递结果
    def record_delivery(self, company_name: str, job_name: str, delivery_status: str = "success"):
        """
        记录投递结果,用于后续统计与风控校验
        """
        delivery_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.cursor.execute("""
            INSERT INTO delivery_record (company_name, job_name, delivery_time, delivery_status)
            VALUES (?, ?, ?, ?)
        """, (company_name, job_name, delivery_time, delivery_status))
        self.conn.commit()
        print(f"投递记录已保存:{company_name} - {job_name} - {delivery_status}")
    
    # 获取投递统计数据
    def get_delivery_statistics(self) -> dict:
        """
        获取投递统计数据,用于复盘
        """
        today = datetime.now().strftime("%Y-%m-%d")
        # 今日投递总数
        self.cursor.execute("SELECT COUNT(*) FROM delivery_record WHERE delivery_time LIKE ?", (f"{today}%",))
        today_total = self.cursor.fetchone()[0]
        # 今日成功投递数
        self.cursor.execute("SELECT COUNT(*) FROM delivery_record WHERE delivery_time LIKE ? AND delivery_status = 'success'", (f"{today}%",))
        today_success = self.cursor.fetchone()[0]
        # 累计投递总数
        self.cursor.execute("SELECT COUNT(*) FROM delivery_record")
        total_count = self.cursor.fetchone()[0]
        
        return {
            "today_total": today_total,
            "today_success": today_success,
            "total_count": total_count,
            "today_remaining": self.risk_config["max_daily_delivery"] - today_total
        }
    
    # 关闭数据库连接
    def close(self):
        self.conn.close()

# 测试代码
if __name__ == "__main__":
    agent = RiskControlAgent()
    print(agent.check_can_delivery())
    print(agent.get_delivery_statistics())
    agent.close()
3.2.6 投递执行 Agent 开发

这个 Agent 负责模拟人工操作,完成岗位的投递与招呼语发送,是整个系统的执行终端。

创建agent_delivery_executor.py文件:

# agent_delivery_executor.py
from playwright.sync_api import sync_playwright
from config import RISK_CONFIG
import time
import random

class DeliveryExecutorAgent:
    def __init__(self, headless: bool = False):
        self.headless = headless
        self.browser = None
        self.page = None
    
    # 初始化浏览器,和岗位筛选Agent使用同一个浏览器上下文,避免重复登录
    def init_browser(self, browser_context):
        self.browser = browser_context
        self.page = self.browser.new_page()
        return True
    
    # 执行投递操作
    def execute_delivery(self, job_info: dict, greeting: str) -> bool:
        """
        执行岗位投递操作,发送招呼语
        :param job_info: 岗位信息,包含job_link
        :param greeting: 个性化招呼语
        :return: 投递是否成功
        """
        try:
            # 打开岗位详情页
            self.page.goto(job_info["job_link"], wait_until="domcontentloaded")
            time.sleep(random.uniform(2, 4))
            
            # 点击立即沟通/投简历按钮
            try:
                chat_btn = self.page.query_selector(".btn-startchat")
                if not chat_btn:
                    chat_btn = self.page.query_selector(".startchat")
                chat_btn.click()
                time.sleep(random.uniform(2, 3))
            except:
                print(f"岗位{job_info['job_name']}无法点击沟通按钮,可能已投递或岗位已关闭")
                return False
            
            # 输入招呼语
            try:
                input_box = self.page.query_selector("textarea[placeholder*='输入你想对HR说的话']")
                if not input_box:
                    input_box = self.page.query_selector("textarea")
                input_box.fill(greeting)
                time.sleep(random.uniform(0.5, 1.5))
            except:
                print(f"岗位{job_info['job_name']}无法输入招呼语")
                return False
            
            # 点击发送按钮
            try:
                send_btn = self.page.query_selector(".btn-send")
                send_btn.click()
                time.sleep(random.uniform(1, 2))
            except:
                print(f"岗位{job_info['job_name']}无法发送招呼语")
                return False
            
            print(f"投递成功:{job_info['job_name']} - {job_info['company_name']}")
            return True
        
        except Exception as e:
            print(f"投递失败:{job_info['job_name']} - {job_info['company_name']},错误:{str(e)}")
            return False
    
    # 关闭页面
    def close(self):
        if self.page:
            self.page.close()
3.2.7 数据复盘 Agent 开发

这个 Agent 负责记录所有投递数据,生成复盘报告,优化后续的投递策略,让系统越用越好用。

创建agent_data_review.py文件:

# agent_data_review.py
import sqlite3
from datetime import datetime
from config import llm
import pandas as pd

class DataReviewAgent:
    def __init__(self):
        self.conn = sqlite3.connect("delivery_record.db")
    
    # 获取投递数据
    def get_delivery_data(self, start_date: str = None, end_date: str = None) -> pd.DataFrame:
        """
        获取指定时间段的投递数据
        """
        if not start_date:
            start_date = datetime.now().strftime("%Y-%m-01")
        if not end_date:
            end_date = datetime.now().strftime("%Y-%m-%d")
        
        query = f"""
            SELECT * FROM delivery_record 
            WHERE delivery_time BETWEEN '{start_date} 00:00:00' AND '{end_date} 23:59:59'
        """
        df = pd.read_sql(query, self.conn)
        return df
    
    # 生成投递复盘报告
    def generate_review_report(self) -> str:
        """
        生成投递复盘报告,分析投递情况,给出优化建议
        """
        df = self.get_delivery_data()
        if df.empty:
            return "暂无投递数据,无法生成复盘报告"
        
        # 统计核心数据
        total_delivery = len(df)
        success_delivery = len(df[df["delivery_status"] == "success"])
        success_rate = (success_delivery / total_delivery) * 100
        today_delivery = len(df[df["delivery_time"].str.startswith(datetime.now().strftime("%Y-%m-%d"))])
        
        # 调用大模型生成复盘报告
        prompt = f"""
        你是专业的求职辅导专家,请根据以下投递数据,生成一份简洁的投递复盘报告,要求:
        1.  总结当前的投递情况,包含核心数据
        2.  分析投递过程中可能存在的问题
        3.  给出具体的优化建议,提升投递回复率
        投递数据:
        - 累计投递总数:{total_delivery}个
        - 累计投递成功数:{success_delivery}个
        - 投递成功率:{success_rate:.2f}%
        - 今日投递数:{today_delivery}个
        投递明细数据:
        {df.to_string()}
        输出要求:报告结构清晰,语言简洁,建议具体可落地,字数控制在500字以内
        """
        report = llm.invoke(prompt).content
        return report
    
    # 关闭数据库连接
    def close(self):
        self.conn.close()

# 测试代码
if __name__ == "__main__":
    agent = DataReviewAgent()
    report = agent.generate_review_report()
    print(report)
    agent.close()

3.3 多 Agent 调度系统开发:LangGraph 实现全流程协同

现在,我们已经完成了所有核心 Agent 的开发,接下来我们使用 LangGraph 搭建多 Agent 调度系统,实现整个投递流程的自动化协同,创建agent_scheduler.py文件:

# agent_scheduler.py
from typing import TypedDict, Annotated, Sequence
import operator
from langgraph.graph import StateGraph, END
from config import RISK_CONFIG

# 定义全局状态,用于多Agent之间的信息传递
class AgentState(TypedDict):
    # 用户输入的基础信息
    user_id: str
    resume_path: str
    city: str
    job_name: str
    min_salary: int
    max_salary: int
    experience: str
    education: str
    max_page: int
    # 中间执行数据
    job_list: list
    current_job_index: int
    current_job: dict
    match_result: dict
    current_greeting: str
    delivery_result: bool
    # 执行统计
    total_delivery: int
    success_delivery: int
    # 执行状态
    status: str
    error_msg: str

# 初始化所有Agent
from agent_job_filter import JobFilterAgent
from resume_matcher import parse_resume, save_resume_to_vector_db, calculate_match_score
from agent_greeting_generator import GreetingGeneratorAgent
from agent_risk_control import RiskControlAgent
from agent_delivery_executor import DeliveryExecutorAgent
from agent_data_review import DataReviewAgent

# 节点1:初始化与简历解析
def node_init_resume(state: AgentState) -> dict:
    print("=== 步骤1:初始化系统,解析简历 ===")
    try:
        # 解析简历
        resume_content = parse_resume(state["resume_path"])
        # 保存简历到向量数据库
        save_resume_to_vector_db(resume_content, state["user_id"])
        # 初始化风控Agent
        risk_agent = RiskControlAgent()
        # 检查是否可以投递
        check_result = risk_agent.check_can_delivery()
        if not check_result["can_delivery"]:
            risk_agent.close()
            return {
                **state,
                "status": "failed",
                "error_msg": check_result["reason"]
            }
        risk_agent.close()
        return {
            **state,
            "status": "init_success",
            "current_job_index": 0,
            "total_delivery": 0,
            "success_delivery": 0
        }
    except Exception as e:
        return {
            **state,
            "status": "failed",
            "error_msg": f"简历解析失败:{str(e)}"
        }

# 节点2:岗位筛选与爬取
def node_job_filter(state: AgentState) -> dict:
    print("=== 步骤2:筛选并爬取岗位 ===")
    try:
        job_agent = JobFilterAgent(headless=False)
        job_agent.init_browser()
        job_agent.set_filter_conditions(
            city=state["city"],
            job_name=state["job_name"],
            min_salary=state["min_salary"],
            max_salary=state["max_salary"],
            experience=state["experience"],
            education=state["education"]
        )
        job_list = job_agent.get_job_list(max_page=state["max_page"])
        job_agent.close()
        
        if not job_list:
            return {
                **state,
                "status": "failed",
                "error_msg": "未获取到符合要求的岗位"
            }
        
        return {
            **state,
            "job_list": job_list,
            "status": "job_filter_success"
        }
    except Exception as e:
        return {
            **state,
            "status": "failed",
            "error_msg": f"岗位筛选失败:{str(e)}"
        }

# 节点3:简历与岗位匹配度评估
def node_resume_match(state: AgentState) -> dict:
    current_index = state["current_job_index"]
    job_list = state["job_list"]
    
    # 所有岗位处理完成
    if current_index >= len(job_list):
        return {
            **state,
            "status": "all_job_processed"
        }
    
    current_job = job_list[current_index]
    print(f"=== 步骤3:评估岗位 {current_job['job_name']} - {current_job['company_name']} 匹配度 ===")
    
    try:
        match_result = calculate_match_score(current_job["jd_content"], state["user_id"])
        print(f"匹配度评分:{match_result['final_score']}分,是否建议投递:{match_result['delivery_suggestion']}")
        
        return {
            **state,
            "current_job": current_job,
            "match_result": match_result,
            "status": "match_complete"
        }
    except Exception as e:
        # 匹配失败,跳过当前岗位,处理下一个
        print(f"岗位匹配度评估失败:{str(e)},跳过当前岗位")
        return {
            **state,
            "current_job_index": current_index + 1,
            "status": "match_failed"
        }

# 节点4:判断是否投递
def node_should_delivery(state: AgentState) -> str:
    match_result = state["match_result"]
    # 检查风控
    risk_agent = RiskControlAgent()
    check_result = risk_agent.check_can_delivery()
    risk_agent.close()
    
    if not check_result["can_delivery"]:
        print(check_result["reason"])
        return "end"
    # 匹配度达标,建议投递
    if match_result["delivery_suggestion"] and match_result["final_score"] >= RISK_CONFIG["min_match_score"]:
        return "delivery"
    # 不建议投递,跳过
    else:
        print(f"岗位匹配度不达标,跳过,下一个岗位")
        return "skip"

# 节点5:生成个性化招呼语
def node_generate_greeting(state: AgentState) -> dict:
    current_job = state["current_job"]
    match_result = state["match_result"]
    print(f"=== 步骤4:为岗位 {current_job['job_name']} 生成个性化招呼语 ===")
    
    try:
        greeting_agent = GreetingGeneratorAgent(state["resume_path"])
        greeting = greeting_agent.generate_greeting(current_job, match_result)
        print(f"生成的招呼语:{greeting}")
        
        return {
            **state,
            "current_greeting": greeting,
            "status": "greeting_generate_success"
        }
    except Exception as e:
        print(f"招呼语生成失败:{str(e)},跳过当前岗位")
        return {
            **state,
            "current_job_index": state["current_job_index"] + 1,
            "status": "greeting_generate_failed"
        }

# 节点6:执行投递操作
def node_execute_delivery(state: AgentState) -> dict:
    current_job = state["current_job"]
    greeting = state["current_greeting"]
    print(f"=== 步骤5:执行投递操作 ===")
    
    try:
        # 初始化风控Agent
        risk_agent = RiskControlAgent()
        # 随机延迟,防风控
        risk_agent.random_delay()
        
        # 初始化浏览器与投递Agent
        job_agent = JobFilterAgent(headless=False)
        job_agent.init_browser()
        delivery_agent = DeliveryExecutorAgent(headless=False)
        delivery_agent.init_browser(job_agent.browser)
        
        # 执行投递
        delivery_result = delivery_agent.execute_delivery(current_job, greeting)
        
        # 记录投递结果
        risk_agent.record_delivery(
            company_name=current_job["company_name"],
            job_name=current_job["job_name"],
            delivery_status="success" if delivery_result else "failed"
        )
        
        # 关闭资源
        delivery_agent.close()
        job_agent.close()
        risk_agent.close()
        
        # 更新统计数据
        total_delivery = state["total_delivery"] + 1
        success_delivery = state["success_delivery"] + (1 if delivery_result else 0)
        
        return {
            **state,
            "delivery_result": delivery_result,
            "total_delivery": total_delivery,
            "success_delivery": success_delivery,
            "current_job_index": state["current_job_index"] + 1,
            "status": "delivery_complete"
        }
    except Exception as e:
        print(f"投递执行失败:{str(e)}")
        return {
            **state,
            "delivery_result": False,
            "total_delivery": state["total_delivery"] + 1,
            "current_job_index": state["current_job_index"] + 1,
            "status": "delivery_failed"
        }

# 节点7:生成最终复盘报告
def node_generate_report(state: AgentState) -> dict:
    print("=== 步骤6:生成投递复盘报告 ===")
    try:
        review_agent = DataReviewAgent()
        report = review_agent.generate_review_report()
        review_agent.close()
        
        print("投递任务完成!")
        print(f"本次共处理{len(state['job_list'])}个岗位,尝试投递{state['total_delivery']}个,成功投递{state['success_delivery']}个")
        print("复盘报告:")
        print(report)
        
        return {
            **state,
            "status": "task_complete",
            "review_report": report
        }
    except Exception as e:
        return {
            **state,
            "status": "report_generate_failed",
            "error_msg": f"复盘报告生成失败:{str(e)}"
        }

# 构建LangGraph状态图
def build_agent_graph():
    workflow = StateGraph(AgentState)
    
    # 添加节点
    workflow.add_node("init_resume", node_init_resume)
    workflow.add_node("job_filter", node_job_filter)
    workflow.add_node("resume_match", node_resume_match)
    workflow.add_node("generate_greeting", node_generate_greeting)
    workflow.add_node("execute_delivery", node_execute_delivery)
    workflow.add_node("generate_report", node_generate_report)
    
    # 设置入口
    workflow.set_entry_point("init_resume")
    
    # 添加边
    workflow.add_edge("init_resume", "job_filter")
    workflow.add_edge("job_filter", "resume_match")
    
    # 添加条件边:匹配完成后,判断是否投递
    workflow.add_conditional_edges(
        "resume_match",
        node_should_delivery,
        {
            "delivery": "generate_greeting",
            "skip": "resume_match",
            "end": "generate_report"
        }
    )
    
    workflow.add_edge("generate_greeting", "execute_delivery")
    workflow.add_edge("execute_delivery", "resume_match")
    
    # 所有岗位处理完成后,生成报告
    workflow.add_edge("generate_report", END)
    
    # 编译图
    app = workflow.compile()
    return app

# 运行整个Agent系统
def run_job_agent_system(params: dict):
    app = build_agent_graph()
    # 初始化状态
    initial_state = AgentState(
        user_id=params.get("user_id", "default_user"),
        resume_path=params.get("resume_path", ""),
        city=params.get("city", "广州"),
        job_name=params.get("job_name", "AI Agent开发工程师"),
        min_salary=params.get("min_salary", 15),
        max_salary=params.get("max_salary", 30),
        experience=params.get("experience", ""),
        education=params.get("education", ""),
        max_page=params.get("max_page", 3),
        job_list=[],
        current_job_index=0,
        current_job={},
        match_result={},
        current_greeting="",
        delivery_result=False,
        total_delivery=0,
        success_delivery=0,
        status="init",
        error_msg=""
    )
    # 运行系统
    result = app.invoke(initial_state)
    return result

# 测试运行
if __name__ == "__main__":
    params = {
        "resume_path": "你的简历.pdf",  # 替换为你的简历路径
        "city": "广州",
        "job_name": "AI Agent开发工程师",
        "min_salary": 15,
        "max_salary": 30,
        "max_page": 2
    }
    run_job_agent_system(params)

3.4 可视化界面搭建:Gradio 实现一键操作

最后,我们使用 Gradio 搭建一个可视化的 Web 界面,让用户无需修改代码,即可通过界面上传简历、设置参数、启动投递、查看结果,创建web_ui.py文件:

# web_ui.py
import gradio as gr
from agent_scheduler import run_job_agent_system
from agent_risk_control import RiskControlAgent
from agent_data_review import DataReviewAgent

# 获取投递统计数据
def get_statistics():
    risk_agent = RiskControlAgent()
    stats = risk_agent.get_delivery_statistics()
    risk_agent.close()
    return f"""
    ### 投递统计数据
    - 今日已投递:{stats['today_total']} 个
    - 今日投递成功:{stats['today_success']} 个
    - 今日剩余可投递:{stats['today_remaining']} 个
    - 累计投递总数:{stats['total_count']} 个
    """

# 获取复盘报告
def get_review_report():
    review_agent = DataReviewAgent()
    report = review_agent.generate_review_report()
    review_agent.close()
    return report

# 启动投递系统
def start_delivery(resume_file, city, job_name, min_salary, max_salary, experience, education, max_page):
    if not resume_file:
        return "请先上传简历文件", get_statistics(), get_review_report()
    
    resume_path = resume_file.name
    params = {
        "resume_path": resume_path,
        "city": city,
        "job_name": job_name,
        "min_salary": min_salary,
        "max_salary": max_salary,
        "experience": experience,
        "education": education,
        "max_page": max_page
    }
    
    try:
        result = run_job_agent_system(params)
        if result["status"] == "task_complete":
            return f"投递任务完成!本次成功投递{result['success_delivery']}个岗位", get_statistics(), result["review_report"]
        else:
            return f"任务失败:{result['error_msg']}", get_statistics(), get_review_report()
    except Exception as e:
        return f"系统运行出错:{str(e)}", get_statistics(), get_review_report()

# 构建Gradio界面
with gr.Blocks(title="春招智能投递多Agent系统", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 春招智能投递多Agent系统")
    gr.Markdown("基于AI Agent技术,实现岗位筛选、简历匹配、个性化招呼语生成、安全投递全流程自动化,提升投递效率与回复率,避免封号风险")
    
    with gr.Row():
        # 左侧参数配置区
        with gr.Column(scale=1):
            gr.Markdown("### 基础参数配置")
            resume_file = gr.File(label="上传简历(支持PDF/DOCX)", file_types=[".pdf", ".docx"])
            city = gr.Textbox(label="目标城市", value="广州", placeholder="请输入目标城市")
            job_name = gr.Textbox(label="岗位名称", value="AI Agent开发工程师", placeholder="请输入目标岗位名称")
            with gr.Row():
                min_salary = gr.Number(label="最低薪资(K)", value=15, minimum=5, maximum=100, step=1)
                max_salary = gr.Number(label="最高薪资(K)", value=30, minimum=5, maximum=100, step=1)
            experience = gr.Dropdown(
                label="工作经验要求",
                choices=["应届毕业生", "1年以内", "1-3年", "3-5年", "5-10年", "10年以上"],
                value=""
            )
            education = gr.Dropdown(
                label="学历要求",
                choices=["大专", "本科", "硕士", "博士"],
                value=""
            )
            max_page = gr.Number(label="最大爬取页数", value=3, minimum=1, maximum=10, step=1)
            
            start_btn = gr.Button("启动智能投递", variant="primary", size="lg")
        
        # 右侧结果展示区
        with gr.Column(scale=2):
            gr.Markdown("### 运行状态与结果")
            status_output = gr.Textbox(label="运行状态", lines=3, interactive=False)
            stats_output = gr.Markdown(label="投递统计数据")
            report_output = gr.Textbox(label="投递复盘报告", lines=10, interactive=False)
    
    # 页面加载时,自动获取统计数据与报告
    demo.load(get_statistics, outputs=stats_output)
    demo.load(get_review_report, outputs=report_output)
    
    # 绑定按钮点击事件
    start_btn.click(
        fn=start_delivery,
        inputs=[resume_file, city, job_name, min_salary, max_salary, experience, education, max_page],
        outputs=[status_output, stats_output, report_output]
    )

# 运行界面
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, inbrowser=True)

3.5 系统运行步骤

  1. 替换config.py中的api_key为你自己的 DeepSeek API Key;
  2. 替换agent_job_filter.py中的user_data_dir为你本地 Chrome 浏览器的用户数据路径;
  3. 运行web_ui.py文件,启动可视化界面;
  4. 在界面中上传简历,设置投递参数,点击「启动智能投递」即可运行系统。

四、风控红线与合规声明(必看)

4.1 平台规则与风险提示

Boss 直聘《用户服务协议》明确禁止任何形式的自动化工具、脚本、外挂进行批量投递、账号操作,违规使用可能导致以下后果:

  1. 账号限流、降权,岗位推荐质量大幅下降;
  2. 账号沟通功能被限制,无法向 HR 发送消息;
  3. 账号永久封禁,且无法申诉找回。

本文提供的代码仅用于技术学习与研究,请勿用于商业用途与违规操作,使用所产生的所有风险由使用者自行承担。

4.2 最低风控配置建议

为了最大程度降低账号风险,请严格遵守以下配置:

  1. 单日投递数量严禁超过 100 个,建议控制在 50-80 个;
  2. 投递间隔严禁低于 2 秒,建议设置为 3-6 秒随机间隔;
  3. 严禁在 23:00 - 次日 7:00 之间进行批量投递,该时间段风控检测最严格;
  4. 严禁使用完全相同的招呼语进行群发,必须使用个性化生成的招呼语;
  5. 严禁使用无头模式长时间连续运行,建议有头模式运行,模拟人工操作;
  6. 严禁使用第三方云端代投服务,所有操作必须在本地运行,避免账号密码泄露。

4.3 合规替代方案

如果担心账号风险,建议使用以下 100% 合规的方案:

  1. 使用 Boss 直聘官方的「批量投递」功能:在岗位搜索页勾选多个岗位,一键投递,无任何风险;
  2. 使用本文的「岗位筛选 Agent」+「招呼语生成
    Agent」,自动筛选岗位、生成个性化招呼语,手动进行投递,效率接近自动化,且完全合规;
  3. 每日分时段投递,早 9-11 点、下午 2-5 点各投递 20-30 个,模拟真实求职行为。
Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐