司机状态维护与实时调度系统设计
目录
- 一、业务背景与架构总览
- 二、在线状态管理:Redis 存储与自动下线
- 2.1 状态枚举与存储模型
- 2.2 心跳检测与自动下线
- 2.3 状态查询的性能优化
- 三、状态变更广播:消息队列分发
- 3.1 为什么需要消息队列?
- 3.2 RabbitMQ 广播方案
- 3.3 消息可靠性——确认与重试
- 四、实时通信:WebSocket 长连接调度
- 4.1 连接生命周期管理
- 4.2 派单消息的精准路由
- 4.3 断线重连与状态恢复
- 五、工程化踩坑
- 5.1 心跳风暴打垮 Redis
- 5.2 WebSocket 连接泄漏
- 六、总结
一、业务背景与架构总览
在一个派单驱动的即时服务平台上,司机的在线状态是调度系统的核心输入。调度引擎必须实时知道"谁在线、谁空闲、谁在哪",才能在订单到达时毫秒级匹配最近的可用司机。
整体架构上,在线状态由 Redis 承载,状态变更通过 RabbitMQ 广播给订单服务、计费服务等下游模块;司机端通过 WebSocket 与调度中心保持长连接,接收派单指令。
司机端 App
│
├── WebSocket ───────── Hub 连接管理 ────── 调度引擎
│ │ │
│ ├── 心跳 (每 30s) ──── Redis 续期 ──────┤
│ └── 状态上报 ──── Redis 写入 ──── RabbitMQ 广播 ─── 订单服务
│ ─── 计费服务
│ ─── 监控服务
选型考量:
| 组件 | 候选方案 | 选用理由 |
|---|
| 状态存储 | Redis / MySQL / etcd | Redis 读写延迟 < 1ms,TTL 天然支持自动过期 |
| 消息广播 | RabbitMQ / Kafka / Redis Pub/Sub | RabbitMQ 的 Exchange 路由灵活,支持持久化与死信 |
| 长连接 | WebSocket / SSE / TCP 自定义协议 | WebSocket 全双工,浏览器和原生 App 均有成熟 SDK |
二、在线状态管理:Redis 存储与自动下线
2.1 状态枚举与存储模型
司机的在线状态远比"在线/离线"更精细。定义如下枚举:
go
type DriverStatus int
const (
StatusOffline DriverStatus = iota // 0: 离线
StatusOnline // 1: 在线(可接单)
StatusBusy // 2: 接单中(不可接新单)
StatusPaused // 3: 小休(临时不可接单)
StatusNavigating // 4: 导航中(前往取货点)
)
Redis 中存储两个关键 Key:
go
const (
// 司机状态:Hash 类型,字段包括 status、lat、lng、last_heartbeat
keyDriverState = "driver:state:%s" // driver:state:rider_10086
// 在线司机集合:Geo 类型,用于按位置检索附近在线司机
keyOnlineDrivers = "drivers:online:geo"
)
// 写入状态
func (s *StateService) UpdateState(riderID string, status DriverStatus, lat, lng float64) error {
pipe := s.rdb.Pipeline()
key := fmt.Sprintf(keyDriverState, riderID)
now := time.Now().Unix()
pipe.HMSet(ctx, key,
"status", int(status),
"lat", lat,
"lng", lng,
"last_heartbeat", now,
)
pipe.Expire(ctx, key, 90*time.Second) // 90 秒无心跳自动过期
// 在线且未接单的司机加入 GEO 集合
if status == StatusOnline {
pipe.GeoAdd(ctx, keyOnlineDrivers, &redis.GeoLocation{
Name: riderID,
Longitude: lng,
Latitude: lat,
})
} else {
pipe.ZRem(ctx, keyOnlineDrivers, riderID)
}
_, err := pipe.Exec(ctx)
return err
}
为什么用 Hash + Geo 组合? Hash 存储完整状态信息(含心跳时间、坐标),Geo 仅存储可接单司机的坐标,供调度引擎按距离查询。两者各司其职,避免 Geo 中混入不可接单的司机导致无效检索。
2.2 心跳检测与自动下线
司机端每 30 秒发送一次心跳,服务端续期 Redis Key 的 TTL。若 90 秒内未收到心跳,Key 自动过期,视为离线。这是 Redis TTL 机制的天然优势——无需单独部署定时扫描任务。
go
// 心跳处理
func (s *StateService) Heartbeat(riderID string, lat, lng float64) error {
key := fmt.Sprintf(keyDriverState, riderID)
pipe := s.rdb.Pipeline()
pipe.HMSet(ctx, key,
"last_heartbeat", time.Now().Unix(),
"lat", lat,
"lng", lng,
)
pipe.Expire(ctx, key, 90*time.Second)
pipe.GeoAdd(ctx, keyOnlineDrivers, &redis.GeoLocation{
Name: riderID,
Longitude: lng,
Latitude: lat,
})
_, err := pipe.Exec(ctx)
return err
}
自动下线的感知链路:Redis Key 过期并不会主动通知业务层。解决方案是结合 Redis Keyspace Notifications——监听 __keyevent@0__:expired 频道:
go
func (s *StateService) WatchExpired() {
pubsub := s.rdb.PSubscribe(ctx, "__keyevent@0__:expired")
defer pubsub.Close()
for msg := range pubsub.Channel() {
// msg.Payload 格式: driver:state:rider_10086
if strings.HasPrefix(msg.Payload, "driver:state:") {
riderID := strings.TrimPrefix(msg.Payload, "driver:state:")
s.handleOffline(riderID) // 触发下线逻辑
}
}
}
func (s *StateService) handleOffline(riderID string) {
// 1. 从在线司机集合中移除
s.rdb.ZRem(ctx, keyOnlineDrivers, riderID)
// 2. 若该司机有进行中的订单,触发异常处理
order, err := s.orderRepo.FindActiveByRider(riderID)
if err == nil && order != nil {
s.alertService.SendAlert(riderID, "司机异常离线,订单需重新分配")
}
// 3. 广播状态变更
s.publishStateChange(riderID, StatusOffline)
}
心跳参数选择:心跳间隔 30s,TTL 90s。3 个心跳周期内未收到数据即判定离线,这个余量足以覆盖短暂的网络抖动(如过隧道),又不会让离线判定过于迟钝。
2.3 状态查询的性能优化
调度引擎在高并发下频繁查询司机状态,直接读 Redis 没有问题,但同一秒内对同一司机的重复查询浪费资源。增加本地短时缓存:
go
func (s *StateService) GetState(riderID string) (*DriverState, error) {
// 本地 LFU 缓存,TTL 2 秒
if cached, ok := s.localCache.Get(riderID); ok {
return cached.(*DriverState), nil
}
key := fmt.Sprintf(keyDriverState, riderID)
result, err := s.rdb.HGetAll(ctx, key).Result()
if err != nil {
return nil, err
}
if len(result) == 0 {
return &DriverState{Status: StatusOffline}, nil
}
state := &DriverState{
Status: DriverStatus(parseInt(result["status"])),
Lat: parseFloat(result["lat"]),
Lng: parseFloat(result["lng"]),
LastBeat: parseInt64(result["last_heartbeat"]),
}
s.localCache.Set(riderID, state, 2*time.Second)
return state, nil
}
2 秒的本地缓存窗口对业务几乎无感,但在调度引擎批量匹配司机时能减少 70% 以上的 Redis 查询。
三、状态变更广播:消息队列分发
3.1 为什么需要消息队列?
司机的状态变更不仅是调度引擎关心的事情。订单服务需要知道"接单的司机是否还在线",计费服务需要知道"服务中的司机何时变为空闲",监控服务需要记录全量状态轨迹。
如果每个下游模块都轮询 Redis,会产生 N 倍读压力,且状态变更的时效性无法保证。消息队列解决这个问题:状态变更作为一个事件写入队列,各模块订阅自己关心的消息即可。
go
// 状态变更事件
type StateChangeEvent struct {
RiderID string `json:"rider_id"`
OldStatus DriverStatus `json:"old_status"`
NewStatus DriverStatus `json:"new_status"`
Timestamp int64 `json:"timestamp"`
Lat float64 `json:"lat"`
Lng float64 `json:"lng"`
}
3.2 RabbitMQ 广播方案
使用 RabbitMQ 的 Topic Exchange,按事件类型路由到不同队列:
Exchange: driver.state (type=topic)
│
├── routing_key: state.change.online → Queue: order.consume
├── routing_key: state.change.busy → Queue: order.consume
├── routing_key: state.change.# → Queue: monitor.consume
└── routing_key: state.change.* → Queue: billing.consume
go
func (s *StateService) publishStateChange(riderID string, newStatus DriverStatus) error {
oldStatus := s.getLastKnownStatus(riderID)
event := StateChangeEvent{
RiderID: riderID,
OldStatus: oldStatus,
NewStatus: newStatus,
Timestamp: time.Now().Unix(),
}
body, _ := json.Marshal(event)
routingKey := fmt.Sprintf("state.change.%s", newStatus.String())
return s.mqChannel.Publish(
"driver.state", // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: body,
DeliveryMode: amqp.Persistent, // 消息持久化到磁盘
MessageId: fmt.Sprintf("%s_%d", riderID, event.Timestamp),
},
)
}
关键设计:DeliveryMode: amqp.Persistent 确保消息写入磁盘,RabbitMQ 重启后不丢失。状态变更消息不容丢失——计费模块需要精确知道司机的服务起止时间。
3.3 消息可靠性——确认与重试
上游发布消息后必须等待 RabbitMQ 的 Publisher Confirm,未确认则重试:
go
func (s *StateService) publishWithRetry(event StateChangeEvent, routingKey string, maxRetries int) error {
body, _ := json.Marshal(event)
// 开启 Publisher Confirm 模式
confirms := s.mqChannel.NotifyPublish(make(chan amqp.Confirmation, 1))
for i := 0; i < maxRetries; i++ {
err := s.mqChannel.Publish(
"driver.state",
routingKey,
false, false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
DeliveryMode: amqp.Persistent,
},
)
if err != nil {
time.Sleep(time.Duration(1<<i) * 100 * time.Millisecond) // 指数退避
continue
}
confirm := <-confirms
if confirm.Ack {
return nil
}
// Nack,重试
}
return fmt.Errorf("failed to publish after %d retries", maxRetries)
}
下游消费端同样需要手动 ACK,处理失败时 Requeue 或进入死信队列,防止消息静默丢失。
四、实时通信:WebSocket 长连接调度
4.1 连接生命周期管理
司机上线后立即建立 WebSocket 连接,这是派单指令到达司机的唯一通道。连接管理采用 Hub 模式:
go
type DriverHub struct {
clients map[string]*DriverClient // riderID → Client
register chan *DriverClient
unregister chan *DriverClient
mu sync.RWMutex
}
type DriverClient struct {
hub *DriverHub
conn *websocket.Conn
send chan []byte
riderID string
}
func (h *DriverHub) Run() {
for {
select {
case client := <-h.register:
h.mu.Lock()
h.clients[client.riderID] = client
h.mu.Unlock()
case client := <-h.unregister:
h.mu.Lock()
if _, ok := h.clients[client.riderID]; ok {
delete(h.clients, client.riderID)
close(client.send)
}
h.mu.Unlock()
}
}
}
一司机一连接:每个 riderID 只允许一个活跃 WebSocket 连接。新连接到达时踢掉旧连接,避免僵尸连接堆积。
go
func (h *DriverHub) Register(client *DriverClient) {
h.mu.Lock()
if old, ok := h.clients[client.riderID]; ok {
close(old.send) // 关闭旧连接
old.conn.Close()
}
h.clients[client.riderID] = client
h.mu.Unlock()
h.register <- client
}
4.2 派单消息的精准路由
调度引擎匹配到司机后,通过 Hub 直接推送给目标连接:
go
type DispatchMessage struct {
Type string `json:"type"` // "dispatch"
OrderID string `json:"order_id"`
PickupLat float64 `json:"pickup_lat"`
PickupLng float64 `json:"pickup_lng"`
DropLat float64 `json:"drop_lat"`
DropLng float64 `json:"drop_lng"`
Distance float64 `json:"distance"`
Price float64 `json:"price"`
Timeout int `json:"timeout"` // 抢单倒计时(秒)
}
func (h *DriverHub) Dispatch(riderID string, msg DispatchMessage) error {
h.mu.RLock()
client, ok := h.clients[riderID]
h.mu.RUnlock()
if !ok {
return fmt.Errorf("driver %s is not connected", riderID)
}
msg.Type = "dispatch"
data, _ := json.Marshal(msg)
select {
case client.send <- data:
return nil
case <-time.After(3 * time.Second):
// 3 秒内未写入成功,视为连接异常
h.unregister <- client
return fmt.Errorf("send timeout, driver may be disconnected")
}
}
派单消息带有倒计时字段 timeout,司机端收到后开始倒计时。超时未响应则订单自动流转给下一位司机,整个过程中无需 WebSocket 再次交互。
4.3 断线重连与状态恢复
司机 App 切后台或网络切换时 WebSocket 可能断开。重连后的状态恢复流程:
go
// 司机重连
func (h *DriverHub) OnReconnect(riderID string, conn *websocket.Conn) {
// 1. 踢掉旧连接,注册新连接
client := &DriverClient{
hub: h,
conn: conn,
send: make(chan []byte, 64),
riderID: riderID,
}
h.Register(client)
go client.WritePump()
go client.ReadPump()
// 2. 查询 Redis 恢复上次状态
state, err := h.stateService.GetState(riderID)
if err == nil && state.Status != StatusOffline {
// 3. 推送当前待处理的派单(如果有)
pending, _ := h.dispatchService.GetPendingOrders(riderID)
for _, order := range pending {
h.Dispatch(riderID, order)
}
// 4. 发送状态回执
client.send <- h.buildStateSync(state)
}
}
重连后立即推送当前待处理派单,避免司机因断线错过订单。
五、工程化踩坑
5.1 心跳风暴打垮 Redis
现象:某次大促期间在线司机数突破 5000,每 30 秒一轮心跳共计 5000 次 Redis HMSet + Expire。监控显示 Redis CPU 飙升至 85%,部分心跳请求超时导致司机被误判为离线。
根因:5000 次心跳集中在秒级窗口内到达,Redis 单线程处理不过来。每个心跳是两次命令(HMSet + Expire),实际是 10000 次命令/30s。
解决:心跳请求在网关层做批量聚合,而非逐条写入 Redis:
go
// 心跳聚合器
type HeartbeatAggregator struct {
buffer map[string]*HeartbeatData
mu sync.Mutex
}
func (a *HeartbeatAggregator) Collect(riderID string, lat, lng float64) {
a.mu.Lock()
a.buffer[riderID] = &HeartbeatData{Lat: lat, Lng: lng, Time: time.Now()}
a.mu.Unlock()
}
func (a *HeartbeatAggregator) Flush() {
a.mu.Lock()
batch := a.buffer
a.buffer = make(map[string]*HeartbeatData)
a.mu.Unlock()
pipe := a.rdb.Pipeline()
for riderID, data := range batch {
key := fmt.Sprintf("driver:state:%s", riderID)
pipe.HMSet(ctx, key,
"last_heartbeat", data.Time.Unix(),
"lat", data.Lat,
"lng", data.Lng,
)
pipe.Expire(ctx, key, 90*time.Second)
}
pipe.Exec(ctx) // 一次 Pipeline 批量提交
}
每 3 秒一批,5000 个心跳从 10000 次 Redis 命令降为 1 次 Pipeline(内部仍然是多个命令,但不经过网络往返),Redis CPU 降到 15%。
5.2 WebSocket 连接泄漏
现象:运行 48 小时后,ESTABLISHED 状态的连接数远超在线司机数。部分司机已离线,但服务端连接未释放。
根因:ReadPump 中 ReadMessage 阻塞读取,若客户端非正常断开(如直接杀进程),TCP 连接不会立即关闭,服务端 goroutine 一直阻塞在 ReadMessage 上。
解决:设置 ReadDeadline,超时后主动关闭:
go
func (c *DriverClient) ReadPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadDeadline(time.Now().Add(120 * time.Second))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(120 * time.Second))
return nil
})
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
break // 超时或断开,退出循环并触发 defer 清理
}
// 处理消息...
c.conn.SetReadDeadline(time.Now().Add(120 * time.Second))
}
}
ReadDeadline 120 秒 + PongHandler 续期。服务端每 54 秒发 Ping,客户端回 Pong 后重置 Deadline。超过 120 秒未收到任何消息(包括 Pong),连接自动关闭,goroutine 退出,不残留。
六、总结
| 维度 | 技术决策 | 踩过的坑 | 关键收获 |
|---|
| 状态存储 | Redis Hash + TTL 自动过期 | 心跳风暴打垮 Redis | 网关层批量聚合 + Pipeline 写入 |
| 自动下线 | Keyspace Notification 监听过期 | 过期事件偶有延迟 | 业务层双重校验(再查一次 Redis) |
| 状态广播 | RabbitMQ Topic Exchange | 消息丢失导致计费偏差 | Publisher Confirm + 手动 ACK |
| WebSocket 连接 | Hub 模式 + 一司机一连接 | 僵尸连接泄漏 | ReadDeadline + Ping/Pong 保活 |
| 断线恢复 | 重连后查询 Redis 补推派单 | 重连瞬间状态不一致 | 先恢复状态再推送待处理订单 |
司机在线状态管理看似简单——一个 Key、一个 TTL、一个心跳。但在 5000 并发的规模下,心跳写入从"随手操作"变成了系统瓶颈;WebSocket 连接从"建连即忘"变成了需要精心设计生命周期的资源。状态同步没有银弹,只有对每一层(Redis → RabbitMQ → WebSocket)都做好失败兜底,才能保证调度引擎看到的始终是准确的司机画像。
更多推荐
所有评论(0)