从 select/poll 到 epoll:手写 Reactor 模型(附完整代码)
本文从“一请求一线程”模型开始,逐步演进到 select/poll/epoll,最后实现 Reactor 模式,包含全部代码实现和原理说明,适合 Linux C/C++ 后端开发者系统学习与回顾 I/O 多路复用与事件驱动设计。
一、网络编程基础
1.1 查看网卡配置
Linux
# 不显示端口或连接信息
ip a
1.2 查看网络连接的进程信息
Linux
# 传统 net-tools 工具
netstat -anop
# 更现代命令
ss -tunap
MacOS
lsof -i -Pn
1.3 显示当前 shell 环境下的所有资源限制
ulimit 是 shell 内置命令(Bash、Zsh 等)
# 如果需要修改限制,可使用 ulimit -<选项> <值>(通常只有 root 权限的用户才能提高硬限制:软限制的上限)
$ ulimit -a
real-time non-blocking time (microseconds, -R) unlimited
core file size (blocks, -c) 0 # core 转储文件的最大大小
data seg size (kbytes, -d) unlimited # 进程数据段的最大大小
scheduling priority (-e) 0
file size (blocks, -f) unlimited # 单个文件的最大大小
pending signals (-i) 15151
max locked memory (kbytes, -l) 499500
max memory size (kbytes, -m) unlimited
open files (-n) 1048576 # 进程最多能同时打开的文件描述符数量
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 8192 # 栈的最大大小
cpu time (seconds, -t) unlimited # 进程可占用的 CPU 时间(秒)
max user processes (-u) 15151 # 用户最多可运行的进程数
virtual memory (kbytes, -v) unlimited # 虚拟内存的最大大小
file locks (-x) unlimited
1.4 查看内核/用户空间的位数(即操作系统是64位还是32位)
Linux
uname -m
1.5 网络调试助手
https://github.com/nicedayzhu/netAssist
python3 --version
pip3 install PyQt5
python3 main.py
1.6 wrk
现代 HTTP 基准测试工具,用于对 Web 服务器进行性能压测。
https://github.com/wg/wrk
git clone https://github.com/wg/wrk.git
# 或者
git clone git@github.com:wg/wrk.git
二、一请求一线程模型
server_io.c
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
#define LISTENER_PORT 8888 // 定义一个0~1023以外的监听端口
#define LISTEN_BACKLOG 10 // 定义已完成连接队列的最大长度
#define BUFFER_CAPACITY 1024 // 定义缓冲区容量
void* ioHandleFunc (void* arg) {
int connectFd = *(int*)arg;
free(arg); // 释放堆上分配的 fd,避免内存泄漏
while (1)
{
char buffer[BUFFER_CAPACITY] = {0};
int recvSize = recv(connectFd, buffer, sizeof(buffer), 0); // 接收数据
if (recvSize < 0) {
perror("recv failed");
break;
}
if (recvSize == 0) { // 处理原因:客户端断开连接后 recvSize = 0,由于send的第三个参数使用recvSize,会导致死循环
printf("connect closed. connectFd = %d\n", connectFd);
break;
}
printf("recv = %s, recvSize = %d\n", buffer, recvSize);
int sendSize = send(connectFd, buffer, recvSize, MSG_NOSIGNAL); // MSG_NOSIGNAL: 客户端断开连接,操作系统不会向进程发送 SIGPIPE 信号,而是会返回 -1
// int sendSize = send(connectFd, buffer, recvSize, 0); // 由于使用的是recvSize,此时值为0,不会触发实际的网络传输,直接返回0,所以没有触发 SIGPIPE 信号
if (sendSize < 0) {
perror("send failed");
break;
}
printf("send = %s, sendSize = %d\n", buffer, sendSize);
}
close(connectFd);
return NULL;
}
int main() {
int socketFd = socket(AF_INET, SOCK_STREAM, 0); // 生成监听fd
if (socketFd < 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
printf("socketFd = %d\n", socketFd);
struct sockaddr_in addr;
addr.sin_family = AF_INET; // 通信协议族IPv4
addr.sin_addr.s_addr = htonl(INADDR_ANY); // INADDR_ANY(0):所有IP地址
addr.sin_port = htons(LISTENER_PORT); // 自定义一个0~1023以外的监听端口
if (bind(socketFd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { // 监听fd绑定IP地址和端口号
perror("bind failed");
close(socketFd);
exit(EXIT_FAILURE);
}
if (listen(socketFd, LISTEN_BACKLOG) < 0) { // socketFd进入监听状态,设置已完成连接队列的最大长度
perror("listen failed");
close(socketFd);
exit(EXIT_FAILURE);
}
struct sockaddr_in clientAddr;
socklen_t clientAddrLen = sizeof(clientAddr);
printf("wait connect...\n");
while (1)
{
int connectFd = accept(socketFd, (struct sockaddr *)&clientAddr, &clientAddrLen); // IO/连接fd
if (connectFd < 0) {
perror("accept failed");
break;
}
printf("connectFd = %d\n", connectFd);
// 避免直接使用 &connectFd 产生竞争条件
int *pfd = malloc(sizeof(int));
*pfd = connectFd;
pthread_t thread;
int ret = pthread_create(&thread, NULL, ioHandleFunc, pfd);
if (ret != 0) { // 成功返回0
fprintf(stderr, "pthread_create failed. ret = %d", ret);
close(connectFd);
break;
}
}
close(socketFd);
return 0;
}
-
核心特点
-
优点
编程简单直观:代码逻辑简单,容易理解和调试。
-
缺点
资源开销巨大:每个线程都有独立的栈空间(通常1~8MB)和线程控制块,创建和销毁开销可观。当并发数达到几千甚至上万时,内存会迅速耗尽。调度开销高:大量线程同时处于可运行状态时,操作系统内核需要频繁进行上下文切换(保存/恢复寄存器、内存映射等),这会导致CPU有效利用率显著下降。
-
三、I/O 多路复用
3.1 select
server_select.c
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#define LISTENER_PORT 8888 // 定义一个0~1023以外的监听端口
#define LISTEN_BACKLOG 10 // 定义已完成连接队列的最大长度
#define BUFFER_CAPACITY 1024 // 定义缓冲区容量
int main() {
int socketFd = socket(AF_INET, SOCK_STREAM, 0);
if (socketFd == -1) {
perror("socket failed");
exit(EXIT_FAILURE);
}
printf("socketFd = %d\n", socketFd);
struct sockaddr_in addr;
addr.sin_family = AF_INET; // 通信协议族IPv4
addr.sin_addr.s_addr = htonl(INADDR_ANY); // INADDR_ANY(0):所有IP地址
addr.sin_port = htons(LISTENER_PORT); // 自定义一个0~1023以外的监听端口
if (bind(socketFd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { // 监听fd绑定IP地址和端口号
perror("bind failed");
close(socketFd);
exit(EXIT_FAILURE);
}
if (listen(socketFd, LISTEN_BACKLOG) == -1) { // socketFd进入监听状态,设置已完成连接队列的最大长度
perror("listen failed");
close(socketFd);
exit(EXIT_FAILURE);
}
// 定义可读fd全集合
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(socketFd, &rfds);
int maxFd = socketFd;
while (1)
{
fd_set readfds = rfds; // 重置传入传出参数
if (select(maxFd + 1, &readfds, NULL, NULL, NULL) == -1 ) {
perror("select failed");
close(socketFd);
exit(EXIT_FAILURE);
}
if (FD_ISSET(socketFd, &readfds)) { // socketFd 就绪
struct sockaddr_in clientAddr;
socklen_t clientAddrLen = sizeof(clientAddr);
int connectFd = accept(socketFd, (struct sockaddr *)&clientAddr, &clientAddrLen);
if (connectFd == -1) {
perror("accept failed");
continue;
}
printf("connectFd = %d\n", connectFd);
if (connectFd > maxFd) {
maxFd = connectFd;
}
FD_SET(connectFd, &rfds);
}
for (int i = socketFd + 1; i <= maxFd; ++i) {
if (FD_ISSET(i, &readfds)) { // IO/连接fd 就绪
char buffer[BUFFER_CAPACITY] = {0};
int recvSize = recv(i, buffer, sizeof(buffer), 0); // 接收数据
if (recvSize == -1) {
perror("recv failed");
FD_CLR(i, &rfds);
close(i);
continue;
}
if (recvSize == 0) { // 处理原因:客户端断开连接后 recvSize = 0
printf("connect closed. connectFd = %d\n", i);
FD_CLR(i, &rfds);
close(i);
continue;
}
// 限制打印长度
printf("recv = %.*s, recvSize = %d\n", recvSize, buffer, recvSize);
int sendSize = send(i, buffer, recvSize, MSG_NOSIGNAL); // MSG_NOSIGNAL: 客户端断开连接,操作系统不会向进程发送 SIGPIPE 信号,而是会返回 -1
// int sendSize = send(connectFd, buffer, recvSize, 0); // 由于使用的是recvSize,此时值为0,不会触发实际的网络传输,直接返回0,所以没有触发 SIGPIPE 信号
if (sendSize == -1) {
perror("send failed");
continue;
}
// 限制打印长度
printf("send = %.*s, sendSize = %d\n", sendSize, buffer, sendSize);
}
}
}
return 0;
}
-
fd_set定义fd_set是select系统调用中用来表示一组文件描述符的数据结构,通常实现为位掩码(bitmask)。系统定义(简化版):
// 常见实现(Linux) // fd_set 内部实际就是一个 1024 位的数组 // Linux(64-bit): 16 个 64 位整数; Linux(32-bit): 32 个 32 位整数 // 这就是为什么 fd_set 在不同平台都能支持 1024 个 fd #define FD_SETSIZE 1024 typedef struct { unsigned long fds_bits[FD_SETSIZE / (8 * sizeof(long))]; } fd_set; -
宏函数
FD_SET(fd, set)本质是:set->bits[fd / 64] |= (1UL << (fd % 64))FD_ISSET(fd, set)本质是:(set->bits[fd / 64] >> (fd % 64)) & 1
-
select运行机制阶段 描述 用户准备 调用 FD_ZERO、FD_SET构建三个集合(分别监控可读、可写、异常事件),并计算nfds = max_fd + 1(所有被监控 fd 的最大值 +1)。内核阻塞 select进入内核,将三个fd_set从用户态拷贝到内核态。然后内核轮询检查[0, nfds)范围内的每个文件描述符是否满足事件条件。就绪返回 一旦有描述符就绪(或超时、被信号打断),内核修改这些 fd_set—— 只保留已经就绪的描述符,未就绪的对应位清零。然后将修改后的集合拷贝回用户态,并返回就绪的总数。用户处理 返回后,用户调用 FD_ISSET逐个测试哪些 fd 仍在集合中,从而知道哪些描述符可读/可写/异常。 -
select的缺点 → 为什么被epoll/kqueue取代?缺点 说明 描述符上限 1024 现代高并发服务器无法接受(C10K 问题) O(n) 遍历 即使只有 1 个活跃连接,也要扫描全部 1024 个描述符 重复拷贝 每次调用都拷贝整个 fd_set进内核(即使没变化)集合被修改 需要每次都重新构造,编程不便
3.2 poll —— select 改进版
server_poll.c
#include <sys/socket.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#define LISTENER_PORT 8888 // 定义一个0~1023以外的监听端口
#define LISTEN_BACKLOG 10 // 定义已完成连接队列的最大长度
#define BUFFER_CAPACITY 1024 // 定义缓冲区容量
#define FD_SIZE 1024 // 定义fd的最大数量
int main() {
int socketFd = socket(AF_INET, SOCK_STREAM, 0);
if (socketFd == -1) {
perror("socket failed");
exit(EXIT_FAILURE);
}
printf("socketFd = %d\n", socketFd);
struct sockaddr_in addr;
addr.sin_family = AF_INET; // 通信协议族IPv4
addr.sin_addr.s_addr = htonl(INADDR_ANY); // INADDR_ANY(0):所有IP地址
addr.sin_port = htons(LISTENER_PORT); // 自定义一个0~1023以外的监听端口
if (bind(socketFd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { // 监听fd绑定IP地址和端口号
perror("bind failed");
close(socketFd);
exit(EXIT_FAILURE);
}
if (listen(socketFd, LISTEN_BACKLOG) == -1) { // socketFd进入监听状态,设置已完成连接队列的最大长度
perror("listen failed");
close(socketFd);
exit(EXIT_FAILURE);
}
struct pollfd fds[FD_SIZE] = {0};
fds[socketFd].fd = socketFd;
fds[socketFd].events = POLLIN;
int maxFd = socketFd;
while (1)
{
if (poll(fds, maxFd + 1, -1) == -1) {
perror("poll failed");
close(socketFd);
exit(EXIT_FAILURE);
}
if (fds[socketFd].revents & POLLIN) { // socketFd 就绪
struct sockaddr_in clientAddr;
socklen_t clientAddrLen = sizeof(clientAddr);
int connectFd = accept(socketFd, (struct sockaddr *)&clientAddr, &clientAddrLen);
if (connectFd == -1) {
perror("accept failed");
continue;
}
printf("connectFd = %d\n", connectFd);
if (connectFd > maxFd) {
maxFd = connectFd;
}
fds[connectFd].fd = connectFd;
fds[connectFd].events = POLLIN;
}
for (int i = socketFd + 1; i <= maxFd; ++i) {
if (fds[i].revents & POLLIN) { // IO/连接fd 就绪
char buffer[BUFFER_CAPACITY] = {0};
int recvSize = recv(i, buffer, sizeof(buffer), 0); // 接收数据
if (recvSize == -1) {
perror("recv failed");
memset(&fds[i], 0, sizeof(struct pollfd));
close(i);
continue;
}
if (recvSize == 0) { // 处理原因:客户端断开连接后 recvSize = 0
printf("connect closed. connectFd = %d\n", i);
memset(&fds[i], 0, sizeof(struct pollfd));
close(i);
continue;
}
// 限制打印长度
printf("recv = %.*s, recvSize = %d\n", recvSize, buffer, recvSize);
int sendSize = send(i, buffer, recvSize, MSG_NOSIGNAL); // MSG_NOSIGNAL: 客户端断开连接,操作系统不会向进程发送 SIGPIPE 信号,而是会返回 -1
// int sendSize = send(connectFd, buffer, recvSize, 0); // 由于使用的是recvSize,此时值为0,不会触发实际的网络传输,直接返回0,所以没有触发 SIGPIPE 信号
if (sendSize == -1) {
perror("send failed");
continue;
}
// 限制打印长度
printf("send = %.*s, sendSize = %d\n", sendSize, buffer, sendSize);
}
}
}
return 0;
}
-
struct pollfd结构struct pollfd { int fd; // 要监控的文件描述符 short events; // 感兴趣的事件(位掩码) short revents; // 实际发生的事件(内核填写) }; -
核心特点
-
优点
无 1024 限制:可以监控任意多个文件描述符(只要系统内存允许)。
不修改传入的 events:
revents单独返回,下一次调用无需重建数组。 -
缺点
仍然是 O(n):当大量连接(如 10K)只有少数活跃时,内核仍需遍历整个
pollfd数组,效率低下。用户态/内核态拷贝:整个数组仍要拷贝到内核,再拷贝回来。
-
3.3 epoll
epoll 是 Linux 特有的高性能 I/O 多路复用机制,采用事件驱动模型。
server_epoll.c
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#define LISTENER_PORT 8888 // 定义一个0~1023以外的监听端口
#define LISTEN_BACKLOG 10 // 定义已完成连接队列的最大长度
#define BUFFER_CAPACITY 1024 // 定义缓冲区容量
#define MAX_EVENTS 1024 // events 数组的最大长度(即最多返回多少个事件)
int main() {
int socketFd = socket(AF_INET, SOCK_STREAM, 0);
if (socketFd == -1) {
perror("socket failed");
exit(EXIT_FAILURE);
}
printf("socketFd = %d\n", socketFd);
struct sockaddr_in addr;
addr.sin_family = AF_INET; // 通信协议族IPv4
addr.sin_addr.s_addr = htonl(INADDR_ANY); // INADDR_ANY(0):所有IP地址
addr.sin_port = htons(LISTENER_PORT); // 自定义一个0~1023以外的监听端口
if (bind(socketFd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { // 监听fd绑定IP地址和端口号
perror("bind failed");
close(socketFd);
exit(EXIT_FAILURE);
}
if (listen(socketFd, LISTEN_BACKLOG) == -1) { // socketFd进入监听状态,设置已完成连接队列的最大长度
perror("listen failed");
close(socketFd);
exit(EXIT_FAILURE);
}
// 使用 epoll_create1 替代旧的 epoll_create,它没有废弃的 size 参数,语义更清晰
int epollFd = epoll_create1(0);
if (epollFd == -1) {
perror("epoll_create failed");
close(socketFd);
exit(EXIT_FAILURE);
}
printf("epollFd = %d\n", epollFd);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = socketFd;
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, socketFd, &ev) == -1) {
perror("epoll_ctl_add failed: socketFd");
close(epollFd);
close(socketFd);
exit(EXIT_FAILURE);
}
struct epoll_event events[MAX_EVENTS] = {0};
while (1)
{
int nfds = epoll_wait(epollFd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait failed");
close(epollFd);
close(socketFd);
exit(EXIT_FAILURE);
}
for (int i = 0; i < nfds; ++i) {
if (events[i].data.fd == socketFd) { // socketFd 就绪
struct sockaddr_in clientAddr;
socklen_t clientAddrLen = sizeof(clientAddr);
int connectFd = accept(socketFd, (struct sockaddr *)&clientAddr, &clientAddrLen);
if (connectFd == -1) {
perror("accept failed");
continue;
}
printf("connectFd = %d\n", connectFd);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = connectFd;
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, connectFd, &ev) == -1) {
perror("epoll_ctl_add failed: connectFd");
close(connectFd);
continue;
}
} else { // IO/连接fd 就绪
int connectFd = events[i].data.fd;
char buffer[BUFFER_CAPACITY] = {0};
int recvSize = recv(connectFd, buffer, sizeof(buffer), 0); // 接收数据
if (recvSize == -1) {
perror("recv failed");
if (epoll_ctl(epollFd, EPOLL_CTL_DEL, connectFd, NULL) == -1) {
perror("epoll_ctl_del failed: connectFd");
}
close(connectFd);
continue;
}
if (recvSize == 0) { // 处理原因:客户端断开连接后 recvSize = 0
printf("connect closed. connectFd = %d\n", connectFd);
if (epoll_ctl(epollFd, EPOLL_CTL_DEL, connectFd, NULL) == -1) {
perror("epoll_ctl_del failed: connectFd");
}
close(connectFd);
continue;
}
// 限制打印长度
printf("recv = %.*s, recvSize = %d\n", recvSize, buffer, recvSize);
int sendSize = send(connectFd, buffer, recvSize, MSG_NOSIGNAL); // MSG_NOSIGNAL: 客户端断开连接,操作系统不会向进程发送 SIGPIPE 信号,而是会返回 -1
// int sendSize = send(connectFd, buffer, recvSize, 0); // 由于使用的是recvSize,此时值为0,不会触发实际的网络传输,直接返回0,所以没有触发 SIGPIPE 信号
if (sendSize == -1) {
perror("send failed");
continue;
}
// 限制打印长度
printf("send = %.*s, sendSize = %d\n", sendSize, buffer, sendSize);
}
}
}
return 0;
}
-
epoll_event结构体struct epoll_event { uint32_t events; // 事件掩码(EPOLLIN, EPOLLOUT, EPOLLET...) union { void *ptr; int fd; uint32_t u32; uint64_t u64; } data; // 用户数据,通常存放 fd }; -
epoll运行机制epoll 的高性能依赖于内核中的两个核心数据结构和一个回调机制。
-
内核数据结构
红黑树(rbtree):存储所有通过
epoll_ctl注册到 epoll 实例的文件描述符;增删改查操作的时间复杂度为 O(log n),适合管理大量连接。就绪链表(ready list):存储已经就绪(有 I/O 事件)的文件描述符;当描述符状态变化时,内核通过回调机制直接将其加入链表,无需轮询扫描。
-
工作流程(三个 API 的协作)
用户进程 内核 | | +-- epoll_create() --->| 创建 epoll 实例(在内核中),返回 epfd | | +-- epoll_ctl(ADD) --->| 将 fd 加入红黑树,并设置回调函数 | | (回调将就绪 fd 加入就绪链表) | | +-- epoll_wait() ----->| 检查就绪链表是否为空 | | - 非空:拷贝就绪事件到用户空间并返回 | | - 空:进程睡眠,等待就绪事件唤醒 |<---- 就绪事件 ---------+ | | | 处理事件 | +----------------------+
-
-
工作模式:LT vs ET
模式 触发条件 举例(fd 可读) 特点 LT(水平触发) 只要 fd 处于可读/可写状态,每次 epoll_wait都会返回该事件缓冲区有 1 字节数据未读,后续 epoll_wait会一直返回该 fd默认模式 ET(边缘触发) 仅当 fd 状态从不就绪变为就绪时触发一次 缓冲区首次有数据到达时触发一次;若未读完,后续 epoll_wait不会再触发,直到新数据再次到达高效,适合高吞吐量、低延迟场景(如大文件传输),要求应用层一次性处理完所有数据,减少了用户态/内核态切换 可以这样理解:LT 是“状态通知”,ET 是“变化通知”。
3.4 select / poll / epoll 对比总结
| 特性 | select | poll | epoll |
|---|---|---|---|
| 描述符上限 | 1024(FD_SETSIZE) | 无限制(受内存限制) | 无限制(受内存限制) |
| 时间复杂度 | O(n) 扫描所有 fd | O(n) 扫描所有 fd | O(1) 仅处理就绪 fd |
| 内核/用户拷贝 | 每次调用拷贝全量集合 | 每次调用拷贝全量数组 | 只需注册一次,就绪事件增量拷贝 |
| 工作模式 | 水平触发(LT) | 水平触发(LT) | 支持 LT 和 ET |
| 就绪通知 | 修改传入的 fd_set | 填充 revents | 返回独立的 events 数组 |
| 适用场景 | 小规模(< 1024) | 中等规模(几千) | 大规模(万级以上) |
四、Reactor 设计模式的原理与实现
reactor是一种处理高并发 I/O 的经典事件驱动模型。核心思想是:一个线程(事件循环)不断监听多个事件源,当某个 I/O 事件就绪(如数据到达、连接请求),就把对应的处理回调分派给处理器执行。
server_reactor.h
// 使用头文件保护符(Include Guard)防止头文件重复包含
// #ifndef 宏
#ifndef SERVER_REACTOR_H
#define SERVER_REACTOR_H
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/time.h>
#include <string.h>
#define LISTENER_PORT 8888 // 定义一个0~1023以外的监听端口
#define LISTEN_BACKLOG 10 // 定义已完成连接队列的最大长度
#define BUFFER_CAPACITY 1024 // 定义缓冲区容量
#define MAX_EVENTS 1024 // events 数组的最大长度(即最多返回多少个事件)
#define MAX_FD_NUM 1048576 // 进程最多能同时打开的文件描述符数量
// 前向声明结构体(告诉编译器存在这个结构体)
struct event_handler;
typedef void (*handle) (struct event_handler *handler);
// 定义事件处理器结构
typedef struct event_handler {
int fd;
handle handle_read;
handle handle_write;
char *buffer;
int buffer_size;
} event_handler_t;
enum FD_TYPE {
SCOKETFD,
CONNECTFD
};
// 声明, 不分配存储空间; 告诉编译器这是一个外部链接的变量
// 多个 .c 文件包含该头文件,都只是声明,不会产生重复定义,链接时会统一指向那个唯一的定义。注意必须搭配一个 .c 文件中的实际定义
extern int epoll_fd;
extern event_handler_t handler_list[MAX_FD_NUM];
// 初始化服务器
int init_server();
// 定义三种事件类型的回调函数
void accept_cb(event_handler_t *handler); // 建立IO连接事件回调
void read_cb(event_handler_t *handler); // 接收数据事件回调
void write_cb(event_handler_t *handler); // 发送数据事件回调
// 设置 epoll 事件
void set_epoll_event(int fd, int epoll_op_type, int epoll_events);
// 注册事件
void register_event(int fd, int fd_type);
#endif
server_reactor.c
#include "server_reactor.h"
// 定义
int epoll_fd;
event_handler_t handler_list[MAX_FD_NUM];
// 初始化服务器
int init_server() {
int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
if (socket_fd == -1) {
perror("socket failed");
exit(EXIT_FAILURE);
}
printf("socket_fd = %d\n", socket_fd);
// 设置 SO_REUSEADDR,允许端口重用
int opt = 1;
if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
perror("setsockopt failed");
close(socket_fd);
exit(EXIT_FAILURE);
}
struct sockaddr_in addr;
addr.sin_family = AF_INET; // 通信协议族IPv4
addr.sin_addr.s_addr = htonl(INADDR_ANY); // INADDR_ANY(0):所有IP地址
addr.sin_port = htons(LISTENER_PORT); // 自定义一个0~1023以外的监听端口
if (bind(socket_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { // 监听fd绑定IP地址和端口号
perror("bind failed");
close(socket_fd);
exit(EXIT_FAILURE);
}
if (listen(socket_fd, LISTEN_BACKLOG) == -1) { // socket_fd进入监听状态,设置已完成连接队列的最大长度
perror("listen failed");
close(socket_fd);
exit(EXIT_FAILURE);
}
return socket_fd;
}
// 建立IO连接事件回调
void accept_cb(event_handler_t *handler) {
int socket_fd = handler->fd;
struct sockaddr_in clientAddr;
socklen_t clientAddrLen = sizeof(clientAddr);
int connect_fd = accept(socket_fd, (struct sockaddr *)&clientAddr, &clientAddrLen);
if (connect_fd == -1) {
perror("accept failed");
return ;
}
printf("connect_fd = %d\n", connect_fd);
register_event(connect_fd, CONNECTFD);
}
// 接收数据事件回调
void read_cb(event_handler_t *handler) {
int connect_fd = handler->fd;
char *buffer = (char *)calloc(BUFFER_CAPACITY, sizeof(char));
if (buffer == NULL) {
perror("calloc failed");
return ;
}
int recv_size = recv(connect_fd, buffer, BUFFER_CAPACITY, 0); // 接收数据
if (recv_size == -1) {
perror("recv failed");
set_epoll_event(connect_fd, EPOLL_CTL_DEL, 0);
return ;
}
if (recv_size == 0) { // 处理原因:客户端断开连接后 recv_size = 0
printf("connect closed. connect_fd = %d\n", connect_fd);
set_epoll_event(connect_fd, EPOLL_CTL_DEL, 0);
return ;
}
// 限制打印长度
printf("recv = %.*s, recv_size = %d\n", recv_size, buffer, recv_size);
set_epoll_event(connect_fd, EPOLL_CTL_MOD, EPOLLOUT);
handler->buffer = buffer;
handler->buffer_size = recv_size;
}
// 发送数据事件回调
void write_cb(event_handler_t *handler) {
int connect_fd = handler->fd;
char *buffer = handler->buffer;
int buffer_size = handler->buffer_size;
int send_size = send(connect_fd, buffer, buffer_size, MSG_NOSIGNAL); // MSG_NOSIGNAL: 客户端断开连接,操作系统不会向进程发送 SIGPIPE 信号,而是会返回 -1
// int send_size = send(connect_fd, buffer, recvSize, 0); // 由于使用的是recvSize,此时值为0,不会触发实际的网络传输,直接返回0,所以没有触发 SIGPIPE 信号
if (send_size == -1) {
perror("send failed");
return ;
}
// 限制打印长度
printf("send = %.*s, send_size = %d\n", send_size, buffer, send_size);
set_epoll_event(connect_fd, EPOLL_CTL_MOD, EPOLLIN);
free(handler->buffer);
handler->buffer = NULL;
handler->buffer_size = 0;
}
// 设置 epoll 事件
void set_epoll_event(int fd, int epoll_op_type, int epoll_events) {
switch (epoll_op_type) {
case EPOLL_CTL_ADD:
case EPOLL_CTL_MOD:
struct epoll_event ev;
ev.events = epoll_events;
ev.data.fd = fd;
if (epoll_ctl(epoll_fd, epoll_op_type, fd, &ev) == -1) {
perror("epoll_ctl_add failed");
close(fd);
return ;
}
break;
case EPOLL_CTL_DEL:
if (epoll_ctl(epoll_fd, epoll_op_type, fd, NULL) == -1) {
perror("epoll_ctl_del failed");
return ;
}
close(fd);
break;
default:
break;
}
}
// 注册事件
void register_event(int fd, int fd_type) {
event_handler_t handler;
handler.fd = fd;
switch (fd_type) {
case SCOKETFD:
handler.handle_read = accept_cb;
break;
case CONNECTFD:
handler.handle_read = read_cb;
handler.handle_write = write_cb;
break;
default:
break;
}
handler_list[fd] = handler;
set_epoll_event(fd, EPOLL_CTL_ADD, EPOLLIN);
}
main.c
#include "server_reactor.h"
int main() {
// 初始化服务器
int socket_fd = init_server();
// 使用 epoll_create1 替代旧的 epoll_create,它没有废弃的 size 参数,语义更清晰
epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("epoll_create failed");
close(socket_fd);
exit(EXIT_FAILURE);
}
printf("epoll_fd = %d\n", epoll_fd);
register_event(socket_fd, SCOKETFD);
struct epoll_event events[MAX_EVENTS] = {0};
while (1)
{
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait failed");
close(epoll_fd);
close(socket_fd);
exit(EXIT_FAILURE);
}
for (int i = 0; i < nfds; ++i) {
int fd = events[i].data.fd;
event_handler_t *handler = &handler_list[fd];
switch (events[i].events) // 根据事件类型执行不同的回调函数
{
case EPOLLIN:
handler->handle_read(handler);
break;
case EPOLLOUT:
handler->handle_write(handler);
break;
default:
break;
}
}
}
return 0;
}
-
核心特点
将“等待事件”与“处理事件”解耦:主循环只通过
epoll_wait获取就绪 fd,并直接回调预先注册的handle_read/handle_write,不再需要在主循环中写一堆if-else去区分 fd 和业务逻辑。 -
与裸 epoll 的关键区别
裸 epoll 在事件循环里用
if (fd == socket_fd)区分连接请求和普通数据,业务越多分叉越乱;
Reactor 通过handler_list[fd]将每个 fd 与其处理回调绑定,主循环仅负责分派,不掺杂任何具体业务逻辑。 -
连接上下文管理
event_handler_t为每个连接保存了fd、buffer、buffer_size和对应的读写回调,让数据在read_cb和write_cb之间自然传递,避免使用全局数组或临时变量传递状态。
📚 系列导航
本文所有代码均在 Ubuntu 24.04 ARM64 + GCC 环境下编译运行通过,欢迎留言交流。
📌 更新:下一篇已发布
搞定了 IO 多路复用和 Reactor,该深入网络协议核心了!本文的第四篇《从TCP三次握手到QUIC:一篇搞懂网络传输协议核心原理》已发布。从 TCP 的三次握手、四次挥手,到 UDP 对比、ARQ 重传、流量控制与拥塞控制,再到 KCP 和 QUIC 协议,一篇理清网络传输的核心原理,全是干货,快来一起进阶吧!
更多推荐


所有评论(0)