ZeroMQ 常用 API

目录

ZeroMQ 常用 API

zmq_msg_init

zmq_msg_data

zmq_msg_close

zmq_msg_init_size

zmq_msg_send

zmq_msg_recv

zmq_getsockopt

zmq_setsockopt

zmq_msg_init

int zmq_msg_init (zmq_msg_t *msg);
  • 示例:
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
int nbytes = zmq_msg_recv (socket, &msg, 0); assert (nbytes != -1);

zmq_msg_data

void *zmq_msg_data (zmq_msg_t *msg);
  • 成功完成后,zmq_msg_data()将返回一个指向消息内容的指针。
    • zmq_msg_data()函数将返回一个指向msg引用的消息对象的消息内容的指针。
    • 永远不要直接访问zmq_msg_t成员,而是始终使用zmq_msg函数族。

zmq_msg_close

int zmq_msg_close (zmq_msg_t *msg);
  • zmq_msg_close()函数将通知ØMQ基础结构,与msg引用的消息对象关联的任何资源都不再需要,可以被释放。与消息对象关联的资源的实际释放应该由ØMQ延迟,直到消息或底层数据缓冲区的所有用户都表示不再需要它。
    • 应用程序应该确保在不再需要消息时调用zmq_msg_close(),否则可能发生内存泄漏。注意,在成功执行zmq_msg_send()之后,没有必要这样做。
    • 如果成功,zmq_msg_close()函数将返回零。否则它将返回-1,并将errno设置为预先定义的一个值。

zmq_msg_init_size

int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
  • 描述:
    • 初始化指定大小的ØMQ消息
    • zmq_msg_init_size()函数应该分配存储消息大小字节长的所需的任何资源,并初始化msg引用的消息对象以表示新分配的消息。
    • 该实现应选择是将消息内容存储在堆栈(小消息)上,还是存储在堆(大消息)上。出于性能原因,zmq_msg_init_size()不应该清除消息数据。
    • 函数zmq_msg_init()、zmq_msg_init_data()和zmq_msg_init_size()是互斥的。永远不要初始化同一个zmq_msg_t两次。
    • 如果成功,zmq_msg_init_size()函数将返回零。否则它将返回-1,并将errno设置为预先定义的一个值。

zmq_msg_send

int zmq_msg_send (zmq_msg_t *msg, void *socket, int flags);
  • 示例(填充消息并将其发送到套接字):
/* Create a new message, allocating 6 bytes for message content */
zmq_msg_t msg;
int rc = zmq_msg_init_size (&msg, 6);
assert (rc == 0);
/* Fill in message content with 'AAAAAA' */
memset (zmq_msg_data (&msg), 'A', 6);
/* Send the message to the socket */
rc = zmq_msg_send (&msg, socket, 0); 
assert (rc == 6);
  • 示例(发送多部分消息):
/* Send a multi-part message consisting of three parts to socket */
rc = zmq_msg_send (&part1, socket, ZMQ_SNDMORE);
rc = zmq_msg_send (&part2, socket, ZMQ_SNDMORE);
/* Final part; no more parts to follow */ 
rc = zmq_msg_send (&part3, socket, 0);
  • zmq_msg_send()函数与zmq_sendmsg(3)函数相同,在未来的版本中将弃用。Zmq_msg_send()与其他消息操作函数更加一致。
  • zmq_msg_send()函数将把msg参数引用的消息排队发送到套接字参数引用的套接字。flags参数是下面定义的标志的组合:
ZMQ_DONTWAIT对于在没有可用对等体(或所有对等体都有完整的高水位标记)时阻塞的套接字类型(DEALER, PUSH),指定操作应该在非阻塞模式下执行。如果消息不能在套接字上排队,zmq_msg_send()函数将失败,并将errno设置为EAGAIN。
ZMQ_SNDMORE指定正在发送的消息是一个多部分消息,然后是后续的消息部分。有关详细描述,请参阅下面关于多部分消息的部分。
  • 传递给zmq_msg_send()的zmq_msg_t结构在调用期间被消零。如果你想发送相同的消息到多个套接字,你必须复制它(例如使用zmq_msg_copy())。
  • 成功调用zmq_msg_send()并不表示消息已传输到网络,只表示消息已在套接字上排队,并且ØMQ已承担消息的责任。在成功执行zmq_msg_send()之后,不需要调用zmq_msg_close()。
  • Multi-part messages:
    • ØMQ消息由1个或多个消息部分组成。每个消息部分都是一个独立的zmq_msg_t。ØMQ确保消息的原子传递:对等点要么接收消息的所有消息部分,要么根本接收不到。消息部件的总数是无限的,除了可用内存。
    • 发送多部分消息的应用程序在发送除最后一个消息外的每个消息部分时必须使用ZMQ_SNDMORE标志。
  • 返回值:
    • 如果成功,zmq_msg_send()函数将返回消息中的字节数(如果字节数高于MAX_INT,函数将返回MAX_INT)。否则它将返回-1,并将errno设置为下面定义的一个值。

zmq_msg_recv

int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags);
  • 示例(从套接字接收消息):
/* Create an empty ØMQ message */
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
assert (rc == 0);
/* Block until a message is available to be received from socket */
rc = zmq_msg_recv (&msg, socket, 0);
assert (rc != -1);
/* Release message */ 
zmq_msg_close (&msg);
  • 示例(接收多部分消息):
int more;
size_t more_size = sizeof (more);
do {
 /* Create an empty ØMQ message to hold the message part */
 zmq_msg_t part;
 int rc = zmq_msg_init (&part);
 assert (rc == 0);
 /* Block until a message is available to be received from socket */
 rc = zmq_msg_recv (&part, socket, 0);
 assert (rc != -1);
 /* Determine if more message parts are to follow */
 rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
 assert (rc == 0);
 zmq_msg_close (&part); 
} 
while (more);
  • zmq_msg_recv()函数与zmq_recvmsg(3)函数相同,在未来的版本中将弃用。Zmq_msg_recv()与其他消息操作函数更加一致。
  • zmq_msg_recv()函数将从套接字参数引用的套接字接收消息部分,并将其存储在msg参数引用的消息中。之前储存在msg中的任何内容都应该被正确地释放。
  • 如果指定套接字上没有可用的消息部分,zmq_msg_recv()函数将阻塞,直到请求得到满足。flags参数是下面定义的标志的组合:
ZMQ_DONTWAIT指定操作应在非阻塞模式下执行。如果指定的套接字上没有可用的消息,zmq_msg_recv()函数将失败,errno设置为EAGAIN
  • Multi-part messages
    • ØMQ消息由1个或多个消息部分组成。每个消息部分都是一个独立的zmq_msg_t。ØMQ确保消息的原子传递:对等点要么接收消息的所有消息部分,要么根本接收不到。消息部件的总数是无限的,除了可用内存。
    • 处理多部分消息的应用程序必须在调用zmq_msg_recv()之后使用ZMQ_RCVMORE zmq_getsockopt(3)选项来确定是否有更多的部分需要接收。
  • 返回值:
    • 如果成功,zmq_msg_recv()函数将返回消息中的字节数。否则它将返回-1,并将errno设置为预先定义的一个值。

zmq_getsockopt

int zmq_getsockopt (void *socket, int option_name, \
                    void *option_value, size_t *option_len);
  • 示例(检索外发邮件的高水位标志):
/* Retrieve high water mark into sndhwm */
int sndhwm;
size_t sndhwm_size = sizeof (sndhwm);
rc = zmq_getsockopt (socket, ZMQ_SNDHWM, &sndhwm, &sndhwm_size); 
assert (rc == 0);

zmq_setsockopt

int zmq_setsockopt (void *socket, int option_name, \
                    const void *option_value, size_t option_len);
  • 注意 :
    • 所有选项,除了ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE, ZMQ_LINGER, ZMQ_ROUTER_HANDOVER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER, ZMQ_XPUB_VERBOSE, ZMQ_XPUB_VERBOSER, ZMQ_REQ_CORRELATE, ZMQ_REQ_RELAXED, ZMQ_SNDHWM and ZMQ_RCVHWM, 仅对后续的套接字绑定/连接生效。
    • 具体来说,安全选项对后续的绑定/连接调用生效,并且可以在任何时候更改以影响后续的绑定和/或连接。
  • 示例(订阅zmq_sub套接字上的消息):
/* Subscribe to all messages */
rc = zmq_setsockopt (socket, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
/* Subscribe to messages prefixed with "ANIMALS.CATS" */ 
rc = zmq_setsockopt (socket, ZMQ_SUBSCRIBE, "ANIMALS.CATS", 12);
  • 示例(设置i/o线程关系):
int64_t affinity;
/* Incoming connections on TCP port 5555 shall be handled by I/O thread 1 */
affinity = 1;
rc = zmq_setsockopt (socket, ZMQ_AFFINITY, &affinity, sizeof (affinity));
assert (rc);
rc = zmq_bind (socket, "tcp://lo:5555");
assert (rc);
/* Incoming connections on TCP port 5556 shall be handled by I/O thread 2 */
affinity = 2;
rc = zmq_setsockopt (socket, ZMQ_AFFINITY, &affinity, sizeof (affinity));
assert (rc);
rc = zmq_bind (socket, "tcp://lo:5556"); 
assert (rc);

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐