第一章我们简单试用了ZMQ的若干通信模式:请求-应答模式、发布-订阅模式、管道模式。这一章我们将学习更多在实际开发中会使用到的东西。

套接字API

ZMQ提供了一套类似于BSD套接字的API,但将很多消息处理机制的细节隐藏了起来,你会逐渐适应这种变化,并乐于用它进行编程。

​套接字事实上是用于网络编程的标准接口,ZMQ之所那么吸引人眼球,原因之一就是它是建立在标准套接字API之上。因此,ZMQ的套接字操作非常容易理解,其生命周期主要包含四个部分:

  • 创建和销毁套接字:zmq_socket(), zmq_close()
  • 配置和读取套接字选项:zmq_setsockopt(), zmq_getsockopt()
  • 为套接字建立连接:zmq_bind(), zmq_connect()
  • 发送和接收消息:zmq_send(), zmq_recv()


如以下C代码:

void *mousetrap;

//  创建套接字
mousetrap = zmq_socket (context, ZMQ_PULL);

// 配置套接字选项,设置高水位阈值
int64_t jawsize = 10000;
zmq_setsockopt (mousetrap, ZMQ_HWM, &jawsize, sizeof jawsize);

// 连接端口
zmq_connect (mousetrap, "tcp://192.168.55.221:5001");

//  等待接收数据
zmq_msg_t mouse;
zmq_msg_init (&mouse);
zmq_recv (mousetrap, &mouse, 0);
//  销毁消息
zmq_msg_close (&mouse);

//  Destroy the socket
zmq_close (mousetrap);


请注意,套接字永远是空指针类型的,而消息则是一个数据结构。在ZMQ中所有的套接字都是由ZMQ管理的,只有消息是由程序员管理的。创建、销毁、以及配置套接字的工作和处理一个对象差不多,但请记住ZMQ是异步的,伸缩性很强。

​使用套接字构建拓扑结

在连接两个节点时,其中一个需要使用zmq_bind(),另一个则使用zmq_connect()。通常来讲,使用zmq_bind()连接的节点称之为服务端,它有着一个较为固定的网络地址;使用zmq_connect()连接的节点称为客户端,其地址不固定。端点指的是某个广为周知网络地址。

​ZMQ连接和传统的TCP连接是有区别的,主要有:

​1、使用多种协议,inproc(进程内)、ipc(进程间)、tcp、pgm(广播)。

2、​当客户端使用zmq_connect()时连接就已经建立了,并不要求该端点已使zmq_bind()进行了绑定;

3、​连接是异步的,并由一组消息队列做缓冲;

4、​连接会表现出某种消息模式,这是由创建连接的套接字类型决定的;

5、​一个套接字可以有多个输入和输出连接;

​6、ZMQ套接字绑定至端点时它就自动开始接受连接了;

​7、这些连接是被封装在ZMQ底层的,由ZMQ进行管理,程序员只需要关注消息处理

​在很多架构中都使用了类似于C/S的架构。服务端组件式比较稳定的,而客户端组件则较为动态,来去自如。所以说,服务端地址对客户端而言往往是可见的,而客户端地址对服务端却不是可见的。这样一来,架构中应该将哪些组件作为服务端(使用zmq_bind()),哪些作为客户端(使用zmq_connect()),就很明显了。

如果先打开了客户端,后打开服务端,会发生什么?传统网络连接中,我们打开客户端时一定会收到系统的报错信息。但ZMQ让我们能够自由地启动架构中的组件(客户端和服务端)。当客户端使用zmq_connect()连接至某个端点时,它就已经能够使用该套接字发送消息了。如果这时,服务端启动起来了,并使用zmq_bind()绑定至该端点,ZMQ将自动开始转发消息。

​服务端节点可以仅使用一个套接字就能绑定至多个端点:

zmq_bind (socket,"tcp://*:5555");

zmq_bind (socket,"tcp://*:9999");

zmq_bind (socket,"ipc://myserver.ipc");

当然,你不能多次绑定至同一端点,这样是会报错的。每当有客户端节点使用zmq_connect()连接至上述某个端点时,服务端就会自动创建连接。ZMQ没有对连接数量进行限制。此外,客户端节点也可以使用一个套接字同时建立多个连接。

​当我们在设计架构时,应该遵循“服务端是稳定的,客户端是灵活的“原则。

​套接字是有类型的,套接字类型定义了套接字的行为。你可以将不同种类的套接字进行连接,如PUB-SUB组合,这种组合称之为发布-订阅模式,其他组合也会有相应的模式名称,我们会在下文详述。正是因为套接字可以使用不同的方式进行连接,才构成了ZMQ最基本的消息队列系统。我们还可以在此基础之上建立更为复杂的装置、路由机制等,下文会详述。总的来说,ZMQ为你提供了一套组件,供你在网络架构中拼装和使用。

使用套接字传递数据

发送和接收消息使用的是zmq_send()和zmq_recv()这两个函数。

1

让我们看一看TCP套接字和ZMQ套接字之间在传输数据方面的区别:

1、ZMQ套接字传输的是消息,而不是字节(TCP)或帧(UDP)。消息指的是一段指定长度的二进制数据块,我们下文会讲到消息,这种设计是为了性能优化而考虑的。

2、ZMQ套接字无论是接收还是发送消息,它都会先传送到一个本地的缓冲队列,这个内存队列的大小是可以配置的。

3、ZMQ套接字可以和多个套接字进行连接(如果套接字类型允许的话)。TCP协议只能进行点对点的连接,而ZMQ则可以进行一对多(类似于无线广播)、多对多(类似于邮局)、多对一(类似于信箱),当然也包括一对一的情况。

2

MQ套接字可以发送消息给多个端点(扇出模型),或从多个端点中接收消息(扇入模型)

所以,套接字可能会将消息发送给很多节点,相应的,套接字又会从所有已建立的连接中接收消息。zmq_recv()方法使用了公平队列的算法来决定接收哪个连接的消息。

调用zmq_send()方法时其实并没有真正将消息发送给套接字连接。消息会在一个内存队列中保存下来,并由后台的I/O线程异步地进行发送。如果不出意外情况,这一行为是非阻塞的。所以说,即便zmq_send()有返回值,并不能代表消息已经发送。当你在用zmq_msg_init_data()初始化消息后,你不能重用或是释放这条消息,否则ZMQ的I/O线程会认为它在传输垃圾数据。这对初学者来讲是一个常犯的错误。

单播传输

ZMQ提供了一组单播传输协议(inporc, ipc, tcp),和两个广播协议(epgm,pgm)。一般而言我们会使用 tcp作为传输协议,这种TCP连接是可以脱机运作的。因为ZMQ中的TCP连接不需要该端点已经有某个服务进行了绑定,客户端和服务端可以随时进行连接和绑定。

进程间协议,即 ipc,和tcp的行为差不多,但不需要指定IP地址或者域名。这种协议很多时候会很方便,ZMQ中的ipc协议同样可以是脱机的,但有一个缺点——无法在Windows操作系统上运作。我们一般会在端点名称的末尾附上.ipc的扩展名,在UNIX系统上,使用ipc协议还需要注意权限问题。你还需要保证所有的程序都能够找到这个ipc端点。

进程内协议,即 inproc,可以在同一个进程的不同线程之间进行消息传输,它比ipc或tcp要快得多。这种协议有一个要求,必须先绑定到端点,才能建立连接。通常的做法是先启动服务端线程,绑定至端点,后启动客户端线程,连接至端点。

I/O线程
我们提过ZMQ是通过后台的I/O线程进行消息传输的。一个I/O线程已经足以处理多个套接字的数据传输要求,当然,那些极端的应用程序除外。这也就是我们在创建上下文时传入的1所代表的意思:
void *context = zmq_init (1);
ZMQ应用程序和传统应用程序的区别之一就是你不需要为每个套接字都创建一个连接。单个ZMQ套接字可以处理所有的发送和接收任务。如,当你需要向一千个订阅者发布消息时,使用一个套接字就可以了;当你需要向二十个服务进程分发任务时,使用一个套接字就可以了;当你需要从一千个网页应用程序中获取数据时,也是使用一个套接字就可以了。传统应用程序每个进程或线程会有一个远程连接,它又只能处理一个套接字。

核心消息模式

    ZMQ的核心消息模式有:

  • 请求-应答模式

  • 发布-订阅模式

  • 管道模式

  • 排他对接模式 将两个套接字一对一地连接起来,这种模式应用场景很少。

    以下是合法的套接字连接-绑定对(一端绑定、一端连接即可):

  • PUB - SUB
  • REQ - REP
  • REUTQ - ROER
  • DEALER - REP
  • DEALER - ROUTER
  • DEALER - DEALER
  • ROUTER - ROUTER
  • PUSH - PULL
  • PAIR - PAIR
  • 其他的组合模式会产生不可预知的结果,在将来的ZMQ版本中可能会直接返回错误。
上层消息模式
上文中的四种核心消息模式是内建在ZMQ中的,他们是API的一部分,在这些消息模式之上,我们会建立更为 上层的消息模式。这种模式可以用任何语言编写,他们不属于核心类型的一部分,不随ZMQ发行,只在你自己的应用程序中出现。


消息的使用方法

ZMQ的传输单位是消息,即一个二进制块。在内存中,ZMQ消息由zmq_msg_t结构表示(每种语言有特定的表示)。在C语言中使用ZMQ消息时需要注意以下几点:
  • 你需要创建和传递zmq_msg_t对象,而不是一组数据块;
  • 读取消息时,先用zmq_msg_init()初始化一个空消息,再将其传递给zmq_recv()函数;
  • 写入消息时,先用zmq_msg_init_size()来创建消息(同时也已初始化了一块内存区域),然后用memcpy()函数将信息拷贝到该对象中,最后传给zmq_send()函数;
  • 释放消息(并不是销毁)时,使用zmq_msg_close()函数,它会将对消息对象的引用删除,最终由ZMQ将消息销毁;
  • 获取消息内容时需使用zmq_msg_data()函数;若想知道消息的长度,可以使用zmq_msg_size()函数;
  • 至于zmq_msg_move()、zmq_msg_copy()、zmq_msg_init_data()函数,在充分理解手册中的说明之前,建议不好贸然使用。

    以下是一段处理消息的典型代码,如果之前的代码你有看的话,那应该会感到熟悉。这段代码其实是从zhelpers.h文件中抽出的:

    //  从套接字中获取ZMQ字符串,并转换为C语言字符串 
static char *
s_recv (void *socket) {

    zmq_msg_t message;
 zmq_msg_init (&message);
    
zmq_recv (socket, &message, 0);

    int size = zmq_msg_size (&message);

    char *string = malloc (size + 1);
    
memcpy (string, zmq_msg_data (&message), size);

zmq_msg_close (&message);
    
string [size] = 0;
    
return (string);
 }

//  将C语言字符串转换为ZMQ字符串,并发送给套接字

static int
s_send (void *socket, char *string) {
    
int rc;
    
zmq_msg_t message;
 zmq_msg_init_size (&message, strlen (string));
 memcpy (zmq_msg_data (&message), string, strlen (string));
 rc = zmq_send (socket, &message, 0);
 assert (!rc);
    
zmq_msg_close (&message);
    
return (rc);

}
需要注意的是,当你将一个消息对象传递给zmq_send()函数后,该对象的长度就会被清零,因此你无法发送同一个消息对象两次,也无法获得已发送消息的内容。
如果你想发送同一个消息对象两次,就需要在发送第一次前新建一个对象,使用zmq_msg_copy()函数进行拷贝。这个函数不会拷贝消息内容,只是拷贝引用。然后你就可以再次发送这个消息了。ZMQ支持多帧消息,即在一条消息中保存多个消息帧。

关于消息,还有一些需要注意的地方:

  • ZMQ的消息是作为一个整体来收发的,你不会只收到消息的一部分;
  • ZMQ不会立即发送消息,而是有一定的延迟;
  • 你可以发送0字节长度的消息,作为一种信号;
  • 消息必须能够在内存中保存,如果你想发送文件或超长的消息,就需要将他们切割成小块,在独立的消息中进行发送;
  • 必须使用zmq_msg_close()函数来关闭消息。
  • 再重复一句,不要贸然使用zmq_msg_init_data()函数。

处理多个套接字

在之前的示例中,主程序的循环体内会做以下几件事:

  1. 等待套接字的消息;
  2. 处理消息;
  3. 返回第一步。
如果我们想要读取多个套接字中的消息呢?最简单的方法是将套接字连接到多个端点上,让ZMQ使用公平队列的机制来接受消息。如果不同端点上的套接字类型是一致的,那可以使用这种方法。但是,如果一个套接字的类型是PULL,另一个是PUB怎么办?如果现在开始混用套接字类型,那将来就没有可靠性可言了。
用NOBLOCK(非阻塞)的方式来实现从多个套接字读取消息的功能。下面将气象信息服务和并行处理这两个示例结合起来:

msreader: Multiple socket reader in

//

//  从多个套接字中获取消息

//  本示例简单地再循环中使用recv函数

//
#include "zhelpers.h"


int main (void) 
{

//  准备上下文和套接字
    
void *context = zmq_init (1);

 //  连接至任务分发器

    void *receiver = zmq_socket (context, ZMQ_PULL);
    
zmq_connect (receiver, "tcp://localhost:5557");

    

//  连接至天气服务
 void *subscriber = zmq_socket (context, ZMQ_SUB);
 zmq_connect (subscriber, "tcp://localhost:5556");
    
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);


    while (1) {
        
//  处理等待中的任务

        int rc;
        
for (rc = 0; !rc; ) {

               zmq_msg_t task;
            
    zmq_msg_init (&task);
    if ((rc = zmq_recv (receiver, &task, ZMQ_NOBLOCK)) == 0) {
                
   //  处理任务

}
 zmq_msg_close (&task);
 }

        //  处理等待中的气象更新
 for (rc = 0; !rc; ) {
            
zmq_msg_t update;
 zmq_msg_init (&update);
 if ((rc = zmq_recv (subscriber, &update, ZMQ_NOBLOCK)) == 0) {

                //  处理气象更新
 }
            
zmq_msg_close (&update);
 }
        
// 没有消息,等待1毫秒
        s_sleep (1);
    
}

下面,让我们看看如何用zmq_poll()来实现同样的功能:

mspoller: Multiple socket poller in C

//
//  从多个套接字中接收消息
//  本例使用zmq_poll()函数
//
#include "zhelpers.h"

int main (void) 
{
    void *context = zmq_init (1);

    //  连接任务分发器
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");

    //  连接气象更新服务
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    zmq_connect (subscriber, "tcp://localhost:5556");
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "10001 ", 6);

    //  初始化轮询对象
    zmq_pollitem_t items [] = {
        { receiver, 0, ZMQ_POLLIN, 0 },
        { subscriber, 0, ZMQ_POLLIN, 0 }
    };
    //  处理来自两个套接字的消息
    while (1) {
        zmq_msg_t message;
        zmq_poll (items, 2, -1);
        if (items [0].revents & ZMQ_POLLIN) {
            zmq_msg_init (&message);
            zmq_recv (receiver, &message, 0);
            //  处理任务
            zmq_msg_close (&message);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            zmq_msg_init (&message);
            zmq_recv (subscriber, &message, 0);
            //  处理气象更新
            zmq_msg_close (&message);
        }
    }
    //  程序不会运行到这儿
    zmq_close (receiver);
    zmq_close (subscriber);
    zmq_term (context);
    return 0;
}

处理错误和ETERM信号

ZMQ的错误处理机制提倡的是快速崩溃。在C语言中,断言失败会让程序立即中止。其他语言中可以使用异常来做到。到目前为止,我们很少看到C语言的示例中有对错误进行处理。
现实中的代码应该对每一次的ZMQ函数调用作错误处理
。如果你不是使用C语言进行编程,可能那种语言的ZMQ类库已经做了错误处理。但在C语言中,你需要自己动手。
下是一些常规的错误处理手段:
  • 创建对象的方法如果失败了会返回NULL;
  • 其他方法执行成功时会返回0,失败时会返回其他值(一般是-1);
  • 错误代码可以从变量errno中获得,或者调用zmq_errno()函数;
  • 错误消息可以调用zmq_strerror()函数获得。


 
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐