基于reactor方法+epoll处理http请求
Server.h
结构体定义
为了便利后续的开发,我们应该定义一个用于管理I/O的结构体。定义为struct conn 类型,这个结构体用于描述一个网络连接的完整上下文信息。它将套接字、读写缓冲区、事件回调函数以及连接处理状态统一封装,使得 Reactor 在事件分发时,能够围绕单个连接完成接收数据、组织响应和发送数据等操作。这种设计的好处是把“连接的数据”和“连接的行为”绑定在一起,便于实现事件驱动的网络编程模型。
// 将每个fd,即I/O触发相应的信息封装成以及结构体
struct conn
{
int fd; //文件描述符
bool is_listen; //判断是否是用于监听新的连接请求的文件描述符
char rbuff[BUFF_SIZE]; //读数据缓冲区
int rlen; //读数据缓冲区长度
char wbuff[BUFF_SIZE]; //写缓冲区
int wlen; //写缓冲区长度
// 声明不同事件触发的回调函数
CALLBACK send_callback; //发送数据的callback
CALLBACK recv_callback; //接收数据的callback
CALLBACK accept_callback; //接收新的连接的callback
int status; // 发送数据的状态机 0 ,1 ,2(便于后续分批次发送数据,防止一次没发完)
};
完整代码
#ifndef SERVER_H
#define SERVER_H
#define BUFF_SIZE 2048
#include <stdbool.h>
typedef int (*CALLBACK)(int fd);
// 将每个fd,即I/O触发相应的信息封装成以及结构体
struct conn
{
int fd;
bool is_listen;
char rbuff[BUFF_SIZE];
int rlen;
char wbuff[BUFF_SIZE];
int wlen;
// 声明不同事件触发的回调函数
CALLBACK send_callback;
CALLBACK recv_callback;
CALLBACK accept_callback;
int status; // 发送数据的状态机 0 ,1 ,2
};
int http_request(struct conn *conn_item); // 处理接收数据的业务代码
int http_response(struct conn *conn_item); // 封装要发送数据的业务的代码
#endif
reactor.c
这个reactor是我们实现server端的网络I/O的基本功能,包括本地ip的绑定,监听,连接,以及epoll相关的操作。
init_server
这里通过传入我们设置的server端port,来绑定一个本地的ip,将一个fd开启为listen状态来监听新的连接请求。
int init_server(unsigned int port)
{
// 1. create socket
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd < 0)
{
perror("socket");
return -1;
}
// 2. bind
struct sockaddr_in serverAddr = {0};
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(PORT);
serverAddr.sin_addr.s_addr = INADDR_ANY;
if (bind(listen_fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0)
{
perror("bind");
close(listen_fd);
return -1;
}
// 3. listen
if (listen(listen_fd, LISTEN_BACKLOG) < 0)
{
perror("listen");
close(listen_fd);
return -1;
}
return listen_fd;
}
新的连接加入操作
当我们有新的连接请求的时候,执行accept触发三次握手建立连接,这个时候我们可以根据句柄fd充当结构体数组struct conn的下标,然后给这个fd设置对应的回调函数callback,读写数据的buffer,并将这个fd添加到epoll维护的集合中。
这里set_event函数主要是设置EPOLLIN和EPOLLOUTS事件,并负责对epoll_event的添加和修改,event_register用于初始化buffer和对应的回调函数callback。
int set_event(int fd, int event, short flag)
{
struct epoll_event ev = {0};
ev.events = event;
ev.data.fd = fd;
if (flag == 1)
{
return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}
else if (flag == 0)
{
return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
return -1;
}
int event_register(int fd, int status)
{
conn_list[fd].fd = fd;
memset(conn_list[fd].rbuff, 0, BUFF_SIZE);
conn_list[fd].rlen = 0;
memset(conn_list[fd].wbuff, 0, BUFF_SIZE);
conn_list[fd].wlen = 0;
conn_list[fd].recv_callback = recv_cb;
conn_list[fd].send_callback = send_cb;
set_event(fd, status, 1);
}
accept_cb
有新的连接请求的时候,使用调用accept函数来建立新的连接的回调函数。
int accept_cb(int fd)
{
struct sockaddr_in clientAddr = {0};
socklen_t len = sizeof(clientAddr);
int client_fd = accept(fd, (struct sockaddr *)&clientAddr, &len);
if (client_fd < 0)
{
perror("accept");
return -1;
}
printf("client %d connect successfully...\n", client_fd);
event_register(client_fd, EPOLLIN | EPOLLET);
return client_fd;
}
recv_cb
当一个I/O有数据发送过来的时候,使用recv()函数来接收数据的回调函数。
int recv_cb(int fd)
{
memset(conn_list[fd].rbuff, 0, BUFF_SIZE);
int count = recv(fd, conn_list[fd].rbuff, BUFF_SIZE, 0);
if (count == 0)
{
printf("client disconnect: %d\n", fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
return -1;
}
if (count < 0)
{
perror("recv"); // 对端发送reset信息,用于退出,属于正常现象
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
return 0;
}
conn_list[fd].rlen = count;
http_request(&conn_list[fd]);
set_event(fd, EPOLLOUT, 0);
// printf("RECV(%d bytes): %s\n", count, conn_list[fd].rbuff);
// memcpy(conn_list[fd].wbuff, conn_list[fd].rbuff, count);
return count;
}
send_cb
当我们需要向对端发送数据的时候,需要使用send()函数向对端发送数据的时候的回调函数。
int send_cb(int fd)
{
// 组织要发送的data
http_response(&conn_list[fd]);
int count = 0;
if (conn_list[fd].status == 1)
{
printf("SEND:%s\n", conn_list[fd].wbuff);
count = send(fd, conn_list[fd].wbuff, conn_list[fd].wlen, 0);
if (count < 0)
{
perror("send");
return -1;
}
set_event(fd, EPOLLOUT, 0);
}
else if (conn_list[fd].status == 2)
{
// printf("SEND:%s\n", conn_list[fd].wbuff);
set_event(fd, EPOLLOUT, 0);
}
else if (conn_list[fd].status == 0)
{
if (conn_list[fd].wbuff != 0)
{
int count = send(fd, conn_list[fd].wbuff, conn_list[fd].wlen, 0);
}
set_event(fd, EPOLLOUT, 0);
}
set_event(fd, EPOLLIN, 0);
return count;
}
Webserver.c
我们之前在reactor.c中实现了网络I/O的基本功能,现在需要实现对业务的操作,即对数据的接收和组织要发送的数据的操作。
接收数据
我们需要将对应fd的结构体数组的地址传入,然后对对读缓冲区的data进行解析。
// receive data bussiness process
int http_request(struct conn *conn_item)
{
if (conn_item == NULL)
return -1;
printf("RECV(%d bytes): %s\n", conn_item->rlen, conn_item->rbuff);
// init the write buffer
memset(conn_item->wbuff, 0, BUFF_SIZE);
conn_item->wlen = 0;
conn_item->status = 0; // 设置最初的状态为0
return 0;
}
发送数据
我们需要将对应fd的结构体数组的地址传入,需要我们将需要发送的数据组织好,然后放入到写缓冲区中,这里我们分批次来发送http协议,先发送http头,再使用sendfile()函数直接发送http body,我们需要发送的http body存储在index.html中。
int http_response(struct conn *conn_item)
{
if (conn_item == NULL)
return -1;
// open file
int file_fd = open("./index.html", O_RDONLY);
if (file_fd < 0)
{
perror("file open");
return -1;
}
// read the file context
struct stat f_st;
fstat(file_fd, &f_st);
if (conn_item->status == 0)
{
conn_item->wlen = sprintf(conn_item->wbuff, "HTTP/1.1 200 OK\r\n"
"Content-Type: text/html\r\n"
"Accept-Ranges: bytes\r\n"
"Content-Length: %ld\r\n"
"Date: Tue, 30 Apr 2024 13:16:46 GMT\r\n\r\n",
f_st.st_size);
conn_item->status = 1;
}
else if (conn_item->status == 1)
{
int ret = sendfile(conn_item->fd, file_fd, NULL, f_st.st_size);
if (ret == -1)
{
perror("sendfile");
}
conn_item->status = 2;
}
else if (conn_item->status == 2)
{
memset(conn_item->wbuff, 0, BUFF_SIZE);
conn_item->wlen = 0;
conn_item->status = 0; // status return value 0
}
return 0;
}
完整代码
Server.h
#ifndef SERVER_H
#define SERVER_H
#define BUFF_SIZE 2048
#include <stdbool.h>
typedef int (*CALLBACK)(int fd);
// 将每个fd,即I/O触发相应的信息封装成以及结构体
struct conn
{
int fd;
bool is_listen;
char rbuff[BUFF_SIZE];
int rlen;
char wbuff[BUFF_SIZE];
int wlen;
// 声明不同事件触发的回调函数
CALLBACK send_callback;
CALLBACK recv_callback;
CALLBACK accept_callback;
int status; // 发送数据的状态机 0 ,1 ,2
};
int http_request(struct conn *conn_item); // 处理接收数据的业务代码
int http_response(struct conn *conn_item); // 封装要发送数据的业务的代码
#endif
Webserver.c
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <string.h>
#include <sys/sendfile.h>
#include "Server.h"
// receive data bussiness process
int http_request(struct conn *conn_item)
{
if (conn_item == NULL)
return -1;
printf("RECV(%d bytes): %s\n", conn_item->rlen, conn_item->rbuff);
// init the write buffer
memset(conn_item->wbuff, 0, BUFF_SIZE);
conn_item->wlen = 0;
conn_item->status = 0; // 设置最初的状态为0
return 0;
}
// send data bussiness process
int http_response(struct conn *conn_item)
{
if (conn_item == NULL)
return -1;
// open file
int file_fd = open("./index.html", O_RDONLY);
if (file_fd < 0)
{
perror("file open");
return -1;
}
// read the file context
struct stat f_st;
fstat(file_fd, &f_st);
#if 0
conn_item->wlen = sprintf(conn_item->wbuff, "HTTP/1.1 200 OK\r\n"
"Content-Type: text/html\r\n"
"Accept-Ranges: bytes\r\n"
"Content-Length: %ld\r\n"
"Date: Tue, 30 Apr 2024 13:16:46 GMT\r\n\r\n",
f_st.st_size);
#else
if (conn_item->status == 0)
{
conn_item->wlen = sprintf(conn_item->wbuff, "HTTP/1.1 200 OK\r\n"
"Content-Type: text/html\r\n"
"Accept-Ranges: bytes\r\n"
"Content-Length: %ld\r\n"
"Date: Tue, 30 Apr 2024 13:16:46 GMT\r\n\r\n",
f_st.st_size);
conn_item->status = 1;
}
else if (conn_item->status == 1)
{
int ret = sendfile(conn_item->fd, file_fd, NULL, f_st.st_size);
if (ret == -1)
{
perror("sendfile");
}
conn_item->status = 2;
}
else if (conn_item->status == 2)
{
memset(conn_item->wbuff, 0, BUFF_SIZE);
conn_item->wlen = 0;
conn_item->status = 0; // status return value 0
}
#endif
return 0;
// conn_item->wlen = read(file_fd, conn_item->wbuff + conn_item->wlen, BUFF_SIZE - f_st.st_size);
// return conn_item->wlen;
}
index.html
<!DOCTYPE html>
<html>
<head>
<title>Welcome to nginx!</title>
<style>
html {
color-scheme: light dark;
}
body {
width: 35em;
margin: 0 auto;
font-family: Tahoma, Verdana, Arial, sans-serif;
}
</style>
</head>
<body>
<h1>Welcome to nginx!</h1>
<p>If you see this page, the nginx web server is successfully installed and
working. Further configuration is required.</p>
<p>For online documentation and support please refer to
<a href="http://nginx.org/">nginx.org</a>.<br />
Commercial support is available at
<a href="http://nginx.com/">nginx.com</a>.
</p>
<p><em>Thank you for using nginx.</em></p>
</body>
</html>
reactor.c
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
// #include <stdbool.h>
#include "Server.h"
#define PORT 8080
#define MAX_PORTS 1
// #define BUFF_SIZE 1024
#define MAX_EVENTS 1024
#define CONNLIST_SIZE 1048576 // 1024*1024
#define LISTEN_BACKLOG 5
// typedef int (*CALLBACK)(int fd);
int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);
// // 将每个fd,即I/O触发相应的信息封装成以及结构体
// struct conn
// {
// int fd;
// bool is_listen;
// char rbuff[BUFF_SIZE];
// int rlen;
// char wbuff[BUFF_SIZE];
// int wlen;
// // 声明不同事件触发的回调函数
// CALLBACK send_callback;
// CALLBACK recv_callback;
// CALLBACK accept_callback;
// int status;
// };
// set global epoll fd
int epfd = 0;
struct conn conn_list[CONNLIST_SIZE] = {0};
// set event
int set_event(int fd, int event, short flag)
{
struct epoll_event ev = {0};
ev.events = event;
ev.data.fd = fd;
if (flag == 1)
{
return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}
else if (flag == 0)
{
return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
return -1;
}
int event_register(int fd, int status)
{
conn_list[fd].fd = fd;
memset(conn_list[fd].rbuff, 0, BUFF_SIZE);
conn_list[fd].rlen = 0;
memset(conn_list[fd].wbuff, 0, BUFF_SIZE);
conn_list[fd].wlen = 0;
conn_list[fd].recv_callback = recv_cb;
conn_list[fd].send_callback = send_cb;
set_event(fd, status, 1);
}
int send_cb(int fd)
{
// 组织要发送的data
http_response(&conn_list[fd]);
#if 0
int count = send(fd, conn_list[fd].wbuff, conn_list[fd].wlen, 0);
if (count < 0)
{
perror("send");
return -1;
}
#else
int count = 0;
if (conn_list[fd].status == 1)
{
printf("SEND:%s\n", conn_list[fd].wbuff);
count = send(fd, conn_list[fd].wbuff, conn_list[fd].wlen, 0);
if (count < 0)
{
perror("send");
return -1;
}
set_event(fd, EPOLLOUT, 0);
}
else if (conn_list[fd].status == 2)
{
// printf("SEND:%s\n", conn_list[fd].wbuff);
set_event(fd, EPOLLOUT, 0);
}
else if (conn_list[fd].status == 0)
{
if (conn_list[fd].wbuff != 0)
{
int count = send(fd, conn_list[fd].wbuff, conn_list[fd].wlen, 0);
}
set_event(fd, EPOLLOUT, 0);
}
#endif
set_event(fd, EPOLLIN, 0);
return count;
}
int recv_cb(int fd)
{
memset(conn_list[fd].rbuff, 0, BUFF_SIZE);
int count = recv(fd, conn_list[fd].rbuff, BUFF_SIZE, 0);
if (count == 0)
{
printf("client disconnect: %d\n", fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
return -1;
}
if (count < 0)
{
perror("recv"); // 对端发送reset信息,用于退出,属于正常现象
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
return 0;
}
conn_list[fd].rlen = count;
http_request(&conn_list[fd]);
set_event(fd, EPOLLOUT, 0);
// printf("RECV(%d bytes): %s\n", count, conn_list[fd].rbuff);
// memcpy(conn_list[fd].wbuff, conn_list[fd].rbuff, count);
return count;
}
int accept_cb(int fd)
{
struct sockaddr_in clientAddr = {0};
socklen_t len = sizeof(clientAddr);
int client_fd = accept(fd, (struct sockaddr *)&clientAddr, &len);
if (client_fd < 0)
{
perror("accept");
return -1;
}
printf("client %d connect successfully...\n", client_fd);
#if 0
conn_list[client_fd].fd = client_fd;
conn_list[client_fd].recv_callback = recv_callback;
conn_list[client_fd].send_callback = send_callback;
if (set_event(client_fd, EPOLLIN, 1) < 0)
{
perror("epoll_ctl: client_fd");
close(client_fd);
return -1;
}
#else
event_register(client_fd, EPOLLIN | EPOLLET);
#endif
return client_fd;
}
int init_server(unsigned int port)
{
// 1. create socket
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd < 0)
{
perror("socket");
return -1;
}
// 2. bind
struct sockaddr_in serverAddr = {0};
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(PORT);
serverAddr.sin_addr.s_addr = INADDR_ANY;
if (bind(listen_fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0)
{
perror("bind");
close(listen_fd);
return -1;
}
// 3. listen
if (listen(listen_fd, LISTEN_BACKLOG) < 0)
{
perror("listen");
close(listen_fd);
return -1;
}
return listen_fd;
}
int main()
{
#if 0
int listen_fd = init_server(PORT);
if (listen_fd < 0)
return -1;
epfd = epoll_create1(0);
if (epfd < 0)
{
perror("epoll_create1");
close(listen_fd);
return -1;
}
conn_list[listen_fd].fd = listen_fd;
conn_list[listen_fd].accept_callback = accept_callback;
if (set_event(listen_fd, EPOLLIN, 1) < 0)
{
perror("epoll_ctl: listen_fd");
close(epfd);
close(listen_fd);
return -1;
}
#else
epfd = epoll_create1(0);
if (epfd < 0)
{
perror("epoll_create1");
return -1;
}
int i = 0;
for (i = 0; i < MAX_PORTS; i++)
{
int listen_fd = init_server(PORT + i);
if (listen_fd < 0)
return -1;
conn_list[listen_fd].fd = listen_fd;
conn_list[listen_fd].accept_callback = accept_cb;
conn_list[listen_fd].is_listen = true;
if (set_event(listen_fd, EPOLLIN, 1) < 0)
{
perror("epoll_ctl: listen_fd");
close(epfd);
close(listen_fd);
return -1;
}
}
#endif
// main loop
while (1)
{
struct epoll_event events[MAX_EVENTS] = {0};
int nreadys = epoll_wait(epfd, events, MAX_EVENTS, -1);
if (nreadys < 0)
{
perror("epoll_wait");
break;
}
int i = 0;
for (i = 0; i < nreadys; i++)
{
int currfd = events[i].data.fd;
if (conn_list[currfd].is_listen)
{
conn_list[currfd].accept_callback(currfd);
}
else if (events[i].events & EPOLLIN)
{
if (conn_list[currfd].recv_callback != NULL)
conn_list[currfd].recv_callback(currfd);
}
else if (events[i].events & EPOLLOUT)
{
if (conn_list[currfd].send_callback != NULL)
conn_list[currfd].send_callback(currfd);
}
}
}
return 0;
}
执行结果
编译源代码之后,使用浏览器来访问8080端口,结果如下:

server端结果:
更多推荐

所有评论(0)