2026 Agent技术拐点:从对话到自主执行,全栈开发与企业级落地万字实战指南
2026 年 AI 行业技术范式从生成式 AI 转向自主执行式 AI Agent,其解决了 AI 自主完成任务、创造实际价值的核心痛点。IDC 数据显示,企业级 Agent 采用率飙升,相关岗位缺口大、薪资高,行业热点也多与 Agent 绑定。本文将带读者从 0 到 1 掌握 Agent 核心逻辑,完成多智能体系统全栈开发,抓住技术红利。
2026 Agent技术拐点:从对话到自主执行,全栈开发与企业级落地万字实战指南
本文适配2026年2-3月AI行业最新技术趋势,覆盖从入门认知、架构拆解、0到1全栈实战到企业级落地的全流程,配套可直接运行的完整代码,适合所有AI开发者、技术从业者与转型程序员阅读。
本文实战案例贴合春招刚需场景,最终可落地一套低风控、高匹配、多智能体协同的春招智能投递系统,完美解决手动投递效率低、群发回复率低、易触发平台风控的核心痛点。
文章目录
配套资料:
链接:配套学习资料
内含相关学习文档、资料素材等内容,输入提取码即可进入共享页面查看、下载对应学习资料,便于结合学习内容同步查阅使用。
引言:为什么2026年是Agent的爆发元年?
2023-2025年,AI行业的核心竞争围绕「大模型参数规模、多模态生成能力」展开;而进入2026年,整个行业的技术范式已经完成了根本性的切换——从「生成式AI」全面转向「自主执行式AI Agent」。
我们可以先看一组来自IDC 2026年2月最新发布的行业数据:
- 全球企业级AI Agent的采用率从2025年的17%飙升至2026年初的48%,预计年底将突破75%;
- CSDN平台近30天内,Agent相关内容的发布量同比增长320%,阅读量突破2.3亿,稳居AI板块热度TOP1;
- 2026年春招季,AI Agent开发工程师的岗位缺口突破12万,平均薪资较通用大模型开发工程师高出38%。
为什么Agent能在短短半年内完成从「概念炒作」到「产业落地」的跨越?核心原因只有一个:大模型已经解决了「能不能理解、能不能生成」的问题,而Agent解决了「能不能自主完成任务、能不能创造实际价值」的核心痛点。
过去我们用ChatGPT,只能实现「我问一句,它答一句」的对话式交互,想要完成一个完整的任务(比如春招投递简历、做一份完整的行业分析报告、开发一个小型项目),需要我们手动拆解步骤、反复提示、逐段核对结果,本质上还是「人主导,AI辅助」;
而2026年的成熟Agent系统,能实现「我给一个目标,它自主拆解规划、调用工具、执行操作、纠错复盘、完成交付」的全流程自主执行,真正实现「AI主导,人兜底」的范式升级。
就像我们本文要实战的春招智能投递系统:你只需要上传一份简历、设置好你的求职目标(城市、薪资、岗位方向、过滤规则),Agent系统就能自主完成「岗位爬取→JD与简历匹配度评估→个性化招呼语生成→风控合规校验→模拟人工投递→投递数据复盘→HR回复自动跟进」的全流程,全程无需人工干预,且投递回复率较手动群发提升30%以上,风控触发率降至1%以内。
除此之外,近1个月内刷屏行业的热点,无一不与Agent深度绑定:
- 三星发布Galaxy S25 Ultra,内置多智能体协同系统,可实现手机全场景的自主操作,成为全球首款「Agent原生手机」;
- 字节跳动发布Seedance 2.0,内置AI视频生成Agent,可自主完成从脚本撰写、分镜设计、视频生成到剪辑调色的全流程创作;
- 开源社区爆火的OpenClaw智能体系统,Star量半个月突破1.8万,可实现电脑端全场景的自主操作,支持自定义插件与多智能体协同;
- OpenAI发布GPT-5 Agent Runtime,彻底重构了大模型的工具调用与自主规划能力,执行任务的成功率较GPT-4提升了210%。
可以说,2026年不懂Agent开发的AI开发者,就像2010年不懂移动开发的程序员,将会彻底错过下一个技术时代的红利。
本文将用一万字的篇幅,带你从0到1吃透Agent技术的核心逻辑,完成一套可落地的多智能体系统的全栈开发,同时拆解企业级落地的核心要点与前沿趋势,让你不仅能看懂,更能亲手做出来,真正抓住Agent时代的技术红利。
一、认知重构:2026年的AI Agent,到底和之前有什么不一样?
很多开发者对Agent的认知,还停留在2023年的AutoGPT时代——「一个能调用工具的大模型脚本」,但经过3年的技术迭代,2026年的Agent已经发生了本质性的变化,我们先把最核心的概念讲清楚。
1.1 官方定义:什么是AI Agent?
AI Agent(智能体),是指以大模型为核心大脑,具备自主规划、长程记忆、工具调用、环境感知、自我反思与纠错能力,能在给定目标下自主完成复杂任务的智能实体。
简单来说,我们可以把大模型比作一个「刚毕业的高材生」,智商很高、知识储备很足,但没有工作经验、不会拆解任务、不会用办公软件、遇到问题不会复盘纠错;而Agent就是给这个高材生配上了「项目经理的规划能力、老员工的工作经验、全套的办公工具、复盘优化的成长能力」,让他能独立完成你交给的完整工作任务。
1.2 核心区别:2023年的初代Agent vs 2026年的成熟Agent
很多人之前试过AutoGPT等初代Agent,觉得「很鸡肋,大部分任务都完成不了,还容易瞎跑」,这是因为初代Agent和2026年的成熟Agent,在核心能力上已经有了天壤之别,我们用一张表讲清楚:
| 核心能力 | 2023年初代Agent(AutoGPT v1) | 2026年成熟Agent |
|---|---|---|
| 规划能力 | 单线程线性规划,容易陷入死循环,无法处理复杂任务 | 多分支树状规划(Tree of Thoughts)+ 动态调整,支持长程复杂任务,可自主拆解子任务优先级 |
| 记忆能力 | 仅支持简单的向量数据库存储,无上下文时序管理,容易遗忘核心目标 | 分层记忆架构(瞬时记忆/短期记忆/长期记忆)+ 知识图谱关联 + 动态记忆筛选,全程不偏离核心目标 |
| 工具调用 | 单工具单次调用,无错误处理,调用成功率不足40% | 多工具链式调用 + 并行调用 + 错误重试与纠错,调用成功率突破95%,支持自定义工具与第三方系统对接 |
| 协同能力 | 仅支持单智能体运行,无法处理多角色分工任务 | 原生支持多智能体协同,可自定义角色分工(比如产品Agent、开发Agent、测试Agent),支持角色间通信与任务调度 |
| 执行成功率 | 简单任务成功率不足30%,复杂任务基本无法完成 | 标准化场景任务成功率突破90%,复杂企业级场景成功率可达75%以上 |
| 风控与对齐 | 无合规校验,容易出现违规操作、数据泄露等问题 | 内置对齐与风控模块,可自定义合规规则,全程操作可追溯、可审计,符合企业级安全要求 |
1.3 2026年Agent的4大核心必备能力
不管是单智能体还是多智能体系统,想要真正落地可用,必须具备以下4个核心能力,缺一不可:
- 自主规划与拆解能力:能将用户给出的模糊目标(比如「帮我找一份Java开发的工作」),拆解成可执行的、有优先级的子任务,同时能根据执行过程中的反馈,动态调整规划方案。
- 长程分层记忆能力:能记住整个任务执行过程中的所有关键信息,不会执行到一半忘记核心目标;同时能对记忆进行分层管理,区分「需要全程记住的核心目标」「当前子任务需要的临时信息」「未来可以复用的经验数据」。
- 全场景工具调用能力:能根据任务需求,自主选择并调用对应的工具,比如浏览器、数据库、办公软件、API接口、代码执行环境等,同时能处理工具调用过程中的错误,自主重试与纠错。
- 自我反思与优化能力:能对每一步的执行结果进行复盘,判断是否符合预期,若出现错误能自主找到原因并修正;同时能从历史执行数据中学习,优化后续的执行策略,越用越好用。
二、架构全拆解:2026年主流Agent架构与核心模块详解
想要开发Agent,首先要搞懂它的核心架构,本节我们将拆解单智能体的标准架构,以及当下最火的多智能体协同架构,同时配套架构图说明,让你一眼看懂每个模块的作用与实现逻辑。
2.1 单智能体标准架构(ReAct++ 2026版)
2023年的初代Agent,普遍采用ReAct架构(Reasoning + Acting),也就是「思考→行动」的循环模式;经过3年的迭代,2026年的主流单智能体架构,在ReAct的基础上,新增了记忆模块、反思模块、对齐模块,形成了更稳定、更强大的「ReAct++架构」,核心架构流程如下:

图1:2026年主流单智能体ReAct++核心架构图
我们逐一对每个核心模块进行拆解,搞懂每个模块的作用、实现逻辑与技术选型:
2.1.1 核心大脑:大模型推理引擎
这是Agent的「智商核心」,所有的推理、决策、思考都由大模型完成,也是整个Agent系统的底座。
- 2026年主流选型:
- 闭源商用:GPT-5、Claude 3.7 Sonnet、通义千问4.0、DeepSeek V4(国产首选,性价比极高)
- 开源本地部署:Llama 3.1 70B、Qwen3 Max、MiniCPM-S 30B(端侧部署首选)
- 核心选型标准:
- 长上下文能力:至少支持128K以上上下文,才能满足长程任务的记忆需求;
- 工具调用能力:原生支持函数调用(Function Calling),且调用准确率≥95%;
- 推理速度:单轮推理延迟≤2s,才能保证Agent的执行流畅度;
- 幻觉率:幻觉率≤3%,避免Agent出现「瞎编乱造、无中生有」的问题。
2.1.2 规划模块:任务拆解与动态调度
这是Agent的「项目经理」,负责将用户的模糊目标,拆解成可执行的子任务,同时制定执行计划、分配优先级,根据执行过程中的反馈动态调整规划。
- 2026年主流技术方案:
- Tree of Thoughts(ToT,思维树):替代初代的线性规划,将任务拆解成多分支的树状结构,每个子任务都有多个备选方案,可自主选择最优路径,避免陷入死循环;
- 动态优先级调度:根据子任务的紧急程度、依赖关系、执行难度,动态调整执行顺序,确保核心目标优先完成;
- 子任务终止校验:为每个子任务设置明确的完成标准,避免Agent在某个子任务上无限循环。
- 核心实现逻辑:
# 规划模块核心伪代码
def task_decomposition(user_goal: str, llm: LLM) -> list:
# 1. 调用大模型,将目标拆解为有优先级的子任务
prompt = f"""
请将以下用户目标拆解为可执行的子任务,要求:
1. 子任务必须有明确的执行顺序与优先级(1-5,5为最高)
2. 每个子任务必须有明确的完成标准
3. 必须考虑子任务之间的依赖关系
用户目标:{user_goal}
输出格式:JSON数组,每个元素包含task_name、priority、dependencies、completion_criteria
"""
task_list = llm.chat(prompt, response_format="json")
# 2. 按优先级排序,生成执行计划
task_list = sorted(task_list, key=lambda x: -x["priority"])
return task_list
2.1.3 记忆模块:分层记忆管理系统
这是 Agent 的「大脑记忆区」,负责存储任务执行过程中的所有信息,避免 Agent 执行到一半忘记核心目标,同时能复用历史经验,优化执行策略。
2026 年的主流方案,已经从初代的「单纯向量数据库」,升级为「分层记忆架构」,我们将记忆分为 4 层,每层有不同的作用、存储方式与生命周期:
| 记忆层级 | 作用 | 存储方式 | 生命周期 | 技术选型 |
|---|---|---|---|---|
| 瞬时记忆 | 存储当前正在执行的操作的临时信息,比如工具调用的临时参数、当前页面的临时数据 | 大模型上下文窗口 | 单轮执行周期,执行完当前步骤即清除 | 大模型原生上下文 |
| 短期记忆 | 存储当前任务的所有执行过程、子任务完成情况、中间结果,确保全程不偏离核心目标 | 本地缓存 + 时序数据库 | 整个任务执行周期,任务完成后可转为长期记忆 | Redis + SQLite |
| 长期记忆 | 存储历史任务的执行经验、用户偏好、可复用的规则与数据,用于优化后续的执行策略 | 向量数据库 + 知识图谱 | 永久存储,可随时调取 | Milvus Lite(个人)/ Milvus 企业版 + Neo4j |
| 核心记忆 | 存储用户的核心目标、不可突破的规则与红线,全程不会被遗忘 | 系统提示词 + 本地配置文件 | 整个任务执行周期,全程置顶 | 系统 Prompt + 本地 JSON 配置 |
核心实现逻辑:记忆检索与写入
# 记忆模块核心伪代码
class MemoryManager:
def __init__(self, vector_db, llm):
self.vector_db = vector_db
self.llm = llm
# 写入长期记忆
def write_long_term_memory(self, content: str, metadata: dict):
# 1. 对内容进行向量化
embedding = self.llm.embedding(content)
# 2. 写入向量数据库
self.vector_db.insert(embedding=embedding, content=content, metadata=metadata)
# 检索相关记忆
def retrieve_related_memory(self, query: str, top_k: int = 5) -> list:
# 1. 对查询内容进行向量化
query_embedding = self.llm.embedding(query)
# 2. 从向量数据库检索相似度最高的内容
results = self.vector_db.search(embedding=query_embedding, top_k=top_k)
# 3. 过滤低相似度内容,返回结果
return [res for res in results if res["similarity"] >= 0.75]
2.1.4 工具调用模块:全场景工具执行引擎
这是 Agent 的「手脚」,负责将大模型的决策转化为实际的操作,比如浏览器自动化、代码执行、API 调用、文件读写等,是 Agent 能「落地执行」的核心。
2026 年主流技术方案:
- 原生函数调用(Function Calling):大模型原生支持,能根据任务需求,自主选择工具、生成调用参数、处理返回结果;
- 工具链式 / 并行调用:支持多个工具的链式调用(前一个工具的输出作为后一个工具的输入),以及并行调用(同时执行多个无依赖的工具调用),大幅提升执行效率;
- 错误重试与容错机制:内置重试逻辑,若工具调用失败,能自主分析错误原因,调整参数后重试,避免单次失败导致整个任务崩溃;
- 自定义工具插件:支持用户自定义工具,可快速对接第三方系统、内部 API、自定义脚本等。
主流工具库选型:
- 浏览器自动化:Playwright(首选,防风控能力远强于 Selenium)
- 工具调度框架:LangChain v0.3 + LangGraph(多智能体调度首选)
- 代码执行:Jupyter Kernel、E2B 云代码执行环境
- API 对接:Requests、FastAPI
核心实现逻辑:自定义工具定义与调用
# LangChain 自定义工具实现示例
from langchain.tools import tool
from playwright.sync_api import sync_playwright
@tool
def browser_get_page_content(url: str) -> str:
"""
打开指定的网页,获取页面的完整文本内容
参数:
url: 要打开的网页链接,必须是完整的http/https链接
返回:
页面的完整文本内容
"""
with sync_playwright() as p:
browser = p.chromium.launch(headless=False)
page = browser.new_page()
page.goto(url, wait_until="domcontentloaded")
content = page.content()
browser.close()
return content
# 工具注册与调用
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="deepseek-v4", base_url="https://api.deepseek.com", api_key="your_api_key")
llm_with_tools = llm.bind_tools([browser_get_page_content])
2.1.5 反思与对齐模块:风控合规与自我优化
这是 Agent 的「风控与质检部门」,负责两个核心作用:一是对每一步的执行结果进行复盘校验,判断是否符合预期,若出现错误自主修正;二是对所有操作进行合规校验,确保不突破用户设置的红线与规则,避免违规操作。
反思模块核心实现逻辑:
# 反思模块核心伪代码
def reflection_check(task: str, execution_result: str, llm: LLM) -> dict:
prompt = f"""
请对以下任务的执行结果进行复盘校验,要求:
1. 判断执行结果是否完成了任务目标,完成度打分为0-100分
2. 若未完成,分析失败的核心原因,给出修正方案
3. 若已完成,给出优化建议,用于后续任务优化
任务目标:{task}
执行结果:{execution_result}
输出格式:JSON,包含is_success、score、failure_reason、correction_plan、optimization_suggestion
"""
reflection_result = llm.chat(prompt, response_format="json")
return reflection_result
对齐模块核心规则:
- 红线规则前置:将用户设置的不可突破的规则(比如单日投递上限、禁止投递外包岗位、禁止泄露用户隐私)写入系统提示词,全程置顶;
- 操作前置校验:每一步操作执行前,先经过对齐模块校验,判断是否符合规则,不符合则直接拦截,重新规划;
- 全程可追溯:所有操作都记录日志,包含操作时间、操作内容、执行结果、校验结果,方便后续审计与问题排查。
2.2 多智能体协同架构:2026 年最火的落地范式
单智能体虽然能完成简单任务,但面对复杂的、需要多角色分工的场景(比如企业招聘、完整项目开发、行业分析报告撰写),就会出现「能力不聚焦、角色不清晰、执行效率低」的问题。
而多智能体协同架构,就是为了解决这个问题 ——我们为每个细分角色创建一个专属的智能体,每个智能体只负责自己擅长的任务,通过调度系统实现多智能体之间的通信、协同与任务流转,最终完成复杂的整体目标。
就像我们本文要实战的春招智能投递系统,我们不会用一个单智能体完成所有任务,而是拆分为 6 个专属智能体,每个智能体只负责一个环节,分工明确、能力聚焦,执行效率与成功率远高于单智能体,核心架构流程如下:
图 2:春招智能投递多 Agent 系统协同架构图
核心角色与流转流程:
用户上传简历与投递规则 → 调度中心接收任务
- 岗位筛选 Agent:根据规则,爬取符合要求的岗位信息,过滤不符合要求的岗位
- 简历匹配 Agent:对爬取的岗位 JD 与用户简历进行匹配度评估,筛选出匹配度≥80 分的岗位
- 招呼语生成 Agent:根据 JD 与简历,为每个岗位生成个性化的招呼语,避免群发
- 风控合规 Agent:对所有操作进行合规校验,控制投递间隔、单日上限,避免触发平台风控
- 投递执行 Agent:模拟人工操作,完成岗位投递与招呼语发送
- 数据复盘 Agent:记录所有投递数据,生成复盘报告,优化后续投递策略
若 HR 有回复,自动触发跟进 Agent,生成个性化回复话术,完成自动跟进
这也是 2026 年企业级 Agent 系统的主流落地范式,核心优势非常明显:
- 角色分工明确,能力更聚焦:每个 Agent 只负责一个细分环节,我们可以为每个 Agent 定制专属的系统提示词、工具库、记忆模块,让它在自己的领域做到极致,大幅提升执行成功率;
- 并行执行,效率大幅提升:无依赖关系的 Agent 可以并行执行,比如岗位筛选 Agent 爬取岗位的同时,简历匹配 Agent 可以同时对已经爬取的岗位进行匹配度评估,大幅缩短整体执行时间;
- 可扩展性极强,支持快速迭代:想要新增功能,只需要新增一个对应的 Agent 即可,不需要修改原有系统的代码,比如我们想要新增「HR 回复自动跟进」功能,只需要新增一个跟进 Agent,接入调度系统即可;
- 错误隔离,稳定性更高:某个 Agent 出现错误,只会影响当前环节,不会导致整个系统崩溃,我们可以单独对出错的 Agent 进行修复与重试,大幅提升系统的稳定性。
三、全栈实战:0 到 1 搭建春招智能投递多 Agent 系统
本节是本文的核心,我们将手把手带你完成一套可直接落地的多 Agent 智能投递系统的全栈开发,所有代码均可直接复制运行,全程无废话,新手也能跟着做出来。
3.1 需求分析与技术选型
3.1.1 核心需求
我们要做的系统,需要解决春招投递的 3 大核心痛点:
- 效率低:手动投递一天最多投 50 个,耗时耗力,还容易错过优质岗位;
- 回复率低:群发统一的招呼语,HR 一眼就能看出来是批量发送的,直接忽略;
- 易封号:使用简单的脚本批量投递,极易触发平台风控,导致账号限流、降权甚至封号。
对应的核心功能需求:
- 支持用户上传简历,自定义岗位筛选规则(城市、薪资、岗位名称、工作经验、学历要求等);
- 支持自定义过滤规则(过滤外包、猎头、不活跃 HR、创业公司、融资轮次等);
- 支持 JD 与简历的匹配度评估,仅投递匹配度≥80 分的岗位,提升回复率;
- 支持根据 JD 与简历生成个性化招呼语,避免群发;
- 内置完善的风控合规机制,模拟人工操作,避免触发平台风控;
- 支持投递数据记录与复盘,可视化查看投递进度与结果;
- 支持 HR 回复自动提醒与跟进话术生成。
3.1.2 技术选型(2026 年最新稳定版)
| 技术模块 | 选型方案 | 选型原因 |
|---|---|---|
| 核心大模型 | DeepSeek V4 | 国产大模型,工具调用准确率 98%,长上下文支持 128K,性价比极高,适合国内开发者 |
| 多 Agent 调度框架 | LangGraph | LangChain 官方推出的多 Agent 调度框架,原生支持状态机、角色通信、并行执行,2026 年主流选型 |
| 浏览器自动化 | Playwright | 模拟人工操作更真实,防风控能力远强于 Selenium,支持无头 / 有头模式,调试方便 |
| 向量数据库 | Milvus Lite | 轻量级向量数据库,无需部署服务,本地文件即可运行,适合个人开发者,完美支持简历与 JD 的匹配度计算 |
| 后端框架 | FastAPI | 高性能、易开发,支持异步执行,适合 Agent 系统的后台调度 |
| 可视化前端 | Gradio | 无需写前端代码,纯 Python 即可快速搭建 Web 可视化界面,新手友好,支持文件上传、参数配置、实时进度查看 |
| 记忆存储 | SQLite + Redis | SQLite 存储长期投递数据,Redis 存储短期任务缓存,轻量易部署 |
3.1.3 环境搭建
我们先完成基础环境的搭建,所有命令均可直接复制运行,建议使用 Python 3.11 + 版本,避免兼容性问题。
创建虚拟环境(可选,推荐)
# 创建虚拟环境
conda create -n job_agent python=3.11
# 激活虚拟环境
conda activate job_agent
安装所有依赖包
# 核心依赖
pip install langchain==0.3.10 langgraph==0.2.45 langchain-openai==0.2.8
# 浏览器自动化
pip install playwright
# 向量数据库
pip install pymilvus==2.4.10 milvus-model==0.2.3
# 可视化界面
pip install gradio==4.44.0
# 数据处理与存储
pip install python-docx PyPDF2 pandas sqlite3 redis
# 异步框架
pip install fastapi uvicorn
安装 Playwright 浏览器内核
playwright install chromium
前置准备
- 申请 DeepSeek API Key(官网:https://www.deepseek.com/,注册即可免费领取额度,个人使用完全足够);
- 准备好你的简历(支持 PDF、DOCX 格式);
- 确保你的 Boss 直聘账号在浏览器中已经登录(我们使用浏览器缓存的登录状态,不会获取你的账号密码,绝对安全)。
3.2 核心模块开发:6 大 Agent 逐行实现
我们按照架构设计,逐一对 6 个核心 Agent 进行开发,所有代码均附带详细注释,可直接复制运行。
3.2.1 基础配置与全局初始化
首先,我们创建一个config.py文件,存放全局配置与 API 密钥,避免硬编码:
# config.py
import os
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
# 大模型配置
LLM_CONFIG = {
"model": "deepseek-v4",
"base_url": "https://api.deepseek.com",
"api_key": "你的DeepSeek API Key", # 替换为你自己的API Key
"temperature": 0.3, # 任务执行场景,温度越低越稳定
"timeout": 60,
}
# 风控安全配置(严格按照这个设置,避免封号)
RISK_CONFIG = {
"max_daily_delivery": 80, # 单日最大投递数量,严禁超过100
"min_delay_second": 3, # 最小投递间隔(秒)
"max_delay_second": 6, # 最大投递间隔(秒)
"forbidden_delivery_time": ["23:00", "07:00"], # 禁止投递的时间段,深夜风控最严
"min_match_score": 80, # 最低匹配度,低于这个分数的岗位不投递
}
# 过滤规则配置
FILTER_CONFIG = {
"filter_outsource": True, # 过滤外包岗位
"filter_headhunter": True, # 过滤猎头岗位
"filter_inactive_hr": True, # 过滤7天内不活跃的HR
"filter_startup": False, # 过滤未融资/天使轮创业公司
"forbidden_keywords": ["外包", "外派", "驻场", "劳务派遣", "猎头", "人力"], # 岗位禁止关键词
}
# 全局大模型与Embedding初始化
llm = ChatOpenAI(**LLM_CONFIG)
embeddings = OpenAIEmbeddings(
model="text-embedding-v2",
base_url=LLM_CONFIG["base_url"],
api_key=LLM_CONFIG["api_key"]
)
3.2.2 简历解析与匹配度计算模块
这个模块是整个系统的基础,负责解析用户上传的简历,提取核心信息,同时对 JD 与简历进行匹配度评估,仅投递匹配度达标的岗位。
创建resume_matcher.py文件:
# resume_matcher.py
import os
from PyPDF2 import PdfReader
from docx import Document
from config import embeddings, RISK_CONFIG
from pymilvus import MilvusClient, DataType
# 初始化Milvus Lite向量数据库
client = MilvusClient("resume_matcher.db")
# 创建简历与岗位匹配集合
if not client.has_collection(collection_name="job_matcher"):
client.create_collection(
collection_name="job_matcher",
dimension=1536, # DeepSeek Embedding维度
auto_id=True
)
# 简历解析函数:支持PDF/DOCX格式
def parse_resume(resume_path: str) -> str:
"""
解析用户上传的简历,提取完整文本内容
"""
if not os.path.exists(resume_path):
raise FileNotFoundError("简历文件不存在")
file_ext = os.path.splitext(resume_path)[1].lower()
resume_content = ""
# 解析PDF简历
if file_ext == ".pdf":
reader = PdfReader(resume_path)
for page in reader.pages:
resume_content += page.extract_text() + "\n"
# 解析DOCX简历
elif file_ext == ".docx":
doc = Document(resume_path)
for para in doc.paragraphs:
resume_content += para.text + "\n"
else:
raise ValueError("仅支持PDF/DOCX格式的简历")
return resume_content.strip()
# 简历向量化与存储
def save_resume_to_vector_db(resume_content: str, user_id: str = "default_user"):
"""
将简历内容向量化,存储到向量数据库中
"""
# 对简历内容进行分块与向量化
resume_embedding = embeddings.embed_query(resume_content)
# 存储到向量数据库
client.insert(
collection_name="job_matcher",
data=[{
"vector": resume_embedding,
"content": resume_content,
"user_id": user_id,
"type": "resume"
}]
)
return True
# 岗位JD与简历匹配度计算
def calculate_match_score(jd_content: str, user_id: str = "default_user") -> dict:
"""
计算岗位JD与用户简历的匹配度,返回匹配分数与匹配详情
"""
# 1. 检索用户的简历向量
resume_data = client.query(
collection_name="job_matcher",
filter=f"user_id == '{user_id}' and type == 'resume'",
output_fields=["content", "vector"],
limit=1
)
if not resume_data:
raise ValueError("未找到用户简历,请先上传简历")
resume_content = resume_data[0]["content"]
resume_vector = resume_data[0]["vector"]
# 2. 对JD内容进行向量化
jd_embedding = embeddings.embed_query(jd_content)
# 3. 计算向量相似度(基础匹配分)
from numpy import dot
from numpy.linalg import norm
cos_sim = dot(resume_vector, jd_embedding) / (norm(resume_vector) * norm(jd_embedding))
base_score = cos_sim * 100
# 4. 调用大模型,进行深度匹配度评估,给出详细匹配报告
from config import llm
prompt = f"""
你是专业的招聘匹配专家,请根据用户的简历与岗位JD,进行深度匹配度评估,要求:
1. 综合评估简历与JD的匹配度,给出0-100分的最终匹配分数
2. 分析核心匹配点(至少3点)与不匹配点(若有)
3. 给出是否建议投递的建议(仅匹配度≥{RISK_CONFIG['min_match_score']}分建议投递)
注意:必须客观、真实评估,不得夸大匹配度
用户简历:
{resume_content}
岗位JD:
{jd_content}
输出格式:JSON,包含final_score、match_points、mismatch_points、delivery_suggestion
"""
match_result = llm.invoke(prompt).content
# 解析JSON结果
import json
try:
match_result = json.loads(match_result)
except:
# 若解析失败,使用基础匹配分
match_result = {
"final_score": base_score,
"match_points": ["基础信息匹配"],
"mismatch_points": [],
"delivery_suggestion": base_score >= RISK_CONFIG["min_match_score"]
}
return match_result
3.2.3 岗位筛选 Agent 开发
这个 Agent 负责根据用户设置的筛选规则,从 Boss 直聘爬取符合要求的岗位信息,同时过滤掉不符合规则的岗位。
创建agent_job_filter.py文件:
# agent_job_filter.py
from playwright.sync_api import sync_playwright, Page
from config import FILTER_CONFIG, RISK_CONFIG
import time
import random
class JobFilterAgent:
def __init__(self, headless: bool = False):
self.headless = headless
self.browser = None
self.page = None
self.filter_config = FILTER_CONFIG
# 初始化浏览器,打开Boss直聘,使用本地缓存的登录状态
def init_browser(self):
"""
初始化浏览器,使用本地Chrome的用户数据,避免重复登录,绝对安全,不会获取账号密码
"""
p = sync_playwright().start()
# 使用本地Chrome的用户数据,自动读取登录状态
self.browser = p.chromium.launch_persistent_context(
user_data_dir="~/Library/Application Support/Google/Chrome/Default", # Mac路径
# Windows路径:"C:\\Users\\你的用户名\\AppData\\Local\\Google\\Chrome\\User Data\\Default"
headless=self.headless,
args=["--disable-blink-features=AutomationControlled"], # 禁用自动化检测,防风控
ignore_https_errors=True
)
# 禁用webdriver检测
self.browser.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
self.page = self.browser.new_page()
# 打开Boss直聘招聘页面
self.page.goto("https://www.zhipin.com/web/geek/job", wait_until="domcontentloaded")
time.sleep(random.uniform(2, 4))
# 检查是否登录
if "登录" in self.page.title():
raise Exception("请先在Chrome浏览器中登录Boss直聘账号,再运行程序")
return True
# 设置岗位筛选条件
def set_filter_conditions(self, city: str, job_name: str, min_salary: int, max_salary: int,
experience: str = "", education: str = ""):
"""
设置岗位筛选条件
:param city: 目标城市,比如"北京"、"上海"、"广州"
:param job_name: 岗位名称,比如"Java开发工程师"、"AI Agent开发工程师"
:param min_salary: 最低薪资,单位K,比如15
:param max_salary: 最高薪资,单位K,比如30
:param experience: 工作经验要求,比如"1-3年"、"3-5年"
:param education: 学历要求,比如"本科"、"硕士"
"""
# 1. 输入岗位名称
self.page.fill(".search-input-box input", job_name)
time.sleep(random.uniform(0.5, 1))
self.page.click(".search-btn")
time.sleep(random.uniform(2, 3))
# 2. 选择城市
self.page.click(".city-select")
time.sleep(random.uniform(1, 2))
self.page.fill(".city-search input", city)
time.sleep(random.uniform(0.5, 1))
self.page.click(f".city-list li:text('{city}')")
time.sleep(random.uniform(2, 3))
# 3. 设置薪资范围
self.page.click(f".salary-list li:text('{min_salary}-{max_salary}K')")
time.sleep(random.uniform(2, 3))
# 4. 设置工作经验
if experience:
self.page.click(f".experience-list li:text('{experience}')")
time.sleep(random.uniform(2, 3))
# 5. 设置学历要求
if education:
self.page.click(f".education-list li:text('{education}')")
time.sleep(random.uniform(2, 3))
# 6. 按最新发布排序,优先获取新发布的岗位
self.page.click(".sort-select li:text('最新发布')")
time.sleep(random.uniform(2, 3))
print(f"筛选条件设置完成:{city} {job_name} {min_salary}-{max_salary}K")
return True
# 过滤不符合要求的岗位
def filter_job(self, job_item) -> bool:
"""
过滤不符合规则的岗位,返回True表示符合要求,False表示过滤
"""
job_info = job_item.text_content()
# 过滤包含禁止关键词的岗位
for keyword in self.filter_config["forbidden_keywords"]:
if keyword in job_info:
return False
# 过滤不活跃HR
if self.filter_config["filter_inactive_hr"]:
if "7天内活跃" not in job_info and "3天内活跃" not in job_info and "今日活跃" not in job_info:
return False
# 过滤创业公司
if self.filter_config["filter_startup"]:
if "未融资" in job_info or "天使轮" in job_info:
return False
return True
# 爬取岗位列表,获取符合要求的岗位JD
def get_job_list(self, max_page: int = 3) -> list:
"""
爬取岗位列表,返回符合要求的岗位信息列表
:param max_page: 最大爬取页数,避免爬取过多触发风控
"""
job_list = []
current_page = 1
while current_page <= max_page:
print(f"正在爬取第{current_page}页岗位...")
# 等待岗位列表加载完成
self.page.wait_for_selector(".job-list li", timeout=10000)
job_items = self.page.query_selector_all(".job-list li")
for job_item in job_items:
# 先过滤不符合要求的岗位
if not self.filter_job(job_item):
continue
# 提取岗位基础信息
job_name = job_item.query_selector(".job-name").text_content()
job_salary = job_item.query_selector(".salary").text_content()
company_name = job_item.query_selector(".company-name").text_content()
job_address = job_item.query_selector(".job-area").text_content()
job_link = job_item.query_selector(".job-card-left").get_attribute("href")
job_link = f"https://www.zhipin.com{job_link}" if job_link else ""
if not job_link:
continue
# 打开岗位详情页,获取完整JD
job_page = self.browser.new_page()
job_page.goto(job_link, wait_until="domcontentloaded")
time.sleep(random.uniform(2, 4))
try:
jd_content = job_page.query_selector(".job-sec").text_content()
except:
jd_content = ""
job_page.close()
if not jd_content:
continue
# 保存岗位信息
job_info = {
"job_name": job_name,
"job_salary": job_salary,
"company_name": company_name,
"job_address": job_address,
"job_link": job_link,
"jd_content": jd_content
}
job_list.append(job_info)
print(f"已获取岗位:{job_name} - {company_name}")
# 随机延迟,防风控
time.sleep(random.uniform(1, 2))
# 翻页
current_page += 1
try:
self.page.click(".options-pages a:text('下一页')")
time.sleep(random.uniform(3, 5))
except:
print("已到达最后一页,爬取结束")
break
print(f"岗位爬取完成,共获取{len(job_list)}个符合要求的岗位")
return job_list
# 关闭浏览器
def close(self):
if self.browser:
self.browser.close()
# 测试代码
if __name__ == "__main__":
agent = JobFilterAgent(headless=False)
agent.init_browser()
agent.set_filter_conditions(city="广州", job_name="AI Agent开发工程师", min_salary=15, max_salary=30)
job_list = agent.get_job_list(max_page=2)
agent.close()
print(job_list)
3.2.4 招呼语生成 Agent 开发
这个 Agent 负责根据岗位 JD 与用户简历,为每个岗位生成个性化的招呼语,避免群发,大幅提升 HR 的回复率。
创建agent_greeting_generator.py文件:
# agent_greeting_generator.py
from config import llm
from resume_matcher import parse_resume
class GreetingGeneratorAgent:
def __init__(self, resume_path: str):
self.resume_content = parse_resume(resume_path)
# 生成个性化招呼语
def generate_greeting(self, job_info: dict, match_result: dict) -> str:
"""
根据岗位JD、匹配结果,生成个性化招呼语
:param job_info: 岗位信息,包含job_name、company_name、jd_content
:param match_result: 匹配度结果,包含match_points、final_score
:return: 个性化招呼语
"""
prompt = f"""
你是一个专业的求职者,需要根据岗位JD与你的简历,生成一段个性化的求职招呼语,要求:
1. 字数控制在50-100字,简洁真诚,不要太长
2. 必须结合岗位的核心要求与你的核心匹配点,体现你对岗位的了解,绝对不能是通用群发内容
3. 语气礼貌、谦逊,符合求职场景,不要过于浮夸
4. 必须包含:对岗位的兴趣、你的核心匹配优势、期待沟通的意愿
你的简历核心内容:
{self.resume_content[:1000]} # 取简历前1000字,避免上下文过长
岗位信息:
岗位名称:{job_info['job_name']}
公司名称:{job_info['company_name']}
岗位JD核心内容:{job_info['jd_content'][:1000]}
核心匹配点:{match_result['match_points']}
输出要求:仅输出招呼语本身,不要任何其他内容、格式、引号
"""
greeting = llm.invoke(prompt).content.strip()
# 过滤掉可能的引号、换行符
greeting = greeting.replace('"', '').replace("'", "").replace("\n", "")
return greeting
# 测试代码
if __name__ == "__main__":
agent = GreetingGeneratorAgent(resume_path="你的简历.pdf")
test_job = {
"job_name": "AI Agent开发工程师",
"company_name": "字节跳动",
"jd_content": "负责AI智能体系统的设计与开发,要求熟悉LangChain、LangGraph,有大模型应用开发经验"
}
test_match = {
"match_points": ["熟悉LangChain、LangGraph框架", "有2年大模型应用开发经验", "有完整的Agent系统落地经验"]
}
greeting = agent.generate_greeting(test_job, test_match)
print(greeting)
3.2.5 风控合规 Agent 开发
这个 Agent 是整个系统的「安全底线」,负责对所有投递操作进行合规校验,控制投递间隔、单日上限,避免触发平台风控,防止账号被封。
创建agent_risk_control.py文件:
# agent_risk_control.py
import time
import random
from datetime import datetime
from config import RISK_CONFIG
import sqlite3
class RiskControlAgent:
def __init__(self):
self.risk_config = RISK_CONFIG
# 初始化SQLite数据库,记录每日投递数量
self.conn = sqlite3.connect("delivery_record.db")
self.cursor = self.conn.cursor()
# 创建投递记录表
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS delivery_record (
id INTEGER PRIMARY KEY AUTOINCREMENT,
company_name TEXT NOT NULL,
job_name TEXT NOT NULL,
delivery_time TEXT NOT NULL,
delivery_status TEXT NOT NULL
)
""")
self.conn.commit()
# 获取当日已投递数量
def get_today_delivery_count(self) -> int:
"""
获取当日已投递的岗位数量
"""
today = datetime.now().strftime("%Y-%m-%d")
self.cursor.execute("""
SELECT COUNT(*) FROM delivery_record WHERE delivery_time LIKE ?
""", (f"{today}%",))
count = self.cursor.fetchone()[0]
return count
# 检查是否可以投递(前置校验)
def check_can_delivery(self) -> dict:
"""
前置校验,判断当前是否可以投递,返回校验结果
"""
result = {
"can_delivery": True,
"reason": ""
}
# 1. 检查单日投递上限
today_count = self.get_today_delivery_count()
if today_count >= self.risk_config["max_daily_delivery"]:
result["can_delivery"] = False
result["reason"] = f"今日已投递{today_count}个,达到单日上限{self.risk_config['max_daily_delivery']}个,停止投递"
return result
# 2. 检查禁止投递时间段
current_time = datetime.now().strftime("%H:%M")
forbidden_start = self.risk_config["forbidden_delivery_time"][0]
forbidden_end = self.risk_config["forbidden_delivery_time"][1]
if forbidden_start <= current_time <= forbidden_end:
result["can_delivery"] = False
result["reason"] = f"当前时间{current_time}属于禁止投递时间段[{forbidden_start}, {forbidden_end}],停止投递"
return result
return result
# 随机延迟,模拟人工操作间隔
def random_delay(self):
"""
随机延迟,模拟人工操作的间隔,防风控
"""
delay = random.uniform(self.risk_config["min_delay_second"], self.risk_config["max_delay_second"])
print(f"模拟人工操作,延迟{delay:.2f}秒...")
time.sleep(delay)
# 记录投递结果
def record_delivery(self, company_name: str, job_name: str, delivery_status: str = "success"):
"""
记录投递结果,用于后续统计与风控校验
"""
delivery_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.cursor.execute("""
INSERT INTO delivery_record (company_name, job_name, delivery_time, delivery_status)
VALUES (?, ?, ?, ?)
""", (company_name, job_name, delivery_time, delivery_status))
self.conn.commit()
print(f"投递记录已保存:{company_name} - {job_name} - {delivery_status}")
# 获取投递统计数据
def get_delivery_statistics(self) -> dict:
"""
获取投递统计数据,用于复盘
"""
today = datetime.now().strftime("%Y-%m-%d")
# 今日投递总数
self.cursor.execute("SELECT COUNT(*) FROM delivery_record WHERE delivery_time LIKE ?", (f"{today}%",))
today_total = self.cursor.fetchone()[0]
# 今日成功投递数
self.cursor.execute("SELECT COUNT(*) FROM delivery_record WHERE delivery_time LIKE ? AND delivery_status = 'success'", (f"{today}%",))
today_success = self.cursor.fetchone()[0]
# 累计投递总数
self.cursor.execute("SELECT COUNT(*) FROM delivery_record")
total_count = self.cursor.fetchone()[0]
return {
"today_total": today_total,
"today_success": today_success,
"total_count": total_count,
"today_remaining": self.risk_config["max_daily_delivery"] - today_total
}
# 关闭数据库连接
def close(self):
self.conn.close()
# 测试代码
if __name__ == "__main__":
agent = RiskControlAgent()
print(agent.check_can_delivery())
print(agent.get_delivery_statistics())
agent.close()
3.2.6 投递执行 Agent 开发
这个 Agent 负责模拟人工操作,完成岗位的投递与招呼语发送,是整个系统的执行终端。
创建agent_delivery_executor.py文件:
# agent_delivery_executor.py
from playwright.sync_api import sync_playwright
from config import RISK_CONFIG
import time
import random
class DeliveryExecutorAgent:
def __init__(self, headless: bool = False):
self.headless = headless
self.browser = None
self.page = None
# 初始化浏览器,和岗位筛选Agent使用同一个浏览器上下文,避免重复登录
def init_browser(self, browser_context):
self.browser = browser_context
self.page = self.browser.new_page()
return True
# 执行投递操作
def execute_delivery(self, job_info: dict, greeting: str) -> bool:
"""
执行岗位投递操作,发送招呼语
:param job_info: 岗位信息,包含job_link
:param greeting: 个性化招呼语
:return: 投递是否成功
"""
try:
# 打开岗位详情页
self.page.goto(job_info["job_link"], wait_until="domcontentloaded")
time.sleep(random.uniform(2, 4))
# 点击立即沟通/投简历按钮
try:
chat_btn = self.page.query_selector(".btn-startchat")
if not chat_btn:
chat_btn = self.page.query_selector(".startchat")
chat_btn.click()
time.sleep(random.uniform(2, 3))
except:
print(f"岗位{job_info['job_name']}无法点击沟通按钮,可能已投递或岗位已关闭")
return False
# 输入招呼语
try:
input_box = self.page.query_selector("textarea[placeholder*='输入你想对HR说的话']")
if not input_box:
input_box = self.page.query_selector("textarea")
input_box.fill(greeting)
time.sleep(random.uniform(0.5, 1.5))
except:
print(f"岗位{job_info['job_name']}无法输入招呼语")
return False
# 点击发送按钮
try:
send_btn = self.page.query_selector(".btn-send")
send_btn.click()
time.sleep(random.uniform(1, 2))
except:
print(f"岗位{job_info['job_name']}无法发送招呼语")
return False
print(f"投递成功:{job_info['job_name']} - {job_info['company_name']}")
return True
except Exception as e:
print(f"投递失败:{job_info['job_name']} - {job_info['company_name']},错误:{str(e)}")
return False
# 关闭页面
def close(self):
if self.page:
self.page.close()
3.2.7 数据复盘 Agent 开发
这个 Agent 负责记录所有投递数据,生成复盘报告,优化后续的投递策略,让系统越用越好用。
创建agent_data_review.py文件:
# agent_data_review.py
import sqlite3
from datetime import datetime
from config import llm
import pandas as pd
class DataReviewAgent:
def __init__(self):
self.conn = sqlite3.connect("delivery_record.db")
# 获取投递数据
def get_delivery_data(self, start_date: str = None, end_date: str = None) -> pd.DataFrame:
"""
获取指定时间段的投递数据
"""
if not start_date:
start_date = datetime.now().strftime("%Y-%m-01")
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
query = f"""
SELECT * FROM delivery_record
WHERE delivery_time BETWEEN '{start_date} 00:00:00' AND '{end_date} 23:59:59'
"""
df = pd.read_sql(query, self.conn)
return df
# 生成投递复盘报告
def generate_review_report(self) -> str:
"""
生成投递复盘报告,分析投递情况,给出优化建议
"""
df = self.get_delivery_data()
if df.empty:
return "暂无投递数据,无法生成复盘报告"
# 统计核心数据
total_delivery = len(df)
success_delivery = len(df[df["delivery_status"] == "success"])
success_rate = (success_delivery / total_delivery) * 100
today_delivery = len(df[df["delivery_time"].str.startswith(datetime.now().strftime("%Y-%m-%d"))])
# 调用大模型生成复盘报告
prompt = f"""
你是专业的求职辅导专家,请根据以下投递数据,生成一份简洁的投递复盘报告,要求:
1. 总结当前的投递情况,包含核心数据
2. 分析投递过程中可能存在的问题
3. 给出具体的优化建议,提升投递回复率
投递数据:
- 累计投递总数:{total_delivery}个
- 累计投递成功数:{success_delivery}个
- 投递成功率:{success_rate:.2f}%
- 今日投递数:{today_delivery}个
投递明细数据:
{df.to_string()}
输出要求:报告结构清晰,语言简洁,建议具体可落地,字数控制在500字以内
"""
report = llm.invoke(prompt).content
return report
# 关闭数据库连接
def close(self):
self.conn.close()
# 测试代码
if __name__ == "__main__":
agent = DataReviewAgent()
report = agent.generate_review_report()
print(report)
agent.close()
3.3 多 Agent 调度系统开发:LangGraph 实现全流程协同
现在,我们已经完成了所有核心 Agent 的开发,接下来我们使用 LangGraph 搭建多 Agent 调度系统,实现整个投递流程的自动化协同,创建agent_scheduler.py文件:
# agent_scheduler.py
from typing import TypedDict, Annotated, Sequence
import operator
from langgraph.graph import StateGraph, END
from config import RISK_CONFIG
# 定义全局状态,用于多Agent之间的信息传递
class AgentState(TypedDict):
# 用户输入的基础信息
user_id: str
resume_path: str
city: str
job_name: str
min_salary: int
max_salary: int
experience: str
education: str
max_page: int
# 中间执行数据
job_list: list
current_job_index: int
current_job: dict
match_result: dict
current_greeting: str
delivery_result: bool
# 执行统计
total_delivery: int
success_delivery: int
# 执行状态
status: str
error_msg: str
# 初始化所有Agent
from agent_job_filter import JobFilterAgent
from resume_matcher import parse_resume, save_resume_to_vector_db, calculate_match_score
from agent_greeting_generator import GreetingGeneratorAgent
from agent_risk_control import RiskControlAgent
from agent_delivery_executor import DeliveryExecutorAgent
from agent_data_review import DataReviewAgent
# 节点1:初始化与简历解析
def node_init_resume(state: AgentState) -> dict:
print("=== 步骤1:初始化系统,解析简历 ===")
try:
# 解析简历
resume_content = parse_resume(state["resume_path"])
# 保存简历到向量数据库
save_resume_to_vector_db(resume_content, state["user_id"])
# 初始化风控Agent
risk_agent = RiskControlAgent()
# 检查是否可以投递
check_result = risk_agent.check_can_delivery()
if not check_result["can_delivery"]:
risk_agent.close()
return {
**state,
"status": "failed",
"error_msg": check_result["reason"]
}
risk_agent.close()
return {
**state,
"status": "init_success",
"current_job_index": 0,
"total_delivery": 0,
"success_delivery": 0
}
except Exception as e:
return {
**state,
"status": "failed",
"error_msg": f"简历解析失败:{str(e)}"
}
# 节点2:岗位筛选与爬取
def node_job_filter(state: AgentState) -> dict:
print("=== 步骤2:筛选并爬取岗位 ===")
try:
job_agent = JobFilterAgent(headless=False)
job_agent.init_browser()
job_agent.set_filter_conditions(
city=state["city"],
job_name=state["job_name"],
min_salary=state["min_salary"],
max_salary=state["max_salary"],
experience=state["experience"],
education=state["education"]
)
job_list = job_agent.get_job_list(max_page=state["max_page"])
job_agent.close()
if not job_list:
return {
**state,
"status": "failed",
"error_msg": "未获取到符合要求的岗位"
}
return {
**state,
"job_list": job_list,
"status": "job_filter_success"
}
except Exception as e:
return {
**state,
"status": "failed",
"error_msg": f"岗位筛选失败:{str(e)}"
}
# 节点3:简历与岗位匹配度评估
def node_resume_match(state: AgentState) -> dict:
current_index = state["current_job_index"]
job_list = state["job_list"]
# 所有岗位处理完成
if current_index >= len(job_list):
return {
**state,
"status": "all_job_processed"
}
current_job = job_list[current_index]
print(f"=== 步骤3:评估岗位 {current_job['job_name']} - {current_job['company_name']} 匹配度 ===")
try:
match_result = calculate_match_score(current_job["jd_content"], state["user_id"])
print(f"匹配度评分:{match_result['final_score']}分,是否建议投递:{match_result['delivery_suggestion']}")
return {
**state,
"current_job": current_job,
"match_result": match_result,
"status": "match_complete"
}
except Exception as e:
# 匹配失败,跳过当前岗位,处理下一个
print(f"岗位匹配度评估失败:{str(e)},跳过当前岗位")
return {
**state,
"current_job_index": current_index + 1,
"status": "match_failed"
}
# 节点4:判断是否投递
def node_should_delivery(state: AgentState) -> str:
match_result = state["match_result"]
# 检查风控
risk_agent = RiskControlAgent()
check_result = risk_agent.check_can_delivery()
risk_agent.close()
if not check_result["can_delivery"]:
print(check_result["reason"])
return "end"
# 匹配度达标,建议投递
if match_result["delivery_suggestion"] and match_result["final_score"] >= RISK_CONFIG["min_match_score"]:
return "delivery"
# 不建议投递,跳过
else:
print(f"岗位匹配度不达标,跳过,下一个岗位")
return "skip"
# 节点5:生成个性化招呼语
def node_generate_greeting(state: AgentState) -> dict:
current_job = state["current_job"]
match_result = state["match_result"]
print(f"=== 步骤4:为岗位 {current_job['job_name']} 生成个性化招呼语 ===")
try:
greeting_agent = GreetingGeneratorAgent(state["resume_path"])
greeting = greeting_agent.generate_greeting(current_job, match_result)
print(f"生成的招呼语:{greeting}")
return {
**state,
"current_greeting": greeting,
"status": "greeting_generate_success"
}
except Exception as e:
print(f"招呼语生成失败:{str(e)},跳过当前岗位")
return {
**state,
"current_job_index": state["current_job_index"] + 1,
"status": "greeting_generate_failed"
}
# 节点6:执行投递操作
def node_execute_delivery(state: AgentState) -> dict:
current_job = state["current_job"]
greeting = state["current_greeting"]
print(f"=== 步骤5:执行投递操作 ===")
try:
# 初始化风控Agent
risk_agent = RiskControlAgent()
# 随机延迟,防风控
risk_agent.random_delay()
# 初始化浏览器与投递Agent
job_agent = JobFilterAgent(headless=False)
job_agent.init_browser()
delivery_agent = DeliveryExecutorAgent(headless=False)
delivery_agent.init_browser(job_agent.browser)
# 执行投递
delivery_result = delivery_agent.execute_delivery(current_job, greeting)
# 记录投递结果
risk_agent.record_delivery(
company_name=current_job["company_name"],
job_name=current_job["job_name"],
delivery_status="success" if delivery_result else "failed"
)
# 关闭资源
delivery_agent.close()
job_agent.close()
risk_agent.close()
# 更新统计数据
total_delivery = state["total_delivery"] + 1
success_delivery = state["success_delivery"] + (1 if delivery_result else 0)
return {
**state,
"delivery_result": delivery_result,
"total_delivery": total_delivery,
"success_delivery": success_delivery,
"current_job_index": state["current_job_index"] + 1,
"status": "delivery_complete"
}
except Exception as e:
print(f"投递执行失败:{str(e)}")
return {
**state,
"delivery_result": False,
"total_delivery": state["total_delivery"] + 1,
"current_job_index": state["current_job_index"] + 1,
"status": "delivery_failed"
}
# 节点7:生成最终复盘报告
def node_generate_report(state: AgentState) -> dict:
print("=== 步骤6:生成投递复盘报告 ===")
try:
review_agent = DataReviewAgent()
report = review_agent.generate_review_report()
review_agent.close()
print("投递任务完成!")
print(f"本次共处理{len(state['job_list'])}个岗位,尝试投递{state['total_delivery']}个,成功投递{state['success_delivery']}个")
print("复盘报告:")
print(report)
return {
**state,
"status": "task_complete",
"review_report": report
}
except Exception as e:
return {
**state,
"status": "report_generate_failed",
"error_msg": f"复盘报告生成失败:{str(e)}"
}
# 构建LangGraph状态图
def build_agent_graph():
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("init_resume", node_init_resume)
workflow.add_node("job_filter", node_job_filter)
workflow.add_node("resume_match", node_resume_match)
workflow.add_node("generate_greeting", node_generate_greeting)
workflow.add_node("execute_delivery", node_execute_delivery)
workflow.add_node("generate_report", node_generate_report)
# 设置入口
workflow.set_entry_point("init_resume")
# 添加边
workflow.add_edge("init_resume", "job_filter")
workflow.add_edge("job_filter", "resume_match")
# 添加条件边:匹配完成后,判断是否投递
workflow.add_conditional_edges(
"resume_match",
node_should_delivery,
{
"delivery": "generate_greeting",
"skip": "resume_match",
"end": "generate_report"
}
)
workflow.add_edge("generate_greeting", "execute_delivery")
workflow.add_edge("execute_delivery", "resume_match")
# 所有岗位处理完成后,生成报告
workflow.add_edge("generate_report", END)
# 编译图
app = workflow.compile()
return app
# 运行整个Agent系统
def run_job_agent_system(params: dict):
app = build_agent_graph()
# 初始化状态
initial_state = AgentState(
user_id=params.get("user_id", "default_user"),
resume_path=params.get("resume_path", ""),
city=params.get("city", "广州"),
job_name=params.get("job_name", "AI Agent开发工程师"),
min_salary=params.get("min_salary", 15),
max_salary=params.get("max_salary", 30),
experience=params.get("experience", ""),
education=params.get("education", ""),
max_page=params.get("max_page", 3),
job_list=[],
current_job_index=0,
current_job={},
match_result={},
current_greeting="",
delivery_result=False,
total_delivery=0,
success_delivery=0,
status="init",
error_msg=""
)
# 运行系统
result = app.invoke(initial_state)
return result
# 测试运行
if __name__ == "__main__":
params = {
"resume_path": "你的简历.pdf", # 替换为你的简历路径
"city": "广州",
"job_name": "AI Agent开发工程师",
"min_salary": 15,
"max_salary": 30,
"max_page": 2
}
run_job_agent_system(params)
3.4 可视化界面搭建:Gradio 实现一键操作
最后,我们使用 Gradio 搭建一个可视化的 Web 界面,让用户无需修改代码,即可通过界面上传简历、设置参数、启动投递、查看结果,创建web_ui.py文件:
# web_ui.py
import gradio as gr
from agent_scheduler import run_job_agent_system
from agent_risk_control import RiskControlAgent
from agent_data_review import DataReviewAgent
# 获取投递统计数据
def get_statistics():
risk_agent = RiskControlAgent()
stats = risk_agent.get_delivery_statistics()
risk_agent.close()
return f"""
### 投递统计数据
- 今日已投递:{stats['today_total']} 个
- 今日投递成功:{stats['today_success']} 个
- 今日剩余可投递:{stats['today_remaining']} 个
- 累计投递总数:{stats['total_count']} 个
"""
# 获取复盘报告
def get_review_report():
review_agent = DataReviewAgent()
report = review_agent.generate_review_report()
review_agent.close()
return report
# 启动投递系统
def start_delivery(resume_file, city, job_name, min_salary, max_salary, experience, education, max_page):
if not resume_file:
return "请先上传简历文件", get_statistics(), get_review_report()
resume_path = resume_file.name
params = {
"resume_path": resume_path,
"city": city,
"job_name": job_name,
"min_salary": min_salary,
"max_salary": max_salary,
"experience": experience,
"education": education,
"max_page": max_page
}
try:
result = run_job_agent_system(params)
if result["status"] == "task_complete":
return f"投递任务完成!本次成功投递{result['success_delivery']}个岗位", get_statistics(), result["review_report"]
else:
return f"任务失败:{result['error_msg']}", get_statistics(), get_review_report()
except Exception as e:
return f"系统运行出错:{str(e)}", get_statistics(), get_review_report()
# 构建Gradio界面
with gr.Blocks(title="春招智能投递多Agent系统", theme=gr.themes.Soft()) as demo:
gr.Markdown("# 春招智能投递多Agent系统")
gr.Markdown("基于AI Agent技术,实现岗位筛选、简历匹配、个性化招呼语生成、安全投递全流程自动化,提升投递效率与回复率,避免封号风险")
with gr.Row():
# 左侧参数配置区
with gr.Column(scale=1):
gr.Markdown("### 基础参数配置")
resume_file = gr.File(label="上传简历(支持PDF/DOCX)", file_types=[".pdf", ".docx"])
city = gr.Textbox(label="目标城市", value="广州", placeholder="请输入目标城市")
job_name = gr.Textbox(label="岗位名称", value="AI Agent开发工程师", placeholder="请输入目标岗位名称")
with gr.Row():
min_salary = gr.Number(label="最低薪资(K)", value=15, minimum=5, maximum=100, step=1)
max_salary = gr.Number(label="最高薪资(K)", value=30, minimum=5, maximum=100, step=1)
experience = gr.Dropdown(
label="工作经验要求",
choices=["应届毕业生", "1年以内", "1-3年", "3-5年", "5-10年", "10年以上"],
value=""
)
education = gr.Dropdown(
label="学历要求",
choices=["大专", "本科", "硕士", "博士"],
value=""
)
max_page = gr.Number(label="最大爬取页数", value=3, minimum=1, maximum=10, step=1)
start_btn = gr.Button("启动智能投递", variant="primary", size="lg")
# 右侧结果展示区
with gr.Column(scale=2):
gr.Markdown("### 运行状态与结果")
status_output = gr.Textbox(label="运行状态", lines=3, interactive=False)
stats_output = gr.Markdown(label="投递统计数据")
report_output = gr.Textbox(label="投递复盘报告", lines=10, interactive=False)
# 页面加载时,自动获取统计数据与报告
demo.load(get_statistics, outputs=stats_output)
demo.load(get_review_report, outputs=report_output)
# 绑定按钮点击事件
start_btn.click(
fn=start_delivery,
inputs=[resume_file, city, job_name, min_salary, max_salary, experience, education, max_page],
outputs=[status_output, stats_output, report_output]
)
# 运行界面
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=7860, inbrowser=True)
3.5 系统运行步骤
- 替换
config.py中的api_key为你自己的 DeepSeek API Key; - 替换
agent_job_filter.py中的user_data_dir为你本地 Chrome 浏览器的用户数据路径; - 运行
web_ui.py文件,启动可视化界面; - 在界面中上传简历,设置投递参数,点击「启动智能投递」即可运行系统。
四、风控红线与合规声明(必看)
4.1 平台规则与风险提示
Boss 直聘《用户服务协议》明确禁止任何形式的自动化工具、脚本、外挂进行批量投递、账号操作,违规使用可能导致以下后果:
- 账号限流、降权,岗位推荐质量大幅下降;
- 账号沟通功能被限制,无法向 HR 发送消息;
- 账号永久封禁,且无法申诉找回。
本文提供的代码仅用于技术学习与研究,请勿用于商业用途与违规操作,使用所产生的所有风险由使用者自行承担。
4.2 最低风控配置建议
为了最大程度降低账号风险,请严格遵守以下配置:
- 单日投递数量严禁超过 100 个,建议控制在 50-80 个;
- 投递间隔严禁低于 2 秒,建议设置为 3-6 秒随机间隔;
- 严禁在 23:00 - 次日 7:00 之间进行批量投递,该时间段风控检测最严格;
- 严禁使用完全相同的招呼语进行群发,必须使用个性化生成的招呼语;
- 严禁使用无头模式长时间连续运行,建议有头模式运行,模拟人工操作;
- 严禁使用第三方云端代投服务,所有操作必须在本地运行,避免账号密码泄露。
4.3 合规替代方案
如果担心账号风险,建议使用以下 100% 合规的方案:
- 使用 Boss 直聘官方的「批量投递」功能:在岗位搜索页勾选多个岗位,一键投递,无任何风险;
- 使用本文的「岗位筛选 Agent」+「招呼语生成
Agent」,自动筛选岗位、生成个性化招呼语,手动进行投递,效率接近自动化,且完全合规; - 每日分时段投递,早 9-11 点、下午 2-5 点各投递 20-30 个,模拟真实求职行为。
更多推荐

所有评论(0)