我在之前的一篇文章中,介绍了我在之前的项目中遇到的端口复用,windows 的udp里端口复用导致了一个bug,具体的链接参考如下:

关于Socket中端口复用_zhc的博客-CSDN博客_socket端口复用

而我现在是想总结一下,linux下的情况,想从头再梳理下,REUSEADDR 和 REUSEPORT的作用。

1.SO_REUSEADDR 是为了解决time_wait问题而产生的

SO_REUSEADDR 要解决的问题是:当服务器端主动断开连接后,该连接会在 TIME_WAIT 阶段停留大约60秒钟;如果服务器在这段时间内重新启动并绑定同一个端口,就会出现这个错误:"Address already in use"

而如果服务器的套接字设置了这个选项,就可以再一次绑定这个 ip和端口。

而这个就衍生了一个问题:如果我想在两个不同网卡(两个不同 ip)的同一个端口上分别绑定 tcp 服务器,也要用到这个选项。这里一定要注意的是,必须是两个不同的真实 ip,如果一个是通配地址

INADDR_ANY(也就是我们常说的0.0.0.0),另外一个是具体地址(例如10.2.6.185),也不行,一样会报这个错误。还有一个要注意的是,报错不是发生在bind的时候,而是发生在监听(listen)的时候。

下面的代码就会报错,具体的代码粘贴如下:

//
// Created by hczhang on 2022/2/15.
//

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<signal.h>
#include<sys/wait.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include <errno.h>

#define BUF_SIZE 30
void read_childproc(int sig);

/*
 * SO_REUSEADDR demo.
 *
 * Creates two TCP sockets, enables SO_REUSEADDR on both, binds the first to
 * the wildcard address and the second to 10.2.6.185 (same port, 12345) and
 * then calls listen() on both.  Every failing step prints the errno text and
 * exits with status 1.  Connections accepted on the first socket are echoed
 * back by a forked child; finished children are reaped by read_childproc.
 */
int main(int argc,char *argv[]){
    int serv_sock,clnt_sock,serv_sock1;
    struct sockaddr_in serv_adr,clnt_adr,serv_adr1;
    pid_t pid;
    struct sigaction act;
    socklen_t adr_sz;
    ssize_t str_len;
    char buf[BUF_SIZE];
    int option=1;
    socklen_t optlen = sizeof(option);

    (void)argc;
    (void)argv;

    /* Install the SIGCHLD handler that reaps finished echo children. */
    act.sa_handler = read_childproc;
    sigemptyset(&act.sa_mask);
    act.sa_flags=0;
    if(sigaction(SIGCHLD,&act,0)==-1){
        printf("sigaction error:%s\n", strerror(errno));
        exit(1);
    }

    serv_sock = socket(AF_INET,SOCK_STREAM,0);
    serv_sock1 = socket(AF_INET,SOCK_STREAM,0);
    if(serv_sock==-1||serv_sock1==-1){
        printf("socket error:%s\n", strerror(errno));
        exit(1);
    }

    printf("server bind start\n");
    if(setsockopt(serv_sock,SOL_SOCKET,SO_REUSEADDR,(void*)&option,optlen)==-1||
       setsockopt(serv_sock1,SOL_SOCKET,SO_REUSEADDR,(void*)&option,optlen)==-1){
        printf("setsockopt error:%s\n", strerror(errno));
        exit(1);
    }

    /* First listener: wildcard address 0.0.0.0. */
    memset(&serv_adr,0,sizeof(serv_adr));
    serv_adr.sin_family=AF_INET;
    serv_adr.sin_addr.s_addr= htonl(INADDR_ANY);
    serv_adr.sin_port= htons(12345);

    /*
     * Second listener: a concrete address.  atoi("10.2.6.185") would stop at
     * the first dot and yield 10, so the dotted quad is parsed with inet_pton.
     */
    memset(&serv_adr1,0,sizeof(serv_adr1));
    serv_adr1.sin_family=AF_INET;
    if(inet_pton(AF_INET,"10.2.6.185",&serv_adr1.sin_addr)!=1){
        printf("invalid address\n");
        exit(1);
    }
    serv_adr1.sin_port= htons(12345);

    if(bind(serv_sock,(struct sockaddr*)&serv_adr,sizeof(serv_adr))==-1){
        printf("1 bind error:%s\n", strerror(errno));
        exit(1);
    }

    if(bind(serv_sock1,(struct sockaddr*)&serv_adr1,sizeof(serv_adr1))==-1){
        printf("2 bind error:%s\n", strerror(errno));
        exit(1);
    }

    if(listen(serv_sock,5)==-1){
        printf("1 listen error:%s\n", strerror(errno));
        exit(1);
    }

    if(listen(serv_sock1,5)==-1){
        printf("2 listen error:%s\n", strerror(errno));
        exit(1);
    }

    while(1){
        adr_sz = sizeof(clnt_adr);
        clnt_sock = accept(serv_sock,(struct sockaddr*)&clnt_adr,&adr_sz);
        if(clnt_sock==-1){
            /* SIGCHLD interrupts accept (sa_flags is 0); that is not an error. */
            if(errno!=EINTR)
                printf("accept error:%s\n",strerror(errno));
            continue;
        }
        printf("new client connected...\n");

        pid = fork();
        if(pid ==-1){
            close(clnt_sock);
            continue;
        }
        if(pid == 0){
            /* Child: echo everything back until EOF or an I/O error. */
            printf("child process enter!!!\n");
            close(serv_sock);   /* the listeners belong to the parent */
            close(serv_sock1);
            /* Parenthesised so str_len holds the byte count, not the comparison. */
            while((str_len = read(clnt_sock,buf,BUF_SIZE))>0){
                ssize_t sent = 0;
                while(sent<str_len){
                    ssize_t n = write(clnt_sock,buf+sent,(size_t)(str_len-sent));
                    if(n<=0)
                        break;
                    sent+=n;
                }
                if(sent<str_len)
                    break;
            }
            close(clnt_sock);
            printf("client disconnected...\n");
            return 0;
        }
        /* Parent: the child owns the connection now. */
        close(clnt_sock);
    }
    close(serv_sock);
    close(serv_sock1);

    return 0;
}
/*
 * SIGCHLD handler: reaps every child that has already exited and writes
 * "removed proc id:<pid>\n" to standard output for each one.
 *
 * Several children can exit while one signal is pending, so waitpid is
 * called in a loop.  Only async-signal-safe calls are used (write instead of
 * printf), and errno is restored so the interrupted code is not disturbed.
 */
void read_childproc(int sig){
    static const char prefix[] = "removed proc id:";
    int saved_errno = errno;
    pid_t pid;
    int status;

    (void)sig;
    while((pid = waitpid(-1,&status,WNOHANG)) > 0){
        char msg[sizeof(prefix) + 24];
        char digits[24];
        size_t len = sizeof(prefix) - 1;
        size_t nd = 0;
        unsigned long v = (unsigned long)pid;

        memcpy(msg, prefix, len);
        /* Produce the decimal digits in reverse, then copy them in order. */
        do{
            digits[nd++] = (char)('0' + (int)(v % 10));
            v /= 10;
        }while(v != 0);
        while(nd > 0)
            msg[len++] = digits[--nd];
        msg[len++] = '\n';
        if(write(STDOUT_FILENO, msg, len) < 0){
            /* Nothing useful can be done about it inside a signal handler. */
        }
    }
    errno = saved_errno;
}

这个程序启动后就会报错,如果把0.0.0.0改成一个真实的不重复的ip就不会报错了。

2.SO_REUSEPORT 是为了解决惊群问题而产生的

刚才说了SO_REUSEADDR的两个ip不能相同,但是SO_REUSEPORT为了解决惊群效应,能让其绑定相同的ip,和端口,具体的测试代码如下:

//
// Created by hczhang on 2022/2/15.
//

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<signal.h>
#include<sys/wait.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include <errno.h>

#define BUF_SIZE 30
void read_childproc(int sig);

/*
 * SO_REUSEPORT demo.
 *
 * Creates two TCP sockets, enables SO_REUSEPORT on both, binds the first to
 * the wildcard address and the second to 10.2.6.185 (same port, 12345) and
 * then calls listen() on both.  Every failing step prints the errno text and
 * exits with status 1.  Connections accepted on the first socket are echoed
 * back by a forked child; finished children are reaped by read_childproc.
 */
int main(int argc,char *argv[]){
    int serv_sock,clnt_sock,serv_sock1;
    struct sockaddr_in serv_adr,clnt_adr,serv_adr1;
    pid_t pid;
    struct sigaction act;
    socklen_t adr_sz;
    ssize_t str_len;
    char buf[BUF_SIZE];
    int option=1;
    socklen_t optlen = sizeof(option);

    (void)argc;
    (void)argv;

    /* Install the SIGCHLD handler that reaps finished echo children. */
    act.sa_handler = read_childproc;
    sigemptyset(&act.sa_mask);
    act.sa_flags=0;
    if(sigaction(SIGCHLD,&act,0)==-1){
        printf("sigaction error:%s\n", strerror(errno));
        exit(1);
    }

    serv_sock = socket(AF_INET,SOCK_STREAM,0);
    serv_sock1 = socket(AF_INET,SOCK_STREAM,0);
    if(serv_sock==-1||serv_sock1==-1){
        printf("socket error:%s\n", strerror(errno));
        exit(1);
    }

    printf("server bind start\n");
    if(setsockopt(serv_sock,SOL_SOCKET,SO_REUSEPORT,(void*)&option,optlen)==-1||
       setsockopt(serv_sock1,SOL_SOCKET,SO_REUSEPORT,(void*)&option,optlen)==-1){
        printf("setsockopt error:%s\n", strerror(errno));
        exit(1);
    }

    /* First listener: wildcard address 0.0.0.0. */
    memset(&serv_adr,0,sizeof(serv_adr));
    serv_adr.sin_family=AF_INET;
    serv_adr.sin_addr.s_addr= htonl(INADDR_ANY);
    serv_adr.sin_port= htons(12345);

    /*
     * Second listener: a concrete address.  atoi("10.2.6.185") would stop at
     * the first dot and yield 10, so the dotted quad is parsed with inet_pton.
     */
    memset(&serv_adr1,0,sizeof(serv_adr1));
    serv_adr1.sin_family=AF_INET;
    if(inet_pton(AF_INET,"10.2.6.185",&serv_adr1.sin_addr)!=1){
        printf("invalid address\n");
        exit(1);
    }
    serv_adr1.sin_port= htons(12345);

    if(bind(serv_sock,(struct sockaddr*)&serv_adr,sizeof(serv_adr))==-1){
        printf("1 bind error:%s\n", strerror(errno));
        exit(1);
    }

    if(bind(serv_sock1,(struct sockaddr*)&serv_adr1,sizeof(serv_adr1))==-1){
        printf("2 bind error:%s\n", strerror(errno));
        exit(1);
    }

    if(listen(serv_sock,5)==-1){
        printf("1 listen error:%s\n", strerror(errno));
        exit(1);
    }

    if(listen(serv_sock1,5)==-1){
        printf("2 listen error:%s\n", strerror(errno));
        exit(1);
    }

    while(1){
        adr_sz = sizeof(clnt_adr);
        clnt_sock = accept(serv_sock,(struct sockaddr*)&clnt_adr,&adr_sz);
        if(clnt_sock==-1){
            /* SIGCHLD interrupts accept (sa_flags is 0); that is not an error. */
            if(errno!=EINTR)
                printf("accept error:%s\n",strerror(errno));
            continue;
        }
        printf("new client connected...\n");

        pid = fork();
        if(pid ==-1){
            close(clnt_sock);
            continue;
        }
        if(pid == 0){
            /* Child: echo everything back until EOF or an I/O error. */
            printf("child process enter!!!\n");
            close(serv_sock);   /* the listeners belong to the parent */
            close(serv_sock1);
            /* Parenthesised so str_len holds the byte count, not the comparison. */
            while((str_len = read(clnt_sock,buf,BUF_SIZE))>0){
                ssize_t sent = 0;
                while(sent<str_len){
                    ssize_t n = write(clnt_sock,buf+sent,(size_t)(str_len-sent));
                    if(n<=0)
                        break;
                    sent+=n;
                }
                if(sent<str_len)
                    break;
            }
            close(clnt_sock);
            printf("client disconnected...\n");
            return 0;
        }
        /* Parent: the child owns the connection now. */
        close(clnt_sock);
    }
    close(serv_sock);
    close(serv_sock1);

    return 0;
}
/*
 * SIGCHLD handler: reaps every child that has already exited and writes
 * "removed proc id:<pid>\n" to standard output for each one.
 *
 * Several children can exit while one signal is pending, so waitpid is
 * called in a loop.  Only async-signal-safe calls are used (write instead of
 * printf), and errno is restored so the interrupted code is not disturbed.
 */
void read_childproc(int sig){
    static const char prefix[] = "removed proc id:";
    int saved_errno = errno;
    pid_t pid;
    int status;

    (void)sig;
    while((pid = waitpid(-1,&status,WNOHANG)) > 0){
        char msg[sizeof(prefix) + 24];
        char digits[24];
        size_t len = sizeof(prefix) - 1;
        size_t nd = 0;
        unsigned long v = (unsigned long)pid;

        memcpy(msg, prefix, len);
        /* Produce the decimal digits in reverse, then copy them in order. */
        do{
            digits[nd++] = (char)('0' + (int)(v % 10));
            v /= 10;
        }while(v != 0);
        while(nd > 0)
            msg[len++] = digits[--nd];
        msg[len++] = '\n';
        if(write(STDOUT_FILENO, msg, len) < 0){
            /* Nothing useful can be done about it inside a signal handler. */
        }
    }
    errno = saved_errno;
}

只需要把之前的 SO_REUSEADDR 换成 SO_REUSEPORT 就可以了,程序启动后不再报错。

而惊群问题的解决方案有两个:

除了加锁的解决方法外,还有其他2个办法:
1. 利用reuseport机制(需要3.9以后版本),但这需要在每个子进程去创建监听端口(而不是继承父进程的),这样就可以保证每个子进程的套接字都是独立的,它们都有自己的accept队列,由内核来做负载均衡;
2. Linux 4.5内核在epoll中新增了EPOLLEXCLUSIVE选项,在多个进程同时监听同一个socket时,只有一个会被唤醒。

具体的原理我暂时不在这里记录,先把两种解决方案的代码实现一下:

对于reuseport机制,具体多进程代码如下所示:

客户端代码统一都是一个(可以用一个网络助手来代替):

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<signal.h>
#include<sys/wait.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include<errno.h>

#define BUF_SIZE 1024

/*
 * Interactive echo client.
 *
 * Connects to port 12345 on the local host, then repeatedly reads a line
 * from stdin, sends it, waits until the same number of bytes has come back
 * and prints them.  "q" or "Q" (or end of input) quits.  The loop also ends
 * when the server closes the connection or an I/O error occurs.
 */
int main(int argc,char *argv[]){
    int sock;
    char message[BUF_SIZE];
    ssize_t str_len,recv_len,recv_cnt;
    struct sockaddr_in serv_addr;

    (void)argc;
    (void)argv;

    sock = socket(AF_INET,SOCK_STREAM,0);
    if(sock ==-1){
        printf("socket error\n");
        exit(1);
    }

    memset(&serv_addr,0,sizeof(serv_addr));
    serv_addr.sin_family=AF_INET;
    /* Target the loopback address explicitly instead of relying on how the
     * platform treats a connect() to 0.0.0.0. */
    serv_addr.sin_addr.s_addr= htonl(INADDR_LOOPBACK);
    serv_addr.sin_port = htons(12345);

    if(connect(sock,(struct sockaddr*)&serv_addr,sizeof(serv_addr))==-1){
        printf("connect error,%d\n",errno);
        close(sock);
        exit(1);
    }
    printf("connected........\n");

    while (1){
        fputs("Input message(Q/q to quit): ",stdout);
        if(fgets(message,BUF_SIZE,stdin)==NULL)
            break;  /* EOF or read error on stdin */
        if(!strcmp(message,"q\n")||!strcmp(message,"Q\n"))
            break;
        str_len = write(sock,message, strlen(message));
        if(str_len==-1){
            printf("write error\n");
            break;
        }
        printf("send data to server %s size is:%d\n",message,(int)str_len);

        /* Wait until as many bytes as were sent have been echoed back. */
        recv_len=0;
        while(recv_len<str_len){
            /* Only the space left in the buffer (minus the '\0') may be filled. */
            recv_cnt = read(sock,&message[recv_len],(size_t)(BUF_SIZE-1-recv_len));
            if(recv_cnt ==-1){
                printf("read error\n");
                break;
            }
            if(recv_cnt==0)
                break;  /* server closed the connection */
            recv_len+=recv_cnt;
        }
        message[recv_len]=0;    /* terminate the received text */
        printf("Message from server:%s",message);
        if(recv_len<str_len)
            break;
    }
    close(sock);
    return 0;
}

对应的服务器代码如下:

#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#define PROCESS_NUM 10
#define MAXEVENTS 64
/*
 * Create an IPv4 TCP socket with SO_REUSEPORT enabled and bind it to
 * INADDR_ANY on the given port.
 *
 * port: decimal port number as text, 0..65535.
 * Returns the bound (not yet listening) descriptor, or -1 with errno set
 * when the port text is invalid or socket/setsockopt/bind fails.
 */
int sock_creat_bind(char *port) {
  char *end = NULL;
  long port_num;
  int saved_errno;

  /* Validate the port before touching any descriptor. */
  if (port == NULL) {
    errno = EINVAL;
    return -1;
  }
  errno = 0;
  port_num = strtol(port, &end, 10);
  if (errno != 0 || end == port || *end != '\0' || port_num < 0 ||
      port_num > 65535) {
    errno = EINVAL;
    return -1;
  }

  int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
  if (sock_fd < 0) {
    return -1;
  }

  struct sockaddr_in serveraddr = {0};
  serveraddr.sin_family = AF_INET;
  serveraddr.sin_port = htons((unsigned short)port_num);
  serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

  /* Let several independent sockets bind the same address and port. */
  int option = 1;
  socklen_t optlen = sizeof(option);
  if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEPORT, (void *)&option, optlen) <
      0) {
    saved_errno = errno;
    close(sock_fd);
    errno = saved_errno;
    return -1;
  }
  printf("i have set reuse port\n");

  if (bind(sock_fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0) {
    /* Keep the bind error visible to the caller's perror(). */
    saved_errno = errno;
    close(sock_fd);
    errno = saved_errno;
    return -1;
  }

  printf("sock_fd is :%d\n", sock_fd);
  return sock_fd;
}
/*
 * Switch a descriptor to non-blocking mode by adding O_NONBLOCK to its
 * file status flags.
 *
 * Returns 0 on success; on failure prints the reason with perror and
 * returns -1.
 */
int make_nonblocking(int fd) {
  int flags = fcntl(fd, F_GETFL);
  if (flags < 0) {
    /* Do not feed the -1 error value back into F_SETFL as a flag set. */
    perror("fcntl get");
    return -1;
  }
  if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
    perror("fcntl set");
    return -1;
  }
  return 0;
}

/*
 * SO_REUSEPORT worker demo.
 *
 * Forks PROCESS_NUM workers.  Each worker creates its own epoll instance and
 * its own non-blocking listening socket on port argv[1] (sock_creat_bind),
 * then loops on epoll_wait and accepts incoming connections, printing which
 * process woke up and whether accept succeeded.  The parent only waits for
 * the workers.
 */
int main(int argc, char *argv[]) {
  if (argc < 2) {
    /* argv[1] is NULL when no port was given, so show the program name. */
    printf("usage: %s [port]\n", argv[0]);
    exit(1);
  }

  int i;
  for (i = 0; i < PROCESS_NUM; ++i) {
    pid_t pid = fork();
    if (pid < 0) {
      perror("fork");
      break;
    }
    if (pid != 0) {
      continue; /* parent: keep spawning workers */
    }

    /* ---- worker process: never leaves this loop body ---- */
    int epoll_fd;
    if ((epoll_fd = epoll_create(MAXEVENTS)) < 0) {
      perror("epoll_create");
      exit(1);
    }

    int sock_fd;
    if ((sock_fd = sock_creat_bind(argv[1])) < 0) {
      perror("socket and bind");
      exit(1);
    }
    if (make_nonblocking(sock_fd) < 0) {
      perror("make non blocking");
      exit(1);
    }
    if (listen(sock_fd, SOMAXCONN) < 0) {
      perror("listen");
      exit(1);
    }

    struct epoll_event event = {0};
    event.data.fd = sock_fd;
    event.events = EPOLLIN;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd, &event) < 0) {
      perror("epoll_ctl");
      exit(1);
    }

    /* Buffer where ready events are returned; private to this worker. */
    struct epoll_event events[MAXEVENTS];
    while (1) {
      int num, j;
      num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
      if (num < 0) {
        if (errno == EINTR) {
          continue;
        }
        perror("epoll_wait");
        exit(1);
      }
      printf("process %d return from epoll_wait\n", (int)getpid());
      sleep(2);
      /* A separate index keeps the outer fork counter untouched. */
      for (j = 0; j < num; ++j) {
        if ((events[j].events & EPOLLERR) || (events[j].events & EPOLLHUP) ||
            (!(events[j].events & EPOLLIN))) {
          fprintf(stderr, "epoll error\n");
          close(events[j].data.fd);
          continue;
        } else if (sock_fd == events[j].data.fd) {
          /* Activity on the listening socket: one or more pending
           * connections. */
          struct sockaddr in_addr;
          socklen_t in_len = sizeof(in_addr);
          int conn_fd = accept(sock_fd, &in_addr, &in_len);
          if (conn_fd < 0) {
            printf("process %d accept failed %s\n", (int)getpid(),
                   strerror(errno));
          } else {
            printf("process %d accept successful!\n", (int)getpid());
            /* The demo only observes wake-ups; close the connection so the
             * descriptor is not leaked. */
            close(conn_fd);
          }
        }
      }
    }
  }

  /* Parent: wait for every worker, not just the first one to exit. */
  while (wait(NULL) > 0 || errno == EINTR) {
  }

  return 0;
}

现在看来,reuseport解决了惊群问题,无论是水平触发还是边沿触发。

对于这个 EPOLLEXCLUSIVE 选项,我进行了测试,代码如下:

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<unistd.h>
#include<sys/epoll.h>
#include<netdb.h>
#include<stdlib.h>
#include<fcntl.h>
#include<sys/wait.h>
#include<errno.h>
#define PROCESS_NUM 10
#define MAXEVENTS 64
/*
 * Create an IPv4 TCP socket and bind it to INADDR_ANY on the given port.
 * SO_REUSEPORT is intentionally NOT set in this variant of the demo.
 *
 * port: decimal port number as text, 0..65535.
 * Returns the bound (not yet listening) descriptor, or -1 with errno set
 * when the port text is invalid or socket/bind fails.
 */
int sock_creat_bind(char * port){
    char *end = NULL;
    long port_num;

    /* Validate the port before touching any descriptor. */
    if(port == NULL){
        errno = EINVAL;
        return -1;
    }
    errno = 0;
    port_num = strtol(port, &end, 10);
    if(errno != 0 || end == port || *end != '\0' || port_num < 0 || port_num > 65535){
        errno = EINVAL;
        return -1;
    }

    int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
    if(sock_fd < 0){
        return -1;
    }

    struct sockaddr_in serveraddr = {0};
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_port = htons((unsigned short)port_num);
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

    printf("i donot set reuse port\n");

    if(bind(sock_fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0){
        /* Keep the bind error visible to the caller's perror(). */
        int saved_errno = errno;
        close(sock_fd);
        errno = saved_errno;
        return -1;
    }

    return sock_fd;
}
/*
 * Switch a descriptor to non-blocking mode by adding O_NONBLOCK to its
 * file status flags.
 *
 * Returns 0 on success; on failure prints the reason with perror and
 * returns -1.
 */
int make_nonblocking(int fd){
    int flags = fcntl(fd, F_GETFL);
    if(flags < 0){
        /* Do not feed the -1 error value back into F_SETFL as a flag set. */
        perror("fcntl get");
        return -1;
    }
    if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0){
        perror("fcntl set");
        return -1;
    }
    return 0;
}

/*
 * EPOLLEXCLUSIVE demo (multi-process).
 *
 * Creates one non-blocking listening socket on port argv[1] and one epoll
 * instance, registers the socket with EPOLLIN|EPOLLEXCLUSIVE, then forks
 * PROCESS_NUM workers that all call epoll_wait on the inherited epoll
 * descriptor and try to accept, printing which process woke up and whether
 * accept succeeded.  The parent waits for the workers and then cleans up.
 */
int main(int argc, char *argv[])
{
    int sock_fd, epoll_fd;
    struct epoll_event event = {0};
    struct epoll_event *events;

    if(argc < 2){
        /* argv[1] is NULL when no port was given, so show the program name. */
        printf("usage: %s [port]\n", argv[0]);
        exit(1);
    }
    if((sock_fd = sock_creat_bind(argv[1])) < 0){
        perror("socket and bind");
        exit(1);
    }
    if(make_nonblocking(sock_fd) < 0){
        perror("make non blocking");
        exit(1);
    }
    if(listen(sock_fd, SOMAXCONN) < 0){
        perror("listen");
        exit(1);
    }
    if((epoll_fd = epoll_create(MAXEVENTS))< 0){
        perror("epoll_create");
        exit(1);
    }
    event.data.fd = sock_fd;
    /* Level-triggered here; if EPOLLET is added, accept must be repeated
     * until it reports EAGAIN. */
    event.events = EPOLLIN|EPOLLEXCLUSIVE;
    if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd, &event) < 0){
        perror("epoll_ctl");
        exit(1);
    }

    /* Buffer where ready events are returned; each forked worker gets its
     * own copy.  The cast keeps the listing valid when built as C++. */
    events = (struct epoll_event*)calloc(MAXEVENTS, sizeof(*events));
    if(events == NULL){
        perror("calloc");
        exit(1);
    }

    int i;
    for(i = 0; i < PROCESS_NUM; ++i){
        pid_t pid = fork();
        if(pid < 0){
            perror("fork");
            break;
        }
        if(pid != 0){
            continue;   /* parent: keep spawning workers */
        }

        /* ---- worker process: never leaves this loop body ---- */
        while(1){
            int num, j;
            num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
            if(num < 0){
                if(errno == EINTR){
                    continue;
                }
                perror("epoll_wait");
                exit(1);
            }
            printf("process %d return from epoll_wait\n", (int)getpid());
            /* A separate index keeps the outer fork counter untouched. */
            for(j = 0; j < num; ++j){
                if((events[j].events & EPOLLERR) || (events[j].events & EPOLLHUP) || (!(events[j].events & EPOLLIN))){
                    fprintf(stderr, "epoll error\n");
                    close(events[j].data.fd);
                    continue;
                }else if(sock_fd == events[j].data.fd){
                    /* Activity on the listening socket: one or more pending
                     * connections. */
                    struct sockaddr in_addr;
                    socklen_t in_len = sizeof(in_addr);
                    int conn_fd = accept(sock_fd, &in_addr, &in_len);
                    if(conn_fd < 0){
                        printf("process %d accept failed!\n", (int)getpid());
                    }else{
                        printf("process %d accept successful!\n", (int)getpid());
                        /* The demo only observes wake-ups; close the
                         * connection so the descriptor is not leaked. */
                        close(conn_fd);
                    }
                }
            }
        }
    }

    /* Parent: wait for every worker, not just the first one to exit. */
    while(wait(NULL) > 0 || errno == EINTR){
    }
    free(events);
    close(epoll_fd);
    close(sock_fd);
    return 0;
}

当收到客户端的连接请求时,如果我们不设置边沿触发(默认是水平触发),则输出结果如下:

hczhang@hczhang-OptiPlex-3050:~/testcommono/socket/tcp_server$ gcc thunder_epoll.cpp -o ghj
hczhang@hczhang-OptiPlex-3050:~/testcommono/socket/tcp_server$ ./ghj 12345
i donot set reuse port
process 16319 return from epoll_wait
process 16318 return from epoll_wait
process 16319 accept successful!
process 16318 accept failed!
 

我们看,并不是所有的进程都被唤醒了,而只是唤醒了两个。这个感觉是水平触发的影响。

具体可以参考下这篇博客的解释:

​​​​​​epoll的陷阱实践 | 演道网

主要的关键的摘录如下:

1.这个标识肯定是有作用的。因为源码是公开的,并且官方文档中也写明了。

epoll: add EPOLLEXCLUSIVE flag · torvalds/linux@df0108c · GitHub

2.导致所有线程都被唤醒的原因可能是因为LT模式。LT模式的特点是,如果epoll中监听的socket有事件,那么不管什么时候调用,epoll_wait都会返回。我们设置了EPOLLEXCLUSIVE标识,可以保证所有的epoll_wait同时调用的时候只有一个返回,但是这个线程返回之后,在调用accept把监听事件从epoll中取出来之前,别的线程的epoll_wait也在阻塞监听,就相当于这个时候又有epoll_wait调用,基于LT的特点,所以又返回了。

最后的结果就是,LT模式下的EPOLLEXCLUSIVE并不能解决监听套接字的惊群问题

我把上面的

event.events = EPOLLIN|EPOLLEXCLUSIVE;

替换为

event.events = EPOLLIN|EPOLLEXCLUSIVE|EPOLLET;

的话就只是唤起一个进程了。

修改后输出结果如下:

i donot set reuse port
process 7901 return from epoll_wait
process 7901 accept successful!

针对这个标志位的多线程模型代码如下,和多进程模型没有啥区别:

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<unistd.h>
#include<sys/epoll.h>
#include<netdb.h>
#include<stdlib.h>
#include<fcntl.h>
#include<sys/wait.h>
#include<errno.h>
#include<pthread.h>
#define PROCESS_NUM 10
#define MAXEVENTS 64
/*
 * Create an IPv4 TCP socket and bind it to INADDR_ANY on the given port.
 * SO_REUSEPORT is intentionally NOT set in this variant of the demo.
 *
 * port: decimal port number as text, 0..65535.
 * Returns the bound (not yet listening) descriptor, or -1 with errno set
 * when the port text is invalid or socket/bind fails.
 */
int sock_creat_bind(char * port){
    char *end = NULL;
    long port_num;

    /* Validate the port before touching any descriptor. */
    if(port == NULL){
        errno = EINVAL;
        return -1;
    }
    errno = 0;
    port_num = strtol(port, &end, 10);
    if(errno != 0 || end == port || *end != '\0' || port_num < 0 || port_num > 65535){
        errno = EINVAL;
        return -1;
    }

    int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
    if(sock_fd < 0){
        return -1;
    }

    struct sockaddr_in serveraddr = {0};
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_port = htons((unsigned short)port_num);
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

    printf("i donot set reuse port\n");

    if(bind(sock_fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0){
        /* Keep the bind error visible to the caller's perror(). */
        int saved_errno = errno;
        close(sock_fd);
        errno = saved_errno;
        return -1;
    }

    return sock_fd;
}
/*
 * Switch a descriptor to non-blocking mode by adding O_NONBLOCK to its
 * file status flags.
 *
 * Returns 0 on success; on failure prints the reason with perror and
 * returns -1.
 */
int make_nonblocking(int fd){
    int flags = fcntl(fd, F_GETFL);
    if(flags < 0){
        /* Do not feed the -1 error value back into F_SETFL as a flag set. */
        perror("fcntl get");
        return -1;
    }
    if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0){
        perror("fcntl set");
        return -1;
    }
    return 0;
}


/* Listening socket (set in main from sock_creat_bind) and epoll instance
 * (set in main from epoll_create); both are read by thread_func. */
int sock_fd, epoll_fd;
/* Registration record that main fills in and passes to epoll_ctl. */
struct epoll_event event;
/* Event buffer of MAXEVENTS entries that main allocates with calloc. */
struct epoll_event *events;
/*
 * Worker thread body: waits on the shared epoll instance (epoll_fd) and
 * accepts connections on the shared listening socket (sock_fd), printing
 * which process woke up and whether accept succeeded.
 *
 * args is unused.  The function loops forever and never returns.
 */
void * thread_func(void *args){
    /* Each thread needs its own result buffer: letting every thread's
     * epoll_wait write into one shared array is a data race. */
    struct epoll_event ready[MAXEVENTS];

    (void)args;
    /* pthread_t is printed through an explicit unsigned long conversion so
     * the argument matches the format. */
    printf("i am the new thread id:%lu\n",(unsigned long)pthread_self());
    while(1){
        int num;
        num = epoll_wait(epoll_fd, ready, MAXEVENTS, -1);
        if(num < 0){
            if(errno == EINTR){
                continue;
            }
            perror("epoll_wait");
            continue;
        }
        printf("process %d return from epoll_wait\n", (int)getpid());
        sleep(2);
        for(int i = 0; i < num; ++i){
            if((ready[i].events & EPOLLERR) || (ready[i].events & EPOLLHUP) || (!(ready[i].events & EPOLLIN))){
                fprintf(stderr, "epoll error\n");
                close(ready[i].data.fd);
                continue;
            }else if(sock_fd == ready[i].data.fd){
                /* Activity on the listening socket: one or more pending
                 * connections.  Keep accepting until the backlog is empty,
                 * as an edge-triggered registration reports it only once. */
                int accepted = 0;
                for(;;){
                    struct sockaddr in_addr;
                    socklen_t in_len = sizeof(in_addr);
                    int conn_fd = accept(sock_fd, &in_addr, &in_len);
                    if(conn_fd < 0){
                        /* EAGAIN after at least one success only means the
                         * backlog is drained; anything else is a failure. */
                        if(accepted == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)){
                            printf("process %d accept failed!\n", (int)getpid());
                        }
                        break;
                    }
                    ++accepted;
                    printf("process %d accept successful!\n", (int)getpid());
                    /* The demo only observes wake-ups; close the connection
                     * so the descriptor is not leaked. */
                    close(conn_fd);
                }
            }
        }
    }
}
/*
 * EPOLLEXCLUSIVE demo (multi-threaded).
 *
 * Creates one non-blocking listening socket on port argv[1] and one epoll
 * instance, registers the socket with EPOLLIN|EPOLLEXCLUSIVE|EPOLLET, then
 * starts PROCESS_NUM threads running thread_func and waits for them.
 */
int main(int argc, char *argv[])
{
    pthread_t threads[PROCESS_NUM];
    int started = 0;

    if(argc < 2){
        /* argv[1] is NULL when no port was given, so show the program name. */
        printf("usage: %s [port]\n", argv[0]);
        exit(1);
    }
    if((sock_fd = sock_creat_bind(argv[1])) < 0){
        perror("socket and bind");
        exit(1);
    }
    if(make_nonblocking(sock_fd) < 0){
        perror("make non blocking");
        exit(1);
    }
    if(listen(sock_fd, SOMAXCONN) < 0){
        perror("listen");
        exit(1);
    }
    if((epoll_fd = epoll_create(MAXEVENTS))< 0){
        perror("epoll_create");
        exit(1);
    }
    event.data.fd = sock_fd;
    event.events = EPOLLIN|EPOLLEXCLUSIVE|EPOLLET;
    if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd, &event) < 0){
        perror("epoll_ctl");
        exit(1);
    }

    /* Buffer where ready events are returned.  The cast keeps the listing
     * valid when built as C++. */
    events = (struct epoll_event*)calloc(MAXEVENTS, sizeof(*events));
    if(events == NULL){
        perror("calloc");
        exit(1);
    }

    for(int i = 0; i < PROCESS_NUM; ++i){
        /* pthread_create returns the error number instead of setting errno. */
        int rc = pthread_create(&threads[started],NULL,thread_func,NULL);
        if(rc != 0){
            errno = rc;
            perror("pthread_create");
            continue;
        }
        ++started;
    }

    /* Block in pthread_join instead of spinning in an empty loop, which
     * would keep one CPU core busy for nothing. */
    for(int i = 0; i < started; ++i){
        pthread_join(threads[i], NULL);
    }

    free(events);
    close(epoll_fd);
    close(sock_fd);
    return started > 0 ? 0 : 1;
}


 

Logo

更多推荐