网络库其实现在有比较多,像今天博客讲述的陈硕的就是基于cpp写的,只支持在Linux 上运行的。

TODO

用户的注册的callback是否可以使用到muduo库里面的线程池。当业务比较大的时候。

前言


陈硕大神的开源网络库
以往的一篇博客,若学过会更好理解

什么时候用muduo库呢?
muduo库不支持跨平台使用,如果有这方面的需求,可以使用libevent,但是libevent的模型没有muduo库那么强,one loop per thread 的思想实际上更好的用到了多核的性能,采用的一个IO线程进行监听listen fd,其他线程进行处理accept fd 上来的事件。

谈到了muduo库,总会有人问上一嘴libevent。


简单介绍libevent
可以使用libevent,libevent 实际上就是通过条件编译让不同的平台调用不同的接口进行适配。纯C编写的网络库,实际上他是通过填充接口体eventop 来支持各种多路复用接口的,这些方法都是在初始化的时候使用一些static 方法提前注册的。(C语言中非常常用的方式,我这段期间经常能见到类似的操作。)
在这里插入图片描述


阻塞和非阻塞的简单理解:
recv 和 send 的接口都是默认是阻塞的,并且在读取条件或发送条件不满则的时候会阻塞线程。
recv 返回值:
在这里插入图片描述

-1 : 表示连接上有错误,但errno == EAGAIN 本次读取完毕
0 : 表示对端断开连接,读到\0
> 0 表示读取的字节数。
所谓的epoll相关的接口都是在处理等待数据准备的这一阶段。

muduo库依赖了linux的epoll和pthread以及boost库,所以只能用在linux下面。并且muduo库是LT模式的,但是每一个文件描述符都是非阻塞的,经过相关的验证,证实了实际上LT模式下的muduo库的效率其实也是非常高的。


前置知识

muduo库将网络和业务逻辑进行了解耦,使用方只需要在回调函数里面编写业务逻辑即可。
setConnectionCallback,链接的创建回调,用户的连接创建和断开。

setMessageCallback 消息的读写事件回调,专门处理用户的读写事件。

setThreadNum,其中一个线程负责新用户的连接事件,mainThread 负责监听是否有事件,工作线程负责处理每一个事件的读写事件。
muduo 提供对外的接口,比如说连接到来之后可以调用你提前的回调函数,消息读写后盗用对应的回调函数。 一般来说注册了上述的回调就够用了。

下图为一个实例,一个Echo Server

#include "examples/simple/echo/echo.h"
 
#include "muduo/base/Logging.h"
 
using std::placeholders::_1;
using std::placeholders::_2;
using std::placeholders::_3;
 
// using namespace muduo;
// using namespace muduo::net;
 
EchoServer::EchoServer(muduo::net::EventLoop* loop,
                       const muduo::net::InetAddress& listenAddr)
  : server_(loop, listenAddr, "EchoServer")
{
  server_.setConnectionCallback(
      std::bind(&EchoServer::onConnection, this, _1));
  server_.setMessageCallback(
      std::bind(&EchoServer::onMessage, this, _1, _2, _3));
}
 
void EchoServer::start()
{
  server_.start();
}
 
void EchoServer::onConnection(const muduo::net::TcpConnectionPtr& conn)
{
  LOG_INFO << "EchoServer - " << conn->peerAddress().toIpPort() << " -> "
           << conn->localAddress().toIpPort() << " is "
           << (conn->connected() ? "UP" : "DOWN");
}
 
void EchoServer::onMessage(const muduo::net::TcpConnectionPtr& conn,
                           muduo::net::Buffer* buf,
                           muduo::Timestamp time)
{
  muduo::string msg(buf->retrieveAllAsString());
  LOG_INFO << conn->name() << " echo " << msg.size() << " bytes, "
           << "data received at " << time.toString();
  conn->send(msg);
}

muduo库的模型如下,由下图的mainReactor 进行监听Acceptor ,然后将监听上来的文件描述符打包成TcpConnection,由负载均衡算法挂到不同的subReactor上面。

Acceptor 的用法:TcpServer 当中封装了一个Acceptor,注册该类的ReadCallback即可,专门监听listen fd 上是否有新的连接。

在这里插入图片描述


下述的同步和异步都是操作系统的概念,是IO相关的,不是并发那套。

recv 数据准备好就是TCP的缓冲区上有数据了。

而TCP是全双工的,意味着他有发送缓冲区和接收缓冲区,数据结构上来看就是他有两端buffer数据。

对于同步IO的简单理解
int size = recv(sockfd,buf ,1024,0); buf是我们用户层的buffer,不是TCP里面的,实际上就是从TCP的缓冲区搬到应用层; 这个过程其实就是从内核缓冲区到应用层缓冲区的过程。 调用recv就是我们自己搬回来的数据返回; 即IO同步。

在这里插入图片描述

IO 异步


aio_read ,传入sockfd 和 buf 表示关心的文件描述符和对应的缓冲区,然后操作系统拷贝完数据后发送sigio信号。

相当于给内核传送下面的是数据

Node.js 就是基于异步非阻塞模式下的高性能服务器,使用js进行编写的。

唤醒subLoop的原因是因为subLoop有可能epoll_wait ,此时要设置epoll_ctl 添加节点需要唤醒是吗?
那肯定的,需要设置节点的状态,那么肯定是不在epoll_wait 这个状态的了。

同步还是异步举例


A 等待 B做完事情,得到返回值,继续处理。 --同步

A 操作告诉B操作他感兴趣的事件和通知方式,A操作继续执行自己的业务逻辑;当B监听到响应事件发生过后,B会通知A,A开始相应的数据处理逻辑。 --异步

且epoll + fork 不一定就比 epoll + pthread 的效率差,如代表nginx的效率实际上也是非常高。
nginx 可能比 muduo库效率更高。 还是跟运用场景有关。

muduo为什么用LT模式


  • LT 能够兼容很多主机;
  • 低延迟处理;
  • 并且由于ET模式需要一次性将数据读完才返回,导致进入epoll_wait 的时间延迟了,从用户看来,就是当有一条连接进来的时候,延迟变高了。
  • 平均的分配每一个socket上的读取时间,相当于雨露均沾,这并不是坏处,相当于所有的连接都能并发了。

其实muduo库是一种服务器集群的一种现象。

vscode 的一些编译方法

在vscode 的任意一个cpp文件按F1,选择 c/c++ edit 就可以调出 c_cpp_properties的这个文件
在这里插入图片描述
在这里插入图片描述

底层数据结构分析


noncopyable

  • 大部分的类都会继承noncopyable,noncopyable就是将构造和赋值重载删除,那么子类若要进行拷贝构造和赋值都要调用父类的拷贝构造或赋值,那么就会失败。
  • 而构造函数放在protected能够保证只有子类能访问,即子类类型定义对象是没问题的。
// 编译器级别,保证头文件不会被重复包含
#pragma once

/**
 * noncopytable 被继承以后,派生类对象可以正常的构造和析构,但是派生类对象
 * 无法进行拷贝构造和赋值操作。
*/
class noncopyable
{
    public:
    // 拷贝构造和赋值重载是删除
    noncopyable(const noncopyable&) = delete;
    noncopyable& operator=(const noncopyable&) = delete;
    protected:
    // 构造和析构是默认
    noncopyable() = default;
    ~noncopyable() = default;
};

muduo库的日志库做的非常好,甚至提供了专门的动态库供外部连接,即使你不用muduo库的核心内容,挑这个日志库使用也是可以的!!

Logger


日志级别:

  • INFO
  • ERROR
  • FATAL
  • DEBUG

上述DEBUG默认是关闭的,因为DEBUG会打印太多信息,拖慢主机的执行进度。

// 定义日志的级别 INFO ERROR FATAL DEBUG
enum LogLevel
{
    INFO, // 普通信息
    ERROR, // 错误信息
    FATAL, // Core 信息
    DEBUG, //调试信息
};

日志类的使用:

  • 提供setLogLevel设置日志级别
  • log 结合已经设置的日志级别进行输出

日志类是一个单例类,instance()方法就是获取日志的唯一实例对象。

// 输出一个日志类,单例类
class Logger:noncopyable
{
public:
    static Logger& instance();// 获取日志唯一的实例对象
    // 设置日志级别
    void setLogLevel(int level);
    // 写日志
    void log(std::string msg);
private:
    int logLevel_; // 前_ 系统级别变量可能产生冲突!,所以用后_;
    Logger(){}
};

单例类实现:
instance 是非常简单,c++11 的特性能够让他直接成为实例对象。

// 获取日志唯一的实例对象
Logger& Logger::instance()
{
    static Logger logger;
    return logger;
}

设置日志级别的实现

// 设置日志级别
void Logger::setLogLevel(int level)
{
    logLevel_ = level;
}

写日志,设置打印日志的格式如注释,通过方法类Timestamp的now方法打印当前的时间戳转化为的当前时间。下面有解释Timestamp的now方法。

// 写日志  [级别信息] time : msg
void Logger::log(std::string msg)
{
    switch (logLevel_)
    {
    case INFO:
    std::cout << "[INFO]";
    break;
    case ERROR:
    std::cout << "[ERROR]";
    break;
    case FATAL:
    std::cout << "[FATAL]";
    break;
    case DEBUG:
    std::cout << "[DEBUG]";
    break;

    default:
        break;
    }
    // 打印时间,msg
    // now获取时间戳,toString 将时间戳转化为一个buf进行返回。
    std::cout << Timestamp::now().toString() << " : " << msg << std::endl;
}

最终打印出来的日志格式:
在这里插入图片描述

设置宏来方便打印日志


其实这个做法在一些开源库以及纯C的项目是非常见,它可以绑定一些方法来节省调用方传递太多的参数。
为了用户使用的更加方便,通常我们可以通过设置宏来进行解决:
为了防止宏替换出现的一些问题,所以一般我们可以通过do … while 方法来进行解决的。

  1. 获取单例对象
  2. 设置log级别
  3. 给一段buffer长度
  4. 调用snprintf函数 C 库函数 int snprintf(char *str, size_t size, const char *format, ...) 设将可变参数(...)按照 format 格式化成字符串,并将字符串复制到 str 中,size 为要写入的字符的最大数目,超过 size会被截断。 也就是说这个函数是安全的。

通常用\进行换行,而且里面不要嵌入注释。

#define LOG_INFO(logmsgFormat,...) \
    do  \
    {\
        Logger &logger = Logger::instance(); \
        logger.setLogLevel(INFO);\
        char buff[1024] = {0};\
        snprintf(buff, 1024, logmsgFormat, ##__VA_ARGS__); \
    }while(0)

LOG_DUBUG 的打印信息通常太多,所以我们通常可以用宏来控制。

#ifdef MUDEBUG

#else

#endif

时间类,只实现了获取当前时间戳,和通过时间戳转化成 2022/11/12 10:42:12 这种格式时间的接口。

Timestamp 时间类


explicit 的作用:

explicit 是防止隐式类型转化,避免赋值的是时候进行隐式类型转化。

// 时间戳,用来打印日志的时候有用
class Timestamp
{
    public:
    Timestamp();
    // 避免隐式类型转化
    explicit Timestamp(int64_t microSecondsSinceEpoch);
    static Timestamp now(); // 获取当前的时间
    std::string toString() const; // 将当前时间转化为 2022/11/8 这种形式的时间
    private:
    int64_t microSecondsSinceEpoch_;// 长整型变量表示时间
};

now 函数:
获取当前的时间,返回Timestamp对象
time 函数 在time.h里面,作用就是返回当前时间的时间戳,然后我们用int64_t 构造一个对象返回。
time_t 就是 #define __SLONGWORD_TYPE long int

 // 获取当前的时间
Timestamp Timestamp::now()
{
    // 通过调用time获取当前的时间戳
    return Timestamp(time(NULL));
}

toString 函数
实现:就是将时间戳转化成当前时间格式的方法
localtime 函数 struct tm *localtime(const time_t *timer) 使用 timer 的值来填充 tm 结构。timer 的值被分解为 tm 结构,并用本地时区表示。

std::string Timestamp::toString() const
{
    char buf[128] = {0};
    // 之前写的http项目没有加时间
    tm* tm_time = localtime(&microSecondsSinceEpoch_);
    // 年/月/日 时:分:秒
    snprintf(buf,128,"%4d/%02d/%02d %02d:%02d:%02d",
    tm_time->tm_year + 1900,
    tm_time->tm_mon + 1,
    tm_time->tm_mday,
    tm_time->tm_hour,
    tm_time->tm_min,
    tm_time->tm_sec);
    return buf;
}

tm结构体的字段

年是需要加1900 才能使用,月份需要+1;

struct tm *localtime(const time_t *timer);
    
struct tm
{
  int tm_sec;			/* Seconds.	[0-60] (1 leap second) */
  int tm_min;			/* Minutes.	[0-59] */
  int tm_hour;			/* Hours.	[0-23] */
  int tm_mday;			/* Day.		[1-31] */
  int tm_mon;			/* Month.	[0-11] */
  int tm_year;			/* Year	- 1900.  */
  int tm_wday;			/* Day of week.	[0-6] */
  int tm_yday;			/* Days in year.[0-365]	*/
  int tm_isdst;			/* DST.		[-1/0/1]*/
}

InetAddress


InetAddress 是对sockaddr_in结构体的一层封装

explicit 默认初始化值定义的时候都不能够出现。当声明和定义

主要功能:
设置端口号和ip到sockaddr_in 里面

InetAddress::InetAddress(uint16_t port, std::string ip)
{
    bzero(&addr_, sizeof addr_);
    addr_.sin_family = AF_INET;
    addr_.sin_port = htons(port);
    addr_.sin_addr.s_addr = inet_addr(ip.c_str()); // 转化成网络字节序,
}

sprintf 表示不固定长度。

下面接口的主要功能:

  • 获取 sockaddr_in 里面的获取端口号和ip,主要就是网络转主机序列。
std::string InetAddress::toIp() const
{
    // addr_ 里面读ip地址,但是已经是网络字节序了!
    char buf[64] = {0};
    // 因为sin_addr 已经是网络字节序了,需要通过ntop变成主机序列。
    ::inet_ntop(AF_INET, &addr_.sin_addr, buf,sizeof(buf));
    return buf;
}
std::string InetAddress::toIpPort() const
{
    // ip:port
    char buf[64] = {0};
    ::inet_ntop(AF_INET, &addr_.sin_addr, buf,sizeof(buf));
    size_t end = strlen(buf);
    uint16_t port = ntohs(addr_.sin_port);
    sprintf(buf + end, ":%u", port); // %u 就是无符号的格式

    return buf;
}
uint16_t InetAddress::toPort() const
{
    return ntohs(addr_.sin_port);
}

TcpServer 直接相关的类实际上包含 EventLoop, InetAddress,我们已经解决了InetAddress, 我们现在只有先解决EventLoop

TcpServer 是对外的服务器变成所使用的类 当中实际上只有Channel 和 Poller 这两个是外部的类,所以我们先挑其中一个解决。 而 EventLoop 的主要成员就是 Channel 和 Poller

Channel 封装了一个fd,以及需要关心的事件events_,以及已经就绪的事件revents_,以及事件回调的函数方法。

Channel


啥时候不需要只需要前置声明?
Channel 中为何 只放 class EventLoop 的前置声明呢?
因为头文件实际上也可以按需给,如果我们只给出类型的声明,就可以只进行前置声明,后续暴露的.h文件就可以尽量给少一点。

class EventLoop 和 class Timestamp 就是前置声明。

下面就是定义两个函数类型。 相当于 void(*EventCallBack)() 这种。

using EventCallBack = std::function<void()>;
using ReadEventCallBack = std::function<void(Timestamp)>;

成员遍历一览:
revents_ 是EventLoop的poller模块调用epoll_wait的时候返回的具体发生的事件。这个时候EventLoop 会调用set_revents 设置回Channel当中,往后Channel有handlerEvent方法调用就能使用revents进行操作。

下面的回调是由我们用户决定的,也就是OnMessage,OnConnection最终就是在 ,等等方法。

ReadEventCallBack readCallback_;// void()
EventCallBack writeCallback_;   // void(Timestamp);
EventCallBack closeCallback_;
EventCallBack errorCallback_;

注释的比较详细

static const int kNoneEvent;  // 对任何事件都不感兴趣
static const int kReadEvent;  // 对读事件
static const int kWriteEvent; // 对写事件

EventLoop *loop_; // Reactor
const int fd_;    // fd就是关心的文件描述符
int events_;      // 注册fd 感兴趣的事件
int revents_;     // 表示Channel需要处理的事件
int index_;       // 这个是表达当前Channel的状态,标记处于上述三种事件状态

std::weak_ptr<void> tie_; // 当执行回调函数的时候用来判断是否有被remove
bool tied_; 				// 搭配tie_ 一起进行判断,为true才进行tie_的强类型转化

// Channel 通道能够获知fd最终发生的具体事件revents,所以它负责调用具体事件的回调操作
// 每一个Channel都代表一个事件节点,这里的定义回调的类型
ReadEventCallBack readCallback_;// void()
EventCallBack writeCallback_;   // void(Timestamp);
EventCallBack closeCallback_;
EventCallBack errorCallback_;

构造函数
传参loop,表示一个Channel一定会绑定一个他所属的EventLoop。因为Channel需要最终在Poller上进行epoll_wait,但是他们之间不直接相关,而是通过EventLoop,这里绑定EventLoop后可以通过EventLoop找到对应的Poller。

Channel::Channel(EventLoop *loop, int fd)
    : loop_(loop), fd_(fd), events_(0), revents_(0), index_(-1), tied_(-1){}

handlerEvent 是处理事件
下面用到了 Timestamp 说明需要知道Timestamp的大小。所以最好给出头文件。

void handlerEvent(Timestamp receiveTime);

表明当前fd的状态的相关函数。

// 返回fd当前的事件状态
bool isNoneEvent() const {return events_ == kNoneEvent;}
bool isWriting() const {return events_ & kWriteEvent;}
bool isReading() const {return events_ & kReadEvent;}

shared_from_this 的作用:

下述是一个错误示范,当我们需要一个指针来构造一个智能指针,用下面的做法一定会错误。

原因是this是裸指针,我们这么操作和用同一个裸指针给两个智能指针赋值是一个意思

#include<iostream>
using namespace std;
#include<memory>
class A
{
public:
	A(int y = 0) :x(y) { }
	A* getthis()
	{
		return this;
	}
	int x;
};


int main(void)
{
	shared_ptr<A>sp1(new A());
	shared_ptr<A>sp2(sp1->getthis());
	cout << sp1.use_count() << endl;
	cout << sp2.use_count() << endl;
}

shared_from_this() 的作用

shared_from_this的返回值是一个智能指针,他就是把返回的指针转化成shared_ptr 来使用,这个时候引用计数实际上进行了++,所以use_count 为2,程序不会出错。

class A :public enable_shared_from_this<A>
{
public:
	A(int y = 0) :x(y) { }
	shared_ptr<A> getthis()
	{
		return shared_from_this();
	}
	int x;
};


int main(void)
{
	shared_ptr<A>sp1(new A());
	shared_ptr<A>sp2(sp1->getthis());
	cout << sp1.use_count() << endl;
	cout << sp2.use_count() << endl;
}

设置回调的方法也很简单:
他们的实现也比较简单,直接move一下给自己用就行了。

 // 设置回调函数对象
void setReadCallback(ReadEventCallBack cb) { readCallback_ = std::move(cb); }
void setWriteCallback(EventCallBack cb) { writeCallback_ = std::move(cb); }
void setCloseCallback(EventCallBack cb){closeCallback_ = std::move(cb);}
void setErrorCallback(EventCallBack cb){errorCallback_ = std::move(cb);}

tie 函数:
用弱智能指针判断是否channel被remove掉。防止调用HandlerEvent的时候Channel已经释放了,这个tie函数会在TcpConnection建立成功的回调函数里面被调用。

// 防止当channel 被手动remove掉,channel还在执行回调操作
void tie(const std::shared_ptr<void> &);

为什么要提供set_revents 函数?

因为Channel本身只存储了文件描述符,以及就绪后对应事件的处理方法,所以需要有一个函数,让Poller模块有方法能够告知Channel需要对哪些事件就绪进行通知。

int events() const { return events_; }
// 为什么设置这里的revent? 而不是通过epoll进行监听然后设置呢?
int set_revents(int revt) { revents_ = revt; return revents_;}
//bool isNoneEvent() const { return events_ == kNoneEvent; }
 // 返回fd当前的事件状态
bool isNoneEvent() const {return events_ == kNoneEvent;}

倘若有读写事件需要关心,那么Channel肯定得知道。那么知道之后,我们肯定需要让Poller模块去帮我们关心,所以下面的update方法,实际上就是让我们的EventLoop模块去关心这个文件描述符

实际上由于Poller和Channel都有一个EventLoop* ,而EventLoop有 std::unique_ptr<Poller> poller_;字段可以找到Poller结构体,Poller结构体就可以进行更新了。

void enableWriting()
{
    events_ |= kWriteEvent;
    update(); // 调用epoll_ctl 让红黑树关心这个事件
}
// one loop pre thread
EventLoop* ownerLoop() {return loop_;}

就是update调用EvnetLoop的方法。

void Channel::update()
{
    // 通过Channel 找到 EventLoop。找到Poller 调用对应的事件
    // add code...
    loop_->updateChannel(this);
}

定义成员函数:

tie实际上就是设置一个观察者,让tie_这个弱智能指针去观察对象。

// 观察者,观察一个强智能指针
void Channel::tie(const std::shared_ptr<void>& obj)
{
    tie_ = obj;
    tied_ = true;
}

当改变channel 所表示fd的events事件后,update负责在poller里面更改fd响应的事件。即调用epoll_ctl函数。

EventLoop =》 ChannelList Poller

update就是上面所说的,就是简单的调用updateChannel 方法而已。

void Channel::update()
{
    // 通过Channel 找到 EventLoop。找到Poller 调用对应的事件
    // add code...
    loop_->updateChannel(this);
}
// 在channel所属的EventLoop,把当前的channel删除掉
void Channel::remove()
{
    // add code
    loop_->removeChannel(this);
}

loop_最终会调用到EpollPoller中的removeChannel

void EPollPoller::removeChannel(Channel *channel)
{
    int fd = channel->fd();
    channels_.erase(fd); // 删除map中映射

    LOG_INFO("func=%s => fd=%d  \n", __FUNCTION__, channel->fd());

    int index = channel->index();
    if (index == kAdded) // 如果还是在状态2,就删掉
    {
        update(EPOLL_CTL_DEL, channel);
    }
    channel->set_index(kNew); // 设置状态为可添加
}

传入的shared_ptr 究竟是个啥,要怎么理解?
tied_ 是一个bool类型。
理解channel最重要的两个函数,handlerEvent 最重要的就是判断是否对象还存在,是否能够继续调用;
那么tied_ 和 tie() 一个成员变量,一个成员函数什么时候被调用呢?
tied_ 是在 tie 方法设置之后 才是true,tie方法又是上层TcpConnection调用连接建立成功的方法connectEstablished来进行建立成功的。说明当连接建立好的时候,这个时候如果TcpConnection还存在,表明连接状态是良好的,此时我们调用EPollPoller中handlerEvent的时候能保证执行正确。
如果TcpConnection因为某种缘故断开了,这个时候处理这个handlerEvent事件已经没有意义,因为也发送不回去了,此时上层的shared_ptr已经跟着TcpConnection一起消失,大佬的编码是就不处理了。
这里大佬的编码中采用不处理的方式。因为Channel已经跟着TcpConnection一同释放了。所以这里没有else分支。
handlerEventWithGuard 则是对回调对应的具体的事件。

// fd得到poller通知以后处理事件的。
// 成员变量tied_ 主要就是判断Channel是否又被tied_ 过,以及是否正在被tied_
void Channel::handlerEvent(Timestamp receiveTime)
{
    std::shared_ptr<void> guard;
    if(tied_)
    {
        guard = tie_.lock();// 将弱指针提升,返回值不为NULL表示成功
        if(guard)
        {
            handlerEventWithGuard(receiveTime);
        }// 无需else分支,因为Channel已经和TcpConnection一起走了。
    }
    else
    {
        handlerEventWithGuard(receiveTime);
    }
}

// 根据poller通知的channel发生的具体事件,由channel执行对应的回调方法。
void Channel::handlerEventWithGuard(Timestamp receiveTime)
{
    LOG_INFO("channel handlerEvent revents:%d\n",revents_);
    if((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN))
    {
        if(closeCallback_)
        {
            closeCallback_();
        }
    }

    if(revents_ & EPOLLERR)
    {
        if(errorCallback_)
        {
            errorCallback_();
        }
    }

    if(revents_& (EPOLLIN | EPOLLPRI))
    {
        //可读事件
        if(readCallback_)
        {
            readCallback_(receiveTime);
        }
    }
    if(revents_ & EPOLLOUT)
    {
        if(writeCallback_)
        {
            writeCallback_();
        }
    }
}

Poller.h


有两个成员变量

using ChannelList = std::vector<Channel*>; 

这里的key就是sockfd,这里的value 就是 key 对应的Channel的参数类型。

using ChannelMap = std::unordered_map<int,Channel*>;

Poller是一个基类, 他的poll这个接口是为了让子类可以用epoll或者是poll进行轮询。相当于让派生类来实现多态;但我们只实现其中的EpollPoller

保留统一的接口进行IO复用

virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;

updateChannel 传参的地方this

/// Changes the interested I/O events.
/// Must be called in the loop thread.
virtual void updateChannel(Channel* channel) = 0;

Poller中的removeChannel 就是往红黑树做操作,removeChannel 也是接受一个channel对象指针,通过这个对象拿到对应信息,对我们的红黑树做操作。
那么Channel中的removeChannel函数又会做什么呢?
就是调用Poller的removeChannel,仅仅如此~

那么哪里会调用removeChannel呢?
Acceptor的析构函数,会把监听套接字从主Loop删除。

/// Remove the channel, when it destructs.
/// Must be called in the loop thread.
virtual void removeChannel(Channel* channel) = 0;

hasChannel 判断参数channel 是否在当前Poller 当中。
在Channel的析构函数中使用,但仅仅是进行断言。

bool hasChannel(Channel* channel) const;

newDefaultPoller为什么不放在Poller.cc里面,实际上放进去并没有错,但是由于他要返回的对象指针会包含两个对象,EpollPoller.h PollPoler.h 两个,所以说他会新开一个文件.h来使用

获取到一个事件循环的poller; 实际上这个就是单例模式的getInstance,返回的是一个基类的Poller对象指针。

static Poller* newDefaultPoller(EventLoop* loop);// 注意这里的操作,在DefaultPoller.cc实现

DefaultPoller.h文件中 基类不要引用派生类 放在这里相当于放到一个公共的头文件里面。

#include"Poller.h"
#include"EPollPoller.h"
#include<stdlib.h>

Poller* Poller::newDefaultPoller(EventLoop* loop)
{
    if(::getenv("MUDUO_USE_POLL"))
    {
        return nullptr;// 生成poll的实例
    }
    else
    {
        // 因为设计到具体要new 这个对象了,所以说要弄一个DefaultPoller.h
        return new EPollPoller(loop);// 生成epoll的实例
    }
}

EpollPoller (调用epoll的模块)


继承的时候需要知道这个EpollPoller 继承自Poller,所以需要知道Polelr的大小。 所以需要引头文件

class EPollPoller : public Poller

override放在子类,表明希望父类是一个虚函数,如果不是,那么会生成编译错误。

override的作用,有时候眼瞎,看不清函数,子类可能没有重写父类的接口,而是自己定义了一个函数,害怕这种情况的发生。

epoll_event 是epoll_wait 定义的一次性上层能拿到的就绪队列长度。

using EventList = std::vector<struct epoll_event>;
int epollfd_; // epoll_create 的返回值
EventList events_; //  就是上层的一次性能拿上来的最大的这个底层就绪的数组。

poll实际上就是一个 epoll 还是 poll的一个分歧点

Timestamp poll(int timeoutms,ChannelList* activeChannels) override;

基类定义的类型,子类也定义,说明using 定义的类型不能继承下来。

using EventList = std::vector<struct epoll_event>;
static const int kInitEventListSize = 16; // std::vector<struct epoll_event> 的初始值
// 更新channel通道,这里实际上就是对channel里面的事件events给epoll进行设置。
void update(int operation,Channel* channel);

EpollPoller中的poll方法,EventLoop 里面有activeChannels_实际上也是活跃的数量,然后EventLoop 在loop中处理对应的活跃的Channel,即进行回调。

// 填写活跃的链接,交付给EpollEvent,然后EventLoop 再把这个给Channel,Channel再回调
void fillActiveChannels(int numEvents,ChannelList* activeChannels) const;

EPollPoller 的部分实现讲解


EpollPoller中封装了对于红黑树的操作。即调用了epoll_ctl对红黑树的节点做更新,删除。

里面有三种状态

// channel 未添加到EPollPoller   中
const int kNew = -1; // EPollPoller   的成员index_ = -1
// channel 已添加到poller中
const int kAdded = 1;
// channel从poller中删除
const int kDeleted = 2;

epoll_create1 的作用在于他可以设置flags,其中flags 中有

EPOLL_COLEXEC 就是 父进程创建的,fork之后子进程不会也打开这个文件描述符。 默认子进程会打开父进程所打开的文件描述符。 kInitEventListSize 源码当中就是16。

在这里插入图片描述

EPollPoller::EPollPoller(EventLoop* loop) // EPOLL_CLOEXEC , kInitEventListSize
   : Poller(loop), epollfd_(::epoll_create1(EPOLL_CLOEXEC)), events_(kInitEventListSize) // vector<epoll_event>
{
    if (epollfd_ < 0)
    {
        LOG_FATAL("epoll_create errir:%d \n", errno);
    }
} // epoll_create

析构函数,相当于我们删掉底层的Poller

EPollPoller::~EPollPoller()
{
    // 编译器检查是否有实现,基类是否是virtual
    ::close(epollfd_);
}

为什么每一个Channel都要保持三种状态?
在updateChannel,removeChannel 的时候,会根据Channel的状态进行调整,Channel不会remove后直接删掉。
因为Channel的声明周期是由TcpConnection维护的,只有TcpConnection消失,Channel才会真正消失。
在这里插入图片描述

channel 里面的index 就是给EpollPoller里面的状态。 对应于未添加,已添加,删除 都在下面这个函数。

主要逻辑:

EventLoop(事件循环, 放着所有的channel)

        ChannelList            	Poller

​						ChannelMap <fd,channel*>   (只有在Poller里面注册过的,才会在ChannelMap里面出现)
// 对应epoll_ctl
void EPollPoller::updateChannel(Channel *channel)
{
    const int index = channel->index();
    if (index == kNew || index == kDeleted)
    {
        // 没有添加过的状态就是kNew,原来删除的和没有添加过的处理动作动作EPOLL_CTL_ADD 
        if (index == kNew)
        {
            int fd = channel->fd();
            channels_[fd] = channel;
        }
        // 如果已经是delete状态,那么可以直接复用,也就是新添加一个
        channel->set_index(kAdded);
        update(EPOLL_CTL_ADD, channel);
    }
    else // channel 已经在poller上注册过了
    {
        int fd = channel->fd();
        // isNoneEvent 存在说明此次我们选择删除。
        if (channel->isNoneEvent())
        {
            update(EPOLL_CTL_DEL, channel);
            channel->set_index(kDeleted);
        }
        else
        {
            // 对某些事件感兴趣
            update(EPOLL_CTL_MOD, channel);
        }
    }
}

removeChannel的时候考虑kAdded情况其实足够,因为其他两种情况Channel都已经不在红黑树上了。

void EPollPoller::removeChannel(Channel *channel)
{
    int fd = channel->fd();
    channels_.erase(fd); // 删除map中映射

    LOG_INFO("func=%s => fd=%d  \n", __FUNCTION__, channel->fd());

    int index = channel->index();
    if (index == kAdded) // 如果还是在状态2,就删掉
    {
        update(EPOLL_CTL_DEL, channel);
    }
    channel->set_index(kNew); // 设置状态为可添加
}

updateChannel:

  1. 如channel没有被注册,或者是之前添加然后被删除了,那么我们统一插入epoll模型中。
  2. 如已经注册,则查看是否有 events_ 是否和 kNoneEvent 相同 (这个封装成了一个函数罢了), 那么执行删除,否则调用update,更新红黑树上的节点的关心事件。
// 对应epoll_ctl
void EPollPoller::updateChannel(Channel *channel)
{
    const int index = channel->index();
    LOG_INFO("func=%s => fd=%d events=%d index=%d \n", __FUNCTION__, channel->fd(), channel->events(), index);
    if (index == kNew || index == kDeleted)
    {
        // 没有添加过的状态就是kNew,原来删除的
        if (index == kNew)
        {
            int fd = channel->fd();
            channels_[fd] = channel;
        }
        channel->set_index(kAdded);
        update(EPOLL_CTL_ADD, channel);
    }
    else // channel 已经在poller上注册过了
    {
        int fd = channel->fd();
        if (channel->isNoneEvent())
        {
            update(EPOLL_CTL_DEL, channel);
            channel->set_index(kDeleted);
        }
        else
        {
            // 对某些事件感兴趣
            update(EPOLL_CTL_MOD, channel);
        }
    }
}

update函数:
非常简单,实际上把channel 的东西全部取出来,赋值给一个栈上的event,然后调用epoll_ctl进行对红黑树的操作。

// 更新channel通道 epoll_ctl add/mod/del
void EPollPoller::update(int operation, Channel *channel)
{
    epoll_event event;
    ::memset(&event, 0, sizeof(event));
    int fd = channel->fd();
    event.events = channel->events();
    event.data.fd = fd;
    event.data.ptr = channel; // 赋值给void* 了


    if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
    {
        if (operation == EPOLL_CTL_DEL)
        {
            LOG_ERROR("epoll_ctl del error %d\n", errno);
        }
        else
        {
            LOG_FATAL("epoll_ctl add/mod error%d\n", errno);
        }
    }
}

poll方法:

主要调用epoll_wait,然后对numEvents 也就是多少个文件描述符有事件就绪了。

由于struct event 这个结构体我们用vector来存,所以我们需要&*event_.begin() 这种方式来取数组的首元素。

saveError 可能是保存epoll_wait 之后的结果,防止执行到后面的时候saveError被人修改了。

// poll 主要调用epoll_wait ,moduo 库用的是LT模式,activeChannels 是给到EventLoop的
Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels)
{
    // 实际上使用LOG_DEBUG更加合理
    LOG_INFO("func=%s => fd total count:%d\n", channels_.size());
    // 我们这里用的是vector,所以需要向下面这样写
    int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);
    // poll在多个线程可能都会被调用,所以提前保存一份
    int saveErrno = errno;
    Timestamp now(Timestamp::now());
    // 事件就绪后,记录时间戳,然后将revetns记录到
    if (numEvents > 0) // 只调用一次即可
    {
        // 已经发生过了事件
        LOG_INFO("%d events happened \n", numEvents);
        fillActiveChannels(numEvents, activeChannels);
        // 这一轮所有监听的events都上来了,底层的就绪队列可能很大,所以此时需要扩容
        if (numEvents == events_.size())
        {
            events_.resize(events_.size() * 2);
        }
    }
    else if (numEvents == 0)
    {
        LOG_DEBUG("%s timeout! \n", __FUNCTION__);
    }
    else
    {
        if (saveErrno != EINTR)
        {
            errno = saveErrno;
            LOG_ERROR("EPollPoller::poll()");
        }
    }
    return now;
}

fillActiveChannels函数:

EpollPoller 里面没有ChannelList ,他是由EventLoop调用的,也就是一次epoll_wait 的结果直接给了EventLoop,他自己不保存的。

就是填充ChannelList 这个数组。

// 填写活跃的连接,EPollPoller:: 注意加上!
void EPollPoller::fillActiveChannels(int numEvents,ChannelList* activeChannels) const
{
    for (int i = 0; i < numEvents; ++i)
    {
        // 隐式类型转化
        Channel *channel = static_cast<Channel *>(events_[i].data.ptr);
        channel->set_revents(events_[i].events);
        // 事件就在这里了。
        activeChannels->push_back(channel);
    }
}

EventLoop 事件循环(Reactor)

EPollPoller 是事件分发器, EventLoop是反应堆模型。

EventLoop 这里是线程本地存储的的。
变量即使线程本地存储的,又是extern的。

extern 是用来声明一个变量的,若在声明的后面直接跟定义,那么肯定是有问题的。

和普通文件一样,定义只要放在.h 里面就一定会出问题!

例外:static 修饰的变量可以直接定义,因为他的连接属性已经改变了

CurrentThread

在这里插入图片描述

tid的访问时系统调用,所以弄了一个cacheTid,防止每次都调用系统调用拿,加快效率。

对于__thread的小实验


头文件extern __thread int t_cachedTid;

extern 的时候 所有的线程extern 就是自己线程中的这个变量。

下面的三个文件进行测试extern static 搭配 __thread 使用的场景:

// test.h
#pragma once
#include<unistd.h>
#include <sys/syscall.h>

void func();
    // 线程本地存储
    //extern __thread int t_cachedTid;
    extern __thread int t_cachedTid = 0;
    //static __thread int t_cachedTid = 0;

    void cacheTid();
    
    inline int tid()
    {
        if(__builtin_expect(t_cachedTid == 0,0))
        {
            cacheTid();
        }
        return  t_cachedTid;
    }

// test.cc
#include"test.h"
#include<iostream>
using namespace std;
//__thread int t_cachedTid = 0;
    void cacheTid()
    {
        if(t_cachedTid == 0)
        {
            // 通过linux系统调用获取tid值
            t_cachedTid = static_cast<pid_t>(::syscall(SYS_gettid));
        }
    }
int main()
{
  cout << tid() << endl;
  func();
  return 0;
}

// test2.cc
#include"test.h"
#include<iostream>
using namespace std;

void func()
{
  cout <<  tid() << endl; 
}
namespace CurrentThread
{
    __thread int t_cachedTid = 0;
    void cacheTid()
    {
        if(t_cachedTid == 0)
        {
            // 通过linux系统调用获取tid值
            t_cachedTid = static_cast<pid_t>(::syscall(SYS_gettid));
        }
    }
}

EventLoop 成员


EventLoop 里面的 vector*,别名 ChannelList,就是poller 返回给EventLoop的哪些文件描述符已经就绪了。

using Functor = std::function<void()>;
using ChannelList = std::vector<Channel*>;
std::atomic_bool looping_; // 原子操作,通过CAS实现
std::atomic_bool quit_;    // 标识退出loop循环

每一个Channel 都会在自己所在的subLoop里面执行。

wakeupFd 就是 当mainLoop 里面获得一个新连接,就可以通过 subLoop 里面的wakeupFd,用来唤醒指定的subLoop,subLoop 会将这个wakupFd_ 放在epoll_wait 里面进行等待。

int wakeupFd_; // 重要,唤醒subReactor,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理channels

callingPendingFunctors_;

pendingFunctors; mutex_ 是用来保护这里的待执行的vetcor

std::mutex mutex_;                      // 互斥锁,用来保护上面vector容器的线程安全 
std::atomic_bool callingPendingFunctors_;// 标识当前loop是否有需要执行的回调操作
std::vector<Functor> pendingFunctors_;  // 存储loop需要执行的所有的回调操作

wakefd的两种创建方式。

socketpair

这个与管道不同就是双端都是可读可写的。 这里走的是网络通信

在这里插入图片描述

eventfd

eventfd 是内核的事件通知机制。muduo库中就是用这个系统调用创建每一个线程的wakeupFd的

在这里插入图片描述

mainLoop 选择 并且唤醒对应subLoop 用的字段。 就是有一个新的连接要给subLoop,但是subLoop可能在epoll_wait,所以需要将他唤醒。

int wakeupFd_; // 重要,唤醒subReactor,通过轮询算法选择一个subloop,通过该成员唤醒subloop处理channels
 // 在当前loop中执行cb
void runInLoop(Functor cb);
// 把cb放入队列中,唤醒loop所在的线程,执行cb
void queueInLoop(Functor cb); 
// 用来唤醒loop所在的线程的 mainReactor 唤醒 subReactor的
void wakeup();

很明显,就是调用poller 里面的updateChanel 方法而已。

void EventLoop::updateChannel(Channel* channel)
{
  assert(channel->ownerLoop() == this);
  assertInLoopThread();
  poller_->updateChannel(channel); // 这里是poller执行
}

证明EventLoop对象是在创建他的线程里面。

// 判断EventLoop 对象是否在自己的线程里面。
bool isInLoopThread() const {return threadId_ == CurrentThread::tid();}

如果isInLoopThread 不满足,说明evnetLoop对象不在他所创建的线程当中.

目前理解: 就是vector 里面放着待执行的回调方法,而当当前线程执行这个vector的时候,需要是Channel对应的EventLoop里面有才行,否则就重新放回去,然后唤醒它所在的线程。

// 把cb放入队列中,唤醒loop所在的线程,执行cb
void EventLoop::queueInLoop(Functor cb)
{
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pendingFunctors_.emplace_back(cb);
    }
    // 唤醒相应的需要执行上面的回调操作的loop线程
    // || callingPendingFunctors_ 标识当前loop正在执行回调,此时loop又有了新的回调。
    if(!isInLoopThread() || callingPendingFunctors_)  // 这里的callingPendingFunctors_待解释
    {
        wakeup();// 唤醒loop所在线程
    }
}

EventLoop 的部分实现讲解


t_loopInThisThread 在此处声明+定义,每一个线程只有一份。

// 防止一个线程创建多个EventLoop thread local
__thread EventLoop *t_loopInThisThread = nullptr;

unique_ptr 就是里面的Poller的超时时间,10s

// epoll的超时时间,这里是Poller的超时时间
const int kPollTimeMs = 10000;

创建一个wakefd,用了eventfd这个函数来创建

// 创建wakeupfd,用来notify唤醒subReactor处理新来的channel
int createEventfd()
{
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0)
    {
        LOG_FATAL("eventfd error:%d \n", errno);
    }
    return evtfd;
}

构造函数的字段讲解:

looping_ 就是循环的接口

quit_ 表示退出用的接口

callingPendingFunctors_ 是否有需要处理的回调

threadId 就是当前的线程号,通过CurrentThread的tid就可以获取。

poller_ 这里其实就是创建一个poller, 一个EventLoop 会有一个Poller

wakeupFd_ 这个就是createEventfd() 调用那个系统调用然后创建一个wakeupFd

wakeupChannel_ : 需要Poller 关心这个wakeupfd

currentActiveChannel_ : 目前有事件已经就绪的channel

handlerRead 就是读取一个整型的数据,目的就是为了唤醒subLoop ,由于调用这个方法的时候在类内部,所以用绑定器提前给定this的位置。

enableReading 是设置wakeupfd 关心 读事件。

此时每一个eventLoop 都会监听wakeupChannel 的EPOLLIN 读事件了。然后设置wakeupfd事件就绪的回调事件。

EventLoop::EventLoop()
    : looping_(false), quit_(false), callingPendingFunctors_(false), threadId_(CurrentThread::tid()), poller_(Poller::newDefaultPoller(this)), wakeupFd_(createEventfd()), wakeupChannel_(new Channel(this, wakeupFd_)), currentActiveChannel_(nullptr)
{
    LOG_DEBUG("EvetnLoop created %p in thread %d \n", this, threadId_);
    if (t_loopInThisThread)
    {
        LOG_FATAL("Another EventLoop %p exists in this thread %d \n", t_loopInThisThread, threadId_);
    }
    else
    {
        t_loopInThisThread = this;
    }
    // 设置wakeupFd的监听事件类型,以及发生事件后的回调方法
    wakeupChannel_->setReadCallback(std::bind(&EventLoop::handlerRead, this));
    // 每一个eventloop都将监听wakeupchannel的EPOLLIN读时间
    wakeupChannel_->enableReading();
}

wakeup 析构的时候可以将通道关闭,资源释放。

EventLoop::~EventLoop()
{
    wakeupChannel_->disableAll();
    wakeupChannel_->remove();
    ::close(wakeupFd_);
    t_loopInThisThread = nullptr;
}

开启事件循环,最重要的一个函数!

主要逻辑:

// 开启事件循环
void EventLoop::loop()
{
    // 开启事件循环
    looping_ = true;
    // 退出设置为false
    quit_ = false;

    LOG_INFO("EventLoop %p start looping \n",this);
    // 只要没有quit_ 不满足,则一直运行
    while(!quit_)
    {
        // 主要监听两类fd,一种是client的fd(listen上来的),一种是wakeupfd(唤醒subloop的fd)
        activeChannels_.clear();
        // 这里可以看到,是让poller模块将activeChannels_填充好带上来
        pollReturntime_ = poller_->poll(kPollTimeMs,&activeChannels_);
        // 然后对于所有事件就绪的Channel ,拿出来调用回调函数
        for(Channel* channel: activeChannels_)
        {
            // Poller 监听哪些channel发生时间了,然后上报给EventLoop,通知channel通知
            channel->handlerEvent(pollReturntime_);
        }
        // 执行当前EventLoop事件循环需要处理的回调操作
        /**
         * IO 线程 mainLoop accept| =》 fd channel 打包fd, channel得分配给subloop
         * mainLoop事先注册一个回调(需要subloop来执行)  wakeup subloop后,执行下面的cb操作
        */
        doPendingFunctors(); // 此时只有一种情况,MainLoop 给我新的Channel
    }

    LOG_INFO("EventLoop %p stop looping. \n",this);
    looping_ = false;
    
}

回忆 Channel 的handlerEvent 做了啥工作。

// fd得到poller通知以后处理事件的。
// 成员变量tied_ 主要就是判断Channel是否又被tied_ 过,以及是否正在被tied_
void Channel::handlerEvent(Timestamp receiveTime)
{
    std::shared_ptr<void> guard;
    if(tied_)
    {
        guard = tie_.lock();// 将弱指针提升,返回值不为NULL表示成功
        if(guard)
        {
            handlerEventWithGuard(receiveTime);
        }
    }
    else
    {
        handlerEventWithGuard(receiveTime);
    }
}

以及对于doPendingFunctors的理解:

有一种情况,当主Loop 给从Loop分配文件描述符的时候,此时往wakefd调用回调后苏醒了,从Loop就需要调用一下doPendingFunctors,判断是否是需要我往红黑树插上新的节点。

困惑:这里要是没有事件,lock这把锁每次都要加,会不会影响效率。

// 为了并发操作,定义局部的vector
 void EventLoop::doPendingFunctors()
 {
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }
    // 上面是对效率的考量,主要是下面需要进行执行任务
    for(const Functor& functor:functors)
    {
        functor(); // 执行当前loop需要执行的回调操作
    }
    callingPendingFunctors_ = false;
 }
// 为了并发操作,定义局部的vector
 void EventLoop::doPendingFunctors()
 {
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }
    // 上面是对效率的考量,主要是下面需要进行执行任务
    for(const Functor& functor:functors)
    {
        functor(); // 执行当前loop需要执行的回调操作
    }
    callingPendingFunctors_ = false;
 }

quit 函数:

若是自己的loop里面调用quit,说明loop 肯定不在poll函数了。

困惑:怎么会有非loop线程调用quit?在subloop(worker)中,调用mainLoop(IO线程)里面的quit?

是因为EventLoopThreadPool一开始是在主线程创立的,若此时其他线程上的EventLoopThread需要删除掉,则需要唤醒从线程,由从线程进行删除。

在这里插入图片描述

// 退出事件循环 1.loop在自己的线程中 2. 非loop的线程中,调用loop的quit
// subloop1 退出 subloop2 不理解
void EventLoop::quit()
{
    quit_ = true;
    if(! isInLoopThread()) // 如果是在其他线程中,调用quit; 在一个subloop(worker)中调用mainLoop的quit
    {
        wakeup();
    }
}

runInLoop:

在当前的loop,就执行回调方法; 否则就执行cb

void EventLoop::runInLoop(Functor cb)
{
    if(isInLoopThread()) // 在当前的loop线程中,执行cb
    {
        cb();
    }
    else // 在非loop 线程中执行cb
    {
        queueInLoop(cb);
    }
}

queueInLoop:

callingPendingFunctors_ 表示已经被唤醒,没有阻塞在loop上,避免执行期间又有新的回调到来,而无法执行。

他依旧会到poll,但是我已经wakeup,它poll立马又会返回。

// 把cb放入队列中,唤醒loop所在的线程,执行cb
void EventLoop::queueInLoop(Functor cb)
{
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pendingFunctors_.emplace_back(cb);
    }
    // 唤醒相应的需要执行上面的回调操作的loop线程
    // || callingPendingFunctors_ 标识当前loop正在执行回调,此时loop又有了新的回调。
    if(!isInLoopThread() || callingPendingFunctors_)  // 这里的callingPendingFunctors_待解释
    {
        wakeup();// 唤醒loop所在线程
    }
}

wakeup就是通过给wakeupFd写一个内容,让线程起来。 唤醒loop所在的线程

// 用来唤醒loop所在的线程的 向wakeupfd写一个数据
void EventLoop::wakeup()
{
    uint64_t one = 1;
    ssize_t n = write(wakeupFd_,&one,sizeof one);
    if(n != sizeof one)
    {
        LOG_ERROR("EventLoop::wakeup() wirtes %lu bytes instead of 8\n",n);
    }
}
// 为了并发操作,定义局部的vector
 void EventLoop::doPendingFunctors()
 {
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }
    // 上面是对效率的考量,主要是下面需要进行执行任务
    for(const Functor& functor:functors)
    {
        functor(); // 执行当前loop需要执行的回调操作
    }
    callingPendingFunctors_ = false;
 }

思考:

在mainLoop 和 subLoop 添加一个生产者消费者线程安全的队列。 mainLoop负责放channel,subLoop 负责拿channel。 若是这样,逻辑就清晰,但是muduo库里面只通过wakeupfd进行调用。

Thread 类


线程函数的类型。 若是要带参数,可以使用绑定器进行绑定。

using ThreadFunc = std::function<void()>;

这里封装了c++11的线程库,用了智能指针进行封装。

std::shared_ptr<std::thread> thread_; // 这里的thread是线程里面的thread

注意这里的tid_ 也不是pthread_self() 的结果。

void setDefaultName();
bool started_;
bool joined_;  
std::shared_ptr<std::thread> thread_; // 这里的thread是线程里面的thread
pid_t tid_;			// 线程的tid,这个值是top命令查看出来的
ThreadFunc func_;    // 定义线程的函数对象
std::string name_; // 线程名,调试的时候打印用的
static std::atomic_int numCreated_; // 产生的线程的数量

构造函数,默认start_ 不启动,tid_ 为0,启动的线程函数是传参的func,name_ 表示线程名称。

调用setDefaultName 就是将一定的方法把线程名称创建出来。

std::atomic_int Thread::numCreated_(0); // =0 这种explicit不能用
Thread::Thread(ThreadFunc func, const std::string &name) // 这里不需要把模板参数也写出来
    : started_(false), joined_(false), tid_(0), func_(std::move(func)), name_(name)
{
    setDefaultName();
}

setDefaultName 如果名字为空,则会创建默认的名字。

void Thread::setDefaultName()
{
    int num = ++numCreated_;
    // 如果名字为空,则会创建默认的名字。
    if(name_.empty())
    {
        char buf[32] = {0};
        snprintf(buf,sizeof buf,"Thread%d",num);
        name_ = buf;
    }
}

join 表示他是一个普通的工作线程,必须等他执行完才能够进行释放,实际上我们的资源是由shared_ptr维护的。 所以即使我们这里没有释放,也不会出现问题。

这里的析构方法实际上也就是分离线程的方法。

这里也是因为主线程虽然要释放了,但是从线程可能还有工作,此时我们的策略是分离,而不应该杀死线程。若主线程没有设置 join_ 为true,那么就是说主线程实际上可以不等待了,那么此时设置分离他的状态。

Thread::~Thread()
{
    if(started_ && !joined_)
    {
        thread_->detach();// thread类提供的设置分离线程的方法  pthread_deatch
    }
}

start 函数就是可以执行

thread_ 函数是通过右值的形式进行右值引用的。 此时开启线程,专门执行该线程函数。

为什么弄个信号量,做这么奇怪的操作?

这里是这样的,主线程创建一个线程,此时我们需要保证这个函数执行完,主线程返回的时候,从线程的tid必须是一个已经创建好的值。但是由于线程的调度策略我们并不知道,所以用信号量,让主线程在从线程 sem_post 生产一个的时候才放行主线程。

这里用的是二元信号量,本身就有互斥的含义。

void Thread::start() // 一个Thread对象,记录的就是一个新线程的详细信息、
{
  started_ = true;
  sem_t sem;
 // 第二个是是否进程间共享,第三个就是信号量的数量
  sem_init(&sem,false,0);
  // 开启线程,&获取外部成员变量
  thread_ = std::shared_ptr<std::thread>(new std::thread([&](){
    // 获取线程的tid值
    tid_ = CurrentThread::tid();
    sem_post(&sem); // sem + 1
    // 开启一个新线程
    func_(); // 开启一个新线程,专门执行该线程函数
  }));  

  // 等待新创建线程的tid值, 这里会直接阻塞住
  sem_wait(&sem); // sem - 1
}

主线程等待从线程。

void Thread::join()
{
    joined_ = true;
    thread_->join();
}

startLoop 与 threadFunc

startLoop 的终极目标就是创建一个新线程然后里面有着EventLoop,然后返回给上层

startLoop是初始化底层的Thread函数,创建新线程,执行新的线程函数threadFunc,threadFunc 会创建 EventLoop 并启动loop事件循环

而startLoop 返回的EventLoop* 的时候需要保证底层的Thread已经没问题并且loop。

EventLoop *EventLoopThread::startLoop() // 这里才创建新线程
{
    // 启动底层的线程
    thread_.start(); // start 里面的func_ 就是 EventLoopThread::threadFunc,也就是下面的函数得执行完才到我
    EventLoop *loop = nullptr;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (loop_ == nullptr)
        {
            cond_.wait(lock); // 表示底层的线程都没创建好
        }
        loop = loop_;
    }
    return loop;
}

// 这个是新线程执行的函数
void EventLoopThread::threadFunc()
{
    EventLoop loop; // one loop per thread
    if (callback_)
    {
        callback_(&loop);
    }
    {
        std::unique_lock<std::mutex> lock(mutex_);
        loop_ = &loop;
        cond_.notify_one();
    }
    loop.loop();// 新线程就在这里调用epoll_wait,EventLoop 的 loop 函数,即开启了loop
    std::unique_lock<std::mutex> lock(mutex_);
    loop_ = nullptr; // 这里表示底层的事件已经结束了,这里是彻底Reactor 都不运行才会到这里
}

EventLoopThreadPool

EventLoop* baseLoop_; // EventLoop loop; 用户创建的一开始的Loop
std::string name_;
bool started_;
int numThreads_;
int next_;  // 用来轮询派发任务的
std::vector<std::unique_ptr<EventLoopThread>> threads_; // 封装了批量的EventLoopThread
std::vector<EventLoop*> loops_;
EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg)
    : baseLoop_(baseLoop), name_(nameArg), started_(false), numThreads_(0), next_(0)
{
}
void setThreadNum(int numThreads){numThreads_ = numThreads;}

start 首先对于所有的EventLoopThread 进行设置名字。然后调用线程初始化函数初始化baseLoop_

void EventLoopThreadPool::start(const ThreadInitCallback &cb = ThreadInitCallback())
{
    started_ = true;
    for (int i = 0; i < numThreads_; ++i)
    {
        char buf[name_.size() + 32];
        snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
        EventLoopThread *t = new EventLoopThread(cb, buf);
        // vector<std::unique_ptr<EventLoopThread>>
        threads_.push_back(std::unique_ptr<EventLoopThread>(t));
        // vector<EventLoop*>
        loops_.push_back(t->startLoop()); // 底层创建线程,绑定一个新的EventLoop,并返回还loop的地址
    }
    // 整个服务端只有一个线程运行着baseLoop
    if (numThreads_ == 0 && cb)
    {
        cb(baseLoop_);
    }
}

getNextLoop

// 如果是多线程,baseLoop_默认以轮询的方式分配Channel给subLoop
EventLoop *EventLoopThreadPool::getNextLoop()
{
    // IO线程运行baseLoop_;
    EventLoop *loop = baseLoop_;
    // 若没有设置底层的线程数,返回的永远是baseLoop_;
    // 工作线程若是有,就在这里,获取下一个处理事件的loop。
    if (!loops_.empty())
    {
        loop = loops_[next_];
        ++next_;
        // 相当于 循环的数组 next_ 这样走
        if (next_ >= loops_.size())
        {
            next_ = 0;
        }
    }
    return loop;
}

Socket


是为了讲述Accept 而用到的类,TcpConnection也会使用到。

成员变量只有一个,就是sockfd_。

const int sockfd_;

成员变量就是填入了InetAddress,其实上也就是封装了一个对象。

int Socket::accept(InetAddress *peeraddr)
{
    /**
     * 参数不合法
     * 对返回的connfd 没有设置非阻塞
     * poller + non-blocking IO
    */
    struct sockaddr_in addr;
    bzero(&addr, sizeof addr);
    socklen_t len = sizeof addr; // accept 必须初始化 
    // accept4 可以让返回的connfd设置flag是nonblock
    int connfd = ::accept4(sockfd_, (sockaddr *)&addr, &len,SOCK_NONBLOCK| SOCK_CLOEXEC);
    if (connfd >= 0)
    {
        peeraddr->setSockAddr(addr);
    }
    return connfd;
}

TcpConnection


TcpConnection 封装了Channel, TcpConnnection的私有函数实际上就是被注册到Channel当中,当检测到事件的时候实际上调用的就是TcpConnection传递的方法。
用户定义的消息回调会在TcpConnection当中的读到内容的时候进行调用回调。
在这里插入图片描述
TcpServer会在接受到一个新连接的时候调用用户的connectionEstablished函数。

在这里插入图片描述

Acceptor MainReactor 核心


acctpor 里面的EventLoop 是MainLoop,他也会打包成一个Channel 给到mainLoop 当中。

newConnectionCallback_ 就是 TcpServer的构造函数里面初始化了

acceptor_->setNewConnectioCallback(std::bind(&TcpServer::newConnection,this,std::placeholders::_1,std::placeholders::_2)); 

成员变量:

EventLoop* loop_; // Acceptor 用的就是用户定义的baseLoop,也是mainLoop
Socket acceptSocket_;
Channel acceptChannel_;
NewConnectionCallback newConnectionCallback_;
bool listenning_;

newConnection 这个函数就是主要就是选择一个subLoop,这里面的ioLoop,然后将就绪事件传给他。

// 新的客户端会调用这里
void TcpServer::newConnection(int sockfd,const InetAddress& peerAddr)
{
    // 轮询算法选择一个subLoop,来管理channel
    EventLoop* ioLoop = threadPool_->getNextLoop();
    char buf[64] = {} ;
    snprintf(buf,sizeof buf,"-%s#%d",ipPort_.c_str(),nextConnId_);
    ++ nextConnId_;// 不是atomic是因为这个函数在主线程运行
    std::string connName = name_ + buf;
    LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
             name_.c_str(),connName.c_str(),peerAddr.toIpPort().c_str());

    // 通过sockfd获取其绑定的本机的ip地址和端口信息。
    sockaddr_in local;
    ::bzero(&local,sizeof local);
    socklen_t addrlen = sizeof local;
    if(::getsockname(sockfd,(sockaddr*)&local,&addrlen) < 0)
    {
        LOG_ERROR("sockets::getLocalAddr");
    }
    InetAddress localAddr(local);

    TcpConnectionPtr conn(new TcpConnection(
        ioLoop,
        connName,
        sockfd,
        localAddr,
        peerAddr
    ));

    connections_[connName] = conn;
    // 下面的回调都是用户设置给TcpServer-> TcpConnection->Channel->poller->notify channel调用回调
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);

    // 设置了如何关闭连接的回调
    conn->setCloseCallback(
        std::bind(&TcpServer::removeConnection,this,std::placeholders::_1)
    );
    // 直接调用TcpConnection::connectionEstablished
    ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished,conn));
}

构造函数定义了accpetChannel 相关的读事件就绪,应证了回调函数的执行都在Channel模块。

Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
    : loop_(loop), acceptSocket_(createNonblocking()),
      acceptChannel_(loop, acceptSocket_.fd()), listenning_(false)
{
    acceptSocket_.setReuseAddr(true);
    acceptSocket_.ReusePort(true);
    acceptSocket_.bindAddress(listenAddr);
    // TcpServer::start() Accept.listen
    acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

这里调用handleRead就是执行回调方法,因为AcctorChannel 有事件就绪只要就是acceptfd上有事件就绪了。

两个回调函数,一个是accept上来需要调用的,首先把连接获取上来; 下面的就是另一个回调,就是启动subLoop接受这个connfd,接下来newConnectionCallback_里面会调用将TcpConnection所需要的函数都绑定到TcpConnection,然后TcpConnection往Channel的回调方法也会调用到其中的方法。

// listenfd有事件链接,就是有新用户链接了
void Acceptor::handleRead()
{
    InetAddress peerAddr;
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0)
    {
        if (newConnectionCallback_)
        {
            //void TcpServer::newConnection(int sockfd,const InetAddress& peerAddr)
            newConnectionCallback_(connfd, peerAddr); // 轮询找到subLoop,唤醒,分发当前新客户端的Channel
        }
        else
        {
            ::close(connfd);
        }
    }
    else
    {
        LOG_ERROR("%s:%s:%d accept error:%d\n", __FILE__, __FUNCTION__, __LINE__, errno);
        // 资源用完
        if (errno == EMFILE)
        {
            // 可以调整当前进程的文件描述符上线
            LOG_ERROR("%s:%s:%d sockfd reached limit!\n", __FILE__, __FUNCTION__, __LINE__);
        }
    }
}

也就是linsten sock 有事件就绪,就会调用Accept::handleRead 方法读取,然后调用EvnetLoopThreadPoll里面的getNextLoop 然后选择一个ioLoop 把新的connfd打包成Channel 发送给他,并且将用户设置的方法TcpServer.cc保存,后续connfd统一都进行回调注册。 因为listen sock 的处理方法是固定的,用户传的方法,都是监听的事件就绪后的业务处理。

newConnection 这个函数,设置了用户的事件就绪后需要做什么。

// 下面的回调都是用户设置给TcpServer-> TcpConnection->Channel->poller->notify channel调用回调
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);

在这里插入图片描述

博主简化的版本,只用于学习,不可用于商用。
https://gitee.com/wuyi-ljh/somebloglinkstoshare 附带一张思维导图。

总结


个人认为比较重要的模块都有叙述,差的部分会在后续更新,gitee里面的思维导图也会持续完善,并且会梳理一下整体的逻辑。可以关注一下,以防丢失~

借鉴:
长文梳理Muduo库核心代码及优秀编程细节剖析
muduo学习笔记:base部分之CurrentThread命名空间与Thread类

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐