hyperf =AI Agent 工作流开源从0开始(非常合适 蓝海)
【代码】hyperf =AI Agent 工作流开源从0开始(非常合适 蓝海)
·
工作流平台合适嘛?hyperf
· Undulating… ●UHyperf 做工作流平台:合适,但要分清做哪一层
简短结论:合适,而且是 Hyperf 少数能做出差异化的方向之一,但前提是你做"业务工作流 / Agent
编排"这一层,别去碰"高性能通用 Durable Execution 引擎"那一层。
--- 先拆概念:工作流平台不是一种东西
市面上叫"工作流"的产品差别极大,先分清楚:
┌───────────────────┬─────────────────────────────┬────────────────────────────────────┬─────────────────────────┐
│ 层次 │ 代表产品 │ 核心诉求 │ Hyperf 适配度 │
├───────────────────┼─────────────────────────────┼────────────────────────────────────┼─────────────────────────┤
│ BPM / 审批流 │ Camunda、Flowable、钉钉审批 │ 人审 + 表单 + 流转 │ 非常合适 │
├───────────────────┼─────────────────────────────┼────────────────────────────────────┼─────────────────────────┤
│ 数据/ETL 工作流 │ Airflow、DolphinScheduler │ DAG 调度 + 数据处理 │ 合适 │
├───────────────────┼─────────────────────────────┼────────────────────────────────────┼─────────────────────────┤
│ 业务编排 / Saga │ 自研微服务编排、Seata Saga │ 跨服务事务、补偿 │ 合适 │
├───────────────────┼─────────────────────────────┼────────────────────────────────────┼─────────────────────────┤
│ Durable Execution │ Temporal、Cadence、Restate │ 代码即工作流、状态持久化、故障重放 │ 一般(能做但拼不过 Go) │
├───────────────────┼─────────────────────────────┼────────────────────────────────────┼─────────────────────────┤
│ 低代码/iPaaS │ n8n、Zapier、Activepieces │ 拖拽节点 + 海量连接器 │ 非常合适 │
├───────────────────┼─────────────────────────────┼────────────────────────────────────┼─────────────────────────┤
│ AI Agent 工作流 │ LangGraph、Dify、Coze │ LLM 编排 + 工具调用 + 记忆 │ 非常合适(蓝海) │
└───────────────────┴─────────────────────────────┴────────────────────────────────────┴─────────────────────────┘
同样叫"工作流平台",做 BPM 和做 Temporal 是两个物种。选错层级,一开始就不对。
---
Hyperf 做工作流平台的三大优势
1. 常驻进程天生适合状态机
工作流本质是"状态 + 事件驱动 + 持久化"。FPM 模式每请求重启进程,做长流程极其别扭;Hyperf 常驻协程可以把"等人审批 7
天"这种流程优雅挂起。
2. 协程调度 = 天然的并行节点执行
DAG 工作流里并行分支、扇出扇入、等待多个子任务,用协程 WaitGroup 两行代码就能搞定,比 Java 线程池清爽。
3. 注解 + AOP + DI 的编排表达力
Hyperf 的注解体系非常适合做"代码即工作流"的 DSL:
#[Workflow]
class OrderFlow {
#[Activity(retry: 3, timeout: 30)]
public function createOrder() {}
#[Activity(compensate: 'refund')]
public function pay() {}
}
这种 API 在 PHP 生态里目前几乎没人做。
---
三大短板 / 风险点
1. 纯性能拼不过 Temporal/Cadence
Temporal 是 Go + Cassandra/Postgres,单集群能跑千万级并发工作流。Hyperf 做到十万级没问题,百万级要很努力。所以别对标
Temporal 的性能,对标它的"易用性 + PHP 生态适配"。
2. 状态一致性很难做对
工作流引擎最核心、最容易翻车的是:任务崩溃后的状态恢复、幂等、补偿、去重。这块没有捷径,必须啃:
- 事件溯源(Event Sourcing)
- 幂等 token
- 分布式锁 / 乐观锁
- 补偿事务 Saga
低估这块复杂度的项目 90% 会死。
3. PHP 生态调度原语较弱
Go 有 goroutine + channel + context 三件套,写工作流引擎心智负担低。PHP 协程虽然能用,但 channel
用法/上下文传递都要自己封装一层。工作量比想象的大。
---
五个最值得做的细分方向(按机会大小排序)
1. AI Agent 工作流引擎(最推荐,蓝海)
为什么:LangGraph、Dify 是 Python/TS,PHP 生态完全空白。但国内大量 PHP 团队想接大模型,没有趁手工具。
产品形态:
- 可视化拖拽编排 LLM 节点、工具节点、条件节点、记忆节点
- 代码模式:注解定义 Agent 流程
- 内置 RAG、MCP、函数调用
- 流式输出、SSE 推送(Hyperf 强项)
- 多模型路由(接 OpenAI/Claude/通义/DeepSeek)
护城河:和 LLM 网关、向量库、RAG 做深度整合,变成"PHP 团队做 AI 应用的默认选择"。
---
2. 低代码 iPaaS(n8n 的 PHP 版)
为什么:n8n 太贵、Node.js 性能一般,国内大量中小企业想要自部署、能连国产 SaaS 的方案。
产品形态:
- 触发器(Webhook/定时/消息)→ 节点链 → 输出
- 海量连接器(钉钉、企微、飞书、CRM、电商 API)
- Hyperf 协程并发执行节点
- 可视化编辑 + 版本管理
商业化:社区版免费 + 企业版(SSO/审计/私有部署)。这条路 n8n 已经验证过。
---
3. BPM / 审批流引擎(传统但稳)
为什么:所有 OA、ERP、政务系统都要审批流。Java 有 Flowable/Camunda,PHP 生态里没有一个成熟的 BPMN 2.0 实现。
产品形态:
- BPMN 2.0 解析器 + 执行引擎
- 可视化流程设计器(前端)
- 表单引擎 + 数据权限
- 会签、加签、转办、催办、回退
- 和主流 PHP 框架(Laravel/Hyperf/ThinkPHP)集成
痛点:BPMN 2.0 规范非常复杂,工程量大。但做出来就是"PHP 圈的 Flowable",长期吃红利。
---
4. Saga / 分布式事务编排
为什么:微服务化后跨服务事务是刚需。PHP 团队现在基本靠"手写补偿 + 日志对账",非常痛苦。
产品形态:
- 注解定义 Saga 步骤和补偿动作
- 状态持久化到 MySQL/Redis
- 超时/重试/幂等内置
- 和 Hyperf 的 RPC、消息队列打通
参考:DTM(Go)、Seata(Java)。PHP 版空位。
---
5. 数据管道 / ETL 调度
为什么:DolphinScheduler 功能强但太重,中小团队想要一个轻量的 PHP 版。
产品形态:
- DAG 任务编排
- Shell / SQL / HTTP / Python 任务节点
- 依赖触发 + 定时触发
- 失败重跑 + 数据补跑
- 可视化监控
注意:这块竞品多,差异化要想清楚(比如专做"轻量 + PHP 栈亲和")。
---
架构骨架建议
不管做哪个方向,核心组件都差不多:
┌─────────────────────────────────────────┐
│ API 层 (Hyperf HTTP) │
│ - 流程定义 CRUD / 实例启动 / 查询 │
└──────────────┬──────────────────────────┘
│
┌──────────────▼──────────────────────────┐
│ 调度器 (Scheduler) │
│ - 解析流程定义 DSL │
│ - 决定下一步执行哪个节点 │
│ - 维护工作流状态机 │
└──────────────┬──────────────────────────┘
│
┌──────────────▼──────────────────────────┐
│ 执行器 (Worker Pool) │
│ - 协程池执行节点任务 │
│ - 支持 HTTP/RPC/SQL/Script/LLM 节点 │
│ - 超时、重试、幂等 │
└──────────────┬──────────────────────────┘
│
┌──────────────▼──────────────────────────┐
│ 状态存储 (State Store) │
│ - 事件日志 (MySQL/PostgreSQL) │
│ - 运行态缓存 (Redis) │
│ - 历史归档 (对象存储) │
└──────────────┬──────────────────────────┘
│
┌──────────────▼──────────────────────────┐
│ 可观测性 │
│ - 日志 / 链路 / 流程实例可视化 │
└─────────────────────────────────────────┘
关键设计点:
1. 流程定义用 DSL(JSON/YAML 或注解),别硬编码
2. 状态机必须事件溯源,所有状态变更产生事件,崩溃可重放
3. 节点执行要幂等,每个节点一个唯一执行 ID
4. 调度器和执行器解耦,调度器决定"做什么",执行器负责"怎么做"
5. 锁粒度要细,流程实例级别加锁,别全局锁
---
什么情况下别做
- 你想做 Temporal 的 PHP 平替:拼不过,生态鸿沟太大,除非有公司级投入
- 你想做通用 DAG 引擎:Airflow 已经太强,PHP 版本吸引力不够
- 你对状态一致性、分布式系统没信心:工作流引擎翻车代价极大,用户丢数据就再也不会用你的东西
---
一句话总结
Hyperf 做工作流平台非常合适,但要做"业务可编排 + PHP 生态原生"这一层,而不是"高性能通用引擎"。
机会排序:AI Agent 工作流 > 低代码 iPaaS > BPM 审批流 > Saga 编排 > ETL 调度。
其中 AI Agent 工作流是当下最大的蓝海:LangGraph 们在 Python/TS 称霸,PHP 生态完全空缺,而 PHP
团队有真实需求。谁先做出来并做扎实,就是这个赛道的默认选择。
Hyperf AI Agent 工作流引擎:从 0 到 1 开源全流程
---
一、先说清楚:做这个有什么用
1.1 一句话定位
让 PHP 团队用最熟悉的语法(注解 + 协程),可视化编排大模型 + 工具 + 记忆,做出能自动完成复杂任务的 AI Agent。
1.2 谁会用?解决什么痛?
┌──────────────────┬───────────────────────────────────────────────────┬─────────────────────────────────────────┐
│ 用户 │ 现在的痛 │ 用了你的库之后 │
├──────────────────┼───────────────────────────────────────────────────┼─────────────────────────────────────────┤
│ PHP 电商/SaaS │ 想接大模型做客服/营销,但 LangChain 是 │ 在现有 Hyperf 项目里加几个注解就接入 │
│ 团队 │ Python,团队学不动 │ LLM │
├──────────────────┼───────────────────────────────────────────────────┼─────────────────────────────────────────┤
│ 传统行业 OA/ERP │ 想用 AI 做智能审批、合同抽取,但没 Python/Go 人力 │ PHP 团队自己就能开发 AI 工作流 │
├──────────────────┼───────────────────────────────────────────────────┼─────────────────────────────────────────┤
│ 独立开发者 │ 想做 AI SaaS,但 Dify 太重、不想被平台锁死 │ 用开源库自建,数据和逻辑完全自控 │
├──────────────────┼───────────────────────────────────────────────────┼─────────────────────────────────────────┤
│ 企业内训 │ 想私有化部署 AI 助手 │ Hyperf + │
│ │ │ 本地模型(Ollama/vLLM),合规可控 │
└──────────────────┴───────────────────────────────────────────────────┴─────────────────────────────────────────┘
1.3 为什么是"蓝海"
┌────────────┬────────────────────────────────────┬──────────┐
│ 生态 │ 工作流框架 │ 市占率 │
├────────────┼────────────────────────────────────┼──────────┤
│ Python │ LangChain / LangGraph / LlamaIndex │ 垄断 │
├────────────┼────────────────────────────────────┼──────────┤
│ TypeScript │ Vercel AI SDK / Mastra │ 快速崛起 │
├────────────┼────────────────────────────────────┼──────────┤
│ Java │ Spring AI / LangChain4j │ 官方背书 │
├────────────┼────────────────────────────────────┼──────────┤
│ Go │ Eino / LangChainGo │ 在做 │
├────────────┼────────────────────────────────────┼──────────┤
│ PHP │ 几乎空白 │ 0 │
└────────────┴────────────────────────────────────┴──────────┘
国内 PHP 开发者数百万,AI 应用需求真实存在,但没人给他们趁手的工具。这就是机会。
---
二、最佳方式:技术选型的取舍
┌────────────┬───────────────────┬────────────────────────────────────────┐
│ 决策点 │ 选择 │ 理由 │
├────────────┼───────────────────┼────────────────────────────────────────┤
│ 框架 │ Hyperf 3.x │ 协程 + 注解 + DI,天然适合编排 │
├────────────┼───────────────────┼────────────────────────────────────────┤
│ PHP 版本 │ ≥ 8.1 │ 用 enum、readonly、注解 │
├────────────┼───────────────────┼────────────────────────────────────────┤
│ 存储 │ MySQL + Redis │ 事件溯源 + 运行态缓存 │
├────────────┼───────────────────┼────────────────────────────────────────┤
│ LLM 协议 │ OpenAI 兼容 │ 覆盖 90% 模型(含 DeepSeek/通义/Ollama) │
├────────────┼───────────────────┼────────────────────────────────────────┤
│ 流式输出 │ SSE │ Hyperf 强项 │
├────────────┼───────────────────┼────────────────────────────────────────┤
│ 工作流 DSL │ YAML + 注解双模式 │ 代码党和可视化党都要 │
├────────────┼───────────────────┼────────────────────────────────────────┤
│ 扩展机制 │ 节点即插件 │ 社区生态能繁荣 │
├────────────┼───────────────────┼────────────────────────────────────────┤
│ 可观测 │ OpenTelemetry │ 对接标准生态 │
└────────────┴───────────────────┴────────────────────────────────────────┘
---
三、完整代码(可运行的 MVP)
3.1 项目骨架
hyperf-ai-flow/
├── composer.json
├── config/autoload/
│ └── ai_flow.php
├── src/
│ ├── Contract/ # 接口
│ │ ├── NodeInterface.php
│ │ ├── LLMProviderInterface.php
│ │ └── StateStoreInterface.php
│ ├── Annotation/ # 注解
│ │ ├── Workflow.php
│ │ ├── Node.php
│ │ └── Tool.php
│ ├── DSL/ # 流程定义
│ │ ├── FlowDefinition.php
│ │ └── YamlLoader.php
│ ├── Node/ # 内置节点
│ │ ├── LLMNode.php
│ │ ├── ToolNode.php
│ │ ├── ConditionNode.php
│ │ └── HttpNode.php
│ ├── LLM/ # 模型适配
│ │ └── OpenAICompatProvider.php
│ ├── Engine/ # 执行引擎
│ │ ├── Executor.php
│ │ ├── Context.php
│ │ └── EventBus.php
│ ├── State/ # 状态管理
│ │ └── MysqlStateStore.php
│ └── Controller/ # HTTP API
│ └── FlowController.php
└── examples/
└── customer_service.yaml
3.2 composer.json
{
"name": "yourname/hyperf-ai-flow",
"description": "AI Agent workflow engine for Hyperf",
"type": "library",
"license": "MIT",
"require": {
"php": ">=8.1",
"hyperf/framework": "^3.1",
"hyperf/di": "^3.1",
"hyperf/http-server": "^3.1",
"hyperf/guzzle": "^3.1",
"hyperf/redis": "^3.1",
"hyperf/database": "^3.1",
"symfony/yaml": "^6.0"
},
"autoload": {
"psr-4": { "YourName\\AiFlow\\": "src/" }
},
"extra": {
"hyperf": {
"config": "YourName\\AiFlow\\ConfigProvider"
}
}
}
3.3 核心契约 src/Contract/*
<?php
namespace YourName\AiFlow\Contract;
use YourName\AiFlow\Engine\Context;
interface NodeInterface
{
public function execute(Context $ctx, array $config): array;
public function type(): string;
}
<?php
namespace YourName\AiFlow\Contract;
interface LLMProviderInterface
{
public function chat(array $messages, array $options = []): array;
public function stream(array $messages, array $options, callable $onChunk): void;
}
<?php
namespace YourName\AiFlow\Contract;
interface StateStoreInterface
{
public function saveRun(string $runId, array $state): void;
public function loadRun(string $runId): ?array;
public function appendEvent(string $runId, array $event): void;
public function listEvents(string $runId): array;
}
3.4 注解 src/Annotation/*
<?php
namespace YourName\AiFlow\Annotation;
use Attribute;
use Hyperf\Di\Annotation\AbstractAnnotation;
#[Attribute(Attribute::TARGET_CLASS)]
class Workflow extends AbstractAnnotation
{
public function __construct(public string $name, public string $version = '1.0') {}
}
#[Attribute(Attribute::TARGET_METHOD)]
class Node extends AbstractAnnotation
{
public function __construct(
public string $id,
public string $type = 'custom',
public array $next = [],
public int $retry = 0,
public int $timeout = 30
) {}
}
#[Attribute(Attribute::TARGET_METHOD)]
class Tool extends AbstractAnnotation
{
public function __construct(
public string $name,
public string $description,
public array $parameters = []
) {}
}
3.5 执行上下文 src/Engine/Context.php
<?php
namespace YourName\AiFlow\Engine;
class Context
{
public function __construct(
public string $runId,
public array $input = [],
public array $variables = [],
public array $history = [],
) {}
public function set(string $key, mixed $value): void { $this->variables[$key] = $value; }
public function get(string $key, mixed $default = null): mixed { return $this->variables[$key] ?? $default; }
public function pushHistory(string $role, string $content): void {
$this->history[] = ['role' => $role, 'content' => $content];
}
}
3.6 OpenAI 兼容 Provider
<?php
namespace YourName\AiFlow\LLM;
use GuzzleHttp\Client;
use YourName\AiFlow\Contract\LLMProviderInterface;
class OpenAICompatProvider implements LLMProviderInterface
{
public function __construct(
private string $baseUrl,
private string $apiKey,
private string $model = 'gpt-4o-mini',
private ?Client $http = null,
) {
$this->http ??= new Client(['timeout' => 60]);
}
public function chat(array $messages, array $options = []): array
{
$resp = $this->http->post($this->baseUrl . '/chat/completions', [
'headers' => ['Authorization' => 'Bearer ' . $this->apiKey],
'json' => array_merge([
'model' => $this->model,
'messages' => $messages,
], $options),
]);
return json_decode((string) $resp->getBody(), true);
}
public function stream(array $messages, array $options, callable $onChunk): void
{
$resp = $this->http->post($this->baseUrl . '/chat/completions', [
'headers' => ['Authorization' => 'Bearer ' . $this->apiKey],
'json' => array_merge([
'model' => $this->model,
'messages' => $messages,
'stream' => true,
], $options),
'stream' => true,
]);
$body = $resp->getBody();
while (!$body->eof()) {
$line = \GuzzleHttp\Psr7\Utils::readLine($body);
if (str_starts_with($line, 'data: ')) {
$json = substr($line, 6);
if (trim($json) === '[DONE]') break;
$onChunk(json_decode($json, true));
}
}
}
}
3.7 内置节点 src/Node/*
<?php
namespace YourName\AiFlow\Node;
use YourName\AiFlow\Contract\{NodeInterface, LLMProviderInterface};
use YourName\AiFlow\Engine\Context;
class LLMNode implements NodeInterface
{
public function __construct(private LLMProviderInterface $llm) {}
public function type(): string { return 'llm'; }
public function execute(Context $ctx, array $config): array
{
$prompt = $this->render($config['prompt'] ?? '', $ctx);
$messages = $ctx->history;
$messages[] = ['role' => 'user', 'content' => $prompt];
$result = $this->llm->chat($messages, $config['options'] ?? []);
$answer = $result['choices'][0]['message']['content'] ?? '';
$ctx->pushHistory('user', $prompt);
$ctx->pushHistory('assistant', $answer);
if (!empty($config['output'])) {
$ctx->set($config['output'], $answer);
}
return ['output' => $answer];
}
private function render(string $tpl, Context $ctx): string
{
return preg_replace_callback('/\{\{\s*(\w+)\s*\}\}/', function ($m) use ($ctx) {
return (string) ($ctx->get($m[1]) ?? $ctx->input[$m[1]] ?? '');
}, $tpl);
}
}
<?php
namespace YourName\AiFlow\Node;
use YourName\AiFlow\Contract\NodeInterface;
use YourName\AiFlow\Engine\Context;
class ConditionNode implements NodeInterface
{
public function type(): string { return 'condition'; }
public function execute(Context $ctx, array $config): array
{
$expr = $config['expression'] ?? 'true';
$vars = array_merge($ctx->input, $ctx->variables);
$ok = $this->evaluate($expr, $vars);
return ['branch' => $ok ? 'true' : 'false'];
}
private function evaluate(string $expr, array $vars): bool
{
foreach ($vars as $k => $v) {
if (is_scalar($v)) {
$expr = preg_replace('/\b' . preg_quote($k, '/') . '\b/', var_export($v, true), $expr);
}
}
return (bool) @eval("return ($expr);");
}
}
<?php
namespace YourName\AiFlow\Node;
use GuzzleHttp\Client;
use YourName\AiFlow\Contract\NodeInterface;
use YourName\AiFlow\Engine\Context;
class HttpNode implements NodeInterface
{
public function __construct(private ?Client $http = null) { $this->http ??= new Client(); }
public function type(): string { return 'http'; }
public function execute(Context $ctx, array $config): array
{
$resp = $this->http->request(
$config['method'] ?? 'GET',
$config['url'],
['json' => $config['body'] ?? null, 'headers' => $config['headers'] ?? []]
);
$data = json_decode((string) $resp->getBody(), true);
if (!empty($config['output'])) $ctx->set($config['output'], $data);
return ['output' => $data];
}
}
<?php
namespace YourName\AiFlow\Node;
use YourName\AiFlow\Contract\NodeInterface;
use YourName\AiFlow\Engine\Context;
class ToolNode implements NodeInterface
{
private array $tools = [];
public function type(): string { return 'tool'; }
public function register(string $name, callable $fn, string $description = ''): void
{
$this->tools[$name] = ['fn' => $fn, 'description' => $description];
}
public function execute(Context $ctx, array $config): array
{
$name = $config['tool'];
if (!isset($this->tools[$name])) {
throw new \RuntimeException("tool not found: $name");
}
$args = $config['args'] ?? [];
$resolved = [];
foreach ($args as $k => $v) {
$resolved[$k] = is_string($v) && str_starts_with($v, '$')
? $ctx->get(substr($v, 1)) : $v;
}
$result = ($this->tools[$name]['fn'])($resolved, $ctx);
if (!empty($config['output'])) $ctx->set($config['output'], $result);
return ['output' => $result];
}
}
3.8 流程定义和加载 src/DSL/*
<?php
namespace YourName\AiFlow\DSL;
class FlowDefinition
{
public function __construct(
public string $name,
public string $version,
public string $start,
public array $nodes, // id => [type, config, next]
) {}
public function nextOf(string $id, ?string $branch = null): ?string
{
$next = $this->nodes[$id]['next'] ?? null;
if ($next === null) return null;
if (is_string($next)) return $next;
if (is_array($next)) return $next[$branch ?? 'default'] ?? null;
return null;
}
}
<?php
namespace YourName\AiFlow\DSL;
use Symfony\Component\Yaml\Yaml;
class YamlLoader
{
public function load(string $path): FlowDefinition
{
$raw = Yaml::parseFile($path);
return new FlowDefinition(
name: $raw['name'],
version: $raw['version'] ?? '1.0',
start: $raw['start'],
nodes: $raw['nodes'],
);
}
}
3.9 执行引擎 src/Engine/Executor.php
<?php
namespace YourName\AiFlow\Engine;
use Hyperf\Coroutine\Coroutine;
use Psr\Container\ContainerInterface;
use YourName\AiFlow\Contract\{NodeInterface, StateStoreInterface};
use YourName\AiFlow\DSL\FlowDefinition;
class Executor
{
private array $nodeMap = [];
public function __construct(
private ContainerInterface $container,
private StateStoreInterface $store,
) {}
public function registerNode(NodeInterface $node): void
{
$this->nodeMap[$node->type()] = $node;
}
public function run(FlowDefinition $flow, array $input, ?string $runId = null): array
{
$runId ??= uniqid('run_', true);
$ctx = new Context(runId: $runId, input: $input);
$this->store->saveRun($runId, ['status' => 'running', 'flow' => $flow->name]);
$currentId = $flow->start;
while ($currentId !== null) {
$nodeDef = $flow->nodes[$currentId];
$type = $nodeDef['type'];
$config = $nodeDef['config'] ?? [];
if (!isset($this->nodeMap[$type])) {
throw new \RuntimeException("unknown node type: $type");
}
$this->store->appendEvent($runId, [
'at' => microtime(true), 'node' => $currentId, 'type' => $type, 'phase' => 'start',
]);
try {
$result = $this->runWithRetry(
fn() => $this->nodeMap[$type]->execute($ctx, $config),
$nodeDef['retry'] ?? 0,
$nodeDef['timeout'] ?? 30,
);
} catch (\Throwable $e) {
$this->store->appendEvent($runId, [
'at' => microtime(true), 'node' => $currentId, 'phase' => 'error',
'error' => $e->getMessage(),
]);
$this->store->saveRun($runId, ['status' => 'failed', 'error' => $e->getMessage()]);
throw $e;
}
$this->store->appendEvent($runId, [
'at' => microtime(true), 'node' => $currentId, 'phase' => 'end', 'result' => $result,
]);
$branch = $result['branch'] ?? null;
$currentId = $flow->nextOf($currentId, $branch);
}
$this->store->saveRun($runId, [
'status' => 'completed', 'variables' => $ctx->variables, 'history' => $ctx->history,
]);
return ['runId' => $runId, 'variables' => $ctx->variables];
}
private function runWithRetry(callable $fn, int $retry, int $timeout): mixed
{
$attempt = 0; $lastErr = null;
while ($attempt <= $retry) {
try {
$result = null; $done = false;
Coroutine::create(function () use ($fn, &$result, &$done) {
try { $result = $fn(); } finally { $done = true; }
});
$deadline = microtime(true) + $timeout;
while (!$done && microtime(true) < $deadline) { usleep(10_000); }
if (!$done) throw new \RuntimeException('node timeout');
return $result;
} catch (\Throwable $e) {
$lastErr = $e; $attempt++;
if ($attempt > $retry) break;
usleep(200_000 * $attempt);
}
}
throw $lastErr;
}
}
3.10 状态存储 src/State/MysqlStateStore.php
<?php
namespace YourName\AiFlow\State;
use Hyperf\DbConnection\Db;
use YourName\AiFlow\Contract\StateStoreInterface;
class MysqlStateStore implements StateStoreInterface
{
public function saveRun(string $runId, array $state): void
{
Db::table('ai_flow_runs')->updateOrInsert(
['run_id' => $runId],
['state' => json_encode($state, JSON_UNESCAPED_UNICODE), 'updated_at' => date('Y-m-d H:i:s')]
);
}
public function loadRun(string $runId): ?array
{
$row = Db::table('ai_flow_runs')->where('run_id', $runId)->first();
return $row ? json_decode($row->state, true) : null;
}
public function appendEvent(string $runId, array $event): void
{
Db::table('ai_flow_events')->insert([
'run_id' => $runId, 'payload' => json_encode($event, JSON_UNESCAPED_UNICODE),
'created_at' => date('Y-m-d H:i:s.u'),
]);
}
public function listEvents(string $runId): array
{
return Db::table('ai_flow_events')->where('run_id', $runId)
->orderBy('id')->pluck('payload')
->map(fn($s) => json_decode($s, true))->toArray();
}
}
建表 SQL:
CREATE TABLE ai_flow_runs (
run_id VARCHAR(64) PRIMARY KEY,
state JSON NOT NULL,
updated_at DATETIME(3) NOT NULL
);
CREATE TABLE ai_flow_events (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
run_id VARCHAR(64) NOT NULL,
payload JSON NOT NULL,
created_at DATETIME(3) NOT NULL,
INDEX idx_run (run_id)
);
3.11 HTTP 控制器
<?php
namespace YourName\AiFlow\Controller;
use Hyperf\HttpServer\Annotation\{Controller, PostMapping, GetMapping};
use Hyperf\HttpServer\Contract\RequestInterface;
use YourName\AiFlow\DSL\YamlLoader;
use YourName\AiFlow\Engine\Executor;
use YourName\AiFlow\Contract\StateStoreInterface;
#[Controller(prefix: '/ai-flow')]
class FlowController
{
public function __construct(
private Executor $executor,
private YamlLoader $loader,
private StateStoreInterface $store,
) {}
#[PostMapping('run')]
public function run(RequestInterface $req): array
{
$file = $req->input('file');
$input = $req->input('input', []);
$flow = $this->loader->load($file);
return $this->executor->run($flow, $input);
}
#[GetMapping('runs/{id}')]
public function detail(string $id): array
{
return [
'state' => $this->store->loadRun($id),
'events' => $this->store->listEvents($id),
];
}
}
3.12 ConfigProvider(关键,Hyperf 包入口)
<?php
namespace YourName\AiFlow;
use YourName\AiFlow\Contract\{LLMProviderInterface, StateStoreInterface};
use YourName\AiFlow\LLM\OpenAICompatProvider;
use YourName\AiFlow\State\MysqlStateStore;
use YourName\AiFlow\Engine\Executor;
use YourName\AiFlow\Node\{LLMNode, ConditionNode, HttpNode, ToolNode};
class ConfigProvider
{
public function __invoke(): array
{
return [
'dependencies' => [
LLMProviderInterface::class => function ($c) {
$cfg = $c->get(\Hyperf\Contract\ConfigInterface::class)->get('ai_flow.llm');
return new OpenAICompatProvider($cfg['base_url'], $cfg['api_key'], $cfg['model']);
},
StateStoreInterface::class => MysqlStateStore::class,
Executor::class => function ($c) {
$exec = new Executor($c, $c->get(StateStoreInterface::class));
$exec->registerNode(new LLMNode($c->get(LLMProviderInterface::class)));
$exec->registerNode(new ConditionNode());
$exec->registerNode(new HttpNode());
$exec->registerNode($c->get(ToolNode::class));
return $exec;
},
],
'publish' => [[
'id' => 'config', 'description' => 'ai-flow config',
'source' => __DIR__ . '/../config/ai_flow.php',
'destination' => BASE_PATH . '/config/autoload/ai_flow.php',
]],
];
}
}
// config/ai_flow.php
return [
'llm' => [
'base_url' => env('AIFLOW_LLM_URL', 'https://api.openai.com/v1'),
'api_key' => env('AIFLOW_LLM_KEY', ''),
'model' => env('AIFLOW_LLM_MODEL', 'gpt-4o-mini'),
],
];
3.13 示例流程 examples/customer_service.yaml
name: customer_service
version: "1.0"
start: classify
nodes:
classify:
type: llm
config:
prompt: |
判断用户问题类型,只回复一个词:退款 / 物流 / 其他
用户问题:{{ question }}
output: category
next: route
route:
type: condition
config:
expression: "category == '退款'"
next:
"true": handle_refund
"false": handle_other
handle_refund:
type: http
config:
method: POST
url: https://example.com/api/refund
body: { question: "{{ question }}" }
output: refund_result
next: reply
handle_other:
type: llm
config:
prompt: "作为客服,回答用户:{{ question }}"
output: reply_text
next: null
reply:
type: llm
config:
prompt: "根据退款处理结果{{ refund_result }},生成用户答复"
output: reply_text
next: null
调用:
curl -X POST localhost:9501/ai-flow/run \
-H 'Content-Type: application/json' \
-d '{"file":"examples/customer_service.yaml","input":{"question":"我要退款"}}'
---
四、开源完整流程(0 → 持续维护)
阶段 1:立项(Day 0-1)
- 定位:README 第一句话 —— "让 PHP 开发者用最熟悉的方式构建 AI Agent 工作流"
- 选名:hyperf-ai-flow / agentflow-php,GitHub 搜一遍没人占
- 选协议:MIT(库类最友好,扩散最快)
- 建仓库:GitHub 新建,添加 .gitignore、LICENSE、空 README
阶段 2:MVP 开发(Week 1-3)
按上面代码跑通最小闭环:YAML 加载 → LLM 节点 → 条件节点 → 状态持久化 → HTTP 查询。核心指标:能跑通 3
个真实示例(客服分流、内容审核、数据抽取)。
阶段 3:仓库规范(Week 3)
必备文件:
- README.md:徽章 + 30 秒 Quick Start + 架构图 + 对标表
- CONTRIBUTING.md:PR 规范、Conventional Commits
- CODE_OF_CONDUCT.md:Contributor Covenant 模板
- SECURITY.md:漏洞披露邮箱
- CHANGELOG.md:Keep a Changelog 格式
- .github/ISSUE_TEMPLATE/:bug / feature / question 三类
- .github/PULL_REQUEST_TEMPLATE.md
阶段 4:质量流水线(Week 4)
.github/workflows/ci.yml:
name: CI
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
php: ['8.1', '8.2', '8.3']
steps:
- uses: actions/checkout@v4
- uses: shivammathur/setup-php@v2
with: { php-version: '${{ matrix.php }}', extensions: swoole }
- run: composer install --prefer-dist
- run: vendor/bin/phpstan analyse src --level=5
- run: vendor/bin/pest --coverage
- uses: codecov/codecov-action@v4
测试分三层:
- 单元:每个节点类单测
- 集成:Executor + MysqlStateStore 联调(docker-compose 起 MySQL)
- E2E:真实 LLM 调用(用 mock server 或 Ollama 本地跑)
阶段 5:文档站(Week 5)
- 用 VitePress 或 Mintlify 建站
- 核心页:概念、快速开始、节点参考、注解参考、YAML DSL 规范、对标 LangGraph
- 录 3 分钟 demo 视频放 README 顶部
阶段 6:首次发版 v0.1.0(Week 6)
git tag v0.1.0 && git push --tags
Packagist 接入 GitHub webhook,tag 推送自动发包。
阶段 7:推广
- 技术社区:掘金、V2EX、开源中国写一篇《为什么 PHP 也能做 AI Agent》
- Hyperf 官方生态:提 PR 到 hyperf/awesome-hyperf
- 对比内容:写《Hyperf AI Flow vs LangChain:PHP 团队的选择》
- Hacker News / Reddit r/PHP:英文一篇
- 短视频:B 站/YouTube 演示"5 分钟搭一个 AI 客服"
阶段 8:持续迭代(长期)
v0.2+ 路线图建议:
1. 可视化编辑器(Vue/React 前端,输出 YAML)
2. 更多节点:向量检索、MCP 工具、Python 桥接
3. 流式 SSE:前端实时看 LLM 输出
4. 记忆系统:短期 + 长期记忆抽象
5. 子流程 / 并行分支:fan-out/fan-in
6. OpenTelemetry 接入:链路追踪
7. 多租户 / 配额:SaaS 场景
阶段 9:社区治理
- Issue 48h 内首次响应
- 每月发布 minor 版本,patch 随时发
- 贡献者 ≥ 3 人后设 MAINTAINERS.md + 投票机制
- 开 GitHub Discussions 或 Discord
阶段 10:商业化路径(可选)
- 开源核心免费
- 企业版:多租户、SSO、审计日志、SLA
- 托管云:一键部署的 SaaS
- 咨询和定制:按项目收费
---
五、做这个项目的三重价值总结
┌────────────────────┬──────────────────────────────────────────────┐
│ 对谁 │ 价值 │
├────────────────────┼──────────────────────────────────────────────┤
│ 对用户(PHP 团队) │ 第一次能用母语做 AI Agent,不用跨栈学 Python │
├────────────────────┼──────────────────────────────────────────────┤
│ 对生态(PHP 社区) │ 填补 AI 基础设施空白,延长 PHP 生命力 │
├────────────────────┼──────────────────────────────────────────────┤
│ 对作者(你) │ 占位蓝海,积累个人影响力,可能孵化商业产品 │
└────────────────────┴──────────────────────────────────────────────┘
一句话:这不是又一个工具库,而是 PHP 进入 AI 时代的入场券。谁先做出来并做扎实,就是这个赛道未来几年的默认选择。
现在动手,窗口还在。
更多推荐




所有评论(0)