Implementing the Redis protocol layer = hand-writing a Redis client in PHP
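Redis Protocol Layer Deep Dive: From RESP to a Hand-Written Cluster Client
Continuing the style of the previous post: first explain the protocol thoroughly, then ship complete code you can run, and finally tell you what to choose in production.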
---
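1. Why bother understanding RESP?
Day to day you just write $redis->get('foo') and the extension hides everything. But without knowing the protocol you will never really get the following right:
- How does a Pipeline make 1000 operations cost only 1 RTT?
- Why can't Pub/Sub be mixed with ordinary commands?
- How exactly should MOVED/ASK redirects be handled?
- Why does a Cluster client maintain a routing table for 16384 slots?
- Why do large values benefit from RESP3 streaming?
In one sentence: a Redis client is essentially a long-lived TCP connection + RESP encoding/decoding + Cluster routing, nothing more.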
---
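2. RESP in plain words
2.1 Protocol versions
┌─────────┬───────────────┬──────────────────────────────────────────────────────────┐
│ Version │ Redis version │ Features                                                 │
├─────────┼───────────────┼──────────────────────────────────────────────────────────┤
│ RESP2   │ 2.0+          │ 5 types, human-readable text                             │
├─────────┼───────────────┼──────────────────────────────────────────────────────────┤
│ RESP3   │ 6.0+          │ adds Map/Set/Push and more; recommended for new projects │
└─────────┴───────────────┴──────────────────────────────────────────────────────────┘
We focus on RESP2 (sufficient for the vast majority of cases) and briefly cover what RESP3 adds at the end.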
2.2 The five data types (RESP2)
The first byte of each reply determines its type, followed by the content; every line ends with \r\n.
┌────────────┬──────────────────────────────────┬──────────────────────────────────┐
│ First byte │ Type                             │ Example                          │
├────────────┼──────────────────────────────────┼──────────────────────────────────┤
│ +          │ Simple String                    │ +OK\r\n                          │
├────────────┼──────────────────────────────────┼──────────────────────────────────┤
│ -          │ Error                            │ -ERR unknown\r\n                 │
├────────────┼──────────────────────────────────┼──────────────────────────────────┤
│ :          │ Integer                          │ :1000\r\n                        │
├────────────┼──────────────────────────────────┼──────────────────────────────────┤
│ $          │ Bulk String (binary-safe string) │ $5\r\nhello\r\n                  │
├────────────┼──────────────────────────────────┼──────────────────────────────────┤
│ *          │ Array                            │ *2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n │
└────────────┴──────────────────────────────────┴──────────────────────────────────┘
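Bulk String in plain words:
$5\r\nhello\r\n
 ↑    ↑
 │    └─ exactly 5 bytes of payload
 └─ length = 5
Why carry a length? Because the value itself may contain \r\n, so splitting on the delimiter alone cannot tell where the value ends.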
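The following sketch makes the length-prefix argument concrete in PHP, the language of this article. bulkEncode() and bulkDecode() are hypothetical helper names used only for illustration. bulkDecode() assumes it receives exactly one complete frame.

```php
<?php
// Sketch: why Bulk Strings carry a length prefix.
// bulkEncode()/bulkDecode() are hypothetical helpers, not part of any library.

function bulkEncode(?string $value): string
{
    if ($value === null) {
        return "\$-1\r\n";                          // RESP2 nil bulk string
    }
    return '$' . strlen($value) . "\r\n" . $value . "\r\n";
}

/** Decode one complete bulk-string frame; returns the value, or null for nil. */
function bulkDecode(string $frame): ?string
{
    $headerEnd = strpos($frame, "\r\n");            // only the header is CRLF-delimited
    $length = (int) substr($frame, 1, $headerEnd - 1);
    if ($length === -1) {
        return null;
    }
    // Take exactly $length bytes, no matter how many \r\n the payload contains
    return substr($frame, $headerEnd + 2, $length);
}

$payload = "line1\r\nline2";                        // a value that contains CRLF
$frame = bulkEncode($payload);
echo json_encode($frame), "\n";                     // "$12\r\nline1\r\nline2\r\n"
var_dump(bulkDecode($frame) === $payload);          // bool(true)
```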
Nested arrays:
*3\r\n ← 3 elements
:1\r\n ← element 1: integer 1
$3\r\nfoo\r\n ← element 2: string "foo"
*2\r\n ← element 3: nested array with 2 elements
:10\r\n
:20\r\n
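Nesting is just recursion. The sketch below builds the exact frame shown above. It uses a hypothetical respEncodeValue() that covers only null, int, string and list-style arrays.

This is the server-to-client direction. The client-side encoder in section 4.3 only ever needs flat arrays of strings.

```php
<?php
// Sketch: encode a PHP value as a RESP2 reply to show how Arrays nest.
// respEncodeValue() is hypothetical; it supports null, int, string and arrays only.

function respEncodeValue(mixed $v): string
{
    return match (true) {
        $v === null   => "\$-1\r\n",                               // nil bulk string
        is_int($v)    => ':' . $v . "\r\n",                        // Integer
        is_string($v) => '$' . strlen($v) . "\r\n" . $v . "\r\n",  // Bulk String
        is_array($v)  => '*' . count($v) . "\r\n"                  // Array header...
                         . implode('', array_map('respEncodeValue', $v)), // ...then each element, recursively
        default       => throw new InvalidArgumentException('unsupported type: ' . get_debug_type($v)),
    };
}

echo json_encode(respEncodeValue([1, 'foo', [10, 20]])), "\n";
// "*3\r\n:1\r\n$3\r\nfoo\r\n*2\r\n:10\r\n:20\r\n"
```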
2.3 Commands sent by the client are Arrays too
When you type SET foo bar, what the client actually sends is:
*3\r\n
$3\r\nSET\r\n
$3\r\nfoo\r\n
$3\r\nbar\r\n
In other words, a command is sent as an array of strings. The design is minimal, and it makes parsing on the Redis side very fast.
2.4 Speaking RESP by hand with nc
$ printf '*1\r\n$4\r\nPING\r\n' | nc localhost 6379
+PONG
Congratulations: you just talked to Redis without using any client.
2.5 What RESP3 adds (overview)
% Map %2\r\n+name\r\n+alice\r\n+age\r\n:30\r\n
~ Set ~3\r\n:1\r\n:2\r\n:3\r\n
> Push >3\r\n$7\r\nmessage\r\n$4\r\nnews\r\n$5\r\nhello\r\n ← Pub/Sub push
( BigNum (1234567890123456789012345\r\n
_ Null _\r\n
# Bool #t\r\n / #f\r\n
, Double ,3.14\r\n
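The most useful one is the > Push type: Pub/Sub messages and command replies no longer get mixed up. In the RESP2 era the client had to tell them apart by tracking which "mode" the connection was in, which was bug-prone.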
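For the one-line RESP3 types, decoding is as simple as in RESP2. This is a hedged sketch: parseResp3Scalar() is a hypothetical name, and it assumes the trailing \r\n has already been stripped. Special doubles such as inf/nan are deliberately left out.

```php
<?php
// Sketch: decode the one-line RESP3 scalar types listed above.
// parseResp3Scalar() is hypothetical; a real decoder would add these as extra
// arms next to '+', '-', ':' in its type dispatch.

function parseResp3Scalar(string $line): mixed
{
    $type = $line[0];
    $body = substr($line, 1);            // $line excludes the trailing \r\n
    return match ($type) {
        '_' => null,                      // Null
        '#' => $body === 't',             // Boolean: #t / #f
        ',' => (float) $body,             // Double (inf/-inf/nan would need extra handling)
        '(' => $body,                     // BigNumber: kept as a string, may exceed PHP_INT_MAX
        default => throw new UnexpectedValueException("not a one-line RESP3 scalar: $type"),
    };
}

var_dump(parseResp3Scalar('#t'));        // bool(true)
var_dump(parseResp3Scalar(',3.14'));     // float(3.14)
```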
---
3. Overall architecture
┌────────────────────────────────────────────┐
│ RedisClient (facade) │
│ ┌──────────────────────────────────────┐ │
│ │ __call('get', ['foo']) → command() │ │
│ └──────────────────────────────────────┘ │
└──────────┬─────────────────────────────────┘
│
┌──────▼──────┐ ┌─────────────┐ ┌──────────┐
│ RespCodec │ │ Connection │ │ Pipeline │
│ encode/ │ │ socket I/O │ │ buffer │
│ decode │ │ + reconnect│ │ + flush │
└─────────────┘ └─────────────┘ └──────────┘
┌────────────────────────────────────────────┐
│ RedisCluster (facade) │
│ ┌──────────────────────────────────────┐ │
│ │ slot = crc16(key) & 16383 │ │
│ │ node = slotMap[slot] │ │
│ │ if MOVED: update slot map + redirect │ │
│ │ if ASK: one-off ASKING + redirect │ │
│ └──────────────────────────────────────┘ │
└────────────────────────────────────────────┘
---
4. Complete code
4.1 Project layout
mini-redis/
├── composer.json
├── src/
│ ├── Resp/
│ │ ├── RespEncoder.php
│ │ ├── RespDecoder.php
│ │ └── RespType.php
│ ├── Connection.php
│ ├── RedisClient.php
│ ├── Pipeline.php
│ ├── PubSub.php
│ └── Cluster/
│ ├── Crc16.php
│ ├── SlotMap.php
│ └── RedisCluster.php
└── examples/
├── basic.php
├── pipeline.php
├── pubsub.php
└── cluster.php
4.2 composer.json
{
"require": {
"php": ">=8.2"
},
"autoload": {
"psr-4": { "App\\": "src/" },
"files": ["src/Resp/RespType.php"]
}
}
▎ We don't depend on any Redis library and start from the raw socket. At the end I'll tell you which library to use in production.
---
4.3 src/Resp/RespEncoder.php: the encoder
<?php
namespace App\Resp;
class RespEncoder
{
/** Encode one command as a RESP frame, e.g. ['SET','foo','bar'] -> "*3\r\n$3\r\nSET\r\n..." */
public function encode(array $args): string
{
$out = '*' . count($args) . "\r\n";
foreach ($args as $a) {
$a = (string) $a;
$out .= '$' . strlen($a) . "\r\n" . $a . "\r\n";
}
return $out;
}
/** Pipeline: concatenate several commands and send them in one go */
public function encodeMany(array $commands): string
{
$out = '';
foreach ($commands as $cmd) {
$out .= $this->encode($cmd);
}
return $out;
}
}
That's all there is to it. The encoder has no tricks; the decoder is where the real work is.
---
4.4 src/Resp/RespType.php
<?php
namespace App\Resp;
class RespError
{
public function __construct(public string $message) {}
}
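class RespIncomplete {} // not all data has arrived yet: tells the caller to keep reading
Why RespIncomplete? Because TCP is a byte stream: you may have read only half a reply, and the decoder must be able to say "not enough yet" instead of throwing. Note also that this file declares two classes, which PSR-4 autoloading (one class per file, named after the class) cannot resolve. Either list the file under composer's "files" autoload key or split it into RespError.php and RespIncomplete.php.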
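The contract can be demonstrated on its own with a tiny line reader. LineBuffer below is a hypothetical helper, not part of the project. Its nextLine() returns null whenever the bytes received so far do not contain a full CRLF-terminated line. That is the same role RespIncomplete plays in the decoder.

```php
<?php
// Sketch: the "not enough data yet" contract on a TCP byte stream.
// LineBuffer is hypothetical: feed() appends whatever fread() returned,
// nextLine() yields one complete line, or null meaning "keep reading".

final class LineBuffer
{
    private string $buf = '';

    public function feed(string $chunk): void
    {
        $this->buf .= $chunk;
    }

    public function nextLine(): ?string
    {
        $pos = strpos($this->buf, "\r\n");
        if ($pos === false) {
            return null;                                    // incomplete: buffer left untouched
        }
        $line = substr($this->buf, 0, $pos);
        $this->buf = (string) substr($this->buf, $pos + 2); // consume the line and its CRLF
        return $line;
    }
}

// TCP may split "+OK\r\n:42\r\n" at arbitrary byte boundaries
$lb = new LineBuffer();
$lines = [];
foreach (['+O', "K\r", "\n:4", "2\r\n"] as $chunk) {
    $lb->feed($chunk);
    while (($line = $lb->nextLine()) !== null) {
        $lines[] = $line;
    }
}
echo implode(',', $lines), "\n";   // +OK,:42
```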
---
4.5 src/Resp/RespDecoder.php: the decoder (the core)
<?php
namespace App\Resp;
class RespDecoder
{
private string $buffer = '';
public function feed(string $data): void
{
$this->buffer .= $data;
}
/** Try to parse one complete reply; returns RespIncomplete if there isn't enough data yet */
public function parse(): mixed
{
$offset = 0;
$result = $this->parseAt($offset);
if ($result instanceof RespIncomplete) {
return $result;
}
// parsed successfully: drop the consumed bytes
$this->buffer = substr($this->buffer, $offset);
return $result;
}
private function parseAt(int &$offset): mixed
{
if ($offset >= strlen($this->buffer)) {
return new RespIncomplete();
}
$type = $this->buffer[$offset];
$offset++;
return match ($type) {
'+' => $this->parseSimpleString($offset),
'-' => $this->parseError($offset),
':' => $this->parseInteger($offset),
'$' => $this->parseBulkString($offset),
'*' => $this->parseArray($offset),
default => throw new \RuntimeException("Unknown type byte: $type at offset " . ($offset - 1)),
};
}
private function readLine(int &$offset): string|RespIncomplete
{
$pos = strpos($this->buffer, "\r\n", $offset);
if ($pos === false) return new RespIncomplete();
$line = substr($this->buffer, $offset, $pos - $offset);
$offset = $pos + 2;
return $line;
}
private function parseSimpleString(int &$offset): string|RespIncomplete
{
return $this->readLine($offset);
}
private function parseError(int &$offset): RespError|RespIncomplete
{
$line = $this->readLine($offset);
return $line instanceof RespIncomplete ? $line : new RespError($line);
}
private function parseInteger(int &$offset): int|RespIncomplete
{
$line = $this->readLine($offset);
return $line instanceof RespIncomplete ? $line : (int) $line;
}
private function parseBulkString(int &$offset): string|null|RespIncomplete
{
$lenLine = $this->readLine($offset);
if ($lenLine instanceof RespIncomplete) return $lenLine;
$length = (int) $lenLine;
if ($length === -1) return null; // $-1\r\n means nil
// still need $length bytes + \r\n
if (strlen($this->buffer) < $offset + $length + 2) {
return new RespIncomplete();
}
$value = substr($this->buffer, $offset, $length);
$offset += $length + 2; // skip the payload and its trailing \r\n
return $value;
}
private function parseArray(int &$offset): array|null|RespIncomplete
{
$lenLine = $this->readLine($offset);
if ($lenLine instanceof RespIncomplete) return $lenLine;
$count = (int) $lenLine;
if ($count === -1) return null;
$items = [];
for ($i = 0; $i < $count; $i++) {
$item = $this->parseAt($offset);
if ($item instanceof RespIncomplete) return $item;
$items[] = $item;
}
return $items;
}
}
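In plain words:
1. The first byte determines the type and dispatches to the matching parse function
2. The cursor is always advanced through the reference &$offset
3. As soon as any step finds the buffer too short, it returns RespIncomplete and the outer state is left untouched (the key point!), because $this->buffer is only trimmed after a whole reply has been parsed
4. Arrays are parsed recursively, so arbitrary nesting comes for free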
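Point 3 is easiest to trust once you watch it happen. The sketch below re-implements the same recursive rules as a pure function. respTryParse() is a hypothetical name. Error replies come back as tagged arrays rather than RespError objects, which keeps the block self-contained.

The demo then feeds a nested reply one byte at a time. A value appears only when the final byte arrives.

```php
<?php
// Sketch: the decoder's "all or nothing" rule in stateless form.
// respTryParse() (hypothetical) returns [value, nextOffset] for a complete reply,
// or null when $buf does not hold one yet. It mutates nothing, so the caller can
// append more bytes and retry. (Re-parsing from the start is O(n^2) in the worst
// case: fine for a demo, not for production.)

function respTryParse(string $buf, int $off = 0): ?array
{
    $eol = strpos($buf, "\r\n", $off);
    if ($eol === false) {
        return null;                                     // header line incomplete
    }
    $type = $buf[$off];
    $head = substr($buf, $off + 1, $eol - $off - 1);
    $next = $eol + 2;
    switch ($type) {
        case '+':
            return [$head, $next];
        case '-':
            return [['error' => $head], $next];          // tagged, not thrown
        case ':':
            return [(int) $head, $next];
        case '$':
            $len = (int) $head;
            if ($len === -1) {
                return [null, $next];                    // nil bulk string
            }
            if (strlen($buf) < $next + $len + 2) {
                return null;                             // payload incomplete
            }
            return [substr($buf, $next, $len), $next + $len + 2];
        case '*':
            $count = (int) $head;
            if ($count === -1) {
                return [null, $next];                    // nil array
            }
            $items = [];
            for ($i = 0; $i < $count; $i++) {
                $r = respTryParse($buf, $next);          // recurse for each element
                if ($r === null) {
                    return null;                         // any missing piece => incomplete
                }
                $items[] = $r[0];
                $next = $r[1];
            }
            return [$items, $next];
        default:
            throw new UnexpectedValueException("unknown RESP type byte: $type");
    }
}

// Feed a nested reply one byte at a time: nothing comes out before the last byte.
$wire = "*3\r\n:1\r\n\$3\r\nfoo\r\n*2\r\n:10\r\n:20\r\n";
$buf = '';
$result = null;
$completedAt = null;
for ($i = 0; $i < strlen($wire); $i++) {
    $buf .= $wire[$i];
    $result = respTryParse($buf);
    if ($result !== null) {
        $completedAt = $i;
        break;
    }
}
echo $completedAt === strlen($wire) - 1 ? "complete only at the last byte\n" : "unexpected\n";
```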
---
4.6 src/Connection.php: TCP connection management
<?php
namespace App;
use App\Resp\{RespEncoder, RespDecoder, RespError, RespIncomplete};
class Connection
{
/** @var resource|null */
private $sock = null;
private RespEncoder $encoder;
private RespDecoder $decoder;
public function __construct(
public readonly string $host = '127.0.0.1',
public readonly int $port = 6379,
public readonly float $connectTimeout = 1.5,
public readonly float $readTimeout = 5.0,
public readonly ?string $password = null,
public readonly int $database = 0,
) {
$this->encoder = new RespEncoder();
$this->decoder = new RespDecoder();
}
public function connect(): void
{
if ($this->sock !== null) return;
$errno = 0;
$errstr = '';
$sock = @stream_socket_client(
"tcp://{$this->host}:{$this->port}",
$errno, $errstr, $this->connectTimeout,
STREAM_CLIENT_CONNECT
);
if ($sock === false) {
throw new \RuntimeException("Failed to connect to {$this->host}:{$this->port}: $errstr");
}
// split the float read timeout into whole seconds + microseconds
$sec = (int) $this->readTimeout;
stream_set_timeout($sock, $sec, (int) (($this->readTimeout - $sec) * 1e6));
$this->sock = $sock;
try {
if ($this->password) {
$this->command(['AUTH', $this->password]);
}
if ($this->database !== 0) {
$this->command(['SELECT', (string) $this->database]);
}
} catch (\Throwable $e) {
$this->close(); // don't keep a half-initialized (e.g. unauthenticated) connection around
throw $e;
}
}
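public function close(): void
{
if ($this->sock) {
@fclose($this->sock);
$this->sock = null;
}
// drop any half-read reply so it can't be mistaken for data from the next connection
$this->decoder = new RespDecoder();
}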
/** Send one command and wait for one reply */
public function command(array $args): mixed
{
$this->connect();
$this->writeAll($this->encoder->encode($args));
return $this->readReply();
}
/**
* Pipeline: send N commands, read N replies.
* Error replies are returned as RespError objects instead of being thrown;
* throwing midway would leave the remaining replies unread and desync the connection.
*/
public function commandPipeline(array $commands): array
{
$this->connect();
$this->writeAll($this->encoder->encodeMany($commands));
$replies = [];
for ($i = 0, $n = count($commands); $i < $n; $i++) {
$replies[] = $this->readReply(throwOnError: false);
}
return $replies;
}
/** Read one complete reply (may need several reads) */
public function readReply(bool $throwOnError = true): mixed
{
while (true) {
$reply = $this->decoder->parse();
if (!($reply instanceof RespIncomplete)) {
if ($reply instanceof RespError && $throwOnError) {
throw new RedisException($reply->message);
}
return $reply;
}
// not enough data yet: keep reading
$chunk = @fread($this->sock, 8192);
if ($chunk === false || $chunk === '') {
$this->close();
throw new \RuntimeException("Connection closed by peer or read timed out");
}
$this->decoder->feed($chunk);
}
}
private function writeAll(string $data): void
{
$total = strlen($data);
$written = 0;
while ($written < $total) {
$n = @fwrite($this->sock, substr($data, $written));
if ($n === false || $n === 0) {
$this->close();
throw new \RuntimeException("Write failed");
}
$written += $n;
}
}
/** For Pub/Sub: read the next push without sending a command */
public function readPush(): array
{
$reply = $this->readReply();
if (!is_array($reply)) {
throw new \RuntimeException("Expected a push array, got " . gettype($reply));
}
return $reply;
}
}
class RedisException extends \RuntimeException {}
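A few production-grade points:
1. writeAll writes in a loop: never assume a single fwrite sends everything
2. readReply reads in a loop: never assume a single fread returns a complete reply
3. Error replies (-ERR) are thrown as exceptions, so business code uses try/catch
4. On a connection error the socket is closed, and the next command reconnects automatically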
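Point 1 can be shown in isolation: a socket may accept fewer bytes than you hand it, so the loop keeps writing from where it stopped. The names here are hypothetical. fakeWriter() stands in for fwrite() and accepts at most 8 bytes per call. writeAllWith() mirrors the loop in writeAll() but takes the writer as a parameter, so it runs without a socket.

```php
<?php
// Sketch: why writeAll() must loop. writeAllWith() mirrors writeAll() with the
// writer injected; fakeWriter() (hypothetical) imitates fwrite() on a socket
// whose send buffer accepts at most $max bytes per call.

function writeAllWith(callable $write, string $data): int
{
    $total = strlen($data);
    $written = 0;
    $calls = 0;
    while ($written < $total) {
        $n = $write(substr($data, $written));   // may accept only a prefix
        $calls++;
        if ($n === false || $n === 0) {
            throw new RuntimeException('write failed');
        }
        $written += $n;                          // resume right after the accepted bytes
    }
    return $calls;
}

function fakeWriter(string &$sink, int $max): callable
{
    return function (string $chunk) use (&$sink, $max): int {
        $part = substr($chunk, 0, $max);
        $sink .= $part;
        return strlen($part);
    };
}

$sink = '';
$cmd = "*3\r\n\$3\r\nSET\r\n\$3\r\nfoo\r\n\$3\r\nbar\r\n";   // 31 bytes
$calls = writeAllWith(fakeWriter($sink, 8), $cmd);
echo "$calls write calls\n";   // 4 write calls (8 + 8 + 8 + 7 bytes)
```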
---
4.7 src/RedisClient.php: the user-facing facade
<?php
namespace App;
class RedisClient
{
public function __construct(public Connection $conn) {}
/** Magic method: $redis->get('foo') is equivalent to ->command(['GET', 'foo']) */
public function __call(string $name, array $args): mixed
{
return $this->conn->command([strtoupper($name), ...$args]);
}
/** Explicit variant */
public function command(array $args): mixed
{
return $this->conn->command($args);
}
public function pipeline(): Pipeline
{
return new Pipeline($this->conn);
}
public function pubSub(): PubSub
{
return new PubSub($this->conn);
}
}
---
4.8 src/Pipeline.php: batched commands
<?php
namespace App;
class Pipeline
{
private array $commands = [];
public function __construct(private Connection $conn) {}
public function __call(string $name, array $args): self
{
$this->commands[] = [strtoupper($name), ...$args];
return $this; // chainable
}
public function exec(): array
{
if (empty($this->commands)) return [];
$replies = $this->conn->commandPipeline($this->commands);
$this->commands = [];
return $replies;
}
}
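Pipeline in plain words:
- Normal mode: 1000 GETs → 1000 RTTs
- Pipeline: all 1000 commands go out in one write → roughly 1 RTT, and the server sends back 1000 replies in order
- With an RTT of about 0.1 ms inside one data center versus about 50 ms across data centers, the difference is enormous
⚠️ Note: a Pipeline is not a transaction. The client sends every command before reading any reply, so the result of one command cannot be used to build a later one. The server still executes the commands one by one as they arrive, and commands from other clients may interleave with them. Use MULTI/EXEC if you need a transaction.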
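The RTT figures above fit a simple back-of-the-envelope model. The sketch assumes network wait dominates and ignores server execution time and bandwidth. networkWaitMs() is a hypothetical helper, and a batchSize of 1 means no pipelining.

```php
<?php
// Back-of-the-envelope model (assumption: only round trips matter; server time
// and bandwidth are ignored). networkWaitMs() is a hypothetical helper.

function networkWaitMs(int $commands, float $rttMs, int $batchSize): float
{
    $roundTrips = (int) ceil($commands / $batchSize);   // batchSize 1 == no pipeline
    return $roundTrips * $rttMs;
}

foreach (['same DC' => 0.1, 'cross DC' => 50.0] as $label => $rtt) {
    printf("%-8s no-pipeline: %8.1f ms  pipeline: %5.1f ms\n",
        $label, networkWaitMs(1000, $rtt, 1), networkWaitMs(1000, $rtt, 1000));
}
```

At 50 ms per round trip, 1000 sequential commands spend about 50 s waiting on the network, while one pipelined batch waits about 50 ms. Very large batches are often split into chunks to bound memory, which the batchSize parameter models.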
---
4.9 src/PubSub.php: publish/subscribe
<?php
namespace App;
class PubSub
{
public function __construct(private Connection $conn) {}
/**
* Subscribe to one or more channels and block while receiving messages
* @param string[] $channels
* @param callable $onMessage fn(string $channel, string $message) => void
*/
public function subscribe(array $channels, callable $onMessage): void
{
$this->conn->command(['SUBSCRIBE', ...$channels]);
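// SUBSCRIBE replies with one ["subscribe", channel, count] confirmation per channel;
// command() consumes the first one, the loop below skips the rest.
// Every later message is ["message", channel, payload]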
while (true) {
$push = $this->conn->readPush();
$kind = $push[0] ?? null;
if ($kind === 'message') {
// ["message", channel, payload]
$onMessage($push[1], $push[2]);
} elseif ($kind === 'pmessage') {
// pattern subscription: ["pmessage", pattern, channel, payload]
$onMessage($push[2], $push[3]);
}
// subscribe/unsubscribe confirmations are ignored
}
}
public function publish(string $channel, string $message): int
{
return (int) $this->conn->command(['PUBLISH', $channel, $message]);
}
}
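Pub/Sub in plain words:
1. Once subscribed (under RESP2), the connection can only receive messages. Apart from (P)SUBSCRIBE/(P)UNSUBSCRIBE/PING/QUIT and a few similar commands, ordinary commands are rejected with an error
2. So in production a subscriber gets a dedicated connection, and normal operations use another one
3. Server pushes have a fixed shape: ["message", channel, payload], or ["pmessage", pattern, channel, payload] for pattern subscriptions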
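Point 3 is the whole dispatch rule. Here it is pulled out of PubSub::subscribe() as a pure function so it can be tested without a server. The name extractMessage() is hypothetical.

```php
<?php
// Sketch: the dispatch rule from PubSub::subscribe() as a pure function.
// extractMessage() is hypothetical.

/** @return array{0:string,1:string}|null [channel, payload], or null for non-message pushes */
function extractMessage(array $push): ?array
{
    return match ($push[0] ?? null) {
        'message'  => [$push[1], $push[2]],   // ["message", channel, payload]
        'pmessage' => [$push[2], $push[3]],   // ["pmessage", pattern, channel, payload]
        default    => null,                   // subscribe/unsubscribe confirmations etc.
    };
}

var_dump(extractMessage(['subscribe', 'news', 1]));   // NULL
```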
---
4.10 Trying out the basics
examples/basic.php:
<?php
require __DIR__ . '/../vendor/autoload.php';
use App\{Connection, RedisClient};
$redis = new RedisClient(new Connection('127.0.0.1', 6379));
var_dump($redis->ping()); // string(4) "PONG"
var_dump($redis->set('foo', 'bar')); // string(2) "OK"
var_dump($redis->get('foo')); // string(3) "bar"
var_dump($redis->incr('counter')); // int(1)
var_dump($redis->lpush('mylist', 'a', 'b')); // int(2)
var_dump($redis->lrange('mylist', 0, -1)); // array
examples/pipeline.php:
<?php
require __DIR__ . '/../vendor/autoload.php';
use App\{Connection, RedisClient};
$redis = new RedisClient(new Connection());
// without pipeline: 1000 RTTs
$t = microtime(true);
for ($i = 0; $i < 1000; $i++) $redis->set("k:$i", $i);
echo "no-pipeline: " . round((microtime(true)-$t)*1000) . "ms\n";
// with pipeline: 1 RTT
$t = microtime(true);
$pipe = $redis->pipeline();
for ($i = 0; $i < 1000; $i++) $pipe->set("k:$i", $i);
$pipe->exec();
echo "pipeline: " . round((microtime(true)-$t)*1000) . "ms\n";
When client and server run on different machines, you will typically see a gap of tens to hundreds of times.
examples/pubsub.php:
<?php
require __DIR__ . '/../vendor/autoload.php';
use App\{Connection, RedisClient};
$argv[1] ?? die("Usage: php pubsub.php sub|pub\n");
if ($argv[1] === 'sub') {
$redis = new RedisClient(new Connection());
$redis->pubSub()->subscribe(['news'], function ($channel, $msg) {
echo "[$channel] $msg\n";
});
} else {
$redis = new RedisClient(new Connection());
$count = $redis->pubSub()->publish('news', 'Hello at ' . date('H:i:s'));
echo "Delivered to $count subscribers\n";
}
---
5. Redis Cluster client
5.1 Cluster in plain words
Redis Cluster spreads all keys over 16384 slots, computed as CRC16(key) & 16383 (the same as CRC16(key) mod 16384). Each node owns a subset of the slots:
Node A: slots 0-5460
Node B: slots 5461-10922
Node C: slots 10923-16383
The client must know this routing table; otherwise a command sent to the wrong node just gets redirected by the server with MOVED.
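Slots are assigned in contiguous ranges, so the routing table does not have to be a 16384-entry array like the one SlotMap keeps in 5.4. An alternative is to store the ranges as CLUSTER SLOTS reports them and binary-search over them. RangeRouter is a hypothetical class, and the node names are the placeholders from the list above.

```php
<?php
// Sketch: route slot -> node by binary search over sorted slot ranges instead
// of a 16384-entry array. RangeRouter is hypothetical.

final class RangeRouter
{
    /** @param list<array{int,int,string}> $ranges [start, end, "host:port"] */
    public function __construct(private array $ranges)
    {
        usort($this->ranges, fn($a, $b) => $a[0] <=> $b[0]);   // sort by start slot
    }

    public function nodeFor(int $slot): ?string
    {
        $lo = 0;
        $hi = count($this->ranges) - 1;
        while ($lo <= $hi) {
            $mid = intdiv($lo + $hi, 2);
            [$start, $end, $node] = $this->ranges[$mid];
            if ($slot < $start) {
                $hi = $mid - 1;
            } elseif ($slot > $end) {
                $lo = $mid + 1;
            } else {
                return $node;
            }
        }
        return null;   // slot not covered: the caller should refresh the map
    }
}

$router = new RangeRouter([
    [10923, 16383, 'C:6379'],
    [0, 5460, 'A:6379'],
    [5461, 10922, 'B:6379'],
]);
echo $router->nodeFor(12182), "\n";   // C:6379
```

The trade-off: a per-slot array gives O(1) lookups but rebuilds 16384 entries on every refresh. Ranges are compact and cheap to rebuild, at the cost of O(log n) lookups over a handful of entries.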
5.2 The difference between MOVED and ASK (key point!)
┌──────────────────────────┬────────────────────────────────────────┬────────────────────────────────────────┐
│ Error                    │ When it happens                        │ What the client does                   │
├──────────────────────────┼────────────────────────────────────────┼────────────────────────────────────────┤
│ MOVED 12345 1.2.3.4:6379 │ slot now permanently belongs to that   │ update local slot map + redirect to    │
│                          │ node (local slot map is stale)         │ the target node                        │
├──────────────────────────┼────────────────────────────────────────┼────────────────────────────────────────┤
│ ASK 12345 1.2.3.4:6379   │ slot is mid-migration (some keys have  │ keep slot map; go to the new node only │
│                          │ already moved)                         │ this once, sending ASKING first        │
└──────────────────────────┴────────────────────────────────────────┴────────────────────────────────────────┘
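Why does ASK exist? Imagine slot 5000 is being migrated from A to B, and a client sends GET foo:
- A checks: foo is still here → returns it directly
- A checks: foo has already moved to B → replies ASK ... B:6379
- B receives GET foo without ASKING: "this slot isn't mine" → replies MOVED pointing back to A
- B receives ASKING + GET foo: "you're here for a key that's being imported, here you go"
If the client handled this like MOVED, it would conclude that the whole slot had finished moving and would send keys that have not been migrated yet to B as well, which B cannot serve. ASK therefore means "make an exception just this once, but don't touch the routing table".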
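Turning the error text into a routing decision is mechanical. parseRedirect() below is a hypothetical helper. It does what RedisCluster::executeWithRedirect() does with explode(), and it also makes the MOVED/ASK difference explicit as two flags.

```php
<?php
// Sketch: parse a MOVED/ASK error message into a routing decision.
// parseRedirect() is hypothetical.

/** @return array{type:string,slot:int,node:string,updateSlotMap:bool,sendAsking:bool}|null */
function parseRedirect(string $msg): ?array
{
    $parts = explode(' ', $msg);
    if (count($parts) !== 3 || !in_array($parts[0], ['MOVED', 'ASK'], true)) {
        return null;                                 // some other error: rethrow it
    }
    return [
        'type'          => $parts[0],
        'slot'          => (int) $parts[1],
        'node'          => $parts[2],
        'updateSlotMap' => $parts[0] === 'MOVED',    // MOVED: permanent, fix the local map
        'sendAsking'    => $parts[0] === 'ASK',      // ASK: one-off, prefix the retry with ASKING
    ];
}
```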
5.3 src/Cluster/Crc16.php
<?php
namespace App\Cluster;
class Crc16
{
private const TABLE = [
// standard CRC16-CCITT (XMODEM) table, 256 entries
0x0000,0x1021,0x2042,0x3063,0x4084,0x50A5,0x60C6,0x70E7,
0x8108,0x9129,0xA14A,0xB16B,0xC18C,0xD1AD,0xE1CE,0xF1EF,
0x1231,0x0210,0x3273,0x2252,0x52B5,0x4294,0x72F7,0x62D6,
0x9339,0x8318,0xB37B,0xA35A,0xD3BD,0xC39C,0xF3FF,0xE3DE,
0x2462,0x3443,0x0420,0x1401,0x64E6,0x74C7,0x44A4,0x5485,
0xA56A,0xB54B,0x8528,0x9509,0xE5EE,0xF5CF,0xC5AC,0xD58D,
0x3653,0x2672,0x1611,0x0630,0x76D7,0x66F6,0x5695,0x46B4,
0xB75B,0xA77A,0x9719,0x8738,0xF7DF,0xE7FE,0xD79D,0xC7BC,
0x48C4,0x58E5,0x6886,0x78A7,0x0840,0x1861,0x2802,0x3823,
0xC9CC,0xD9ED,0xE98E,0xF9AF,0x8948,0x9969,0xA90A,0xB92B,
0x5AF5,0x4AD4,0x7AB7,0x6A96,0x1A71,0x0A50,0x3A33,0x2A12,
0xDBFD,0xCBDC,0xFBBF,0xEB9E,0x9B79,0x8B58,0xBB3B,0xAB1A,
0x6CA6,0x7C87,0x4CE4,0x5CC5,0x2C22,0x3C03,0x0C60,0x1C41,
0xEDAE,0xFD8F,0xCDEC,0xDDCD,0xAD2A,0xBD0B,0x8D68,0x9D49,
0x7E97,0x6EB6,0x5ED5,0x4EF4,0x3E13,0x2E32,0x1E51,0x0E70,
0xFF9F,0xEFBE,0xDFDD,0xCFFC,0xBF1B,0xAF3A,0x9F59,0x8F78,
0x9188,0x81A9,0xB1CA,0xA1EB,0xD10C,0xC12D,0xF14E,0xE16F,
0x1080,0x00A1,0x30C2,0x20E3,0x5004,0x4025,0x7046,0x6067,
0x83B9,0x9398,0xA3FB,0xB3DA,0xC33D,0xD31C,0xE37F,0xF35E,
0x02B1,0x1290,0x22F3,0x32D2,0x4235,0x5214,0x6277,0x7256,
0xB5EA,0xA5CB,0x95A8,0x8589,0xF56E,0xE54F,0xD52C,0xC50D,
0x34E2,0x24C3,0x14A0,0x0481,0x7466,0x6447,0x5424,0x4405,
0xA7DB,0xB7FA,0x8799,0x97B8,0xE75F,0xF77E,0xC71D,0xD73C,
0x26D3,0x36F2,0x0691,0x16B0,0x6657,0x7676,0x4615,0x5634,
0xD94C,0xC96D,0xF90E,0xE92F,0x99C8,0x89E9,0xB98A,0xA9AB,
0x5844,0x4865,0x7806,0x6827,0x18C0,0x08E1,0x3882,0x28A3,
0xCB7D,0xDB5C,0xEB3F,0xFB1E,0x8BF9,0x9BD8,0xABBB,0xBB9A,
0x4A75,0x5A54,0x6A37,0x7A16,0x0AF1,0x1AD0,0x2AB3,0x3A92,
0xFD2E,0xED0F,0xDD6C,0xCD4D,0xBDAA,0xAD8B,0x9DE8,0x8DC9,
0x7C26,0x6C07,0x5C64,0x4C45,0x3CA2,0x2C83,0x1CE0,0x0CC1,
0xEF1F,0xFF3E,0xCF5D,0xDF7C,0xAF9B,0xBFBA,0x8FD9,0x9FF8,
0x6E17,0x7E36,0x4E55,0x5E74,0x2E93,0x3EB2,0x0ED1,0x1EF0,
];
public static function compute(string $data): int
{
$crc = 0;
$len = strlen($data);
for ($i = 0; $i < $len; $i++) {
$crc = (($crc << 8) & 0xFF00) ^ self::TABLE[(($crc >> 8) ^ ord($data[$i])) & 0xFF];
}
return $crc & 0xFFFF;
}
/** Compute the slot, honoring hash tags {xxx} */
public static function slot(string $key): int
{
// hash tag: only the part inside the braces is hashed, which lets related keys share a slot
$start = strpos($key, '{');
if ($start !== false) {
$end = strpos($key, '}', $start + 1);
if ($end !== false && $end > $start + 1) {
$key = substr($key, $start + 1, $end - $start - 1);
}
}
return self::compute($key) & 16383;
}
}
Hash tags in plain words: user:{1000}:profile and user:{1000}:orders both contain {1000}, so they land in the same slot and can be used together in MGET or in a transaction.
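A wrong CRC16 table is a classic source of bugs. A table-free, bitwise CRC16-XMODEM (polynomial 0x1021, initial value 0) is handy for cross-checking it. crc16Bitwise() and keySlot() are hypothetical names. The check value 0x31C3 for "123456789" is the standard XMODEM test vector.

```php
<?php
// Sketch: table-free CRC16-XMODEM (poly 0x1021, init 0x0000, no reflection) plus
// the hash-tag rule, for cross-checking a lookup-table version.
// crc16Bitwise() and keySlot() are hypothetical names.

function crc16Bitwise(string $data): int
{
    $crc = 0;
    for ($i = 0, $n = strlen($data); $i < $n; $i++) {
        $crc ^= ord($data[$i]) << 8;             // move the next byte into the high 8 bits
        for ($bit = 0; $bit < 8; $bit++) {
            $crc = ($crc & 0x8000)
                ? (($crc << 1) ^ 0x1021) & 0xFFFF
                : ($crc << 1) & 0xFFFF;
        }
    }
    return $crc;
}

function keySlot(string $key): int
{
    $s = strpos($key, '{');
    if ($s !== false) {
        $e = strpos($key, '}', $s + 1);
        if ($e !== false && $e > $s + 1) {       // non-empty {tag}: hash only the tag
            $key = substr($key, $s + 1, $e - $s - 1);
        }
    }
    return crc16Bitwise($key) & 16383;
}

printf("0x%04X\n", crc16Bitwise('123456789'));  // 0x31C3, the XMODEM check value
```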
5.4 src/Cluster/SlotMap.php
<?php
namespace App\Cluster;
use App\{Connection, RedisClient};
class SlotMap
{
/** @var array<int, string> slot => "host:port" */
private array $slots = [];
/** @var array<string, Connection> cached node connections */
private array $connections = [];
/** Fetch the latest routing table with the CLUSTER SLOTS command */
public function refresh(Connection $seed): void
{
$client = new RedisClient($seed);
$resp = $client->command(['CLUSTER', 'SLOTS']);
// reply format: [[start, end, [host, port, id], [replica1...]], ...]
$newSlots = [];
foreach ($resp as $range) {
[$start, $end, $master] = $range;
$node = $master[0] . ':' . $master[1];
for ($s = $start; $s <= $end; $s++) {
$newSlots[$s] = $node;
}
}
$this->slots = $newSlots;
}
public function nodeForSlot(int $slot): ?string
{
return $this->slots[$slot] ?? null;
}
public function setSlot(int $slot, string $node): void
{
$this->slots[$slot] = $node;
}
public function connection(string $node, ?string $password = null): Connection
{
if (!isset($this->connections[$node])) {
[$host, $port] = explode(':', $node);
$this->connections[$node] = new Connection($host, (int)$port, password: $password);
}
return $this->connections[$node];
}
public function nodes(): array
{
return array_unique(array_values($this->slots));
}
}
5.5 src/Cluster/RedisCluster.php: the main act
<?php
namespace App\Cluster;
use App\{Connection, RedisException};
class RedisCluster
{
private SlotMap $slotMap;
private const MAX_REDIRECTS = 5;
/**
* @param array $seeds [["host"=>"127.0.0.1","port"=>7001], ...]
*/
public function __construct(
private array $seeds,
private ?string $password = null
) {
$this->slotMap = new SlotMap();
$this->initialize();
}
private function initialize(): void
{
foreach ($this->seeds as $seed) {
try {
$conn = new Connection($seed['host'], $seed['port'], password: $this->password);
$this->slotMap->refresh($conn);
return;
} catch (\Throwable $e) {
continue; // this seed is down, try the next one
}
}
throw new \RuntimeException('All seed nodes are unreachable');
}
public function __call(string $name, array $args): mixed
{
return $this->command([strtoupper($name), ...$args]);
}
public function command(array $args): mixed
{
// the key follows the command name (simplification: only commands like SET/GET whose key is args[1])
$key = $args[1] ?? null;
if ($key === null) {
throw new \RuntimeException('Cluster mode requires a key');
}
$slot = Crc16::slot((string) $key);
return $this->executeWithRedirect($args, $slot);
}
private function executeWithRedirect(array $args, int $slot, int $redirectCount = 0, ?string $askNode = null): mixed
{
if ($redirectCount > self::MAX_REDIRECTS) {
throw new \RuntimeException('Too many redirects');
}
$node = $askNode ?? $this->slotMap->nodeForSlot($slot);
if ($node === null) {
// slot missing from the routing table: refresh it
$this->initialize();
$node = $this->slotMap->nodeForSlot($slot);
if ($node === null) {
throw new \RuntimeException("No node found for slot=$slot");
}
}
$conn = $this->slotMap->connection($node, $this->password);
try {
// for an ASK redirect, send ASKING first
if ($askNode !== null) {
$conn->command(['ASKING']);
}
return $conn->command($args);
} catch (RedisException $e) {
$msg = $e->getMessage();
// MOVED 12345 1.2.3.4:6379
if (str_starts_with($msg, 'MOVED ')) {
[, $movedSlot, $newNode] = explode(' ', $msg, 3);
$this->slotMap->setSlot((int)$movedSlot, $newNode); // permanent update
return $this->executeWithRedirect($args, (int)$movedSlot, $redirectCount + 1);
}
// ASK 12345 1.2.3.4:6379
if (str_starts_with($msg, 'ASK ')) {
[, , $newNode] = explode(' ', $msg, 3);
// note: slotMap is NOT updated; we go to the new node only this once
return $this->executeWithRedirect($args, $slot, $redirectCount + 1, askNode: $newNode);
}
// CLUSTERDOWN / TRYAGAIN: retry after a short pause
if (str_starts_with($msg, 'CLUSTERDOWN') || str_starts_with($msg, 'TRYAGAIN')) {
usleep(100_000); // wait 100ms
return $this->executeWithRedirect($args, $slot, $redirectCount + 1);
}
throw $e;
}
}
}
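Core logic summary:
1. crc16(key) & 16383 gives the slot
2. Look up the node in the local routing table
3. Send the command
4. On MOVED: update the routing table + resend to the new node
5. On ASK: leave the routing table alone, resend to the new node prefixed with ASKING
6. Otherwise return normally / throw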
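The six steps can be exercised without a real cluster by replacing sockets with closures. In this sketch, routeAndExecute(), the $nodes closures and the ['error' => ...] reply shape are all hypothetical. Node A owns slot 5000, which is being migrated to B. The client's entry for slot 12182 is deliberately stale.

```php
<?php
// Sketch: the redirect loop of executeWithRedirect(), run against simulated
// nodes. routeAndExecute(), $nodes and the ['error' => ...] shape are hypothetical.

function routeAndExecute(array &$slotMap, array $nodes, int $slot, array $args, int $maxRedirects = 5): mixed
{
    // steps 1-2: the caller computed $slot; look up the owning node
    $target = $slotMap[$slot] ?? throw new RuntimeException("no node for slot $slot");
    $asking = false;
    for ($i = 0; $i <= $maxRedirects; $i++) {
        $reply = $nodes[$target]($args, $asking);        // step 3: "send" the command
        $error = is_array($reply) ? ($reply['error'] ?? null) : null;
        if ($error === null) {
            return $reply;                               // step 6: normal reply
        }
        if (!preg_match('/^(MOVED|ASK) (\d+) (\S+)$/', $error, $m)) {
            throw new RuntimeException($error);          // step 6: any other error
        }
        if ($m[1] === 'MOVED') {
            $slotMap[(int) $m[2]] = $m[3];               // step 4: permanent, update the map
            $asking = false;
        } else {
            $asking = true;                              // step 5: one-off ASKING, map untouched
        }
        $target = $m[3];
    }
    throw new RuntimeException('too many redirects');
}

// Simulated cluster: slot 5000 is migrating A -> B; slot 12182 really lives on C.
$slotMap = [5000 => 'A', 12182 => 'A'];   // the 12182 entry is deliberately stale
$nodes = [
    'A' => fn(array $args, bool $asking) => match ($args[1]) {
        'k-on-c'   => ['error' => 'MOVED 12182 C'],
        'migrated' => ['error' => 'ASK 5000 B'],
        default    => "A:{$args[1]}",
    },
    // B is importing slot 5000 and only serves it when the request follows ASKING
    'B' => fn(array $args, bool $asking) => $asking ? "B:{$args[1]}" : ['error' => 'MOVED 5000 A'],
    'C' => fn(array $args, bool $asking) => "C:{$args[1]}",
];

echo routeAndExecute($slotMap, $nodes, 5000, ['GET', 'migrated']), "\n";   // B:migrated
```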
5.6 Trying the Cluster
examples/cluster.php:
<?php
require __DIR__ . '/../vendor/autoload.php';
use App\Cluster\{RedisCluster, Crc16};
// assuming a local 6-node cluster is running (3 masters, 3 replicas)
$cluster = new RedisCluster([
['host' => '127.0.0.1', 'port' => 7001],
['host' => '127.0.0.1', 'port' => 7002],
['host' => '127.0.0.1', 'port' => 7003],
]);
// see which slot each key lands in
foreach (['foo', 'bar', 'user:1000', 'user:{1000}:orders'] as $k) {
echo "key=$k slot=" . Crc16::slot($k) . "\n";
}
$cluster->set('foo', 'hello');
echo $cluster->get('foo') . "\n";
$cluster->mset('a', '1', 'b', '2'); // careful: fails with CROSSSLOT if the keys hash to different slots; use a hash tag
The quickest way to start a local test cluster:
# starts 6 nodes automatically (the cluster is created with redis-cli --cluster)
docker run -d -p 7001-7006:7001-7006 grokzen/redis-cluster:7.2.0
---
6. Production best practices / which library to use
┌──────────────────────┬────────────────────────────────────────────────────────────────────────────────────┐
│ Need                 │ Recommendation                                                                     │
├──────────────────────┼────────────────────────────────────────────────────────────────────────────────────┤
│ Production default   │ predis/predis (pure PHP; Cluster/Sentinel/Pipeline all supported)                  │
├──────────────────────┼────────────────────────────────────────────────────────────────────────────────────┤
│ Max performance      │ ext-redis (C extension, usually several times faster than predis; API is clunkier) │
├──────────────────────┼────────────────────────────────────────────────────────────────────────────────────┤
│ Swoole coroutines    │ Swoole\Coroutine\Redis, or Swoole hooks + ext-redis                                │
├──────────────────────┼────────────────────────────────────────────────────────────────────────────────────┤
│ Hyperf framework     │ hyperf/redis (built on ext-redis + a coroutine connection pool)                    │
├──────────────────────┼────────────────────────────────────────────────────────────────────────────────────┤
│ Code worth reading   │ read the predis source; clean Connection/Command/Profile layering                  │
├──────────────────────┼────────────────────────────────────────────────────────────────────────────────────┤
│ RESP3 + new features │ RESP3 support depends on the major version; check the predis/ext-redis changelogs  │
└──────────────────────┴────────────────────────────────────────────────────────────────────────────────────┘
Recommended usage compared
// 1. predis (richest ecosystem, pure PHP, installed with Composer)
// $ composer require predis/predis
$client = new Predis\Client('tcp://127.0.0.1:6379');
// 2. ext-redis (highest performance, requires the C extension)
$client = new Redis();
$client->connect('127.0.0.1', 6379);
// 3. Cluster (supported natively by predis)
$cluster = new Predis\Client([
'tcp://127.0.0.1:7001',
'tcp://127.0.0.1:7002',
'tcp://127.0.0.1:7003',
], ['cluster' => 'redis']);
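Selection decision tree
Single instance + ordinary PHP-FPM → ext-redis (fast, stable)
Need Cluster + advanced features → predis
Swoole/Hyperf coroutine projects → Swoole\Coroutine\Redis or hyperf/redis
Learning/demo → the one you wrote yourself (i.e. the code in this article)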
---
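7. Common pitfalls (all learned the hard way)
1. Pub/Sub connections are exclusive: once subscribed, the connection cannot send ordinary commands, so production needs two connections
2. A Pipeline is not a transaction: a failing command in the middle rolls nothing back. Use MULTI/EXEC (which can also be pipelined) when the commands must run as one uninterrupted block, keeping in mind that Redis does not roll back commands that fail inside EXEC either
3. Cross-slot operations: in a cluster, MGET k1 k2 k3 fails with a CROSSSLOT error if k1/k2/k3 are not in the same slot; force them together with a hash tag
4. Blocking O(N) commands: KEYS * and SMEMBERS huge_set can stall the server for a long time in production; use SCAN/SSCAN instead
5. Timeouts: connectTimeout of 1-2 seconds; readTimeout depends on the command (BLPOP needs a longer one)
6. Reconnecting after a disconnect: the first command after the connection was closed should reconnect automatically, but Pub/Sub state is lost and the application must resubscribe
7. MOVED storms: during a rolling restart, a client that doesn't cache routes gets flooded with MOVED. Our implementation updates the table as soon as it receives a MOVED (one slot at a time; many production clients re-fetch the whole map instead)
8. Wrong CRC16 table: you must use the CRC16-CCITT XMODEM variant (polynomial 0x1021, initial value 0); buggy versions floating around the web can cost you a week of debugging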
---
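8. Interview / advanced questions
1. Why doesn't RESP use a more compact binary format? Text is human-readable and easy to debug with nc/telnet; it trades a little bandwidth for ease of use
2. Pipeline vs MULTI? A Pipeline only batches the sending, and the server executes each command independently; MULTI/EXEC queues the commands and runs them as one block that no other client can interleave with (though without rollback)
3. MOVED vs ASK? MOVED means a permanent move and updates the routing table; ASK is a temporary redirect during migration and does not
4. Why 16384 slots and not 65536? Heartbeat packets carry the slot bitmap: 16384 bits = 2 KB, small enough; 65536 bits = 8 KB would be wasteful
5. How does a client discover new nodes? Via CLUSTER SLOTS/CLUSTER NODES; in production, receiving a MOVED triggers a refresh
6. How does Pub/Sub work in a Cluster? By default messages are broadcast to every node (poor performance); since 7.0 there is Sharded Pub/Sub, which maps each channel to a slot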