工作流平台合适嘛?hyperf                                                                                              
· Undulating…                                                                                                         ●UHyperf 做工作流平台:合适,但要分清做哪一层

  简短结论:合适,而且是 Hyperf 少数能做出差异化的方向之一,但前提是你做"业务工作流 / Agent
  编排"这一层,别去碰"高性能通用 Durable Execution 引擎"那一层。                                                        
  ---                                                                                                                     先拆概念:工作流平台不是一种东西

  市面上叫"工作流"的产品差别极大,先分清楚:

  ┌───────────────────┬─────────────────────────────┬────────────────────────────────────┬─────────────────────────┐
  │       层次        │          代表产品           │              核心诉求              │      Hyperf 适配度      │
  ├───────────────────┼─────────────────────────────┼────────────────────────────────────┼─────────────────────────┤
  │ BPM / 审批流      │ Camunda、Flowable、钉钉审批 │ 人审 + 表单 + 流转                 │ 非常合适                │
  ├───────────────────┼─────────────────────────────┼────────────────────────────────────┼─────────────────────────┤
  │ 数据/ETL 工作流   │ Airflow、DolphinScheduler   │ DAG 调度 + 数据处理                │ 合适                    │
  ├───────────────────┼─────────────────────────────┼────────────────────────────────────┼─────────────────────────┤
  │ 业务编排 / Saga   │ 自研微服务编排、Seata Saga  │ 跨服务事务、补偿                   │ 合适                    │
  ├───────────────────┼─────────────────────────────┼────────────────────────────────────┼─────────────────────────┤
  │ Durable Execution │ Temporal、Cadence、Restate  │ 代码即工作流、状态持久化、故障重放 │ 一般(能做但拼不过 Go) │
  ├───────────────────┼─────────────────────────────┼────────────────────────────────────┼─────────────────────────┤
  │ 低代码/iPaaS      │ n8n、Zapier、Activepieces   │ 拖拽节点 + 海量连接器              │ 非常合适                │
  ├───────────────────┼─────────────────────────────┼────────────────────────────────────┼─────────────────────────┤
  │ AI Agent 工作流   │ LangGraph、Dify、Coze       │ LLM 编排 + 工具调用 + 记忆         │ 非常合适(蓝海)        │
  └───────────────────┴─────────────────────────────┴────────────────────────────────────┴─────────────────────────┘

  同样叫"工作流平台",做 BPM 和做 Temporal 是两个物种。选错层级,一开始就不对。

  ---
  Hyperf 做工作流平台的三大优势

  1. 常驻进程天生适合状态机

  工作流本质是"状态 + 事件驱动 + 持久化"。FPM 模式每请求重启进程,做长流程极其别扭;Hyperf 常驻协程可以把"等人审批 7
  天"这种流程优雅挂起。

  2. 协程调度 = 天然的并行节点执行

  DAG 工作流里并行分支、扇出扇入、等待多个子任务,用协程 WaitGroup 两行代码就能搞定,比 Java 线程池清爽。

  3. 注解 + AOP + DI 的编排表达力

  Hyperf 的注解体系非常适合做"代码即工作流"的 DSL:
  #[Workflow]
  class OrderFlow {
      #[Activity(retry: 3, timeout: 30)]
      public function createOrder() {}

      #[Activity(compensate: 'refund')]
      public function pay() {}
  }
  这种 API 在 PHP 生态里目前几乎没人做。

  ---
  三大短板 / 风险点

  1. 纯性能拼不过 Temporal/Cadence

  Temporal 是 Go + Cassandra/Postgres,单集群能跑千万级并发工作流。Hyperf 做到十万级没问题,百万级要很努力。所以别对标
  Temporal 的性能,对标它的"易用性 + PHP 生态适配"2. 状态一致性很难做对

  工作流引擎最核心、最容易翻车的是:任务崩溃后的状态恢复、幂等、补偿、去重。这块没有捷径,必须啃:
  - 事件溯源(Event Sourcing)
  - 幂等 token
  - 分布式锁 / 乐观锁
  - 补偿事务 Saga

  低估这块复杂度的项目 90% 会死。

  3. PHP 生态调度原语较弱

  Go 有 goroutine + channel + context 三件套,写工作流引擎心智负担低。PHP 协程虽然能用,但 channel
  用法/上下文传递都要自己封装一层。工作量比想象的大。

  ---
  五个最值得做的细分方向(按机会大小排序)

  1. AI Agent 工作流引擎(最推荐,蓝海)

  为什么:LangGraph、Dify 是 Python/TS,PHP 生态完全空白。但国内大量 PHP 团队想接大模型,没有趁手工具。

  产品形态:
  - 可视化拖拽编排 LLM 节点、工具节点、条件节点、记忆节点
  - 代码模式:注解定义 Agent 流程
  - 内置 RAG、MCP、函数调用
  - 流式输出、SSE 推送(Hyperf 强项)
  - 多模型路由(接 OpenAI/Claude/通义/DeepSeek)

  护城河:和 LLM 网关、向量库、RAG 做深度整合,变成"PHP 团队做 AI 应用的默认选择"。

  ---
  2. 低代码 iPaaS(n8n 的 PHP 版)

  为什么:n8n 太贵、Node.js 性能一般,国内大量中小企业想要自部署、能连国产 SaaS 的方案。

  产品形态:
  - 触发器(Webhook/定时/消息)→ 节点链 → 输出
  - 海量连接器(钉钉、企微、飞书、CRM、电商 API)
  - Hyperf 协程并发执行节点
  - 可视化编辑 + 版本管理

  商业化:社区版免费 + 企业版(SSO/审计/私有部署)。这条路 n8n 已经验证过。

  ---
  3. BPM / 审批流引擎(传统但稳)

  为什么:所有 OA、ERP、政务系统都要审批流。Java 有 Flowable/Camunda,PHP 生态里没有一个成熟的 BPMN 2.0 实现。

  产品形态:
  - BPMN 2.0 解析器 + 执行引擎
  - 可视化流程设计器(前端)
  - 表单引擎 + 数据权限
  - 会签、加签、转办、催办、回退
  - 和主流 PHP 框架(Laravel/Hyperf/ThinkPHP)集成

  痛点:BPMN 2.0 规范非常复杂,工程量大。但做出来就是"PHP 圈的 Flowable",长期吃红利。

  ---
  4. Saga / 分布式事务编排

  为什么:微服务化后跨服务事务是刚需。PHP 团队现在基本靠"手写补偿 + 日志对账",非常痛苦。

  产品形态:
  - 注解定义 Saga 步骤和补偿动作
  - 状态持久化到 MySQL/Redis
  - 超时/重试/幂等内置
  - 和 Hyperf 的 RPC、消息队列打通

  参考:DTM(Go)、Seata(Java)。PHP 版空位。

  ---
  5. 数据管道 / ETL 调度

  为什么:DolphinScheduler 功能强但太重,中小团队想要一个轻量的 PHP 版。

  产品形态:
  - DAG 任务编排
  - Shell / SQL / HTTP / Python 任务节点
  - 依赖触发 + 定时触发
  - 失败重跑 + 数据补跑
  - 可视化监控

  注意:这块竞品多,差异化要想清楚(比如专做"轻量 + PHP 栈亲和")。

  ---
  架构骨架建议

  不管做哪个方向,核心组件都差不多:

  ┌─────────────────────────────────────────┐
  │  API 层 (Hyperf HTTP)                    │
  │  - 流程定义 CRUD / 实例启动 / 查询         │
  └──────────────┬──────────────────────────┘
                 │
  ┌──────────────▼──────────────────────────┐
  │  调度器 (Scheduler)                       │
  │  - 解析流程定义 DSL                        │
  │  - 决定下一步执行哪个节点                    │
  │  - 维护工作流状态机                         │
  └──────────────┬──────────────────────────┘
                 │
  ┌──────────────▼──────────────────────────┐
  │  执行器 (Worker Pool)                     │
  │  - 协程池执行节点任务                       │
  │  - 支持 HTTP/RPC/SQL/Script/LLM 节点       │
  │  - 超时、重试、幂等                         │
  └──────────────┬──────────────────────────┘
                 │
  ┌──────────────▼──────────────────────────┐
  │  状态存储 (State Store)                   │
  │  - 事件日志 (MySQL/PostgreSQL)             │
  │  - 运行态缓存 (Redis)                      │
  │  - 历史归档 (对象存储)                      │
  └──────────────┬──────────────────────────┘
                 │
  ┌──────────────▼──────────────────────────┐
  │  可观测性                                  │
  │  - 日志 / 链路 / 流程实例可视化              │
  └─────────────────────────────────────────┘

  关键设计点:
  1. 流程定义用 DSL(JSON/YAML 或注解),别硬编码
  2. 状态机必须事件溯源,所有状态变更产生事件,崩溃可重放
  3. 节点执行要幂等,每个节点一个唯一执行 ID
  4. 调度器和执行器解耦,调度器决定"做什么",执行器负责"怎么做"
  5. 锁粒度要细,流程实例级别加锁,别全局锁

  ---
  什么情况下别做

  - 你想做 Temporal 的 PHP 平替:拼不过,生态鸿沟太大,除非有公司级投入
  - 你想做通用 DAG 引擎:Airflow 已经太强,PHP 版本吸引力不够
  - 你对状态一致性、分布式系统没信心:工作流引擎翻车代价极大,用户丢数据就再也不会用你的东西

  ---
  一句话总结

  Hyperf 做工作流平台非常合适,但要做"业务可编排 + PHP 生态原生"这一层,而不是"高性能通用引擎"。

  机会排序:AI Agent 工作流 > 低代码 iPaaS > BPM 审批流 > Saga 编排 > ETL 调度。

  其中 AI Agent 工作流是当下最大的蓝海:LangGraph 们在 Python/TS 称霸,PHP 生态完全空缺,而 PHP
  团队有真实需求。谁先做出来并做扎实,就是这个赛道的默认选择。
Hyperf AI Agent 工作流引擎:从 01 开源全流程

  ---
  一、先说清楚:做这个有什么用                                                                                          
  1.1 一句话定位                                                                                                        
  让 PHP 团队用最熟悉的语法(注解 + 协程),可视化编排大模型 + 工具 + 记忆,做出能自动完成复杂任务的 AI Agent。

  1.2 谁会用?解决什么痛?

  ┌──────────────────┬───────────────────────────────────────────────────┬─────────────────────────────────────────┐
  │       用户       │                     现在的痛                      │             用了你的库之后              │
  ├──────────────────┼───────────────────────────────────────────────────┼─────────────────────────────────────────┤
  │ PHP 电商/SaaS    │ 想接大模型做客服/营销,但 LangChain 是            │ 在现有 Hyperf 项目里加几个注解就接入    │
  │ 团队             │ Python,团队学不动                                │ LLM                                     │
  ├──────────────────┼───────────────────────────────────────────────────┼─────────────────────────────────────────┤
  │ 传统行业 OA/ERP  │ 想用 AI 做智能审批、合同抽取,但没 Python/Go 人力 │ PHP 团队自己就能开发 AI 工作流          │
  ├──────────────────┼───────────────────────────────────────────────────┼─────────────────────────────────────────┤
  │ 独立开发者       │ 想做 AI SaaS,但 Dify 太重、不想被平台锁死        │ 用开源库自建,数据和逻辑完全自控        │
  ├──────────────────┼───────────────────────────────────────────────────┼─────────────────────────────────────────┤
  │ 企业内训         │ 想私有化部署 AI 助手                              │ Hyperf +                                │
  │                  │                                                   │ 本地模型(Ollama/vLLM),合规可控       │
  └──────────────────┴───────────────────────────────────────────────────┴─────────────────────────────────────────┘

  1.3 为什么是"蓝海"

  ┌────────────┬────────────────────────────────────┬──────────┐
  │    生态    │             工作流框架             │  市占率  │
  ├────────────┼────────────────────────────────────┼──────────┤
  │ Python     │ LangChain / LangGraph / LlamaIndex │ 垄断     │
  ├────────────┼────────────────────────────────────┼──────────┤
  │ TypeScript │ Vercel AI SDK / Mastra             │ 快速崛起 │
  ├────────────┼────────────────────────────────────┼──────────┤
  │ Java       │ Spring AI / LangChain4j            │ 官方背书 │
  ├────────────┼────────────────────────────────────┼──────────┤
  │ Go         │ Eino / LangChainGo                 │ 在做     │
  ├────────────┼────────────────────────────────────┼──────────┤
  │ PHP        │ 几乎空白                           │ 0        │
  └────────────┴────────────────────────────────────┴──────────┘

  国内 PHP 开发者数百万,AI 应用需求真实存在,但没人给他们趁手的工具。这就是机会。

  ---
  二、最佳方式:技术选型的取舍

  ┌────────────┬───────────────────┬────────────────────────────────────────┐
  │   决策点   │       选择        │                  理由                  │
  ├────────────┼───────────────────┼────────────────────────────────────────┤
  │ 框架       │ Hyperf 3.x        │ 协程 + 注解 + DI,天然适合编排         │
  ├────────────┼───────────────────┼────────────────────────────────────────┤
  │ PHP 版本   │ ≥ 8.1             │ 用 enum、readonly、注解                │
  ├────────────┼───────────────────┼────────────────────────────────────────┤
  │ 存储       │ MySQL + Redis     │ 事件溯源 + 运行态缓存                  │
  ├────────────┼───────────────────┼────────────────────────────────────────┤
  │ LLM 协议   │ OpenAI 兼容       │ 覆盖 90% 模型(含 DeepSeek/通义/Ollama) │
  ├────────────┼───────────────────┼────────────────────────────────────────┤
  │ 流式输出   │ SSE               │ Hyperf 强项                            │
  ├────────────┼───────────────────┼────────────────────────────────────────┤
  │ 工作流 DSL │ YAML + 注解双模式 │ 代码党和可视化党都要                   │
  ├────────────┼───────────────────┼────────────────────────────────────────┤
  │ 扩展机制   │ 节点即插件        │ 社区生态能繁荣                         │
  ├────────────┼───────────────────┼────────────────────────────────────────┤
  │ 可观测     │ OpenTelemetry     │ 对接标准生态                           │
  └────────────┴───────────────────┴────────────────────────────────────────┘

  ---
  三、完整代码(可运行的 MVP)

  3.1 项目骨架

  hyperf-ai-flow/
  ├── composer.json
  ├── config/autoload/
  │   └── ai_flow.php
  ├── src/
  │   ├── Contract/              # 接口
  │   │   ├── NodeInterface.php
  │   │   ├── LLMProviderInterface.php
  │   │   └── StateStoreInterface.php
  │   ├── Annotation/            # 注解
  │   │   ├── Workflow.php
  │   │   ├── Node.php
  │   │   └── Tool.php
  │   ├── DSL/                   # 流程定义
  │   │   ├── FlowDefinition.php
  │   │   └── YamlLoader.php
  │   ├── Node/                  # 内置节点
  │   │   ├── LLMNode.php
  │   │   ├── ToolNode.php
  │   │   ├── ConditionNode.php
  │   │   └── HttpNode.php
  │   ├── LLM/                   # 模型适配
  │   │   └── OpenAICompatProvider.php
  │   ├── Engine/                # 执行引擎
  │   │   ├── Executor.php
  │   │   ├── Context.php
  │   │   └── EventBus.php
  │   ├── State/                 # 状态管理
  │   │   └── MysqlStateStore.php
  │   └── Controller/            # HTTP API
  │       └── FlowController.php
  └── examples/
      └── customer_service.yaml

  3.2 composer.json

  {
    "name": "yourname/hyperf-ai-flow",
    "description": "AI Agent workflow engine for Hyperf",
    "type": "library",
    "license": "MIT",
    "require": {
      "php": ">=8.1",
      "hyperf/framework": "^3.1",
      "hyperf/di": "^3.1",
      "hyperf/http-server": "^3.1",
      "hyperf/guzzle": "^3.1",
      "hyperf/redis": "^3.1",
      "hyperf/database": "^3.1",
      "symfony/yaml": "^6.0"
    },
    "autoload": {
      "psr-4": { "YourName\\AiFlow\\": "src/" }
    },
    "extra": {
      "hyperf": {
        "config": "YourName\\AiFlow\\ConfigProvider"
      }
    }
  }

  3.3 核心契约 src/Contract/*

  <?php
  namespace YourName\AiFlow\Contract;

  use YourName\AiFlow\Engine\Context;

  interface NodeInterface
  {
      public function execute(Context $ctx, array $config): array;
      public function type(): string;
  }

  <?php
  namespace YourName\AiFlow\Contract;

  interface LLMProviderInterface
  {
      public function chat(array $messages, array $options = []): array;
      public function stream(array $messages, array $options, callable $onChunk): void;
  }

  <?php
  namespace YourName\AiFlow\Contract;

  interface StateStoreInterface
  {
      public function saveRun(string $runId, array $state): void;
      public function loadRun(string $runId): ?array;
      public function appendEvent(string $runId, array $event): void;
      public function listEvents(string $runId): array;
  }

  3.4 注解 src/Annotation/*

  <?php
  namespace YourName\AiFlow\Annotation;

  use Attribute;
  use Hyperf\Di\Annotation\AbstractAnnotation;

  #[Attribute(Attribute::TARGET_CLASS)]
  class Workflow extends AbstractAnnotation
  {
      public function __construct(public string $name, public string $version = '1.0') {}
  }

  #[Attribute(Attribute::TARGET_METHOD)]
  class Node extends AbstractAnnotation
  {
      public function __construct(
          public string $id,
          public string $type = 'custom',
          public array $next = [],
          public int $retry = 0,
          public int $timeout = 30
      ) {}
  }

  #[Attribute(Attribute::TARGET_METHOD)]
  class Tool extends AbstractAnnotation
  {
      public function __construct(
          public string $name,
          public string $description,
          public array $parameters = []
      ) {}
  }

  3.5 执行上下文 src/Engine/Context.php

  <?php
  namespace YourName\AiFlow\Engine;

  class Context
  {
      public function __construct(
          public string $runId,
          public array $input = [],
          public array $variables = [],
          public array $history = [],
      ) {}

      public function set(string $key, mixed $value): void { $this->variables[$key] = $value; }
      public function get(string $key, mixed $default = null): mixed { return $this->variables[$key] ?? $default; }
      public function pushHistory(string $role, string $content): void {
          $this->history[] = ['role' => $role, 'content' => $content];
      }
  }

  3.6 OpenAI 兼容 Provider

  <?php
  namespace YourName\AiFlow\LLM;

  use GuzzleHttp\Client;
  use YourName\AiFlow\Contract\LLMProviderInterface;

  class OpenAICompatProvider implements LLMProviderInterface
  {
      public function __construct(
          private string $baseUrl,
          private string $apiKey,
          private string $model = 'gpt-4o-mini',
          private ?Client $http = null,
      ) {
          $this->http ??= new Client(['timeout' => 60]);
      }

      public function chat(array $messages, array $options = []): array
      {
          $resp = $this->http->post($this->baseUrl . '/chat/completions', [
              'headers' => ['Authorization' => 'Bearer ' . $this->apiKey],
              'json' => array_merge([
                  'model' => $this->model,
                  'messages' => $messages,
              ], $options),
          ]);
          return json_decode((string) $resp->getBody(), true);
      }

      public function stream(array $messages, array $options, callable $onChunk): void
      {
          $resp = $this->http->post($this->baseUrl . '/chat/completions', [
              'headers' => ['Authorization' => 'Bearer ' . $this->apiKey],
              'json' => array_merge([
                  'model' => $this->model,
                  'messages' => $messages,
                  'stream' => true,
              ], $options),
              'stream' => true,
          ]);
          $body = $resp->getBody();
          while (!$body->eof()) {
              $line = \GuzzleHttp\Psr7\Utils::readLine($body);
              if (str_starts_with($line, 'data: ')) {
                  $json = substr($line, 6);
                  if (trim($json) === '[DONE]') break;
                  $onChunk(json_decode($json, true));
              }
          }
      }
  }

  3.7 内置节点 src/Node/*

  <?php
  namespace YourName\AiFlow\Node;

  use YourName\AiFlow\Contract\{NodeInterface, LLMProviderInterface};
  use YourName\AiFlow\Engine\Context;

  class LLMNode implements NodeInterface
  {
      public function __construct(private LLMProviderInterface $llm) {}
      public function type(): string { return 'llm'; }

      public function execute(Context $ctx, array $config): array
      {
          $prompt = $this->render($config['prompt'] ?? '', $ctx);
          $messages = $ctx->history;
          $messages[] = ['role' => 'user', 'content' => $prompt];

          $result = $this->llm->chat($messages, $config['options'] ?? []);
          $answer = $result['choices'][0]['message']['content'] ?? '';

          $ctx->pushHistory('user', $prompt);
          $ctx->pushHistory('assistant', $answer);
          if (!empty($config['output'])) {
              $ctx->set($config['output'], $answer);
          }
          return ['output' => $answer];
      }

      private function render(string $tpl, Context $ctx): string
      {
          return preg_replace_callback('/\{\{\s*(\w+)\s*\}\}/', function ($m) use ($ctx) {
              return (string) ($ctx->get($m[1]) ?? $ctx->input[$m[1]] ?? '');
          }, $tpl);
      }
  }

  <?php
  namespace YourName\AiFlow\Node;

  use YourName\AiFlow\Contract\NodeInterface;
  use YourName\AiFlow\Engine\Context;

  class ConditionNode implements NodeInterface
  {
      public function type(): string { return 'condition'; }

      public function execute(Context $ctx, array $config): array
      {
          $expr = $config['expression'] ?? 'true';
          $vars = array_merge($ctx->input, $ctx->variables);
          $ok = $this->evaluate($expr, $vars);
          return ['branch' => $ok ? 'true' : 'false'];
      }

      private function evaluate(string $expr, array $vars): bool
      {
          foreach ($vars as $k => $v) {
              if (is_scalar($v)) {
                  $expr = preg_replace('/\b' . preg_quote($k, '/') . '\b/', var_export($v, true), $expr);
              }
          }
          return (bool) @eval("return ($expr);");
      }
  }

  <?php
  namespace YourName\AiFlow\Node;

  use GuzzleHttp\Client;
  use YourName\AiFlow\Contract\NodeInterface;
  use YourName\AiFlow\Engine\Context;

  class HttpNode implements NodeInterface
  {
      public function __construct(private ?Client $http = null) { $this->http ??= new Client(); }
      public function type(): string { return 'http'; }

      public function execute(Context $ctx, array $config): array
      {
          $resp = $this->http->request(
              $config['method'] ?? 'GET',
              $config['url'],
              ['json' => $config['body'] ?? null, 'headers' => $config['headers'] ?? []]
          );
          $data = json_decode((string) $resp->getBody(), true);
          if (!empty($config['output'])) $ctx->set($config['output'], $data);
          return ['output' => $data];
      }
  }

  <?php
  namespace YourName\AiFlow\Node;

  use YourName\AiFlow\Contract\NodeInterface;
  use YourName\AiFlow\Engine\Context;

  class ToolNode implements NodeInterface
  {
      private array $tools = [];
      public function type(): string { return 'tool'; }

      public function register(string $name, callable $fn, string $description = ''): void
      {
          $this->tools[$name] = ['fn' => $fn, 'description' => $description];
      }

      public function execute(Context $ctx, array $config): array
      {
          $name = $config['tool'];
          if (!isset($this->tools[$name])) {
              throw new \RuntimeException("tool not found: $name");
          }
          $args = $config['args'] ?? [];
          $resolved = [];
          foreach ($args as $k => $v) {
              $resolved[$k] = is_string($v) && str_starts_with($v, '$')
                  ? $ctx->get(substr($v, 1)) : $v;
          }
          $result = ($this->tools[$name]['fn'])($resolved, $ctx);
          if (!empty($config['output'])) $ctx->set($config['output'], $result);
          return ['output' => $result];
      }
  }

  3.8 流程定义和加载 src/DSL/*

  <?php
  namespace YourName\AiFlow\DSL;

  class FlowDefinition
  {
      public function __construct(
          public string $name,
          public string $version,
          public string $start,
          public array $nodes,  // id => [type, config, next]
      ) {}

      public function nextOf(string $id, ?string $branch = null): ?string
      {
          $next = $this->nodes[$id]['next'] ?? null;
          if ($next === null) return null;
          if (is_string($next)) return $next;
          if (is_array($next)) return $next[$branch ?? 'default'] ?? null;
          return null;
      }
  }

  <?php
  namespace YourName\AiFlow\DSL;

  use Symfony\Component\Yaml\Yaml;

  class YamlLoader
  {
      public function load(string $path): FlowDefinition
      {
          $raw = Yaml::parseFile($path);
          return new FlowDefinition(
              name: $raw['name'],
              version: $raw['version'] ?? '1.0',
              start: $raw['start'],
              nodes: $raw['nodes'],
          );
      }
  }

  3.9 执行引擎 src/Engine/Executor.php

  <?php
  namespace YourName\AiFlow\Engine;

  use Hyperf\Coroutine\Coroutine;
  use Psr\Container\ContainerInterface;
  use YourName\AiFlow\Contract\{NodeInterface, StateStoreInterface};
  use YourName\AiFlow\DSL\FlowDefinition;

  class Executor
  {
      private array $nodeMap = [];

      public function __construct(
          private ContainerInterface $container,
          private StateStoreInterface $store,
      ) {}

      public function registerNode(NodeInterface $node): void
      {
          $this->nodeMap[$node->type()] = $node;
      }

      public function run(FlowDefinition $flow, array $input, ?string $runId = null): array
      {
          $runId ??= uniqid('run_', true);
          $ctx = new Context(runId: $runId, input: $input);
          $this->store->saveRun($runId, ['status' => 'running', 'flow' => $flow->name]);

          $currentId = $flow->start;
          while ($currentId !== null) {
              $nodeDef = $flow->nodes[$currentId];
              $type = $nodeDef['type'];
              $config = $nodeDef['config'] ?? [];

              if (!isset($this->nodeMap[$type])) {
                  throw new \RuntimeException("unknown node type: $type");
              }

              $this->store->appendEvent($runId, [
                  'at' => microtime(true), 'node' => $currentId, 'type' => $type, 'phase' => 'start',
              ]);

              try {
                  $result = $this->runWithRetry(
                      fn() => $this->nodeMap[$type]->execute($ctx, $config),
                      $nodeDef['retry'] ?? 0,
                      $nodeDef['timeout'] ?? 30,
                  );
              } catch (\Throwable $e) {
                  $this->store->appendEvent($runId, [
                      'at' => microtime(true), 'node' => $currentId, 'phase' => 'error',
                      'error' => $e->getMessage(),
                  ]);
                  $this->store->saveRun($runId, ['status' => 'failed', 'error' => $e->getMessage()]);
                  throw $e;
              }

              $this->store->appendEvent($runId, [
                  'at' => microtime(true), 'node' => $currentId, 'phase' => 'end', 'result' => $result,
              ]);

              $branch = $result['branch'] ?? null;
              $currentId = $flow->nextOf($currentId, $branch);
          }

          $this->store->saveRun($runId, [
              'status' => 'completed', 'variables' => $ctx->variables, 'history' => $ctx->history,
          ]);
          return ['runId' => $runId, 'variables' => $ctx->variables];
      }

      private function runWithRetry(callable $fn, int $retry, int $timeout): mixed
      {
          $attempt = 0; $lastErr = null;
          while ($attempt <= $retry) {
              try {
                  $result = null; $done = false;
                  Coroutine::create(function () use ($fn, &$result, &$done) {
                      try { $result = $fn(); } finally { $done = true; }
                  });
                  $deadline = microtime(true) + $timeout;
                  while (!$done && microtime(true) < $deadline) { usleep(10_000); }
                  if (!$done) throw new \RuntimeException('node timeout');
                  return $result;
              } catch (\Throwable $e) {
                  $lastErr = $e; $attempt++;
                  if ($attempt > $retry) break;
                  usleep(200_000 * $attempt);
              }
          }
          throw $lastErr;
      }
  }

  3.10 状态存储 src/State/MysqlStateStore.php

  <?php
  namespace YourName\AiFlow\State;

  use Hyperf\DbConnection\Db;
  use YourName\AiFlow\Contract\StateStoreInterface;

  class MysqlStateStore implements StateStoreInterface
  {
      public function saveRun(string $runId, array $state): void
      {
          Db::table('ai_flow_runs')->updateOrInsert(
              ['run_id' => $runId],
              ['state' => json_encode($state, JSON_UNESCAPED_UNICODE), 'updated_at' => date('Y-m-d H:i:s')]
          );
      }

      public function loadRun(string $runId): ?array
      {
          $row = Db::table('ai_flow_runs')->where('run_id', $runId)->first();
          return $row ? json_decode($row->state, true) : null;
      }

      public function appendEvent(string $runId, array $event): void
      {
          Db::table('ai_flow_events')->insert([
              'run_id' => $runId, 'payload' => json_encode($event, JSON_UNESCAPED_UNICODE),
              'created_at' => date('Y-m-d H:i:s.u'),
          ]);
      }

      public function listEvents(string $runId): array
      {
          return Db::table('ai_flow_events')->where('run_id', $runId)
              ->orderBy('id')->pluck('payload')
              ->map(fn($s) => json_decode($s, true))->toArray();
      }
  }

  建表 SQL:
  CREATE TABLE ai_flow_runs (
    run_id VARCHAR(64) PRIMARY KEY,
    state JSON NOT NULL,
    updated_at DATETIME(3) NOT NULL
  );
  CREATE TABLE ai_flow_events (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    run_id VARCHAR(64) NOT NULL,
    payload JSON NOT NULL,
    created_at DATETIME(3) NOT NULL,
    INDEX idx_run (run_id)
  );

  3.11 HTTP 控制器

  <?php
  namespace YourName\AiFlow\Controller;

  use Hyperf\HttpServer\Annotation\{Controller, PostMapping, GetMapping};
  use Hyperf\HttpServer\Contract\RequestInterface;
  use YourName\AiFlow\DSL\YamlLoader;
  use YourName\AiFlow\Engine\Executor;
  use YourName\AiFlow\Contract\StateStoreInterface;

  #[Controller(prefix: '/ai-flow')]
  class FlowController
  {
      public function __construct(
          private Executor $executor,
          private YamlLoader $loader,
          private StateStoreInterface $store,
      ) {}

      #[PostMapping('run')]
      public function run(RequestInterface $req): array
      {
          $file = $req->input('file');
          $input = $req->input('input', []);
          $flow = $this->loader->load($file);
          return $this->executor->run($flow, $input);
      }

      #[GetMapping('runs/{id}')]
      public function detail(string $id): array
      {
          return [
              'state' => $this->store->loadRun($id),
              'events' => $this->store->listEvents($id),
          ];
      }
  }

  3.12 ConfigProvider(关键,Hyperf 包入口)

  <?php
  namespace YourName\AiFlow;

  use YourName\AiFlow\Contract\{LLMProviderInterface, StateStoreInterface};
  use YourName\AiFlow\LLM\OpenAICompatProvider;
  use YourName\AiFlow\State\MysqlStateStore;
  use YourName\AiFlow\Engine\Executor;
  use YourName\AiFlow\Node\{LLMNode, ConditionNode, HttpNode, ToolNode};

  class ConfigProvider
  {
      public function __invoke(): array
      {
          return [
              'dependencies' => [
                  LLMProviderInterface::class => function ($c) {
                      $cfg = $c->get(\Hyperf\Contract\ConfigInterface::class)->get('ai_flow.llm');
                      return new OpenAICompatProvider($cfg['base_url'], $cfg['api_key'], $cfg['model']);
                  },
                  StateStoreInterface::class => MysqlStateStore::class,
                  Executor::class => function ($c) {
                      $exec = new Executor($c, $c->get(StateStoreInterface::class));
                      $exec->registerNode(new LLMNode($c->get(LLMProviderInterface::class)));
                      $exec->registerNode(new ConditionNode());
                      $exec->registerNode(new HttpNode());
                      $exec->registerNode($c->get(ToolNode::class));
                      return $exec;
                  },
              ],
              'publish' => [[
                  'id' => 'config', 'description' => 'ai-flow config',
                  'source' => __DIR__ . '/../config/ai_flow.php',
                  'destination' => BASE_PATH . '/config/autoload/ai_flow.php',
              ]],
          ];
      }
  }

  // config/ai_flow.php
  return [
      'llm' => [
          'base_url' => env('AIFLOW_LLM_URL', 'https://api.openai.com/v1'),
          'api_key'  => env('AIFLOW_LLM_KEY', ''),
          'model'    => env('AIFLOW_LLM_MODEL', 'gpt-4o-mini'),
      ],
  ];

  3.13 示例流程 examples/customer_service.yaml

  name: customer_service
  version: "1.0"
  start: classify
  nodes:
    classify:
      type: llm
      config:
        prompt: |
          判断用户问题类型,只回复一个词:退款 / 物流 / 其他
          用户问题:{{ question }}
        output: category
      next: route

    route:
      type: condition
      config:
        expression: "category == '退款'"
      next:
        "true": handle_refund
        "false": handle_other

    handle_refund:
      type: http
      config:
        method: POST
        url: https://example.com/api/refund
        body: { question: "{{ question }}" }
        output: refund_result
      next: reply

    handle_other:
      type: llm
      config:
        prompt: "作为客服,回答用户:{{ question }}"
        output: reply_text
      next: null

    reply:
      type: llm
      config:
        prompt: "根据退款处理结果{{ refund_result }},生成用户答复"
        output: reply_text
      next: null

  调用:
  curl -X POST localhost:9501/ai-flow/run \
    -H 'Content-Type: application/json' \
    -d '{"file":"examples/customer_service.yaml","input":{"question":"我要退款"}}'

  ---
  四、开源完整流程(0 → 持续维护)

  阶段 1:立项(Day 0-1)

  - 定位:README 第一句话 —— "让 PHP 开发者用最熟悉的方式构建 AI Agent 工作流"
  - 选名:hyperf-ai-flow / agentflow-php,GitHub 搜一遍没人占
  - 选协议:MIT(库类最友好,扩散最快)
  - 建仓库:GitHub 新建,添加 .gitignore、LICENSE、空 README

  阶段 2:MVP 开发(Week 1-3)

  按上面代码跑通最小闭环:YAML 加载 → LLM 节点 → 条件节点 → 状态持久化 → HTTP 查询。核心指标:能跑通 3
  个真实示例(客服分流、内容审核、数据抽取)。

  阶段 3:仓库规范(Week 3)

  必备文件:
  - README.md:徽章 + 30 秒 Quick Start + 架构图 + 对标表
  - CONTRIBUTING.md:PR 规范、Conventional Commits
  - CODE_OF_CONDUCT.md:Contributor Covenant 模板
  - SECURITY.md:漏洞披露邮箱
  - CHANGELOG.md:Keep a Changelog 格式
  - .github/ISSUE_TEMPLATE/:bug / feature / question 三类
  - .github/PULL_REQUEST_TEMPLATE.md

  阶段 4:质量流水线(Week 4)

  .github/workflows/ci.yml:
  name: CI
  on: [push, pull_request]
  jobs:
    test:
      runs-on: ubuntu-latest
      strategy:
        matrix:
          php: ['8.1', '8.2', '8.3']
      steps:
        - uses: actions/checkout@v4
        - uses: shivammathur/setup-php@v2
          with: { php-version: '${{ matrix.php }}', extensions: swoole }
        - run: composer install --prefer-dist
        - run: vendor/bin/phpstan analyse src --level=5
        - run: vendor/bin/pest --coverage
        - uses: codecov/codecov-action@v4

  测试分三层:
  - 单元:每个节点类单测
  - 集成:Executor + MysqlStateStore 联调(docker-compose 起 MySQL)
  - E2E:真实 LLM 调用(用 mock server 或 Ollama 本地跑)

  阶段 5:文档站(Week 5)

  - 用 VitePress 或 Mintlify 建站
  - 核心页:概念、快速开始、节点参考、注解参考、YAML DSL 规范、对标 LangGraph
  - 录 3 分钟 demo 视频放 README 顶部

  阶段 6:首次发版 v0.1.0(Week 6git tag v0.1.0 && git push --tags
  Packagist 接入 GitHub webhook,tag 推送自动发包。

  阶段 7:推广

  - 技术社区:掘金、V2EX、开源中国写一篇《为什么 PHP 也能做 AI Agent》
  - Hyperf 官方生态:提 PR 到 hyperf/awesome-hyperf
  - 对比内容:写《Hyperf AI Flow vs LangChain:PHP 团队的选择》
  - Hacker News / Reddit r/PHP:英文一篇
  - 短视频:B 站/YouTube 演示"5 分钟搭一个 AI 客服"

  阶段 8:持续迭代(长期)

  v0.2+ 路线图建议:
  1. 可视化编辑器(Vue/React 前端,输出 YAML)
  2. 更多节点:向量检索、MCP 工具、Python 桥接
  3. 流式 SSE:前端实时看 LLM 输出
  4. 记忆系统:短期 + 长期记忆抽象
  5. 子流程 / 并行分支:fan-out/fan-in
  6. OpenTelemetry 接入:链路追踪
  7. 多租户 / 配额:SaaS 场景

  阶段 9:社区治理

  - Issue 48h 内首次响应
  - 每月发布 minor 版本,patch 随时发
  - 贡献者 ≥ 3 人后设 MAINTAINERS.md + 投票机制
  - 开 GitHub Discussions 或 Discord

  阶段 10:商业化路径(可选)

  - 开源核心免费
  - 企业版:多租户、SSO、审计日志、SLA
  - 托管云:一键部署的 SaaS
  - 咨询和定制:按项目收费

  ---
  五、做这个项目的三重价值总结

  ┌────────────────────┬──────────────────────────────────────────────┐
  │        对谁        │                     价值                     │
  ├────────────────────┼──────────────────────────────────────────────┤
  │ 对用户(PHP 团队) │ 第一次能用母语做 AI Agent,不用跨栈学 Python │
  ├────────────────────┼──────────────────────────────────────────────┤
  │ 对生态(PHP 社区) │ 填补 AI 基础设施空白,延长 PHP 生命力        │
  ├────────────────────┼──────────────────────────────────────────────┤
  │ 对作者(你)       │ 占位蓝海,积累个人影响力,可能孵化商业产品   │
  └────────────────────┴──────────────────────────────────────────────┘

  一句话:这不是又一个工具库,而是 PHP 进入 AI 时代的入场券。谁先做出来并做扎实,就是这个赛道未来几年的默认选择。

  现在动手,窗口还在。
Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐