http://blog.csdn.net/winlinvip/article/details/7764526

Linux多进程服务器真的很给力,赞一个!

Linux多进程一般是master负责侦听,worker接受和伺服client。

一个使用了以下技术的多进程模型:

1. sigset:安全信号,信号屏蔽和接受。

2. epoll:异步io模型。

master进程使用信号模型,侦听用户信号和程序信号,并和worker交流。它的主循环是sigsuspend。

worker进程使用事件模型,使用epoll_wait等待事件,同时他也接受信号(信号会中断epoll_wait)。

[cpp]  view plain copy
  1. // mpserver —— multiple processes server  
  2. #include <stdio.h>  
  3. #include <stdlib.h>  
  4. #include <iostream>  
  5. using namespace std;  
  6.   
  7. #include <unistd.h>  
  8. #include <signal.h>  
  9. #include <sys/types.h>  
  10. #include <sys/socket.h>  
  11. #include <netinet/in.h>  
  12. #include <sys/wait.h>  
  13. #include <sys/epoll.h>  
  14. #include <string.h>  
  15. #include <errno.h>  
  16.   
  17. #define err_exit(msg) cout << "[error] " << msg << endl; exit(1)  
  18.   
  19. struct UserOptions{  
  20.     int port;  
  21.     int num_processes;  
  22. };  
  23.   
  24. void discovery_user_options(int argc, char** argv, UserOptions& options){  
  25.     if(argc <= 2){  
  26.         cout << "Usage: " << argv[0] << " <port> <num_processes>" << endl  
  27.             << "port: the port to listen" << endl  
  28.             << "num_processes: the num to start processes. if 0, use single process mode" << endl;  
  29.         exit(1);  
  30.     }  
  31.       
  32.     options.port = atoi(argv[1]);  
  33.     options.num_processes = atoi(argv[2]);  
  34. }  
  35.   
  36. int listen_server_socket(UserOptions& options){  
  37.     int serverfd = socket(AF_INET, SOCK_STREAM, 0);  
  38.       
  39.     if(serverfd == -1){  
  40.         err_exit("init socket error!");  
  41.     }  
  42.     cout << "init socket success! #" << serverfd << endl;  
  43.       
  44.     int reuse_socket = 1;  
  45.     if(setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1){  
  46.         err_exit("setsockopt reuse-addr error!");  
  47.     }  
  48.     cout << "setsockopt reuse-addr success!" << endl;  
  49.       
  50.     sockaddr_in addr;  
  51.     addr.sin_family = AF_INET;  
  52.     addr.sin_port = htons(options.port);  
  53.     addr.sin_addr.s_addr = INADDR_ANY;  
  54.     if(bind(serverfd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1){  
  55.         err_exit("bind socket error!");  
  56.     }  
  57.     cout << "bind socket success!" << endl;  
  58.       
  59.     if(listen(serverfd, 10) == -1){  
  60.         err_exit("listen socket error!");  
  61.     }  
  62.     cout << "listen socket success! " << options.port << endl;  
  63.       
  64.     return serverfd;  
  65. }  
  66.   
  67. void signal_handler(int signo){  
  68.     cout << "#" << getpid() << " get a signal:";  
  69.       
  70.     bool do_exit = false;  
  71.     switch(signo){  
  72.         case SIGCHLD:  
  73.             cout << "SIGCHLD";  
  74.             int status;  
  75.             while(waitpid(0, &status, WNOHANG) > 0){  
  76.             }  
  77.             break;  
  78.         case SIGALRM:  
  79.             cout << "SIGALRM";  
  80.             break;  
  81.         case SIGIO:  
  82.             cout << "SIGIO";  
  83.             break;  
  84.         case SIGINT:  
  85.             cout << "SIGINT";  
  86.             do_exit = true;  
  87.             break;  
  88.         case SIGHUP:  
  89.             cout << "SIGHUP";  
  90.             do_exit = true;  
  91.             break;  
  92.         case SIGTERM:  
  93.             cout << "SIGTERM";  
  94.             do_exit = true;  
  95.             break;  
  96.         case SIGQUIT:  
  97.             cout << "SIGQUIT";  
  98.             do_exit = true;  
  99.             break;  
  100.         case SIGUSR1:  
  101.             cout << "SIGUSR1";  
  102.             break;  
  103.         case SIGUSR2:  
  104.             cout << "SIGUSR2";  
  105.             break;  
  106.     }  
  107.       
  108.     cout << "!" << endl;  
  109.     if(do_exit){  
  110.         exit(0);  
  111.     }  
  112. }  
  113.   
  114. void register_signal_handler_imp(int signum, void (*handler)(int)){  
  115.     struct sigaction action;  
  116.       
  117.     action.sa_handler = handler;  
  118.     sigemptyset(&action.sa_mask);  
  119.     //action.sa_flags = 0;  
  120.       
  121.     if(sigaction(signum, &action, NULL) == -1){  
  122.         err_exit("register signal handler failed!");  
  123.     }  
  124. }  
  125. void register_signal_handler(){  
  126.     register_signal_handler_imp(SIGCHLD, signal_handler);  
  127.     register_signal_handler_imp(SIGALRM, signal_handler);  
  128.     register_signal_handler_imp(SIGIO, signal_handler);  
  129.     register_signal_handler_imp(SIGINT, signal_handler);  
  130.     register_signal_handler_imp(SIGHUP, signal_handler);  
  131.     register_signal_handler_imp(SIGTERM, signal_handler);  
  132.     register_signal_handler_imp(SIGQUIT, signal_handler);  
  133.     register_signal_handler_imp(SIGUSR1, signal_handler);  
  134.     register_signal_handler_imp(SIGUSR2, signal_handler);  
  135.     cout << "register signal handler success!" << endl;  
  136. }  
  137.   
  138. void block_specified_signals(){  
  139.     sigset_t set;  
  140.       
  141.     sigemptyset(&set);  
  142.       
  143.     sigaddset(&set, SIGCHLD);  
  144.     sigaddset(&set, SIGALRM);  
  145.     sigaddset(&set, SIGIO);  
  146.     sigaddset(&set, SIGINT);  
  147.     sigaddset(&set, SIGHUP);  
  148.     sigaddset(&set, SIGTERM);  
  149.     sigaddset(&set, SIGQUIT);  
  150.     sigaddset(&set, SIGUSR1);  
  151.     sigaddset(&set, SIGUSR2);  
  152.       
  153.     if(sigprocmask(SIG_BLOCK, &set, NULL) == -1){  
  154.         err_exit("sigprocmask block signal failed!");  
  155.     }  
  156.     cout << "sigprocmask block signal success!" << endl;  
  157. }  
  158.   
  159. void unblock_all_signals(){  
  160.     sigset_t set;  
  161.     sigemptyset(&set);  
  162.       
  163.     // the master process block all signals, we unblock all for we use epoll to wait events.  
  164.     if(sigprocmask(SIG_SETMASK, &set, NULL) == -1){  
  165.         err_exit("sigprocmask block signal failed!");  
  166.     }  
  167.     cout << "sigprocmask block signal success!" << endl;  
  168. }  
  169.   
  170. void epoll_add_event(int ep, int fd){  
  171.     epoll_event ee;  
  172.     ee.events = EPOLLIN;  
  173.     ee.data.fd = fd;  
  174.     if(epoll_ctl(ep, EPOLL_CTL_ADD, fd, &ee) == -1){  
  175.         err_exit("epoll add event failed!");  
  176.     }  
  177.     cout << "epoll add fd success: #" << fd << endl;  
  178. }  
  179. void epoll_remove_event(int ep, int fd){  
  180.     if(epoll_ctl(ep, EPOLL_CTL_DEL, fd, NULL) == -1){  
  181.         err_exit("epoll remove event failed! fd=" << fd << ", errno=0x" << hex << errno << dec << " " << strerror(errno));  
  182.     }  
  183.     cout << "epoll remove fd success: #" << fd << endl;  
  184. }  
  185.   
  186. void close_client(int ep, int fd){  
  187.     cout << "the client dead, remove it: #" << fd << endl;  
  188.     epoll_remove_event(ep, fd);  
  189.     close(fd);  
  190. }  
  191.   
  192. void worker_get_event(int ep, int serverfd, epoll_event& active){  
  193.     int fd = active.data.fd;  
  194.       
  195.     // server listening socket event.  
  196.     if(fd == serverfd){  
  197.         int client = accept(serverfd, NULL, 0);  
  198.         if(client == -1){  
  199.             err_exit("accept client socket error!");  
  200.         }  
  201.         cout << "get a client: #" << client << endl;  
  202.         epoll_add_event(ep, client);  
  203.         return;  
  204.     }  
  205.       
  206.     // client event.  
  207.     if((active.events & EPOLLHUP) == EPOLLHUP || (active.events & EPOLLERR) == EPOLLERR){  
  208.         cout << "get a EPOLLHUP or EPOLLERR event from client #" << fd << endl;  
  209.         close_client(ep, fd);  
  210.         return;  
  211.     }  
  212.     if((active.events & EPOLLIN) == EPOLLIN){  
  213.         if(true){  
  214.             cout << "get a EPOLLIN event from client #" << fd << endl;  
  215.             char buf[1024];  
  216.             memset(buf, 0, sizeof(buf));  
  217.             if(recv(fd, buf, sizeof(buf), 0) <= 0){  
  218.                 close_client(ep, fd);  
  219.                 return;  
  220.             }  
  221.             cout << "recv from client: " << buf << endl;  
  222.         }  
  223.         if(true){  
  224.             char msg[] = "hello, server ping!";  
  225.             if(send(fd, msg, sizeof(msg), 0) <= 0){  
  226.                 close_client(ep, fd);  
  227.                 return;  
  228.             }  
  229.         }  
  230.         return;  
  231.     }  
  232. }  
  233.   
  234. void worker_process_cycle(int serverfd){  
  235.     cout << "start worker process cycle" << endl;  
  236.     unblock_all_signals();  
  237.   
  238.     int ep = epoll_create(1024);  
  239.     if(ep == -1){  
  240.         err_exit("epoll_create failed!");  
  241.     }  
  242.     cout << "epoll create success!" << endl;  
  243.     epoll_add_event(ep, serverfd);  
  244.       
  245.     for(;;){  
  246.         epoll_event events[1024];  
  247.         int incoming = epoll_wait(ep, events, 1024, -1);  
  248.         if(incoming == -1){  
  249.             break;  
  250.         }  
  251.         for(int i = 0; i < incoming; i++){  
  252.             worker_get_event(ep, serverfd, events[i]);  
  253.         }  
  254.     }  
  255.       
  256.     cout << "worker process exited" << endl;  
  257.     close(ep);  
  258. }  
  259.   
  260. void start_worker_process(int serverfd){  
  261.     pid_t pid = fork();  
  262.       
  263.     if(pid == -1){  
  264.         err_exit("fork process failed");  
  265.     }  
  266.       
  267.     if(pid == 0){  
  268.         worker_process_cycle(serverfd);  
  269.         exit(0);  
  270.     }  
  271.       
  272.     cout << "fork process success: #" << pid << endl;  
  273. }  
  274.   
  275. int main(int argc, char** argv){  
  276.     register_signal_handler();  
  277.     block_specified_signals();  
  278.       
  279.     //sleep(3);  
  280.     UserOptions options;  
  281.     discovery_user_options(argc, argv, options);  
  282.     int serverfd = listen_server_socket(options);  
  283.       
  284.     for(int i = 0; i < options.num_processes; i++){  
  285.         start_worker_process(serverfd);  
  286.     }  
  287.     if(options.num_processes == 0){  
  288.         worker_process_cycle(serverfd);  
  289.     }  
  290.       
  291.     sigset_t set;  
  292.     sigemptyset(&set);  
  293.     for(;;){  
  294.         sigsuspend(&set);  
  295.     }  
  296.   
  297.     return 0;  
  298. }  

[cpp]  view plain copy
  1. // mpclient —— multiple processes client  
  2. #include <stdio.h>  
  3. #include <stdlib.h>  
  4. #include <iostream>  
  5. using namespace std;  
  6.   
  7. #include <unistd.h>  
  8. #include <signal.h>  
  9. #include <sys/types.h>  
  10. #include <sys/socket.h>  
  11. #include <netinet/in.h>  
  12. #include <arpa/inet.h>  
  13. #include <string.h>  
  14.   
  15. #define err_exit(msg) cout << "[error] " << msg << endl; exit(1)  
  16.   
  17. struct UserOptions{  
  18.     char* server_ip;  
  19.     int port;  
  20.     int num_processes;  
  21. };  
  22.   
  23. void discovery_user_options(int argc, char** argv, UserOptions& options){  
  24.     if(argc <= 2){  
  25.         cout << "Usage: " << argv[0] << " <server_ip> <port> <num_processes>" << endl  
  26.             << "server_ip: the ip address of server" << endl  
  27.             << "port: the port to connect at" << endl  
  28.             << "num_processes: the num to start processes. if 0, use single process mode" << endl;  
  29.         exit(1);  
  30.     }  
  31.       
  32.     options.server_ip = argv[1];  
  33.     options.port = atoi(argv[2]);  
  34.     options.num_processes = atoi(argv[3]);  
  35. }  
  36.   
  37. int connect_server_socket(UserOptions& options){  
  38.     int clientfd = socket(AF_INET, SOCK_STREAM, 0);  
  39.       
  40.     if(clientfd == -1){  
  41.         err_exit("init socket error!");  
  42.     }  
  43.     cout << "init socket success!" << endl;  
  44.       
  45.     sockaddr_in addr;  
  46.     addr.sin_family = AF_INET;  
  47.     addr.sin_port = htons(options.port);  
  48.     addr.sin_addr.s_addr = inet_addr(options.server_ip);  
  49.     if(connect(clientfd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1){  
  50.         err_exit("connect socket error!");  
  51.     }  
  52.     cout << "connect socket success!" << endl;  
  53.       
  54.     return clientfd;  
  55. }  
  56.   
  57. int main(int argc, char** argv){  
  58.     UserOptions options;  
  59.     discovery_user_options(argc, argv, options);  
  60.     int* fds = new int[options.num_processes];  
  61.     for(int i = 0; i < options.num_processes; i++){  
  62.         fds[i] = connect_server_socket(options);  
  63.     }  
  64.       
  65.     bool do_loop = true;  
  66.     while(do_loop){  
  67.         for(int i = 0; i < options.num_processes; i++){  
  68.             int fd = fds[i];  
  69.             if(true){  
  70.                 char msg[] = "hello, client ping!";  
  71.                 if(send(fd, msg, sizeof(msg), 0) <= 0){  
  72.                     close(fd);  
  73.                     do_loop = false;  
  74.                     break;  
  75.                 }  
  76.             }  
  77.             if(true){  
  78.                 char buf[1024];  
  79.                 memset(buf, 0, sizeof(buf));  
  80.                 if(recv(fd, buf, sizeof(buf), 0) <= 0){  
  81.                     close(fd);  
  82.                     do_loop = false;  
  83.                     break;  
  84.                 }  
  85.                 cout << "recv from server: " << buf << endl;  
  86.             }  
  87.               
  88.             sleep(3);  
  89.         }  
  90.     }  
  91.       
  92.     delete[] fds;  
  93.   
  94.     return 0;  
  95. }  


master启动的主要过程:

1. 注册信号处理函数:sigaction。

2. 屏蔽所有关心的信号,让内核暂时保留信号,不要发给程序,因为这时候程序不系统被打断。

3. 侦听端口。

4. 启动worker进程。

5. 等待信号的到来:sigsuspend。


worker启动的主要过程:

1. 取消信号的屏蔽。因为这时候worker拷贝的是master的内存结构,所以信号屏蔽也拷贝过来了。若不取消,则无法收到退出消息,只能重启服务器或它自己退出才能杀这个进程了。

2. 创建epoll。

3. 将侦听的server_socket_fd添加到epoll。

4. 等待epoll事件的到来:epoll_wait。或者信号也能中断这个epoll_wait:返回-1.


典型的多进程服务器模型如下图所示:


考虑一个RTMP服务器,假设和FMS类似以vhost为边界启动worker,但是client应该连接到指定的worker。所以这个典型的模型就需要修改了。

有一种方式是master接受连接,handshake,然后接收到clieint的connect请求后,知道连接的vhost是哪个,再把这个client fd转发给worker。如下图所示:


但这种方式master过于复杂,它实际上需要做worker要做的事情。多进程稳定的一个原因是进程的自然边界,worker进程崩溃不会影响其他worker和master,如果中心master崩溃,那么系统就完蛋了。所以master要尽量简单不能有过多的逻辑。另外,单进程和多进程切换的一个条件,就是worker能用一个函数表达,单进程时直接调用这个函数,而多进程时fork后调用这个函数;若master要混合worker的功能,单进程时也需要过多交互。

改进如下图:


master变得简单,worker功能也单一了。worker发现不是自己的client时,将client发给给master,由master分发给特定的worker。单进程时没有分发这一步了;多进程时master也只需要分发fd,而不需要处理协议的逻辑。

在master和worker之间传递fd不是一个好办法,因为过多频繁的SIGIO可能导致“I/O possible”;google上找了一下说是由于信号SIGIO没有处理引起,但明显已经侦听了handler;有人说是signal queue overflow,恩,这个很像!

socketpaire有时还会引起“[write_channel] sendmsg error, ret=-1, errno=0xb Resource temporarily unavailable”错误。管道错误。一般发生在worker进程中,如果多个woker进程都向master发送数据,而master处理一个数据后sleep或者做其他的事情去了,就会造成这种情况(master的socketpaire缓冲区满了)。

master若需要和worker使用socketpair通信,master使用信号来实现异步,当worker的socketpair有SIGIO信号时,说明worker在发数据,如何判断是哪个worker发的呢?可以用select,当然不太好;可以使用F_SETSIG来更改默认的SIGIO,改为SIGRTMIN+fd,当worker有数据时,收到的信号就包含了fd了。

获取SIGIO的fd的另外一个方法,是使用siginfo_t取得fd。使用F_SETSIG可以设置的SIG较少,SIGRTMAX为64,所以最多可用32个自定义实时信号,一般用来对SIGIO分类。必须将SIGIO定义为实时信号,然后在sigaction时设置SA_SIGINFO和使用void (*handler)(int signo, siginfo_t* info, void* context),info->si_fd就是SIGIO的fd。

以下是一个糅合了多进程、信号、socketpair、passing fd的一个原型,较稳定的一个原型:

[cpp]  view plain copy
  1. // mpserver2 —— multiple processes server  
  2. #include <stdio.h>  
  3. #include <string.h>  
  4. #include <stdlib.h>  
  5. #include <iostream>  
  6. #include <vector>  
  7. using namespace std;  
  8.   
  9. #include <unistd.h>  
  10. #include <signal.h>  
  11. #include <sys/types.h>  
  12. #include <sys/socket.h>  
  13. #include <netinet/in.h>  
  14. #include <sys/wait.h>  
  15. #include <sys/epoll.h>  
  16. #include <string.h>  
  17. #include <errno.h>  
  18. #include <fcntl.h>  
  19. #include <sys/ioctl.h>  
  20.   
  21. #define info(msg) cout << "#" << getpid() << " " << msg  
  22. #define err_exit(msg) info("[error] " << msg << ", errno=0x" << hex << errno << dec << " " << strerror(errno) << endl); exit(-1)  
  23.   
  24. // define the SIGIO to RTSig, the SIGIO now means rt-sig queue full.  
  25. #define SIG_SOCKET_IO SIGRTMIN+1  
  26.   
  27. bool global_terminate = false;  
  28. bool global_channel_msg = false;  
  29. // when global_channel_msg is true, the current_active_io_fd set to the fd.  
  30. int current_active_io_fd = -1;  
  31.   
  32. int current_client_num = 0;  
  33. bool accepting = true;  
  34.   
  35. // the num of client for a proecess to serve.  
  36. // 2+N  
  37. // 2: the serverfd, the channelfd  
  38. // N: clients.  
  39. #define max_clients_per_process 2+1000  
  40. // the physical max clients, if exceed this value, error!  
  41. #define physical_max_clients 1024  
  42.   
  43. #define max_workers 100  
  44.   
  45. struct UserOptions{  
  46.     int port;  
  47.     int num_processes;  
  48. };  
  49.   
  50. void discovery_user_options(int argc, char** argv, UserOptions& options){  
  51.     if(argc <= 2){  
  52.         cout << "Usage: " << argv[0] << " <port> <num_processes>" << endl  
  53.             << "port: the port to listen" << endl  
  54.             << "num_processes: the num to start processes. if 0, use single process mode" << endl  
  55.             << "for example: " << argv[0] << " 1990 20" << endl;  
  56.         exit(1);  
  57.     }  
  58.       
  59.     options.port = atoi(argv[1]);  
  60.     options.num_processes = atoi(argv[2]);  
  61. }  
  62.   
  63. #define CMD_FD 100  
  64. #define CMD_INFO 200  
  65. struct channel_msg{  
  66.     int command;  
  67.     int fd; // the fd, set to -1 if no fd.  
  68.     int total_client; // the total clients number of specified process.  
  69.     int param; // the param.  
  70. };  
  71. void write_channel(int sock, channel_msg* data, int size)  
  72. {  
  73.     msghdr msg;  
  74.       
  75.     // init msg_control  
  76.     if(data->fd == -1){  
  77.         msg.msg_control = NULL;  
  78.         msg.msg_controllen = 0;  
  79.     }  
  80.     else{  
  81.         union {  
  82.             struct cmsghdr cm;  
  83.             char space[CMSG_SPACE(sizeof(int))];  
  84.         } cmsg;  
  85.         memset(&cmsg, 0, sizeof(cmsg));  
  86.           
  87.         cmsg.cm.cmsg_level = SOL_SOCKET;  
  88.         cmsg.cm.cmsg_type = SCM_RIGHTS; // we are sending fd.  
  89.         cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));  
  90.           
  91.         msg.msg_control = (cmsghdr*)&cmsg;  
  92.         msg.msg_controllen = sizeof(cmsg);  
  93.         *(int *)CMSG_DATA(&cmsg.cm) = data->fd;  
  94.     }  
  95.       
  96.     // init msg_iov  
  97.     iovec iov[1];  
  98.     iov[0].iov_base = data;  
  99.     iov[0].iov_len  = size;  
  100.       
  101.     msg.msg_iov = iov;  
  102.     msg.msg_iovlen = 1;  
  103.       
  104.     // init msg_name  
  105.     msg.msg_name = NULL;  
  106.     msg.msg_namelen = 0;  
  107.       
  108.     info("[write_channel][info] data fd=" << data->fd << ", command="   
  109.         << data->command << ", total_client=" << data->total_client   
  110.         << ", param=" << data->param << endl);  
  111.           
  112.     while(true){  
  113.         ssize_t ret;  
  114.         if ((ret = sendmsg(sock, &msg, 0)) <= 0){  
  115.             // errno = 0xb Resource temporarily unavailable  
  116.             // donot retry, for it's no use, error infinite loop.  
  117.             err_exit("[write_channel] sendmsg error, ret=" << ret);  
  118.         }  
  119.         break;  
  120.     }  
  121. }  
  122. void read_channel(int sock, channel_msg* data, int size)  
  123. {  
  124.     msghdr msg;  
  125.       
  126.     // msg_iov  
  127.     iovec iov[1];  
  128.     iov[0].iov_base = data;  
  129.     iov[0].iov_len = size;  
  130.       
  131.     msg.msg_iov = iov;  
  132.     msg.msg_iovlen  = 1;  
  133.       
  134.     // msg_name  
  135.     msg.msg_name = NULL;  
  136.     msg.msg_namelen = 0;  
  137.       
  138.     // msg_control  
  139.     union { // union to create a 8B aligned memory.  
  140.         struct cmsghdr cm; // 16B = 8+4+4  
  141.         char space[CMSG_SPACE(sizeof(int))]; // 24B = 16+4+4  
  142.     } cmsg;  
  143.     memset(&cmsg, 0, sizeof(cmsg));  
  144.       
  145.     msg.msg_control = (cmsghdr*)&cmsg;  
  146.     msg.msg_controllen = sizeof(cmsg);  
  147.       
  148.     ssize_t ret;  
  149.     if ((ret = recvmsg(sock, &msg, 0)) == -1) {  
  150.         err_exit("[read_channel] recvmsg error");  
  151.     }  
  152.     if(ret == 0){  
  153.         info("[read_channel] connection closed" << endl);  
  154.         exit(0);  
  155.     }  
  156.       
  157.     info("[read_channel][info] data fd=" << data->fd << ", command="   
  158.         << data->command << ", total_client=" << data->total_client   
  159.         << ", param=" << data->param << endl);  
  160.     if(data->command == CMD_FD){  
  161.         int fd = *(int *)CMSG_DATA(&cmsg.cm);  
  162.         if(fd == 0 || fd == 1){  
  163.             err_exit("[read_channel] recvmsg invalid fd=" << fd   
  164.                 << ", data->fd=" << data->fd << ", command=" << data->command  
  165.                 << ", ret=" << ret);  
  166.         }  
  167.         data->fd = fd;  
  168.     }  
  169.     else{  
  170.         data->fd = -1;  
  171.     }  
  172. }  
  173.   
  174. void set_noblocking(int fd){  
  175.     int flag = 1;  
  176.     if(ioctl(fd, FIONBIO, &flag) == -1){  
  177.         err_exit("[master] ioctl failed!");  
  178.     }  
  179.     info("[master] server socket no-block flag set success" << endl);  
  180. }  
  181.   
  182. int listen_server_socket(UserOptions& options){  
  183.     int serverfd = socket(AF_INET, SOCK_STREAM, 0);  
  184.       
  185.     if(serverfd == -1){  
  186.         err_exit("[master] init socket error!");  
  187.     }  
  188.     info("[master] init socket success! #" << serverfd << endl);  
  189.       
  190.     int reuse_socket = 1;  
  191.     if(setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1){  
  192.         err_exit("[master] setsockopt reuse-addr error!");  
  193.     }  
  194.     info("[master] setsockopt reuse-addr success!" << endl);  
  195.       
  196.     set_noblocking(serverfd);  
  197.       
  198.     sockaddr_in addr;  
  199.     addr.sin_family = AF_INET;  
  200.     addr.sin_port = htons(options.port);  
  201.     addr.sin_addr.s_addr = INADDR_ANY;  
  202.     if(bind(serverfd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1){  
  203.         err_exit("[master] bind socket error!");  
  204.     }  
  205.     info("[master] bind socket success!" << endl);  
  206.       
  207.     if(listen(serverfd, 10) == -1){  
  208.         err_exit("[master] listen socket error!");  
  209.     }  
  210.     info("[master] listen socket success! " << options.port << endl);  
  211.       
  212.     return serverfd;  
  213. }  
  214.   
  215. // both master and worker will invoke this handler.  
  216. void signal_handler(int signo, siginfo_t* info, void* context){  
  217.     info("#" << getpid() << " get a signal:");  
  218.       
  219.     if(signo == SIG_SOCKET_IO){  
  220.         cout << "SIG_SOCKET_IO!" << endl;  
  221.         global_channel_msg = true;  
  222.         current_active_io_fd = info->si_fd;  
  223.         info("[master] get SIGIO: fd=" << info->si_fd << endl);  
  224.         return;  
  225.     }  
  226.       
  227.     switch(signo){  
  228.         case SIGCHLD:  
  229.             cout << "SIGCHLD";  
  230.             break;  
  231.         case SIGALRM:  
  232.             cout << "SIGALRM";  
  233.             break;  
  234.         case SIGIO:  
  235.             cout << "SIGIO";  
  236.             info("[master][warning] ignore the SIGIO: rt-sig queue is full!" << endl);  
  237.             break;  
  238.         case SIGINT:  
  239.             cout << "SIGINT";  
  240.             global_terminate = true;  
  241.             break;  
  242.         case SIGHUP:  
  243.             cout << "SIGHUP";  
  244.             global_terminate = true;  
  245.             break;  
  246.         case SIGTERM:  
  247.             cout << "SIGTERM";  
  248.             global_terminate = true;  
  249.             break;  
  250.         case SIGQUIT:  
  251.             cout << "SIGQUIT";  
  252.             global_terminate = true;  
  253.             break;  
  254.         case SIGUSR1:  
  255.             cout << "SIGUSR1";  
  256.             break;  
  257.         case SIGUSR2:  
  258.             cout << "SIGUSR2";  
  259.             break;  
  260.     }  
  261.       
  262.     cout << "!" << endl;  
  263. }  
  264.   
  265. void register_signal_handler_imp(int signum, void (*handler)(int, siginfo_t*, void*)){  
  266.     struct sigaction action;  
  267.       
  268.     action.sa_flags = SA_SIGINFO;  
  269.     action.sa_sigaction = handler;  
  270.     sigemptyset(&action.sa_mask);  
  271.     //action.sa_flags = 0;  
  272.       
  273.     if(sigaction(signum, &action, NULL) == -1){  
  274.         err_exit("[master] register signal handler failed!");  
  275.     }  
  276. }  
  277. void register_signal_handler(){  
  278.     register_signal_handler_imp(SIGCHLD, signal_handler);  
  279.     register_signal_handler_imp(SIGALRM, signal_handler);  
  280.     register_signal_handler_imp(SIGIO, signal_handler);  
  281.     register_signal_handler_imp(SIG_SOCKET_IO, signal_handler);  
  282.     register_signal_handler_imp(SIGINT, signal_handler);  
  283.     register_signal_handler_imp(SIGHUP, signal_handler);  
  284.     register_signal_handler_imp(SIGTERM, signal_handler);  
  285.     register_signal_handler_imp(SIGQUIT, signal_handler);  
  286.     register_signal_handler_imp(SIGUSR1, signal_handler);  
  287.     register_signal_handler_imp(SIGUSR2, signal_handler);  
  288.     info("[master] register signal handler success!" << endl);  
  289. }  
  290.   
  291. void block_specified_signals(){  
  292.     sigset_t set;  
  293.       
  294.     sigemptyset(&set);  
  295.       
  296.     sigaddset(&set, SIGCHLD);  
  297.     sigaddset(&set, SIGALRM);  
  298.     sigaddset(&set, SIGIO);  
  299.     sigaddset(&set, SIG_SOCKET_IO);  
  300.     sigaddset(&set, SIGINT);  
  301.     sigaddset(&set, SIGHUP);  
  302.     sigaddset(&set, SIGTERM);  
  303.     sigaddset(&set, SIGQUIT);  
  304.     sigaddset(&set, SIGUSR1);  
  305.     sigaddset(&set, SIGUSR2);  
  306.       
  307.     if(sigprocmask(SIG_BLOCK, &set, NULL) == -1){  
  308.         err_exit("[master] sigprocmask block signal failed!");  
  309.     }  
  310.     info("[master] sigprocmask block signal success!" << endl);  
  311. }  
  312.   
  313. void unblock_all_signals(){  
  314.     sigset_t set;  
  315.     sigemptyset(&set);  
  316.       
  317.     // the master process block all signals, we unblock all for we use epoll to wait events.  
  318.     if(sigprocmask(SIG_SETMASK, &set, NULL) == -1){  
  319.         err_exit("[worker] sigprocmask unblock signal failed!");  
  320.     }  
  321.     info("[worker] sigprocmask unblock signal success!" << endl);  
  322. }  
  323.   
  324. void epoll_add_event(int ep, int fd){  
  325.     epoll_event ee;  
  326.     ee.events = EPOLLIN;  
  327.     ee.data.fd = fd;  
  328.     if(epoll_ctl(ep, EPOLL_CTL_ADD, fd, &ee) == -1){  
  329.         err_exit("[worker] epoll add event failed!");  
  330.     }  
  331.     current_client_num ++;  
  332.     info("[worker] epoll add fd success: #" << fd << ", current_client_num=" << current_client_num << endl);  
  333. }  
  334. void epoll_remove_event(int ep, int fd){  
  335.     if(epoll_ctl(ep, EPOLL_CTL_DEL, fd, NULL) == -1){  
  336.         err_exit("[worker] epoll remove event failed! fd=" << fd << ", errno=0x" << hex << errno << dec << " " << strerror(errno));  
  337.     }  
  338.     current_client_num --;  
  339.     info("[worker] epoll remove fd success: #" << fd << ", current_client_num=" << current_client_num << endl);  
  340. }  
  341.   
  342. void close_client(int ep, int fd, int serverfd, int worker_channel, bool do_report = true){  
  343.     info("[worker] the client dead, remove it: #" << fd << endl);  
  344.     epoll_remove_event(ep, fd);  
  345.     close(fd);  
  346.       
  347.     if(serverfd == fd){  
  348.         info("[worker] warning to close the server fd" << endl);  
  349.     }  
  350.       
  351.     // start accept again.  
  352.     if(!accepting && current_client_num < max_clients_per_process){  
  353.         epoll_add_event(ep, serverfd);  
  354.         accepting = true;  
  355.     }  
  356.       
  357.     // notice the master: worker can take more.  
  358.     if(do_report){  
  359.         channel_msg msg;  
  360.         msg.command = CMD_INFO;  
  361.         msg.total_client = current_client_num;  
  362.         msg.fd = -1;  
  363.         msg.param = 0;  
  364.         write_channel(worker_channel, &msg, sizeof(channel_msg));  
  365.     }  
  366. }  
  367.   
  368. void worker_get_event(int ep, int serverfd, epoll_event& active, int worker_channel){  
  369.     info("[worker] process #" << getpid() << " current_client_num=" << current_client_num << ", worker_channel=" << worker_channel << endl);  
  370.     int fd = active.data.fd;  
  371.       
  372.     // server listening socket event.  
  373.     if(fd == serverfd){  
  374.         if(current_client_num < physical_max_clients){  
  375.             int client = accept(serverfd, NULL, 0);  
  376.             if(client == -1){  
  377.                 // thundering herd  
  378.                 // http://www.citi.umich.edu/projects/linux-scalability/reports/accept.html  
  379.                 if(errno == EAGAIN || errno == EWOULDBLOCK){  
  380.                     info("[worker][warning] get a thundering herd at #" << getpid() << endl);  
  381.                     return;  
  382.                 }  
  383.                 err_exit("[worker] accept client socket error!");  
  384.             }  
  385.               
  386.             int reuse_socket = 1;  
  387.             if(setsockopt(client, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1){  
  388.                 err_exit("[worker] setsockopt reuse-addr error!");  
  389.             }  
  390.             info("[worker] setsockopt reuse-addr success!" << endl);  
  391.               
  392.             info("[worker] process #" << getpid() << " get a client: #" << client << endl);  
  393.             epoll_add_event(ep, client);  
  394.         }  
  395.         else{  
  396.             epoll_remove_event(ep, fd); // donot accept.  
  397.             accepting = false;  
  398.             info("[worker] warning, the worker #" << getpid()   
  399.                 << " exceed the max clients, current_client_num=" << current_client_num << endl);  
  400.         }  
  401.         return;  
  402.     }  
  403.       
  404.     // channel event.  
  405.     if(fd == worker_channel){  
  406.         channel_msg msg;  
  407.         read_channel(worker_channel, &msg, sizeof(channel_msg));  
  408.         info("[worker] get a channel message. fd=" << msg.fd << ", command=" << msg.command  
  409.             << ", current_client_num=" << current_client_num << ", max_clients_per_process=" << max_clients_per_process  
  410.             << ", physical_max_clients=" << physical_max_clients << endl);  
  411.         if(msg.command == CMD_FD && msg.fd != -1){  
  412.             if(msg.fd == 0 || msg.fd == 1){  
  413.                 info("[worker][warning] ignore the invalid passing fd=" << msg.fd << endl);  
  414.                 return;  
  415.             }  
  416.             // accept all client from master, util the read overloaded.  
  417.             if(current_client_num < physical_max_clients){  
  418.                 epoll_add_event(ep, msg.fd);  
  419.                 info("[worker] get a dispatched client. #" << msg.fd << endl);  
  420.             }  
  421.             else{  
  422.                 info("[worker][warning] worker overloaded, close the fd: " << msg.fd << endl);  
  423.                 close(msg.fd);  
  424.             }  
  425.         }  
  426.         else{  
  427.             err_exit("[worker] get a invalid channel");  
  428.         }  
  429.         return;  
  430.     }  
  431.       
  432.     // client event.  
  433.     if((active.events & EPOLLHUP) == EPOLLHUP || (active.events & EPOLLERR) == EPOLLERR){  
  434.         info("[worker] get a EPOLLHUP or EPOLLERR event from client #" << fd << endl);  
  435.         close_client(ep, fd, serverfd, worker_channel);  
  436.         return;  
  437.     }  
  438.     if((active.events & EPOLLIN) == EPOLLIN){  
  439.         if(true){  
  440.             info("[worker] get a EPOLLIN event from client #" << fd << endl);  
  441.             char ch_control; // we MUST not read more bytes, for the fd maybe passing to other process!  
  442.             if(recv(fd, &ch_control, sizeof(char), 0) <= 0){  
  443.                 close_client(ep, fd, serverfd, worker_channel);  
  444.                 return;  
  445.             }  
  446.             // check the first byte, if 'M' it's message, if 'C' it's control message.  
  447.             if(ch_control == 'C'){  
  448.                 int client_required_id; //worker_channel  
  449.                 if(recv(fd, &client_required_id, sizeof(int), 0) <= 0){  
  450.                     close_client(ep, fd, serverfd, worker_channel);  
  451.                     return;  
  452.                 }  
  453.               
  454.                 pid_t pid = getpid();  
  455.                 info("[worker] get client control message: client_required_id=" << client_required_id << endl);  
  456.                 if((client_required_id % pid) == 0){  
  457.                     info("[worker][verified] client exactly required this server" << endl);  
  458.                 }  
  459.                 else{  
  460.                     // never send the follows:  
  461.                     if(fd == 0 || fd == 1 || fd == serverfd || fd == worker_channel){  
  462.                         info("[worker][warning] ignore invalid fd to passing: " << fd << endl);  
  463.                         close_client(ep, fd, serverfd, worker_channel);  
  464.                         return;  
  465.                     }  
  466.                     info("[worker] send to master to dispatch it" << endl);  
  467.                     channel_msg msg;  
  468.                     msg.fd = fd;  
  469.                     msg.command = CMD_FD;  
  470.                     msg.total_client = current_client_num;  
  471.                     msg.param = client_required_id;  
  472.                     write_channel(worker_channel, &msg, sizeof(channel_msg));  
  473.                     // donot report to master, for the CMD_FD message has reported.  
  474.                     close_client(ep, fd, serverfd, worker_channel, false);  
  475.                     return;  
  476.                 }  
  477.             }  
  478.             else{  
  479.                 char buf[1024];  
  480.                  if(recv(fd, buf, sizeof(buf), 0) <= 0){  
  481.                     close_client(ep, fd, serverfd, worker_channel);  
  482.                     return;  
  483.                 }  
  484.                info("[worker] get client data message: " << buf << endl);  
  485.             }  
  486.         }  
  487.         if(true){  
  488.             char msg[] = "hello, server ping!";  
  489.             if(send(fd, msg, sizeof(msg), 0) <= 0){  
  490.                 close_client(ep, fd, serverfd, worker_channel);  
  491.                 return;  
  492.             }  
  493.         }  
  494.         return;  
  495.     }  
  496. }  
  497.   
  498. void worker_process_cycle(int serverfd, int worker_channel){  
  499.     cout << "[worker] start worker process cycle. serverfd=" << serverfd   
  500.         << ", worker_channel=" << worker_channel   
  501.         << ((worker_channel == -1)? "(single process)" : "(multiple processes)")<< endl;  
  502.     unblock_all_signals();  
  503.   
  504.     int ep = epoll_create(1024);  
  505.     if(ep == -1){  
  506.         err_exit("[worker] epoll_create failed!");  
  507.     }  
  508.     info("[worker] epoll create success!" << endl);  
  509.     epoll_add_event(ep, serverfd);  
  510.     epoll_add_event(ep, worker_channel);  
  511.       
  512.     // worker use epoll event, the signal handler will break the epoll_wait to return -1.  
  513.     for(;;){  
  514.         epoll_event events[1024];  
  515.         int incoming = epoll_wait(ep, events, 1024, -1);  
  516.           
  517.         // get a event or error.  
  518.         if(incoming == -1){  
  519.             if(global_terminate){  
  520.                 close(worker_channel);  
  521.                 close(serverfd);  
  522.                 info("[worker] worker process exit" << endl);  
  523.                 exit(0);  
  524.             }  
  525.             break;  
  526.         }  
  527.           
  528.         for(int i = 0; i < incoming; i++){  
  529.             worker_get_event(ep, serverfd, events[i], worker_channel);  
  530.         }  
  531.     }  
  532.       
  533.     info("[worker] worker process exited" << endl);  
  534.     close(ep);  
  535. }  
  536.   
  537. struct WorkerProcess{  
  538.     pid_t pid;  
  539.     int channel;  
  540.     int num_clients;  
  541. };  
  542.   
  543. void start_worker_process(int serverfd, WorkerProcess& worker_process){  
  544.     // master/worker use domain socket to communicate.  
  545.     int fds[2];  
  546.     if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1){  
  547.         err_exit("[master] sockpair create domain socket failed");  
  548.     }  
  549.     info("[master] sockpair create domain socket success! [" << fds[0] << ", " << fds[1] << "]" << endl);  
  550.       
  551.     // master process, use the fds[0].  
  552.     worker_process.num_clients = 0;  
  553.     worker_process.channel = fds[0];  
  554.       
  555.     /** 
  556.     * SIGIO (http://www.lindevdoc.org/wiki/SIGIO) 
  557.     * The SIGIO signal is sent to a process to notify it about some event on a file descriptor, namely: 
  558.     *   the file descriptor is ready to receive data (previous write has finished). 
  559.     *   the file descriptor has new data to read 
  560.     *   an error has occurred 
  561.     * This signal is not sent by default; it must be enabled by specifying the O_ASYNC flag to the descriptor 
  562.     * 
  563.     * O_ASYNC (http://www.lindevdoc.org/wiki/O_ASYNC) 
  564.     * The O_ASYNC flag specifies that the process that owns a data stream (by default the process that opened it,  
  565.     * but can be changed with fcntl() with the F_SETOWN command) will receive a signal (SIGIO by default, but can  
  566.     * be changed with fcntl() with the F_SETSIG command) when the file is ready for reading or writing. 
  567.     */  
  568.     // set the socket owner to master, to get a SIGIO signal.  
  569.     if(fcntl(worker_process.channel, F_SETOWN, getpid()) == -1){  
  570.         err_exit("[master] fcntl F_SETOWN to master failed");  
  571.     }  
  572.     info("[master] fcntl F_SETOWN to master success" << endl);  
  573.     // set socket to async to singal SIGIO when data coming.  
  574.     int on = 1;  
  575.     if(ioctl(worker_process.channel, FIOASYNC, &on) == -1){  
  576.         err_exit("[master] ioctl set FIOASYNC failed");  
  577.     }  
  578.     info("[master] ioctl set FIOASYNC success" << endl);  
  579.     /** F_SETSIG 
  580.     * if donot use set_sig, we must use select to find which fd is read. 
  581.     * http://ajaxxx.livejournal.com/62378.html 
  582.     * Linux has the additional feature of fcntl(F_SETSIG), which lets you get the file descriptor that generated the  
  583.     * signal back as part of the signal information. So I implemented this, with the obvious semantics: receiving a  
  584.     * SIGIO would call the handler for just that file descriptor and then return. 
  585.     * remark: the SIGRTMIN=34, SIGRTMAX=62, so only 28 fd cannbe discoveried. so it's used for category signal,  
  586.     *       and we use SA_SIGINFO for sigaction to get the fd of SIGIO.  
  587.     */  
  588.     if(fcntl(worker_process.channel, F_SETSIG, SIG_SOCKET_IO) == -1){  
  589.         err_exit("[master] fcntl set rtsig failed: F_SETSIG.");  
  590.     }  
  591.     info("[master] fcntl set rtsig success" << endl);  
  592.     // set to unblocking.  
  593.     set_noblocking(worker_process.channel);  
  594.     set_noblocking(fds[1]);  
  595.       
  596.     // start process by fork.  
  597.     pid_t pid = fork();  
  598.       
  599.     if(pid == -1){  
  600.         err_exit("[master] fork process failed");  
  601.     }  
  602.       
  603.     // worker process, use the fds[1].  
  604.     if(pid == 0){  
  605.         close(fds[0]);  
  606.         worker_process_cycle(serverfd, fds[1]);  
  607.         exit(0);  
  608.     }  
  609.     // master process.  
  610.     worker_process.pid = pid;  
  611.     close(fds[1]);  
  612.       
  613.     info("[master] fork process success: #" << pid << ", channel=" << worker_process.channel << endl);  
  614. }  
  615.   
  616. int find_the_required_worker(int serverfd, vector<WorkerProcess>& workers, int client_required_id, int pre_pid){  
  617.     // find the exactly worker.  
  618.     WorkerProcess* target = NULL;  
  619.     int channel = -1;  
  620.     for(vector<WorkerProcess>::iterator ite = workers.begin(); ite != workers.end(); ++ite){  
  621.         WorkerProcess& worker = *ite;  
  622.         if((client_required_id % worker.pid) == 0){  
  623.             target = &worker;  
  624.             break;  
  625.         }  
  626.     }  
  627.     // find the most idling worker.  
  628.     int pre_num = 0;  
  629.     for(vector<WorkerProcess>::iterator ite = workers.begin(); ite != workers.end(); ++ite){  
  630.         WorkerProcess& worker = *ite;  
  631.         if(worker.num_clients < max_clients_per_process && worker.pid != pre_pid){  
  632.             if(pre_num == 0 || worker.num_clients < pre_num){  
  633.                 target = &worker;  
  634.                 pre_num = worker.num_clients;  
  635.             }  
  636.         }  
  637.     }  
  638.       
  639.     if(target == NULL){  
  640.         if(workers.size() < max_workers){  
  641.             // start a new worker  
  642.             WorkerProcess worker;  
  643.             start_worker_process(serverfd, worker);  
  644.             workers.push_back(worker);  
  645.               
  646.             return find_the_required_worker(serverfd, workers, client_required_id, pre_pid);  
  647.         }  
  648.         else{  
  649.             err_exit("[master] all " << workers.size() << " workers overloaded!");  
  650.         }  
  651.     }  
  652.     info("[master] find a target to passing fd, pid=" << target->pid << ", num_clients=" << target->num_clients << endl);  
  653.     target->num_clients ++;  
  654.       
  655.     return target->channel;  
  656. }  
  657. void on_channel_message(int serverfd, WorkerProcess& host, vector<WorkerProcess>& workers){  
  658.     int sock = host.channel;  
  659.       
  660.     channel_msg msg;  
  661.     read_channel(sock, &msg, sizeof(channel_msg));  
  662.     host.num_clients = msg.total_client;  
  663.       
  664.     cout << "[master] get a channel msg, pid=#" << host.pid << ", fd=" << msg.fd   
  665.         << ", total_client=" << msg.total_client  
  666.         << ", param=" << msg.param << ", command=" << msg.command << endl;  
  667.     if(msg.command == CMD_FD){  
  668.         // dispatch and re-dispatch fd  
  669.         int channel = find_the_required_worker(serverfd, workers, msg.param, host.pid);  
  670.         msg.command = CMD_FD;  
  671.         if(msg.fd == 0 || msg.fd == 1){  
  672.             info("[master][warning] ignore invalid fd to passing: " << msg.fd << endl);  
  673.             return;  
  674.         }  
  675.         write_channel(channel, &msg, sizeof(channel_msg));  
  676.         // direct close the fd, the target worker MUST take it or close it.  
  677.         close(msg.fd);  
  678.         host.num_clients--;  
  679.     }  
  680.     else if(msg.command == CMD_INFO){  
  681.         return;  
  682.     }  
  683.     else{  
  684.         err_exit("[master] invalid response!");  
  685.     }  
  686. }  
  687.   
  688. int main(int argc, char** argv){  
  689.     info("[master] SIGRTMIN=0x" << hex << SIGRTMIN   
  690.         << ", SIGRTMAX=0x" << hex << SIGRTMAX << dec   
  691.         << ", SIGIO=0x" << hex << SIGIO << dec   
  692.         << ", SIGPOLL=0x" << hex << SIGPOLL << dec   
  693.         << endl);  
  694.     register_signal_handler();  
  695.     block_specified_signals();  
  696.       
  697.     //sleep(3);  
  698.     UserOptions options;  
  699.     discovery_user_options(argc, argv, options);  
  700.     int serverfd = listen_server_socket(options);  
  701.       
  702.     // single process mode  
  703.     if(options.num_processes == 0){  
  704.         worker_process_cycle(serverfd, -1);  
  705.     }  
  706.     // multiple processes mode  
  707.     else{  
  708.         vector<WorkerProcess> workers;  
  709.         for(int i = 0; i < options.num_processes; i++){  
  710.             WorkerProcess worker;  
  711.             start_worker_process(serverfd, worker);  
  712.             workers.push_back(worker);  
  713.         }  
  714.           
  715.         // the empty set for sigsuspend  
  716.         sigset_t set;  
  717.         sigemptyset(&set);  
  718.         // master process use signal only.  
  719.         for(;;){  
  720.             sigsuspend(&set);  
  721.               
  722.             if(global_terminate){  
  723.                 // kill all workers.  
  724.                 for(vector<WorkerProcess>::iterator ite = workers.begin(); ite != workers.end(); ++ite){  
  725.                     WorkerProcess& worker = *ite;  
  726.                     kill(worker.pid, SIGTERM);  
  727.                 }  
  728.                 // wait for workers to exit.  
  729.                 for(vector<WorkerProcess>::iterator ite = workers.begin(); ite != workers.end(); ++ite){  
  730.                     WorkerProcess& worker = *ite;  
  731.                     int status;  
  732.                     waitpid(worker.pid, &status, 0);  
  733.                 }  
  734.                 info("[master] all worker terminated, master exit" << endl);  
  735.                 exit(0);  
  736.             }  
  737.             if(global_channel_msg){  
  738.                 int active_fd = current_active_io_fd;  
  739.                 current_active_io_fd = -1;  
  740.                 global_channel_msg = false;  
  741.                   
  742.                 // read each channel.  
  743.                 for(vector<WorkerProcess>::iterator ite = workers.begin(); ite != workers.end(); ++ite){  
  744.                     WorkerProcess& worker = *ite;  
  745.                     if(worker.channel == active_fd){  
  746.                         on_channel_message(serverfd, worker, workers);  
  747.                         break;  
  748.                     }  
  749.                 }  
  750.             }  
  751.         }  
  752.     }  
  753.   
  754.     return 0;  
  755. }  

[cpp]  view plain copy
  1. // mpclient2 —— multiple processes client  
  2. #include <stdio.h>  
  3. #include <stdlib.h>  
  4. #include <iostream>  
  5. using namespace std;  
  6.   
  7. #include <unistd.h>  
  8. #include <signal.h>  
  9. #include <sys/types.h>  
  10. #include <sys/socket.h>  
  11. #include <netinet/in.h>  
  12. #include <arpa/inet.h>  
  13. #include <string.h>  
  14.   
  15. #define err_exit(msg) cout << "[error] " << msg << endl; exit(1)  
  16.   
  17. struct UserOptions{  
  18.     char* server_ip;  
  19.     int port;  
  20.     int num_clients;  
  21.     int sleep_ms;  
  22. };  
  23.   
  24. void discovery_user_options(int argc, char** argv, UserOptions& options){  
  25.     if(argc <= 4){  
  26.         cout << "Usage: " << argv[0] << " <server_ip> <port> <num_clients> <sleep_ms>" << endl  
  27.             << "server_ip: the ip address of server" << endl  
  28.             << "port: the port to connect at" << endl  
  29.             << "num_clients: the num to start client." << endl  
  30.             << "sleep_ms: the time to sleep in ms." << endl  
  31.             << "for example: " << argv[0] << " 127.0.0.1 1990 1000 500" << endl;  
  32.         exit(1);  
  33.     }  
  34.       
  35.     options.server_ip = argv[1];  
  36.     options.port = atoi(argv[2]);  
  37.     options.num_clients = atoi(argv[3]);  
  38.     options.sleep_ms = atoi(argv[4]);  
  39.     if(options.num_clients <= 0){  
  40.         err_exit("num_clients must > 0.");  
  41.     }  
  42. }  
  43.   
  44. int connect_server_socket(UserOptions& options){  
  45.     int clientfd = socket(AF_INET, SOCK_STREAM, 0);  
  46.       
  47.     if(clientfd == -1){  
  48.         err_exit("init socket error!");  
  49.     }  
  50.     cout << "init socket success!" << endl;  
  51.       
  52.     sockaddr_in addr;  
  53.     addr.sin_family = AF_INET;  
  54.     addr.sin_port = htons(options.port);  
  55.     addr.sin_addr.s_addr = inet_addr(options.server_ip);  
  56.     if(connect(clientfd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1){  
  57.         err_exit("connect socket error!");  
  58.     }  
  59.     cout << "connect socket success!" << endl;  
  60.       
  61.     return clientfd;  
  62. }  
  63.   
  64. int main(int argc, char** argv){  
  65.     srand(0);  
  66.     UserOptions options;  
  67.     discovery_user_options(argc, argv, options);  
  68.     int* fds = new int[options.num_clients];  
  69.     for(int i = 0; i < options.num_clients; i++){  
  70.         fds[i] = connect_server_socket(options);  
  71.         usleep(10 * 1000); // to prevent the SIGIO error: maybe signal queue overflow!  
  72.     }  
  73.       
  74.     for(int i = 0; i < options.num_clients; i++){  
  75.         int fd = fds[i];  
  76.         if(true){  
  77.             char c = 'C'// control message  
  78.             if(send(fd, &c, sizeof(char), 0) <= 0){  
  79.                 close(fd);  
  80.                 err_exit("send message failed!");  
  81.             }  
  82.             int required_id = rand();  
  83.             if(send(fd, &required_id, sizeof(int), 0) <= 0){  
  84.                 close(fd);  
  85.                 err_exit("send message failed!");  
  86.             }  
  87.         }  
  88.     }  
  89.       
  90.     bool do_loop = true;  
  91.     while(do_loop){  
  92.         int ret = 0;  
  93.         for(int i = 0; i < options.num_clients; i++){  
  94.             int fd = fds[i];  
  95.             if(true){  
  96.                 char c = 'M'// data message  
  97.                 ret = send(fd, &c, sizeof(char), 0);  
  98.                 if(ret <= 0){  
  99.                     cout << "send control message to server error" << endl;  
  100.                     close(fd);  
  101.                     do_loop = false;  
  102.                     break;  
  103.                 }  
  104.                 char msg[] = "client, ping message!";  
  105.                 ret = send(fd, msg, sizeof(msg), 0);  
  106.                 if(ret <= 0){  
  107.                     cout << "send control value to server error" << endl;  
  108.                     close(fd);  
  109.                     do_loop = false;  
  110.                     break;  
  111.                 }  
  112.                 memset(&msg, 0, sizeof(msg));  
  113.             }  
  114.             if(true){  
  115.                 char buf[1024];  
  116.                 memset(buf, 0, sizeof(buf));  
  117.                 ret = recv(fd, buf, sizeof(buf), 0);  
  118.                 if(ret <= 0){  
  119.                     cout << "recv from server error" << endl;  
  120.                     close(fd);  
  121.                     do_loop = false;  
  122.                     break;  
  123.                 }  
  124.                 cout << "recv from server: " << buf << endl;  
  125.             }  
  126.               
  127.             usleep(options.sleep_ms * 1000);  
  128.         }  
  129.     }  
  130.       
  131.     delete[] fds;  
  132.   
  133.     return 0;  
  134. }  

Logo

更多推荐