本文从“一请求一线程”模型开始,逐步演进到 select/poll/epoll,最后实现 Reactor 模式,包含全部代码实现和原理说明,适合 Linux C/C++ 后端开发者系统学习与回顾 I/O 多路复用与事件驱动设计。


一、网络编程基础

1.1 查看网卡配置

Linux

# 不显示端口或连接信息
ip a

1.2 查看网络连接的进程信息

Linux

# 传统 net-tools 工具
netstat -anop
# 更现代命令
ss -tunap

MacOS

lsof -i -Pn

1.3 显示当前 shell 环境下的所有资源限制

ulimitshell 内置命令(BashZsh 等)

# 如果需要修改限制,可使用 ulimit -<选项> <值>(通常只有 root 权限的用户才能提高硬限制:软限制的上限)
$ ulimit -a
real-time non-blocking time  (microseconds, -R) unlimited
core file size              (blocks, -c) 0					# core 转储文件的最大大小
data seg size               (kbytes, -d) unlimited	# 进程数据段的最大大小
scheduling priority                 (-e) 0
file size                   (blocks, -f) unlimited	# 单个文件的最大大小
pending signals                     (-i) 15151
max locked memory           (kbytes, -l) 499500
max memory size             (kbytes, -m) unlimited
open files                          (-n) 1048576		# 进程最多能同时打开的文件描述符数量
pipe size                (512 bytes, -p) 8
POSIX message queues         (bytes, -q) 819200
real-time priority                  (-r) 0
stack size                  (kbytes, -s) 8192				# 栈的最大大小
cpu time                   (seconds, -t) unlimited	# 进程可占用的 CPU 时间(秒)
max user processes                  (-u) 15151			# 用户最多可运行的进程数
virtual memory              (kbytes, -v) unlimited	# 虚拟内存的最大大小
file locks                          (-x) unlimited

1.4 查看内核/用户空间的位数(即操作系统是64位还是32位)

Linux

uname -m

1.5 网络调试助手

https://github.com/nicedayzhu/netAssist

python3 --version
pip3 install PyQt5
python3 main.py

1.6 wrk

现代 HTTP 基准测试工具,用于对 Web 服务器进行性能压测。

https://github.com/wg/wrk

git clone https://github.com/wg/wrk.git
# 或者
git clone git@github.com:wg/wrk.git

二、一请求一线程模型

  • server_io.c
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>

#define LISTENER_PORT   8888    // 定义一个0~1023以外的监听端口
#define LISTEN_BACKLOG  10      // 定义已完成连接队列的最大长度
#define BUFFER_CAPACITY 1024    // 定义缓冲区容量

void* ioHandleFunc (void* arg) {
    int connectFd = *(int*)arg;
    free(arg);                        // 释放堆上分配的 fd,避免内存泄漏
    while (1)
    {
        char buffer[BUFFER_CAPACITY] = {0};
        int recvSize = recv(connectFd, buffer, sizeof(buffer), 0);  // 接收数据
        if (recvSize < 0) {
            perror("recv failed");
            break;
        }
        if (recvSize == 0) {    // 处理原因:客户端断开连接后 recvSize = 0,由于send的第三个参数使用recvSize,会导致死循环
            printf("connect closed. connectFd = %d\n", connectFd);
            break;
        }
        printf("recv = %s, recvSize = %d\n", buffer, recvSize);

        int sendSize = send(connectFd, buffer, recvSize, MSG_NOSIGNAL); // MSG_NOSIGNAL: 客户端断开连接,操作系统不会向进程发送 SIGPIPE 信号,而是会返回 -1
        // int sendSize = send(connectFd, buffer, recvSize, 0);    // 由于使用的是recvSize,此时值为0,不会触发实际的网络传输,直接返回0,所以没有触发 SIGPIPE 信号
        if (sendSize < 0) {
            perror("send failed");
            break;
        }
        printf("send = %s, sendSize = %d\n", buffer, sendSize);
    }
    close(connectFd);
    return NULL;
}

int main() {
    int socketFd = socket(AF_INET, SOCK_STREAM, 0); // 生成监听fd
    if (socketFd < 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    printf("socketFd = %d\n", socketFd);

    struct sockaddr_in addr;
    addr.sin_family = AF_INET;  // 通信协议族IPv4
    addr.sin_addr.s_addr = htonl(INADDR_ANY);   // INADDR_ANY(0):所有IP地址
    addr.sin_port = htons(LISTENER_PORT);   // 自定义一个0~1023以外的监听端口

    if (bind(socketFd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { // 监听fd绑定IP地址和端口号
        perror("bind failed");
        close(socketFd);
        exit(EXIT_FAILURE);
    }

    if (listen(socketFd, LISTEN_BACKLOG) < 0) {    // socketFd进入监听状态,设置已完成连接队列的最大长度
        perror("listen failed");
        close(socketFd);
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in clientAddr;
    socklen_t clientAddrLen = sizeof(clientAddr);

    printf("wait connect...\n");
    while (1)
    {
        int connectFd = accept(socketFd, (struct sockaddr *)&clientAddr, &clientAddrLen);    // IO/连接fd
        if (connectFd < 0) {
            perror("accept failed");
            break;
        }
        printf("connectFd = %d\n", connectFd);

        // 避免直接使用 &connectFd 产生竞争条件
        int *pfd = malloc(sizeof(int));
        *pfd = connectFd;

        pthread_t thread;
        int ret = pthread_create(&thread, NULL, ioHandleFunc, pfd);
        if (ret != 0) {    // 成功返回0
            fprintf(stderr, "pthread_create failed. ret = %d", ret);
            close(connectFd);
            break;
        }
    }

    close(socketFd);

    return 0;
}
  • 核心特点

    • 优点

      编程简单直观:代码逻辑简单,容易理解和调试。

    • 缺点

      资源开销巨大:每个线程都有独立的栈空间(通常1~8MB)和线程控制块,创建和销毁开销可观。当并发数达到几千甚至上万时,内存会迅速耗尽。调度开销高:大量线程同时处于可运行状态时,操作系统内核需要频繁进行上下文切换(保存/恢复寄存器、内存映射等),这会导致CPU有效利用率显著下降。


三、I/O 多路复用

3.1 select

  • server_select.c
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>

#define LISTENER_PORT   8888    // 定义一个0~1023以外的监听端口
#define LISTEN_BACKLOG  10      // 定义已完成连接队列的最大长度
#define BUFFER_CAPACITY 1024    // 定义缓冲区容量

int main() {
    int socketFd = socket(AF_INET, SOCK_STREAM, 0);
    if (socketFd == -1) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    printf("socketFd = %d\n", socketFd);

    struct sockaddr_in addr;
    addr.sin_family = AF_INET;  // 通信协议族IPv4
    addr.sin_addr.s_addr = htonl(INADDR_ANY);   // INADDR_ANY(0):所有IP地址
    addr.sin_port = htons(LISTENER_PORT);   // 自定义一个0~1023以外的监听端口

    if (bind(socketFd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { // 监听fd绑定IP地址和端口号
        perror("bind failed");
        close(socketFd);
        exit(EXIT_FAILURE);
    }

    if (listen(socketFd, LISTEN_BACKLOG) == -1) {    // socketFd进入监听状态,设置已完成连接队列的最大长度
        perror("listen failed");
        close(socketFd);
        exit(EXIT_FAILURE);
    }

    // 定义可读fd全集合
    fd_set rfds;
    FD_ZERO(&rfds);
    FD_SET(socketFd, &rfds);

    int maxFd = socketFd;

    while (1)
    {
        fd_set readfds = rfds;  // 重置传入传出参数
        if (select(maxFd + 1, &readfds, NULL, NULL, NULL) == -1 ) {
            perror("select failed");
            close(socketFd);
            exit(EXIT_FAILURE);
        }

        if (FD_ISSET(socketFd, &readfds)) {    // socketFd 就绪
            struct sockaddr_in clientAddr;
            socklen_t clientAddrLen = sizeof(clientAddr);
            int connectFd = accept(socketFd, (struct sockaddr *)&clientAddr, &clientAddrLen);
            if (connectFd == -1) {
                perror("accept failed");
                continue;
            }
            printf("connectFd = %d\n", connectFd);
            if (connectFd > maxFd) {
                maxFd = connectFd;
            }
            FD_SET(connectFd, &rfds);
        }

        for (int i = socketFd + 1; i <= maxFd; ++i) {
            if (FD_ISSET(i, &readfds)) {    // IO/连接fd 就绪
                char buffer[BUFFER_CAPACITY] = {0};
                int recvSize = recv(i, buffer, sizeof(buffer), 0);  // 接收数据
                if (recvSize == -1) {
                    perror("recv failed");
                    FD_CLR(i, &rfds);
                    close(i);
                    continue;
                }
                if (recvSize == 0) {    // 处理原因:客户端断开连接后 recvSize = 0
                    printf("connect closed. connectFd = %d\n", i);
                    FD_CLR(i, &rfds);
                    close(i);
                    continue;
                }
                // 限制打印长度
                printf("recv = %.*s, recvSize = %d\n", recvSize, buffer, recvSize);

                int sendSize = send(i, buffer, recvSize, MSG_NOSIGNAL); // MSG_NOSIGNAL: 客户端断开连接,操作系统不会向进程发送 SIGPIPE 信号,而是会返回 -1
                // int sendSize = send(connectFd, buffer, recvSize, 0);    // 由于使用的是recvSize,此时值为0,不会触发实际的网络传输,直接返回0,所以没有触发 SIGPIPE 信号
                if (sendSize == -1) {
                    perror("send failed");
                    continue;
                }
                // 限制打印长度
                printf("send = %.*s, sendSize = %d\n", sendSize, buffer, sendSize);
            }
        }
    }
    
    return 0;
}
  • fd_set定义

    fd_setselect 系统调用中用来表示一组文件描述符的数据结构,通常实现为位掩码(bitmask)

    系统定义(简化版):

    // 常见实现(Linux)
    // fd_set 内部实际就是一个 1024 位的数组
    // Linux(64-bit): 16 个 64 位整数; Linux(32-bit): 32 个 32 位整数
    // 这就是为什么 fd_set 在不同平台都能支持 1024 个 fd
    #define FD_SETSIZE 1024
    typedef struct {
        unsigned long fds_bits[FD_SETSIZE / (8 * sizeof(long))];
    } fd_set;
    
  • 宏函数

    • FD_SET(fd, set) 本质是:set->bits[fd / 64] |= (1UL << (fd % 64))
    • FD_ISSET(fd, set) 本质是:(set->bits[fd / 64] >> (fd % 64)) & 1
  • select运行机制

    阶段 描述
    用户准备 调用 FD_ZEROFD_SET 构建三个集合(分别监控可读、可写、异常事件),并计算 nfds = max_fd + 1(所有被监控 fd 的最大值 +1)。
    内核阻塞 select 进入内核,将三个 fd_set用户态拷贝到内核态。然后内核轮询检查 [0, nfds) 范围内的每个文件描述符是否满足事件条件。
    就绪返回 一旦有描述符就绪(或超时、被信号打断),内核修改这些 fd_set —— 只保留已经就绪的描述符,未就绪的对应位清零。然后将修改后的集合拷贝回用户态,并返回就绪的总数。
    用户处理 返回后,用户调用 FD_ISSET 逐个测试哪些 fd 仍在集合中,从而知道哪些描述符可读/可写/异常。
  • select 的缺点 → 为什么被 epoll / kqueue 取代?

    缺点 说明
    描述符上限 1024 现代高并发服务器无法接受(C10K 问题)
    O(n) 遍历 即使只有 1 个活跃连接,也要扫描全部 1024 个描述符
    重复拷贝 每次调用都拷贝整个 fd_set 进内核(即使没变化)
    集合被修改 需要每次都重新构造,编程不便

3.2 poll —— select 改进版

  • server_poll.c
#include <sys/socket.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>

#define LISTENER_PORT   8888    // 定义一个0~1023以外的监听端口
#define LISTEN_BACKLOG  10      // 定义已完成连接队列的最大长度
#define BUFFER_CAPACITY 1024    // 定义缓冲区容量
#define FD_SIZE         1024    // 定义fd的最大数量

int main() {
    int socketFd = socket(AF_INET, SOCK_STREAM, 0);
    if (socketFd == -1) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    printf("socketFd = %d\n", socketFd);

    struct sockaddr_in addr;
    addr.sin_family = AF_INET;  // 通信协议族IPv4
    addr.sin_addr.s_addr = htonl(INADDR_ANY);   // INADDR_ANY(0):所有IP地址
    addr.sin_port = htons(LISTENER_PORT);   // 自定义一个0~1023以外的监听端口

    if (bind(socketFd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { // 监听fd绑定IP地址和端口号
        perror("bind failed");
        close(socketFd);
        exit(EXIT_FAILURE);
    }

    if (listen(socketFd, LISTEN_BACKLOG) == -1) {    // socketFd进入监听状态,设置已完成连接队列的最大长度
        perror("listen failed");
        close(socketFd);
        exit(EXIT_FAILURE);
    }

    struct pollfd fds[FD_SIZE] = {0};
    fds[socketFd].fd = socketFd;
    fds[socketFd].events = POLLIN;

    int maxFd = socketFd;

    while (1) 
    {
        if (poll(fds, maxFd + 1, -1) == -1) {
            perror("poll failed");
            close(socketFd);
            exit(EXIT_FAILURE);
        }

        if (fds[socketFd].revents & POLLIN) {   // socketFd 就绪
            struct sockaddr_in clientAddr;
            socklen_t clientAddrLen = sizeof(clientAddr);
            int connectFd = accept(socketFd, (struct sockaddr *)&clientAddr, &clientAddrLen);
            if (connectFd == -1) {
                perror("accept failed");
                continue;
            }
            printf("connectFd = %d\n", connectFd);
            if (connectFd > maxFd) {
                maxFd = connectFd;
            }
            fds[connectFd].fd = connectFd;
            fds[connectFd].events = POLLIN;
        }
        
        for (int i = socketFd + 1; i <= maxFd; ++i) {
            if (fds[i].revents & POLLIN) {  // IO/连接fd 就绪
                char buffer[BUFFER_CAPACITY] = {0};
                int recvSize = recv(i, buffer, sizeof(buffer), 0);  // 接收数据
                if (recvSize == -1) {
                    perror("recv failed");
                    memset(&fds[i], 0, sizeof(struct pollfd));
                    close(i);
                    continue;
                }
                if (recvSize == 0) {    // 处理原因:客户端断开连接后 recvSize = 0
                    printf("connect closed. connectFd = %d\n", i);
                    memset(&fds[i], 0, sizeof(struct pollfd));
                    close(i);
                    continue;
                }
                // 限制打印长度
                printf("recv = %.*s, recvSize = %d\n", recvSize, buffer, recvSize);

                int sendSize = send(i, buffer, recvSize, MSG_NOSIGNAL); // MSG_NOSIGNAL: 客户端断开连接,操作系统不会向进程发送 SIGPIPE 信号,而是会返回 -1
                // int sendSize = send(connectFd, buffer, recvSize, 0);    // 由于使用的是recvSize,此时值为0,不会触发实际的网络传输,直接返回0,所以没有触发 SIGPIPE 信号
                if (sendSize == -1) {
                    perror("send failed");
                    continue;
                }
                // 限制打印长度
                printf("send = %.*s, sendSize = %d\n", sendSize, buffer, sendSize);
            }
        }
    }
    
    return 0;
}
  • struct pollfd 结构

    struct pollfd {
        int   fd;         // 要监控的文件描述符
        short events;     // 感兴趣的事件(位掩码)
        short revents;    // 实际发生的事件(内核填写)
    };
    
  • 核心特点

    • 优点

      无 1024 限制:可以监控任意多个文件描述符(只要系统内存允许)。

      不修改传入的 eventsrevents 单独返回,下一次调用无需重建数组。

    • 缺点

      仍然是 O(n):当大量连接(如 10K)只有少数活跃时,内核仍需遍历整个 pollfd 数组,效率低下。

      用户态/内核态拷贝:整个数组仍要拷贝到内核,再拷贝回来。


3.3 epoll

epoll 是 Linux 特有的高性能 I/O 多路复用机制,采用事件驱动模型。

  • server_epoll.c
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>

#define LISTENER_PORT   8888    // 定义一个0~1023以外的监听端口
#define LISTEN_BACKLOG  10      // 定义已完成连接队列的最大长度
#define BUFFER_CAPACITY 1024    // 定义缓冲区容量
#define MAX_EVENTS      1024    // events 数组的最大长度(即最多返回多少个事件)

int main() {
    int socketFd = socket(AF_INET, SOCK_STREAM, 0);
    if (socketFd == -1) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    printf("socketFd = %d\n", socketFd);

    struct sockaddr_in addr;
    addr.sin_family = AF_INET;  // 通信协议族IPv4
    addr.sin_addr.s_addr = htonl(INADDR_ANY);   // INADDR_ANY(0):所有IP地址
    addr.sin_port = htons(LISTENER_PORT);   // 自定义一个0~1023以外的监听端口

    if (bind(socketFd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { // 监听fd绑定IP地址和端口号
        perror("bind failed");
        close(socketFd);
        exit(EXIT_FAILURE);
    }

    if (listen(socketFd, LISTEN_BACKLOG) == -1) {    // socketFd进入监听状态,设置已完成连接队列的最大长度
        perror("listen failed");
        close(socketFd);
        exit(EXIT_FAILURE);
    }

    // 使用 epoll_create1 替代旧的 epoll_create,它没有废弃的 size 参数,语义更清晰
    int epollFd = epoll_create1(0);
    if (epollFd == -1) {
        perror("epoll_create failed");
        close(socketFd);
        exit(EXIT_FAILURE);
    } 
    printf("epollFd = %d\n", epollFd);

    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = socketFd;
    if (epoll_ctl(epollFd, EPOLL_CTL_ADD, socketFd, &ev) == -1) {
        perror("epoll_ctl_add failed: socketFd");
        close(epollFd);
        close(socketFd);
        exit(EXIT_FAILURE);
    }

    struct epoll_event events[MAX_EVENTS] = {0};

    while (1)
    {
        int nfds = epoll_wait(epollFd, events, MAX_EVENTS, -1);
        if (nfds == -1) {
            perror("epoll_wait failed");
            close(epollFd);
            close(socketFd);
            exit(EXIT_FAILURE);
        }
        for (int i = 0; i < nfds; ++i) {
            if (events[i].data.fd == socketFd) {    // socketFd 就绪
                struct sockaddr_in clientAddr;
                socklen_t clientAddrLen = sizeof(clientAddr);
                int connectFd = accept(socketFd, (struct sockaddr *)&clientAddr, &clientAddrLen);
                if (connectFd == -1) {
                    perror("accept failed");
                    continue;
                }
                printf("connectFd = %d\n", connectFd);
                struct epoll_event ev;
                ev.events = EPOLLIN;
                ev.data.fd = connectFd;
                if (epoll_ctl(epollFd, EPOLL_CTL_ADD, connectFd, &ev) == -1) {
                    perror("epoll_ctl_add failed: connectFd");
                    close(connectFd);
                    continue;
                }
            } else {    // IO/连接fd 就绪
                int connectFd = events[i].data.fd;
                char buffer[BUFFER_CAPACITY] = {0};
                int recvSize = recv(connectFd, buffer, sizeof(buffer), 0);  // 接收数据
                if (recvSize == -1) {
                    perror("recv failed");
                    if (epoll_ctl(epollFd, EPOLL_CTL_DEL, connectFd, NULL) == -1) {
                        perror("epoll_ctl_del failed: connectFd");
                    }
                    close(connectFd);
                    continue;
                }
                if (recvSize == 0) {    // 处理原因:客户端断开连接后 recvSize = 0
                    printf("connect closed. connectFd = %d\n", connectFd);
                    if (epoll_ctl(epollFd, EPOLL_CTL_DEL, connectFd, NULL) == -1) {
                        perror("epoll_ctl_del failed: connectFd");
                    }
                    close(connectFd);
                    continue;
                }
                // 限制打印长度
                printf("recv = %.*s, recvSize = %d\n", recvSize, buffer, recvSize);

                int sendSize = send(connectFd, buffer, recvSize, MSG_NOSIGNAL); // MSG_NOSIGNAL: 客户端断开连接,操作系统不会向进程发送 SIGPIPE 信号,而是会返回 -1
                // int sendSize = send(connectFd, buffer, recvSize, 0);    // 由于使用的是recvSize,此时值为0,不会触发实际的网络传输,直接返回0,所以没有触发 SIGPIPE 信号
                if (sendSize == -1) {
                    perror("send failed");
                    continue;
                }
                // 限制打印长度
                printf("send = %.*s, sendSize = %d\n", sendSize, buffer, sendSize);
            }
        }
    }

    return 0;
}
  • epoll_event 结构体

    struct epoll_event {
        uint32_t events;   // 事件掩码(EPOLLIN, EPOLLOUT, EPOLLET...)
        union {
            void   *ptr;
            int     fd;
            uint32_t u32;
            uint64_t u64;
        } data;            // 用户数据,通常存放 fd
    };
    
  • epoll 运行机制

    epoll 的高性能依赖于内核中的两个核心数据结构和一个回调机制

    • 内核数据结构

      红黑树(rbtree):存储所有通过 epoll_ctl 注册到 epoll 实例的文件描述符;增删改查操作的时间复杂度为 O(log n),适合管理大量连接。

      就绪链表(ready list):存储已经就绪(有 I/O 事件)的文件描述符;当描述符状态变化时,内核通过回调机制直接将其加入链表,无需轮询扫描。

    • 工作流程(三个 API 的协作)

      用户进程                 内核
         |                      |
         +-- epoll_create() --->|  创建 epoll 实例(在内核中),返回 epfd
         |                      |
         +-- epoll_ctl(ADD) --->|  将 fd 加入红黑树,并设置回调函数
         |                      |  (回调将就绪 fd 加入就绪链表)
         |                      |
         +-- epoll_wait() ----->|  检查就绪链表是否为空
         |                      |   - 非空:拷贝就绪事件到用户空间并返回
         |                      |   - 空:进程睡眠,等待就绪事件唤醒
         |<---- 就绪事件 ---------+
         |                      |
         | 处理事件             |
         +----------------------+
      
  • 工作模式:LT vs ET

    模式 触发条件 举例(fd 可读) 特点
    LT(水平触发) 只要 fd 处于可读/可写状态,每次 epoll_wait 都会返回该事件 缓冲区有 1 字节数据未读,后续 epoll_wait 会一直返回该 fd 默认模式
    ET(边缘触发) 仅当 fd 状态从不就绪变为就绪时触发一次 缓冲区首次有数据到达时触发一次;若未读完,后续 epoll_wait 不会再触发,直到新数据再次到达 高效,适合高吞吐量、低延迟场景(如大文件传输),要求应用层一次性处理完所有数据,减少了用户态/内核态切换

    可以这样理解:LT 是“状态通知”,ET 是“变化通知”。


3.4 select / poll / epoll 对比总结

特性 select poll epoll
描述符上限 1024(FD_SETSIZE) 无限制(受内存限制) 无限制(受内存限制)
时间复杂度 O(n) 扫描所有 fd O(n) 扫描所有 fd O(1) 仅处理就绪 fd
内核/用户拷贝 每次调用拷贝全量集合 每次调用拷贝全量数组 只需注册一次,就绪事件增量拷贝
工作模式 水平触发(LT) 水平触发(LT) 支持 LT 和 ET
就绪通知 修改传入的 fd_set 填充 revents 返回独立的 events 数组
适用场景 小规模(< 1024) 中等规模(几千) 大规模(万级以上)

四、Reactor 设计模式的原理与实现

reactor是一种处理高并发 I/O 的经典事件驱动模型。核心思想是:一个线程(事件循环)不断监听多个事件源,当某个 I/O 事件就绪(如数据到达、连接请求),就把对应的处理回调分派给处理器执行。

  • server_reactor.h
// 使用头文件保护符(Include Guard)防止头文件重复包含
// #ifndef 宏
#ifndef SERVER_REACTOR_H
#define SERVER_REACTOR_H

#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/time.h>
#include <string.h>

#define LISTENER_PORT   8888    // 定义一个0~1023以外的监听端口
#define LISTEN_BACKLOG  10      // 定义已完成连接队列的最大长度
#define BUFFER_CAPACITY 1024    // 定义缓冲区容量
#define MAX_EVENTS      1024    // events 数组的最大长度(即最多返回多少个事件)
#define MAX_FD_NUM      1048576 // 进程最多能同时打开的文件描述符数量

// 前向声明结构体(告诉编译器存在这个结构体)
struct event_handler;

typedef void (*handle) (struct event_handler *handler);

// 定义事件处理器结构
typedef struct event_handler {
    int fd;
    handle handle_read;
    handle handle_write;
    char *buffer;
    int buffer_size;
} event_handler_t;

enum FD_TYPE {
    SCOKETFD,
    CONNECTFD
};

// 声明, 不分配存储空间; 告诉编译器这是一个外部链接的变量
// 多个 .c 文件包含该头文件,都只是声明,不会产生重复定义,链接时会统一指向那个唯一的定义。注意必须搭配一个 .c 文件中的实际定义
extern int epoll_fd;
extern event_handler_t handler_list[MAX_FD_NUM];

// 初始化服务器
int init_server();
// 定义三种事件类型的回调函数
void accept_cb(event_handler_t *handler);  // 建立IO连接事件回调
void read_cb(event_handler_t *handler);   // 接收数据事件回调
void write_cb(event_handler_t *handler);  // 发送数据事件回调
// 设置 epoll 事件
void set_epoll_event(int fd, int epoll_op_type, int epoll_events);
// 注册事件
void register_event(int fd, int fd_type);

#endif
  • server_reactor.c
#include "server_reactor.h"

// 定义
int epoll_fd;
event_handler_t handler_list[MAX_FD_NUM];

// 初始化服务器
int init_server() {
    int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (socket_fd == -1) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    printf("socket_fd = %d\n", socket_fd);

    // 设置 SO_REUSEADDR,允许端口重用
    int opt = 1;
    if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
        perror("setsockopt failed");
        close(socket_fd);
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in addr;
    addr.sin_family = AF_INET;  // 通信协议族IPv4
    addr.sin_addr.s_addr = htonl(INADDR_ANY);   // INADDR_ANY(0):所有IP地址
    addr.sin_port = htons(LISTENER_PORT);   // 自定义一个0~1023以外的监听端口

    if (bind(socket_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { // 监听fd绑定IP地址和端口号
        perror("bind failed");
        close(socket_fd);
        exit(EXIT_FAILURE);
    }

    if (listen(socket_fd, LISTEN_BACKLOG) == -1) {    // socket_fd进入监听状态,设置已完成连接队列的最大长度
        perror("listen failed");
        close(socket_fd);
        exit(EXIT_FAILURE);
    }
    return socket_fd;
}

// 建立IO连接事件回调
void accept_cb(event_handler_t *handler) {
    int socket_fd = handler->fd;
    struct sockaddr_in clientAddr;
    socklen_t clientAddrLen = sizeof(clientAddr);
    int connect_fd = accept(socket_fd, (struct sockaddr *)&clientAddr, &clientAddrLen);
    if (connect_fd == -1) {
        perror("accept failed");
        return ;
    }
    printf("connect_fd = %d\n", connect_fd);

    register_event(connect_fd, CONNECTFD);
}

// 接收数据事件回调
void read_cb(event_handler_t *handler) {
    int connect_fd = handler->fd;
    char *buffer = (char *)calloc(BUFFER_CAPACITY, sizeof(char));
    if (buffer == NULL) {
        perror("calloc failed");
        return ;
    }
    int recv_size = recv(connect_fd, buffer, BUFFER_CAPACITY, 0);  // 接收数据
    if (recv_size == -1) {
        perror("recv failed");
        set_epoll_event(connect_fd, EPOLL_CTL_DEL, 0);
        return ;
    }
    if (recv_size == 0) {    // 处理原因:客户端断开连接后 recv_size = 0
        printf("connect closed. connect_fd = %d\n", connect_fd);
        set_epoll_event(connect_fd, EPOLL_CTL_DEL, 0);
        return ;
    }
    // 限制打印长度
    printf("recv = %.*s, recv_size = %d\n", recv_size, buffer, recv_size);

    set_epoll_event(connect_fd, EPOLL_CTL_MOD, EPOLLOUT);

    handler->buffer = buffer;
    handler->buffer_size = recv_size;
}

// 发送数据事件回调
void write_cb(event_handler_t *handler) {
    int connect_fd = handler->fd;
    char *buffer = handler->buffer;
    int buffer_size = handler->buffer_size;
    int send_size = send(connect_fd, buffer, buffer_size, MSG_NOSIGNAL); // MSG_NOSIGNAL: 客户端断开连接,操作系统不会向进程发送 SIGPIPE 信号,而是会返回 -1
    // int send_size = send(connect_fd, buffer, recvSize, 0);    // 由于使用的是recvSize,此时值为0,不会触发实际的网络传输,直接返回0,所以没有触发 SIGPIPE 信号
    if (send_size == -1) {
        perror("send failed");
        return ;
    }
    // 限制打印长度
    printf("send = %.*s, send_size = %d\n", send_size, buffer, send_size);

    set_epoll_event(connect_fd, EPOLL_CTL_MOD, EPOLLIN);

    free(handler->buffer);
    handler->buffer = NULL;
    handler->buffer_size = 0;
}

// 设置 epoll 事件
void set_epoll_event(int fd, int epoll_op_type, int epoll_events) {
    switch (epoll_op_type) {
    case EPOLL_CTL_ADD:
    case EPOLL_CTL_MOD:
        struct epoll_event ev;
        ev.events = epoll_events;
        ev.data.fd = fd;
        if (epoll_ctl(epoll_fd, epoll_op_type, fd, &ev) == -1) {
            perror("epoll_ctl_add failed");
            close(fd);
            return ;
        }
        break;
    case EPOLL_CTL_DEL:
        if (epoll_ctl(epoll_fd, epoll_op_type, fd, NULL) == -1) {
            perror("epoll_ctl_del failed");
            return ;
        }
        close(fd);
        break;
    default:
        break;
    }
}

// 注册事件
void register_event(int fd, int fd_type) {
    event_handler_t handler;
    handler.fd = fd;
    switch (fd_type) {
    case SCOKETFD:
        handler.handle_read = accept_cb;
        break;
    case CONNECTFD:
        handler.handle_read = read_cb;
        handler.handle_write = write_cb;
        break;
    default:
        break;
    }
    handler_list[fd] = handler;

    set_epoll_event(fd, EPOLL_CTL_ADD, EPOLLIN);
}
  • main.c
#include "server_reactor.h"

int main() {
    // 初始化服务器
    int socket_fd = init_server();

    // 使用 epoll_create1 替代旧的 epoll_create,它没有废弃的 size 参数,语义更清晰
    epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create failed");
        close(socket_fd);
        exit(EXIT_FAILURE);
    } 
    printf("epoll_fd = %d\n", epoll_fd);

    register_event(socket_fd, SCOKETFD);

    struct epoll_event events[MAX_EVENTS] = {0};
    while (1)
    {
        int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (nfds == -1) {
            perror("epoll_wait failed");
            close(epoll_fd);
            close(socket_fd);
            exit(EXIT_FAILURE);
        }
        for (int i = 0; i < nfds; ++i) {
            int fd = events[i].data.fd;
            event_handler_t *handler = &handler_list[fd];
            switch (events[i].events)   // 根据事件类型执行不同的回调函数
            {
            case EPOLLIN:
                handler->handle_read(handler);
                break;
            case EPOLLOUT:
                handler->handle_write(handler);
                break;
            default:
                break;
            }
        }
    }
    
    return 0;
}
  • 核心特点

    将“等待事件”与“处理事件”解耦:主循环只通过 epoll_wait 获取就绪 fd,并直接回调预先注册的 handle_read / handle_write,不再需要在主循环中写一堆 if-else 去区分 fd 和业务逻辑。

  • 与裸 epoll 的关键区别

    裸 epoll 在事件循环里用 if (fd == socket_fd) 区分连接请求和普通数据,业务越多分叉越乱;
    Reactor 通过 handler_list[fd] 将每个 fd 与其处理回调绑定,主循环仅负责分派,不掺杂任何具体业务逻辑。

  • 连接上下文管理

    event_handler_t 为每个连接保存了 fdbufferbuffer_size 和对应的读写回调,让数据在 read_cbwrite_cb 之间自然传递,避免使用全局数组或临时变量传递状态。


📚 系列导航

本文所有代码均在 Ubuntu 24.04 ARM64 + GCC 环境下编译运行通过,欢迎留言交流。


📌 更新:下一篇已发布

搞定了 IO 多路复用和 Reactor,该深入网络协议核心了!本文的第四篇《从TCP三次握手到QUIC:一篇搞懂网络传输协议核心原理》已发布。从 TCP 的三次握手、四次挥手,到 UDP 对比、ARQ 重传、流量控制与拥塞控制,再到 KCP 和 QUIC 协议,一篇理清网络传输的核心原理,全是干货,快来一起进阶吧!

👉 点击阅读:《从TCP三次握手到QUIC:一篇搞懂网络传输协议核心原理》

更多推荐