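PHP 8.3 + Swoole 6.x Live-Streaming Media Control Plane · Complete Implementation

  Before you start (important):
  In live streaming, PHP + Swoole fits best as the control plane, not the data plane.
  RTMP/SRT protocol parsing, HLS segmenting, and transcoding are left to SRS / nginx-rtmp / ffmpeg;
  PHP + Swoole handles scheduling, authentication, callbacks, job orchestration, monitoring, and CDN integration, which is where it gives the best return for the effort.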

          ┌──────────────────────────────────────┐
          │  PHP + Swoole Control Plane          │
          │  • Auth / stream keys                │
          │  • Transcode job scheduling          │
          │  • HLS/DASH scheduling               │
          │  • SRT signaling                     │
          │  • Recording callbacks               │
          │  • Monitoring and alerting           │
          └──┬──────────────────────────┬────────┘
             │ schedule / callbacks     │ dispatch jobs
             ▼                          ▼
     ┌───────────────┐         ┌───────────────────────┐
     │ SRS / nginx-  │  pull   │ FFmpeg worker cluster │
     │ rtmp / SRT    │ ──────► │ (transcode / segment) │
     │ (data plane)  │         └───────────────────────┘
     └──────┬────────┘                  │
            │ HLS/DASH                  │
            ▼                           ▼
          Origin S3 / OSS  ← storage
            ▼
          CDN
            ▼
          Viewers

  ---
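  I. End-to-end flow (in plain terms)

  Broadcaster side:
    1. Request a stream key (stream_key) → the PHP control plane issues a JWT
    2. OBS publishes to rtmp://srs/{app}/{stream_id}?token={jwt}
    3. SRS receives the stream → calls back PHP /srs/on_publish to validate the key
    4. PHP approves the publish and starts a transcode job (dispatched to an ffmpeg worker)
    5. ffmpeg produces multi-bitrate HLS/DASH segments → pushed to object storage
    6. PHP maintains the segment manifests (m3u8/mpd indexes, ad insertion)
    7. The broadcaster stops → SRS calls back /srs/on_unpublish → PHP triggers recording finalization and notifies the business system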
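
  Steps 1 to 3 hinge on the publish token. The sketch below shows roughly what an HS256-signed token with these claims looks like and how it can be checked. It is a hand-rolled illustration only: the implementation later in this document uses firebase/php-jwt, and the helper names (b64url, encodeToken, decodeToken) and the sample claim values are assumptions made for this example.

```php
<?php
declare(strict_types=1);

/** Base64url without padding, as used by JWT. */
function b64url(string $bin): string
{
    return rtrim(strtr(base64_encode($bin), '+/', '-_'), '=');
}

/** Restore the padding, then decode base64url. */
function b64urlDecode(string $text): string
{
    $padded = str_pad($text, strlen($text) + (4 - strlen($text) % 4) % 4, '=');
    return (string) base64_decode(strtr($padded, '-_', '+/'));
}

/** Sign a claim set as header.payload.signature using HMAC-SHA256. */
function encodeToken(array $claims, string $secret): string
{
    $head = b64url(json_encode(['typ' => 'JWT', 'alg' => 'HS256']));
    $body = b64url(json_encode($claims));
    return "$head.$body." . b64url(hash_hmac('sha256', "$head.$body", $secret, true));
}

/** Return the claims when the signature matches and the token has not expired, else null. */
function decodeToken(string $token, string $secret, int $now): ?array
{
    $parts = explode('.', $token);
    if (count($parts) !== 3) {
        return null;
    }
    [$head, $body, $sig] = $parts;
    $expected = b64url(hash_hmac('sha256', "$head.$body", $secret, true));
    if (!hash_equals($expected, $sig)) {
        return null; // signature mismatch: wrong secret or tampered token
    }
    $claims = json_decode(b64urlDecode($body), true);
    if (!is_array($claims) || (int)($claims['exp'] ?? 0) < $now) {
        return null; // malformed payload or expired token
    }
    return $claims;
}

$now    = 1_700_000_000; // fixed clock so the example is reproducible
$claims = ['sub' => 42, 'app' => 'live', 'sid' => 'sid_demo', 'scope' => 'publish', 'exp' => $now + 7200];
$token  = encodeToken($claims, 'change-me');
$valid  = decodeToken($token, 'change-me', $now);         // the original claims
$forged = decodeToken($token, 'other-secret', $now);      // null: signature mismatch
$stale  = decodeToken($token, 'change-me', $now + 7201);  // null: expired
```

  Because the token is self-describing, SRS only has to forward it in the callback; the control plane can reject a forged or expired key without touching the database.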

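  Viewer side:
    1. Fetch the playback URL https://cdn/live/abc/master.m3u8?token=xxx
    2. The CDN pulls from the origin → cached segments are returned directly
    3. PHP authenticates at origin pull: token + validity period + region + device
    4. Heartbeats report watch time / stall rate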

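  SRT mode:
    1. The SRT publisher pushes with streamid=#!::r=live/abc,m=publish,p=xxx
    2. The SRT server (srt-live-transmit / SRS) calls back PHP to verify the signature
    3. The PHP signaling service returns a route (which edge node should accept the stream)
    4. From there the same transcode / recording pipeline as RTMP applies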
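
  The streamid in SRT step 1 is a comma-separated key=value list behind a "#!::" prefix, so the signaling service has to split it before it can verify anything. A minimal sketch of that parsing step follows; the function name parseSrtStreamId is hypothetical, and the keys (r, m, p) are simply the ones used in the example above.

```php
<?php
declare(strict_types=1);

/**
 * Parse a streamid such as "#!::r=live/abc,m=publish,p=xxx" into an
 * associative array. Returns null when the "#!::" prefix is missing.
 */
function parseSrtStreamId(string $streamId): ?array
{
    $prefix = '#!::';
    if (!str_starts_with($streamId, $prefix)) {
        return null;
    }
    $fields = [];
    foreach (explode(',', substr($streamId, strlen($prefix))) as $pair) {
        if (!str_contains($pair, '=')) {
            continue; // skip empty or malformed pairs
        }
        [$key, $value] = explode('=', $pair, 2);
        $fields[$key] = $value;
    }
    return $fields;
}

$parsed = parseSrtStreamId('#!::r=live/abc,m=publish,p=xxx');
// "r" carries app/stream, "m" the mode, "p" the credential to verify
[$app, $stream] = explode('/', $parsed['r'], 2);
$plain = parseSrtStreamId('live/abc'); // no prefix, so null
```

  Once split, the r value maps onto the same app / stream_id pair that the RTMP callbacks deliver, which is what lets both protocols share one pipeline.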

  ---
  II. Recommended technology stack

  ┌─────────────────┬──────────────────────────────┬───────────────────────────────────────────┐
  │ Component       │ Recommended                  │ Role in this design                       │
  ├─────────────────┼──────────────────────────────┼───────────────────────────────────────────┤
  │ RTMP data plane │ SRS 5.0                      │ Ingest, relay to RTMP/HLS                 │
  ├─────────────────┼──────────────────────────────┼───────────────────────────────────────────┤
  │ SRT data plane  │ SRS 5.0 / haivision srt      │ Low-latency transport                     │
  ├─────────────────┼──────────────────────────────┼───────────────────────────────────────────┤
  │ Transcoding     │ FFmpeg 6.x                   │ Multi-bitrate / DASH / segmenting         │
  ├─────────────────┼──────────────────────────────┼───────────────────────────────────────────┤
  │ Control plane   │ PHP 8.3 + Swoole 6.x         │ Auth, scheduling, signaling, callbacks    │
  ├─────────────────┼──────────────────────────────┼───────────────────────────────────────────┤
  │ Job queue       │ Redis Stream                 │ Transcode job dispatch                    │
  ├─────────────────┼──────────────────────────────┼───────────────────────────────────────────┤
  │ Metadata        │ MySQL + Redis                │ Stream metadata, keys, transcode progress │
  ├─────────────────┼──────────────────────────────┼───────────────────────────────────────────┤
  │ In-memory state │ Swoole\Table                 │ Live streams, worker status               │
  ├─────────────────┼──────────────────────────────┼───────────────────────────────────────────┤
  │ Object storage  │ MinIO / OSS / S3 (Flysystem) │ HLS/DASH segments                         │
  ├─────────────────┼──────────────────────────────┼───────────────────────────────────────────┤
  │ Token           │ firebase/php-jwt             │ Publish/playback auth                     │
  ├─────────────────┼──────────────────────────────┼───────────────────────────────────────────┤
  │ Routing         │ nikic/fast-route             │ HTTP control-plane API                    │
  ├─────────────────┼──────────────────────────────┼───────────────────────────────────────────┤
  │ Alerting        │ Prometheus + Alertmanager    │ Stream drops / stalls                     │
  ├─────────────────┼──────────────────────────────┼───────────────────────────────────────────┤
  │ Logging         │ monolog/monolog              │ Structured logs                           │
  └─────────────────┴──────────────────────────────┴───────────────────────────────────────────┘

  composer require swoole/ide-helper firebase/php-jwt nikic/fast-route \
                   league/flysystem league/flysystem-aws-s3-v3 \
                   promphp/prometheus_client_php monolog/monolog \
                   symfony/expression-language

  ---
  III. Complete code

  1. Entry point: control-plane HTTP service

  <?php
  // server.php
  declare(strict_types=1);
  require __DIR__ . '/vendor/autoload.php';

  use Swoole\Http\Server;
  use Swoole\Http\Request;
  use Swoole\Http\Response;
  use App\Live\Container;
  use App\Live\Router;

  \Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);

  $c = Container::boot();

  $server = new Server('0.0.0.0', 9508, SWOOLE_PROCESS);
  $server->set([
      'worker_num'        => swoole_cpu_num() * 2,
      'task_worker_num'   => 8,
      'task_enable_coroutine' => true,
      'enable_coroutine'  => true,
      'max_request'       => 100000,
      'hook_flags'        => SWOOLE_HOOK_ALL,
  ]);

  $server->on('WorkerStart', function($s, $wid) use ($c) {
      $c->initPools();
      $c->loadConfig();
      if ($wid === 0) {
          \Swoole\Coroutine::create(fn() => $c->jobDispatcher()->run());        // dispatch transcode jobs
          \Swoole\Coroutine::create(fn() => $c->streamMonitor()->run());        // periodic stream-state checks
          \Swoole\Coroutine::create(fn() => $c->workerHealth()->run());         // ffmpeg worker heartbeat checks
      }
  });

  $server->on('Request', function(Request $req, Response $res) use ($c, $server) {
      (new Router($c, $server))->dispatch($req, $res);
  });

  // Task: asynchronous business webhook delivery + recording finalization trigger
  $server->on('Task', function($s, $task) use ($c) {
      $type = $task->data['type'] ?? '';
      match($type) {
          'business_webhook' => $c->callbackDispatcher()->push($task->data),
          'finalize_recording' => $c->recorder()->finalize($task->data),
          default => null,
      };
  });
  $server->on('Finish', fn() => null);

  $server->start();

  ---
  2. Container: live streams / workers / job queue

  <?php
  // src/Live/Container.php
  namespace App\Live;

  use Swoole\Coroutine\Channel;
  use Swoole\Table;
  use League\Flysystem\Filesystem;
  use League\Flysystem\AwsS3V3\AwsS3V3Adapter;
  use Aws\S3\S3Client;
  use Redis;   // phpredis (ext-redis): Swoole 6 removed Swoole\Coroutine\Redis
  use PDO;     // pdo_mysql: Swoole 6 removed Swoole\Coroutine\MySQL; I/O is coroutine-scheduled via SWOOLE_HOOK_ALL

  class Container
  {
      public Table $streams;        // live streams (stream_key → meta)
      public Table $workers;        // ffmpeg worker pool
      public Channel $redisPool;
      public Channel $mysqlPool;
      public Filesystem $storage;
      public array $cfg;

      public static function boot(): self
      {
          $c = new self();

          $c->streams = new Table(8192);
          $c->streams->column('stream_id', Table::TYPE_STRING, 64);
          $c->streams->column('app',       Table::TYPE_STRING, 32);
          $c->streams->column('publisher_ip', Table::TYPE_STRING, 64);
          $c->streams->column('start_at',  Table::TYPE_INT, 8);
          $c->streams->column('codec_v',   Table::TYPE_STRING, 16);   // h264/h265
          $c->streams->column('codec_a',   Table::TYPE_STRING, 16);
          $c->streams->column('bitrate',   Table::TYPE_INT, 4);
          $c->streams->column('width',     Table::TYPE_INT, 4);
          $c->streams->column('height',    Table::TYPE_INT, 4);
          $c->streams->column('fps',       Table::TYPE_INT, 4);
          $c->streams->column('viewers',   Table::TYPE_INT, 4);
          $c->streams->column('status',    Table::TYPE_STRING, 16);   // live/done/error
          $c->streams->column('transcode_id', Table::TYPE_STRING, 64);
          $c->streams->column('record_path',  Table::TYPE_STRING, 256);
          $c->streams->create();

          $c->workers = new Table(256);
          $c->workers->column('host',     Table::TYPE_STRING, 64);
          $c->workers->column('cpu_use',  Table::TYPE_INT, 4);
          $c->workers->column('jobs',     Table::TYPE_INT, 4);
          $c->workers->column('max_jobs', Table::TYPE_INT, 4);
          $c->workers->column('healthy',  Table::TYPE_INT, 1);
          $c->workers->column('last_hb',  Table::TYPE_INT, 8);
          $c->workers->create();

          // Object storage (segments land here)
          $s3 = new S3Client(['version'=>'latest','region'=>'us-east-1',
              'endpoint'=>'http://127.0.0.1:9000','use_path_style_endpoint'=>true,
              'credentials'=>['key'=>'minio','secret'=>'minio123']]);
          $c->storage = new Filesystem(new AwsS3V3Adapter($s3, 'live'));

          $c->cfg = [
              'jwt_secret'    => getenv('JWT_SECRET') ?: 'change-me',
              'srs_callback_secret' => 'srs-secret',
              'rtmp_endpoint' => 'rtmp://srs.example.com',   // no app segment: issue() appends /{app}/{stream_id}
              'srt_endpoint'  => 'srt://srt.example.com:10080',
              'hls_origin'    => 'https://origin.example.com',
              'cdn_play_host' => 'https://cdn.example.com',
              'job_stream'    => 'live:transcode:jobs',
          ];
          return $c;
      }

      public function initPools(): void
      {
          $this->redisPool = new Channel(64);
          for ($i=0;$i<64;$i++) {
              $r = new Redis(); $r->connect('127.0.0.1', 6379);
              $this->redisPool->push($r);
          }
          $this->mysqlPool = new Channel(16);
          for ($i=0;$i<16;$i++) {
              $db = new PDO('mysql:host=127.0.0.1;dbname=live;charset=utf8mb4', 'live', 'live', [
                  PDO::ATTR_ERRMODE            => PDO::ERRMODE_EXCEPTION,
                  PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
              ]);
              $this->mysqlPool->push($db);
          }
      }

      public function loadConfig(): void
      {
          // Load the worker node records from the DB into memory
          $this->withMySQL(function($db) {
              $rows = $db->query("SELECT * FROM transcode_workers WHERE enabled=1");
              foreach ($rows as $r) {
                  $this->workers->set($r['node_id'], [
                      'host'=>$r['host'],'cpu_use'=>0,'jobs'=>0,
                      'max_jobs'=>(int)$r['max_jobs'],'healthy'=>1,'last_hb'=>time(),
                  ]);
              }
          });
      }

      public function withRedis(callable $fn) {
          $r = $this->redisPool->pop();
          try { return $fn($r); } finally { $this->redisPool->push($r); }
      }
      public function withMySQL(callable $fn) {
          $db = $this->mysqlPool->pop();
          try { return $fn($db); } finally { $this->mysqlPool->push($db); }
      }

      public function jobDispatcher(): JobDispatcher { return new JobDispatcher($this); }
      public function streamMonitor(): StreamMonitor { return new StreamMonitor($this); }
      public function workerHealth(): WorkerHealth   { return new WorkerHealth($this); }
      public function callbackDispatcher(): CallbackDispatcher { return new CallbackDispatcher($this); }
      public function recorder(): Recorder           { return new Recorder($this); }
  }

  ---
  3. Router: all control-plane APIs

  <?php
  // src/Live/Router.php
  namespace App\Live;

  use Swoole\Http\Request;
  use Swoole\Http\Response;
  use Swoole\Http\Server;

  class Router
  {
      public function __construct(private Container $c, private Server $server) {}

      public function dispatch(Request $req, Response $res): void
      {
          $res->header('Content-Type','application/json');
          $uri = $req->server['request_uri'];
          try {
              $out = match($uri) {
                  // ===== Broadcaster side =====
                  '/api/streams/issue'    => (new StreamKeyIssuer($this->c))->issue($req),
                  '/api/streams/revoke'   => (new StreamKeyIssuer($this->c))->revoke($req),

                  // ===== Viewer side: signed playback URLs =====
                  '/api/play/url'         => (new PlaybackSigner($this->c))->sign($req),

                  // ===== SRS / nginx-rtmp callbacks =====
                  '/srs/on_publish'       => (new SRSCallback($this->c, $this->server))->onPublish($req),
                  '/srs/on_unpublish'     => (new SRSCallback($this->c, $this->server))->onUnpublish($req),
                  '/srs/on_play'          => (new SRSCallback($this->c, $this->server))->onPlay($req),
                  '/srs/on_stop'          => (new SRSCallback($this->c, $this->server))->onStop($req),
                  '/srs/on_hls'           => (new SRSCallback($this->c, $this->server))->onHls($req),

                  // ===== SRT signaling =====
                  '/srt/connect'          => (new SRTSignaling($this->c))->connect($req),
                  '/srt/disconnect'       => (new SRTSignaling($this->c))->disconnect($req),

                  // ===== HLS / DASH scheduling =====
                  '/hls/manifest'         => (new HLSScheduler($this->c))->manifest($req, $res),
                  '/hls/keyframe-trigger' => (new HLSScheduler($this->c))->keyframeTrigger($req),
                  '/dash/manifest'        => (new DASHScheduler($this->c))->manifest($req, $res),

                  // ===== Transcode worker heartbeat / reporting =====
                  '/worker/heartbeat'     => (new WorkerAPI($this->c))->heartbeat($req),
                  '/worker/job/done'      => (new WorkerAPI($this->c, $this->server))->done($req),
                  '/worker/job/progress'  => (new WorkerAPI($this->c))->progress($req),

                  // ===== Admin =====
                  '/admin/streams'        => $this->snapshot($this->c->streams),
                  '/admin/workers'        => $this->snapshot($this->c->workers),
                  default                 => $this->return404($res),
              };
              if ($out !== null) $res->end(json_encode($out, JSON_UNESCAPED_UNICODE));
          } catch (\Throwable $e) {
              $res->status(500);
              $res->end(json_encode(['code'=>500,'msg'=>$e->getMessage()]));
          }
      }

      private function snapshot(\Swoole\Table $t): array
      {
          $out = []; foreach ($t as $k=>$v) $out[$k] = $v; return $out;
      }
      private function return404(Response $res): null
      {
          $res->status(404); $res->end('not found'); return null;
      }
  }

  ---
  4. Stream keys + playback authentication

  <?php
  // src/Live/StreamKeyIssuer.php
  namespace App\Live;

  use Firebase\JWT\JWT;
  use Swoole\Http\Request;

  class StreamKeyIssuer
  {
      public function __construct(private Container $c) {}

      public function issue(Request $req): array
      {
          $d = json_decode($req->rawContent(), true) ?? [];
          $userId = (int)($d['user_id'] ?? 0);
          $app    = $d['app'] ?? 'live';
          $duration = (int)($d['duration'] ?? 7200);

          $streamId = 'sid_' . bin2hex(random_bytes(8));
          $token = JWT::encode([
              'iss'  => 'live-control',
              'sub'  => $userId,
              'app'  => $app,
              'sid'  => $streamId,
              'iat'  => time(),
              'exp'  => time() + $duration,
              'scope'=> 'publish',
          ], $this->c->cfg['jwt_secret'], 'HS256');

          // Persist to the DB for the record
          $this->c->withMySQL(function($db) use ($streamId, $userId, $app, $duration) {
              $stmt = $db->prepare(
                  "INSERT INTO stream_keys(stream_id,user_id,app,expires_at,enabled,created_at)
                   VALUES(?,?,?,FROM_UNIXTIME(?),1,NOW())"
              );
              $stmt->execute([$streamId, $userId, $app, time()+$duration]);
          });

          return [
              'code'       => 0,
              'stream_id'  => $streamId,
              'publish_url'=> "{$this->c->cfg['rtmp_endpoint']}/$app/$streamId?token=$token",
              'srt_url'    => "{$this->c->cfg['srt_endpoint']}?streamid=#!::r=$app/$streamId,m=publish,token=$token",
              'expires_at' => time() + $duration,
          ];
      }

      public function revoke(Request $req): array
      {
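          $sid = $req->get['sid'] ?? '';
          if (!$sid) return ['code'=>400];
          // Bind the id as a parameter: interpolating request input into the SQL string allows injection
          $this->c->withMySQL(function($db) use ($sid) {
              $stmt = $db->prepare("UPDATE stream_keys SET enabled=0 WHERE stream_id=?");
              $stmt->execute([$sid]);
          });
          // Kick the live stream immediately
          if ($this->c->streams->exist($sid)) {
              $this->c->withRedis(fn($r) =>
                  $r->xAdd('srs:kick', '*', ['stream_id'=>$sid], 1000)
              );
          }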
          return ['code'=>0];
      }
  }

  <?php
  // src/Live/PlaybackSigner.php
  namespace App\Live;

  use Swoole\Http\Request;

  class PlaybackSigner
  {
      public function __construct(private Container $c) {}

      public function sign(Request $req): array
      {
          $d = $req->get ?: $req->post ?? [];
          $sid = $d['stream_id'] ?? '';
          $userId = (int)($d['user_id'] ?? 0);
          $expire = time() + (int)($d['duration'] ?? 3600);

          // Simple signature for hotlink protection (similar to Nginx secure_link)
          $sign = md5("$sid|$userId|$expire|".$this->c->cfg['jwt_secret']);
          $host = $this->c->cfg['cdn_play_host'];

          return [
              'code'    => 0,
              'hls'     => "$host/live/$sid/master.m3u8?uid=$userId&exp=$expire&sign=$sign",
              'dash'    => "$host/live/$sid/master.mpd?uid=$userId&exp=$expire&sign=$sign",
              'flv'     => "$host/live/$sid.flv?uid=$userId&exp=$expire&sign=$sign",
              'srt_play'=> "{$this->c->cfg['srt_endpoint']}?streamid=#!::r=live/$sid,m=request,sign=$sign",
              'expires_at' => $expire,
          ];
      }
  }

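  Notes:
  - The publish URL carries a JWT: SRS receives it and calls back /srs/on_publish, where the token is verified
  - The playback URL carries a signature and an expiry time: CDN nodes verify it with the same algorithm (hotlink protection + sharing protection)
  - revoke kicks the stream immediately: it writes to a Redis Stream so that an SRS watcher can call the SRS API to kick the stream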
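
  To make the second note concrete, here is a small sketch of the verifying side, using the same md5("sid|uid|exp|secret") scheme as PlaybackSigner above. The function names signPlayback and verifyPlayback, the sample stream id and the fixed clock are assumptions for illustration; a production setup might prefer hash_hmac('sha256', ...) over a plain md5 of concatenated fields.

```php
<?php
declare(strict_types=1);

/** Same formula as PlaybackSigner::sign(): md5 over "sid|uid|exp|secret". */
function signPlayback(string $sid, int $uid, int $exp, string $secret): string
{
    return md5("$sid|$uid|$exp|$secret");
}

/** Check expiry first, then compare signatures in constant time. */
function verifyPlayback(string $sid, array $query, string $secret, int $now): bool
{
    $uid = (int)($query['uid'] ?? 0);
    $exp = (int)($query['exp'] ?? 0);
    if ($exp < $now) {
        return false; // link has expired
    }
    $expected = signPlayback($sid, $uid, $exp, $secret);
    return hash_equals($expected, (string)($query['sign'] ?? ''));
}

$secret = 'change-me';
$now    = 1_700_000_000; // fixed clock so the example is reproducible
$exp    = $now + 3600;
$url    = "https://cdn.example.com/live/sid_demo/master.m3u8?uid=42&exp=$exp&sign="
        . signPlayback('sid_demo', 42, $exp, $secret);

// What an edge node would do: pull the query string apart and re-check it
parse_str((string) parse_url($url, PHP_URL_QUERY), $query);
$ok       = verifyPlayback('sid_demo', $query, $secret, $now);      // true
$reused   = verifyPlayback('sid_other', $query, $secret, $now);     // false: signed for another stream
$tooLate  = verifyPlayback('sid_demo', $query, $secret, $exp + 1);  // false: expired
```

  Binding the stream id and expiry into the signature is what stops a shared link from working on another stream or after the window closes.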

  ---
  5. SRS callbacks: authentication + transcode trigger + recording trigger

  <?php
  // src/Live/SRSCallback.php
  namespace App\Live;

  use Swoole\Http\Request;
  use Swoole\Http\Server;
  use Firebase\JWT\JWT;
  use Firebase\JWT\Key;

  class SRSCallback
  {
      public function __construct(private Container $c, private ?Server $server = null) {}

      // Called by SRS when a publish arrives; return 0 to allow, non-zero to reject
      public function onPublish(Request $req): array
      {
          $d = json_decode($req->rawContent(), true) ?? [];
          $app    = $d['app'] ?? '';
          $stream = $d['stream'] ?? '';            // this is the stream_id
          $ip     = $d['ip'] ?? $d['client_ip'] ?? '';
          $param  = $d['param'] ?? '';

          parse_str(ltrim($param, '?'), $q);
          $token = $q['token'] ?? '';

          // 1. Verify the JWT signature
          try {
              $payload = (array) JWT::decode($token, new Key($this->c->cfg['jwt_secret'], 'HS256'));
          } catch (\Throwable $e) {
              error_log("[on_publish] bad jwt: ".$e->getMessage());
              return ['code'=>403];   // SRS treats any non-zero code as a rejection
          }
          if (($payload['sid'] ?? '') !== $stream || ($payload['app'] ?? '') !== $app
              || ($payload['scope'] ?? '') !== 'publish') {
              return ['code'=>403];
          }

          // 2. Second check against the DB (catches keys revoked after the JWT was issued)
          $row = $this->c->withMySQL(function($db) use ($stream) {
              $stmt = $db->prepare("SELECT * FROM stream_keys WHERE stream_id=? AND enabled=1");
              // PDO's execute() returns a bool, the legacy Swoole coroutine client returned the rows
              $res  = $stmt->execute([$stream]);
              $rows = is_array($res) ? $res : $stmt->fetchAll();
              return $rows[0] ?? null;
          });
          if (!$row) return ['code'=>403];

          // 3. Register the live stream
          $transcodeId = 'tj_'.bin2hex(random_bytes(8));
          $this->c->streams->set($stream, [
              'stream_id'   => $stream,
              'app'         => $app,
              'publisher_ip'=> $ip,
              'start_at'    => time(),
              'codec_v'     => '', 'codec_a'=>'',
              'bitrate'     => 0, 'width'=>0, 'height'=>0, 'fps'=>0,
              'viewers'     => 0,
              'status'      => 'live',
              'transcode_id'=> $transcodeId,
              'record_path' => '',
          ]);

          // 4. Enqueue the transcode job
          $this->dispatchTranscodeJob($stream, $app, $transcodeId);

          // 5. Notify the business system (asynchronously)
          $this->server?->task([
              'type'   => 'business_webhook',
              'event'  => 'stream.started',
              'user_id'=> $row['user_id'],
              'stream_id'=> $stream,
              'ts'     => time(),
          ]);
          return ['code'=>0];
      }

      public function onUnpublish(Request $req): array
      {
          $d = json_decode($req->rawContent(), true) ?? [];
          $sid = $d['stream'] ?? '';
          $row = $this->c->streams->get($sid);
          if (!$row) return ['code'=>0];

          $row['status'] = 'done';
          $this->c->streams->set($sid, $row);

          // 1. Tell the ffmpeg worker to wrap up (stop the transcode process)
          $this->c->withRedis(fn($r) =>
              $r->xAdd('worker:cancel', '*', ['transcode_id'=>$row['transcode_id']], 10000)
          );

          // 2. Trigger recording finalization (concatenate all ts segments into an mp4)
          $this->server?->task([
              'type'        => 'finalize_recording',
              'stream_id'   => $sid,
              'transcode_id'=> $row['transcode_id'],
              'duration'    => time() - $row['start_at'],
          ]);

          // 3. Notify the business system
          $this->server?->task([
              'type'   => 'business_webhook',
              'event'  => 'stream.ended',
              'stream_id'=> $sid,
              'duration' => time() - $row['start_at'],
              'ts'     => time(),
          ]);
          // Remove the row from the Table after 60 seconds
          \Swoole\Timer::after(60_000, fn() => $this->c->streams->del($sid));
          return ['code'=>0];
      }

      public function onPlay(Request $req): array
      {
          $d = json_decode($req->rawContent(), true) ?? [];
          $sid = $d['stream'] ?? '';
          $param = $d['param'] ?? '';
          parse_str(ltrim($param, '?'), $q);

          // Verify the signature
          $expected = md5(($q['stream_id']??$sid).'|'.($q['uid']??'').'|'.($q['exp']??'').'|'.$this->c->cfg['jwt_secret']);
          if (!hash_equals($expected, $q['sign'] ?? '')) return ['code'=>403];
          if ((int)($q['exp']??0) < time()) return ['code'=>403];

          // Viewer count +1 (incr is atomic across worker processes; get + set can lose updates)
          if ($this->c->streams->exist($sid)) {
              $this->c->streams->incr($sid, 'viewers', 1);
          }
          return ['code'=>0];
      }

      public function onStop(Request $req): array
      {
          $d = json_decode($req->rawContent(), true) ?? [];
          $sid = $d['stream'] ?? '';
          $row = $this->c->streams->get($sid);
          if ($row && $row['viewers'] > 0) {
              $this->c->streams->decr($sid, 'viewers', 1);   // atomic decrement instead of get + set
          }
          return ['code'=>0];
      }

      // SRS HLS segment callback: fired once for every generated ts segment
      public function onHls(Request $req): array
      {
          $d = json_decode($req->rawContent(), true) ?? [];
          // {file, seq_no, duration, m3u8, m3u8_url, stream, app}
          $this->c->withRedis(function($r) use ($d) {
              $r->xAdd('hls:segments', '*', [
                  'stream'  => $d['stream'] ?? '',
                  'file'    => $d['file'] ?? '',
                  'seq_no'  => (string)($d['seq_no'] ?? 0),
                  'duration'=> (string)($d['duration'] ?? 0),
              ], 100000);
          });
          return ['code'=>0];
      }

      private function dispatchTranscodeJob(string $sid, string $app, string $transcodeId): void
      {
          $sourceUrl = "rtmp://127.0.0.1:1935/$app/$sid";
          $job = [
              'transcode_id'=> $transcodeId,
              'stream_id'   => $sid,
              'source'      => $sourceUrl,
              'profiles'    => json_encode($this->defaultProfiles()),
              'output_base' => "live/$sid",     // 在对象存储里的路径前缀
              'created_at'  => (string)time(),
          ];
          $this->c->withRedis(fn($r) =>
              $r->xAdd($this->c->cfg['job_stream'], '*', $job, 100000)
          );
      }

      private function defaultProfiles(): array
      {
          // Multi-bitrate transcode ladder
          return [
              ['name'=>'1080p','width'=>1920,'height'=>1080,'video_bitrate'=>'4500k','audio_bitrate'=>'128k','codec'=>'h264','fps'=>30],
              ['name'=>'720p', 'width'=>1280,'height'=>720, 'video_bitrate'=>'2500k','audio_bitrate'=>'128k','codec'=>'h264','fps'=>30],
              ['name'=>'480p', 'width'=>854, 'height'=>480, 'video_bitrate'=>'1000k','audio_bitrate'=>'96k', 'codec'=>'h264','fps'=>30],
              ['name'=>'360p', 'width'=>640, 'height'=>360, 'video_bitrate'=>'600k', 'audio_bitrate'=>'64k', 'codec'=>'h264','fps'=>30],
          ];
      }
  }

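  Notes:
  - SRS callbacks are JSON over HTTP; responding {code:0} allows the action, any non-zero code rejects it
  - Two-step check: JWT + DB confirmation (covers the case where the JWT is still valid but the key has already been revoked in the DB)
  - on_unpublish triggers recording finalization: this is the key hook for turning a live stream into VOD
  - on_hls receives segment info, which can be used for ad insertion / DRM encryption / anomaly monitoring (e.g. a segment whose size is 0)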
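
  The allow/reject contract in the first two notes can be isolated from Swoole and the database as a pure function, which also makes it easy to unit-test. The sketch below assumes the JWT has already been verified (claims are null on failure) and that the DB lookup has been reduced to a boolean; tokenFromParam and decidePublish are hypothetical names, and the sample callback body only includes the fields this document relies on.

```php
<?php
declare(strict_types=1);

/** Pull the token out of the "param" field that SRS forwards, e.g. "?token=abc". */
function tokenFromParam(string $param): string
{
    parse_str(ltrim($param, '?'), $query);
    return is_string($query['token'] ?? null) ? $query['token'] : '';
}

/**
 * Build the callback response.
 * $claims     verified JWT payload, or null when verification failed
 * $keyEnabled result of the DB check (false once the key is revoked)
 */
function decidePublish(array $callback, ?array $claims, bool $keyEnabled): array
{
    $allowed = $claims !== null
        && ($claims['sid'] ?? null) === ($callback['stream'] ?? '')
        && ($claims['app'] ?? null) === ($callback['app'] ?? '')
        && ($claims['scope'] ?? '') === 'publish'
        && $keyEnabled;
    return ['code' => $allowed ? 0 : 403]; // 0 allows, any non-zero code rejects
}

$callback = ['action' => 'on_publish', 'app' => 'live', 'stream' => 'sid_demo',
             'param' => '?token=aaa.bbb.ccc'];
$claims   = ['sid' => 'sid_demo', 'app' => 'live', 'scope' => 'publish'];

$token    = tokenFromParam($callback['param']);
$accepted = decidePublish($callback, $claims, true);
$revoked  = decidePublish($callback, $claims, false);                           // key disabled in DB
$mismatch = decidePublish($callback, ['sid' => 'sid_other'] + $claims, true);   // token for another stream
$badToken = decidePublish($callback, null, true);                               // JWT failed verification
```

  Keeping the decision separate from I/O means the handler itself only has to gather inputs and encode the returned array as JSON.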

  ---
  6. Transcode job dispatcher + Worker API

  <?php
  // src/Live/JobDispatcher.php
  namespace App\Live;

  class JobDispatcher
  {
      public function __construct(private Container $c) {}

      public function run(): void
      {
          $stream = $this->c->cfg['job_stream'];
          $group  = 'dispatchers';
          // Create the consumer group (ignored if it already exists)
          $this->c->withRedis(function($r) use ($stream, $group) {
              try { $r->xGroup('CREATE', $stream, $group, '$', true); } catch (\Throwable $e) {}
          });

          while (true) {
              try {
                  $msgs = $this->c->withRedis(fn($r) =>
                      $r->xReadGroup($group, 'dispatcher-'.getmypid(), [$stream => '>'], 10, 2000)
                  );
                  foreach (($msgs[$stream] ?? []) as $id => $job) {
                      \Swoole\Coroutine::create(fn() => $this->assignJob($id, $job));
                  }
              } catch (\Throwable $e) {
                  error_log("[dispatcher] ".$e->getMessage());
                  \Swoole\Coroutine::sleep(1);
              }
          }
      }

      private function assignJob(string $id, array $job): void
      {
          // 1. Pick the least-loaded healthy worker (lowest jobs / max_jobs)
          $best = null; $bestRatio = INF;
          foreach ($this->c->workers as $nodeId => $w) {
              if (!$w['healthy'] || $w['jobs'] >= $w['max_jobs']) continue;
              $ratio = $w['jobs'] / max(1, $w['max_jobs']);
              if ($ratio < $bestRatio) { $best = $nodeId; $bestRatio = $ratio; }
          }
          if ($best === null) {
              error_log("[dispatcher] no available worker");
              // Not ACKed: the message stays in the group's pending list. Reading with '>' never
              // returns it again, so a retry pass (XAUTOCLAIM, or XREADGROUP from ID 0) must pick it up.
              return;
          }
          // 2. Push the job to that worker's dedicated queue
          $this->c->withRedis(function($r) use ($best, $job) {
              $r->xAdd("worker:tasks:$best", '*', $job, 1000);
          });
          // 3. Take one slot (atomic increment, safe across processes)
          $this->c->workers->incr($best, 'jobs', 1);

          // 4. ACK the consumer-group message
          $this->c->withRedis(fn($r) =>
              $r->xAck($this->c->cfg['job_stream'], 'dispatchers', [$id])
          );
          error_log("[dispatcher] job assigned: $best ←{$job['transcode_id']}");
      }
  }

  <?php
  // src/Live/WorkerAPI.php
  namespace App\Live;

  use Swoole\Http\Request;
  use Swoole\Http\Server;

  class WorkerAPI
  {
      public function __construct(private Container $c, private ?Server $server = null) {}

      public function heartbeat(Request $req): array
      {
          $d = json_decode($req->rawContent(), true) ?? [];
          $nodeId = $d['node_id'] ?? '';
          $w = $this->c->workers->get($nodeId);
          if (!$w) {
              // Auto-register a new worker
              $w = ['host'=>$d['host']??'','cpu_use'=>0,'jobs'=>0,
                    'max_jobs'=>(int)($d['max_jobs']??4),'healthy'=>1,'last_hb'=>time()];
          }
          $w['cpu_use'] = (int)($d['cpu_use'] ?? $w['cpu_use']);
          $w['jobs']    = (int)($d['running_jobs'] ?? $w['jobs']);
          $w['last_hb'] = time();
          $w['healthy'] = 1;
          $this->c->workers->set($nodeId, $w);
          return ['code'=>0];
      }

      public function progress(Request $req): array
      {
          $d = json_decode($req->rawContent(), true) ?? [];
          // Write to Redis so the frontend can query it
          $this->c->withRedis(fn($r) => $r->hMSet("live:progress:{$d['transcode_id']}", [
              'percent'  => (string)($d['percent'] ?? 0),
              'profile'  => $d['profile'] ?? '',
              'fps'      => (string)($d['fps'] ?? 0),
              'bitrate'  => (string)($d['bitrate'] ?? 0),
              'updated_at'=> (string)time(),
          ]));
          $this->c->withRedis(fn($r) => $r->expire("live:progress:{$d['transcode_id']}", 600));
          return ['code'=>0];
      }

      public function done(Request $req): array
      {
          $d = json_decode($req->rawContent(), true) ?? [];
          $nodeId = $d['node_id'] ?? '';
          // Release the worker slot
          $w = $this->c->workers->get($nodeId);
          if ($w && $w['jobs'] > 0) { $w['jobs']--; $this->c->workers->set($nodeId, $w); }
          // Notify the business system
          $this->server?->task([
              'type'   => 'business_webhook',
              'event'  => 'transcode.done',
              'transcode_id'=> $d['transcode_id'] ?? '',
              'outputs'=> $d['outputs'] ?? [],
              'ts'     => time(),
          ]);
          return ['code'=>0];
      }
  }
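
  The worker-selection rule in JobDispatcher::assignJob() (skip unhealthy or full nodes, then take the lowest jobs / max_jobs ratio) is easier to reason about as a standalone function. A minimal sketch, with plain arrays standing in for the Swoole\Table rows; the function name pickWorker and the sample node ids are assumptions for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Return the id of the healthy worker with the lowest load ratio,
 * or null when every worker is unhealthy or already full.
 */
function pickWorker(array $workers): ?string
{
    $best = null;
    $bestRatio = INF;
    foreach ($workers as $nodeId => $w) {
        if (!$w['healthy'] || $w['jobs'] >= $w['max_jobs']) {
            continue; // unhealthy or no free slot
        }
        $ratio = $w['jobs'] / max(1, $w['max_jobs']);
        if ($ratio < $bestRatio) {
            $best = (string) $nodeId; // PHP turns numeric string keys into ints
            $bestRatio = $ratio;
        }
    }
    return $best;
}

$workers = [
    'node-a' => ['healthy' => 1, 'jobs' => 3, 'max_jobs' => 4],  // ratio 0.75
    'node-b' => ['healthy' => 1, 'jobs' => 2, 'max_jobs' => 8],  // ratio 0.25
    'node-c' => ['healthy' => 0, 'jobs' => 0, 'max_jobs' => 4],  // idle but unhealthy
    'node-d' => ['healthy' => 1, 'jobs' => 4, 'max_jobs' => 4],  // full
];
$chosen = pickWorker($workers);

$saturated = pickWorker([
    'node-d' => ['healthy' => 1, 'jobs' => 4, 'max_jobs' => 4],
]);
```

  Comparing ratios rather than raw job counts lets a large machine with more slots absorb proportionally more work than a small one.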

  ---
  7. FFmpeg Worker (deployed separately, runs on the transcode machines)

  <?php
  // worker.php: deployed on every transcode machine
  declare(strict_types=1);
  require __DIR__ . '/vendor/autoload.php';

  // Redis below is phpredis (ext-redis); Swoole 6 removed Swoole\Coroutine\Redis
  use Swoole\Coroutine\Http\Client;

  \Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);

  $cfg = [
      'node_id'    => getenv('NODE_ID') ?: gethostname(),
      'host'       => getenv('HOST') ?: gethostbyname(gethostname()),
      'control'    => getenv('CONTROL') ?: 'http://127.0.0.1:9508',
      'redis'      => ['host'=>'127.0.0.1','port'=>6379],
      'max_jobs'   => (int)(getenv('MAX_JOBS') ?: 4),
      'storage_mount' => '/mnt/live',     // locally mounted object storage / shared FS
  ];

  \Swoole\Coroutine\run(function() use ($cfg) {
      // Heartbeat coroutine
      \Swoole\Coroutine::create(function() use ($cfg) {
          while (true) {
              try {
                  $u = parse_url($cfg['control']);
                  $cl = new Client($u['host'], $u['port'] ?? 80);
                  $cl->setHeaders(['Content-Type'=>'application/json']);
                  $cl->set(['timeout'=>2]);
                  $cl->post('/worker/heartbeat', json_encode([
                      'node_id'=>$cfg['node_id'],'host'=>$cfg['host'],
                      'max_jobs'=>$cfg['max_jobs'],
                      'cpu_use' => (int)(sys_getloadavg()[0]*100/swoole_cpu_num()),
                      'running_jobs' => $GLOBALS['running'] ?? 0,
                  ]));
                  $cl->close();
              } catch (\Throwable $e) {}
              \Swoole\Coroutine::sleep(5);
          }
      });

      // Fetch jobs and run them
      $r = new Redis(); $r->connect($cfg['redis']['host'], $cfg['redis']['port']);
      $stream = "worker:tasks:{$cfg['node_id']}";
      $GLOBALS['running'] = 0;
      $lastId = '$';   // start with new entries only, then resume from the last ID seen

      while (true) {
          if ($GLOBALS['running'] >= $cfg['max_jobs']) {
              \Swoole\Coroutine::sleep(1); continue;
          }
          $msgs = $r->xRead([$stream => $lastId], 1, 2000);
          if (!isset($msgs[$stream])) continue;
          foreach ($msgs[$stream] as $id => $job) {
              $lastId = $id;   // reading with '$' every time would skip entries added between reads
              $GLOBALS['running']++;
              \Swoole\Coroutine::create(function() use ($job, $cfg) {
                  try { (new \LiveWorker\Transcoder($cfg))->run($job); }
                  catch (\Throwable $e) { error_log("[transcode] ".$e->getMessage()); }
                  finally { $GLOBALS['running']--; }
              });
          }
      }
  });

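  <?php
  // src/Transcoder.php: the transcode executor (path is illustrative, loaded via Composer autoload).
  // It needs its own file because PHP requires a namespace declaration to be the first statement.
  namespace LiveWorker;

  use Swoole\Coroutine\Http\Client;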

  class Transcoder
  {
      public function __construct(private array $cfg) {}

      public function run(array $job): void
      {
          $profiles = json_decode($job['profiles'], true);
          $outputBase = $job['output_base'];
          $transcodeId = $job['transcode_id'];
          $source = $job['source'];

          // 1. Produce all HLS renditions with a single ffmpeg run
          $cmd = $this->buildFfmpegCmd($source, $profiles, $outputBase, $transcodeId);
          error_log("[ffmpeg] start: $cmd");

          // 2. popen + read ffmpeg's output (stderr merged into stdout) to extract progress
          $proc = popen("$cmd 2>&1", 'r');
          while (!feof($proc)) {
              $line = fgets($proc);
              if (!$line) continue;
              // Parse frame= fps= bitrate= speed= (fps may be printed with a decimal point)
              if (preg_match('/frame=\s*(\d+).*fps=\s*([\d.]+).*bitrate=\s*([\d.]+)\w+\/s.*speed=\s*([\d.]+)x/', $line, $m)) {
                  $this->reportProgress($transcodeId, [
                      'fps'=>(int)$m[2],'bitrate'=>(int)$m[3],'speed'=>(float)$m[4],
                  ]);
              }
          }
          pclose($proc);

          // 3. Report completion
          $this->reportDone($transcodeId, $profiles, $outputBase);
      }

      private function buildFfmpegCmd(string $src, array $profiles, string $outBase, string $jobId): string
      {
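          // Classic multi-bitrate HLS command (a single process produces every variant)
          $maps = ''; $outputs = '';
          foreach ($profiles as $i => $p) {
              $maps .= " -map 0:v:0 -map 0:a:0";
              // Profile values come from the job payload: cast or escape each one.
              // -r:v:$i targets the i-th video stream; a bare -r:$i would count audio streams too.
              $size = (int)$p['width'].'x'.(int)$p['height'];
              $outputs .=
                  " -c:v:$i ".escapeshellarg((string)$p['codec']).
                  " -b:v:$i ".escapeshellarg((string)$p['video_bitrate']).
                  " -s:v:$i $size -r:v:$i ".escapeshellarg((string)$p['fps']).
                  " -c:a:$i aac -b:a:$i ".escapeshellarg((string)$p['audio_bitrate']);
          }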
          $varStreamMap = [];
          foreach ($profiles as $i => $p) $varStreamMap[] = "v:$i,a:$i,name:{$p['name']}";
          $vsm = implode(' ', $varStreamMap);
          $outDir = "{$this->cfg['storage_mount']}/$outBase";
          @mkdir($outDir, 0755, true);

          return "ffmpeg -y -i ".escapeshellarg($src)." $maps $outputs ".
                 "-f hls -hls_time 4 -hls_list_size 6 -hls_flags delete_segments+independent_segments ".
                 "-master_pl_name master.m3u8 ".
                 "-hls_segment_filename ".escapeshellarg("$outDir/%v_%03d.ts")." ".
                 "-var_stream_map ".escapeshellarg($vsm)." ".
                 escapeshellarg("$outDir/%v.m3u8");
      }

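      // Per-job timestamp. A static local variable would be shared by every job in the process.
      private int $lastReport = 0;

      private function reportProgress(string $jobId, array $info): void
      {
          // Rate limit: report at most once every 2 seconds
          $now = time();
          if ($now - $this->lastReport < 2) return;
          $this->lastReport = $now;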
          $u = parse_url($this->cfg['control']);
          $c = new Client($u['host'], $u['port'] ?? 80);
          $c->setHeaders(['Content-Type'=>'application/json']);
          $c->set(['timeout'=>1]);
          $c->post('/worker/job/progress', json_encode(['transcode_id'=>$jobId] + $info));
          $c->close();
      }

      private function reportDone(string $jobId, array $profiles, string $outBase): void
      {
          $u = parse_url($this->cfg['control']);
          $c = new Client($u['host'], $u['port'] ?? 80);
          $c->setHeaders(['Content-Type'=>'application/json']);
          $c->post('/worker/job/done', json_encode([
              'node_id'    => $this->cfg['node_id'],
              'transcode_id'=> $jobId,
              'outputs'    => array_map(fn($p) => "$outBase/{$p['name']}.m3u8", $profiles),
          ]));
          $c->close();
      }
  }

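  Explanation:
  - popen + stderr parsing: reads fps/bitrate from ffmpeg while it runs, like watching a progress bar
  - -var_stream_map: a single ffmpeg command writes every rendition plus master.m3u8. The source is demuxed and decoded once instead of once per rendition; the encoding cost itself does not shrink
  - -hls_flags delete_segments: old segments are removed automatically, which bounds the live window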
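
  The stderr parsing step can be tried without running ffmpeg. A minimal sketch, assuming a stats line in ffmpeg's usual "key= value" layout; parseFfmpegProgress() is a hypothetical helper name and the sample line is made up for illustration:

```php
<?php
declare(strict_types=1);

/**
 * Parse one ffmpeg stats line into numeric fields.
 * Returns null for lines that are not progress output (banner, warnings)
 * or that still report "N/A" values.
 * Hypothetical helper, not part of the worker listing.
 */
function parseFfmpegProgress(string $line): ?array
{
    $fields = [];
    // Each field looks like "key=  value"; ffmpeg pads values with spaces.
    foreach (['frame', 'fps', 'bitrate', 'speed'] as $key) {
        if (!preg_match('/\b' . $key . '=\s*([\d.]+)/', $line, $m)) {
            return null;
        }
        $fields[$key] = (float) $m[1];
    }
    return $fields;
}

// Illustrative sample line, not captured from a real run.
$sample = 'frame=  240 fps= 30 q=28.0 size=    1024kB time=00:00:08.00 bitrate=1048.6kbits/s speed=1.01x';
print json_encode(parseFfmpegProgress($sample)) . "\n";
```

  Matching each field on its own avoids depending on the order in which ffmpeg prints them.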

  ---
  8. HLS/DASH scheduler (ad insertion / DVR / stream switching)

  <?php
  // src/Live/HLSScheduler.php
  namespace App\Live;

  use Swoole\Http\Request;
  use Swoole\Http\Response;

  class HLSScheduler
  {
      public function __construct(private Container $c) {}

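      // Generate master.m3u8 dynamically (ads, rendition list, and region can all vary).
      // Returns null after writing the response itself, hence the nullable return type.
      public function manifest(Request $req, Response $res): ?array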
      {
          $sid    = $req->get['sid'] ?? '';
          $region = $req->get['region'] ?? 'CN';
          $row    = $this->c->streams->get($sid);
          if (!$row || $row['status'] !== 'live') {
              $res->status(404); $res->end(''); return null;
          }
          // Rendition set depends on region (overseas gets 720p at most, since bandwidth is expensive)
          $profiles = $region === 'CN'
              ? ['1080p'=>4_500_000,'720p'=>2_500_000,'480p'=>1_000_000,'360p'=>600_000]
              : ['720p'=>2_500_000,'480p'=>1_000_000];

          $base = "{$this->c->cfg['hls_origin']}/live/$sid";
          $m = "#EXTM3U\n#EXT-X-VERSION:6\n";
          foreach ($profiles as $name => $bw) {
              [$w,$h] = match($name) {
                  '1080p'=>[1920,1080], '720p'=>[1280,720],
                  '480p'=>[854,480], '360p'=>[640,360], default=>[640,360]
              };
              $m .= "#EXT-X-STREAM-INF:BANDWIDTH=$bw,RESOLUTION={$w}x{$h},CODECS=\"avc1.640028,mp4a.40.2\"\n";
              $m .= "$base/$name.m3u8?sid=$sid\n";
          }
          $res->header('Content-Type','application/vnd.apple.mpegurl');
          $res->header('Cache-Control','no-cache, max-age=2');
          $res->end($m);
          return null;
      }

      // Ad-insertion trigger (called when an operator clicks "insert 30-second ad" in the admin console)
      public function keyframeTrigger(Request $req): array
      {
          $d = json_decode($req->rawContent(), true) ?? [];
          $sid = $d['stream_id'] ?? '';
          // Write to Redis; the edge node inserts the SCTE-35 marker when it generates the m3u8
          $this->c->withRedis(fn($r) => $r->lPush("hls:ad:$sid", json_encode([
              'type'    => 'splice_insert',
              'duration'=> (int)($d['duration'] ?? 30),
              'at'      => time() + 5,
          ])));
          return ['code'=>0,'msg'=>'ad inserted in 5s'];
      }
  }

  <?php
  // src/Live/DASHScheduler.php
  namespace App\Live;

  use Swoole\Http\Request;
  use Swoole\Http\Response;

  class DASHScheduler
  {
      public function __construct(private Container $c) {}

      public function manifest(Request $req, Response $res): ?array
      {
          $sid = $req->get['sid'] ?? '';
          $row = $this->c->streams->get($sid);
          if (!$row) { $res->status(404); $res->end(''); return null; }

          $base = "{$this->c->cfg['hls_origin']}/live/$sid";
          // Simplified MPD (the real one is produced by ffmpeg -f dash; this only does CDN forwarding + metadata)
          $mpd = '<?xml version="1.0"?>'
              . '<MPD xmlns="urn:mpeg:dash:schema:mpd:2011" type="dynamic" minimumUpdatePeriod="PT2S" '
              . 'availabilityStartTime="'.gmdate('Y-m-d\TH:i:s\Z',$row['start_at']).'" '
              . 'profiles="urn:mpeg:dash:profile:isoff-live:2011">'
              . '<Period start="PT0S"><BaseURL>'.htmlspecialchars($base).'/</BaseURL>'
              . '<AdaptationSet contentType="video">'
              . '<Representation id="1080p" bandwidth="4500000" width="1920" height="1080" codecs="avc1.640028"/>'
              . '<Representation id="720p"  bandwidth="2500000" width="1280" height="720"  codecs="avc1.640028"/>'
              . '</AdaptationSet></Period></MPD>';
          $res->header('Content-Type','application/dash+xml');
          $res->header('Cache-Control','no-cache, max-age=2');
          $res->end($mpd);
          return null;
      }
  }
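
  Neither scheduler shows how an edge node would consume the hls:ad:{sid} entries. A minimal sketch, assuming the marker is written as an EXT-X-DATERANGE tag in the media playlist; adDateRangeTag() and injectAdMarker() are hypothetical helpers, X-AD-TYPE is a client-defined placeholder attribute, and a real SCTE-35 integration would carry the binary splice payload in SCTE35-OUT instead:

```php
<?php
declare(strict_types=1);

/** Build a date-range tag for an ad break (hypothetical helper). */
function adDateRangeTag(string $id, int $startAt, int $duration): string
{
    return sprintf(
        '#EXT-X-DATERANGE:ID="%s",START-DATE="%s",PLANNED-DURATION=%.1f,X-AD-TYPE="splice_insert"',
        $id,
        gmdate('Y-m-d\TH:i:s\Z', $startAt),   // ISO 8601 in UTC
        $duration
    );
}

/** Insert the tag just before the first segment entry of a media playlist. */
function injectAdMarker(string $playlist, string $tag): string
{
    $lines = explode("\n", $playlist);
    foreach ($lines as $i => $line) {
        if (str_starts_with($line, '#EXTINF')) {
            array_splice($lines, $i, 0, [$tag]);
            return implode("\n", $lines);
        }
    }
    return $playlist;   // no segments yet: leave the playlist untouched
}

// The event shape mirrors what keyframeTrigger() pushes to Redis.
$event = ['type' => 'splice_insert', 'duration' => 30, 'at' => time() + 5];
$playlist = "#EXTM3U\n#EXT-X-TARGETDURATION:4\n#EXTINF:4.0,\n720p_001.ts\n";
print injectAdMarker($playlist, adDateRangeTag('ad-1', $event['at'], $event['duration']));
```

  Players that rely on date ranges also expect EXT-X-PROGRAM-DATE-TIME in the playlist, which this sketch does not add.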

  ---
  9. SRT signaling service

  <?php
  // src/Live/SRTSignaling.php
  namespace App\Live;

  use Firebase\JWT\JWT;
  use Firebase\JWT\Key;
  use Swoole\Http\Request;

  class SRTSignaling
  {
      public function __construct(private Container $c) {}

      // Called when SRS 5.x receives an SRT publish or play: parse the streamid, then accept or reject
      public function connect(Request $req): array
      {
          $d = json_decode($req->rawContent(), true) ?? [];
          // streamid format: #!::r=app/stream_id,m=publish,token=xxx
          $streamid = $d['streamid'] ?? '';
          $params = $this->parseSrtStreamId($streamid);

          $resource = $params['r'] ?? '';
          $mode     = $params['m'] ?? 'request';   // publish / request
          $token    = $params['token'] ?? '';
          [$app, $sid] = explode('/', $resource, 2) + ['',''];

          // Verify the token
          $scope = $mode === 'publish' ? 'publish' : 'play';
          try {
              $payload = (array) JWT::decode($token, new Key($this->c->cfg['jwt_secret'], 'HS256'));
              // A token without a sid claim must be rejected, not raise an undefined-key warning
              if (($payload['sid'] ?? '') !== $sid || ($payload['scope'] ?? '') !== $scope) {
                  return ['code'=>403,'reason'=>'scope mismatch'];
              }
          } catch (\Throwable $e) {
              return ['code'=>403,'reason'=>'bad token'];
          }

          // Pick an edge node (nearest access)
          $edge = $this->pickEdgeNode($d['client_ip'] ?? '');

          // Register the session
          $this->c->withRedis(fn($r) => $r->hMSet("srt:session:$sid", [
              'app'=>$app,'mode'=>$mode,'edge'=>$edge,
              'client_ip'=>$d['client_ip']??'','connected_at'=>(string)time(),
          ]));

          return [
              'code'    => 0,
              'app'     => $app,
              'stream'  => $sid,
              'edge'    => $edge,
              'latency' => 120,        // recommended SRT latency: 120 ms
          ];
      }

      public function disconnect(Request $req): array
      {
          $d = json_decode($req->rawContent(), true) ?? [];
          $sid = $d['stream'] ?? '';
          $this->c->withRedis(fn($r) => $r->del("srt:session:$sid"));
          return ['code'=>0];
      }

      private function parseSrtStreamId(string $s): array
      {
          // #!::k1=v1,k2=v2  →['k1'=>'v1','k2'=>'v2']
          $s = preg_replace('/^#!::/', '', $s);
          $out = [];
          foreach (explode(',', $s) as $kv) {
              [$k,$v] = explode('=', $kv, 2) + [null,null];
              if ($k) $out[$k] = $v;
          }
          return $out;
      }

      private function pickEdgeNode(string $clientIp): string
      {
          // Production: GeoIP → nearest edge SRT cluster
          // Simplified here: hash the client IP onto a fixed node list (this is not round-robin)
          $nodes = ['srt-edge-cn-east-1','srt-edge-cn-south-1','srt-edge-sg-1'];
          return $nodes[crc32($clientIp) % count($nodes)];
      }
  }

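  Explanation:
  - The SRT streamid syntax #!::k=v,k=v comes from Haivision's SRT access-control guidelines, and SRS 5.x supports it natively. r and m are keys defined there; token is a custom key of this design
  - mode = publish/request: push vs pull; the JWT scope must match the mode
  - The edge node is returned dynamically, which enables nearest-node access and failover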
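
  The streamid round trip can be checked in isolation. A minimal sketch, assuming values contain no commas (a JWT does not); buildSrtStreamId() is a hypothetical counterpart to the parser in the class, and the token value is a placeholder:

```php
<?php
declare(strict_types=1);

/** Build "#!::k1=v1,k2=v2" from a key/value map (hypothetical helper). */
function buildSrtStreamId(array $params): string
{
    $pairs = [];
    foreach ($params as $k => $v) {
        $pairs[] = $k . '=' . $v;
    }
    return '#!::' . implode(',', $pairs);
}

/** Parse "#!::k1=v1,k2=v2" back into a map; returns [] when the prefix is missing. */
function parseSrtStreamId(string $s): array
{
    if (!str_starts_with($s, '#!::')) {
        return [];
    }
    $out = [];
    foreach (explode(',', substr($s, 4)) as $kv) {
        // Limit 2 keeps any "=" inside the value intact.
        $parts = explode('=', $kv, 2);
        if (count($parts) === 2 && $parts[0] !== '') {
            $out[$parts[0]] = $parts[1];
        }
    }
    return $out;
}

$id = buildSrtStreamId(['r' => 'live/sid_xxx', 'm' => 'publish', 'token' => 'aaa.bbb.ccc']);
print $id . "\n";
print json_encode(parseSrtStreamId($id)) . "\n";
```

  Rejecting input without the #!:: prefix keeps plain legacy stream IDs from being misread as key/value pairs.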

  ---
  10. Recorder (recording remux)

  <?php
  // src/Live/Recorder.php
  namespace App\Live;

  class Recorder
  {
      public function __construct(private Container $c) {}

      public function finalize(array $data): void
      {
          $sid = $data['stream_id'];
          $tid = $data['transcode_id'];
          $duration = $data['duration'];

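          // 1. Remux the HLS playlist into MP4 with ffmpeg.
          //    The playlist must still list every segment: a live playlist written with
          //    -hls_list_size 6 and delete_segments keeps only the last few.
          $hlsPath = "/mnt/live/live/$sid/720p.m3u8";   // record from the 720p rendition
          if (!is_file($hlsPath)) {
              error_log("[record] no hls: $hlsPath");
              return;
          }
          // Include the transcode ID so a restarted stream never overwrites an earlier recording
          $output = "/mnt/live/recordings/$sid-$tid.mp4";
          $objectKey = "recordings/$sid-$tid.mp4";
          @mkdir(dirname($output), 0755, true);

          // ffmpeg stream copy (no re-encode, so very fast)
          $cmd = "ffmpeg -y -i ".escapeshellarg($hlsPath)." -c copy -bsf:a aac_adtstoasc ".escapeshellarg($output);
          exec("$cmd 2>&1", $out, $code);
          if ($code !== 0) {
              error_log("[record] ffmpeg fail: ".implode("\n",$out));
              return;
          }
          // 2. Persist to the database
          $this->c->withMySQL(function($db) use ($sid, $output, $duration) {
              $stmt = $db->prepare(
                  "INSERT INTO recordings(stream_id,file_path,duration,created_at) VALUES(?,?,?,NOW())"
              );
              $stmt->execute([$sid, $output, $duration]);
          });
          // 3. Upload to object storage asynchronously (optional)
          \Swoole\Coroutine::create(function() use ($output, $objectKey) {
              try {
                  $fh = fopen($output, 'rb');
                  if ($fh === false) throw new \RuntimeException("cannot open $output");
                  $this->c->storage->writeStream($objectKey, $fh);
              } catch (\Throwable $e) {
                  // An uncaught exception inside a coroutine would otherwise be fatal
                  error_log("[record] upload fail: ".$e->getMessage());
              }
          });
          error_log("[record] done: $output");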
      }
  }

  ---
  11. CallbackDispatcher (business callbacks)

  <?php
  // src/Live/CallbackDispatcher.php
  namespace App\Live;

  use Swoole\Coroutine\Http\Client;

  class CallbackDispatcher
  {
      public function __construct(private Container $c) {}

      public function push(array $event): void
      {
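          // Webhook URLs that the business side configured in the DB.
          // Bind the event type as a parameter instead of concatenating it into the SQL.
          $urls = $this->c->withMySQL(function($db) use ($event) {
              $stmt = $db->prepare("SELECT url,secret FROM webhooks WHERE event_type=? AND enabled=1");
              $stmt->execute([$event['event']]);
              return $stmt->fetchAll();
          });
          foreach ($urls as $cfg) {
              $body = json_encode($event, JSON_UNESCAPED_UNICODE);
              $sig  = hash_hmac('sha256', $body, $cfg['secret']);
              $u = parse_url($cfg['url']);
              $https = ($u['scheme'] ?? 'http') === 'https';
              $path  = ($u['path'] ?? '/') . (isset($u['query']) ? '?'.$u['query'] : '');
              $ok = false;
              try {
                  $c = new Client($u['host'], $u['port'] ?? ($https ? 443 : 80), $https);
                  $c->set(['timeout'=>3]);
                  $c->setHeaders([
                      'Content-Type'=>'application/json',
                      'X-Live-Signature'=>$sig,
                      'X-Live-Event'=>$event['event'],
                  ]);
                  // The coroutine client reports failures through its return value and
                  // statusCode rather than by throwing, so check both
                  $ok = $c->post($path, $body) && $c->statusCode >= 200 && $c->statusCode < 300;
                  $c->close();
              } catch (\Throwable $e) {
                  $ok = false;
              }
              if (!$ok) {
                  // On failure, push to the retry queue
                  $this->c->withRedis(fn($r) =>
                      $r->lPush('webhook:retry', json_encode(['url'=>$cfg['url'],'event'=>$event,'attempts'=>1]))
                  );
              }
          }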
      }
  }

  ---
  12. StreamMonitor + WorkerHealth

  <?php
  // src/Live/StreamMonitor.php
  namespace App\Live;

  class StreamMonitor
  {
      public function __construct(private Container $c) {}

      public function run(): void
      {
          while (true) {
              \Swoole\Coroutine::sleep(10);
              $now = time();
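              // Intended check: streams with no heartbeat for 30 seconds (SRS on_hls counts as a heartbeat).
              // This loop only looks at start_at, so what it enforces is a maximum stream age.
              foreach ($this->c->streams as $sid => $row) {
                  if ($row['status'] !== 'live') continue;
                  $age = $now - $row['start_at'];
                  if ($age > 86400) {     // last-resort guard: cut streams older than 1 day
                      $row['status'] = 'timeout';
                      $this->c->streams->set($sid, $row);
                  }
              }
              // Optionally poll the SRS API for the streams that are really online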
          }
      }
  }

  <?php
  // src/Live/WorkerHealth.php
  namespace App\Live;

  class WorkerHealth
  {
      public function __construct(private Container $c) {}
      public function run(): void
      {
          while (true) {
              \Swoole\Coroutine::sleep(5);
              $now = time();
              foreach ($this->c->workers as $id => $w) {
                  if ($now - $w['last_hb'] > 15) {
                      $w['healthy'] = 0;
                      $this->c->workers->set($id, $w);
                  }
              }
          }
      }
  }

  ---
  13. SRS configuration (critical!)

  # Key parts of srs.conf
  srt_server {
      enabled         on;
      listen          10080;
  }
  vhost __defaultVhost__ {
      # http_hooks is a vhost-level directive
      http_hooks {
          enabled         on;
          on_publish      http://control:9508/srs/on_publish;
          on_unpublish    http://control:9508/srs/on_unpublish;
          on_play         http://control:9508/srs/on_play;
          on_stop         http://control:9508/srs/on_stop;
          on_hls          http://control:9508/srs/on_hls;
      }
      hls {
          enabled         on;
          hls_path        /data/hls;
          hls_fragment    4;
          hls_window      60;
          hls_cleanup     on;
      }
      srt {
          enabled         on;
          srt_to_rtmp     on;
      }
  }
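
  With these hooks enabled, SRS POSTs a JSON body to the control plane and accepts the client only when the response is HTTP 200 with code 0. A minimal sketch of the on_publish decision, assuming the publish token arrives in the hook's param field as a query string; handleOnPublish() and the $verifyToken callback are hypothetical names, and the real handler would do the JWT check shown earlier:

```php
<?php
declare(strict_types=1);

/**
 * Decide whether SRS may accept a publish.
 * $hook is the decoded JSON body of the on_publish callback.
 * $verifyToken(string $token, string $stream): bool is supplied by the caller.
 */
function handleOnPublish(array $hook, callable $verifyToken): array
{
    // "param" holds the URL query string, e.g. "?token=abc".
    parse_str(ltrim((string) ($hook['param'] ?? ''), '?'), $query);
    $token  = (string) ($query['token'] ?? '');
    $stream = (string) ($hook['stream'] ?? '');

    if ($token === '' || $stream === '' || !$verifyToken($token, $stream)) {
        return ['code' => 1];   // any non-zero code makes SRS reject the stream
    }
    return ['code' => 0];
}

// Stand-in verifier for the demo: accepts one fixed token.
$verify = fn(string $token, string $stream): bool => $token === 'good-token';

$hook = ['action' => 'on_publish', 'app' => 'live', 'stream' => 'sid_xxx', 'param' => '?token=good-token'];
print json_encode(handleOnPublish($hook, $verify)) . "\n";
```

  Keeping the decision in a pure function makes it testable without a running SRS instance.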

  ---
  14. DDL

  CREATE TABLE stream_keys (
    stream_id VARCHAR(64) PRIMARY KEY,
    user_id INT NOT NULL,
    app VARCHAR(32),
    expires_at DATETIME,
    enabled TINYINT DEFAULT 1,
    created_at DATETIME
  );

  CREATE TABLE transcode_workers (
    node_id VARCHAR(64) PRIMARY KEY,
    host VARCHAR(64),
    max_jobs INT DEFAULT 4,
    enabled TINYINT DEFAULT 1
  );

  CREATE TABLE recordings (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    stream_id VARCHAR(64),
    file_path VARCHAR(256),
    duration INT,
    created_at DATETIME,
    KEY idx_stream(stream_id)
  );

  CREATE TABLE webhooks (
    id INT PRIMARY KEY AUTO_INCREMENT,
    event_type VARCHAR(32),
    url VARCHAR(512),
    secret VARCHAR(64),
    enabled TINYINT DEFAULT 1
  );

  ---
  IV. End-to-end call examples

  # 1. The streamer requests a publish URL
  curl -X POST http://127.0.0.1:9508/api/streams/issue \
    -d '{"user_id":10086,"app":"live","duration":7200}'
  # {
  #   "code":0,
  #   "stream_id":"sid_xxx",
  #   "publish_url":"rtmp://srs.example.com/live/sid_xxx?token=eyJ...",
  #   "srt_url":"srt://srt.example.com:10080?streamid=#!::r=live/sid_xxx,m=publish,token=eyJ..."
  # }

  # 2. OBS publishes to publish_url → SRS calls /srs/on_publish to verify the token → transcoding starts if it passes

  # 3. The viewer requests a playback URL
  curl "http://127.0.0.1:9508/api/play/url?stream_id=sid_xxx&user_id=2000&duration=3600"
  # {
  #   "code":0,
  #   "hls":"https://cdn.example.com/live/sid_xxx/master.m3u8?uid=2000&exp=...&sign=...",
  #   "dash":"https://cdn.example.com/live/sid_xxx/master.mpd?...",
  #   "srt_play":"srt://...?streamid=#!::r=live/sid_xxx,m=request,sign=..."
  # }

  # 4. An operator inserts an ad
  curl -X POST http://127.0.0.1:9508/hls/keyframe-trigger \
    -d '{"stream_id":"sid_xxx","duration":30}'

  # 5. Revoke a stream (take a violating broadcast offline)
  curl -X POST "http://127.0.0.1:9508/api/streams/revoke?sid=sid_xxx"

  # 6. List online streams / workers
  curl http://127.0.0.1:9508/admin/streams
  curl http://127.0.0.1:9508/admin/workers
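
  The playback URLs above carry uid, exp and sign parameters, but the signing scheme itself is not shown. A minimal sketch, assuming an HMAC-SHA256 over "path|uid|exp" with a shared secret; signPlayUrl(), verifyPlayUrl() and the secret value are hypothetical:

```php
<?php
declare(strict_types=1);

/** Append uid, exp and an HMAC signature to a playback path. */
function signPlayUrl(string $path, int $uid, int $exp, string $secret): string
{
    $sign = hash_hmac('sha256', $path . '|' . $uid . '|' . $exp, $secret);
    return $path . '?' . http_build_query(['uid' => $uid, 'exp' => $exp, 'sign' => $sign]);
}

/** Check expiry first, then the signature in constant time. */
function verifyPlayUrl(string $url, string $secret, int $now): bool
{
    $parts = parse_url($url);
    if ($parts === false) {
        return false;
    }
    parse_str($parts['query'] ?? '', $q);
    $exp = (int) ($q['exp'] ?? 0);
    if ($exp < $now) {
        return false;   // expired or missing
    }
    $expected = hash_hmac(
        'sha256',
        ($parts['path'] ?? '') . '|' . (int) ($q['uid'] ?? 0) . '|' . $exp,
        $secret
    );
    return hash_equals($expected, (string) ($q['sign'] ?? ''));
}

$url = signPlayUrl('/live/sid_xxx/master.m3u8', 2000, time() + 3600, 'demo-secret');
print $url . "\n";
var_dump(verifyPlayUrl($url, 'demo-secret', time()));
```

  Signing the path together with uid and exp means a viewer cannot extend the expiry or reuse the signature for another stream.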

  ---
  V. Performance reference

  ┌────────────────────────────────────┬────────────────────────────────────────────────────┐
  │ Component                          │ Per-machine capacity                               │
  ├────────────────────────────────────┼────────────────────────────────────────────────────┤
  │ Control-plane QPS (auth callbacks) │ 20,000-50,000                                      │
  ├────────────────────────────────────┼────────────────────────────────────────────────────┤
  │ Control-plane P99 latency          │ < 10 ms                                            │
  ├────────────────────────────────────┼────────────────────────────────────────────────────┤
  │ Live streams per control node      │ Tens of thousands (Swoole Table supports millions) │
  ├────────────────────────────────────┼────────────────────────────────────────────────────┤
  │ 1080p renditions per ffmpeg worker │ 4-8 (depends on CPU; 20+ with NVENC)               │
  ├────────────────────────────────────┼────────────────────────────────────────────────────┤
  │ HLS end-to-end latency             │ 6-10 s (hls_fragment=4)                            │
  ├────────────────────────────────────┼────────────────────────────────────────────────────┤
  │ SRT end-to-end latency             │ 120-300 ms                                         │
  ├────────────────────────────────────┼────────────────────────────────────────────────────┤
  │ Remux of a 1-hour 1080p recording  │ < 2 minutes (stream copy)                          │
  └────────────────────────────────────┴────────────────────────────────────────────────────┘

  ---
  VI. Pitfalls (hard-won live-streaming lessons)

  ┌───────────────────────────────────────┬───────────────────────────────────────────────────────────┐
  │ Pitfall                               │ Fix                                                       │
  ├───────────────────────────────────────┼───────────────────────────────────────────────────────────┤
  │ HLS latency of 10 s is too high       │ Cut hls_fragment to 2 s, or move to LL-HLS / WebRTC       │
  ├───────────────────────────────────────┼───────────────────────────────────────────────────────────┤
  │ SRT streamid fails to parse           │ Follow #!::k=v,k=v strictly; stable only on SRS 5.0+      │
  ├───────────────────────────────────────┼───────────────────────────────────────────────────────────┤
  │ Transcode workers race for tasks      │ Control plane pushes tasks to a chosen worker (done here) │
  ├───────────────────────────────────────┼───────────────────────────────────────────────────────────┤
  │ Zombie ffmpeg processes               │ Reap with pcntl_signal(SIGCHLD), or rely on systemd       │
  ├───────────────────────────────────────┼───────────────────────────────────────────────────────────┤
  │ Recording drops the first frames      │ -bsf:a aac_adtstoasc + wait for the first keyframe        │
  ├───────────────────────────────────────┼───────────────────────────────────────────────────────────┤
  │ Streamer bitrate switch kicks viewers │ Offer several renditions with ABR, not a single one       │
  ├───────────────────────────────────────┼───────────────────────────────────────────────────────────┤
  │ Tiny segments swamp object storage    │ Merge segments into larger files / Tier-3 cold storage    │
  ├───────────────────────────────────────┼───────────────────────────────────────────────────────────┤
  │ Webhook failures go unnoticed         │ Retries plus a dead-letter queue are mandatory            │
  ├───────────────────────────────────────┼───────────────────────────────────────────────────────────┤
  │ Auth token stolen                     │ Lifetime ≤ 2 hours, IP binding, immediate revocation      │
  ├───────────────────────────────────────┼───────────────────────────────────────────────────────────┤
  │ Publish bursts stall MySQL            │ Put a Redis cache in front of stream_keys                 │
  ├───────────────────────────────────────┼───────────────────────────────────────────────────────────┤
  │ Streamer switches rooms and keys      │ Revoke the old key and kick its stream together           │
  ├───────────────────────────────────────┼───────────────────────────────────────────────────────────┤
  │ Recording overwrites earlier video    │ Put a UUID in the file name; never reuse names            │
  ├───────────────────────────────────────┼───────────────────────────────────────────────────────────┤
  │ Failed live-to-VOD remux, complaints  │ Failed finalize goes to a dead-letter queue + alert       │
  └───────────────────────────────────────┴───────────────────────────────────────────────────────────┘
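
  The "retries plus a dead-letter queue" rule can be reduced to one decision per failed delivery. A minimal sketch, assuming exponential backoff capped at 5 minutes and at most 5 attempts; nextWebhookAction(), the retry_at field and the webhook:dead queue name are hypothetical, while webhook:retry is the queue used by CallbackDispatcher:

```php
<?php
declare(strict_types=1);

/**
 * Decide what to do with a webhook job after a failed delivery.
 * $job['attempts'] counts the failures recorded before this one.
 */
function nextWebhookAction(array $job, int $now, int $maxAttempts = 5): array
{
    $attempts = (int) ($job['attempts'] ?? 0) + 1;
    if ($attempts > $maxAttempts) {
        // Give up: park the job where an operator or alert can find it.
        return ['queue' => 'webhook:dead', 'job' => $job];
    }
    // Exponential backoff: 2, 4, 8, 16, 32 seconds, capped at 300.
    $delay = min(300, 2 ** $attempts);
    return [
        'queue' => 'webhook:retry',
        // The left operand of + wins, so the counters are updated and the rest is kept.
        'job'   => ['attempts' => $attempts, 'retry_at' => $now + $delay] + $job,
    ];
}

$job = ['url' => 'https://example.com/hook', 'event' => ['event' => 'stream.end'], 'attempts' => 0];
print json_encode(nextWebhookAction($job, time())) . "\n";
```

  A consumer would pop from webhook:retry, wait until retry_at, redeliver, and call the function again on failure.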

  ---
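  VII. Security checklist

  ┌─────────────────────────────────────────┬─────────────────────────────────────────────────────────────┐
  │ Risk                                    │ Defense                                                     │
  ├─────────────────────────────────────────┼─────────────────────────────────────────────────────────────┤
  │ Publish key leaked                      │ Short-lived JWT + IP binding + single use (optional)        │
  ├─────────────────────────────────────────┼─────────────────────────────────────────────────────────────┤
  │ Stream piracy                           │ Signed play URL + Referer + device fingerprint + time limit │
  ├─────────────────────────────────────────┼─────────────────────────────────────────────────────────────┤
  │ Prohibited content                      │ AI moderation (sample frames every N s via external API)    │
  ├─────────────────────────────────────────┼─────────────────────────────────────────────────────────────┤
  │ DDoS via publish                        │ Gateway connection limits + SRS hook rate limits            │
  ├─────────────────────────────────────────┼─────────────────────────────────────────────────────────────┤
  │ Path traversal (push into another room) │ Generate stream_id server-side; never accept a client value │
  ├─────────────────────────────────────────┼─────────────────────────────────────────────────────────────┤
  │ Forged webhook calls                    │ HMAC signature + timestamp against replay                   │
  ├─────────────────────────────────────────┼─────────────────────────────────────────────────────────────┤
  │ Unauthorized recording downloads        │ Private object storage + temporary signed URLs              │
  ├─────────────────────────────────────────┼─────────────────────────────────────────────────────────────┤
  │ Command injection on workers            │ escapeshellarg around every argument, profile values too    │
  └─────────────────────────────────────────┴─────────────────────────────────────────────────────────────┘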
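
  On the receiving side, the "HMAC signature + timestamp" defense could look like the following. A minimal sketch, assuming the signature covers the raw body (as CallbackDispatcher computes it) and that the event JSON carries a ts field for replay protection; verifyWebhook() and ts are hypothetical, since the dispatcher shown earlier does not add a timestamp:

```php
<?php
declare(strict_types=1);

/**
 * Verify an incoming webhook.
 * $signature is the X-Live-Signature header, $rawBody the unparsed request body.
 */
function verifyWebhook(string $rawBody, string $signature, string $secret, int $now, int $maxSkew = 300): bool
{
    // Constant-time comparison avoids leaking how many characters matched.
    $expected = hash_hmac('sha256', $rawBody, $secret);
    if (!hash_equals($expected, $signature)) {
        return false;
    }
    // Replay protection: reject events whose timestamp is too old or too far ahead.
    $event = json_decode($rawBody, true);
    $ts = is_array($event) ? (int) ($event['ts'] ?? 0) : 0;
    return abs($now - $ts) <= $maxSkew;
}

$secret = 'demo-secret';
$body   = json_encode(['event' => 'stream.end', 'stream_id' => 'sid_xxx', 'ts' => time()]);
$sig    = hash_hmac('sha256', $body, $secret);
var_dump(verifyWebhook($body, $sig, $secret, time()));
```

  Because the timestamp sits inside the signed body, an attacker cannot refresh it without invalidating the signature.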

  ---
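  VIII. Possible extensions

  1. LL-HLS / WebRTC: bring latency down to 2-3 seconds, or to about 500 ms
  2. AI content moderation: sample frames and call Tencent / Alibaba / a self-hosted moderation model
  3. DRM: integrate Widevine / FairPlay, a must for paid content
  4. SCTE-35 ads: precise ad markers, a must when advertisers buy placements
  5. Multi-CDN scheduling: switch CDNs by viewer region / real-time quality
  6. Danmaku (bullet comments): WebSocket long connections + Redis Pub/Sub
  7. Co-streaming: WebRTC + SFU (mediasoup)
  8. NVENC transcoding: NVIDIA hardware encoding; one card can stand in for roughly 5 CPU machines
  9. Multi-angle streams: several camera feeds at the same time, switched on the client
  10. Time-shift: add a DVR window to HLS so viewers can seek freely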

  ---
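  IX. Comparison with Twitch / Douyin live architectures

  ┌───────────────┬─────────────────────────────┬──────────────────────────────────────────┐
  │ Dimension     │ Twitch / Douyin             │ This design                              │
  ├───────────────┼─────────────────────────────┼──────────────────────────────────────────┤
  │ Data plane    │ In-house C++/Rust           │ SRS / ffmpeg (open source)               │
  ├───────────────┼─────────────────────────────┼──────────────────────────────────────────┤
  │ Control plane │ Go / Java                   │ PHP+Swoole (this design)                 │
  ├───────────────┼─────────────────────────────┼──────────────────────────────────────────┤
  │ Scale         │ Tens of millions concurrent │ Mid-size (~10k streams per cluster)      │
  ├───────────────┼─────────────────────────────┼──────────────────────────────────────────┤
  │ Customization │ Heavy                       │ Very light (PHP changes ship fast)       │
  ├───────────────┼─────────────────────────────┼──────────────────────────────────────────┤
  │ Best fit      │ Large companies / platforms │ Enterprise, education, smaller platforms │
  └───────────────┴─────────────────────────────┴──────────────────────────────────────────┘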

  ---
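  X. The key takeaway

  Where PHP+Swoole fits best in live streaming:
  - ✅ Control plane, auth, scheduling, callbacks: this is PHP's home turf
  - ✅ Business integration layer (live + e-commerce + gifts + danmaku)
  - ✅ Admin back office / dashboards

  What not to do in PHP:
  - ❌ Implement RTMP / SRT yourself: SRS already does it
  - ❌ Write your own HLS segmenter: ffmpeg already does it
  - ❌ Replace ffmpeg transcoding: that is work for C/CUDA
  - ❌ Replace CDN edge delivery: that is work for network hardware

  The right setup = SRS (data plane) + FFmpeg (compute) + PHP/Swoole (brain) + CDN (delivery)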
