PHP+Swoole从0实现一个消息队列(存储引擎、持久化、消费组、重试死信- 消息队列的协议层解析(AMQP、RocketMQ协议)- 延迟消息、事务消息的底层实现
·
一套完整可跑的 PHP + Swoole)消息队列,RocketMQ 架构 + AMQP 协议兼容思想,核心组件全在一份文件里。
一、整体架构(大白话)
┌─ Producer ──┐ ┌─ Consumer ──┐
│ 发消息 │ │ 收消息 │
└─────┬───────┘ └─────▲───────┘
│ 网络帧(协议层) │
┌───▼───────────────────────┴────┐
│ Broker (Swoole TcpServer) │
│ ┌────────────────────────────┐ │
│ │ ①协议解析 (AMQP/RMQ Frame)│ │
│ │ ②Topic 路由 │ │
│ │ ③事务半消息 / 延迟消息 │ │
│ │ ④写 CommitLog (顺序追加) │ │ ←所有消息只写一个大文件,极快
│ │ ⑤构建 ConsumeQueue 索引 │ │ ←每个 Topic/Queue 一个索引
│ │ ⑥消费组 offset 管理 │ │
│ │ ⑦重试队列 + 死信队列 │ │
│ └────────────────────────────┘ │
└──┬─────────────────────────────┘
│
data/
commitlog.000 ←唯一存储,所有消息按写入顺序
cq/{topic}/{queue} ←索引文件:每条 20 字节(offset+size+tag)
consumer_offset.json ←每个消费组消费到哪了
delay.idx ←延迟消息索引(按到期时间排)
tx_half.idx ←未确认的事务半消息
核心思想(抄 RocketMQ):
- CommitLog 顺序写:消息一律追加到一个大文件,顺序 IO 比随机 IO 快 100 倍
- ConsumeQueue 是"指针文件":不存消息内容,只存"消息在 CommitLog 的 offset",查找时按 offset 跳读
- 消费组 offset 单独存:重启不丢消费进度
- 延迟消息:不直接进目标 Topic,先进 SCHEDULE_TOPIC,时间到了再转投
- 事务消息:两阶段——先发"半消息"(消费者看不到),业务执行后commit/rollback
二、完整代码
<?php
// mq.php ——php mq.php server 启动 Broker
// php mq.php produce 发测试消息
// php mq.php consume 起消费者
// php mq.php tx 事务消息 demo
use Swoole\Server;
use Swoole\Coroutine;
use Swoole\Timer;
const DATA_DIR = __DIR__ . '/mqdata';
const PORT = 9876;
// ====================== 0. 协议层 ======================
// 大白话:网络上传字节流,得有"信封"告诉对方一条消息从哪到哪。
// 这里设计一个兼容 AMQP 思路的简化二进制帧:
// [魔数2][版本1][类型1][长度4][JSON header][body bytes]
// 类型: 1=PUB 2=SUB 3=ACK 4=NACK 5=TX_HALF 6=TX_COMMIT 7=TX_ROLLBACK 8=PULL 9=RESP
class Frame {
const MAGIC = 0xCAFE;
public static function encode(int $type, array $header, string $body = ''): string {
$h = json_encode($header, JSON_UNESCAPED_UNICODE);
$payload = pack('n', strlen($h)) . $h . $body;
return pack('nCCN', self::MAGIC, 1, $type, strlen($payload)) . $payload;
}
// 从 buffer 里尝试解出一帧,不够就返回 null
public static function decode(string &$buf): ?array {
if (strlen($buf) < 8) return null;
$h = unpack('nmagic/Cver/Ctype/Nlen', substr($buf, 0, 8));
if ($h['magic'] !== self::MAGIC) throw new \RuntimeException("bad magic");
if (strlen($buf) < 8 + $h['len']) return null;
$payload = substr($buf, 8, $h['len']);
$buf = substr($buf, 8 + $h['len']);
$hl = unpack('n', substr($payload,0,2))[1];
$header = json_decode(substr($payload, 2, $hl), true) ?? [];
$body = substr($payload, 2 + $hl);
return ['type' => $h['type'], 'header' => $header, 'body' => $body];
}
}
// 顺手再做个"AMQP 风格"的方法码到 type 的映射(展示协议层抽象,实际用上面的就够)
class AmqpLike {
const BASIC_PUBLISH = 0x3C28;
const BASIC_CONSUME = 0x3C14;
const BASIC_ACK = 0x3C50;
// ……AMQP 真协议是 class-id + method-id 两字节,我们这里仅示意
public static function methodToType(int $m): int {
return match($m) {
self::BASIC_PUBLISH => 1,
self::BASIC_CONSUME => 2,
self::BASIC_ACK => 3,
default => throw new \RuntimeException("unknown amqp method"),
};
}
}
// ====================== 1. CommitLog(顺序写存储)======================
// 大白话:一个永远追加的大文件。每条消息格式:
// [总长度4][topic长度1][topic][tag长度1][tag][propsLen2][props json][body...]
// 返回值是这条消息在文件里的 offset ——ConsumeQueue 就靠这个 offset 找数据
class CommitLog {
private $fp;
private int $size;
public function __construct(string $path) {
$this->fp = fopen($path, 'c+b');
fseek($this->fp, 0, SEEK_END);
$this->size = ftell($this->fp);
}
public function append(string $topic, string $tag, array $props, string $body): int {
$propsJ = json_encode($props, JSON_UNESCAPED_UNICODE);
$msg = chr(strlen($topic)) . $topic
. chr(strlen($tag)) . $tag
. pack('n', strlen($propsJ)) . $propsJ
. $body;
$rec = pack('N', strlen($msg)) . $msg;
$offset = $this->size;
fwrite($this->fp, $rec);
fflush($this->fp); // 落盘(同步刷盘;异步刷盘可去掉这行性能翻倍)
$this->size += strlen($rec);
return $offset;
}
public function read(int $offset): ?array {
fseek($this->fp, $offset);
$h = fread($this->fp, 4);
if (strlen($h) < 4) return null;
$len = unpack('N', $h)[1];
$body = fread($this->fp, $len);
if (strlen($body) < $len) return null;
$o = 0;
$tl = ord($body[$o++]); $topic = substr($body,$o,$tl); $o += $tl;
$gl = ord($body[$o++]); $tag = substr($body,$o,$gl); $o += $gl;
$pl = unpack('n', substr($body,$o,2))[1]; $o += 2;
$props = json_decode(substr($body,$o,$pl), true); $o += $pl;
$payload = substr($body, $o);
return compact('topic','tag','props','payload') + ['size' => 4 + $len];
}
}
// ====================== 2. ConsumeQueue(消费索引文件)======================
// 大白话:每个 Topic 的每个 Queue 一个文件,每条记录固定 20 字节:
// [commitlog_offset 8][msg_size 4][tag_hash 8]
// 这样消费时按 (offset/20) 直接 seek,极快。
class ConsumeQueue {
private array $fps = [];
private string $base;
public function __construct(string $base) {
$this->base = $base;
}
private function getFp(string $topic, int $queueId) {
$key = "$topic#$queueId";
if (!isset($this->fps[$key])) {
$dir = "{$this->base}/$topic";
if (!is_dir($dir)) mkdir($dir, 0777, true);
$this->fps[$key] = fopen("$dir/$queueId.cq", 'c+b');
}
return $this->fps[$key];
}
public function append(string $topic, int $queueId, int $clOffset, int $size, string $tag): void {
$fp = $this->getFp($topic, $queueId);
fseek($fp, 0, SEEK_END);
// tag 用 8 字节 hash,消费端做 tag 过滤可以不解 body
$tagHash = unpack('P', substr(hash('sha256', $tag, true), 0, 8))[1];
fwrite($fp, pack('JNP', $clOffset, $size, $tagHash));
fflush($fp);
}
// 拿第 $cqIndex 条索引(0 起始)
public function read(string $topic, int $queueId, int $cqIndex): ?array {
$fp = $this->getFp($topic, $queueId);
fseek($fp, $cqIndex * 20);
$d = fread($fp, 20);
if (strlen($d) < 20) return null;
$u = unpack('Jo/Ns/Pt', $d);
return ['clOffset' => $u['o'], 'size' => $u['s'], 'tagHash' => $u['t']];
}
public function count(string $topic, int $queueId): int {
$fp = $this->getFp($topic, $queueId);
fseek($fp, 0, SEEK_END);
return ftell($fp) / 20;
}
}
// ====================== 3. 消费组 Offset 管理 ======================
// 大白话:每个"消费组"在每个 Queue 上消费到哪一条,记下来。
// 同一个 Topic,不同消费组互不干扰(广播/集群语义靠这个区分)。
class OffsetStore {
private array $offsets;
public function __construct(private string $file) {
$this->offsets = file_exists($file) ? json_decode(file_get_contents($file), true) ?? [] : [];
}
public function get(string $group, string $topic, int $q): int {
return $this->offsets["$group/$topic/$q"] ?? 0;
}
public function set(string $group, string $topic, int $q, int $v): void {
$this->offsets["$group/$topic/$q"] = $v;
}
public function flush(): void {
file_put_contents($this->file, json_encode($this->offsets));
}
}
// ====================== 4. Broker(消息中枢)======================
class Broker {
public CommitLog $cl;
public ConsumeQueue $cq;
public OffsetStore $off;
// 内存索引
private array $delayed = []; // [expireMs => [msgRef,...]] 延迟消息小顶堆替代
private array $txHalf = []; // txId => {topic, body, props} 事务半消息
private array $retryCnt = []; // msgKey => 重试次数
const QUEUES_PER_TOPIC = 4;
const MAX_RETRY = 3;
const RETRY_DELAY_MS = 5000;
public function __construct() {
if (!is_dir(DATA_DIR)) mkdir(DATA_DIR, 0777, true);
if (!is_dir(DATA_DIR.'/cq')) mkdir(DATA_DIR.'/cq', 0777, true);
$this->cl = new CommitLog(DATA_DIR . '/commitlog.000');
$this->cq = new ConsumeQueue(DATA_DIR . '/cq');
$this->off = new OffsetStore(DATA_DIR . '/consumer_offset.json');
$this->loadDelayed();
}
// ——普通发消息 ——
// queueId 用 key 取模决定(保证同 key 顺序)
public function publish(string $topic, string $tag, array $props, string $body, ?string $orderKey = null): array {
$qid = $orderKey
? abs(crc32($orderKey)) % self::QUEUES_PER_TOPIC
: random_int(0, self::QUEUES_PER_TOPIC - 1);
$offset = $this->cl->append($topic, $tag, $props, $body);
$size = strlen($body) + 64; // 近似
$this->cq->append($topic, $qid, $offset, $size, $tag);
return ['offset' => $offset, 'queueId' => $qid];
}
// ——延迟消息 ——
// 思路:不进目标 topic,先放内存"延迟池";时间到了再正常 publish。
// 真正生产用"时间轮"或按到期分级 Topic(SCHEDULE_TOPIC_5s/30s/1m/...)
public function publishDelayed(string $topic, string $tag, array $props, string $body, int $delayMs): void {
$expire = (int)(microtime(true) * 1000) + $delayMs;
$this->delayed[] = compact('expire','topic','tag','props','body');
usort($this->delayed, fn($a,$b) => $a['expire'] <=> $b['expire']);
$this->persistDelayed();
}
public function tickDelayed(): void {
$now = (int)(microtime(true) * 1000);
while ($this->delayed && $this->delayed[0]['expire'] <= $now) {
$m = array_shift($this->delayed);
$this->publish($m['topic'], $m['tag'], $m['props'], $m['body']);
}
$this->persistDelayed();
}
private function persistDelayed(): void {
file_put_contents(DATA_DIR.'/delay.idx', json_encode($this->delayed));
}
private function loadDelayed(): void {
if (file_exists(DATA_DIR.'/delay.idx'))
$this->delayed = json_decode(file_get_contents(DATA_DIR.'/delay.idx'), true) ?: [];
}
// ——事务消息(两阶段)——
// 1) Producer 发 TX_HALF:消息存"半消息池",对消费者不可见
// 2) Producer 执行本地事务(扣库存、改 DB)
// 3) 成功 →发 TX_COMMIT:Broker 把半消息正式 publish
// 失败 →发 TX_ROLLBACK:Broker 直接丢弃
// 4) 如果 Producer 挂了不回应:Broker 定时"回查"(此处简化为超时丢弃)
public function txHalf(string $txId, string $topic, string $tag, array $props, string $body): void {
$this->txHalf[$txId] = [
'topic'=>$topic, 'tag'=>$tag, 'props'=>$props, 'body'=>$body,
'ts'=> microtime(true)
];
}
public function txCommit(string $txId): void {
if (!isset($this->txHalf[$txId])) return;
$m = $this->txHalf[$txId];
$this->publish($m['topic'], $m['tag'], $m['props'], $m['body']);
unset($this->txHalf[$txId]);
}
public function txRollback(string $txId): void {
unset($this->txHalf[$txId]);
}
public function checkHalfTimeout(): void {
$now = microtime(true);
foreach ($this->txHalf as $id => $m) {
if ($now - $m['ts'] > 60) {
// 真实场景:这里要"回查"Producer 业务状态;此处直接 rollback
unset($this->txHalf[$id]);
}
}
}
// ——消费者拉消息(Pull 模型,比 Push 更稳)——
public function pull(string $group, string $topic, int $qid, int $batch = 16): array {
$cqStart = $this->off->get($group, $topic, $qid);
$cqEnd = $this->cq->count($topic, $qid);
$msgs = [];
for ($i = $cqStart; $i < $cqEnd && count($msgs) < $batch; $i++) {
$idx = $this->cq->read($topic, $qid, $i);
if (!$idx) break;
$msg = $this->cl->read($idx['clOffset']);
if (!$msg) continue;
$msgs[] = $msg + ['cqIndex' => $i, 'msgKey' => "$topic/$qid/$i"];
}
return $msgs;
}
public function ack(string $group, string $topic, int $qid, int $cqIndex): void {
// ACK = 把消费 offset 推到 cqIndex+1
if ($cqIndex + 1 > $this->off->get($group, $topic, $qid))
$this->off->set($group, $topic, $qid, $cqIndex + 1);
$this->off->flush();
}
// ——重试 + 死信 ——
// NACK:把消息推到 %RETRY%group 这个特殊 topic,延迟后重投;
// 重试达上限进入 %DLQ%group(死信),需要人工/告警处理。
public function nack(string $group, array $msg): void {
$key = $msg['msgKey'];
$cnt = ($this->retryCnt[$key] ?? 0) + 1;
$this->retryCnt[$key] = $cnt;
if ($cnt > self::MAX_RETRY) {
$this->publish("%DLQ%$group", $msg['tag'], $msg['props'] + ['origTopic'=>$msg['topic']], $msg['payload']);
echo "[DLQ] $key 进入死信\n";
} else {
$this->publishDelayed("%RETRY%$group", $msg['tag'],
$msg['props'] + ['origTopic'=>$msg['topic'],'retry'=>$cnt],
$msg['payload'], self::RETRY_DELAY_MS * $cnt);
echo "[RETRY] $key 第 $cnt 次重试,${cnt}*5s 后重投\n";
}
// 即便 NACK,offset 也要推进,否则会卡住整个 queue
$this->ack($group, $msg['topic'], (int)explode('/', $key)[1], $msg['cqIndex']);
}
}
// ====================== 5. Swoole TCP Server(协议层入口)======================
function runServer(): void {
$broker = new Broker();
$server = new Server('0.0.0.0', PORT);
$server->set([
'worker_num' => 2,
'open_length_check' => false, // 我们自己解析帧
'package_max_length' => 16 * 1024 * 1024,
]);
$buffers = []; // fd => 粘包缓冲
$server->on('connect', function ($s, $fd) use (&$buffers) {
$buffers[$fd] = '';
echo "[+] $fd 连接\n";
});
$server->on('close', function ($s, $fd) use (&$buffers) {
unset($buffers[$fd]);
echo "[-] $fd 断开\n";
});
$server->on('receive', function (Server $s, int $fd, int $rid, string $data) use (&$buffers, $broker) {
$buffers[$fd] .= $data;
while (($pkt = Frame::decode($buffers[$fd])) !== null) {
handlePacket($s, $fd, $pkt, $broker);
}
});
// 定时任务:每 200ms 扫一次延迟消息;每 30s 检查事务超时
$server->on('workerStart', function ($s, $wid) use ($broker) {
if ($wid !== 0) return;
Timer::tick(200, fn() => $broker->tickDelayed());
Timer::tick(30000, fn() => $broker->checkHalfTimeout());
});
echo "Broker 启动 :".PORT."\n";
$server->start();
}
function handlePacket(Server $s, int $fd, array $pkt, Broker $b): void {
$t = $pkt['type']; $h = $pkt['header']; $body = $pkt['body'];
try {
switch ($t) {
case 1: // PUB
if (!empty($h['delayMs'])) {
$b->publishDelayed($h['topic'], $h['tag'] ?? '', $h['props'] ?? [], $body, (int)$h['delayMs']);
reply($s, $fd, ['ok'=>1, 'delayed'=>1]);
} else {
$r = $b->publish($h['topic'], $h['tag'] ?? '', $h['props'] ?? [], $body, $h['orderKey'] ?? null);
reply($s, $fd, ['ok'=>1] + $r);
}
break;
case 8: // PULL
$msgs = $b->pull($h['group'], $h['topic'], (int)$h['queueId'], (int)($h['batch'] ?? 16));
reply($s, $fd, ['ok'=>1, 'msgs'=>$msgs]);
break;
case 3: // ACK
$b->ack($h['group'], $h['topic'], (int)$h['queueId'], (int)$h['cqIndex']);
reply($s, $fd, ['ok'=>1]);
break;
case 4: // NACK
$b->nack($h['group'], $h);
reply($s, $fd, ['ok'=>1]);
break;
case 5: // TX_HALF
$b->txHalf($h['txId'], $h['topic'], $h['tag'] ?? '', $h['props'] ?? [], $body);
reply($s, $fd, ['ok'=>1]);
break;
case 6: $b->txCommit($h['txId']); reply($s, $fd, ['ok'=>1]); break;
case 7: $b->txRollback($h['txId']); reply($s, $fd, ['ok'=>1]); break;
default: reply($s, $fd, ['ok'=>0, 'err'=>'unknown type']);
}
} catch (\Throwable $e) {
reply($s, $fd, ['ok'=>0, 'err'=>$e->getMessage()]);
}
}
function reply(Server $s, int $fd, array $resp): void {
$s->send($fd, Frame::encode(9, $resp));
}
// ====================== 6. 简易客户端 + Demo ======================
class Client {
private $sock;
private string $buf = '';
public function __construct(string $host = '127.0.0.1', int $port = PORT) {
$this->sock = stream_socket_client("tcp://$host:$port", $eno, $estr, 3);
if (!$this->sock) throw new \RuntimeException($estr);
}
private function req(int $type, array $h, string $body = ''): array {
fwrite($this->sock, Frame::encode($type, $h, $body));
while (($pkt = Frame::decode($this->buf)) === null) {
$d = fread($this->sock, 8192);
if ($d === false || $d === '') throw new \RuntimeException('disconnected');
$this->buf .= $d;
}
return $pkt['header'];
}
public function publish(string $topic, string $body, array $extra = []): array {
return $this->req(1, ['topic'=>$topic] + $extra, $body);
}
public function pull(string $group, string $topic, int $qid, int $batch = 16): array {
return $this->req(8, compact('group','topic','qid','batch') + ['queueId'=>$qid]);
}
public function ack(string $group, string $topic, int $qid, int $cqIndex): void {
$this->req(3, compact('group','topic') + ['queueId'=>$qid,'cqIndex'=>$cqIndex]);
}
public function nack(string $group, array $msg): void {
$this->req(4, ['group'=>$group, 'topic'=>$msg['topic'], 'cqIndex'=>$msg['cqIndex'],
'tag'=>$msg['tag'], 'props'=>$msg['props'], 'msgKey'=>$msg['msgKey'],
'queueId'=>(int)explode('/',$msg['msgKey'])[1]], $msg['payload']);
}
public function txHalf(string $txId, string $topic, string $body): void {
$this->req(5, compact('txId','topic'), $body);
}
public function txCommit(string $txId): void { $this->req(6, compact('txId')); }
public function txRollback(string $txId): void { $this->req(7, compact('txId')); }
}
// ===== 命令行入口 =====
$cmd = $argv[1] ?? 'server';
if ($cmd === 'server') {
runServer();
} elseif ($cmd === 'produce') {
$c = new Client();
for ($i = 0; $i < 5; $i++) {
$r = $c->publish('order', "订单消息#$i", ['tag'=>'pay']);
echo "发送 #$i →queue {$r['queueId']} offset {$r['offset']}\n";
}
// 延迟 3 秒
$c->publish('order', '这条是 3 秒后才能消费', ['delayMs'=>3000]);
echo "延迟消息已投递\n";
} elseif ($cmd === 'consume') {
$c = new Client();
$group = 'group-A';
echo "消费组 $group 启动,每秒拉一次...\n";
while (true) {
for ($q = 0; $q < 4; $q++) {
$resp = $c->pull($group, 'order', $q, 16);
foreach ($resp['msgs'] ?? [] as $m) {
echo "收到 [q=$q cq={$m['cqIndex']}] {$m['payload']}\n";
// 模拟 20% 失败 →NACK 走重试
if (random_int(1,5) === 1) {
echo " ↳ 处理失败,NACK\n";
$c->nack($group, $m);
} else {
$c->ack($group, 'order', $q, $m['cqIndex']);
}
}
}
sleep(1);
}
} elseif ($cmd === 'tx') {
$c = new Client();
$txId = uniqid('tx_', true);
$c->txHalf($txId, 'order', '事务消息: 创建订单 #888');
echo "已发送半消息,模拟执行本地事务...\n";
sleep(1);
if (random_int(0,1)) { $c->txCommit($txId); echo "本地事务成功 →COMMIT,消费者可见\n"; }
else { $c->txRollback($txId); echo "本地事务失败 →ROLLBACK,消息丢弃\n"; }
}
三、各模块大白话总结
┌───────────────┬─────────────────────────────────────┬───────────────────────────────────────────────┐
│ 模块 │ 比喻 │ 解决什么 │
├───────────────┼─────────────────────────────────────┼───────────────────────────────────────────────┤
│ Frame(协议层) │ 信封 + 邮戳 │ 网络上区分一条消息从哪到哪,解决 TCP 粘包/拆包 │
├───────────────┼─────────────────────────────────────┼───────────────────────────────────────────────┤
│ CommitLog │ 永远只能往后写的流水账 │ 顺序 IO →单机几十万 QPS 的根本 │
├───────────────┼─────────────────────────────────────┼───────────────────────────────────────────────┤
│ ConsumeQueue │ 流水账的"目录页",每页固定 20 字节 │ 消费者按目录跳读,不用扫全表 │
├───────────────┼─────────────────────────────────────┼───────────────────────────────────────────────┤
│ OffsetStore │ 每个消费组的"书签" │ 重启不丢消费进度;不同组互不干扰 │
├───────────────┼─────────────────────────────────────┼───────────────────────────────────────────────┤
│ 延迟消息 │ 邮局的"定时投递" │ 订单 30 分钟未付款自动关闭等场景 │
├───────────────┼─────────────────────────────────────┼───────────────────────────────────────────────┤
│ 事务消息 │ 银行的"两步确认" │ 本地事务 + 发消息的最终一致性 │
├───────────────┼─────────────────────────────────────┼───────────────────────────────────────────────┤
│ 重试 + 死信 │ 派件失败放回派件柜 →放久了进退件区 │ 防止"毒消息"无限重试卡死队列 │
├───────────────┼─────────────────────────────────────┼───────────────────────────────────────────────┤
│ 协程定时器 │ 后台扫地阿姨 │ 每 200ms 推延迟消息;每 30s 清超时半消息 │
└───────────────┴─────────────────────────────────────┴───────────────────────────────────────────────┘
四、协议层关键点(AMQP / RocketMQ 对照)
┌──────────┬─────────────────────────────────┬───────────────────────────────────────┬────────────────────────────┐
│ 概念 │ 本实现 │ AMQP │ RocketMQ │
├──────────┼─────────────────────────────────┼───────────────────────────────────────┼────────────────────────────┤
│ 消息单元 │ Frame(8 字节头 + JSON header + │ Frame(type+channel+size+payload+0xCE) │ RemotingCommand │
│ │ body) │ │ │
├──────────┼─────────────────────────────────┼───────────────────────────────────────┼────────────────────────────┤
│ 路由 │ topic + queueId(crc32 取模) │ Exchange + RoutingKey + Queue │ Topic + QueueId │
├──────────┼─────────────────────────────────┼───────────────────────────────────────┼────────────────────────────┤
│ 投递模式 │ Pull(消费者主动拉) │ Push(basic.deliver) │ 默认 Pull,内核长轮询模拟 │
│ │ │ │ Push │
├──────────┼─────────────────────────────────┼───────────────────────────────────────┼────────────────────────────┤
│ 确认 │ ACK/NACK │ basic.ack/basic.nack │ CONSUMER_SEND_MSG_BACK │
├──────────┼─────────────────────────────────┼───────────────────────────────────────┼────────────────────────────┤
│ 事务 │ TX_HALF/COMMIT/ROLLBACK │ tx.select / tx.commit │ Half + EndTransaction │
└──────────┴─────────────────────────────────┴───────────────────────────────────────┴────────────────────────────┘
▎ 真 AMQP 帧解析:头部是 [type(1) channel(2) size(4) ... payload ... 0xCE],我们的 Frame::decode
▎ 就是同样思路,只是字段顺序/编码不同。换成真 AMQP 只要替换 Frame 类即可,上层完全无感。
五、延迟消息的底层实现
本例用 内存有序数组 + 200ms tick,直观但不抗大量延迟。生产级三种实现:
1. 分级 Topic(RocketMQ 默认):预定义 18 个等级 1s/5s/10s/.../2h,Broker 对每个等级建一个独立
ConsumeQueue,后台线程到点把消息搬回原 Topic。优点:简单、O(1) 入队。缺点:延迟只能选预设档。
2. 时间轮(Kafka):多层级哈希时间轮,O(1) 到期触发。本例的 usort 换成 SplPriorityQueue 就接近时间轮思路。
3. 基于 RocksDB 的有序 KV(RocketMQ 5.x):key = expireTimestamp + msgId,按 key 范围扫到期项。
六、事务消息为什么这么设计
最终一致性的经典场景:下单 + 扣库存
传统(必坏): 1) 写订单DB 2) 发MQ通知库存服务
↑DB 成功但 MQ 挂了 →库存永远不扣
事务消息流程:
1) Producer →Broker:发送 TX_HALF(消费者看不到)
2) Producer 执行本地事务(写订单 DB)
3a) 成功 →发 TX_COMMIT →Broker 把半消息正式 publish →消费者扣库存
3b) 失败 →发 TX_ROLLBACK →Broker 丢弃
4) Producer 挂了不发 3? →Broker 定时"回查"Producer 业务表,补偿 commit/rollback
关键:第 2 步和第 1 步顺序可对调(先本地事务再发半消息也行),但必须有"回查"机制——这是RocketMQ
事务消息和"普通消息+本地表"的核心区别。
七、运行演示
# 终端 1:启动 Broker
php mq.php server
# 终端 2:发 5 条普通 + 1 条延迟
php mq.php produce
# 输出:
# 发送 #0 →queue 2 offset 0
# ...
# 延迟消息已投递
# 终端 3:启动消费组(20% 故意失败走重试)
php mq.php consume
# 收到 [q=2 cq=0] 订单消息#0
# ↳ 处理失败,NACK
# [RETRY] order/2/0 第 1 次重试,1*5s 后重投
# ...(3 秒后)
# 收到 [q=1 cq=0] 这条是 3 秒后才能消费
# 终端 4:事务消息
php mq.php tx
# 已发送半消息,模拟执行本地事务...
# 本地事务成功 →COMMIT,消费者可见
八、这套实现已具备的"工业级要点"
✅ 顺序写 CommitLog(同 RocketMQ)
✅ 定长 ConsumeQueue 索引(20 字节,同 RocketMQ)
✅ 消费组级别 offset 持久化
✅ Pull 模型 + ACK/NACK
✅ 重试队列 + 死信队列(%RETRY%group / %DLQ%group)
✅ 延迟消息(内存有序队列)
✅ 事务消息两阶段 + 半消息超时清理
✅ 顺序消息(同 key →同 queue)
✅ TCP 粘包处理 + 自定义二进制协议
✅ 协程定时器后台任务
九、要扩展成"真"MQ 还差什么
- MMap + PageCache 优化:CommitLog 用 swoole_event + mmap 替代 fwrite/fflush,吞吐再涨 10 倍
- 主从同步:Slave Broker 拉 Master 的 CommitLog(类似 MySQL binlog)
- NameServer:多 Broker 时的路由发现(Producer 先问 NameServer "order topic 在哪")
- 长轮询:Pull 没数据时挂起 30s,等到有新消息立刻返回 →既是 Pull 又有 Push 的实时性
- 消息回查(事务消息):Broker 主动反向 RPC 问 Producer "txId=xxx 到底是 commit 还是 rollback"
- 批量发送 / 批量刷盘:把 N 条消息攒一起 fsync,QPS 翻倍
骨架就是上面这些。读懂这一份代码,RocketMQ 80% 的核心概念就通了。
更多推荐
所有评论(0)