Linux高性能服务器编程springsnail源码解析(c++负载均衡)
在《Linux高性能服务器编程》一书中为了帮助大家将书中的知识融汇到实际项目中,作者特意编写了一个较为完整的负载均衡服务器程序springsnail。里面用到了进程池、有限状态机这些知识点。自己是在网上找到的这个程序的源码,起初接触的时候,总共1400多行代码里面没有一行注释,网上也没有一篇博客去对这个项目进行解析,因此写这篇博客供大家一起分享学习。咱们从main函数开始,我们要介绍几个c语言..
在《Linux高性能服务器编程》一书中为了帮助大家将书中的知识融汇到实际项目中,作者特意编写了一个较为完整的负载均衡服务器程序springsnail。里面用到了进程池、有限状态机这些知识点。自己是在网上找到的这个程序的源码,起初接触的时候,总共1400多行代码里面没有一行注释,网上也没有一篇博客去对这个项目进行解析,因此写这篇博客供大家一起分享学习。springsnail源码(带注释)
咱们从main函数开始,我们要介绍几个c语言中的两个宏
__FILE__:用以指示本行语句所在源文件的文件名
__LINE__:用以指示本行语句在源文件中的位置信息
这两个参数在后面咱们程序的调试中有着非常正要的作用。并且整个程序的日志系统也是依靠这两个宏和可变参的log()函数完成的。
接着大家需要了解getopt函数这个函数可以通过提取命令行参数对命令行参数进行解析。
接着后面是解析xml文件的一长串代码
vector< host > balance_srv; //host在前面的mgr.h文件中定义 //一个是负载均衡服务器,另一个是逻辑服务器
vector< host > logical_srv;
host tmp_host;
memset( tmp_host.m_hostname, '\0', 1024 );
char* tmp_hostname;
char* tmp_port;
char* tmp_conncnt;
bool opentag = false;
char* tmp = buf; //此时tem指向config.xml文件的内容
char* tmp2 = NULL;
char* tmp3 = NULL;
char* tmp4 = NULL;
while( tmp2 = strpbrk( tmp, "\n" ) ) //在源字符串tmp中找出最先含有搜索字符串"\n"中任一字符的位置并返回,若没找到则返回空指针
{
*tmp2++ = '\0';
if( strstr( tmp, "<logical_host>" ) ) //strstr(str1,str2)函数用于判断字符串str2是否是str1的子串。如果是,则该函数返回str2在str1首次出现的地址,否则返回null
{
if( opentag )
{
log( LOG_ERR, __FILE__, __LINE__, "%s", "parse config file failed" );
return 1;
}
opentag = true;
}
else if( strstr( tmp, "</logical_host>" ) ) // 有/ 符号
{
if( !opentag )
{
log( LOG_ERR, __FILE__, __LINE__, "%s", "parse config file failed" );
return 1;
}
logical_srv.push_back( tmp_host );
memset( tmp_host.m_hostname, '\0', 1024 );
opentag = false;
}
else if( tmp3 = strstr( tmp, "<name>" ) )
{
tmp_hostname = tmp3 + 6; //将tmp_hostname指针指向<name>后面的IP地址的首个地址
tmp4 = strstr( tmp_hostname, "</name>" );
if( !tmp4 )
{
log( LOG_ERR, __FILE__, __LINE__, "%s", "parse config file failed" );
return 1;
}
*tmp4 = '\0';
memcpy( tmp_host.m_hostname, tmp_hostname, strlen( tmp_hostname ) );
}
else if( tmp3 = strstr( tmp, "<port>" ) )
{
tmp_port = tmp3 + 6;
tmp4 = strstr( tmp_port, "</port>" );
if( !tmp4 )
{
log( LOG_ERR, __FILE__, __LINE__, "%s", "parse config file failed" );
return 1;
}
*tmp4 = '\0';
tmp_host.m_port = atoi( tmp_port );
}
else if( tmp3 = strstr( tmp, "<conns>" ) )
{
tmp_conncnt = tmp3 + 7;
tmp4 = strstr( tmp_conncnt, "</conns>" );
if( !tmp4 )
{
log( LOG_ERR, __FILE__, __LINE__, "%s", "parse config file failed" );
return 1;
}
*tmp4 = '\0';
tmp_host.m_conncnt = atoi( tmp_conncnt );
}
else if( tmp3 = strstr( tmp, "Listen" ) )
{
tmp_hostname = tmp3 + 6;
tmp4 = strstr( tmp_hostname, ":" );
if( !tmp4 )
{
log( LOG_ERR, __FILE__, __LINE__, "%s", "parse config file failed" );
return 1;
}
*tmp4++ = '\0';
tmp_host.m_port = atoi( tmp4 );
memcpy( tmp_host.m_hostname, tmp3, strlen( tmp3 ) );
balance_srv.push_back( tmp_host );
memset( tmp_host.m_hostname, '\0', 1024 );
}
tmp = tmp2;
}
这段代码使用了有限状态机的思想,当我们遇到< logical_host >的时候我们将optag设置为true;代表开始解析,然后通过\n来对buf中所保存的信息进行处理,从而读入所要代理的服务器的ip地址端口号以及所要建立的连接数。
遇到< /logical_host >时,代表一个服务器的信息已经读完,然后将optag设置为false。如此往复直到读到buf的结尾。(Listen的读取大家可以自行查看)。
需要说的是从这种方法是比较死的如果标签中多个空格咱们的程序就无法去解析它,并且源码附带xml格式Listen部分是不正确的。因此推荐大家可以了解一下tinyxml这个库,用它来解析咱们的xml文件会方便许多(当然是更改正确后的xml文件)。
接着通过将processpool的构造函数设置为私有,然后通过一个静态的方法去调用这个构造函数从而实现了一个单例模式,也就是说无论用户用这个类去构造多少个对象,这些对象都是同一个,保证了任务只能由一个对象去实现。
接着咱们对各个类进行介绍,刚开始可能会有些云里雾里,其实很正常,等对代码有整体有了一个认识之后就知道了。
process
//子进程类
class process
{
public:
process() : m_pid( -1 ){}
public:
int m_busy_ratio; //给每台实际处理服务器(业务逻辑服务器)分配一个加权比例
pid_t m_pid; //目标子进程的PID
int m_pipefd[2]; //父进程和子进程通信用的管道,父进程给子进程通知事件,子进程给父进程发送加权比
};
这个类在后面会被包含在processpool类当中,用来保存子进程的一些信息
processpool:
template< typename C, typename H, typename M >
class processpool
{
private:
processpool( int listenfd, int process_number = 8 );
public:
static processpool< C, H, M >* create( int listenfd, int process_number = 8 )
{
if( !m_instance ) //单件模式
{
m_instance = new processpool< C, H, M >( listenfd, process_number );
}
return m_instance;
}
~processpool()
{
delete [] m_sub_process;
}
//启动进程池
void run( const vector<H>& arg );
private:
void notify_parent_busy_ratio( int pipefd, M* manager ); //获取目前连接数量,将其发送给父进程
int get_most_free_srv(); //找出最空闲的服务器
void setup_sig_pipe(); //统一事件源
void run_parent();
void run_child( const vector<H>& arg );
private:
static const int MAX_PROCESS_NUMBER = 16; //进程池允许最大进程数量
static const int USER_PER_PROCESS = 65536; //每个子进程最多能处理的客户数量
static const int MAX_EVENT_NUMBER = 10000; //EPOLL最多能处理的的事件数
int m_process_number; //进程池中的进程总数
int m_idx; //子进程在池中的序号(从0开始)
int m_epollfd; //当前进程的epoll内核事件表fd
int m_listenfd; //监听socket
int m_stop; //子进程通过m_stop来决定是否停止运行
process* m_sub_process; //保存所有子进程的描述信息
static processpool< C, H, M >* m_instance; //进程池静态实例
};
static int EPOLL_WAIT_TIME = 5000; //eppll的超时值
static int sig_pipefd[2]; //用于处理信号的管道,以实现统一事件源,后面称之为信号管道
static void sig_handler( int sig ) //时间处理函数,将捕获的信号通过sig_pipefd发送给调用的进程
{
int save_errno = errno;
int msg = sig;
send( sig_pipefd[1], ( char* )&msg, 1, 0 );
errno = save_errno;
}
static void addsig( int sig, void( handler )(int), bool restart = true ) //设置信号处理函数
{
struct sigaction sa;
memset( &sa, '\0', sizeof( sa ) );
sa.sa_handler = handler;
if( restart )
{
sa.sa_flags |= SA_RESTART; //重新调用被该信号终止的系统调用
}
sigfillset( &sa.sa_mask ); //????
assert( sigaction( sig, &sa, NULL ) != -1 );
}
进程池类,内部有整体的一个框架,通过这个类将其他类整合到一起
run_child( )
template< typename C, typename H, typename M >
void processpool< C, H, M >::run_child( const vector<H>& arg )
{
setup_sig_pipe(); //注册统一事件源
int pipefd_read = m_sub_process[m_idx].m_pipefd[1];
add_read_fd( m_epollfd, pipefd_read );
epoll_event events[ MAX_EVENT_NUMBER ];
M* manager = new M( m_epollfd, arg[m_idx] ); //此处实例化一个mgr类的对象
assert( manager );
int number = 0;
int ret = -1;
while( ! m_stop )
{
number = epoll_wait( m_epollfd, events, MAX_EVENT_NUMBER, EPOLL_WAIT_TIME ); //监听m_epollfd上是否有事件
if ( ( number < 0 ) && ( errno != EINTR ) )//错误处理
{
log( LOG_ERR, __FILE__, __LINE__, "%s", "epoll failure" );
break;
}
if( number == 0 ) //在EPOLL_WAIT_TIME指定事件内没有事件到达时返回0
{
manager->recycle_conns();
continue;
}
for ( int i = 0; i < number; i++ )
{
int sockfd = events[i].data.fd;
if( ( sockfd == pipefd_read ) && ( events[i].events & EPOLLIN ) ) //是父进程发送的消息(通知有新的客户连接到来)
{
int client = 0;
ret = recv( sockfd, ( char* )&client, sizeof( client ), 0 );
if( ( ( ret < 0 ) && ( errno != EAGAIN ) ) || ret == 0 ) //recv失败
{
continue;
}
else //建立和客户端的连接
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( m_listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
if ( connfd < 0 )
{
log( LOG_ERR, __FILE__, __LINE__, "errno: %s", strerror( errno ) );
continue;
}
add_read_fd( m_epollfd, connfd ); //将客户端文件描述符connfd上的可读事件加入内核时间表
C* conn = manager->pick_conn( connfd ); //获取一个空闲的连接
if( !conn )
{
closefd( m_epollfd, connfd );
continue;
}
conn->init_clt( connfd, client_address ); //初始化客户端信息
notify_parent_busy_ratio( pipefd_read, manager );
}
}
//处理自身进程接收到的信号
else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) )
{
int sig;
char signals[1024];
ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 );
if( ret <= 0 )
{
continue;
}
else
{
for( int i = 0; i < ret; ++i )
{
switch( signals[i] )
{
case SIGCHLD:
{
pid_t pid;
int stat;
while ( ( pid = waitpid( -1, &stat, WNOHANG ) ) > 0 ) //等收集退出的子进程,由于设置了WNOHANG因此不等待
{
continue;
}
break;
}
case SIGTERM: //退出该进程
case SIGINT:
{
m_stop = true;
break;
}
default:
{
break;
}
}
}
}
}
else if( events[i].events & EPOLLIN )//有sockfd上有数据可读
{
RET_CODE result = manager->process( sockfd, READ );
switch( result )
{
case CLOSED:
{
notify_parent_busy_ratio( pipefd_read, manager );
break;
}
default:
break;
}
}
else if( events[i].events & EPOLLOUT ) //有事件可写(只有sockfd写缓冲满了或者给某个sockfd注册O_EPOLLOUT才会触发)
{
RET_CODE result = manager->process( sockfd, WRITE );
switch( result ) //根据返回的状态进行处理
{
case CLOSED:
{
notify_parent_busy_ratio( pipefd_read, manager );
break;
}
default:
break;
}
}
else
{
continue;
}
}
}
close( pipefd_read );
close( m_epollfd );
}
host:
class host
{
public:
char m_hostname[1024]; //保存ip地址
int m_port; //保存端口号
int m_conncnt; //连接数
};
保存连接信息,包含在mgr类中
mgr
class mgr
{
public:
mgr( int epollfd, const host& srv ); //在构造mgr的同时调用conn2srv和服务端建立连接
~mgr();
int conn2srv( const sockaddr_in& address ); //和服务端建立连接同时返回socket描述符
conn* pick_conn( int cltfd ); //从连接好的连接中(m_conn中)拿出一个放入任务队列(m_used)中
void free_conn( conn* connection ); // 释放连接(当连接关闭或者中断后,将其fd从内核事件表删除,并关闭fd),并将同srv进行连接的放入m_freed中
int get_used_conn_cnt(); //获取当前任务数(被notify_parent_busy_ratio()调用)
void recycle_conns(); //从m_freed中回收连接(由于连接已经被关闭,因此还要调用conn2srv())放到m_conn中
RET_CODE process( int fd, OP_TYPE type ); //通过fd和type来控制对服务端和客户端的读写,是整个负载均衡的核心功能
private:
static int m_epollfd; //内核时间表fd
map< int, conn* > m_conns; //准备好的连接
map< int, conn* > m_used; //要被使用的连接
map< int, conn* > m_freed; //使用后被释放的连接
host m_logic_srv; //保存服务端的信息
};
这个类为进程池类中的负责调度连接的类。
process()
RET_CODE mgr::process( int fd, OP_TYPE type )
{
conn* connection = m_used[ fd ]; //首先根据fd获取连接类,该类中保存有相对应的客户端和服务端的fd
if( !connection )
{
return NOTHING;
}
if( connection->m_cltfd == fd ) //如果是客户端fd
{
int srvfd = connection->m_srvfd;
switch( type )
{
case READ: //读操作
{
RET_CODE res = connection->read_clt(); //则调用conn的read_ckt方法
switch( res )
{
case OK:
{
log( LOG_DEBUG, __FILE__, __LINE__, "content read from client: %s", connection->m_clt_buf );
}
case BUFFER_FULL:
{
modfd( m_epollfd, srvfd, EPOLLOUT );
break;
}
case IOERR:
case CLOSED: //客户端关闭连接
{
free_conn( connection );
return CLOSED;
}
default:
break;
}
if( connection->m_srv_closed ) //服务端关闭连接
{
free_conn( connection );
return CLOSED;
}
break;
}
case WRITE:
{
RET_CODE res = connection->write_clt();
switch( res )
{
case TRY_AGAIN:
{
modfd( m_epollfd, fd, EPOLLOUT );
break;
}
case BUFFER_EMPTY:
{
modfd( m_epollfd, srvfd, EPOLLIN );
modfd( m_epollfd, fd, EPOLLIN );
break;
}
case IOERR:
case CLOSED:
{
free_conn( connection );
return CLOSED;
}
default:
break;
}
if( connection->m_srv_closed )
{
free_conn( connection );
return CLOSED;
}
break;
}
default:
{
log( LOG_ERR, __FILE__, __LINE__, "%s", "other operation not support yet" );
break;
}
}
}
else if( connection->m_srvfd == fd ) //如果是服务端fd
{
int cltfd = connection->m_cltfd;
switch( type )
{
case READ:
{
RET_CODE res = connection->read_srv();
switch( res )
{
case OK:
{
log( LOG_DEBUG, __FILE__, __LINE__, "content read from server: %s", connection->m_srv_buf ); //此处的break不能加,在读完消息之后
//应该继续去触发BUFFER_FULL从而通知可写
}
case BUFFER_FULL:
{
modfd( m_epollfd, cltfd, EPOLLOUT );
break;
}
case IOERR:
case CLOSED:
{
modfd( m_epollfd, cltfd, EPOLLOUT );
connection->m_srv_closed = true;
break;
}
default:
break;
}
break;
}
case WRITE:
{
RET_CODE res = connection->write_srv();
switch( res )
{
case TRY_AGAIN:
{
modfd( m_epollfd, fd, EPOLLOUT );
break;
}
case BUFFER_EMPTY:
{
modfd( m_epollfd, cltfd, EPOLLIN );
modfd( m_epollfd, fd, EPOLLIN );
break;
}
case IOERR:
case CLOSED:
{
/*
if( connection->m_srv_write_idx == connection->m_srvread_idx )
{
free_conn( connection );
}
else
{
modfd( m_epollfd, cltfd, EPOLLOUT );
}
*/
modfd( m_epollfd, cltfd, EPOLLOUT );
connection->m_srv_closed = true;
break;
}
default:
break;
}
break;
}
default:
{
log( LOG_ERR, __FILE__, __LINE__, "%s", "other operation not support yet" );
break;
}
}
}
else
{
return NOTHING;
}
return OK;
}
conn
class conn
{
public:
conn();
~conn();
void init_clt( int sockfd, const sockaddr_in& client_addr ); //初始化客户端地址
void init_srv( int sockfd, const sockaddr_in& server_addr ); //初始化服务器端地址
void reset(); //重置读写缓冲
RET_CODE read_clt(); //从客户端读入的信息写入m_clt_buf
RET_CODE write_clt(); //把从服务端读入m_srv_buf的内容写入客户端
RET_CODE read_srv(); //从服务端读入的信息写入m_srv_buf
RET_CODE write_srv(); //把从客户端读入m_clt_buf的内容写入服务端
public:
static const int BUF_SIZE = 2048; //缓冲区大小
char* m_clt_buf; //客户端文件缓冲区
int m_clt_read_idx; //客户端读下标
int m_clt_write_idx; //客户端写下标
sockaddr_in m_clt_address; //客户端地址
int m_cltfd; //客户端fd
char* m_srv_buf; //服务端文件缓冲区
int m_srv_read_idx; //服务端读下标
int m_srv_write_idx; //服务端写下标
sockaddr_in m_srv_address; //服务端地址
int m_srvfd; //服务端fd
bool m_srv_closed; //标志(用来标志服务端是否关闭)
};
这个类主要负责连接好之后对客户端和服务端的读写操作,以及返回服务端的状态
read_clt()
RET_CODE conn::read_clt()
{
int bytes_read = 0;
while( true )
{
if( m_clt_read_idx >= BUF_SIZE ) //如果读入的数据大于BUF_SIZE
{
log( LOG_ERR, __FILE__, __LINE__, "%s", "the client read buffer is full, let server write" );
return BUFFER_FULL;
}
bytes_read = recv( m_cltfd, m_clt_buf + m_clt_read_idx, BUF_SIZE - m_clt_read_idx, 0 ); //因为存在分包的问题(recv所读入的并非是size的大小),因此我们根据recv的返回值进行循环读入,直到读满m_clt_buf或者recv的返回值为0(数据被读完)
if ( bytes_read == -1 )
{
if( errno == EAGAIN || errno == EWOULDBLOCK ) // 非阻塞情况下: EAGAIN表示没有数据可读,请尝试再次调用,而在阻塞情况下,如果被中断,则返回EINTR; EWOULDBLOCK等同于EAGAIN
{
break;
}
return IOERR;
}
else if ( bytes_read == 0 ) //连接被关闭
{
return CLOSED;
}
m_clt_read_idx += bytes_read; //移动读下标
}
return ( ( m_clt_read_idx - m_clt_write_idx ) > 0 ) ? OK : NOTHING; //当读下标大于写下标时代表正常
}
这是springsnail当中一些主要类和函数的作用,我们在看这个项目的时候还要多看看高性能这本书的有限状态机,统一事件源,进程池这些章节的内容。
有不足之处还望指出互相交流学习。
更多推荐
所有评论(0)