背景:

Orthanc是本专栏中介绍过的一款新型DICOM服务器,具有轻量级、支持REST的特性,可将任意运行Windows和Linux系统的计算机变成DICOM服务器,即miniPACS。Orthanc内嵌多种模块,数据库管理简单,且不依赖于第三方软件。因此通过剖析Orthanc源码可以学习到搭建DICOM系统中的各个环节,例如SQLite嵌入型数据库、GoogleLog日志库、DCMTK医学DICOM库,以及近期要介绍的开源Web Server,Mongoose。

上一篇博文中简单的分析了Mongoose中连接请求触发的事件序列,调试输出结果大致上符合Fossa官网给出的*NS_ACCEPT->(NS_RECV->NS_SEND->NS_POLL…)->NS_CLOSE 流程,但是实际运行时刻由于网络环境实时变化,因此输出的调试日志中偶尔会出现多次NS_ACCEPT或者多次NS_POLL等等。博文末尾提到要想了解事件触发的真正原因,需要分析ns_poll_server函数源码,接下来通过分析Mongoose和Fossa的设计来对事件触发有一个更全面的了解。*

Mongoose事件

官方说明文档中对于Mongoose的描述是:

Mongosoe has single-threaded, event-driven, asynchronous, non-blocking core. 这个内核就是Fossa。这其中单线程(single-threaded)指的就是主线程中的mg_poll_server循环。

mg_poll_server函数内部遍历所有的有效连接,通过 select 异步方式监控各连接套接字来完成一次IO迭代操作。如此反复直至处理完毕。每当select返回,针对状态发生变化的套接字(有数据要发送或接收)进行IO操作。但是mg_poll_server本身并不完成循环遍历,需要外部循环调用mg_poll_server来实现实时监控连接状态。

查看代码的话发现mg_poll_server内部就是简单的调用了ns_mgr_poll函数,那么我们看一下Fossa对该函数的描述:

Fossa是一个支持多种协议的网络库,实现了非阻塞、异步IO处理,提供基于事件的API。Fossa的使用方式是,先声明并初始化事件处理程序,创建连接;最后通过循环调用ns_mgr_poll函数实现事件监控。ns_mgr_poll迭代遍历所有套接字,接收新连接、发送和接收数据、关闭连接,并根据 具体事件 调用相应的 事件处理函数

Fossa中要求每个连接需要绑定事件处理函数,即event handler function,由用户自定义实现。事件处理是Fossa应用的核心元素——设定了程序的功能。Mongoose就是对Fossa的一次封装,规定了各种事件的默认处理程序,因此直接复制粘贴Mongoose官方文档中的示例代码就可以开启一个简单的Web Server,具体代码如下:

    #include "mongoose.h"
    int main(void) {

      struct mg_server *server = mg_create_server(NULL, NULL);
      mg_set_option(server, "document_root", ".");  // Serve current directory
      mg_set_option(server, "listening_port", "8080");  // Open port 8080

      for (;;) {
        mg_poll_server(server, 1000);   // Infinite loop, Ctrl-C to stop
      }
      mg_destroy_server(&server);

      return 0;
    }

上述代码中并未像Fossa官网所述,给出用户自定义的事件处理函数却能顺利开启Web Server( 详情可参考博文DICOM:剖析Orthanc中的Web Server, Mongoose ),这恰恰说明了Mongoose在对Fossa进行封装时给出了默认的事件处理函数,即mg_ev_handler,在该函数内部规定了Fossa中各种事件的处理流程,由于代码过长此处就不贴出来了,详情可参考Link:mg_ ev_handler。所以在使用Mongoose时主要关注的是自定义事件,另外Mongoose对Fossa的事件进行了简单的再封装,以MG_开头来标记事件,诸如MG_AUTH、MG_REQUEST、MG_CONNECT、MG_REPLY。

Fossa标志

由上面介绍了解到Mongoose重点是对Fossa的自定义事件进行二次封装,其主要贡献是设计了Fossa事件的默认处理流程函数,即上面提到的mg_ev_handler。因此要想解决上一篇博文中的疑问事件真正触发的原因是什么? 。通过分析mg_ev_handler源码只能是了解了Fossa事件的触发机制,并未真正了解原因。因此要想解决疑惑,需要分析Fossa的处理核心,即ns_mgr_poll 。在Fossa中对于每一个连接都包含相应的flag bit field,即标志位状态位特征位。而flag bit目的就是用于区别连接(这里的连接是名词,代表所有与Fossa相关的请求。在Fossa中将连接分为三类,即Inbound、Outbound和Listening)整个生命周期所处的不同阶段,对每个阶段用一个flag来表示。 ns_mgr_poll正是根据flag bit来分情况处理各种连接,比如添加新连接、开始读取数据、开始发送数据、关闭连接等等;也正是由于flag bit将连接的各个阶段区分开来,才使得能够将不同阶段的处理进行模式化,也就是事件化

所以要想搞清楚之前博文中事件的触发流程,根本是需要了解Fossa和Mongoose是如何用flag bit来表示连接的各个阶段的。Fossa官方文档指出每个连接都有标志位域。Fossa针对不同协议定义了多种标志,其中一些标志由Fossa来设置,部分标志需要由外部用户自定义的事件处理函数来设置(以此来完成用户与Fossa的交互)。下面列举主要的标志:

  • NSF_FINISHED_SENDING_DATA
  • NSF_BUFFER_BUT_DONT_SEND
  • NSF_CLOSE_IMMEDIATELY
  • NSF_USER1/NSF_USER2/NSF_USER3/NSF_USER4

以上标志位都是由外部用户自定义的事件处理函数来设置的,下面看一下Fossa内部设置的标志位,

  • NSF_SSL_HANDSHAKE_DOWN
  • NSF_CONNECTING
  • NSF_LISTENING
  • NSF_WEBSOCKET_NO_DEFRAG
  • NSF_IS_WEBSOCKET

根据标志位名称大致猜测出标志位是跟连接建立过程 连接具体状态 相关,由此可知标志位域(flag bit field) 关乎http web server的处理流程,是Fossa和Mongoose开源库内部的核心逻辑,所以需要Fossa内部自己实现——开源库中往往都会将协议规定的流程化部分自己实现,只将可定制化部分交由用户自定义。

找准了问题入手的方向,下面就以Mongoose官方文档为例进行实例测试:

实例测试

为了方便查看,再一次将官方安装说明中的代码贴在此处,如下:

        
    #include "mongoose.h"
    int main(void) {

      struct mg_server *server = mg_create_server(NULL, NULL);
      mg_set_option(server, "document_root", ".");  // Serve current directory
      mg_set_option(server, "listening_port", "8080");  // Open port 8080

      for (;;) {
        mg_poll_server(server, 1000);   // Infinite loop, Ctrl-C to stop
      }
      mg_destroy_server(&server);

      return 0;
    }

另外为了跟踪Fossa的ns_mgr_poll函数中各阶段标志位的情况,对mongoose.c中的ns_mgr_poll代码修改,添加相应的调试输出信息。具体修改如下:

      time_t ns_mgr_poll(struct ns_mgr *mgr, int milli) {
      int loop=0;

      struct ns_connection *conn, *tmp_conn;
      struct timeval tv;
      fd_set read_set, write_set;
      sock_t max_fd = INVALID_SOCKET;
      time_t current_time = time(NULL);

      FD_ZERO(&read_set);
      FD_ZERO(&write_set);
      ns_add_to_set(mgr->ctl[1], &read_set, &max_fd);

      for (conn = mgr->active_connections; conn != NULL; conn = tmp_conn) {
          printf("The for loop in adding sock or conn section is %d times\n",loop++);//Just for debugging
    tmp_conn = conn->next;
    if (!(conn->flags & (NSF_LISTENING | NSF_CONNECTING))) {
            printf("For the Flag --%d-- ,For the sock --%d--,Call user ev_handler for NS_POLL\n",conn->flags,(conn->sock!=NULL?conn->sock:-1));//Just for debugging
      ns_call(conn, NS_POLL, ¤t_time);
    }
    if (!(conn->flags & NSF_WANT_WRITE)) {
      //DBG(("%p read_set", conn));
            printf("For the Flag --%d--,For the sock --%d--, call ns_add_to_set function!\n",conn->flags,(conn->sock!=NULL?conn->sock:-1));//Just for debugging
      ns_add_to_set(conn->sock, &read_set, &max_fd);
    }
    if (((conn->flags & NSF_CONNECTING) && !(conn->flags & NSF_WANT_READ)) ||
    (conn->send_iobuf.len > 0 && !(conn->flags & NSF_CONNECTING) &&
     !(conn->flags & NSF_BUFFER_BUT_DONT_SEND))) {
      //DBG(("%p write_set", conn));
                 printf("For the Flag --%d--2--,For the sock --%d-- call ns_add_to_set function!\n",conn->flags,(conn->sock!=NULL?conn->sock:-1));//Just for debugging
      ns_add_to_set(conn->sock, &write_set, &max_fd);
    }
    if (conn->flags & NSF_CLOSE_IMMEDIATELY) {
            printf("For the Flag --%d--, For the sock --%d-- call ns_close_conn function!\n",conn->flags,(conn->sock!=NULL?conn->sock:-1));//Just for debugging
      ns_close_conn(conn);
    }
      }

      tv.tv_sec = milli / 1000;
      tv.tv_usec = (milli % 1000) * 1000;
      loop=0;
      if (select((int) max_fd + 1, &read_set, &write_set, NULL, &tv) > 0) {
    // select() might have been waiting for a long time, reset current_time
    // now to prevent last_io_time being set to the past.
    current_time = time(NULL);

    // Read wakeup messages
    if (mgr->ctl[1] != INVALID_SOCKET &&
    FD_ISSET(mgr->ctl[1], &read_set)) {
      struct ctl_msg ctl_msg;
      int len = (int) recv(mgr->ctl[1], (char *) &ctl_msg, sizeof(ctl_msg), 0);
      send(mgr->ctl[1], ctl_msg.message, 1, 0);
      if (len >= (int) sizeof(ctl_msg.callback) && ctl_msg.callback != NULL) {
    struct ns_connection *c;
    for (c = ns_next(mgr, NULL); c != NULL; c = ns_next(mgr, c)) {
      ctl_msg.callback(c, NS_POLL, ctl_msg.message);
    }
      }
    };
    for (conn = mgr->active_connections; conn != NULL; conn = tmp_conn) {
        printf("The for loop in select section is %d times\n",loop++);
      tmp_conn = conn->next;
      if (FD_ISSET(conn->sock, &read_set)) {
    if (conn->flags & NSF_LISTENING) {
                printf("For the Flag --%d--, For the sock --%d--, NSF_LISTENING!\n",conn->flags,(conn->sock!=NULL?conn->sock:-1));//Just for debugging
      if (conn->flags & NSF_UDP) {
                  printf("For the Flag --%d--,For the sock --%d--, call ns_handler_udp function!\n",conn->flags,(conn->sock!=NULL?conn->sock:-1));//Just for debugging
    ns_handle_udp(conn);
      } else {
    // We're not looping here, and accepting just one connection at
    // a time. The reason is that eCos does not respect non-blocking
    // flag on a listening socket and hangs in a loop.
                  printf("For the Flag --%d--,For the sock --%d-- call accept_conn function!\n",conn->flags,(conn->sock!=NULL?conn->sock:-1));//Just for debugging
    accept_conn(conn);
      }
    } else {
      conn->last_io_time = current_time;
              printf("For the Flag --%d--,For the sock --%d-- call ns_read_from_socket function!\n",conn->flags,(conn->sock!=NULL?conn->sock:-1));//Just for debugging
      ns_read_from_socket(conn);
    }
      }

      if (FD_ISSET(conn->sock, &write_set)) {
    if (conn->flags & NSF_CONNECTING) {
                printf("For the Flag --%d--,For the sock --%d-- call ns_read_from_socket function!\n",conn->flags,(conn->sock!=NULL?conn->sock:-1));//Just for debugging
      ns_read_from_socket(conn);
    } else if (!(conn->flags & NSF_BUFFER_BUT_DONT_SEND)) {
      conn->last_io_time = current_time;
              printf("For the Flag --%d--,For the sock --%d-- call ns_write_to_socket function!\n",conn->flags,(conn->sock!=NULL?conn->sock:-1));//Just for debugging
      ns_write_to_socket(conn);
    }
      }
    }
      }
      loop=0;
      for (conn = mgr->active_connections; conn != NULL; conn = tmp_conn) {
          printf("The for loop in the close section is %d times\n",loop++);
    tmp_conn = conn->next;
    if ((conn->flags & NSF_CLOSE_IMMEDIATELY) ||
    (conn->send_iobuf.len == 0 &&
      (conn->flags & NSF_FINISHED_SENDING_DATA))) {
                  printf("For the Flag --%d--2--,For the sock --%d-- call ns_close_conn function!\n",conn->flags,(conn->sock!=NULL?conn->sock:-1));//Just for debugging
      ns_close_conn(conn);
    }
      }

      return current_time;
    }

【注】:代码中的printf输出语句就是为了方便调试添加的,待测试完毕请自行删除,以免影响Mongoose服务器性能。

测试结果

ns_mgr_poll函数内部结构可知,函数内部主要分成三大功能模块:
1. 链接配置阶段(即添加新连接到服务端链表,及设置对各个连接的可读或可写性检查)
2. 链接监控阶段(利用select异步模型监控各连接读写状态)
3. 链接清理阶段(根据连接实际状态查看是否需要关闭)

在调试信息中的结果与上述三大类一一对应,为了方便查看调试结果对不同的调试信息使用了不同的背景色。如下图所示:
这里写图片描述

该图表明Mongoose Web Server初始化完成后的状态,此刻mg_server的连接链表中只包含初始化时的监听端口,即listening connection

接下来在浏览器中输入http://localhost:8080,回车后调试日志结果如下图:
这里写图片描述

从图中我们可以看出原本的listening connection在select时检测到有新的链接接入,即Inbound connection。在链接配置阶段将新接受的Inbound添加到服务端的连接链表中,且插入位置为表头,由下一阶段链接监控阶段的for循环输出日志可以确定插入位置是表头。
这里写图片描述
对于数据的接收和发送,Fossa内部使用了缓冲机制,其缓冲结构如下图所示:
Fossa缓冲结构

待数据接收和发送处理完成后,起初Mongoose服务的连接链表中还保存三个连接,随着时间的推移,除了listening connection以外的两个连接逐个关闭,Mongoose Web Server又恢复到初始化状态。
这里写图片描述

从上面的调试日志中可以看出在ns_mgr_poll函数内部的三大模块中主要是根据连接链表中各连接的flag bit来进行分类处理,实现端口监控连接接入接收和发送数据连接关闭等功能。这也正是我们上篇博文中希望深入研究的部分,此次博文中只是分析调试了Mongoose官网的测试实例,至于原理性的东西可能要牵扯到HTTP协议和具体的实现时序图等内容,具体细节会在后续文章中给出,敬请期待。

第一次使用MarkDown写CSDN博文,不知道效果如何,^_^。用浏览器查看了一下原来CSDN博文的源码,看来原本的代码格式在常见的MarkDown编辑器不存在,只能手动拷贝CSDN博文的自由格式,如下图:






作者:zssure@163.com

时间:2015-02-10

Logo

更多推荐