Linux系统编程+网络编程项目总结
webserver项目开发——项目来自牛客网项目下载地址:https://github.com/gaojingcome/WebServer(ps:关于环境要求和项目启动的配置要求也在链接中)根据该项目官方说法的功能如下:利用IO复用技术Epoll与线程池实现多线程的Reactor高并发模型;利用正则与状态机解析HTTP请求报文,实现处理静态资源的请求;利用标准库容器封装char,实现自动增长的缓冲
webserver服务器项目开发——项目来自牛客网
项目下载地址:https://github.com/gaojingcome/WebServer
(ps:关于环境要求和项目启动的配置要求也在链接中)
根据该项目官方说法的功能如下:
- 利用IO复用技术Epoll与线程池实现多线程的Reactor高并发模型;
- 利用正则与状态机解析HTTP请求报文,实现处理静态资源的请求;
- 利用标准库容器封装char,实现自动增长的缓冲区;
- 基于小根堆实现的定时器,关闭超时的非活动连接;
- 利用单例模式与阻塞队列实现异步的日志系统,记录服务器运行状态;
- 利用RAII机制实现了数据库连接池,减少数据库连接建立与关闭的开销,同时实现了用户注册登录功能。
- 增加logsys,threadpool测试单元(todo: timer, sqlconnpool, httprequest, httpresponse)
首先第一步肯定是分析主函数,然后层层分析开来,如下为目录下的main.cpp函数
#include <unistd.h>
#include "server/webserver.h"
int main() {
/* 守护进程 后台运行 */
//daemon(1, 0);
WebServer server(
1316, 3, 60000, false, /* 端口 ET模式 timeoutMs 优雅退出 */
3306, "root", "root", "riven", /* Mysql配置 用户名,密码,数据库名称*/
12, 6, true, 1, 1024); /* 连接池数量 线程池数量 日志开关 日志等级 日志异步队列容量 */
server.Start();
}
由注释可以很清楚看出,实例化了一个WebServer对象并设置了连接的一些配置,然后就是程序的启动,接下来我们看Start()函数,该函数在webserver.cpp中,也就是主要功能的代码部分,首先先了解一下webserver.h中事先的定义声明,下面只展示了部分变量的说明,函数会进一步分析
class WebServer {
public:
/*
端口,触发模式,延时,是否优雅关闭,数据库端口,数据库用户名,数据库密码,数据库名称,
连接线程池的数量,线程池线程的数量,是否打开日志,日志等级,日志异步队列的大小
*/
WebServer(
int port, int trigMode, int timeoutMS, bool OptLinger,
int sqlPort, const char* sqlUser, const char* sqlPwd,
const char* dbName, int connPoolNum, int threadNum,
bool openLog, int logLevel, int logQueSize);
~WebServer();
void Start();
private:
static const int MAX_FD = 65536; //文件描述符的最大数量
static int SetFdNonblock(int fd); //设置文件描述符非阻塞
int port_; //端口
bool openLinger_;//是否打开优雅关闭
int timeoutMS_; /* 毫秒MS */
bool isClose_; //线程池是否关闭
int listenFd_; //监听文件描述符
char* srcDir_; //资源的目录
uint32_t listenEvent_; //监听的文件描述符的事件
uint32_t connEvent_; //连接的文件描述符的事件
std::unique_ptr<HeapTimer> timer_; //定时器
std::unique_ptr<ThreadPool> threadpool_; //线程池
std::unique_ptr<Epoller> epoller_; //epoll对象
std::unordered_map<int, HttpConn> users_; //保存的客户端连接的信息
};
几点说明:
1、按照posix标准,一般整形对应的*_t类型为:
1字节 uint8_t
2字节 uint16_t
4字节 uint32_t
8字节 uint64_t
2、unique_ptr 是 C++ 11 提供的用于防止内存泄漏的智能指针中的一种实现,独享被管理对象指针所有权的智能指针。unique_ptr对象包装一个原始指针,并负责其生命周期。当该对象被销毁时,会在其析构函数中删除关联的原始指针。unique_ptr具有->和*运算符重载符,因此它可以像普通指针一样使用。unique_ptr分别定义了HeapTimer、ThreadPool、Epoller类型的指针,通过容器unordered_map保存int类型的文件描述符和HttpConn类型的数据集合。
接下来看这个start()函数
void WebServer::Start() {
int timeMS = -1; /* epoll wait timeout == -1 无事件将阻塞 */
if(!isClose_) { LOG_INFO("========== Server start =========="); }
while(!isClose_) {
if(timeoutMS_ > 0) {
timeMS = timer_->GetNextTick();//传入超时时间
}
//统计事件数量 epoller_->Wait本质就是封装的epoll_wait
int eventCnt = epoller_->Wait(timeMS);//超时才阻塞
for(int i = 0; i < eventCnt; i++) {
/* 处理事件 */
int fd = epoller_->GetEventFd(i);
uint32_t events = epoller_->GetEvents(i);
//如果是监听描述符,接收客户端的连接
if(fd == listenFd_) {
DealListen_();
}
//出错信号,则关闭连接
else if(events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {
assert(users_.count(fd) > 0);
CloseConn_(&users_[fd]);
}
//有数据,处理读操作:采用同步的Reactor模式:将任务交给线程池的队列中
else if(events & EPOLLIN) {
assert(users_.count(fd) > 0);
DealRead_(&users_[fd]);
}
//有数据,处理写操作
else if(events & EPOLLOUT) {
assert(users_.count(fd) > 0);
DealWrite_(&users_[fd]);
} else {
LOG_ERROR("Unexpected event");
}
}
}
}
看这段程序启动函数,首先定义并初始化了epoll延迟时间为-1,表示没有事件读取会阻塞住,然后以是否关闭标志写入服务器开启的日志标识,并以是否关闭进入事件循环,这里是传入超时时间,关于具体延时是怎么实现会在下面定时器功能说明,接着以epoll_wait的形式传入延时并统计出传入的事件数量,然后开始遍历每个事件。遍历中定义文件描述符由epoll指向得到文件描述符函数,定义事件通过得到事件函数,判断分支如果是监听描述符,接收客户端的连接;如果是出错信号,则关闭连接;有输入数据,处理读操作:采用同步的Reactor模式:将任务交给线程池的队列中;有输出数据,处理写操作:采用同步的Reactor模式:将任务交给线程池的队列中;否则日志输出Unexpected event。
**
利用IO复用技术Epoll与线程池实现多线程的Reactor高并发模型
**
首先介绍IO多路复用技术中的epoll,IO多路复用的作用是使得程序能同时监听多个文件描述符,能提高程序的性能,与多线程和多进程相比,I/O 多路复用的最大优势是系统开销小,系统不需要建立新的进程或者线程,也不必维护这些线程和进程。在Linux系统下实现调用有三种方式:select、poll、epoll,且都在内核区中完成。
(以下文件描述符全部用 fd 表示)
select:
该方法需要至少两个fd,一个fd的列表集合,该集合中设置需要用的fd,而另一个就是必须的临时fd,专门负责监听,首先从用户态拷贝到内核态,调用系统函数select对fd的列表进行扫描,直到这些fd中的一个或多个进行IO操作,也就是有数据时,该函数返回,再从内核态拷贝回用户态,返回时告诉进程有哪些描述符需要IO操作。
处理文件描述符函数:FD_CLR(清除fd)、FD_ISSET(fd是否在集合里)、FD_SET(新增fd)、FD_ZERO(清除所有fd的集合)
缺点:① fd的最大数量限制在1024 ② 每次调用select都需要从用户态拷贝到内核区
③ 每次调用select都需要在内核区遍历每个fd ④ fd集合不能重复使用,每次都需要重置
poll:
该方法和select的方法大致相同,不同的是将fd的集合替换成了数组,将需要监听的文件描述符和事件优化到了结构体中。
struct pollfd{
int fd;
short events;//事件
short revents;//返回的事件
};
事件活动有POLLIN、POLLOUT等,同样的,先设置fd的数组,将fd[0]设置为负责监听的fd,同样的,调用系统函数poll,让内核区帮忙检测哪些fd有数据,解决了select的缺点①和④,解决了①很明显,将集合改为了数组,最大1024数量不再有限制,解决了④是因为检测的事件在events里,而返回的由内核区检测的事件在revents,并不会改变events里原有的事件,但是缺点②③还是存在问题。
epoll:
接下来就是重头戏,也是用的最多的epoll,采用的是红黑树结构,直接在内核区创建了结构体
struct eventpoll{
...
struct rb_root rbr;//根
struct list_head rdlist;//相当于双向链表
...
};
在内核区中,调用epoll_ctl函数添加不同的fd(包括监听fd)到rbr中,然后调用epoll_wait()函数检测哪些fd有数据,并保存在rdlist中。关于就绪列表如何维护?给内核中断处理程序注册一个回调函数。这里提到了两个处理函数,另外还有epoll_create、epoll_data、epoll_event、epoll_ctl(对fd进行增删改)。关于epoll 的工作模式分为两种:(默认)LT模式(水平触发)和ET模式(边沿触发)。
LT模式:
LT模式缺省的工作方式,这种做法中内核会告诉一个fd是否就绪,然后进行IO操作,若不做任何操作,内核也会通知。这种模式同时支持阻塞和非阻塞。简单来说,只要有数据,LT模式都会通知,直到全部读完。
ET模式:
只支持非阻塞模式,这种模式下,当fd就绪时内核发通知,直到某些通知导致fd不再就绪,不作继续IO操作时,内核就不会通知,全部结束才会通知。简单来说,有数据不读或者只读一部分,LT模式不通知,只有一次性读完才会通知。
介绍完了基本概念知识,下面看看代码实现部分的具体流程,这里先看epoll.h文件
class Epoller {
public:
explicit Epoller(int maxEvent = 1024);
~Epoller();
bool AddFd(int fd, uint32_t events);//添加要检测的事件
bool ModFd(int fd, uint32_t events);//修改事件
bool DelFd(int fd);//删除事件
int Wait(int timeoutMs = -1);//调用内核检测
int GetEventFd(size_t i) const;//根据数字索引号 找到对应事件的fd
uint32_t GetEvents(size_t i) const;//获取事件
private:
int epollFd_; //epoll_create()创建一个epoll对象,返回值就是epollFd
std::vector<struct epoll_event> events_; //检测到的事件的集合
};
几点说明:
首先是explicit关键字,用于构造函数中,防止类构造函数的隐式自动转换。
其次是用vector容器装的epoll_event结构体类型的集合,用来保存检测到的事件。
接下来看看epoll.cpp的相关函数以及epoll的代码具体实现(主要分析以代码注释呈现,具体epoll实现模式重点在后面):
//构造函数:传入参数为最大事件数,
//对类内成员epollFd_和events_分别初始化,其中调用函数epoll_create创建了512个
Epoller::Epoller(int maxEvent):epollFd_(epoll_create(512)), events_(maxEvent){
assert(epollFd_ >= 0 && events_.size() > 0);//与优雅退出有关,暂不考虑深入研究
}
//析构函数
Epoller::~Epoller() {
close(epollFd_);//关闭epoll文件描述符
}
//是否添加文件描述符
bool Epoller::AddFd(int fd, uint32_t events) {
if(fd < 0) return false;//fd合法性检验
epoll_event ev = {0};//根据epoll_event结构体定义变量
ev.data.fd = fd;//赋值要添加数据的fd
ev.events = events;//赋值要添加数据的事件
return 0 == epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &ev);//对epollFd_调用epoll_ctl实现添加操作
}
//是否修改文件描述符
bool Epoller::ModFd(int fd, uint32_t events) {
if(fd < 0) return false;//fd合法性检验
epoll_event ev = {0};
ev.data.fd = fd;
ev.events = events;
return 0 == epoll_ctl(epollFd_, EPOLL_CTL_MOD, fd, &ev);//对epollFd_调用epoll_ctl实现修改操作
}
//是否删除文件描述符
bool Epoller::DelFd(int fd) {
if(fd < 0) return false;
epoll_event ev = {0};//由于是删除,这里只需要结构体变量,不需要赋值
return 0 == epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, &ev);//对epollFd_调用epoll_ctl实现删除操作
}
//epoll_wait
int Epoller::Wait(int timeoutMs) {
//根据超时时间返回,调用epoll_wait检测函数,传回fd
//传入参数依次为:epoll绑定的fd 初始化事件 事件的大小 epoll_wait阻塞等待的延迟时间
return epoll_wait(epollFd_, &events_[0], static_cast<int>(events_.size()), timeoutMs);
}
//获得事件文件描述符
int Epoller::GetEventFd(size_t i) const {
assert(i < events_.size() && i >= 0);
return events_[i].data.fd;
}
//获得事件
uint32_t Epoller::GetEvents(size_t i) const {
assert(i < events_.size() && i >= 0);
return events_[i].events;
}
回过来继续看我们刚刚webserver.cpp里的start函数,
由于一旦进入循环,接收到没有关闭的信号根据文件描述符和事件开始,首先判断是否属于监听描述符,如果是,进入DealListen_()函数
//处理监听
void WebServer::DealListen_() {
struct sockaddr_in addr;//保存客户端的连接信息
socklen_t len = sizeof(addr);//获取地址长度
//do...while:至少执行一次
do {
int fd = accept(listenFd_, (struct sockaddr *)&addr, &len);//ET模式下也是非阻塞的
if(fd <= 0) { return;}
//判断最大数量
else if(HttpConn::userCount >= MAX_FD) {
SendError_(fd, "Server busy!");
LOG_WARN("Clients is full!");
return;
}
AddClient_(fd, addr);
} while(listenEvent_ & EPOLLET);
}
这里首先定义了结构体,用于保存客户端的连接信息,(socklen_t是一种数据类型,它其实和int差不多,在32位机下,size_t和int的长度相同,都是32 bits,但在64位机下,size_t(32bits)和int(64 bits)的长度是不一样的,socket编程中的accept函数的第三个参数的长度必须和int的长度相同。于是便有了socklen_t类型)。然后用了个do…while循环,众所周知,循环体中至少需要执行一次,循环条件为监听事件与上ET模式,也就是说,至少要进入一次监听文件描述符,循环体中可以看到,定义了TCP协议下客户端的连接请求fd,用于接收客户端连接,默认阻塞,并对数量进行合理性检测,将fd和地址添加进客户端。其中有个AddClient函数。
//添加客户端
void WebServer::AddClient_(int fd, sockaddr_in addr) {
assert(fd > 0);
users_[fd].init(fd, addr);//初始化键对值信息
//超时时间
if(timeoutMS_ > 0) {
//根据超时时间判断关闭连接
timer_->add(fd, timeoutMS_, std::bind(&WebServer::CloseConn_, this, &users_[fd]));
}
epoller_->AddFd(fd, EPOLLIN | connEvent_);//新连接的也要加入ET模式
SetFdNonblock(fd);//设置非阻塞
LOG_INFO("Client[%d] in!", users_[fd].GetFd());
}
首先根据客户端连接的信息初始化fd和地址,然后根据超时时间判断是否关闭连接(这里关于超时时间后续详细说明),根据epoll对象实现的AddFd函数,添加fd并加入ET模式读方式 或上 连接事件,设置非阻塞。这里设置非阻塞的函数也顺便说明一下:
int WebServer::SetFdNonblock(int fd) {
//assert(fd > 0);
return fcntl(fd, F_SETFL, fcntl(fd, F_GETFD, 0) | O_NONBLOCK);
//等价于
/*int flag = fcntl(fd, F_GETFD, 0);
flag |= O_NONBLOCK;
fcntl(fd, F_SETFL, flag);*/
}
上面的代码注释拆解不难看出,嵌套使用了fcntl函数(fcntl系统调用可以用来对已打开的文件描述符进行各种控制操作以改变已打开文件的的各种属性),先设置获取文件描述符,最后再设置文件状态加入非阻塞。到此监听事件结束,开始连接新的事件处理读或写。
接着是错误信号的处理:
事件出现EPOLLRDHUP或EPOLLHUP或EPOLLERR信号,则表示连接错误或异常,关闭连接。关闭功能实现如下:
//关闭连接
void WebServer::CloseConn_(HttpConn* client) {
assert(client);
LOG_INFO("Client[%d] quit!", client->GetFd());
epoller_->DelFd(client->GetFd());//删除错误的描述符
client->Close();//根据HttpConn类型删除这个对象
}
调用epoll对象的删除fd函数删除错误的文件描述符,根据HttpConn类型关闭连接。
继续进行读事件的操作:
进行写事件的操作:
//处理读
void WebServer::DealRead_(HttpConn* client) {
assert(client);
ExtentTime_(client);//有读则有通信,需要延长超时时间
threadpool_->AddTask(std::bind(&WebServer::OnRead_, this, client));//在子线程中添加read任务
}
//处理写
void WebServer::DealWrite_(HttpConn* client) {
assert(client);
ExtentTime_(client);//有写则有通信,需要延长超时时间
threadpool_->AddTask(std::bind(&WebServer::OnWrite_, this, client));//在子线程中添加write任务
}
//延长超时时间
void WebServer::ExtentTime_(HttpConn* client) {
assert(client);
if(timeoutMS_ > 0) { timer_->adjust(client->GetFd(), timeoutMS_); }
}
可以看到处理读和写操作时就有通信连接,则需要延长epoll_wait的超时时间。在延时函数中传入的参数是HttpConn类型的客户端,若延迟时间为正,通过HeapTime类型的智能指针,利用函数adjust(上一结点未超时,调整超时时间),关于这部分函数在超时部分详细分析。回过头看处理读或写函数,我们看到线程池类型的对象,首先看看线程池的头文件部分说明:
class ThreadPool {
public:
//线程池数:8 explicit:防止隐式转换 如 A a = 8; 是不通过的
explicit ThreadPool(size_t threadCount = 8): pool_(std::make_shared<Pool>()) {
//assert作测试使用:如果它的条件返回错误,则终止程序执行
assert(threadCount > 0);
//创建threadCount个线程
for(size_t i = 0; i < threadCount; i++) {
//每个线程执行
std::thread([pool = pool_] {
std::unique_lock<std::mutex> locker(pool->mtx);//获取锁
while(true) {
if(!pool->tasks.empty()) {
auto task = std::move(pool->tasks.front());//获取第一个任务
pool->tasks.pop();//取出
locker.unlock();//解锁
task();//调用,执行任务
locker.lock();//加锁
}
//线程池为空
else if(pool->isClosed) break;//线程池关闭则break
else pool->cond.wait(locker);//否则阻塞等待
}
}).detach();//线程分离
}
}
//无参构造:默认
ThreadPool() = default;
ThreadPool(ThreadPool&&) = default;
~ThreadPool() {
if(static_cast<bool>(pool_)) {
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->isClosed = true;//关闭信号
}
pool_->cond.notify_all();//全部唤醒,走到循环的else if(pool->isClosed) break;
}
}
template<class F>
void AddTask(F&& task) {
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->tasks.emplace(std::forward<F>(task));//添加任务进队列
}
pool_->cond.notify_one();//唤醒阻塞的线程
}
private:
//结构体 池子
struct Pool {
std::mutex mtx; //互斥锁
std::condition_variable cond; //条件变量
bool isClosed; //是否关闭
std::queue<std::function<void()>> tasks;//队列(保存的是任务)
};
std::shared_ptr<Pool> pool_; //池子
};
首先可以看到定义了池子的结构体,池子内为了保证在任意时刻只允许一个线程访问共享数据的互斥锁(互斥量),结合互斥锁一起使用的条件变量,以达到满足某个条件时阻塞的要求,是否关闭的标志,还定义了保存任务信息的队列,(类模版std::function是一种通用、多态的函数封装。std::function的实例可以对任何可以调用的目标实体进行存储、复制、和调用操作,这些目标实体包括普通函数、Lambda表达式、函数指针、以及其它函数对象等。)最后定义了Pool结构体类型的共享指针,也就是我们服务器的线程池子,需要时取出使用,使用完毕放回池子。
然后我们看到,除了两个定义默认的无参构造,可以看到初始化定义了线程池为8,以:的形式传入了池子变量,通过make_shared在动态内存中分配一个对象并初始化,接着创建threadCount个线程,根据我们创建的池子变量代入,先获取互斥锁,进入循环,若没有任务则获取第一个任务,执行任务的同时对互斥锁进行加锁解锁,保证只有一个线程访问。若线程池为空,关闭线程池,停止线程池使用,否则阻塞等待。
这里有两点注意,第一点,构造函数用到了explicit关键字,为了防止隐式转换,第二点,每个线程执行时用到了lamda函数。
其次,析构函数中,关闭锁,关闭信号,最后全部唤醒,走到循环的else if(pool->isClosed) break。同样注意两点,第一点,static_cast主要用于类型转换,第二点,notify_all唤醒函数。
从关键函数AddTask中,首先看到为了保证读和写的复用性,使用了类模板,同样的,先将互斥锁解锁,然后通过emplace添加任务进队列,最后通过池子的条件变量使用notify_one函数唤醒阻塞的线程。
除此以外,还需注意lock_guard和unique_lock的使用,可以看到只有构造函数中,每个线程执行时使用了unique_lock,也是unique_lock可以随时加锁解锁,而且条件变量需要unique_lock类型作为参数,而另外使用lock_guard的情况无需以上条件,这也是二者的区别所在。
接续读写函数中,延时后,利用线程池的添加任务功能,功能实现中使用了bind函数绑定客户端,添加read或write任务,至此利用IO复用技术Epoll与线程池实现多线程的Reactor高并发模型部分大体分析完毕,进入http实现业务逻辑的部分。
利用标准库容器封装char,实现自动增长的缓冲区
先看读操作:
//子线程中执行
void WebServer::OnRead_(HttpConn* client) {
assert(client);
int ret = -1;
int readErrno = 0;
ret = client->read(&readErrno);//调用HttpConn类型读取客户端数据
if(ret <= 0 && readErrno != EAGAIN) {
CloseConn_(client);
return;
}
OnProcess(client);//业务逻辑的处理
}
void WebServer::OnProcess(HttpConn* client) {
//调用客户端处理
//处理成功
if(client->process()) {
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);//修改fd,监听是否可写
} else {
epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLIN);
}
}
代码中看到在子线程中执行的OnRead_函数,先调用HttpConn类型读取客户端数据,如果返回值为-1或者出现读错误,关闭客户端,如果正常读取,则调用OnProcess进行业务逻辑的处理。再来看看OnProcess函数,因为代码走到这里则表示调用客户端处理成功,然后根据process函数判断,通过epoll对象处理的修改fd,监听是否可写或可读。为了进一步分析read和process函数,先看httpconn.h/.cpp:
class HttpConn {
public:
HttpConn();
~HttpConn();
void init(int sockFd, const sockaddr_in& addr);
ssize_t read(int* saveErrno);
ssize_t write(int* saveErrno);
void Close();
int GetFd() const;
int GetPort() const;
const char* GetIP() const;
sockaddr_in GetAddr() const;
bool process();
int ToWriteBytes() {
return iov_[0].iov_len + iov_[1].iov_len;
}
bool IsKeepAlive() const {
return request_.IsKeepAlive();
}
static bool isET;
static const char* srcDir; //资源的目录
static std::atomic<int> userCount; //所有连接的客户端数量
private:
int fd_;
struct sockaddr_in addr_;
bool isClose_;
int iovCnt_;//内存块的数量
struct iovec iov_[2];//分配两块内存(读写都是两块)
Buffer readBuff_; // 读缓冲区(请求)
Buffer writeBuff_; // 写缓冲区(响应)
HttpRequest request_;//封装请求
HttpResponse response_;//封装响应
};
由于考虑的是缓冲区,所以这里只考虑定义了内存块的数量,分配了两块内存(可增长),读缓冲区和写缓冲区,这里看到了另一个类Buffer
更多推荐
所有评论(0)