浅谈高并发

实现高并发echo服务器笔记
上篇文章讲解了一个简单TCP服务器的搭建,如果不会的同学可以先跳转至上一章。
链接:【学习笔记】linux简单c-s网络模型实现及重点知识总结

简单TCP介绍以及不足

一个TCP服务器运行的步骤如下:

根据已学知识,假设我们现在搭建好了一个简单TCP服务器,如果没有使用多进程/多线程/线程池/IO多路复用技术的话,我们会发现,这台服务器每次只能建立一个连接并只为这一个连接服务。

这怎么想也不对劲吧?比如说两个人同时访问百度,A访问上了,B就要等?

我们要怎样改进才能让我们的服务器同时支持多个连接呢?要解决这个问题,我们现在就要开始考虑服务器的并发性。


使用多进程/多线程

如果我们已经学习完了操作系统,想必一定对多进程/多线程有一定了解。在考虑并发问题时,相信我们脑海里一定可以想到这两个知识。

多进程

多进程比原来多的一步是:在服务端accept后,创建一个进程,并让子进程处理后续操作,父进程继续监听listenfd。

在这里插入图片描述

很简单不是吗?如果学过多进程的读者相信很容易就能写出一个多进程版的TCP服务器。

多线程

可能有的人心里有疑问了,既然我们可以用多进程了,为什么还要用多线程呢?

如果你有这样的疑问,那么请仔细品鉴下面这句话

进程是资源分配的最小单位,线程是CPU调度的最小单位

相信学习过操作系统的同学们应该多多少少也对这句话有点印象,但是纸上得来终觉浅,这句话到底是什么意思呢?下面我们了解一下线程与进程的区别。

进程在创建时,就会创建一份PCB,而PCB包含下面四项东西:

  • 进程描述信息:进程号 、用户组标识、族亲信息
  • 进程控制信息:进程状态、调度信息、计时信息、通信信息
  • 进程资源信息:包括代码段、栈段、全局变量段
  • CPU现场信息:保证进程的正常运行

其中我们发现,为了实现并发功能,创建多进程会额外耗费资源,而这部分资源我们实际并不需要,我们需要的只是发生连接事件的文件描述符用来作IO操作而已。而且切换进程也会带来不必要的上下文切换(进程的上下文切换不仅包含了虚拟内存、栈、全局变量等用户空间的资源,还包括了内核堆栈、寄存器等内核空间的资源)要解决资源额外耗费和上下文切换带来的额外开销的问题,多线程就派上用场了。

那么,相对多进程,在创建多线程时那些资源是独有的,那些资源是共享的呢?

以下是独有的:tid(线程id)、信号屏蔽字、栈段、CUP现场信息、线程状态、errno变量等。

以下是共享的:包括代码段、堆段、全局变量段等的资源信息、文件描述符表、用户id和组id、信号处理函数等

现在,相信你一定能理解进程是资源分配的最小单位,线程是CPU调度的最小单位这句话了吧?

但是使用多线程有个不好的地方就是,在访问临界资源(多个线程访问同一个变量)时会发生异常,究其原因是因为临界区(访问临界资源的代码)并不是原子操作,我们需要对临界区加锁保证线程安全。

至于服务器是使用多进程还是多线程,这个需要具体情况具体分析。那句话怎么说来着?脱离需求谈选择都是耍流氓!(这句话说得真没错,狗头保命)


使用线程池

想象一个场景,我们去饭堂吃饭,现在饭堂里人很多,如果打饭阿姨每来一个人就打一碗饭,那么我们就要眼巴巴的看着阿姨在那里打饭,等了很久才能等到自己。

阿姨心想:我也不想一直打饭啊,我也是人,一直打饭我也累啊。于是过了几天,阿姨想到一个办法:在饭堂还没多少人打饭的时候,阿姨就事先打好了很多份饭,等有人来拿了一碗饭,阿姨就继续打饭把这个拿走的饭的空位补上,那么我们就有可能不必等待阿姨打饭,一来就可以拿到一碗饭了,岂不两全其美?

线程池就是这样的思想:我们预先创建好几个线程,负责对新的连接进行服务。服务完毕后,线程不必销毁,而是继续等待新的连接并为其服务。这样就可以省去线程创建和销毁所花费的时间了。


IO多路复用

前辈们用实践的经验告诉我们,当我们建立多个连接时,绝大部分连接都是闲置连接,只有极少数的连接会发生IO请求。

我们想象下面的场景,当有一个人进了洗手间,洗手间的坑位就会减1,即使这个人不使用坑位,直到坑位被占满了,真正想要蹲坑的人来了,发现坑位已经占满了,而且还没人蹲坑!!!!

这不是很离谱的事情吗,凭什么占着茅坑不拉*啊(doge)。

我们在处理连接的时候也是同理,当有新连接进来时,如果我们用一个线程去服务一个连接,那么我们的效率就会大幅降低,就是因为我们服务的连接出现了“占着茅坑不拉*”的人。

要解决这个问题,我们就要把“一个线程对应一个连接”转变为“一个线程对应一个连接的一次IO”。

下面我们考虑IO多路复用。

select

select的做法是:将连接放到一个集合中,select操作每次传入这个集合,然后遍历这个集合查找所有发生IO的文件描述符,然后将这个集合返回,这样我们就能得到发生IO请求的文件描述符了。

但是select有几个弊端:

  1. 集合从用户态传递给内核态、再从内核态传递给用户态过程发生了两次拷贝
  2. 集合是定长的,可能包含无效文件描述符
  3. 集合有最大容量限制,不能服务更多的连接

poll

poll针对select做出了一下优化:

  1. 集合不再是定长的了,取而代之用链表方式传递
  2. 突破了最大容量限制、但是还是会收到系统文件描述符数量限制

其他缺点完美继承(doge)

epoll

epoll作出的改进

如果说poll的改进有点鸡肋,那么epoll绝对是大刀阔斧的改革了!

epoll的改动如下:

  1. 在epoll_create()阶段会开辟一段空间,并将空间通过mmap映射至共享内存,使得epoll对单个event的操作(添加或删除操作)省掉了一次拷贝的额外开销(注意网上说epoll_wai()在返回events数组是也不会发生拷贝是错的,返回events数组的过程中发生了从内核态到用户态的拷贝,注意区别)
  2. 底层采用红黑树,增删改查操作变为O(lgn),而内部使用回调机制,不需要像 select/poll 那样轮询扫描整个集合,只需要将发生IO请求的event通过回调函数放入返回数组中,返回数组以链表的形式进行维护,使用epoll_wait()后返回该数组,使得epoll不用每次都传入集合,只用通过epoll_ctl()操作对单个event进行增删操作即可,大幅提升效率。

如图所示:

LT和ET模式

默认情况下,epoll使用的是LT(水平触发level-triggered)模式,另外还有一种效率更高的模式是ET(边缘触发edge-triggered)模式。

  • LT模式就是只要有IO请求没完成,就会一直提醒你,直到处理完所有IO请求。

  • ET模式就是只要有一个新的IO请求,他就会提醒一次,至于你处理不处理不关它事了hhh。

举个通俗易懂的例子:

  • LT模式就像快递到了,快递站会发短信通知你,假如你一天没取快递,第二天他就会再再发短信,直到你取了快递为止(老拖延症了hhh表示试过被催了三天收快递)

  • ET模式就像现在的电子邮箱,当有新的邮件收到时,他会立马提醒你,这时他已经提醒了,至于看不看就是你的事了,如果你不调其他设置的话默认新邮件只会提醒一次不会再多了。

为什么ET模式比LT模式效率高?——减少epoll_wait()次数,也就是减少底层调用,从而提高性能。

另外ET模式一般配合非阻塞IO使用,因为你不知道这次的IO是有多少数据.

需要注意的是,使用IO多路复用时,最好搭配非阻塞 I/O 一起使用,因为多路复用 API 返回的事件并不一定可读写的,如果使用阻塞 I/O, 那么在调用 read/write 时则会发生程序阻塞。

Linux man手册关于 select 的内容中有如下说明:

Under Linux, select() may report a socket file descriptor as “ready for reading”, while nevertheless a subsequent read blocks. This could for example happen when data has arrived but upon examination has wrong checksum and is discarded. There may be other circumstances in which a file descriptor is spuriously reported as ready. Thus it may be safer to use O_NONBLOCK on sockets that should not block.

谷歌翻译的结果:

在Linux下,select() 可能会将一个 socket 文件描述符报告为 “准备读取”,而后续的读取块却没有。例如,当数据已经到达,但经检查后发现有错误的校验和而被丢弃时,就会发生这种情况。也有可能在其他情况下,文件描述符被错误地报告为就绪。因此,在不应该阻塞的 socket 上使用 O_NONBLOCK 可能更安全。


实战:实现高并发echo服务器

功能

  1. 服务器端有缓存处理机制:每条数据最多4096Byte
  2. 当你收到某个特殊字符的时候\n,将之前的信息统一处理
  3. 大写变小写,小写变大写, 其他的不做处理,并传回

项目描述

使用了线程池+IO多路复用技术(epoll),实测可以扛住10k+并发请求。

代码

#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<string.h>
#include<fcntl.h>
#include<unistd.h>
#include<sys/stat.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<sys/epoll.h>
#include<arpa/inet.h>
#include<errno.h>
#define myperror(errmsg) {\
    perror(errmsg);\
    exit(1);\
}
#define PORT 2400 //服务端监听端口
#define FD_SIZE 1100 //连接的描述符大小
#define TASK_SIZE 1100 //线程池任务队列大小
pthread_mutex_t handle_mutex[FD_SIZE + 100]; //每个连接创建一把锁,解决临界区资源问题
int listenfd, epollfd;
char bufpool[FD_SIZE + 100][4100]; //缓冲区
int bufidx[FD_SIZE + 100];//缓冲区指针,用来记录缓冲区的大小

void* handler_run(void* arg);

//任务队列
typedef struct Task_Pool
{
    int task[TASK_SIZE];
    int head, tail;
    pthread_mutex_t mutex;
    pthread_cond_t notempty;
}Task_Pool;

Task_Pool taskpool;

//入队操作
void push_task(int fd)
{
    pthread_mutex_lock(&taskpool.mutex);
    if(taskpool.head == (taskpool.tail + 1))
    {
        //printf("任务队列已满!\n");
        return;
    }
    taskpool.task[taskpool.tail] = fd;
    taskpool.tail = taskpool.tail + 1;
    if(taskpool.tail == TASK_SIZE) taskpool.tail = 0;
    pthread_cond_signal(&taskpool.notempty);
    pthread_mutex_unlock(&taskpool.mutex);
}

//出队操作
int pop_task()
{
    pthread_mutex_lock(&taskpool.mutex);
    while(taskpool.head == taskpool.tail)
        pthread_cond_wait(&taskpool.notempty, &taskpool.mutex);
    int retfd = taskpool.task[taskpool.head];
    taskpool.head = taskpool.head + 1;
    if(taskpool.head == TASK_SIZE) taskpool.head = 0;
    pthread_mutex_unlock(&taskpool.mutex);
    return retfd;
}

void socket_bind()
{
    struct sockaddr_in serveraddr;
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if(listenfd < 0) myperror("socket");
    int retstatus;
    serveraddr.sin_port = htons(PORT);
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = INADDR_ANY;
    retstatus = bind(listenfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr));
    if(retstatus < 0) myperror("bind");
    retstatus = listen(listenfd, 1024);
}

//创建线程池
void taskpool_init()
{
    pthread_t thread;
    for(int i = 0; i < 1; i++)
    {
        pthread_create(&thread, NULL, handler_run, NULL);
        pthread_detach(thread);
    }
}

//将fd从事件中删除
void del_event(int fd)
{
    //将对应缓冲区下标标记为可用
    bufidx[fd] = 0;
    if((epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL)) < 0)
        myperror("EPOLL_CTL_DEL");
    close(fd);
}

//将fd加入到事件中
void add_event(int fd, int status)
{
    bufidx[fd] = 0;
    struct epoll_event ev;
    ev.events = status;
    ev.data.fd = fd;
    //设置文件为非阻塞
    int oldoption = fcntl(fd, F_GETFL, 0);
    int newoption = oldoption | O_NONBLOCK;
    fcntl(fd, F_SETFL, newoption);
    if((epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev)) < 0)
        myperror("EPOLL_CTL_ADD");
}

//handler:处理IO请求
void* handler_run(void* arg)
{
    char tempbuf[4100];
    int retlen;
    while(1)
    {
        int cur_fd = pop_task();
        retlen = read(cur_fd, tempbuf, 4100);
        pthread_mutex_lock(handle_mutex + cur_fd);
        //如果-1报错
        if(retlen < 0 && errno != EWOULDBLOCK) perror("read"), del_event(cur_fd);
        //if(retlen < 0) perror("read");
        //如果0关闭连接
        if(retlen == 0) /*printf("del\n"),*/ del_event(cur_fd);
        //write(1, tempbuf, retlen);
        //printf("\n\n\n");
        //处理内容
        //printf("accept content, length = [%d].\n", retlen);
        //获取fd所在buf缓冲区数组下标
        else
        {
            for(int i = 0; i < retlen; i++)
            {
                //实现大小写转换
                if(tempbuf[i] >= 'a' && tempbuf[i] <= 'z')
                    tempbuf[i] -= 32;
                else if(tempbuf[i] >= 'A' && tempbuf[i] <= 'Z')
                    tempbuf[i] += 32;
                bufpool[cur_fd][bufidx[cur_fd]++] = tempbuf[i];
                //收到\n时,将之前的信息统一处理
                if(tempbuf[i] == '\n')
                {
                    write(cur_fd, bufpool[cur_fd], bufidx[cur_fd]);
                    continue;
                }
            }
        }
        //printf("IO finished.\n");
        pthread_mutex_unlock(handle_mutex + cur_fd);
    }
}

//acceptor:监听IO请求
void acceptor_run()
{
    struct epoll_event ev, events[FD_SIZE];
    epollfd = epoll_create(FD_SIZE);
    add_event(listenfd, EPOLLIN | EPOLLET);
    while(1)
    {
        int retfds = epoll_wait(epollfd, events, FD_SIZE, -1);
        if(retfds <= 0) myperror("epoll_wait");
        for(int i = 0; i < retfds; i++)
        {
            int cur_fd = events[i].data.fd;
            //如果是监听fd,加入到epoll
            if(cur_fd == listenfd && (events[i].events & EPOLLIN))
            {
                int acceptfd;
                while((acceptfd = accept(listenfd, NULL, NULL)) > 0)
                {
                    add_event(acceptfd, EPOLLIN | EPOLLET);
                    //printf("new client connected, fd = %d.\n", acceptfd);
                }
                if(acceptfd < 0 && errno != EWOULDBLOCK) myperror("accept");
            }
            //如果是其他fd,push_task,交给handler处理
            else if(events[i].events & EPOLLIN) push_task(cur_fd);
        }
    }
}

//主线程:acceptor,采用epoll机制
int main()
{
    pthread_mutex_init(&taskpool.mutex, NULL);
    pthread_cond_init(&taskpool.notempty, NULL);

    for(int i = 0; i < FD_SIZE + 100; i++)
        pthread_mutex_init(handle_mutex + i, NULL);
    socket_bind();
    taskpool_init();
    //printf("ready.\n");
    acceptor_run();
    pthread_cond_destroy(&taskpool.notempty);
    pthread_mutex_destroy(&taskpool.mutex);
    for(int i = 0; i < FD_SIZE + 100; i++)
        pthread_mutex_destroy(handle_mutex + i);
    return 0;
}


图片来源及参考资料

  1. CSDN:答应我,这次搞懂 I/O 多路复用!
Logo

更多推荐