Redis Protocol Deep Dive: From RESP to a Hand-Written Cluster Client

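  Same style as last time: first explain the protocol thoroughly, then give complete runnable code, and finally tell you what to use in production.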

  ---
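  I. Why bother understanding RESP?

  Day to day you just write $redis->get('foo') and the extension wraps all of this up for you. But without understanding the protocol you will never get the following right: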

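  - How do you implement a pipeline so that 1000 operations cost only 1 RTT?
  - Why can't Pub/Sub be mixed with regular commands?
  - How exactly should MOVED/ASK redirects be handled?
  - Why does a Cluster client keep a routing table covering 16384 slots?
  - Why would you use RESP3 streaming for large values?

  In one sentence: a Redis client is essentially a long-lived TCP connection + RESP encoding/decoding + Cluster routing. That's it.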

  ---
  II. RESP in plain terms

  2.1 Protocol versions

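  ┌─────────┬───────────────┬─────────────────────────────────────────────────────────────────────┐
  │ Version │ Redis version │ Characteristics                                                     │
  ├─────────┼───────────────┼─────────────────────────────────────────────────────────────────────┤
  │ RESP2   │ 2.0+          │ 5 types, human-readable text                                        │
  ├─────────┼───────────────┼─────────────────────────────────────────────────────────────────────┤
  │ RESP3   │ 6.0+          │ Over a dozen types incl. Map/Set/Push; recommended for new projects │
  └─────────┴───────────────┴─────────────────────────────────────────────────────────────────────┘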

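  We focus on RESP2 (enough for the vast majority of use cases) and briefly cover what RESP3 adds at the end.

  2.2 The five data types (RESP2)

  The first byte of every reply determines its type, followed by the content; every line ends with \r\n.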

  ┌────────────┬──────────────────────────────────┬──────────────────────────────────┐
  │ First byte │ Type                             │ Example                          │
  ├────────────┼──────────────────────────────────┼──────────────────────────────────┤
  │ +          │ Simple String                    │ +OK\r\n                          │
  ├────────────┼──────────────────────────────────┼──────────────────────────────────┤
  │ -          │ Error                            │ -ERR unknown\r\n                 │
  ├────────────┼──────────────────────────────────┼──────────────────────────────────┤
  │ :          │ Integer                          │ :1000\r\n                        │
  ├────────────┼──────────────────────────────────┼──────────────────────────────────┤
  │ $          │ Bulk String (binary-safe string) │ $5\r\nhello\r\n                  │
  ├────────────┼──────────────────────────────────┼──────────────────────────────────┤
  │ *          │ Array                            │ *2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n │
  └────────────┴──────────────────────────────────┴──────────────────────────────────┘

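  Bulk String in plain terms:
  $5\r\nhello\r\n
   ↑           ↑
   length=5    exactly 5 bytes follow
  Why carry a length? Because the value itself may contain \r\n, so splitting on the delimiter alone can't tell where it ends. The length counts bytes, not characters, and $-1\r\n stands for nil.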
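
  A minimal sketch of both halves of that rule, using hypothetical helpers encodeBulk() and decodeBulk() (they are not part of the client built in section IV). It also shows that the length prefix counts bytes:

```php
<?php
// Hypothetical helpers illustrating Bulk String framing; not part of the client in section IV.

// Encode one value as a Bulk String. strlen() counts bytes, which is what $<len> means.
function encodeBulk(string $value): string
{
    return '$' . strlen($value) . "\r\n" . $value . "\r\n";
}

// Decode one Bulk String: read the length line, then take exactly that many bytes,
// so a "\r\n" inside the value cannot cut it short.
function decodeBulk(string $buf): ?string
{
    $eol = strpos($buf, "\r\n");
    if ($buf === '' || $buf[0] !== '$' || $eol === false) {
        throw new InvalidArgumentException('not a bulk string');
    }
    $len = (int) substr($buf, 1, $eol - 1);
    if ($len === -1) {
        return null; // "$-1\r\n" is nil in RESP2
    }
    return substr($buf, $eol + 2, $len);
}

$wire = encodeBulk("ab\r\ncd");              // "$6\r\nab\r\ncd\r\n"
print_r(explode("\r\n", $wire));            // naive split yields "$6", "ab", "cd", "": the value is torn apart
var_dump(decodeBulk($wire) === "ab\r\ncd");  // bool(true)
var_dump(strlen('你好'));                     // int(6): 2 characters, 6 bytes in UTF-8
```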

  Nested arrays:
  *3\r\n           ← 3 elements
  :1\r\n           ← element 1: integer 1
  $3\r\nfoo\r\n    ← element 2: string "foo"
  *2\r\n           ← element 3: a nested array with 2 elements
  :10\r\n
  :20\r\n
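
  To see how the recursion falls out of the format, here is a minimal recursive decoder for the five RESP2 types. It is a sketch that assumes the whole reply is already in memory (the streaming-safe version is the RespDecoder in section 4.5), and it represents errors as a tagged array purely for illustration:

```php
<?php
// Minimal recursive RESP2 decoder (sketch). Assumes $buf holds at least one
// complete reply; no partial-data handling.
function respDecode(string $buf, int &$pos = 0): mixed
{
    $type = $buf[$pos];
    $eol  = strpos($buf, "\r\n", $pos);
    $line = substr($buf, $pos + 1, $eol - $pos - 1); // text after the type byte
    $pos  = $eol + 2;

    switch ($type) {
        case '+': return $line;                 // Simple String
        case '-': return ['error' => $line];    // Error (tagged array, illustration only)
        case ':': return (int) $line;           // Integer
        case '$':                               // Bulk String
            $len = (int) $line;
            if ($len === -1) return null;       // nil
            $value = substr($buf, $pos, $len);
            $pos += $len + 2;                   // skip payload and its trailing \r\n
            return $value;
        case '*':                               // Array: decode $count children recursively
            $count = (int) $line;
            if ($count === -1) return null;
            $items = [];
            for ($i = 0; $i < $count; $i++) {
                $items[] = respDecode($buf, $pos);
            }
            return $items;
        default:
            throw new UnexpectedValueException("unknown type byte: $type");
    }
}

$wire = "*3\r\n:1\r\n\$3\r\nfoo\r\n*2\r\n:10\r\n:20\r\n";
echo json_encode(respDecode($wire)), "\n"; // [1,"foo",[10,20]]
```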

  2.3 How the client sends commands: also an Array

  When you write SET foo bar, what the client actually puts on the wire is:
  *3\r\n
  $3\r\nSET\r\n
  $3\r\nfoo\r\n
  $3\r\nbar\r\n

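  In other words, the command is sent as an array of bulk strings. The design is minimal, which makes parsing on the Redis side very fast.

  2.4 Speaking RESP by hand with nc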

  $ printf '*1\r\n$4\r\nPING\r\n' | nc localhost 6379
  +PONG

  Congratulations: you just talked to Redis without any client library.

  2.5 What RESP3 adds (overview)

  % Map     %2\r\n+name\r\n+alice\r\n+age\r\n:30\r\n
  ~ Set     ~3\r\n:1\r\n:2\r\n:3\r\n
  > Push    >3\r\n$7\r\nmessage\r\n$4\r\nnews\r\n$5\r\nhello\r\n  ← Pub/Sub message push
  ( BigNum  (1234567890123456789012345\r\n
  _ Null    _\r\n
  # Bool    #t\r\n / #f\r\n
  , Double  ,3.14\r\n

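  The most useful one is the > Push type: Pub/Sub messages no longer get mixed up with command replies, so a RESP3 connection can keep sending regular commands while subscribed (in the RESP2 era the client had to track a "subscribed mode" to tell them apart, which is bug-prone).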
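
  A sketch of how a decoder might extend to a few of these RESP3 types, assuming complete input. resp3Decode() is a hypothetical helper; Sets are flattened to plain lists, BigNums stay strings, and Push frames are wrapped in a ['push' => ...] tag so a caller could route them away from command replies:

```php
<?php
// Sketch: decoding some RESP3 types (%, ~, >, _, #, ,, () plus +, :, $ and *.
// Assumes complete input and scalar map keys.
function resp3Decode(string $buf, int &$pos = 0): mixed
{
    $type = $buf[$pos];
    $eol  = strpos($buf, "\r\n", $pos);
    $line = substr($buf, $pos + 1, $eol - $pos - 1);
    $pos  = $eol + 2;

    switch ($type) {
        case '+': return $line;
        case ':': return (int) $line;
        case ',': return (float) $line;        // Double
        case '#': return $line === 't';        // Boolean: #t / #f
        case '_': return null;                 // Null
        case '(': return $line;                // BigNum kept as a string (may exceed PHP_INT_MAX)
        case '$':
            $value = substr($buf, $pos, (int) $line);
            $pos += (int) $line + 2;
            return $value;
        case '*':
        case '~':                              // Set: returned as a plain list here
        case '>':                              // Push
            $items = [];
            for ($i = 0, $n = (int) $line; $i < $n; $i++) {
                $items[] = resp3Decode($buf, $pos);
            }
            return $type === '>' ? ['push' => $items] : $items;
        case '%':                              // Map: N key/value pairs
            $map = [];
            for ($i = 0, $n = (int) $line; $i < $n; $i++) {
                $key = resp3Decode($buf, $pos);
                $map[$key] = resp3Decode($buf, $pos);
            }
            return $map;
        default:
            throw new UnexpectedValueException("unsupported type byte: $type");
    }
}

echo json_encode(resp3Decode("%2\r\n+name\r\n+alice\r\n+age\r\n:30\r\n")), "\n"; // {"name":"alice","age":30}
$push = ">3\r\n\$7\r\nmessage\r\n\$4\r\nnews\r\n\$5\r\nhello\r\n";
echo json_encode(resp3Decode($push)), "\n"; // {"push":["message","news","hello"]}
```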

  ---
  III. Overall architecture

  ┌────────────────────────────────────────────┐
  │           RedisClient (facade)             │
  │  ┌──────────────────────────────────────┐  │
  │  │ __call('get', ['foo']) → command()   │  │
  │  └──────────────────────────────────────┘  │
  └──────────┬─────────────────────────────────┘
             │
      ┌──────▼──────┐    ┌─────────────┐    ┌──────────┐
      │  RespCodec  │    │  Connection │    │ Pipeline │
      │  encode/    │    │  socket I/O │    │  buffer  │
      │  decode     │    │  + reconnect│    │  + flush │
      └─────────────┘    └─────────────┘    └──────────┘

  ┌────────────────────────────────────────────┐
  │         RedisCluster (facade)              │
  │  ┌──────────────────────────────────────┐  │
  │  │ slot = crc16(key) & 16383            │  │
  │  │ node = slotMap[slot]                 │  │
  │  │ if MOVED: update slot map + redirect │  │
  │  │ if ASK:   one-off ASKING + redirect  │  │
  │  └──────────────────────────────────────┘  │
  └────────────────────────────────────────────┘

  ---
  IV. Complete code

  4.1 Project layout

  mini-redis/
  ├── composer.json
  ├── src/
  │   ├── Resp/
  │   │   ├── RespEncoder.php
  │   │   ├── RespDecoder.php
  │   │   └── RespType.php
  │   ├── Connection.php
  │   ├── RedisClient.php
  │   ├── Pipeline.php
  │   ├── PubSub.php
  │   └── Cluster/
  │       ├── Crc16.php
  │       ├── SlotMap.php
  │       └── RedisCluster.php
  └── examples/
      ├── basic.php
      ├── pipeline.php
      ├── pubsub.php
      └── cluster.php

  4.2 composer.json

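  {
      "require": {
          "php": ">=8.2"
      },
      "autoload": {
          "psr-4": { "App\\": "src/" },
          "files": ["src/Resp/RespType.php"]
      }
  }

  ▎ We don't depend on any Redis library and start from the raw socket. At the end we'll cover which library to use in production. Run composer install (or composer dump-autoload) once to generate vendor/autoload.php, which the examples require.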

  ---
  4.3 src/Resp/RespEncoder.php: the encoder

  <?php
  namespace App\Resp;

  class RespEncoder
  {
      /** Encode one command as a RESP frame, e.g. ['SET','foo','bar'] -> "*3\r\n$3\r\nSET\r\n..." */
      public function encode(array $args): string
      {
          $out = '*' . count($args) . "\r\n";
          foreach ($args as $a) {
              $a = (string) $a;
              $out .= '$' . strlen($a) . "\r\n" . $a . "\r\n";
          }
          return $out;
      }

      /** Pipeline: concatenate several commands so they go out in one write */
      public function encodeMany(array $commands): string
      {
          $out = '';
          foreach ($commands as $cmd) {
              $out .= $this->encode($cmd);
          }
          return $out;
      }
  }

  That's all there is to it. The encoder holds no surprises; the decoder is where the real work is.

  ---
  4.4 src/Resp/RespType.php

  <?php
  namespace App\Resp;

  class RespError
  {
      public function __construct(public string $message) {}
  }

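  class RespIncomplete {}  // not all data has arrived yet; tells the caller to keep reading

  Why RespIncomplete? TCP is a byte stream, so a read may deliver only half of a reply; the decoder must be able to say "not enough yet" instead of throwing. Note that this file declares two classes, which PSR-4 alone cannot autoload (it would look for src/Resp/RespError.php and src/Resp/RespIncomplete.php): either list the file under autoload.files in composer.json or split it into one file per class.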

  ---
  4.5 src/Resp/RespDecoder.php: the decoder (the core)

  <?php
  namespace App\Resp;

  class RespDecoder
  {
      private string $buffer = '';

      public function feed(string $data): void
      {
          $this->buffer .= $data;
      }

      /** Try to parse one complete reply; returns RespIncomplete if there isn't enough data yet */
      public function parse(): mixed
      {
          $offset = 0;
          $result = $this->parseAt($offset);
          if ($result instanceof RespIncomplete) {
              return $result;
          }
          // Parsed successfully: drop the consumed bytes
          $this->buffer = substr($this->buffer, $offset);
          return $result;
      }

      private function parseAt(int &$offset): mixed
      {
          if ($offset >= strlen($this->buffer)) {
              return new RespIncomplete();
          }

          $type = $this->buffer[$offset];
          $offset++;

          return match ($type) {
              '+' => $this->parseSimpleString($offset),
              '-' => $this->parseError($offset),
              ':' => $this->parseInteger($offset),
              '$' => $this->parseBulkString($offset),
              '*' => $this->parseArray($offset),
              default => throw new \RuntimeException("Unknown type byte: $type at offset " . ($offset - 1)),
          };
      }

      private function readLine(int &$offset): string|RespIncomplete
      {
          $pos = strpos($this->buffer, "\r\n", $offset);
          if ($pos === false) return new RespIncomplete();
          $line = substr($this->buffer, $offset, $pos - $offset);
          $offset = $pos + 2;
          return $line;
      }

      private function parseSimpleString(int &$offset): string|RespIncomplete
      {
          return $this->readLine($offset);
      }

      private function parseError(int &$offset): RespError|RespIncomplete
      {
          $line = $this->readLine($offset);
          return $line instanceof RespIncomplete ? $line : new RespError($line);
      }

      private function parseInteger(int &$offset): int|RespIncomplete
      {
          $line = $this->readLine($offset);
          return $line instanceof RespIncomplete ? $line : (int) $line;
      }

      private function parseBulkString(int &$offset): string|null|RespIncomplete
      {
          $lenLine = $this->readLine($offset);
          if ($lenLine instanceof RespIncomplete) return $lenLine;

          $length = (int) $lenLine;
          if ($length === -1) return null;  // $-1\r\n means nil

          // Still need $length bytes + \r\n
          if (strlen($this->buffer) < $offset + $length + 2) {
              return new RespIncomplete();
          }
          $value  = substr($this->buffer, $offset, $length);
          $offset += $length + 2;  // skip the trailing \r\n
          return $value;
      }

      private function parseArray(int &$offset): array|null|RespIncomplete
      {
          $lenLine = $this->readLine($offset);
          if ($lenLine instanceof RespIncomplete) return $lenLine;

          $count = (int) $lenLine;
          if ($count === -1) return null;

          $items = [];
          for ($i = 0; $i < $count; $i++) {
              $item = $this->parseAt($offset);
              if ($item instanceof RespIncomplete) return $item;
              $items[] = $item;
          }
          return $items;
      }
  }

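  In plain terms:
  1. The first byte decides the type and dispatches to the matching parse function
  2. The cursor is advanced through the by-reference &$offset
  3. As soon as any step finds the buffer too short, it returns RespIncomplete and the buffer is left untouched (this is the key!)
  4. Arrays are parsed recursively, so arbitrary nesting comes for free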
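
  Point 3 is what lets the decoder survive TCP fragmentation. The hypothetical TinyStreamDecoder below applies the same "parse, commit only on success" idea to just +, : and $ replies (arrays omitted for brevity) and is fed one byte at a time, the worst case a socket can produce:

```php
<?php
// Sketch: streaming reader for +, : and $ replies only. The buffer is trimmed
// only after a full reply has been parsed, exactly like RespDecoder::parse().
final class TinyStreamDecoder
{
    private string $buffer = '';

    public function feed(string $chunk): void
    {
        $this->buffer .= $chunk;
    }

    /** Returns [true, $value] for a complete reply, or [false, null] when more bytes are needed. */
    public function next(): array
    {
        $eol = strpos($this->buffer, "\r\n");
        if ($eol === false) {
            return [false, null];                     // header line not complete yet
        }
        $type = $this->buffer[0];
        $line = substr($this->buffer, 1, $eol - 1);
        $consumed = $eol + 2;

        if ($type === '+') {
            $value = $line;
        } elseif ($type === ':') {
            $value = (int) $line;
        } elseif ($type === '$') {
            $len = (int) $line;
            if ($len === -1) {
                $value = null;                        // "$-1\r\n" is nil
            } elseif (strlen($this->buffer) < $consumed + $len + 2) {
                return [false, null];                 // payload incomplete: buffer untouched
            } else {
                $value = substr($this->buffer, $consumed, $len);
                $consumed += $len + 2;
            }
        } else {
            throw new UnexpectedValueException("unsupported type byte: $type");
        }
        $this->buffer = substr($this->buffer, $consumed); // commit
        return [true, $value];
    }
}

$decoder = new TinyStreamDecoder();
$replies = [];
foreach (str_split("+OK\r\n:42\r\n\$5\r\nhello\r\n") as $byte) { // one byte per "read"
    $decoder->feed($byte);
    while (true) {
        [$done, $value] = $decoder->next();
        if (!$done) break;
        $replies[] = $value;
    }
}
echo json_encode($replies), "\n"; // ["OK",42,"hello"]
```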

  ---
  4.6 src/Connection.php: TCP connection management

  <?php
  namespace App;

  use App\Resp\{RespEncoder, RespDecoder, RespError, RespIncomplete};

  class Connection
  {
      /** @var resource|null */
      private $sock = null;
      private RespEncoder $encoder;
      private RespDecoder $decoder;

      public function __construct(
          public readonly string $host = '127.0.0.1',
          public readonly int $port = 6379,
          public readonly float $connectTimeout = 1.5,
          public readonly float $readTimeout = 5.0,
          public readonly ?string $password = null,
          public readonly int $database = 0,
      ) {
          $this->encoder = new RespEncoder();
          $this->decoder = new RespDecoder();
      }

      public function connect(): void
      {
          if ($this->sock !== null) return;

          $errno = 0;
          $errstr = '';
          $sock = @stream_socket_client(
              "tcp://{$this->host}:{$this->port}",
              $errno, $errstr, $this->connectTimeout,
              STREAM_CLIENT_CONNECT
          );
          if ($sock === false) {
              throw new \RuntimeException("Failed to connect to {$this->host}:{$this->port}: $errstr");
          }
          // Split the float timeout into whole seconds + microseconds
          $sec  = (int) $this->readTimeout;
          $usec = (int) (($this->readTimeout - $sec) * 1e6);
          stream_set_timeout($sock, $sec, $usec);
          $this->sock = $sock;

          try {
              if ($this->password !== null) {
                  $this->command(['AUTH', $this->password]);
              }
              if ($this->database !== 0) {
                  $this->command(['SELECT', (string) $this->database]);
              }
          } catch (\Throwable $e) {
              $this->close();  // don't keep a half-initialized (e.g. unauthenticated) connection
              throw $e;
          }
      }

      public function close(): void
      {
          if ($this->sock) {
              @fclose($this->sock);
              $this->sock = null;
          }
          // Drop any half-read reply so it can't corrupt the next connection
          $this->decoder = new RespDecoder();
      }

      /** Send one command and wait for one reply */
      public function command(array $args): mixed
      {
          $this->connect();
          $this->writeAll($this->encoder->encode($args));
          return $this->readReply();
      }

      /**
       * Pipeline: send N commands, read N replies.
       * Error replies are returned in place as RespError objects instead of being thrown,
       * so one failing command doesn't leave the remaining replies unread on the socket.
       */
      public function commandPipeline(array $commands): array
      {
          $this->connect();
          $this->writeAll($this->encoder->encodeMany($commands));

          $replies = [];
          for ($i = 0, $n = count($commands); $i < $n; $i++) {
              $replies[] = $this->readRawReply();
          }
          return $replies;
      }

      /** Read one complete reply; an error reply (-ERR ...) is thrown as RedisException */
      public function readReply(): mixed
      {
          $reply = $this->readRawReply();
          if ($reply instanceof RespError) {
              throw new RedisException($reply->message);
          }
          return $reply;
      }

      /** Read one complete reply (may take several reads); error replies come back as RespError */
      private function readRawReply(): mixed
      {
          while (true) {
              $reply = $this->decoder->parse();
              if (!($reply instanceof RespIncomplete)) {
                  return $reply;
              }
              // Not enough data yet: read more
              $chunk = @fread($this->sock, 8192);
              if ($chunk === false || $chunk === '') {
                  $timedOut = stream_get_meta_data($this->sock)['timed_out'] ?? false;
                  $this->close();
                  throw new \RuntimeException($timedOut ? 'Read timed out' : 'Connection closed by peer');
              }
              $this->decoder->feed($chunk);
          }
      }

      private function writeAll(string $data): void
      {
          $total = strlen($data);
          $written = 0;
          while ($written < $total) {
              $n = @fwrite($this->sock, substr($data, $written));
              if ($n === false || $n === 0) {
                  $this->close();
                  throw new \RuntimeException("Write failed");
              }
              $written += $n;
          }
      }

      /** For Pub/Sub: send nothing, just read the next pushed message */
      public function readPush(): array
      {
          $reply = $this->readReply();
          if (!is_array($reply)) {
              throw new \RuntimeException("Expected a push array, got " . gettype($reply));
          }
          return $reply;
      }
  }

  // Kept in this file for brevity; move it to src/RedisException.php for strict PSR-4 autoloading
  class RedisException extends \RuntimeException {}

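  A few production-grade points:
  1. writeAll loops: never assume a single fwrite writes everything
  2. readReply loops: never assume a single fread returns a complete reply
  3. Error replies (-ERR) are thrown as exceptions; application code handles them with try/catch
  4. On any connection error the socket is closed, and the next command reconnects automatically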
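
  Point 1 can be checked without a server by swapping the socket for a callable. A sketch where $write is a hypothetical stand-in for fwrite() that accepts only 3 bytes per call, as a socket with a full send buffer might:

```php
<?php
// Sketch of the "loop until every byte is written" rule.
// $write stands in for fwrite(): it returns how many bytes it accepted (or false/0 on failure).
function writeAll(callable $write, string $data): void
{
    $written = 0;
    $total = strlen($data);
    while ($written < $total) {
        $n = $write(substr($data, $written));
        if ($n === false || $n === 0) {
            throw new RuntimeException("write failed after $written of $total bytes");
        }
        $written += $n;
    }
}

// Fake socket that takes at most 3 bytes per call
$sent = '';
$calls = 0;
$slowSocket = function (string $chunk) use (&$sent, &$calls): int {
    $calls++;
    $part = substr($chunk, 0, 3);
    $sent .= $part;
    return strlen($part);
};

writeAll($slowSocket, "*1\r\n\$4\r\nPING\r\n");         // 14 bytes
var_dump($sent === "*1\r\n\$4\r\nPING\r\n", $calls);  // bool(true), int(5)
```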

  ---
  4.7 src/RedisClient.php: the user-facing facade

  <?php
  namespace App;

  class RedisClient
  {
      public function __construct(public Connection $conn) {}

      /** Magic method: $redis->get('foo') is equivalent to ->command(['GET', 'foo']) */
      public function __call(string $name, array $args): mixed
      {
          return $this->conn->command([strtoupper($name), ...$args]);
      }

      /** Explicit version */
      public function command(array $args): mixed
      {
          return $this->conn->command($args);
      }

      public function pipeline(): Pipeline
      {
          return new Pipeline($this->conn);
      }

      public function pubSub(): PubSub
      {
          return new PubSub($this->conn);
      }
  }

  ---
  4.8 src/Pipeline.php: batching commands

  <?php
  namespace App;

  class Pipeline
  {
      private array $commands = [];

      public function __construct(private Connection $conn) {}

      public function __call(string $name, array $args): self
      {
          $this->commands[] = [strtoupper($name), ...$args];
          return $this;  // chainable
      }

      public function exec(): array
      {
          if (empty($this->commands)) return [];
          $replies = $this->conn->commandPipeline($this->commands);
          $this->commands = [];
          return $replies;
      }
  }

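  Pipelining in plain terms:
  - Normal mode: 1000 GETs → 1000 RTTs
  - Pipeline: all 1000 commands go out at once → 1 RTT, and the server sends back 1000 replies in order
  - RTT is roughly 0.1 ms within one data center but can be around 50 ms across data centers, so the difference is dramatic

  ⚠️ Note: a pipeline is not a transaction. A later command can't depend on an earlier command's result, because the client sends every command before it reads any reply. Redis itself executes each command as soon as it reads it, and commands from other clients may be interleaved between them. If the batch must run atomically, use MULTI/EXEC.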

  ---
  4.9 src/PubSub.php: publish/subscribe

  <?php
  namespace App;

  class PubSub
  {
      public function __construct(private Connection $conn) {}

      /**
       * Subscribe to one or more channels and block while receiving messages
       * @param string[] $channels
       * @param callable $onMessage fn(string $channel, string $message): void
       */
      public function subscribe(array $channels, callable $onMessage): void
      {
          $this->conn->command(['SUBSCRIBE', ...$channels]);

          // SUBSCRIBE answers with one ["subscribe", channel, count] confirmation per channel:
          // command() above consumed the first one, the loop below skips the rest.
          // After that, every message arrives as ["message", channel, payload]
          while (true) {
              $push = $this->conn->readPush();
              $kind = $push[0] ?? null;

              if ($kind === 'message') {
                  // ["message", channel, payload]
                  $onMessage($push[1], $push[2]);
              } elseif ($kind === 'pmessage') {
                  // pattern subscription: ["pmessage", pattern, channel, payload]
                  $onMessage($push[2], $push[3]);
              }
              // subscribe/unsubscribe confirmations are ignored
          }
      }

      public function publish(string $channel, string $message): int
      {
          return (int) $this->conn->command(['PUBLISH', $channel, $message]);
      }
  }

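  Pub/Sub in plain terms:
  1. Once a RESP2 connection has subscribed, it can only receive messages: apart from SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE, their UNSUBSCRIBE counterparts, PING, RESET and QUIT, any command you send is rejected with an error
  2. That's why in production the subscriber gets a dedicated connection and regular operations use another one
  3. A pushed message always has the shape ["message", channel, payload] (or ["pmessage", pattern, channel, payload] for pattern subscriptions)
  4. A subscriber may sit idle for a long time, so give its connection a long readTimeout; with the default 5 s, readReply() gives up after 5 quiet seconds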
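
  Rule 1 can also be enforced on the client before anything hits the wire. A minimal sketch with a hypothetical allowedWhileSubscribed() helper; the allow-list follows the Redis Pub/Sub documentation for RESP2 connections (with RESP3 the restriction doesn't apply):

```php
<?php
// Client-side guard for a RESP2 connection in subscribed state (sketch).
// Anything not on this list should go through a separate, regular connection.
const SUBSCRIBED_STATE_ALLOWED = [
    'SUBSCRIBE', 'SSUBSCRIBE', 'PSUBSCRIBE',
    'UNSUBSCRIBE', 'SUNSUBSCRIBE', 'PUNSUBSCRIBE',
    'PING', 'RESET', 'QUIT',
];

function allowedWhileSubscribed(array $args): bool
{
    $name = strtoupper((string) ($args[0] ?? ''));
    return in_array($name, SUBSCRIBED_STATE_ALLOWED, true);
}

var_dump(allowedWhileSubscribed(['ping']));        // bool(true)
var_dump(allowedWhileSubscribed(['GET', 'foo']));  // bool(false)
```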

  ---
  4.10 Trying out the basics

  examples/basic.php:

  <?php
  require __DIR__ . '/../vendor/autoload.php';

  use App\{Connection, RedisClient};

  $redis = new RedisClient(new Connection('127.0.0.1', 6379));

  var_dump($redis->ping());                    // string(4) "PONG"
  var_dump($redis->set('foo', 'bar'));         // string(2) "OK"
  var_dump($redis->get('foo'));                // string(3) "bar"
  var_dump($redis->incr('counter'));           // int(1) on an empty database
  var_dump($redis->lpush('mylist', 'a', 'b')); // int(2) on an empty database
  var_dump($redis->lrange('mylist', 0, -1));   // ["b", "a"] (LPUSH prepends)

  examples/pipeline.php:

  <?php
  require __DIR__ . '/../vendor/autoload.php';
  use App\{Connection, RedisClient};

  $redis = new RedisClient(new Connection());

  // Without a pipeline: 1000 RTTs
  $t = microtime(true);
  for ($i = 0; $i < 1000; $i++) $redis->set("k:$i", $i);
  echo "no-pipeline: " . round((microtime(true)-$t)*1000) . "ms\n";

  // With a pipeline: 1 RTT
  $t = microtime(true);
  $pipe = $redis->pipeline();
  for ($i = 0; $i < 1000; $i++) $pipe->set("k:$i", $i);
  $pipe->exec();
  echo "pipeline:    " . round((microtime(true)-$t)*1000) . "ms\n";

  When the client and server are on different machines, you will typically see a gap of tens to hundreds of times.

  examples/pubsub.php:

  <?php
  require __DIR__ . '/../vendor/autoload.php';
  use App\{Connection, RedisClient};

  $argv[1] ?? die("Usage: php pubsub.php sub|pub\n");

  if ($argv[1] === 'sub') {
      // A subscriber can sit idle for a long time, so give it a long read timeout
      $redis = new RedisClient(new Connection(readTimeout: 3600.0));
      $redis->pubSub()->subscribe(['news'], function ($channel, $msg) {
          echo "[$channel] $msg\n";
      });
  } else {
      $redis = new RedisClient(new Connection());
      $count = $redis->pubSub()->publish('news', 'Hello at ' . date('H:i:s'));
      echo "Delivered to $count subscriber(s)\n";
  }

  ---
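  V. The Redis Cluster client

  5.1 Cluster in plain terms

  Redis Cluster spreads all keys over 16384 slots; a key's slot is CRC16(key) & 16383, i.e. CRC16 modulo 16384. Each master node owns a share of the slots, for example: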

  Node A: slots 0-5460
  Node B: slots 5461-10922
  Node C: slots 10923-16383

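  The client has to know this routing table; if it sends a command to the wrong node, the server answers with a MOVED redirect instead of executing it.

  5.2 MOVED vs ASK (important!)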
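
  The SlotMap class later in this section expands the table into one entry per slot. A possible alternative, sketched here with placeholder node names A/B/C, is to keep the sorted [start, end, node] ranges that CLUSTER SLOTS returns and binary-search them:

```php
<?php
// Sketch: slot => node lookup over sorted, non-overlapping [start, end, node] ranges.
final class SlotRanges
{
    /** @param list<array{0:int,1:int,2:string}> $ranges sorted by start */
    public function __construct(private array $ranges) {}

    public function nodeFor(int $slot): ?string
    {
        $lo = 0;
        $hi = count($this->ranges) - 1;
        while ($lo <= $hi) {
            $mid = intdiv($lo + $hi, 2);
            [$start, $end, $node] = $this->ranges[$mid];
            if ($slot < $start) {
                $hi = $mid - 1;
            } elseif ($slot > $end) {
                $lo = $mid + 1;
            } else {
                return $node;
            }
        }
        return null; // slot not covered, e.g. the cluster is still being set up
    }
}

$map = new SlotRanges([[0, 5460, 'A'], [5461, 10922, 'B'], [10923, 16383, 'C']]);
echo $map->nodeFor(5460), $map->nodeFor(5461), $map->nodeFor(16383), "\n"; // ABC
```

  The flat array gives O(1) lookups at the cost of 16384 entries; the range list stays compact but needs O(log n) comparisons per lookup. At this scale either is fine.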

  ┌───────────────────────┬───────────────────────────────────────────┬─────────────────────────────────────────────┐
  │ Error                 │ When it happens                           │ What the client does                        │
  ├───────────────────────┼───────────────────────────────────────────┼─────────────────────────────────────────────┤
  │ MOVED 12345           │ The slot now belongs permanently to that  │ Update the local routing table + redirect   │
  │ 1.2.3.4:6379          │ node (my local routing table is stale)    │ to the target node                          │
  ├───────────────────────┼───────────────────────────────────────────┼─────────────────────────────────────────────┤
  │ ASK 12345             │ The slot is being migrated (some keys     │ Leave the routing table alone; go to the    │
  │ 1.2.3.4:6379          │ have already moved)                       │ new node for this request only, sending     │
  │                       │                                           │ ASKING first                                │
  └───────────────────────┴───────────────────────────────────────────┴─────────────────────────────────────────────┘
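
  Both redirects share the shape <KIND> <slot> <host>:<port>. A sketch of a parser for it (parseRedirect() is hypothetical; RedisCluster in 5.5 does the same thing inline with explode()). Splitting on the last ':' keeps the port correct even if the host part contains colons:

```php
<?php
// Sketch: parse "MOVED <slot> <host>:<port>" / "ASK <slot> <host>:<port>".
// Returns null for any other error message.
function parseRedirect(string $message): ?array
{
    $parts = explode(' ', $message);
    if (count($parts) !== 3 || !in_array($parts[0], ['MOVED', 'ASK'], true)) {
        return null;
    }
    [$kind, $slot, $endpoint] = $parts;
    $colon = strrpos($endpoint, ':');
    if ($colon === false) {
        return null;
    }
    return [
        'kind' => $kind,
        'slot' => (int) $slot,
        'host' => substr($endpoint, 0, $colon),
        'port' => (int) substr($endpoint, $colon + 1),
        'updateSlotMap' => $kind === 'MOVED', // ASK must leave the routing table alone
    ];
}

echo json_encode(parseRedirect('ASK 3999 127.0.0.1:6381')), "\n";
// {"kind":"ASK","slot":3999,"host":"127.0.0.1","port":6381,"updateSlotMap":false}
```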

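  Why does ASK exist? Imagine the slot that foo hashes to (say 5000) is being migrated from A to B, and the client wants GET foo:
  - A checks: foo is still here → returns it directly
  - A checks: foo has already moved to B → replies ASK ... B:6379
  - B receives GET foo without ASKING: "this slot isn't mine yet, go away" → replies MOVED pointing back to A
  - B receives ASKING + GET foo: "oh, you're here for a key I'm importing, here you go"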

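  If the client treated this as MOVED, it would believe the whole slot had moved and send keys that are still on A to B as well. B doesn't own the slot yet, so it bounces them back to A with MOVED, and the client ping-pongs between the two. ASK therefore means "make an exception just this once, but don't change the routing table".

  5.3 src/Cluster/Crc16.php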

  <?php
  namespace App\Cluster;

  class Crc16
  {
      private const TABLE = [
          // CRC16-CCITT (XMODEM) standard table, 256 entries
          0x0000,0x1021,0x2042,0x3063,0x4084,0x50A5,0x60C6,0x70E7,
          0x8108,0x9129,0xA14A,0xB16B,0xC18C,0xD1AD,0xE1CE,0xF1EF,
          0x1231,0x0210,0x3273,0x2252,0x52B5,0x4294,0x72F7,0x62D6,
          0x9339,0x8318,0xB37B,0xA35A,0xD3BD,0xC39C,0xF3FF,0xE3DE,
          0x2462,0x3443,0x0420,0x1401,0x64E6,0x74C7,0x44A4,0x5485,
          0xA56A,0xB54B,0x8528,0x9509,0xE5EE,0xF5CF,0xC5AC,0xD58D,
          0x3653,0x2672,0x1611,0x0630,0x76D7,0x66F6,0x5695,0x46B4,
          0xB75B,0xA77A,0x9719,0x8738,0xF7DF,0xE7FE,0xD79D,0xC7BC,
          0x48C4,0x58E5,0x6886,0x78A7,0x0840,0x1861,0x2802,0x3823,
          0xC9CC,0xD9ED,0xE98E,0xF9AF,0x8948,0x9969,0xA90A,0xB92B,
          0x5AF5,0x4AD4,0x7AB7,0x6A96,0x1A71,0x0A50,0x3A33,0x2A12,
          0xDBFD,0xCBDC,0xFBBF,0xEB9E,0x9B79,0x8B58,0xBB3B,0xAB1A,
          0x6CA6,0x7C87,0x4CE4,0x5CC5,0x2C22,0x3C03,0x0C60,0x1C41,
          0xEDAE,0xFD8F,0xCDEC,0xDDCD,0xAD2A,0xBD0B,0x8D68,0x9D49,
          0x7E97,0x6EB6,0x5ED5,0x4EF4,0x3E13,0x2E32,0x1E51,0x0E70,
          0xFF9F,0xEFBE,0xDFDD,0xCFFC,0xBF1B,0xAF3A,0x9F59,0x8F78,
          0x9188,0x81A9,0xB1CA,0xA1EB,0xD10C,0xC12D,0xF14E,0xE16F,
          0x1080,0x00A1,0x30C2,0x20E3,0x5004,0x4025,0x7046,0x6067,
          0x83B9,0x9398,0xA3FB,0xB3DA,0xC33D,0xD31C,0xE37F,0xF35E,
          0x02B1,0x1290,0x22F3,0x32D2,0x4235,0x5214,0x6277,0x7256,
          0xB5EA,0xA5CB,0x95A8,0x8589,0xF56E,0xE54F,0xD52C,0xC50D,
          0x34E2,0x24C3,0x14A0,0x0481,0x7466,0x6447,0x5424,0x4405,
          0xA7DB,0xB7FA,0x8799,0x97B8,0xE75F,0xF77E,0xC71D,0xD73C,
          0x26D3,0x36F2,0x0691,0x16B0,0x6657,0x7676,0x4615,0x5634,
          0xD94C,0xC96D,0xF90E,0xE92F,0x99C8,0x89E9,0xB98A,0xA9AB,
          0x5844,0x4865,0x7806,0x6827,0x18C0,0x08E1,0x3882,0x28A3,
          0xCB7D,0xDB5C,0xEB3F,0xFB1E,0x8BF9,0x9BD8,0xABBB,0xBB9A,
          0x4A75,0x5A54,0x6A37,0x7A16,0x0AF1,0x1AD0,0x2AB3,0x3A92,
          0xFD2E,0xED0F,0xDD6C,0xCD4D,0xBDAA,0xAD8B,0x9DE8,0x8DC9,
          0x7C26,0x6C07,0x5C64,0x4C45,0x3CA2,0x2C83,0x1CE0,0x0CC1,
          0xEF1F,0xFF3E,0xCF5D,0xDF7C,0xAF9B,0xBFBA,0x8FD9,0x9FF8,
          0x6E17,0x7E36,0x4E55,0x5E74,0x2E93,0x3EB2,0x0ED1,0x1EF0,
      ];

      public static function compute(string $data): int
      {
          $crc = 0;
          $len = strlen($data);
          for ($i = 0; $i < $len; $i++) {
              $crc = (($crc << 8) & 0xFF00) ^ self::TABLE[(($crc >> 8) ^ ord($data[$i])) & 0xFF];
          }
          return $crc & 0xFFFF;
      }

      /** Compute the slot, honoring hash tags {xxx} */
      public static function slot(string $key): int
      {
          // Hash tag: if the key contains a non-empty {...}, only that part is hashed, so related keys can be forced into the same slot
          $start = strpos($key, '{');
          if ($start !== false) {
              $end = strpos($key, '}', $start + 1);
              if ($end !== false && $end > $start + 1) {
                  $key = substr($key, $start + 1, $end - $start - 1);
              }
          }
          return self::compute($key) & 16383;
      }
  }

  Hash tags in plain terms: user:{1000}:profile and user:{1000}:orders both contain {1000}, so they land in the same slot and can be used together in MGET or a transaction.
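
  To cross-check a hand-copied lookup table (see pitfall 8 below), a table-free, bit-by-bit CRC16-XMODEM (polynomial 0x1021, initial value 0) is handy: slower, but short enough to audit by eye. A sketch with hypothetical function names, together with the same hash-tag rule as Crc16::slot():

```php
<?php
// Table-free CRC16-XMODEM: poly 0x1021, init 0, no bit reflection.
function crc16Bitwise(string $data): int
{
    $crc = 0;
    for ($i = 0, $n = strlen($data); $i < $n; $i++) {
        $crc ^= ord($data[$i]) << 8;               // bring the next byte into the high bits
        for ($bit = 0; $bit < 8; $bit++) {
            $crc = ($crc & 0x8000) ? (($crc << 1) ^ 0x1021) : ($crc << 1);
            $crc &= 0xFFFF;                        // keep it 16-bit (PHP ints are 64-bit)
        }
    }
    return $crc;
}

// Hash only the part between the first '{' and the next '}', unless it's empty or unclosed.
function keySlot(string $key): int
{
    $start = strpos($key, '{');
    if ($start !== false) {
        $end = strpos($key, '}', $start + 1);
        if ($end !== false && $end > $start + 1) {
            $key = substr($key, $start + 1, $end - $start - 1);
        }
    }
    return crc16Bitwise($key) & 16383;
}

printf("%04X\n", crc16Bitwise('123456789')); // 31C3, the standard XMODEM check value
echo keySlot('foo'), "\n";                   // 12182
```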

  5.4 src/Cluster/SlotMap.php

  <?php
  namespace App\Cluster;

  use App\{Connection, RedisClient};

  class SlotMap
  {
      /** @var array<int, string> slot => "host:port" */
      private array $slots = [];
      /** @var array<string, Connection> node connection cache */
      private array $connections = [];

      /** Fetch the latest routing table via CLUSTER SLOTS */
      public function refresh(Connection $seed): void
      {
          $client = new RedisClient($seed);
          $resp = $client->command(['CLUSTER', 'SLOTS']);
          // Reply format: [[start, end, [host, port, id], [replica1...]], ...]

          $newSlots = [];
          foreach ($resp as $range) {
              [$start, $end, $master] = $range;
              $node = $master[0] . ':' . $master[1];
              for ($s = $start; $s <= $end; $s++) {
                  $newSlots[$s] = $node;
              }
          }
          $this->slots = $newSlots;
      }

      public function nodeForSlot(int $slot): ?string
      {
          return $this->slots[$slot] ?? null;
      }

      public function setSlot(int $slot, string $node): void
      {
          $this->slots[$slot] = $node;
      }

      public function connection(string $node, ?string $password = null): Connection
      {
          if (!isset($this->connections[$node])) {
              [$host, $port] = explode(':', $node);
              $this->connections[$node] = new Connection($host, (int)$port, password: $password);
          }
          return $this->connections[$node];
      }

      public function nodes(): array
      {
          return array_unique(array_values($this->slots));
      }
  }

  5.5 src/Cluster/RedisCluster.php: the main act

  <?php
  namespace App\Cluster;

  use App\{Connection, RedisException};

  class RedisCluster
  {
      private SlotMap $slotMap;
      private const MAX_REDIRECTS = 5;

      /**
       * @param array $seeds [["host"=>"127.0.0.1","port"=>7001], ...]
       */
      public function __construct(
          private array $seeds,
          private ?string $password = null
      ) {
          $this->slotMap = new SlotMap();
          $this->initialize();
      }

      private function initialize(): void
      {
          foreach ($this->seeds as $seed) {
              try {
                  $conn = new Connection($seed['host'], $seed['port'], password: $this->password);
                  $this->slotMap->refresh($conn);
                  return;
              } catch (\Throwable $e) {
                  continue;  // this seed is down, try the next one
              }
          }
          throw new \RuntimeException('Could not reach any seed node');
      }

      public function __call(string $name, array $args): mixed
      {
          return $this->command([strtoupper($name), ...$args]);
      }

      public function command(array $args): mixed
      {
          // Simplification: assume the key is args[1] (true for GET/SET and similar single-key commands)
          $key = $args[1] ?? null;
          if ($key === null) {
              throw new \RuntimeException('Cluster mode requires a key');
          }
          $slot = Crc16::slot((string) $key);

          return $this->executeWithRedirect($args, $slot);
      }

      private function executeWithRedirect(array $args, int $slot, int $redirectCount = 0, ?string $askNode = null): mixed
      {
          if ($redirectCount > self::MAX_REDIRECTS) {
              throw new \RuntimeException('Too many redirects');
          }

          $node = $askNode ?? $this->slotMap->nodeForSlot($slot);
          if ($node === null) {
              // Slot missing from the routing table: refresh it
              $this->initialize();
              $node = $this->slotMap->nodeForSlot($slot);
              if ($node === null) {
                  throw new \RuntimeException("No node found for slot=$slot");
              }
          }

          $conn = $this->slotMap->connection($node, $this->password);

          try {
              // For an ASK redirect, send ASKING first
              if ($askNode !== null) {
                  $conn->command(['ASKING']);
              }
              return $conn->command($args);

          } catch (RedisException $e) {
              $msg = $e->getMessage();

              // MOVED 12345 1.2.3.4:6379
              if (str_starts_with($msg, 'MOVED ')) {
                  [, $movedSlot, $newNode] = explode(' ', $msg, 3);
                  $this->slotMap->setSlot((int)$movedSlot, $newNode);  // permanent update
                  return $this->executeWithRedirect($args, (int)$movedSlot, $redirectCount + 1);
              }

              // ASK 12345 1.2.3.4:6379
              if (str_starts_with($msg, 'ASK ')) {
                  [, , $newNode] = explode(' ', $msg, 3);
                  // Note: slotMap is NOT updated; only this one request goes to the new node
                  return $this->executeWithRedirect($args, $slot, $redirectCount + 1, askNode: $newNode);
              }

              // CLUSTERDOWN / TRYAGAIN: retry after a short pause
              if (str_starts_with($msg, 'CLUSTERDOWN') || str_starts_with($msg, 'TRYAGAIN')) {
                  usleep(100_000);  // wait 100 ms
                  return $this->executeWithRedirect($args, $slot, $redirectCount + 1);
              }

              throw $e;
          }
      }
  }

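  The core logic in summary:
  1. Compute the slot as crc16(key) & 16383 (after hash-tag extraction)
  2. Look up the node in the local routing table
  3. Send the command
  4. On MOVED: update the routing table and resend to the new node
  5. On ASK: leave the routing table alone and resend to the new node, preceded by ASKING
  6. Otherwise return the reply, or throw the error

  5.6 Trying out the cluster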
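
  The six steps can be exercised without a cluster. The sketch below is an in-memory simulation: RedirectSim is hypothetical, fake nodes are closures returning ['ok', value] or ['err', message], and it uses a loop instead of recursion, which is a design choice rather than what RedisCluster does. It replays the ASK scenario from 5.2, with slot 100 migrating from A to B and foo already moved:

```php
<?php
// In-memory simulation of the MOVED/ASK rules (no sockets).
final class RedirectSim
{
    /** @var list<string> nodes contacted, in order */
    public array $trace = [];

    /** @param array<int,string> $slotMap slot => node; $nodes node => fn(array $args, bool $asking): array */
    public function __construct(public array $slotMap, private array $nodes) {}

    public function run(int $slot, array $args, int $maxRedirects = 5): mixed
    {
        $askNode = null;
        for ($attempt = 0; $attempt <= $maxRedirects; $attempt++) {
            $node = $askNode ?? $this->slotMap[$slot];
            $this->trace[] = $node;
            [$status, $payload] = ($this->nodes[$node])($args, $askNode !== null);
            $askNode = null;                              // ASK applies to exactly one attempt

            if ($status === 'ok') {
                return $payload;
            }
            [$kind, $errSlot, $target] = array_pad(explode(' ', $payload, 3), 3, null);
            if ($kind === 'MOVED') {
                $this->slotMap[(int) $errSlot] = $target; // permanent: fix the routing table
            } elseif ($kind === 'ASK') {
                $askNode = $target;                       // one-off: table stays as it was
            } else {
                throw new RuntimeException($payload);
            }
        }
        throw new RuntimeException('too many redirects');
    }
}

$nodes = [
    'A' => fn(array $args, bool $asking) => ['err', 'ASK 100 B'],
    'B' => fn(array $args, bool $asking) => $asking ? ['ok', 'bar'] : ['err', 'MOVED 100 A'],
];
$sim = new RedirectSim([100 => 'A'], $nodes);
echo $sim->run(100, ['GET', 'foo']), ' via ', implode('->', $sim->trace), "\n"; // bar via A->B
```

  After the call the routing table still maps slot 100 to A, which is exactly the ASK rule; a MOVED reply would have overwritten it.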

  examples/cluster.php:

  <?php
  require __DIR__ . '/../vendor/autoload.php';
  use App\Cluster\{RedisCluster, Crc16};

  // Assumes a local 6-node cluster (3 masters + 3 replicas)
  $cluster = new RedisCluster([
      ['host' => '127.0.0.1', 'port' => 7001],
      ['host' => '127.0.0.1', 'port' => 7002],
      ['host' => '127.0.0.1', 'port' => 7003],
  ]);

  // See which slot each key lands in
  foreach (['foo', 'bar', 'user:1000', 'user:{1000}:orders'] as $k) {
      echo "key=$k slot=" . Crc16::slot($k) . "\n";
  }

  $cluster->set('foo', 'hello');
  echo $cluster->get('foo') . "\n";

  // MSET across slots fails with CROSSSLOT; the {user} hash tag forces both keys into one slot
  $cluster->mset('{user}:a', '1', '{user}:b', '2');

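  The quickest way to start a local test cluster:
  # The image uses redis-cli --cluster to create 6 nodes automatically (check its README for the default port range)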
  docker run -d -p 7001-7006:7001-7006 grokzen/redis-cluster:7.2.0

  ---
  VI. Production best practices / which library to use

  ┌─────────────────────────┬──────────────────────────────────────────────────────────────────────────────────────┐
  │ Need                    │ Recommendation                                                                       │
  ├─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────┤
  │ Production default      │ predis/predis (pure PHP; Cluster, Sentinel and pipelining all supported)             │
  ├─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────┤
  │ Maximum performance     │ ext-redis (C extension, often cited as 5-10x faster than predis, but a clunkier API) │
  ├─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────┤
  │ Swoole coroutines       │ Swoole\Coroutine\Redis, or Swoole hooks + ext-redis                                  │
  ├─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────┤
  │ Hyperf framework        │ hyperf/redis (built on ext-redis + a coroutine connection pool)                      │
  ├─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────┤
  │ Learning from good code │ Read the predis source: a very clean Connection / Command / Profile layering         │
  ├─────────────────────────┼──────────────────────────────────────────────────────────────────────────────────────┤
  │ RESP3 + modern features │ Supported in recent predis and ext-redis releases; check each changelog              │
  └─────────────────────────┴──────────────────────────────────────────────────────────────────────────────────────┘

  Recommended usage, side by side

  // 1. predis (richest ecosystem, pure PHP, installed with Composer)
  //    $ composer require predis/predis
  $client = new Predis\Client('tcp://127.0.0.1:6379');

  // 2. ext-redis (fastest, requires installing the extension)
  $client = new Redis();
  $client->connect('127.0.0.1', 6379);

  // 3. Cluster (supported directly by predis)
  $cluster = new Predis\Client([
      'tcp://127.0.0.1:7001',
      'tcp://127.0.0.1:7002',
      'tcp://127.0.0.1:7003',
  ], ['cluster' => 'redis']);

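  Decision tree

  Single node + plain PHP-FPM      → ext-redis (fast, stable)
  Cluster + advanced features      → predis
  Swoole/Hyperf coroutine projects → Swoole\Coroutine\Redis or hyperf/redis
  Learning / demos                 → the one you wrote yourself (the code in this article)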

  ---
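  VII. Common pitfalls (all learned the hard way)

  1. Pub/Sub connections are exclusive: once a RESP2 connection has subscribed it can't send regular commands, so production code uses two connections
  2. A pipeline is not a transaction: a failed command in the middle doesn't stop or undo the others. MULTI/EXEC (which can also be pipelined) executes the batch atomically without interleaving, but Redis transactions don't roll back either: if one command fails at runtime, the others still take effect
  3. Cross-slot operations: in a cluster, MGET k1 k2 k3 fails with a CROSSSLOT error unless all keys are in the same slot. Use hash tags to force them together
  4. Blocking O(N) commands: KEYS * and SMEMBERS on a huge set are production taboos because they block the single-threaded server; use SCAN/SSCAN instead
  5. Timeouts: connectTimeout around 1-2 s; readTimeout depends on the command (blocking commands such as BLPOP need longer)
  6. Reconnection: after a connection is closed, the next command should reconnect automatically, but Pub/Sub subscriptions are lost and the application has to resubscribe
  7. MOVED storms: during a rolling cluster restart, a client that doesn't cache routing gets flooded with MOVED replies. Our implementation updates its table as soon as it sees MOVED
  8. Wrong CRC16 table: you must use the CRC16-CCITT XMODEM variant (polynomial 0x1021, initial value 0); some versions floating around online are wrong and can cost you a week of debugging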
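
  For pitfall 4, the usual replacement for KEYS * is a cursor loop. A sketch where $scan is a hypothetical callable performing one SCAN round trip (with this article's client it could be fn($c) => $redis->scan($c, 'MATCH', 'user:*', 'COUNT', 100), which returns [cursor, keys]); the fake pages below stand in for a server:

```php
<?php
// Full-keyspace iteration with SCAN instead of KEYS *.
// $scan: callable(string $cursor): array{0:string,1:list<string>}, one SCAN round trip.
function scanAll(callable $scan): Generator
{
    $seen = [];
    $cursor = '0';
    do {
        [$cursor, $keys] = $scan($cursor);
        foreach ($keys as $key) {
            if (!isset($seen[$key])) {   // SCAN may return the same key more than once
                $seen[$key] = true;
                yield $key;
            }
        }
    } while ($cursor !== '0');           // a returned cursor of "0" ends the iteration
}

// Fake server: three pages, with "b" repeated across pages
$pages = ['0' => ['17', ['a', 'b']], '17' => ['42', ['b', 'c']], '42' => ['0', ['d']]];
$fakeScan = fn(string $cursor) => $pages[$cursor];
echo implode(',', iterator_to_array(scanAll($fakeScan), false)), "\n"; // a,b,c,d
```

  The $seen set costs memory on a very large keyspace; if the per-key work is idempotent, it can be dropped.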

  ---
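  VIII. Interview / advanced topics

  1. Why isn't RESP a more compact binary protocol? Text is human-readable and easy to debug with nc/telnet; it trades a little bandwidth for ease of use
  2. Pipeline vs MULTI? A pipeline only batches the sending and the server executes each command independently; MULTI/EXEC queues the commands and executes them atomically (no interleaving from other clients, but no rollback)
  3. MOVED vs ASK? MOVED is a permanent move, so update the routing table; ASK is a temporary redirect during migration, so don't
  4. Why 16384 slots instead of 65536? Heartbeat packets carry the slot bitmap: 16384 bits = 2 KB is small enough, while 65536 bits = 8 KB would be wasteful
  5. How does a client discover new nodes? Via CLUSTER SLOTS / CLUSTER NODES (since Redis 7.0, CLUSTER SLOTS is deprecated in favor of CLUSTER SHARDS); in production a MOVED reply triggers a refresh
  6. How does Pub/Sub work in a cluster? By default messages are broadcast to every node (poor scalability); Redis 7.0+ adds Sharded Pub/Sub (SSUBSCRIBE/SPUBLISH), which maps each channel to a slot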
