目录

  • 一、业务背景与架构总览
  • 二、在线状态管理:Redis 存储与自动下线
    • 2.1 状态枚举与存储模型
    • 2.2 心跳检测与自动下线
    • 2.3 状态查询的性能优化
  • 三、状态变更广播:消息队列分发
    • 3.1 为什么需要消息队列?
    • 3.2 RabbitMQ 广播方案
    • 3.3 消息可靠性——确认与重试
  • 四、实时通信:WebSocket 长连接调度
    • 4.1 连接生命周期管理
    • 4.2 派单消息的精准路由
    • 4.3 断线重连与状态恢复
  • 五、工程化踩坑
    • 5.1 心跳风暴打垮 Redis
    • 5.2 WebSocket 连接泄漏
  • 六、总结

一、业务背景与架构总览

在一个派单驱动的即时服务平台上,司机的在线状态是调度系统的核心输入。调度引擎必须实时知道"谁在线、谁空闲、谁在哪",才能在订单到达时毫秒级匹配最近的可用司机。

整体架构上,在线状态由 Redis 承载,状态变更通过 RabbitMQ 广播给订单服务、计费服务等下游模块;司机端通过 WebSocket 与调度中心保持长连接,接收派单指令。

司机端 App
    │
    ├── WebSocket ───────── Hub 连接管理 ────── 调度引擎
    │       │                                       │
    │       ├── 心跳 (每 30s) ──── Redis 续期 ──────┤
    │       └── 状态上报 ──── Redis 写入 ──── RabbitMQ 广播 ─── 订单服务
    │                                                      ─── 计费服务
    │                                                      ─── 监控服务

选型考量:

组件 候选方案 选用理由
状态存储 Redis / MySQL / etcd Redis 读写延迟 < 1ms,TTL 天然支持自动过期
消息广播 RabbitMQ / Kafka / Redis Pub/Sub RabbitMQ 的 Exchange 路由灵活,支持持久化与死信
长连接 WebSocket / SSE / TCP 自定义协议 WebSocket 全双工,浏览器和原生 App 均有成熟 SDK

二、在线状态管理:Redis 存储与自动下线

2.1 状态枚举与存储模型

司机的在线状态远比"在线/离线"更精细。定义如下枚举:


go

type DriverStatus int

const (
    StatusOffline     DriverStatus = iota // 0: 离线
    StatusOnline                          // 1: 在线(可接单)
    StatusBusy                            // 2: 接单中(不可接新单)
    StatusPaused                          // 3: 小休(临时不可接单)
    StatusNavigating                      // 4: 导航中(前往取货点)
)

Redis 中存储两个关键 Key:


go

const (
    // 司机状态:Hash 类型,字段包括 status、lat、lng、last_heartbeat
    keyDriverState = "driver:state:%s"   // driver:state:rider_10086

    // 在线司机集合:Geo 类型,用于按位置检索附近在线司机
    keyOnlineDrivers = "drivers:online:geo"
)

// 写入状态
func (s *StateService) UpdateState(riderID string, status DriverStatus, lat, lng float64) error {
    pipe := s.rdb.Pipeline()

    key := fmt.Sprintf(keyDriverState, riderID)
    now := time.Now().Unix()

    pipe.HMSet(ctx, key,
        "status", int(status),
        "lat", lat,
        "lng", lng,
        "last_heartbeat", now,
    )
    pipe.Expire(ctx, key, 90*time.Second) // 90 秒无心跳自动过期

    // 在线且未接单的司机加入 GEO 集合
    if status == StatusOnline {
        pipe.GeoAdd(ctx, keyOnlineDrivers, &redis.GeoLocation{
            Name:      riderID,
            Longitude: lng,
            Latitude:  lat,
        })
    } else {
        pipe.ZRem(ctx, keyOnlineDrivers, riderID)
    }

    _, err := pipe.Exec(ctx)
    return err
}

为什么用 Hash + Geo 组合? Hash 存储完整状态信息(含心跳时间、坐标),Geo 仅存储可接单司机的坐标,供调度引擎按距离查询。两者各司其职,避免 Geo 中混入不可接单的司机导致无效检索。

2.2 心跳检测与自动下线

司机端每 30 秒发送一次心跳,服务端续期 Redis Key 的 TTL。若 90 秒内未收到心跳,Key 自动过期,视为离线。这是 Redis TTL 机制的天然优势——无需单独部署定时扫描任务。


go

// 心跳处理
func (s *StateService) Heartbeat(riderID string, lat, lng float64) error {
    key := fmt.Sprintf(keyDriverState, riderID)

    pipe := s.rdb.Pipeline()
    pipe.HMSet(ctx, key,
        "last_heartbeat", time.Now().Unix(),
        "lat", lat,
        "lng", lng,
    )
    pipe.Expire(ctx, key, 90*time.Second)
    pipe.GeoAdd(ctx, keyOnlineDrivers, &redis.GeoLocation{
        Name:      riderID,
        Longitude: lng,
        Latitude:  lat,
    })

    _, err := pipe.Exec(ctx)
    return err
}

自动下线的感知链路:Redis Key 过期并不会主动通知业务层。解决方案是结合 Redis Keyspace Notifications——监听 __keyevent@0__:expired 频道:


go

func (s *StateService) WatchExpired() {
    pubsub := s.rdb.PSubscribe(ctx, "__keyevent@0__:expired")
    defer pubsub.Close()

    for msg := range pubsub.Channel() {
        // msg.Payload 格式: driver:state:rider_10086
        if strings.HasPrefix(msg.Payload, "driver:state:") {
            riderID := strings.TrimPrefix(msg.Payload, "driver:state:")
            s.handleOffline(riderID) // 触发下线逻辑
        }
    }
}

func (s *StateService) handleOffline(riderID string) {
    // 1. 从在线司机集合中移除
    s.rdb.ZRem(ctx, keyOnlineDrivers, riderID)

    // 2. 若该司机有进行中的订单,触发异常处理
    order, err := s.orderRepo.FindActiveByRider(riderID)
    if err == nil && order != nil {
        s.alertService.SendAlert(riderID, "司机异常离线,订单需重新分配")
    }

    // 3. 广播状态变更
    s.publishStateChange(riderID, StatusOffline)
}

心跳参数选择:心跳间隔 30s,TTL 90s。3 个心跳周期内未收到数据即判定离线,这个余量足以覆盖短暂的网络抖动(如过隧道),又不会让离线判定过于迟钝。

2.3 状态查询的性能优化

调度引擎在高并发下频繁查询司机状态,直接读 Redis 没有问题,但同一秒内对同一司机的重复查询浪费资源。增加本地短时缓存:


go

func (s *StateService) GetState(riderID string) (*DriverState, error) {
    // 本地 LFU 缓存,TTL 2 秒
    if cached, ok := s.localCache.Get(riderID); ok {
        return cached.(*DriverState), nil
    }

    key := fmt.Sprintf(keyDriverState, riderID)
    result, err := s.rdb.HGetAll(ctx, key).Result()
    if err != nil {
        return nil, err
    }
    if len(result) == 0 {
        return &DriverState{Status: StatusOffline}, nil
    }

    state := &DriverState{
        Status:   DriverStatus(parseInt(result["status"])),
        Lat:      parseFloat(result["lat"]),
        Lng:      parseFloat(result["lng"]),
        LastBeat: parseInt64(result["last_heartbeat"]),
    }

    s.localCache.Set(riderID, state, 2*time.Second)
    return state, nil
}

2 秒的本地缓存窗口对业务几乎无感,但在调度引擎批量匹配司机时能减少 70% 以上的 Redis 查询。


三、状态变更广播:消息队列分发

3.1 为什么需要消息队列?

司机的状态变更不仅是调度引擎关心的事情。订单服务需要知道"接单的司机是否还在线",计费服务需要知道"服务中的司机何时变为空闲",监控服务需要记录全量状态轨迹。

如果每个下游模块都轮询 Redis,会产生 N 倍读压力,且状态变更的时效性无法保证。消息队列解决这个问题:状态变更作为一个事件写入队列,各模块订阅自己关心的消息即可。


go

// 状态变更事件
type StateChangeEvent struct {
    RiderID   string       `json:"rider_id"`
    OldStatus DriverStatus `json:"old_status"`
    NewStatus DriverStatus `json:"new_status"`
    Timestamp int64        `json:"timestamp"`
    Lat       float64      `json:"lat"`
    Lng       float64      `json:"lng"`
}
3.2 RabbitMQ 广播方案

使用 RabbitMQ 的 Topic Exchange,按事件类型路由到不同队列:

Exchange: driver.state (type=topic)
    │
    ├── routing_key: state.change.online    → Queue: order.consume
    ├── routing_key: state.change.busy      → Queue: order.consume
    ├── routing_key: state.change.#         → Queue: monitor.consume
    └── routing_key: state.change.*         → Queue: billing.consume

go

func (s *StateService) publishStateChange(riderID string, newStatus DriverStatus) error {
    oldStatus := s.getLastKnownStatus(riderID)

    event := StateChangeEvent{
        RiderID:   riderID,
        OldStatus: oldStatus,
        NewStatus: newStatus,
        Timestamp: time.Now().Unix(),
    }

    body, _ := json.Marshal(event)
    routingKey := fmt.Sprintf("state.change.%s", newStatus.String())

    return s.mqChannel.Publish(
        "driver.state", // exchange
        routingKey,     // routing key
        false,          // mandatory
        false,          // immediate
        amqp.Publishing{
            ContentType:  "application/json",
            Body:         body,
            DeliveryMode: amqp.Persistent, // 消息持久化到磁盘
            MessageId:    fmt.Sprintf("%s_%d", riderID, event.Timestamp),
        },
    )
}

关键设计:DeliveryMode: amqp.Persistent 确保消息写入磁盘,RabbitMQ 重启后不丢失。状态变更消息不容丢失——计费模块需要精确知道司机的服务起止时间。

3.3 消息可靠性——确认与重试

上游发布消息后必须等待 RabbitMQ 的 Publisher Confirm,未确认则重试:


go

func (s *StateService) publishWithRetry(event StateChangeEvent, routingKey string, maxRetries int) error {
    body, _ := json.Marshal(event)

    // 开启 Publisher Confirm 模式
    confirms := s.mqChannel.NotifyPublish(make(chan amqp.Confirmation, 1))

    for i := 0; i < maxRetries; i++ {
        err := s.mqChannel.Publish(
            "driver.state",
            routingKey,
            false, false,
            amqp.Publishing{
                ContentType:  "application/json",
                Body:         body,
                DeliveryMode: amqp.Persistent,
            },
        )
        if err != nil {
            time.Sleep(time.Duration(1<<i) * 100 * time.Millisecond) // 指数退避
            continue
        }

        confirm := <-confirms
        if confirm.Ack {
            return nil
        }
        // Nack,重试
    }

    return fmt.Errorf("failed to publish after %d retries", maxRetries)
}

下游消费端同样需要手动 ACK,处理失败时 Requeue 或进入死信队列,防止消息静默丢失。


四、实时通信:WebSocket 长连接调度

4.1 连接生命周期管理

司机上线后立即建立 WebSocket 连接,这是派单指令到达司机的唯一通道。连接管理采用 Hub 模式:


go

type DriverHub struct {
    clients    map[string]*DriverClient  // riderID → Client
    register   chan *DriverClient
    unregister chan *DriverClient
    mu         sync.RWMutex
}

type DriverClient struct {
    hub    *DriverHub
    conn   *websocket.Conn
    send   chan []byte
    riderID string
}

func (h *DriverHub) Run() {
    for {
        select {
        case client := <-h.register:
            h.mu.Lock()
            h.clients[client.riderID] = client
            h.mu.Unlock()

        case client := <-h.unregister:
            h.mu.Lock()
            if _, ok := h.clients[client.riderID]; ok {
                delete(h.clients, client.riderID)
                close(client.send)
            }
            h.mu.Unlock()
        }
    }
}

一司机一连接:每个 riderID 只允许一个活跃 WebSocket 连接。新连接到达时踢掉旧连接,避免僵尸连接堆积。


go

func (h *DriverHub) Register(client *DriverClient) {
    h.mu.Lock()
    if old, ok := h.clients[client.riderID]; ok {
        close(old.send)   // 关闭旧连接
        old.conn.Close()
    }
    h.clients[client.riderID] = client
    h.mu.Unlock()
    h.register <- client
}
4.2 派单消息的精准路由

调度引擎匹配到司机后,通过 Hub 直接推送给目标连接:


go

type DispatchMessage struct {
    Type      string  `json:"type"`       // "dispatch"
    OrderID   string  `json:"order_id"`
    PickupLat float64 `json:"pickup_lat"`
    PickupLng float64 `json:"pickup_lng"`
    DropLat   float64 `json:"drop_lat"`
    DropLng   float64 `json:"drop_lng"`
    Distance  float64 `json:"distance"`
    Price     float64 `json:"price"`
    Timeout   int     `json:"timeout"`    // 抢单倒计时(秒)
}

func (h *DriverHub) Dispatch(riderID string, msg DispatchMessage) error {
    h.mu.RLock()
    client, ok := h.clients[riderID]
    h.mu.RUnlock()

    if !ok {
        return fmt.Errorf("driver %s is not connected", riderID)
    }

    msg.Type = "dispatch"
    data, _ := json.Marshal(msg)

    select {
    case client.send <- data:
        return nil
    case <-time.After(3 * time.Second):
        // 3 秒内未写入成功,视为连接异常
        h.unregister <- client
        return fmt.Errorf("send timeout, driver may be disconnected")
    }
}

派单消息带有倒计时字段 timeout,司机端收到后开始倒计时。超时未响应则订单自动流转给下一位司机,整个过程中无需 WebSocket 再次交互。

4.3 断线重连与状态恢复

司机 App 切后台或网络切换时 WebSocket 可能断开。重连后的状态恢复流程:


go

// 司机重连
func (h *DriverHub) OnReconnect(riderID string, conn *websocket.Conn) {
    // 1. 踢掉旧连接,注册新连接
    client := &DriverClient{
        hub:     h,
        conn:    conn,
        send:    make(chan []byte, 64),
        riderID: riderID,
    }
    h.Register(client)

    go client.WritePump()
    go client.ReadPump()

    // 2. 查询 Redis 恢复上次状态
    state, err := h.stateService.GetState(riderID)
    if err == nil && state.Status != StatusOffline {
        // 3. 推送当前待处理的派单(如果有)
        pending, _ := h.dispatchService.GetPendingOrders(riderID)
        for _, order := range pending {
            h.Dispatch(riderID, order)
        }

        // 4. 发送状态回执
        client.send <- h.buildStateSync(state)
    }
}

重连后立即推送当前待处理派单,避免司机因断线错过订单。


五、工程化踩坑

5.1 心跳风暴打垮 Redis

现象:某次大促期间在线司机数突破 5000,每 30 秒一轮心跳共计 5000 次 Redis HMSet + Expire。监控显示 Redis CPU 飙升至 85%,部分心跳请求超时导致司机被误判为离线。

根因:5000 次心跳集中在秒级窗口内到达,Redis 单线程处理不过来。每个心跳是两次命令(HMSet + Expire),实际是 10000 次命令/30s。

解决:心跳请求在网关层做批量聚合,而非逐条写入 Redis:


go

// 心跳聚合器
type HeartbeatAggregator struct {
    buffer map[string]*HeartbeatData
    mu     sync.Mutex
}

func (a *HeartbeatAggregator) Collect(riderID string, lat, lng float64) {
    a.mu.Lock()
    a.buffer[riderID] = &HeartbeatData{Lat: lat, Lng: lng, Time: time.Now()}
    a.mu.Unlock()
}

func (a *HeartbeatAggregator) Flush() {
    a.mu.Lock()
    batch := a.buffer
    a.buffer = make(map[string]*HeartbeatData)
    a.mu.Unlock()

    pipe := a.rdb.Pipeline()
    for riderID, data := range batch {
        key := fmt.Sprintf("driver:state:%s", riderID)
        pipe.HMSet(ctx, key,
            "last_heartbeat", data.Time.Unix(),
            "lat", data.Lat,
            "lng", data.Lng,
        )
        pipe.Expire(ctx, key, 90*time.Second)
    }
    pipe.Exec(ctx) // 一次 Pipeline 批量提交
}

每 3 秒一批,5000 个心跳从 10000 次 Redis 命令降为 1 次 Pipeline(内部仍然是多个命令,但不经过网络往返),Redis CPU 降到 15%。

5.2 WebSocket 连接泄漏

现象:运行 48 小时后,ESTABLISHED 状态的连接数远超在线司机数。部分司机已离线,但服务端连接未释放。

根因:ReadPump 中 ReadMessage 阻塞读取,若客户端非正常断开(如直接杀进程),TCP 连接不会立即关闭,服务端 goroutine 一直阻塞在 ReadMessage 上。

解决:设置 ReadDeadline,超时后主动关闭:


go

func (c *DriverClient) ReadPump() {
    defer func() {
        c.hub.unregister <- c
        c.conn.Close()
    }()

    c.conn.SetReadDeadline(time.Now().Add(120 * time.Second))
    c.conn.SetPongHandler(func(string) error {
        c.conn.SetReadDeadline(time.Now().Add(120 * time.Second))
        return nil
    })

    for {
        _, message, err := c.conn.ReadMessage()
        if err != nil {
            break // 超时或断开,退出循环并触发 defer 清理
        }
        // 处理消息...
        c.conn.SetReadDeadline(time.Now().Add(120 * time.Second))
    }
}

ReadDeadline 120 秒 + PongHandler 续期。服务端每 54 秒发 Ping,客户端回 Pong 后重置 Deadline。超过 120 秒未收到任何消息(包括 Pong),连接自动关闭,goroutine 退出,不残留。


六、总结

维度 技术决策 踩过的坑 关键收获
状态存储 Redis Hash + TTL 自动过期 心跳风暴打垮 Redis 网关层批量聚合 + Pipeline 写入
自动下线 Keyspace Notification 监听过期 过期事件偶有延迟 业务层双重校验(再查一次 Redis)
状态广播 RabbitMQ Topic Exchange 消息丢失导致计费偏差 Publisher Confirm + 手动 ACK
WebSocket 连接 Hub 模式 + 一司机一连接 僵尸连接泄漏 ReadDeadline + Ping/Pong 保活
断线恢复 重连后查询 Redis 补推派单 重连瞬间状态不一致 先恢复状态再推送待处理订单

司机在线状态管理看似简单——一个 Key、一个 TTL、一个心跳。但在 5000 并发的规模下,心跳写入从"随手操作"变成了系统瓶颈;WebSocket 连接从"建连即忘"变成了需要精心设计生命周期的资源。状态同步没有银弹,只有对每一层(Redis → RabbitMQ → WebSocket)都做好失败兜底,才能保证调度引擎看到的始终是准确的司机画像。

更多推荐