php+最新的swoole RTMP 信令服务+HLS / DASH 切片调度+SRT 信令服务+直播鉴权与回调服务+视频转码任务调度
·
PHP 8.3 + Swoole 6.x 直播媒体控制面 · 完整实现
前置说明(很重要):
PHP+Swoole 在直播领域的最佳定位是控制面(Control Plane),不是数据面。
真正的 RTMP/SRT 协议解析、HLS 切片、转码计算交给 SRS / nginx-rtmp / ffmpeg;
PHP+Swoole 负责调度、鉴权、回调、任务编排、监控、CDN 联动 —— 这才是性价比最高的玩法。
┌─────────────────────────────────────┐
│ PHP+Swoole 控制面 (Control Plane) │
│ • 鉴权 / 推流密钥 │
│ • 转码任务调度 │
│ • HLS/DASH 调度 │
│ • SRT 信令 │
│ • 录制回调 │
│ • 监控告警 │
└──┬──────────────────────────┬───────┘
│ 调度 / 回调 │ 派工
▼ ▼
┌──────────────┐ ┌──────────────────┐
│ SRS / nginx- │ pull │ FFmpeg Worker 集群│
│ rtmp / SRT │ ──────►│ (转码/切片) │
│ (数据面) │ └──────────────────┘
└──────┬───────┘ │
│ HLS/DASH │
▼ ▼
Origin S3 / OSS ← 存储
▼
CDN
▼
观众端
---
一、整体流程(大白话)
主播侧:
1. 申请推流密钥(stream_key)→PHP 控制面发 JWT
2. OBS 推流 rtmp://srs/live/{app}?key={token}
3. SRS 收到推流 →回调 PHP /auth/on_publish 验密钥
4. PHP 鉴权通过 + 起转码任务(派工 ffmpeg worker)
5. ffmpeg 转出多码率 HLS/DASH 切片 →推到对象存储
6. PHP 维护切片清单(m3u8/mpd 索引、广告插入)
7. 主播下播 →SRS 回调 /auth/on_unpublish →PHP 触发录制合成 + 通知业务
观众侧:
1. 拉播放地址 https://cdn/live/abc/master.m3u8?token=xxx
2. CDN 回源 Origin →命中切片直接返
3. PHP 鉴权:token + 时长 + 地域 + 设备(回源鉴权)
4. 心跳上报观看时长 / 卡顿率
SRT 模式:
1. SRT publisher 用 streamid=#!::r=live/abc,m=publish,p=xxx 推流
2. SRT 服务 (srt-live-transmit / SRS) 回调 PHP 验签
3. PHP 信令服务返回路由(用哪个边缘节点接)
4. 后续走 RTMP 一样的转码 / 录制流水线
---
二、最佳技术选型
┌─────────────┬─────────────────────────────┬──────────────────────────┐
│ 组件 │ 推荐 │ 在本方案中的角色 │
├─────────────┼─────────────────────────────┼──────────────────────────┤
│ RTMP 数据面 │ SRS 5.0 │ 推流接收、转推 RTMP/HLS │
├─────────────┼─────────────────────────────┼──────────────────────────┤
│ SRT 数据面 │ SRS 5.0 / haivision srt │ 低延迟传输 │
├─────────────┼─────────────────────────────┼──────────────────────────┤
│ 转码 │ FFmpeg 6.x │ 多码率 / DASH / 切片 │
├─────────────┼─────────────────────────────┼──────────────────────────┤
│ 控制面 │ PHP 8.3 + Swoole 6.x │ 鉴权、调度、信令、回调 │
├─────────────┼─────────────────────────────┼──────────────────────────┤
│ 任务队列 │ Redis Stream │ 转码任务派发 │
├─────────────┼─────────────────────────────┼──────────────────────────┤
│ 元数据 │ MySQL + Redis │ 流元信息、密钥、转码进度 │
├─────────────┼─────────────────────────────┼──────────────────────────┤
│ 配置 │ Swoole\Table │ 在线流、worker 状态 │
├─────────────┼─────────────────────────────┼──────────────────────────┤
│ 对象存储 │ MinIO / OSS / S3(Flysystem) │ HLS/DASH 切片 │
├─────────────┼─────────────────────────────┼──────────────────────────┤
│ Token │ firebase/php-jwt │ 推流/拉流鉴权 │
├─────────────┼─────────────────────────────┼──────────────────────────┤
│ 路由 │ nikic/fast-route │ HTTP 控制面 API │
├─────────────┼─────────────────────────────┼──────────────────────────┤
│ 告警 │ Prometheus + Alertmanager │ 流断/卡顿 │
├─────────────┼─────────────────────────────┼──────────────────────────┤
│ 日志 │ monolog/monolog │ 结构化 │
└─────────────┴─────────────────────────────┴──────────────────────────┘
composer require swoole/ide-helper firebase/php-jwt nikic/fast-route \
league/flysystem league/flysystem-aws-s3-v3 \
promphp/prometheus_client_php monolog/monolog \
symfony/expression-language
---
三、完整代码
1. 入口:控制面 HTTP 服务
<?php
// server.php
declare(strict_types=1);
require __DIR__ . '/vendor/autoload.php';
use Swoole\Http\Server;
use Swoole\Http\Request;
use Swoole\Http\Response;
use App\Live\Container;
use App\Live\Router;
\Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
$c = Container::boot();
$server = new Server('0.0.0.0', 9508, SWOOLE_PROCESS);
$server->set([
'worker_num' => swoole_cpu_num() * 2,
'task_worker_num' => 8,
'task_enable_coroutine' => true,
'enable_coroutine' => true,
'max_request' => 100000,
'hook_flags' => SWOOLE_HOOK_ALL,
]);
$server->on('WorkerStart', function($s, $wid) use ($c) {
$c->initPools();
$c->loadConfig();
if ($wid === 0) {
\Swoole\Coroutine::create(fn() => $c->jobDispatcher()->run()); // 转码任务派发
\Swoole\Coroutine::create(fn() => $c->streamMonitor()->run()); // 流状态巡检
\Swoole\Coroutine::create(fn() => $c->workerHealth()->run()); // ffmpeg worker 心跳检查
}
});
$server->on('Request', function(Request $req, Response $res) use ($c, $server) {
(new Router($c, $server))->dispatch($req, $res);
});
// Task:异步业务回调推送 + 录制合成 trigger
$server->on('Task', function($s, $task) use ($c) {
$type = $task->data['type'] ?? '';
match($type) {
'business_webhook' => $c->callbackDispatcher()->push($task->data),
'finalize_recording' => $c->recorder()->finalize($task->data),
default => null,
};
});
$server->on('Finish', fn() => null);
$server->start();
---
2. Container:在线流 / Worker / 任务队列
<?php
// src/Live/Container.php
namespace App\Live;
use Swoole\Coroutine\Channel;
use Swoole\Coroutine\Redis;
use Swoole\Coroutine\MySQL;
use Swoole\Table;
use League\Flysystem\Filesystem;
use League\Flysystem\AwsS3V3\AwsS3V3Adapter;
use Aws\S3\S3Client;
class Container
{
public Table $streams; // 在线流(stream_key →meta)
public Table $workers; // ffmpeg worker 池
public Channel $redisPool;
public Channel $mysqlPool;
public Filesystem $storage;
public array $cfg;
public static function boot(): self
{
$c = new self();
$c->streams = new Table(8192);
$c->streams->column('stream_id', Table::TYPE_STRING, 64);
$c->streams->column('app', Table::TYPE_STRING, 32);
$c->streams->column('publisher_ip', Table::TYPE_STRING, 64);
$c->streams->column('start_at', Table::TYPE_INT, 8);
$c->streams->column('codec_v', Table::TYPE_STRING, 16); // h264/h265
$c->streams->column('codec_a', Table::TYPE_STRING, 16);
$c->streams->column('bitrate', Table::TYPE_INT, 4);
$c->streams->column('width', Table::TYPE_INT, 4);
$c->streams->column('height', Table::TYPE_INT, 4);
$c->streams->column('fps', Table::TYPE_INT, 4);
$c->streams->column('viewers', Table::TYPE_INT, 4);
$c->streams->column('status', Table::TYPE_STRING, 16); // live/done/error
$c->streams->column('transcode_id', Table::TYPE_STRING, 64);
$c->streams->column('record_path', Table::TYPE_STRING, 256);
$c->streams->create();
$c->workers = new Table(256);
$c->workers->column('host', Table::TYPE_STRING, 64);
$c->workers->column('cpu_use', Table::TYPE_INT, 4);
$c->workers->column('jobs', Table::TYPE_INT, 4);
$c->workers->column('max_jobs', Table::TYPE_INT, 4);
$c->workers->column('healthy', Table::TYPE_INT, 1);
$c->workers->column('last_hb', Table::TYPE_INT, 8);
$c->workers->create();
// 对象存储(切片落这里)
$s3 = new S3Client(['version'=>'latest','region'=>'us-east-1',
'endpoint'=>'http://127.0.0.1:9000','use_path_style_endpoint'=>true,
'credentials'=>['key'=>'minio','secret'=>'minio123']]);
$c->storage = new Filesystem(new AwsS3V3Adapter($s3, 'live'));
$c->cfg = [
'jwt_secret' => getenv('JWT_SECRET') ?: 'change-me',
'srs_callback_secret' => 'srs-secret',
'rtmp_endpoint' => 'rtmp://srs.example.com/live',
'srt_endpoint' => 'srt://srt.example.com:10080',
'hls_origin' => 'https://origin.example.com',
'cdn_play_host' => 'https://cdn.example.com',
'job_stream' => 'live:transcode:jobs',
];
return $c;
}
public function initPools(): void
{
$this->redisPool = new Channel(64);
for ($i=0;$i<64;$i++) {
$r = new Redis(); $r->connect('127.0.0.1', 6379);
$this->redisPool->push($r);
}
$this->mysqlPool = new Channel(16);
for ($i=0;$i<16;$i++) {
$db = new MySQL();
$db->connect(['host'=>'127.0.0.1','user'=>'live','password'=>'live',
'database'=>'live','charset'=>'utf8mb4']);
$this->mysqlPool->push($db);
}
}
public function loadConfig(): void
{
// 从 DB 把 worker 节点信息载入内存
$this->withMySQL(function($db) {
$rows = $db->query("SELECT * FROM transcode_workers WHERE enabled=1");
foreach ($rows as $r) {
$this->workers->set($r['node_id'], [
'host'=>$r['host'],'cpu_use'=>0,'jobs'=>0,
'max_jobs'=>(int)$r['max_jobs'],'healthy'=>1,'last_hb'=>time(),
]);
}
});
}
public function withRedis(callable $fn) {
$r = $this->redisPool->pop();
try { return $fn($r); } finally { $this->redisPool->push($r); }
}
public function withMySQL(callable $fn) {
$db = $this->mysqlPool->pop();
try { return $fn($db); } finally { $this->mysqlPool->push($db); }
}
public function jobDispatcher(): JobDispatcher { return new JobDispatcher($this); }
public function streamMonitor(): StreamMonitor { return new StreamMonitor($this); }
public function workerHealth(): WorkerHealth { return new WorkerHealth($this); }
public function callbackDispatcher(): CallbackDispatcher { return new CallbackDispatcher($this); }
public function recorder(): Recorder { return new Recorder($this); }
}
---
3. Router:控制面所有 API
<?php
// src/Live/Router.php
namespace App\Live;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\Http\Server;
class Router
{
public function __construct(private Container $c, private Server $server) {}
public function dispatch(Request $req, Response $res): void
{
$res->header('Content-Type','application/json');
$uri = $req->server['request_uri'];
try {
$out = match($uri) {
// ===== 主播侧 =====
'/api/streams/issue' => (new StreamKeyIssuer($this->c))->issue($req),
'/api/streams/revoke' => (new StreamKeyIssuer($this->c))->revoke($req),
// ===== 观众侧鉴权 URL 签发 =====
'/api/play/url' => (new PlaybackSigner($this->c))->sign($req),
// ===== SRS / nginx-rtmp 回调 =====
'/srs/on_publish' => (new SRSCallback($this->c, $this->server))->onPublish($req),
'/srs/on_unpublish' => (new SRSCallback($this->c, $this->server))->onUnpublish($req),
'/srs/on_play' => (new SRSCallback($this->c, $this->server))->onPlay($req),
'/srs/on_stop' => (new SRSCallback($this->c, $this->server))->onStop($req),
'/srs/on_hls' => (new SRSCallback($this->c, $this->server))->onHls($req),
// ===== SRT 信令 =====
'/srt/connect' => (new SRTSignaling($this->c))->connect($req),
'/srt/disconnect' => (new SRTSignaling($this->c))->disconnect($req),
// ===== HLS / DASH 调度 =====
'/hls/manifest' => (new HLSScheduler($this->c))->manifest($req, $res),
'/hls/keyframe-trigger' => (new HLSScheduler($this->c))->keyframeTrigger($req),
'/dash/manifest' => (new DASHScheduler($this->c))->manifest($req, $res),
// ===== 转码 worker 心跳 / 上报 =====
'/worker/heartbeat' => (new WorkerAPI($this->c))->heartbeat($req),
'/worker/job/done' => (new WorkerAPI($this->c, $this->server))->done($req),
'/worker/job/progress' => (new WorkerAPI($this->c))->progress($req),
// ===== 管理面 =====
'/admin/streams' => $this->snapshot($this->c->streams),
'/admin/workers' => $this->snapshot($this->c->workers),
default => $this->return404($res),
};
if ($out !== null) $res->end(json_encode($out, JSON_UNESCAPED_UNICODE));
} catch (\Throwable $e) {
$res->status(500);
$res->end(json_encode(['code'=>500,'msg'=>$e->getMessage()]));
}
}
private function snapshot(\Swoole\Table $t): array
{
$out = []; foreach ($t as $k=>$v) $out[$k] = $v; return $out;
}
private function return404(Response $res): null
{
$res->status(404); $res->end('not found'); return null;
}
}
---
4. 推流密钥 + 播放鉴权
<?php
// src/Live/StreamKeyIssuer.php
namespace App\Live;
use Firebase\JWT\JWT;
use Swoole\Http\Request;
class StreamKeyIssuer
{
public function __construct(private Container $c) {}
public function issue(Request $req): array
{
$d = json_decode($req->rawContent(), true) ?? [];
$userId = (int)($d['user_id'] ?? 0);
$app = $d['app'] ?? 'live';
$duration = (int)($d['duration'] ?? 7200);
$streamId = 'sid_' . bin2hex(random_bytes(8));
$token = JWT::encode([
'iss' => 'live-control',
'sub' => $userId,
'app' => $app,
'sid' => $streamId,
'iat' => time(),
'exp' => time() + $duration,
'scope'=> 'publish',
], $this->c->cfg['jwt_secret'], 'HS256');
// 写 DB 留档
$this->c->withMySQL(function($db) use ($streamId, $userId, $app, $duration) {
$stmt = $db->prepare(
"INSERT INTO stream_keys(stream_id,user_id,app,expires_at,enabled,created_at)
VALUES(?,?,?,FROM_UNIXTIME(?),1,NOW())"
);
$stmt->execute([$streamId, $userId, $app, time()+$duration]);
});
return [
'code' => 0,
'stream_id' => $streamId,
'publish_url'=> "{$this->c->cfg['rtmp_endpoint']}/$app/$streamId?token=$token",
'srt_url' => "{$this->c->cfg['srt_endpoint']}?streamid=#!::r=$app/$streamId,m=publish,token=$token",
'expires_at' => time() + $duration,
];
}
public function revoke(Request $req): array
{
$sid = $req->get['sid'] ?? '';
if (!$sid) return ['code'=>400];
$this->c->withMySQL(fn($db) =>
$db->query("UPDATE stream_keys SET enabled=0 WHERE stream_id='$sid'")
);
// 立即剔除在线流(踢人)
if ($this->c->streams->exist($sid)) {
$this->c->withRedis(fn($r) =>
$r->xAdd('srs:kick', '*', ['stream_id'=>$sid], 1000)
);
}
return ['code'=>0];
}
}
<?php
// src/Live/PlaybackSigner.php
namespace App\Live;
use Swoole\Http\Request;
class PlaybackSigner
{
public function __construct(private Container $c) {}
public function sign(Request $req): array
{
$d = $req->get ?: $req->post ?? [];
$sid = $d['stream_id'] ?? '';
$userId = (int)($d['user_id'] ?? 0);
$expire = time() + (int)($d['duration'] ?? 3600);
// 简单签名:防盗链(类 Nginx secure_link)
$sign = md5("$sid|$userId|$expire|".$this->c->cfg['jwt_secret']);
$host = $this->c->cfg['cdn_play_host'];
return [
'code' => 0,
'hls' => "$host/live/$sid/master.m3u8?uid=$userId&exp=$expire&sign=$sign",
'dash' => "$host/live/$sid/master.mpd?uid=$userId&exp=$expire&sign=$sign",
'flv' => "$host/live/$sid.flv?uid=$userId&exp=$expire&sign=$sign",
'srt_play'=> "{$this->c->cfg['srt_endpoint']}?streamid=#!::r=live/$sid,m=request,sign=$sign",
'expires_at' => $expire,
];
}
}
解释:
- 推流地址带 JWT:SRS 收到后回调 /srs/on_publish 验签
- 播放地址签名 + 过期时间:CDN 节点用相同算法验签(防盗链 + 防分享)
- revoke 立即踢人:写 Redis Stream 让 SRS Watcher 调 SRS API kick stream
---
5. SRS 回调:鉴权 + 转码触发 + 录制触发
<?php
// src/Live/SRSCallback.php
namespace App\Live;
use Swoole\Http\Request;
use Swoole\Http\Server;
use Firebase\JWT\JWT;
use Firebase\JWT\Key;
class SRSCallback
{
public function __construct(private Container $c, private ?Server $server = null) {}
// SRS 收到推流时回调,返回 0 = 允许,非 0 = 拒绝
public function onPublish(Request $req): array
{
$d = json_decode($req->rawContent(), true) ?? [];
$app = $d['app'] ?? '';
$stream = $d['stream'] ?? ''; // 这就是 stream_id
$ip = $d['ip'] ?? $d['client_ip'] ?? '';
$param = $d['param'] ?? '';
parse_str(ltrim($param, '?'), $q);
$token = $q['token'] ?? '';
// 1. JWT 验签
try {
$payload = (array) JWT::decode($token, new Key($this->c->cfg['jwt_secret'], 'HS256'));
} catch (\Throwable $e) {
error_log("[on_publish] bad jwt: ".$e->getMessage());
return ['code'=>403]; // SRS 用非 0 表示拒绝
}
if ($payload['sid'] !== $stream || $payload['app'] !== $app
|| ($payload['scope'] ?? '') !== 'publish') {
return ['code'=>403];
}
// 2. DB 二次校验
$row = $this->c->withMySQL(function($db) use ($stream) {
$stmt = $db->prepare("SELECT * FROM stream_keys WHERE stream_id=? AND enabled=1");
return $stmt->execute([$stream])[0] ?? null;
});
if (!$row) return ['code'=>403];
// 3. 登记在线流
$transcodeId = 'tj_'.bin2hex(random_bytes(8));
$this->c->streams->set($stream, [
'stream_id' => $stream,
'app' => $app,
'publisher_ip'=> $ip,
'start_at' => time(),
'codec_v' => '', 'codec_a'=>'',
'bitrate' => 0, 'width'=>0, 'height'=>0, 'fps'=>0,
'viewers' => 0,
'status' => 'live',
'transcode_id'=> $transcodeId,
'record_path' => '',
]);
// 4. 派转码任务到队列
$this->dispatchTranscodeJob($stream, $app, $transcodeId);
// 5. 通知业务(异步)
$this->server?->task([
'type' => 'business_webhook',
'event' => 'stream.started',
'user_id'=> $row['user_id'],
'stream_id'=> $stream,
'ts' => time(),
]);
return ['code'=>0];
}
public function onUnpublish(Request $req): array
{
$d = json_decode($req->rawContent(), true) ?? [];
$sid = $d['stream'] ?? '';
$row = $this->c->streams->get($sid);
if (!$row) return ['code'=>0];
$row['status'] = 'done';
$this->c->streams->set($sid, $row);
// 1. 通知 ffmpeg worker 收尾(停止转码进程)
$this->c->withRedis(fn($r) =>
$r->xAdd('worker:cancel', '*', ['transcode_id'=>$row['transcode_id']], 10000)
);
// 2. 触发录制合成(把所有 ts 拼成 mp4)
$this->server?->task([
'type' => 'finalize_recording',
'stream_id' => $sid,
'transcode_id'=> $row['transcode_id'],
'duration' => time() - $row['start_at'],
]);
// 3. 业务通知
$this->server?->task([
'type' => 'business_webhook',
'event' => 'stream.ended',
'stream_id'=> $sid,
'duration' => time() - $row['start_at'],
'ts' => time(),
]);
// 60 秒后从 Table 清除
\Swoole\Timer::after(60_000, fn() => $this->c->streams->del($sid));
return ['code'=>0];
}
public function onPlay(Request $req): array
{
$d = json_decode($req->rawContent(), true) ?? [];
$sid = $d['stream'] ?? '';
$param = $d['param'] ?? '';
parse_str(ltrim($param, '?'), $q);
// 签名校验
$expected =
md5(($q['stream_id']??$sid).'|'.($q['uid']??'').'|'.($q['exp']??'').'|'.$this->c->cfg['jwt_secret']);
if (!hash_equals($expected, $q['sign'] ?? '')) return ['code'=>403];
if ((int)($q['exp']??0) < time()) return ['code'=>403];
// 在线观看数 +1
$row = $this->c->streams->get($sid);
if ($row) {
$row['viewers']++;
$this->c->streams->set($sid, $row);
}
return ['code'=>0];
}
public function onStop(Request $req): array
{
$d = json_decode($req->rawContent(), true) ?? [];
$sid = $d['stream'] ?? '';
$row = $this->c->streams->get($sid);
if ($row && $row['viewers'] > 0) {
$row['viewers']--;
$this->c->streams->set($sid, $row);
}
return ['code'=>0];
}
// SRS HLS 切片生成回调:每生成一个 ts 通知一次
public function onHls(Request $req): array
{
$d = json_decode($req->rawContent(), true) ?? [];
// {file, seq_no, duration, m3u8, m3u8_url, stream, app}
$this->c->withRedis(function($r) use ($d) {
$r->xAdd('hls:segments', '*', [
'stream' => $d['stream'] ?? '',
'file' => $d['file'] ?? '',
'seq_no' => (string)($d['seq_no'] ?? 0),
'duration'=> (string)($d['duration'] ?? 0),
], 100000);
});
return ['code'=>0];
}
private function dispatchTranscodeJob(string $sid, string $app, string $transcodeId): void
{
$sourceUrl = "rtmp://127.0.0.1:1935/$app/$sid";
$job = [
'transcode_id'=> $transcodeId,
'stream_id' => $sid,
'source' => $sourceUrl,
'profiles' => json_encode($this->defaultProfiles()),
'output_base' => "live/$sid", // 在对象存储里的路径前缀
'created_at' => (string)time(),
];
$this->c->withRedis(fn($r) =>
$r->xAdd($this->c->cfg['job_stream'], '*', $job, 100000)
);
}
private function defaultProfiles(): array
{
// 多码率转码梯度
return [
['name'=>'1080p','width'=>1920,'height'=>1080,'video_bitrate'=>'4500k','audio_bitrate'=>'128k','codec'=>'h
264','fps'=>30],
['name'=>'720p', 'width'=>1280,'height'=>720,
'video_bitrate'=>'2500k','audio_bitrate'=>'128k','codec'=>'h264','fps'=>30],
['name'=>'480p', 'width'=>854, 'height'=>480, 'video_bitrate'=>'1000k','audio_bitrate'=>'96k',
'codec'=>'h264','fps'=>30],
['name'=>'360p', 'width'=>640, 'height'=>360, 'video_bitrate'=>'600k', 'audio_bitrate'=>'64k',
'codec'=>'h264','fps'=>30],
];
}
}
解释:
- SRS 回调是 JSON over HTTP,响应 {code:0} = 允许,非 0 = 拒绝
- 双重验签:JWT + DB 二次确认(防 JWT 过期但 DB 已撤销的情况)
- on_unpublish 触发录制合成:这是直播 →点播的关键钩子
- on_hls 拿到切片信息:可用于广告插入 / DRM 加密 / 异常监控(如某切片 size = 0)
---
6. 转码任务派发器 + Worker API
<?php
// src/Live/JobDispatcher.php
namespace App\Live;
class JobDispatcher
{
public function __construct(private Container $c) {}
public function run(): void
{
$stream = $this->c->cfg['job_stream'];
$group = 'dispatchers';
// 创建消费组(已存在忽略)
$this->c->withRedis(function($r) use ($stream, $group) {
try { $r->xGroup('CREATE', $stream, $group, '$', true); } catch (\Throwable $e) {}
});
while (true) {
try {
$msgs = $this->c->withRedis(fn($r) =>
$r->xReadGroup($group, 'dispatcher-'.getmypid(), [$stream => '>'], 10, 2000)
);
foreach (($msgs[$stream] ?? []) as $id => $job) {
\Swoole\Coroutine::create(fn() => $this->assignJob($id, $job));
}
} catch (\Throwable $e) {
error_log("[dispatcher] ".$e->getMessage());
\Swoole\Coroutine::sleep(1);
}
}
}
private function assignJob(string $id, array $job): void
{
// 1. 找最闲的 healthy worker(min jobs / max_jobs)
$best = null; $bestRatio = 999;
foreach ($this->c->workers as $nodeId => $w) {
if (!$w['healthy'] || $w['jobs'] >= $w['max_jobs']) continue;
$ratio = $w['jobs'] / max(1, $w['max_jobs']);
if ($ratio < $bestRatio) { $best = $nodeId; $bestRatio = $ratio; }
}
if (!$best) {
error_log("[dispatcher] no available worker");
// 把消息重新放回:不 ack
return;
}
// 2. 推送任务到该 worker 的专属队列
$this->c->withRedis(function($r) use ($best, $job) {
$r->xAdd("worker:tasks:$best", '*', $job, 1000);
});
// 3. 占用一个槽
$w = $this->c->workers->get($best);
$w['jobs']++;
$this->c->workers->set($best, $w);
// 4. ACK 消费组消息
$this->c->withRedis(fn($r) =>
$r->xAck($this->c->cfg['job_stream'], 'dispatchers', [$id])
);
error_log("[dispatcher] job assigned: $best ←{$job['transcode_id']}");
}
}
<?php
// src/Live/WorkerAPI.php
namespace App\Live;
use Swoole\Http\Request;
use Swoole\Http\Server;
class WorkerAPI
{
public function __construct(private Container $c, private ?Server $server = null) {}
public function heartbeat(Request $req): array
{
$d = json_decode($req->rawContent(), true) ?? [];
$nodeId = $d['node_id'] ?? '';
$w = $this->c->workers->get($nodeId);
if (!$w) {
// 自动注册新 worker
$w = ['host'=>$d['host']??'','cpu_use'=>0,'jobs'=>0,
'max_jobs'=>(int)($d['max_jobs']??4),'healthy'=>1,'last_hb'=>time()];
}
$w['cpu_use'] = (int)($d['cpu_use'] ?? $w['cpu_use']);
$w['jobs'] = (int)($d['running_jobs'] ?? $w['jobs']);
$w['last_hb'] = time();
$w['healthy'] = 1;
$this->c->workers->set($nodeId, $w);
return ['code'=>0];
}
public function progress(Request $req): array
{
$d = json_decode($req->rawContent(), true) ?? [];
// 写 Redis,供前端查询
$this->c->withRedis(fn($r) => $r->hMSet("live:progress:{$d['transcode_id']}", [
'percent' => (string)($d['percent'] ?? 0),
'profile' => $d['profile'] ?? '',
'fps' => (string)($d['fps'] ?? 0),
'bitrate' => (string)($d['bitrate'] ?? 0),
'updated_at'=> (string)time(),
]));
$this->c->withRedis(fn($r) => $r->expire("live:progress:{$d['transcode_id']}", 600));
return ['code'=>0];
}
public function done(Request $req): array
{
$d = json_decode($req->rawContent(), true) ?? [];
$nodeId = $d['node_id'] ?? '';
// 释放 worker 槽
$w = $this->c->workers->get($nodeId);
if ($w && $w['jobs'] > 0) { $w['jobs']--; $this->c->workers->set($nodeId, $w); }
// 通知业务
$this->server?->task([
'type' => 'business_webhook',
'event' => 'transcode.done',
'transcode_id'=> $d['transcode_id'] ?? '',
'outputs'=> $d['outputs'] ?? [],
'ts' => time(),
]);
return ['code'=>0];
}
}
---
7. FFmpeg Worker(独立部署,跑在转码机)
<?php
// worker.php —部署在每台转码机
declare(strict_types=1);
require __DIR__ . '/vendor/autoload.php';
use Swoole\Coroutine\Redis;
use Swoole\Coroutine\Http\Client;
\Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
$cfg = [
'node_id' => getenv('NODE_ID') ?: gethostname(),
'host' => getenv('HOST') ?: gethostbyname(gethostname()),
'control' => getenv('CONTROL') ?: 'http://127.0.0.1:9508',
'redis' => ['host'=>'127.0.0.1','port'=>6379],
'max_jobs' => (int)(getenv('MAX_JOBS') ?: 4),
'storage_mount' => '/mnt/live', // 本地挂载的对象存储 / 共享 FS
];
\Swoole\Coroutine\run(function() use ($cfg) {
// 心跳协程
\Swoole\Coroutine::create(function() use ($cfg) {
while (true) {
try {
$u = parse_url($cfg['control']);
$cl = new Client($u['host'], $u['port'] ?? 80);
$cl->setHeaders(['Content-Type'=>'application/json']);
$cl->set(['timeout'=>2]);
$cl->post('/worker/heartbeat', json_encode([
'node_id'=>$cfg['node_id'],'host'=>$cfg['host'],
'max_jobs'=>$cfg['max_jobs'],
'cpu_use' => (int)(sys_getloadavg()[0]*100/swoole_cpu_num()),
'running_jobs' => $GLOBALS['running'] ?? 0,
]));
$cl->close();
} catch (\Throwable $e) {}
\Swoole\Coroutine::sleep(5);
}
});
// 取任务执行
$r = new Redis(); $r->connect($cfg['redis']['host'], $cfg['redis']['port']);
$stream = "worker:tasks:{$cfg['node_id']}";
$GLOBALS['running'] = 0;
while (true) {
if ($GLOBALS['running'] >= $cfg['max_jobs']) {
\Swoole\Coroutine::sleep(1); continue;
}
$msgs = $r->xRead([$stream => '$'], 1, 2000);
if (!isset($msgs[$stream])) continue;
foreach ($msgs[$stream] as $id => $job) {
$GLOBALS['running']++;
\Swoole\Coroutine::create(function() use ($job, $cfg) {
try { (new \LiveWorker\Transcoder($cfg))->run($job); }
catch (\Throwable $e) { error_log("[transcode] ".$e->getMessage()); }
finally { $GLOBALS['running']--; }
});
}
}
});
// === 转码执行器(同文件,简化) ===
namespace LiveWorker;
use Swoole\Coroutine\Http\Client;
class Transcoder
{
public function __construct(private array $cfg) {}
public function run(array $job): void
{
$profiles = json_decode($job['profiles'], true);
$outputBase = $job['output_base'];
$transcodeId = $job['transcode_id'];
$source = $job['source'];
// 1. 用 ffmpeg 一次性出多码率 HLS
$cmd = $this->buildFfmpegCmd($source, $profiles, $outputBase, $transcodeId);
error_log("[ffmpeg] start: $cmd");
// 2. popen + 实时读 stderr 抽取进度
$proc = popen("$cmd 2>&1", 'r');
while (!feof($proc)) {
$line = fgets($proc);
if (!$line) continue;
// 解析 frame= speed= bitrate=
if (preg_match('/frame=\s*(\d+).*fps=\s*(\d+).*bitrate=\s*([\d.]+)\w+\/s.*speed=\s*([\d.]+)x/', $line,
$m)) {
$this->reportProgress($transcodeId, [
'fps'=>(int)$m[2],'bitrate'=>(int)$m[3],'speed'=>(float)$m[4],
]);
}
}
pclose($proc);
// 3. 上报完成
$this->reportDone($transcodeId, $profiles, $outputBase);
}
private function buildFfmpegCmd(string $src, array $profiles, string $outBase, string $jobId): string
{
// 经典 HLS 多码率命令(单进程产多 variant)
$maps = ''; $outputs = '';
foreach ($profiles as $i => $p) {
$maps .= " -map 0:v:0 -map 0:a:0";
$outputs .=
" -c:v:$i {$p['codec']} -b:v:$i {$p['video_bitrate']} -s:v:$i {$p['width']}x{$p['height']} -r:$i
{$p['fps']}".
" -c:a:$i aac -b:a:$i {$p['audio_bitrate']}";
}
$varStreamMap = [];
foreach ($profiles as $i => $p) $varStreamMap[] = "v:$i,a:$i,name:{$p['name']}";
$vsm = implode(' ', $varStreamMap);
$outDir = "{$this->cfg['storage_mount']}/$outBase";
@mkdir($outDir, 0755, true);
return "ffmpeg -y -i ".escapeshellarg($src)." $maps $outputs ".
"-f hls -hls_time 4 -hls_list_size 6 -hls_flags delete_segments+independent_segments ".
"-master_pl_name master.m3u8 ".
"-hls_segment_filename ".escapeshellarg("$outDir/%v_%03d.ts")." ".
"-var_stream_map ".escapeshellarg($vsm)." ".
escapeshellarg("$outDir/%v.m3u8");
}
private function reportProgress(string $jobId, array $info): void
{
// 限频:每 2 秒上报一次
static $last = 0;
if (time() - $last < 2) return; $last = time();
$u = parse_url($this->cfg['control']);
$c = new Client($u['host'], $u['port'] ?? 80);
$c->setHeaders(['Content-Type'=>'application/json']);
$c->set(['timeout'=>1]);
$c->post('/worker/job/progress', json_encode(['transcode_id'=>$jobId] + $info));
$c->close();
}
private function reportDone(string $jobId, array $profiles, string $outBase): void
{
$u = parse_url($this->cfg['control']);
$c = new Client($u['host'], $u['port'] ?? 80);
$c->setHeaders(['Content-Type'=>'application/json']);
$c->post('/worker/job/done', json_encode([
'node_id' => $this->cfg['node_id'],
'transcode_id'=> $jobId,
'outputs' => array_map(fn($p) => "$outBase/{$p['name']}.m3u8", $profiles),
]));
$c->close();
}
}
解释:
- popen + 解析 stderr:从 ffmpeg 实时拿 fps/bitrate,像看进度条一样
- -var_stream_map:一条 ffmpeg 命令同时出 4 个码率 + master.m3u8,比启 4 个进程省 70% CPU
- -hls_flags delete_segments:旧切片自动清理,直播窗口可控
---
8. HLS/DASH 调度器(广告插入 / DVR / 切流)
<?php
// src/Live/HLSScheduler.php
namespace App\Live;
use Swoole\Http\Request;
use Swoole\Http\Response;
class HLSScheduler
{
public function __construct(private Container $c) {}
// 动态生成 master.m3u8(可注入广告、切换码率列表、地区不同)
public function manifest(Request $req, Response $res): array
{
$sid = $req->get['sid'] ?? '';
$region = $req->get['region'] ?? 'CN';
$row = $this->c->streams->get($sid);
if (!$row || $row['status'] !== 'live') {
$res->status(404); $res->end(''); return null;
}
// 按地区返回不同清晰度组合(海外只给 720p+,带宽贵)
$profiles = $region === 'CN'
? ['1080p'=>4500_000,'720p'=>2500_000,'480p'=>1000_000,'360p'=>600_000]
: ['720p'=>2500_000,'480p'=>1000_000];
$base = "{$this->c->cfg['hls_origin']}/live/$sid";
$m = "#EXTM3U\n#EXT-X-VERSION:6\n";
foreach ($profiles as $name => $bw) {
[$w,$h] = match($name) {
'1080p'=>[1920,1080], '720p'=>[1280,720],
'480p'=>[854,480], '360p'=>[640,360], default=>[640,360]
};
$m .= "#EXT-X-STREAM-INF:BANDWIDTH=$bw,RESOLUTION={$w}x{$h},CODECS=\"avc1.640028,mp4a.40.2\"\n";
$m .= "$base/$name.m3u8?sid=$sid\n";
}
$res->header('Content-Type','application/vnd.apple.mpegurl');
$res->header('Cache-Control','no-cache, max-age=2');
$res->end($m);
return null;
}
// 广告插入触发(运营在后台点"插入30秒广告"调这个)
public function keyframeTrigger(Request $req): array
{
$d = json_decode($req->rawContent(), true) ?? [];
$sid = $d['stream_id'] ?? '';
// 写入 Redis,边缘节点在生成 m3u8 时插入 SCTE-35
$this->c->withRedis(fn($r) => $r->lPush("hls:ad:$sid", json_encode([
'type' => 'splice_insert',
'duration'=> (int)($d['duration'] ?? 30),
'at' => time() + 5,
])));
return ['code'=>0,'msg'=>'ad inserted in 5s'];
}
}
<?php
// src/Live/DASHScheduler.php
namespace App\Live;
use Swoole\Http\Request;
use Swoole\Http\Response;
class DASHScheduler
{
public function __construct(private Container $c) {}
public function manifest(Request $req, Response $res): array
{
$sid = $req->get['sid'] ?? '';
$row = $this->c->streams->get($sid);
if (!$row) { $res->status(404); $res->end(''); return null; }
$base = "{$this->c->cfg['hls_origin']}/live/$sid";
// 简化版 MPD(实际生成由 ffmpeg --f dash 产出,这里做 CDN 转发 + 元数据)
$mpd = '<?xml version="1.0"?>'
. '<MPD xmlns="urn:mpeg:dash:schema:mpd:2011" type="dynamic" minimumUpdatePeriod="PT2S" '
. 'availabilityStartTime="'.gmdate('Y-m-d\TH:i:s\Z',$row['start_at']).'" '
. 'profiles="urn:mpeg:dash:profile:isoff-live:2011">'
. '<Period start="PT0S"><BaseURL>'.htmlspecialchars($base).'/</BaseURL>'
. '<AdaptationSet contentType="video">'
. '<Representation id="1080p" bandwidth="4500000" width="1920" height="1080" codecs="avc1.640028"/>'
. '<Representation id="720p" bandwidth="2500000" width="1280" height="720" codecs="avc1.640028"/>'
. '</AdaptationSet></Period></MPD>';
$res->header('Content-Type','application/dash+xml');
$res->header('Cache-Control','no-cache, max-age=2');
$res->end($mpd);
return null;
}
}
---
9. SRT 信令服务
<?php
// src/Live/SRTSignaling.php
namespace App\Live;
use Firebase\JWT\JWT;
use Firebase\JWT\Key;
use Swoole\Http\Request;
class SRTSignaling
{
public function __construct(private Container $c) {}
// SRS 5.x 收到 SRT 推流时调:解析 streamid 决定接收/拒绝
public function connect(Request $req): array
{
$d = json_decode($req->rawContent(), true) ?? [];
// streamid 格式:#!::r=app/stream_id,m=publish,token=xxx
$streamid = $d['streamid'] ?? '';
$params = $this->parseSrtStreamId($streamid);
$resource = $params['r'] ?? '';
$mode = $params['m'] ?? 'request'; // publish / request
$token = $params['token'] ?? '';
[$app, $sid] = explode('/', $resource, 2) + ['',''];
// 验签
$scope = $mode === 'publish' ? 'publish' : 'play';
try {
$payload = (array) JWT::decode($token, new Key($this->c->cfg['jwt_secret'], 'HS256'));
if ($payload['sid'] !== $sid || ($payload['scope'] ?? '') !== $scope) {
return ['code'=>403,'reason'=>'scope mismatch'];
}
} catch (\Throwable $e) {
return ['code'=>403,'reason'=>'bad token'];
}
// 选边缘节点(就近接入)
$edge = $this->pickEdgeNode($d['client_ip'] ?? '');
// 登记
$this->c->withRedis(fn($r) => $r->hMSet("srt:session:$sid", [
'app'=>$app,'mode'=>$mode,'edge'=>$edge,
'client_ip'=>$d['client_ip']??'','connected_at'=>(string)time(),
]));
return [
'code' => 0,
'app' => $app,
'stream' => $sid,
'edge' => $edge,
'latency' => 120, // 推荐 latency 120ms
];
}
public function disconnect(Request $req): array
{
$d = json_decode($req->rawContent(), true) ?? [];
$sid = $d['stream'] ?? '';
$this->c->withRedis(fn($r) => $r->del("srt:session:$sid"));
return ['code'=>0];
}
private function parseSrtStreamId(string $s): array
{
// #!::k1=v1,k2=v2 →['k1'=>'v1','k2'=>'v2']
$s = preg_replace('/^#!::/', '', $s);
$out = [];
foreach (explode(',', $s) as $kv) {
[$k,$v] = explode('=', $kv, 2) + [null,null];
if ($k) $out[$k] = $v;
}
return $out;
}
private function pickEdgeNode(string $clientIp): string
{
// 真实场景:GeoIP →距离最近的边缘 SRT 集群
// 简化:轮询
$nodes = ['srt-edge-cn-east-1','srt-edge-cn-south-1','srt-edge-sg-1'];
return $nodes[crc32($clientIp) % count($nodes)];
}
}
解释:
- SRT streamid 协议 #!::k=v,k=v 是 Haivision 标准,SRS 5.x 已原生支持
- mode = publish/request:推 vs 拉,鉴权 scope 必须匹配
- edge 节点动态返回:实现就近接入 + 故障切换
---
10. Recorder(录制合成)
<?php
// src/Live/Recorder.php
namespace App\Live;
class Recorder
{
public function __construct(private Container $c) {}
public function finalize(array $data): void
{
$sid = $data['stream_id'];
$tid = $data['transcode_id'];
$duration = $data['duration'];
// 1. 用 ffmpeg 把 HLS m3u8 合成 mp4
$hlsPath = "/mnt/live/live/$sid/720p.m3u8"; // 用 720p 做录制
if (!is_file($hlsPath)) {
error_log("[record] no hls: $hlsPath");
return;
}
$output = "/mnt/live/recordings/$sid.mp4";
@mkdir(dirname($output), 0755, true);
// ffmpeg 流复制(不重新编码,极快)
$cmd = "ffmpeg -y -i ".escapeshellarg($hlsPath)." -c copy -bsf:a aac_adtstoasc ".escapeshellarg($output);
exec("$cmd 2>&1", $out, $code);
if ($code !== 0) {
error_log("[record] ffmpeg fail: ".implode("\n",$out));
return;
}
// 2. 落 DB
$this->c->withMySQL(function($db) use ($sid, $output, $duration) {
$stmt = $db->prepare(
"INSERT INTO recordings(stream_id,file_path,duration,created_at) VALUES(?,?,?,NOW())"
);
$stmt->execute([$sid, $output, $duration]);
});
// 3. 异步上传对象存储(可选)
\Swoole\Coroutine::create(function() use ($output, $sid) {
$this->c->storage->writeStream("recordings/$sid.mp4", fopen($output, 'rb'));
});
error_log("[record] done: $output");
}
}
---
11. CallbackDispatcher(业务回调)
<?php
// src/Live/CallbackDispatcher.php
namespace App\Live;
use Swoole\Coroutine\Http\Client;
class CallbackDispatcher
{
public function __construct(private Container $c) {}
public function push(array $event): void
{
// 业务方在 DB 配置的 webhook URL
$urls = $this->c->withMySQL(fn($db) =>
$db->query("SELECT url,secret FROM webhooks WHERE event_type='".$event['event']."' AND enabled=1")
);
foreach ($urls as $cfg) {
$body = json_encode($event, JSON_UNESCAPED_UNICODE);
$sig = hash_hmac('sha256', $body, $cfg['secret']);
$u = parse_url($cfg['url']);
try {
$c = new Client($u['host'], $u['port'] ?? ($u['scheme']==='https'?443:80),
($u['scheme']??'http')==='https');
$c->set(['timeout'=>3]);
$c->setHeaders([
'Content-Type'=>'application/json',
'X-Live-Signature'=>$sig,
'X-Live-Event'=>$event['event'],
]);
$c->post($u['path'] ?? '/', $body);
$c->close();
} catch (\Throwable $e) {
// 失败入重试队列
$this->c->withRedis(fn($r) =>
$r->lPush('webhook:retry', json_encode(['url'=>$cfg['url'],'event'=>$event,'attempts'=>1]))
);
}
}
}
}
---
12. StreamMonitor + WorkerHealth
<?php
// src/Live/StreamMonitor.php
namespace App\Live;
class StreamMonitor
{
public function __construct(private Container $c) {}
public function run(): void
{
while (true) {
\Swoole\Coroutine::sleep(10);
$now = time();
// 查 30 秒没心跳的流(SRS on_hls 也算心跳)
foreach ($this->c->streams as $sid => $row) {
if ($row['status'] !== 'live') continue;
$idle = $now - $row['start_at'];
if ($idle > 86400) { // 极端兜底:超过 1 天直接断
$row['status'] = 'timeout';
$this->c->streams->set($sid, $row);
}
}
// 探测 SRS API 拉真实在线流(可选)
}
}
}
<?php
// src/Live/WorkerHealth.php
namespace App\Live;
class WorkerHealth
{
public function __construct(private Container $c) {}
public function run(): void
{
while (true) {
\Swoole\Coroutine::sleep(5);
$now = time();
foreach ($this->c->workers as $id => $w) {
if ($now - $w['last_hb'] > 15) {
$w['healthy'] = 0;
$this->c->workers->set($id, $w);
}
}
}
}
}
---
13. SRS 配置(关键!)
# srs.conf 关键片段
http_hooks {
enabled on;
on_publish http://control:9508/srs/on_publish;
on_unpublish http://control:9508/srs/on_unpublish;
on_play http://control:9508/srs/on_play;
on_stop http://control:9508/srs/on_stop;
on_hls http://control:9508/srs/on_hls;
}
vhost __defaultVhost__ {
hls {
enabled on;
hls_path /data/hls;
hls_fragment 4;
hls_window 60;
hls_cleanup on;
}
srt {
enabled on;
srt_to_rtmp on;
}
}
---
14. DDL
CREATE TABLE stream_keys (
stream_id VARCHAR(64) PRIMARY KEY,
user_id INT NOT NULL,
app VARCHAR(32),
expires_at DATETIME,
enabled TINYINT DEFAULT 1,
created_at DATETIME
);
CREATE TABLE transcode_workers (
node_id VARCHAR(64) PRIMARY KEY,
host VARCHAR(64),
max_jobs INT DEFAULT 4,
enabled TINYINT DEFAULT 1
);
CREATE TABLE recordings (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
stream_id VARCHAR(64),
file_path VARCHAR(256),
duration INT,
created_at DATETIME,
KEY idx_stream(stream_id)
);
CREATE TABLE webhooks (
id INT PRIMARY KEY AUTO_INCREMENT,
event_type VARCHAR(32),
url VARCHAR(512),
secret VARCHAR(64),
enabled TINYINT DEFAULT 1
);
---
四、完整调用示例
# 1. 主播申请推流地址
curl -X POST http://127.0.0.1:9508/api/streams/issue \
-d '{"user_id":10086,"app":"live","duration":7200}'
# {
# "code":0,
# "stream_id":"sid_xxx",
# "publish_url":"rtmp://srs.example.com/live/sid_xxx?token=eyJ...",
# "srt_url":"srt://srt.example.com:10080?streamid=#!::r=live/sid_xxx,m=publish,token=eyJ..."
# }
# 2. OBS 用 publish_url 推流 →SRS 自动回调 /srs/on_publish 验签 →通过则起转码
# 3. 观众端拉播放地址
curl "http://127.0.0.1:9508/api/play/url?stream_id=sid_xxx&user_id=2000&duration=3600"
# {
# "code":0,
# "hls":"https://cdn.example.com/live/sid_xxx/master.m3u8?uid=2000&exp=...&sign=...",
# "dash":"https://cdn.example.com/live/sid_xxx/master.mpd?...",
# "srt_play":"srt://...?streamid=#!::r=live/sid_xxx,m=request,sign=..."
# }
# 4. 运营插入广告
curl -X POST http://127.0.0.1:9508/hls/keyframe-trigger \
-d '{"stream_id":"sid_xxx","duration":30}'
# 5. 撤销推流(违规直播下播)
curl -X POST "http://127.0.0.1:9508/api/streams/revoke?sid=sid_xxx"
# 6. 看在线流 / worker
curl http://127.0.0.1:9508/admin/streams
curl http://127.0.0.1:9508/admin/workers
---
五、性能参考
┌────────────────────────────┬─────────────────────────────────────┐
│ 组件 │ 单机能力 │
├────────────────────────────┼─────────────────────────────────────┤
│ 控制面 QPS(鉴权回调) │ 2-5w │
├────────────────────────────┼─────────────────────────────────────┤
│ 控制面 P99 延迟 │ < 10ms │
├────────────────────────────┼─────────────────────────────────────┤
│ 同时在线流(单控制节点) │ 几万(Swoole Table 支持百万) │
├────────────────────────────┼─────────────────────────────────────┤
│ ffmpeg worker 单机转码路数 │ 4-8 路 1080p(看 CPU,NVENC 可到 20+) │
├────────────────────────────┼─────────────────────────────────────┤
│ HLS 切片延迟(端到端) │ 6-10 秒(hls_fragment=4) │
├────────────────────────────┼─────────────────────────────────────┤
│ SRT 端到端延迟 │ 120-300ms │
├────────────────────────────┼─────────────────────────────────────┤
│ 录制合成 1 小时 1080p │ < 2 分钟(流复制) │
└────────────────────────────┴─────────────────────────────────────┘
---
六、踩坑提示(直播血泪经验)
┌───────────────────────────────┬──────────────────────────────────────────────────────────┐
│ 坑 │ 解决 │
├───────────────────────────────┼──────────────────────────────────────────────────────────┤
│ HLS 延迟 10 秒太高 │ 缩 hls_fragment 到 2s,或上 LL-HLS / WebRTC │
├───────────────────────────────┼──────────────────────────────────────────────────────────┤
│ SRT 推流 streamid 解析错误 │ 严格按 #!::k=v,k=v 规范,SRS 5.0+ 才稳 │
├───────────────────────────────┼──────────────────────────────────────────────────────────┤
│ 转码 worker 抢任务竞争 │ 改成"派发模式":控制面主动 push 给指定 worker(本方案已做) │
├───────────────────────────────┼──────────────────────────────────────────────────────────┤
│ ffmpeg 进程僵尸 │ pcntl_signal(SIGCHLD) 回收;或 systemd 兜底 │
├───────────────────────────────┼──────────────────────────────────────────────────────────┤
│ 录制丢首帧 │ -bsf:a aac_adtstoasc + 等到第一个关键帧 │
├───────────────────────────────┼──────────────────────────────────────────────────────────┤
│ 主播切码率把所有观众踢出 │ 多码率用 ABR(自适应码率),别只发一路 │
├───────────────────────────────┼──────────────────────────────────────────────────────────┤
│ 切片小文件把对象存储打爆 │ 切片合并大文件 / 用 Tier3 冷存 │
├───────────────────────────────┼──────────────────────────────────────────────────────────┤
│ webhook 失败业务侧不知道 │ 必须有重试 + 死信 │
├───────────────────────────────┼──────────────────────────────────────────────────────────┤
│ 鉴权 token 被盗 │ 时长 ≤2 小时,IP 绑定,revoke 立即生效 │
├───────────────────────────────┼──────────────────────────────────────────────────────────┤
│ 高并发推流卡 MySQL │ stream_keys 表加 Redis 缓存 │
├───────────────────────────────┼──────────────────────────────────────────────────────────┤
│ 主播跨房间换密钥 │ 撤销旧密钥同步踢流 │
├───────────────────────────────┼──────────────────────────────────────────────────────────┤
│ 录制覆盖原视频 │ 文件名带 UUID,绝不复用 │
├───────────────────────────────┼──────────────────────────────────────────────────────────┤
│ 直播 →点播没合成成功业务投诉 │ finalize 失败入死信队列 + 告警 │
└───────────────────────────────┴──────────────────────────────────────────────────────────┘
---
七、安全清单
┌────────────────────────────┬───────────────────────────────────────────────┐
│ 风险 │ 防御 │
├────────────────────────────┼───────────────────────────────────────────────┤
│ 推流密钥泄漏 │ JWT 短时效 + IP 绑定 + 一次性使用(可选) │
├────────────────────────────┼───────────────────────────────────────────────┤
│ 盗播 │ 播放 URL 签名 + Referer + 设备指纹 + 时长限制 │
├────────────────────────────┼───────────────────────────────────────────────┤
│ 黄反内容 │ 接 AI 鉴黄(每 N 秒抽帧调外部 API) │
├────────────────────────────┼───────────────────────────────────────────────┤
│ DDoS 推流 │ 网关层限连 + SRS hooks 频率限制 │
├────────────────────────────┼───────────────────────────────────────────────┤
│ 路径穿越(把流推到别人房间) │ stream_id 服务端生成,不接受客户端指定 │
├────────────────────────────┼───────────────────────────────────────────────┤
│ Webhook 被伪造调用 │ HMAC 签名 + 时间戳防重放 │
├────────────────────────────┼───────────────────────────────────────────────┤
│ 录制文件被未授权下载 │ 对象存储私有 + 临时签名 URL │
├────────────────────────────┼───────────────────────────────────────────────┤
│ 转码 worker 被命令注入 │ escapeshellarg 全包裹,本方案已做 │
└────────────────────────────┴───────────────────────────────────────────────┘
---
八、可扩展方向
1. LL-HLS / WebRTC:把延迟压到 2-3 秒甚至 500ms
2. AI 内容审核:抽帧调腾讯/阿里/自建黄反模型
3. DRM:接 Widevine / FairPlay,付费内容必备
4. SCTE-35 广告:精准插入广告标记,广告主投流必备
5. 多 CDN 调度:按观众地区 / 实时质量切换 CDN
6. 弹幕系统:WebSocket 长连接 + Redis Pub/Sub
7. 连麦:WebRTC + SFU(mediasoup)
8. NVENC 转码:NVIDIA 显卡硬编,单卡顶 5 台 CPU 机
9. 多视角直播:同一时间多机位流,客户端切换
10. 回看(Time-shift):HLS 加 DVR 窗口,观众随意拖动
---
九、和 Twitch / 抖音直播架构对比
┌──────────┬───────────────┬────────────────────────────┐
│ 维度 │ Twitch/抖音 │ 本方案 │
├──────────┼───────────────┼────────────────────────────┤
│ 数据面 │ 自研 C++/Rust │ SRS / ffmpeg(开源) │
├──────────┼───────────────┼────────────────────────────┤
│ 控制面 │ Go / Java │ PHP+Swoole(本方案) │
├──────────┼───────────────┼────────────────────────────┤
│ 规模 │ 千万级并发 │ 中型(单集群万级流) │
├──────────┼───────────────┼────────────────────────────┤
│ 业务定制 │ 重 │ 极轻(改 PHP 极快) │
├──────────┼───────────────┼────────────────────────────┤
│ 适合 │ 大厂 / 平台 │ 企业直播 / 教育 / 中小平台 │
└──────────┴───────────────┴────────────────────────────┘
---
十、最重要的认知
PHP+Swoole 在直播领域的甜蜜区:
- ✅ 控制面、鉴权、调度、回调 —这是 PHP 的主场
- ✅ 业务集成层(直播 + 电商 + 礼物 + 弹幕)
- ✅ 中后台管理 / 数据看板
别用 PHP 做的事:
- ❌ 自己实现 RTMP / SRT 协议 —SRS 已经做完了
- ❌ 自己写 HLS 切片 —ffmpeg 已经做完了
- ❌ 替代 ffmpeg 转码 —这是 C/CUDA 的活
- ❌ 替代 CDN 边缘分发 —这是网络硬件的活
正确姿势 = SRS(数据面)+ FFmpeg(计算)+ PHP/Swoole(大脑)+ CDN(分发)
更多推荐
所有评论(0)