协程的原理

首先抛出个问题,什么是协程,协程存在的原因是什么?能够解决什么问题?

1. 什么是协程

协程是比线程更小的一种执行单元,你可以认为是轻量级的线程,之所以说轻,其中一方面的原因是协程所持有的栈比线程要小很多,java当中会为每个线程分配1M左右的栈空间,而协程可能只有几十或者几百K,栈主要用来保存函数参数、局部变量和返回地址等信息。

我们知道,而线程的调度是在操作系统中进行的,而协程调度则是在用户空间进行的,是开发人员通过调用系统底层的执行上下文相关api来完成的,有些语言,比如nodejs、go在语言层面支持了协程,而有些语言,比如C,需要使用第三方库才可以拥有协程的能力。

由于线程是操作系统的最小执行单元,因此也可以得出,协程是基于线程实现的,协程的创建、切换、销毁都是在某个线程中来进行的。

使用协程是因为线程的切换成本比较高,而协程在这方面很有优势。

2. 协程存在的原因

情景一

我们知道,当系统或者服务器同时处理多个任务的时候,我们第一时间想到的是采用多线程去解决。我们拿一个游戏服务器去举例子,假如只有10个人同时去注册连接我们的服务器,我们需要采用10个线程去处理任务,由于用户量很少,处理速度很快,游戏也不会卡顿。第二天,我们的用户数增加到了100人,我们需要增加线程到100个,第三天到10000人,线程数增加到10000到,我们仔细想一想,是不是有点不对劲。问题来了,因为每个线程至少会占用4M的内存空间,10000个线程会消耗39G的内存,而服务器的内存配置只有区区8G,这时候你有2种选择,一是选择增加服务器,二是选择提高代码效率。那么是否有方法能够提高效率呢?

我们知道操作系统在线程等待IO的时候,会阻塞当前线程,切换到其它线程,这样在当前线程等待IO的过程中,其它线程可以继续执行。当系统线程较少的时候没有什么问题,但是当线程数量非常多的时候,却产生了问题。一是系统线程会占用非常多的内存空间,二是过多的线程切换会占用大量的系统时间。

在这里插入图片描述
此时,协程的出现正好可以解决这两个问题。协程是运行在线程之上,当一个协程执行完成之后,可以选择主动让出,让另一个协程运行在当前的线程之上。协程并没有增加线程的数量,只是在线程的基础上通过分时复用的方式运行多个协程,而且协程的切换在用户态完成,切换的代价比线程从用户态到内核态的代价小很多
在这里插入图片描述
所以,对上面这个问题,我们只需要启动100个线程,每个线程运行100个协程,减少了线程的开销。

但是,操作系统是不知道协程的存在,只知道线程,协程只有在等待IO过程中才能重复利用线程,线程在等待IO的过程中会陷入阻塞状态。因此在协程调用阻塞IO操作的时候,操作系统会让线程进入阻塞状态,当前的协程和其它绑定在该线程之上的协程都会陷入阻塞而得不到调度,这往往是不能接受的。
因此在协程中不能调用导致线程阻塞的操作。也就是说,协程只有和异步IO结合起来,才能发挥最大的威力。

那么如何处理在协程中调用阻塞IO的操作呢?一般有2种处理方式:

  1. 在调用阻塞IO操作的时候,重新启动一个线程去执行这个操作,等执行完成后,协程再去读取结果。这其实和多线程没有太大区别。
  2. 对系统的IO进行封装,改成异步调用的方式,这需要大量的工作,最好寄希望于编程语言原生支持。

情景二

在我们现在 CS, BS 开发模式下, 服务器的吞吐量是一个很重要的参数。其
实吞吐量是 IO 处理时间加上业务处理。为了简单起见,比如,客户端与服务器
之间是长连接的,客户端定期给服务器发送心跳包数据。客户端发送一次心跳
包到服务器,服务器更新该新客户端状态的。 心跳包发送的过程, 业务处理时
长等于 IO 读取(RECV 系统调用)加上业务处理(更新客户状态)。吞吐量等于
1s 业务处理次数。
在这里插入图片描述
那如何提升 recv 的性能。若只有一个客户端, recv 的性能也没有必要提
升,也不能提升。 若在有百万计的客户端长连接的情况,我们该如何提升。 这个我们使用epoll管理百万计的客户端长连接,代码如下:

while (1) {
    int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);

    for (i = 0;i < nready;i ++) {

        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            int connfd = accept(listenfd, xxx, xxxx);
            
            setnonblock(connfd);

            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = connfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

        } else {
            handle(sockfd);
        }
    }
}

对于响应式服务器,所有的客户端的操作驱动都是来源于这个大循环。来源于
epoll_wait 的反馈结果。
对于服务器处理百万计的 IO。 Handle(sockfd)实现方式有两种。
第一种: handle(sockfd)函数内部对sockfd进行读写动作,如下:

int handle(int sockfd) {

    recv(sockfd, rbuffer, length, 0);
    
    parser_proto(rbuffer, length);

    send(sockfd, sbuffer, length, 0);
    
}

handle 的 io 操作(send,recv) 与 epoll_wait 是在同一个处理流程里面的。
这就是 IO 同步操作。
优点:

  1. sockfd管理方便。
  2. 操作逻辑清晰。

缺点:

  1. 服务器程序依赖epoll_wait的循环响应速度慢
  2. 程序性能差。

第二种: handle(sockfd)函数内部将sockfd的操作,push到线程池中,代码如下:

int thread_cb(int sockfd) {
    // 此函数是在线程池创建的线程中运行。
    // 与handle不在一个线程上下文中运行
    recv(sockfd, rbuffer, length, 0);
    parser_proto(rbuffer, length);
    send(sockfd, sbuffer, length, 0);
}

int handle(int sockfd) {
    //此函数在主线程 main_thread 中运行
    //在此处之前,确保线程池已经启动。
    push_thread(sockfd, thread_cb); //将sockfd放到其他线程中运行。
}

Handle 函数是将 sockfd 处理方式放到另一个已经其他的线程中运行, 如此做法,将 io 操作(recv, send)与 epoll_wait 不在一个处理流程里面, 使得 io操(recv,send)与 epoll_wait 实现解耦。 这就叫做 IO 异步操作。
优点:

  1. 子模块好规划。
  2. 程序性能高。

缺点:
正因为子模块好规划,使得模块之间的 sockfd 的管理异常麻烦。 每一个子线程都需要管理好 sockfd,避免在 IO 操作的时候, sockfd 出现关闭或其他异常。

所以有没有一种方式具有异步的性能,同步的代码逻辑-----协程!

code:
异步与同步区别

3. 协程如何使用

在做网络 IO 编程的时候, 有一个非常理想的情况, 就是每次 accept 返回的时候, 就为新来的客户端分配一个线程, 这样一个客户端对应一个线程。 就不会有多个线程共用一个 sockfd。 每请求每线程的方式, 并且代码逻辑非常易读。 但是创建线程的代价十分巨大。例如:

while (1) {
    int nfds = epoll_wait(epoll_fd, events, curfds, -1);
    if (nfds == -1) {
        perror("epoll_wait");
        break;
    }

    for (i = 0;i < nfds;i ++) {
        int sockfd = listenfd(events[i].data.fd, sockfds);
        if (sockfd) {
            socklen_t len = sizeof(struct sockaddr_in);
            int clientfd = accept(sockfd, (struct sockaddr*)&remote, &len);

            pthread_t thread_id;
            pthread_create(&thread_id, NULL, client_cb, &clientfd);
        }
        else
        {
            ...
        }
}

如果我们有协程,就可以这样实现:
https://github.com/wangbojing/NtyCo/blob/master/nty_server_test.c

while (1) {
    int nfds = epoll_wait(epoll_fd, events, curfds, -1);
    if (nfds == -1) {
        perror("epoll_wait");
        break;
    }

    for (i = 0;i < nfds;i ++) {
        int sockfd = listenfd(events[i].data.fd, sockfds);
        if (sockfd) {
            socklen_t len = sizeof(struct sockaddr_in);
            int clientfd = accept(sockfd, (struct sockaddr*)&remote, &len);
            
            nty_coroutine *read_co;
            nty_coroutine_create(&read_co, server_reader, &clientfd);
        }
        else
        {
            ...
        }
}

4. 协程的异步操作

IO异步操作一般如何实现,在send与recv调用的时候,如何实现:

while (1) {  
    int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);

    for (i = 0;i < nready;i ++) {

        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            int connfd = accept(listenfd, xxx, xxxx);
            
            setnonblock(connfd);

            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = connfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

        } else {
            
            epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
            recv(sockfd, buffer, length, 0);

            //parser_proto(buffer, length);

            send(sockfd, buffer, length, 0);
            epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, NULL);
        }
    }
}

在进行IO操作(recv,send)之前,先执行了 epoll_ctl的del操作,将相应的sockfd从epfd中删除掉,在执行完IO操作(recv,send)再进行epoll_ctl的add的动作。这段代码看起来似乎好像没有什么作用。如果是在多个上下文中,这样的做法就很有意义了。能够保证sockfd只在一个上下文中能够操作IO的。不会出现在多个上下文同时对一个IO进行操作的。协程的IO异步操作正式是采用此模式进行的。

在这里插入图片描述
在协程的上下文IO异步操作(nty_recv,nty_send)函数,步骤如下:

  1. 将sockfd 添加到epoll管理中;
  2. 进行上下文环境切换,由协程上下文yield到调度器的上下文;
  3. 调度器获取下一个协程上下文。Resume新的协程。

在这里插入图片描述

参考:
https://www.jianshu.com/p/8bcec0d2b594
https://www.jianshu.com/p/658cb5360960

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐