C++20协程项目-高并发Web聊天室,包括协程如何调用MySQL+Redis
来源:程序员老廖
功能概览
-
用户注册/登录:用户名+密码注册,MySQL 存储,INSERT IGNORE 原子去重
-
Token 认证:登录后 Redis 存储 token,1 小时过期
-
WebSocket 实时聊天:RFC 6455 完整实现,支持文本帧、Ping/Pong、Close
-
消息持久化:Redis List 存储最近 200 条聊天记录
-
历史消息推送:WebSocket 连接建立后推送最近 50 条历史消息
-
全员广播:消息实时推送给所有在线 WebSocket 客户端
-
HTTP Keep-Alive:复用 TCP 连接减少开销
-
嵌入式前端:商业 IM 风格 UI,左右分栏布局,彩色头像,连接状态指示器,响应式适配
系统架构
整体架构

线程模型

数据流
用户注册流程

WebSocket 聊天消息流程

文件结构与模块职责
chatapp/
|
+-- coro_net.h 核心协程网络层
| |-- FireAndForget 协程返回类型,启动即忘
| |-- Worker epoll 事件循环 + 协程句柄管理 + 跨线程调度
| |-- AsyncRead 异步 socket 读 awaiter
| |-- AsyncWrite 异步 socket 写 awaiter
| +-- SwitchToWorker 跨线程切换 awaiter
|
+-- async_redis.h Redis 异步访问层
| |-- RedisAwaiter Redis 命令结果容器
| |-- WorkerRedis hiredis-async 适配器,集成 Worker epoll
| +-- AsyncRedisCommand co_await Redis 命令 awaiter
|
+-- async_mysql.h MySQL 异步访问层
| |-- ThreadPool 通用线程池,条件变量调度
| |-- MySQLPool MySQL 连接池,阻塞 acquire/release
| |-- MySQLResult SQL 查询结果
| |-- AsyncMySQLQuery co_await SQL 执行 awaiter
| +-- mysql_escape SQL 参数转义
|
+-- http_server.h HTTP 协议层
| |-- HttpRequest 请求解析:方法、路径、头、body、JSON、Cookie
| |-- HttpResponse 响应构建:状态码、头、body、JSON、重定向
| +-- json_escape JSON 字符串转义
|
+-- websocket.h WebSocket 协议层
| |-- ws_crypto SHA1 + Base64 握手算法
| |-- WSOpcode 帧类型枚举
| |-- WSFrame 帧结构
| |-- ws_encode_frame 服务端帧编码,无 mask
| |-- WSClient 在线客户端信息
| +-- WSManager 连接管理器,广播
|
+-- chat_page.h 嵌入式前端页面(从 chat_app.h 分离)
| +-- HTML_PAGE 商业 IM 风格前端(左右分栏、彩色头像、连接状态)
|
+-- chat_app.h 业务逻辑层
| |-- generate_token 随机 token 生成
| |-- hash_password FNV-1a 密码哈希
| |-- handle_http_conn HTTP/WebSocket 主协程
| +-- 路由处理 注册/登录/发消息/获取消息/WebSocket
|
+-- main.cpp 入口
| |-- worker_thread Worker 线程函数
| |-- init_mysql_tables 建表
| +-- main 参数解析、初始化、accept 循环
|
+-- Makefile 编译脚本
+-- benchmark.cpp HTTP 压测工具
+-- ws_bench.cpp WebSocket 压测工具
+-- bench_layers.cpp 分层性能测试
+-- bench_pool.sh 连接池参数调优脚本
+-- test_ws.py WebSocket 功能测试脚本
项目源码:C++20协程项目-高并发Web聊天室,包括协程如何调用MySQL+Redis
核心设计详解
协程网络层 coro_net.h
这是整个系统的基石,将 epoll 事件驱动与 C++20 协程无缝结合。
FireAndForget —— 协程返回类型
class FireAndForget {
struct promise_type {
FireAndForget get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; } // 创建即启动
std::suspend_never final_suspend() noexcept { return {}; } // 结束不挂起
void return_void() {}
void unhandled_exception() { std::terminate(); }
};
};
FireAndForget 是一种 "启动即忘" 的协程。协程创建后立即开始执行( initial_suspend 返回
suspend_never ),执行完成后自动销毁( final_suspend 返回 suspend_never )。调用者不需要管理协程生命周期。
这种设计适用于 "每连接一个协程" 的模型:accept 后创建协程,协程自己管理 fd 的整个生命周期。
Worker —— epoll 事件循环
Worker 是系统的核心调度器,每个 Worker 在独立线程上运行,管理:
-
epoll 事件循环:处理 socket IO 事件
-
协程句柄映射:fd → coroutine_handle 的映射表
-
ready_events 缓存:处理事件到达但协程未注册 handle 的时序窗口
-
跨线程唤醒:eventfd 机制,支持从任意线程唤醒 Worker
-
自定义 FdHandler:支持 hiredis-async 等第三方库注册自己的 fd 处理逻辑

AsyncRead/AsyncWrite —— 异步 IO Awaiter

关键优化: await_ready 的先尝试读取(fast path)避免了在数据已就绪时不必要的 suspend/resume 开销。高吞吐场景下,大部分 IO 操作可以在 await_ready 中直接完成。
Redis 异步访问 async_redis.h
Redis 使用 hiredis-async 库,与 Worker 的 epoll 深度集成。

每 Worker 一个 Redis 连接的设计:
-
Redis 协议天然支持 pipeline —— 多个命令可以在一个 TCP 连接上交错发送
-
hiredis-async 内部维护命令队列,自动 pipeline
-
一个连接即可服务该 Worker 上的所有协程,无需连接池
MySQL 异步访问 async_mysql.h
MySQL C API ( libmysqlclient ) 是同步阻塞的,无法直接集成到 epoll。解决方案:线程池 + 连接池 + 协程挂起。

关键设计决策:
1. 为什么不用 MySQL 非阻塞 API?
-
MySQL C API 的非阻塞模式需要 mysql_real_connect_nonblocking 等函数,API 复杂且文档不足
-
MariaDB Connector/C 的非阻塞 API 与 MySQL 不兼容
-
线程池方案简单可靠,性能损失可控
2. 连接池大小如何选择?
-
连接数 = min(CPU 核数 x 2, 并发协程数)
-
8-16 连接已足够饱和大部分场景(MySQL IO 本身是瓶颈)
-
连接过多反而增加 MySQL 内部锁竞争
3. 协程如何不阻塞 Worker 线程?
-
await_suspend 把 SQL 任务提交到 ThreadPool 后立即返回
-
Worker 线程可以继续处理其他协程的 IO 事件
-
SQL 执行完成后通过 worker->schedule + eventfd 唤醒 Worker
HTTP 协议层 http_server.h
实现了 HTTP/1.1 的核心子集:
-
请求解析:方法、路径、Query String、Headers、Body、JSON 提取
-
响应构建:状态码、Headers、Content-Length 自动计算
-
Keep-Alive:通过 Connection header 判断是否复用连接
-
无第三方依赖:手写解析器,约 230 行代码
WebSocket 协议层 websocket.h
完整实现 RFC 6455 WebSocket 协议:

WebSocket 帧格式 (RFC 6455 Section 5.2):

|
Opcode |
类型 |
用途 |
|---|---|---|
|
0x1 |
Text frame |
聊天消息 |
|
0x8 |
Close frame |
关闭连接 |
|
0x9 |
Ping frame |
保活检测 |
|
0xA |
Pong frame |
Ping 响应 |
客户端→服务端:必须 mask(4 字节 XOR 密钥)
服务端→客户端:不能 mask
WSManager —— 连接管理与广播
class WSManager {
map<int, WSClient> clients_; // fd -> client info
mutex mu_;
void broadcast(const string& message) {
string frame = ws_encode_frame(WS_TEXT, message);
lock_guard lock(mu_);
for (auto& [fd, client] : clients_) {
::write(fd, frame.data(), frame.size()); // 同步写
}
}
};
业务逻辑层 chat_app.h
前端 UI 架构
嵌入式前端采用商业 IM 风格设计,参考微信 Web / Telegram 等产品的交互范式:

配色方案:
|
元素 |
颜色 |
说明 |
|---|---|---|
|
主色调 |
#07c160 |
微信绿 |
|
背景色 |
#f0f0f0 |
浅灰 |
|
侧边栏 |
#2e2e2e |
深色 |
|
自己的气泡 |
#95ec69 |
绿色 |
|
他人的气泡 |
#ffffff |
白色 |
前端特性:
|
特性 |
实现方式 |
|---|---|
|
彩色头像 |
根据用户名哈希从 12 色板选色,首字母大写显示 |
|
消息气泡 |
自己绿色右对齐,他人白色左对齐,微信同款圆角 |
|
连接状态 |
绿色/红色/橙色闪烁圆点,实时反映 WebSocket 状态 |
|
多行输入 |
textarea 自动扩展高度,Shift+Enter 换行,Enter 发送 |
|
响应式布局 |
768px 以下侧边栏隐藏,通过汉堡菜单切换显示 |
|
断线重连 |
WebSocket 断开后 2 秒自动重连,状态指示器联动 |
|
侧边栏预览 |
最新消息自动更新到聊天室描述区域 |
核心协程生命周期
handle_http_connection 是系统的核心协程,管理单个 TCP 连接的完整生命周期:

协程并发模型
本项目采用 "每个连接一个协程" 的并发模型。每当主线程 accept 一个新的客户端 TCP 连接时,就会创建一个独立
的 handle_http_connection 协程实例。该协程独立管理这个连接的完整生命周期:从读取 HTTP 请求、路由分
发、到 WebSocket 消息循环、最终关闭连接。
一个连接 = 一个协程

关键特征:
-
每个连接独立:协程 A 的 co_await AsyncRead 挂起时,Worker 线程立刻去执行协程 B 的就绪操作,不会阻塞
-
M:N 映射:M 个协程映射到 N 个 Worker 线程。当前默认 N=4,可支撑 10,000+ 并发连接
-
连接亲和性:一个协程从创建到销毁始终在同一个 Worker 线程上运行,不跨线程迁移
FireAndForget —— 启动即忘的协程生命周期
handle_http_connection 的返回类型是 FireAndForget ,这并不意味着"发出去就不管了",而是 "调用者不需要 co_await 等待它完成" 。协程创建后自行运行,直到连接关闭时自动销毁:

协程调度全景图

与其他并发模型的对比

|
模型 |
并发方式 |
10K 连接开销 |
代码风格 |
缺点 |
|---|---|---|---|---|
|
每连接一个线程 |
OS 线程 |
~80GB 栈内存 |
同步、直观 |
线程数受限,上下文切换重 |
|
事件回调 |
回调函数 |
极低 |
回调嵌套、状态机 |
可读性差,错误处理复杂 |
|
每连接一个协程 |
协程 + epoll |
~几十 MB |
同步风格,异步执行 |
需要编译器支持 C++20 |
本项目的协程模型兼具了线程的 代码可读性 和事件驱动的 性能。每个连接的处理逻辑看起来就像普通的顺序代 码,但在 co_await 处自动让出执行权,让 Worker 线程去服务其他连接。
请求处理全流程
HTTP 请求处理流程

用户注册流程

为什么用 INSERT IGNORE 而不是 SELECT + INSERT?
-
原子性:没有检查和插入之间的竞态条件
-
单次 SQL:一次往返替代两次
-
+88% QPS:从 5.4K 提升到 10.1K
用户登录流程

WebSocket 建连流程

WebSocket 消息流程

协程调度机制
Worker 事件循环

AsyncRead/AsyncWrite 原理
三阶段协议:

为什么要在 await_ready 中先尝试 syscall?

高吞吐场景下,TCP 接收缓冲区经常有数据,try-first 可以跳过整个 suspend/resume 流程。
跨线程调度 SwitchToWorker

ready_events 缓存机制与 Bug 修复
问题场景
Stale ready_events Bug
======================
Timeline:
t0: Main thread accept(fd=37), add_client_fd(37, EPOLLIN|EPOLLET)
Data is already in TCP buffer (client sent HTTP request)
t1: Worker epoll_wait returns EPOLLIN for fd=37
But handles_[37] is nullptr (coroutine hasn't registered yet)
--> ready_events_[37] = EPOLLIN (cached!)
t2: Coroutine starts, first AsyncRead:
await_ready: ::read(37) succeeds! returns data
--> return true (no suspend needed)
--> ready_events_[37] is NOT cleared
t3: ... HTTP processing, WebSocket handshake ...
t4: WebSocket msg loop, AsyncRead for new frame:
await_ready: ::read(37) returns EAGAIN (no data)
--> return false, proceed to await_suspend
t5: await_suspend -> set_client_handle(37, h)
Worker finds ready_events_[37] has EPOLLIN
--> immediately resume coroutine!
t6: await_resume: ::read(37) returns -1 (EAGAIN!)
--> return -1
t7: WebSocket loop: if (n <= 0) break;
--> Connection closed! BUG!
修复方案
bool await_ready() {
result_ = ::read(fd_, buf_, len_);
if (result_ >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) {
// 读取成功,清除可能残留的 stale ready_events
if (t_worker) t_worker->clear_ready_events(fd_);
return true;
}
return false;
}
同时在 WebSocket 消息循环中增加 EAGAIN 防御性重试:
ssize_t n;
do {
n = co_await AsyncRead(client_fd, (char*)header, 2);
} while (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR));
if (n <= 0) break;
双层防御:
-
clear_ready_events 从根源消除 stale 缓存
-
EAGAIN 重试作为额外保险
连接复用策略
Redis:每 Worker 一个 hiredis-async 连接

为什么不用连接池?
Redis 单线程处理命令,pipeline 已达最大吞吐
多连接到同一 Redis 实例不会提升性能
一个连接简化了管理,减少了资源消耗
MySQL:全局连接池 + 线程池

为什么不直接在协程中使用 MySQL?
-
MySQL C API 是同步阻塞的,会阻塞整个 Worker 线程
-
一个被阻塞的 Worker 线程意味着该 Worker 上的所有协程都停止调度
-
线程池隔离了阻塞操作,Worker 线程永远不会被 MySQL 阻塞
编译与运行
依赖安装
sudo apt install g++ libhiredis-dev libmysqlclient-dev redis-server mysql-server
数据库准备
mysql -u root -e "
CREATE DATABASE IF NOT EXISTS chatapp CHARACTER SET utf8mb4;
CREATE USER IF NOT EXISTS 'chatuser'@'localhost' IDENTIFIED BY 'chatpass';
GRANT ALL PRIVILEGES ON chatapp.* TO 'chatuser'@'localhost';
FLUSH PRIVILEGES;
"
编译
make # 编译 chatserver, benchmark, ws_bench
make clean # 清理
运行
./chatserver --port 9090 --workers 4 --mysql-user chatuser --mysql-pass chatpass
浏览器访问 http://127.0.0.1:9090
完整参数
./chatserver [options]
--port N HTTP 端口 (默认: 8080)
--workers N Worker 线程数 (默认: 4)
--redis-host H Redis 地址 (默认: 127.0.0.1)
--redis-port N Redis 端口 (默认: 6379)
--mysql-host H MySQL 地址 (默认: 127.0.0.1)
--mysql-port N MySQL 端口 (默认: 3306)
--mysql-user U MySQL 用户 (默认: root)
--mysql-pass P MySQL 密码 (默认: 空)
--mysql-db D MySQL 数据库 (默认: chatapp)
--mysql-pool N MySQL 连接池大小 (默认: 8)
--thread-pool N 线程池大小 (默认: 8)
性能测试
测试环境:单机,MariaDB 10.6,Redis 7.x,g++ 11.4,C++20,4 Worker 线程
HTTP 注册/登录性能
压测工具
./benchmark [options]
-h HOST 目标地址 (默认: 127.0.0.1)
-p PORT 目标端口 (默认: 9090)
-t N 线程数 (默认: 4)
-c N 每线程连接数 (默认: 1)
-n N 总请求数 (默认: 1000)
-m MODE 模式: register 或 login
-k 启用 Keep-Alive
注册性能
|
方案 |
连接池 |
QPS |
说明 |
|---|---|---|---|
|
纯 MySQL INSERT, 16连接直连 |
- |
16,155 |
基准上限 |
|
INSERT IGNORE + HTTP + 协程 |
8 |
10,163 |
达到基准的 63% |
|
INSERT IGNORE + HTTP + 协程 |
16 |
10,905 |
达到基准的 67% |
|
INSERT IGNORE + HTTP + 协程 |
32 |
12,392 |
达到基准的 77% |
登录性能
|
连接模式 |
并发连接数 |
QPS |
|---|---|---|
|
短连接 |
64 |
9,259 |
|
Keep-Alive |
16 |
9,461 |
|
Keep-Alive |
32 |
10,493 |
|
Keep-Alive |
64 |
10,799 |
WebSocket 性能
压测工具
./ws_bench [options]
-h HOST 目标地址 (默认: 127.0.0.1)
-p PORT 目标端口 (默认: 9090)
-c N 并发连接数 (默认: 100)
-n N 总消息数 (默认: 1000)
-t N 线程数 (默认: 4)
-m MODE 模式: connect, throughput, echo, concurrent
连接容量
|
并发连接数 |
成功率 |
连接速率 |
总耗时 |
|---|---|---|---|
|
50 |
100% |
1,356 conn/s |
0.15s |
|
500 |
100% |
2,110 conn/s |
1.24s |
|
1,000 |
100% |
2,469 conn/s |
1.96s |
|
2,000 |
100% |
1,949 conn/s |
4.77s |
|
5,000 |
100% |
1,429 conn/s |
12.3s |
|
10,000 |
100% |
3,144 conn/s |
20.5s |
4 Worker 线程可维护 10,000+ 并发 WebSocket 长连接。连接建立耗时主要来自 HTTP 注册/登录,WebSocket握手本身很快。
回声延迟(单发送者 + N-1 旁观者)
|
在线连接 |
成功率 |
P50 |
P90 |
P95 |
P99 |
Max |
|---|---|---|---|---|---|---|
|
1 |
100% |
0.27ms |
0.27ms |
0.28ms |
0.28ms |
1.5ms |
|
10 |
100% |
0.27ms |
0.28ms |
0.29ms |
1.65ms |
3.2ms |
|
100 |
100% |
0.79ms |
1.10ms |
1.38ms |
2.09ms |
6.3ms |
|
500 |
93.8% |
28.1ms |
778ms |
1,323ms |
1,936ms |
1,997ms |
100 连接以内,P50 延迟 < 1ms。500 连接时广播成为瓶颈,延迟急剧上升。
消息吞吐量(发送 + 广播全接收)
|
连接数 |
消息/连接 |
发送 QPS |
接收率 |
广播 QPS |
总耗时 |
|---|---|---|---|---|---|
|
10 |
100 |
19,856 |
100% |
2,451 |
4.1s |
|
50 |
20 |
14,990 |
100% |
3,518 |
14.2s |
|
100 |
10 |
13,387 |
100% |
3,796 |
26.4s |
消息零丢失。发送 QPS ~15-20K,广播 QPS ~2.5-3.8K(计入 N 倍扇出)。
并发回声(多连接同时发+收)
|
连接数 |
消息总数 |
成功率 |
QPS |
P50 |
P90 |
P99 |
Max |
|---|---|---|---|---|---|---|---|
|
10 |
1,000 |
100% |
20,387 |
0.06ms |
0.29ms |
0.58ms |
1.9ms |
|
50 |
1,000 |
100% |
17,537 |
0.26ms |
0.30ms |
1.91ms |
2.2ms |
|
100 |
1,0000 |
100% |
20,335 |
0.26ms |
0.29ms |
1.06ms |
5.5ms |
并发模式下 QPS 稳定在 ~20K echo/s,P50 < 0.3ms。
总结
本项目展示了 C++20 协程在实际网络应用中的完整实践:
|
维度 |
数据 |
|---|---|
|
代码规模 |
~1,500 行 C++ |
|
并发连接 |
10,000+ WebSocket |
|
HTTP QPS |
10K-12K 注册/s |
|
WebSocket 延迟 |
P50 < 0.3ms |
|
消息吞吐 |
~20K msg/s |
|
Worker 线程 |
4 个 |
|
外部依赖 |
hiredis, libmysqlclient |
协程的核心价值:用同步代码的可读性,获得异步代码的性能。
更多推荐
所有评论(0)