tcp 端口复用与惊群效应(REUSEADDR、REUSEPORT)
我在之前的一篇文章中,介绍了我在之前的项目中遇到的端口复用,windows 的udp里端口复用导致了一个bug,具体的链接参考如下:关于Socket中端口复用_zhc的博客-CSDN博客_socket端口复用而我现在是想总结一下,linux下的情况,想从头再梳理下,REUSEADDR 和 REUSEPORT的作用。1.SO_REUSEADDR 是为了解决time_wait问题而产生的SO_REUS
我在之前的一篇文章中,介绍了我在之前的项目中遇到的端口复用,windows 的udp里端口复用导致了一个bug,具体的链接参考如下:
关于Socket中端口复用_zhc的博客-CSDN博客_socket端口复用
而我现在是想总结一下,linux下的情况,想从头再梳理下,REUSEADDR 和 REUSEPORT的作用。
1.SO_REUSEADDR 是为了解决time_wait问题而产生的
SO_REUSEADDR 它的作用是当服务器端主动断开链接的时候,处在timewait阶段大约60秒钟,服务器重新启动的话,会出现这个错误:"Address already in use"
而如果服务器的套接字设置了这个选项,就可以再一次绑定这个 ip和端口。
而这个就衍生了一个问题。就是我可以在两个不同网卡的同一个端口上进行绑定tcp服务器,也要用到这个字段,这里一定要注意的是,一定是两个不同的真实ip,如果一个是通用的
INADDR_ANY(也就是我们常说的0.0.0.0),另外一个是10.2.4.567 也不行,一会报这个错误,还有一个要注意的是,报错不是在bind的时候报错,而是在监听的时候报错。
下面的代码就会报错,具体的代码粘贴如下:
//
// Created by hczhang on 2022/2/15.
//
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<signal.h>
#include<sys/wait.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include <errno.h>
#define BUF_SIZE 30
void read_childproc(int sig);
int main(int argc,char *argv[]){
int serv_sock,clnt_sock,serv_sock1;
struct sockaddr_in serv_adr,clnt_adr,serv_adr1;
pid_t pid;
struct sigaction act;
socklen_t adr_sz;
int str_len,state;
char buf[BUF_SIZE];
act.sa_handler = read_childproc;
sigemptyset(&act.sa_mask);
act.sa_flags=0;
state = sigaction(SIGCHLD,&act,0);
serv_sock = socket(AF_INET,SOCK_STREAM,0);
serv_sock1 = socket(AF_INET,SOCK_STREAM,0);
int option=1;
socklen_t optlen = sizeof(option);
printf("server bind start\n");
setsockopt(serv_sock,SOL_SOCKET,SO_REUSEADDR,(void*)&option,optlen);
setsockopt(serv_sock1,SOL_SOCKET,SO_REUSEADDR,(void*)&option,optlen);
memset(&serv_adr,0,sizeof(serv_adr));
serv_adr.sin_family=AF_INET;
serv_adr.sin_addr.s_addr= htonl(atoi("0.0.0.0"));
serv_adr.sin_port= htons(atoi("12345"));
memset(&serv_adr1,0,sizeof(serv_adr1));
serv_adr1.sin_family=AF_INET;
serv_adr1.sin_addr.s_addr= htonl(atoi("10.2.6.185"));
serv_adr1.sin_port= htons(atoi("12345"));
if(bind(serv_sock,(struct sockaddr*)&serv_adr,sizeof(serv_adr))==-1){
printf("1 bind error:%s\n", strerror(errno));
exit(1);
}
if(bind(serv_sock1,(struct sockaddr*)&serv_adr1,sizeof(serv_adr1))==-1){
printf("2 bind error:%s\n", strerror(errno));
exit(1);
}
if(listen(serv_sock,5)==-1){
printf("1 listen error:%s\n", strerror(errno));
exit(1);
}
if(listen(serv_sock1,5)==-1){
printf("2 listen error:%s\n", strerror(errno));
exit(1);
}
while(1){
adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock,(struct sockaddr*)&clnt_adr,&adr_sz);
if(clnt_sock==-1){
printf("accept error:%s\n",strerror(errno));
continue;
}
else
printf("new client connected...\n");
pid = fork();
if(pid ==-1){
close(clnt_sock);
continue;
}
// 子进程运行区域
if(pid == 0){
printf("child process enter!!!\n");
close(serv_sock);//把复制过来的句柄关闭
while((str_len= read(clnt_sock,buf,BUF_SIZE)!=0)){
write(clnt_sock,buf,str_len);
}
close(clnt_sock);
printf("client disconnected...\n");
return 0;
}else{
//父进程执行区域
close(clnt_sock);
}
}
close(serv_sock);
return 0;
}
void read_childproc(int sig){
pid_t pid;
int status;
pid = waitpid(-1,&status,WNOHANG);
printf("removed proc id:%d\n",pid);
}
这个程序启动后就会报错,如果把0.0.0.0改成一个真实的不重复的ip就不会报错了。
2.SO_REUSEPORT 是为了解决惊群问题而产生的
刚才说了SO_REUSEADDR的两个ip不能相同,但是SO_REUSEPORT为了解决惊群效应,能让其绑定相同的ip,和端口,具体的测试代码如下:
//
// Created by hczhang on 2022/2/15.
//
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<signal.h>
#include<sys/wait.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include <errno.h>
#define BUF_SIZE 30
void read_childproc(int sig);
int main(int argc,char *argv[]){
int serv_sock,clnt_sock,serv_sock1;
struct sockaddr_in serv_adr,clnt_adr,serv_adr1;
pid_t pid;
struct sigaction act;
socklen_t adr_sz;
int str_len,state;
char buf[BUF_SIZE];
act.sa_handler = read_childproc;
sigemptyset(&act.sa_mask);
act.sa_flags=0;
state = sigaction(SIGCHLD,&act,0);
serv_sock = socket(AF_INET,SOCK_STREAM,0);
serv_sock1 = socket(AF_INET,SOCK_STREAM,0);
int option=1;
socklen_t optlen = sizeof(option);
printf("server bind start\n");
setsockopt(serv_sock,SOL_SOCKET,SO_REUSEPORT,(void*)&option,optlen);
setsockopt(serv_sock1,SOL_SOCKET,SO_REUSEPORT,(void*)&option,optlen);
memset(&serv_adr,0,sizeof(serv_adr));
serv_adr.sin_family=AF_INET;
serv_adr.sin_addr.s_addr= htonl(atoi("0.0.0.0"));
serv_adr.sin_port= htons(atoi("12345"));
memset(&serv_adr1,0,sizeof(serv_adr1));
serv_adr1.sin_family=AF_INET;
serv_adr1.sin_addr.s_addr= htonl(atoi("10.2.6.185"));
serv_adr1.sin_port= htons(atoi("12345"));
if(bind(serv_sock,(struct sockaddr*)&serv_adr,sizeof(serv_adr))==-1){
printf("1 bind error:%s\n", strerror(errno));
exit(1);
}
if(bind(serv_sock1,(struct sockaddr*)&serv_adr1,sizeof(serv_adr1))==-1){
printf("2 bind error:%s\n", strerror(errno));
exit(1);
}
if(listen(serv_sock,5)==-1){
printf("1 listen error:%s\n", strerror(errno));
exit(1);
}
if(listen(serv_sock1,5)==-1){
printf("2 listen error:%s\n", strerror(errno));
exit(1);
}
while(1){
adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock,(struct sockaddr*)&clnt_adr,&adr_sz);
if(clnt_sock==-1){
printf("accept error:%s\n",strerror(errno));
continue;
}
else
printf("new client connected...\n");
pid = fork();
if(pid ==-1){
close(clnt_sock);
continue;
}
// 子进程运行区域
if(pid == 0){
printf("child process enter!!!\n");
close(serv_sock);//把复制过来的句柄关闭
while((str_len= read(clnt_sock,buf,BUF_SIZE)!=0)){
write(clnt_sock,buf,str_len);
}
close(clnt_sock);
printf("client disconnected...\n");
return 0;
}else{
//父进程执行区域
close(clnt_sock);
}
}
close(serv_sock);
return 0;
}
void read_childproc(int sig){
pid_t pid;
int status;
pid = waitpid(-1,&status,WNOHANG);
printf("removed proc id:%d\n",pid);
}
只是之前的 SO_REUSEADDR换成SO_REUSEPORT就可以了,不报错了程序启动后。
而惊群问题的解决方案有两个:
除了加锁的解决方法外,还有其他2个办法: 1. 利用reuseport机制(需要3.9以后版本),但这需要在每个子进程去创建监听端口(而不是继承父进程的),这样就可以保证每个子进程的套接字都是独立的,它们都有自己的accept队列,由内核来做负载均衡; 2. liunx 4.5内核在epoll已经新增了EPOLL_EXCLUSIVE选项,在多个进程同时监听同一个socket,只有一个被唤醒。
具体的原理我暂时不再这里记录,先把两种解决方案的代码实现下:
对于reuseport机制,具体多进程代码如下所示:
客户端代码统一都是一个(可以用一个网络助手来代替):
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<signal.h>
#include<sys/wait.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include<errno.h>
#define BUF_SIZE 1024
int main(int argc,char *argv[]){
int sock;
char message[BUF_SIZE];
int str_len,recv_len,recv_cnt;
struct sockaddr_in serv_addr;
sock = socket(AF_INET,SOCK_STREAM,0);
if(sock ==-1){
printf("socket error\n");
}
memset(&serv_addr,0,sizeof(serv_addr));
serv_addr.sin_family=AF_INET;
serv_addr.sin_addr.s_addr= htonl(atoi("0.0.0.0"));
serv_addr.sin_port = htons(atoi("12345"));
if(connect(sock,(struct sockaddr*)&serv_addr,sizeof(serv_addr))==-1){
printf("connect error,%d\n",errno);
exit(1);
}else{
printf("connected........\n");
}
while (1){
fputs("Input message(Q/q to quit): ",stdout);
fgets(message,BUF_SIZE,stdin);
if(!strcmp(message,"q\n")||!strcmp(message,"Q\n"))
break;
str_len = write(sock,message, strlen(message));
printf("send data to server %s size is:%d\n",message,str_len);
// 在这里等到从server读取到等量的数据再显示
recv_len=0;
while(recv_len<str_len){
recv_cnt = read(sock,&message[recv_len],BUF_SIZE-1);
if(recv_cnt ==-1){
printf("read error\n");
}
recv_len+=recv_cnt;
}
message[recv_len]=0;//字符串结尾放\0
printf("Message from server:%s",message);
}
close(sock);
return 0;
}
对应的服务器代码如下:
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#define PROCESS_NUM 10
#define MAXEVENTS 64
// socket创建和绑定
int sock_creat_bind(char *port) {
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_port = htons(atoi(port));
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
// 设置reuseport
int option = 1;
socklen_t optlen = sizeof(option);
setsockopt(sock_fd, SOL_SOCKET, SO_REUSEPORT, (void *)&option, optlen);
printf("i have set reuse port\n");
bind(sock_fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
printf("sock_fd is :%d\n", sock_fd);
return sock_fd;
}
//利用fcntl设置文件或者函数调用的状态标志
int make_nonblocking(int fd) {
int val = fcntl(fd, F_GETFL);
val |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, val) < 0) {
perror("fcntl set");
return -1;
}
return 0;
/*
* int flag = fcntl(fd,F_GETFL,0);
fcntl(fd,F_SETFL,flag|O_NONBLOCK);
* */
}
int main(int argc, char *argv[]) {
if (argc < 2) {
printf("usage: [port] %s", argv[1]);
exit(1);
}
int i;
for (i = 0; i < PROCESS_NUM; ++i) {
int pid = fork();
if (pid == 0) {
int epoll_fd;
if ((epoll_fd = epoll_create(MAXEVENTS)) < 0) {
perror("epoll_create");
exit(1);
}
int sock_fd;
if ((sock_fd = sock_creat_bind(argv[1])) < 0) {
perror("socket and bind");
exit(1);
}
if (make_nonblocking(sock_fd) < 0) {
perror("make non blocking");
exit(1);
}
if (listen(sock_fd, SOMAXCONN) < 0) {
perror("listen");
exit(1);
}
struct epoll_event event;
event.data.fd = sock_fd;
event.events = EPOLLIN;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd, &event) < 0) {
perror("epoll_ctl");
exit(1);
}
/*buffer where events are returned*/
struct epoll_event *events;
events = (struct epoll_event *)calloc(MAXEVENTS, sizeof(event));
while (1) {
int num, j;
num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
printf("process %d return from epoll_wait\n", getpid());
sleep(2);
for (i = 0; i < num; ++i) {
if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN))) {
fprintf(stderr, "epoll error\n");
close(events[i].data.fd);
continue;
} else if (sock_fd == events[i].data.fd) {
//收到关于监听套接字的通知,意味着一个或者多个传入连接
struct sockaddr in_addr;
socklen_t in_len = sizeof(in_addr);
if (accept(sock_fd, &in_addr, &in_len) < 0) {
printf("process %d accept failed %s\n", getpid(),
strerror(errno));
} else {
printf("process %d accept successful!\n", getpid());
}
}
}
}
}
}
wait(0);
return 0;
}
现在看reuseport解决了惊群问题,无论是水平触发和边沿触发
对于这个 EPOLLEXCLUSIVE选项,我进行了下测试代码如下:
#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<unistd.h>
#include<sys/epoll.h>
#include<netdb.h>
#include<stdlib.h>
#include<fcntl.h>
#include<sys/wait.h>
#include<errno.h>
#define PROCESS_NUM 10
#define MAXEVENTS 64
//socket创建和绑定
int sock_creat_bind(char * port){
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_port = htons(atoi(port));
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
// 设置reuseport
/* int option=1;
socklen_t optlen = sizeof(option);
setsockopt(sock_fd,SOL_SOCKET,SO_REUSEPORT,(void*)&option,optlen);*/
printf("i donot set reuse port\n");
bind(sock_fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
return sock_fd;
}
//利用fcntl设置文件或者函数调用的状态标志
int make_nonblocking(int fd){
int val = fcntl(fd, F_GETFL);
val |= O_NONBLOCK;
if(fcntl(fd, F_SETFL, val) < 0){
perror("fcntl set");
return -1;
}
return 0;
/*
* int flag = fcntl(fd,F_GETFL,0);
fcntl(fd,F_SETFL,flag|O_NONBLOCK);
* */
}
int main(int argc, char *argv[])
{
int sock_fd, epoll_fd;
struct epoll_event event;
struct epoll_event *events;
if(argc < 2){
printf("usage: [port] %s", argv[1]);
exit(1);
}
if((sock_fd = sock_creat_bind(argv[1])) < 0){
perror("socket and bind");
exit(1);
}
if(make_nonblocking(sock_fd) < 0){
perror("make non blocking");
exit(1);
}
if(listen(sock_fd, SOMAXCONN) < 0){
perror("listen");
exit(1);
}
if((epoll_fd = epoll_create(MAXEVENTS))< 0){
perror("epoll_create");
exit(1);
}
event.data.fd = sock_fd;
event.events = EPOLLIN|EPOLLEXCLUSIVE;
if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd, &event) < 0){
perror("epoll_ctl");
exit(1);
}
/*buffer where events are returned*/
events = (struct epoll_event*)calloc(MAXEVENTS, sizeof(event));
int i;
for(i = 0; i < PROCESS_NUM; ++i){
int pid = fork();
if(pid == 0){
while(1){
int num, j;
num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
printf("process %d return from epoll_wait\n", getpid());
// sleep(2);
for(i = 0; i < num; ++i){
if((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))){
fprintf(stderr, "epoll error\n");
close(events[i].data.fd);
continue;
}else if(sock_fd == events[i].data.fd){
//收到关于监听套接字的通知,意味着一个或者多个传入连接
struct sockaddr in_addr;
socklen_t in_len = sizeof(in_addr);
if(accept(sock_fd, &in_addr, &in_len) < 0){
printf("process %d accept failed!\n", getpid());
}else{
printf("process %d accept successful!\n", getpid());
}
}
}
}
}
}
wait(0);
free(events);
close(sock_fd);
return 0;
}
当收到客户端的发送数据请求时,当我们不设置边沿触发的情况,默认是水平触发,则输出结果如下:
hczhang@hczhang-OptiPlex-3050:~/testcommono/socket/tcp_server$ gcc thunder_epoll.cpp -o ghj
hczhang@hczhang-OptiPlex-3050:~/testcommono/socket/tcp_server$ ./ghj 12345
i donot set reuse port
process 16319 return from epoll_wait
process 16318 return from epoll_wait
process 16319 accept successful!
process 16318 accept failed!
我们看,并不是所有的进程都被唤醒了,而只是唤醒了两个。这个感觉是水平触发的影响。
具体可以参考下这篇博客的解释:
主要的关键的摘录如下:
1.这个标识肯定是有作用的。因为源码是公开的,并且官方文档中也写明了。
epoll: add EPOLLEXCLUSIVE flag · torvalds/linux@df0108c · GitHub
2.导致所有线程都被唤醒的原因可能是因为LT模式。LT模式的特点是,如果epoll中监听的socket有事件,那么不管什么时候调用,epoll_wait都会返回。我们设置了EPOLLEXCLUSIVE标识,可以保证所有的epoll_wait同时调用的时候只有一个返回,但是这个线程返回之后,在调用accept把监听事件从epoll中取出来之前,别的线程的epoll_wait也在阻塞监听,就相当于这个时候又有epoll_wait调用,基于LT的特点,所以又返回了。
最后的结果就是,LT模式下的EPOLLEXCLUSIVE并不能解决监听套接字的惊群问题
我把上面的
event.events = EPOLLIN|EPOLLEXCLUSIVE;
替换为
event.events = EPOLLIN|EPOLLEXCLUSIVE|EPOLLET;
的话就只是唤起一个进程了。
修改后输出结果如下:
i donot set reuse port
process 7901 return from epoll_wait
process 7901 accept successful!
针对这个标志位的多线程模型代码如下,和多进程模型没有啥区别:
#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<unistd.h>
#include<sys/epoll.h>
#include<netdb.h>
#include<stdlib.h>
#include<fcntl.h>
#include<sys/wait.h>
#include<errno.h>
#include<pthread.h>
#define PROCESS_NUM 10
#define MAXEVENTS 64
//socket创建和绑定
int sock_creat_bind(char * port){
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_port = htons(atoi(port));
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
// 设置reuseport
/* int option=1;
socklen_t optlen = sizeof(option);
setsockopt(sock_fd,SOL_SOCKET,SO_REUSEPORT,(void*)&option,optlen);*/
printf("i donot set reuse port\n");
bind(sock_fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
return sock_fd;
}
//利用fcntl设置文件或者函数调用的状态标志
int make_nonblocking(int fd){
int val = fcntl(fd, F_GETFL);
val |= O_NONBLOCK;
if(fcntl(fd, F_SETFL, val) < 0){
perror("fcntl set");
return -1;
}
return 0;
/*
* int flag = fcntl(fd,F_GETFL,0);
fcntl(fd,F_SETFL,flag|O_NONBLOCK);
* */
}
int sock_fd, epoll_fd;
struct epoll_event event;
struct epoll_event *events;
// thread function
void * thread_func(void *args){
printf("i am the new thread id:%ld\n",pthread_self());
while(1){
int num;
num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
printf("process %d return from epoll_wait\n", getpid());
sleep(2);
for(int i = 0; i < num; ++i){
if((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))){
fprintf(stderr, "epoll error\n");
close(events[i].data.fd);
continue;
}else if(sock_fd == events[i].data.fd){
//收到关于监听套接字的通知,意味着一个或者多个传入连接
struct sockaddr in_addr;
socklen_t in_len = sizeof(in_addr);
if(accept(sock_fd, &in_addr, &in_len) < 0){
printf("process %d accept failed!\n", getpid());
}else{
printf("process %d accept successful!\n", getpid());
}
}
}
}
}
int main(int argc, char *argv[])
{
if(argc < 2){
printf("usage: [port] %s", argv[1]);
exit(1);
}
if((sock_fd = sock_creat_bind(argv[1])) < 0){
perror("socket and bind");
exit(1);
}
if(make_nonblocking(sock_fd) < 0){
perror("make non blocking");
exit(1);
}
if(listen(sock_fd, SOMAXCONN) < 0){
perror("listen");
exit(1);
}
if((epoll_fd = epoll_create(MAXEVENTS))< 0){
perror("epoll_create");
exit(1);
}
event.data.fd = sock_fd;
event.events = EPOLLIN|EPOLLEXCLUSIVE|EPOLLET;
if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd, &event) < 0){
perror("epoll_ctl");
exit(1);
}
/*buffer where events are returned*/
events = (struct epoll_event*)calloc(MAXEVENTS, sizeof(event));
pthread_t threadid;
for(int i = 0; i < PROCESS_NUM; ++i){
pthread_create(&threadid,NULL,thread_func,NULL);
}
while(1);
return 0;
}
更多推荐
所有评论(0)