背景

AMQP-CPP 是c++连接RabbitMq的客户端库,它完全异步,需要c11的支持。

关于 AMQP-CPP 的基础使用,请参考 c++使用amqp-cpp库连接RabbitMq

源码下载请移步github: AMQP-CPP

AMQP-CPP采用分层的架构,网络层与AMQP协议层分离,而仓库中只提供了Linux下的TCP模块,所以在Windows上使用时,需要自行实现网络层IO进行数据收发。

本文主要介绍在Windows上实现过程及注意事项,并附实现网络层的代码(使用Boost Asio库)。

在Windows下编译

注意,Windows平台不支持编译动态库,只能编译静态库。

克隆代码到本地,使用cmake .编译(建议使用外部构建,这个目录会更清晰,关于cmake)。

我使用的是vs2015,当前目录生成sln工程,打开显示如下:

在这里插入图片描述

选择需要的目标平台及版本,我编译的是 Win32平台的Release版本。

设置属性。在预编译宏中添加 NOMINMAX

在这里插入图片描述

右键右侧的项目名称:amqpcpp,开始build。编译完成后,生成 amqpcpp.lib。

把 amqpcpp.lib 及 include 目录下的拷贝到自己的项目下,并添加相应属性,准备工作完成。

使用amqpcpp

添加头文件:#include "amqpcpp.h",设置好include目录,及链接库,就可以开始写自己的代码了。

由于没有TCP模块可用,就要自己实现网络IO功能。

  1. 创建自己的连接管理类

连接管理类负责所有底层IO操作,包括发送数据到RabbitMq,处理发生的错误等,都在连接管理类中处理。

代码如下:

// 处理TCP连接的类
class MyConnectionHandler : public AMQP::ConnectionHandler
{
private:
	boost::shared_ptr<TcpClient> m_pTcpClient; // 处理底层TCP操作的类,进行具体的数据发送和接收

public:
	MyConnectionHandler(boost::shared_ptr<TcpClient> pTcpClient);
	~MyConnectionHandler();

	// 数据待发送出去
	virtual void onData(AMQP::Connection *connection, const char *data, size_t size); // 调用TCP操作,把数据发送出去
	// Rmq登录成功
	virtual void onReady(AMQP::Connection *connection);
	// 发生错误,一般发生此错误后连接不再可用
	virtual void onError(AMQP::Connection *connection, const char *message);
	// 对端关闭连接
	virtual void onClosed(AMQP::Connection *connection);
};

其中,TcpClient 是由Boost Asio库实现的用于处理TCP连接和数据的类,作为 MyConnectionHandler 的成员变量,供 MyConnectionHandler 调用。

  1. 使用流程

有了这个自行实现的类,就可以使用了:

// create an instance of your own connection handler
MyConnectionHandler myHandler;

// create a AMQP connection object
AMQP::Connection connection(&myHandler, AMQP::Login("guest","guest"), "/"); // 连接到RabbitMq,会进行AMQP协议交互,连接真正建立会有耗时

// and create a channel
AMQP::Channel channel(&connection);

// use the channel object to call the AMQP method you like
channel.declareExchange("my-exchange", AMQP::fanout);
channel.declareQueue("my-queue");
channel.bindQueue("my-exchange", "my-queue", "my-routing-key");

所有对象都可以建立在堆上,这里只是为了演示使用流程。

正如注释所言,连接真正建立起来可能会耗费一定时间,连接未建立起来时,RabbitMq不会接受任何指令。

但后续的建立通道、声明组件操作并不受到影响,因为AMQP库会缓存这些指令,并在连接建立后发送出去。

  1. 解析收到的数据

发送数据由 MyConnectionHandler::onData() 实现,接收数据由 TcpClient 类中方法实现,那么接收到的数据如何处理呢?

解析数据需要使用 Connection::Parse()来实现,其原型如下:

/**
 *  Parse data that was received from RabbitMQ
 *
 *  Every time that data comes in from RabbitMQ, you should call this method to parse
 *  the incoming data, and let it handle by the AMQP-CPP library. This method returns
 *  the number of bytes that were processed.
 *
 *  If not all bytes could be processed because it only contained a partial frame,
 *  you should call this same method later on when more data is available. The
 *  AMQP-CPP library does not do any buffering, so it is up to the caller to ensure
 *  that the old data is also passed in that later call.
 *
 *  @param  buffer      buffer to decode
 *  @param  size        size of the buffer to decode
 *  @return             number of bytes that were processed
 */
size_t parse(char *buffer, size_t size)
{
    return _implementation.parse(buffer, size);
}

在接收到数据之后,调用该函数解析即可。有以下几点需要注意:

  • 数据缓冲区自行维护,如接收到100字节,以它调用parse,返回60,则调用者负责将未解析的40个字节再次调用parse
  • 可以使用 Connection::expected() 返回AMQP库下次需要的字节数,然后以此长度数据调用parse,这样可以减少调用次数(以小于该长度字节数调用parse无意义)
  • 可以使用 Connection::maxFrame() 返回AMQP消息的最大字节数,以此值初始化消息缓冲区,就不会因为消息过长而重新分配内存
关于TCP连接

使用Boost Asio库实现,跨平台,异步,高性能,易扩展。

示例代码如下:

class TcpClient : public boost::enable_shared_from_this<TcpClient>, boost::noncopyable
{
private:
	using error_code = boost::system::error_code;
	bool IsSocketStarted(); // socket是否已经启动
	void OnConnect(const error_code &err); // TCP连接就绪
	void OnReadData(const error_code &err, size_t bytes); // 接收到数据
	void OnWrite(const error_code &err, size_t bytes); // 已发送数据
	void Run();
	void RecvData();  // 接收数据
	void parse(); // 解析数据

public:
	TcpClient();
	~TcpClient();
	void Start();  // 开始运行
	void CloseSocket();
	void SendData(const uint8_t *pDataInfo, const size_t len); // 发送数据数据
	void Stop(); // 停止

private:
	boost::asio::io_service m_ios;
	boost::asio::io_service::work m_work;
	boost::asio::ip::tcp::socket m_sock;
	int m_numOfWorkThreads;
	boost::thread_group m_threads;
	bool m_socketStarted;
	std::string m_writeBuf; // 发送数据缓冲区
	char m_readBuf[1]; // 接收数据缓冲区
	std::vector<char> m_recvedBuf; //用于调用parse的数据缓冲区
	boost::shared_ptr<MyConnectionHandler> m_pHandler;
	boost::shared_ptr<AMQP::Connection> m_pConnect;
	std::mutex m_lock;
	bool m_bparse;
};

如果对 Boost Asio库及网络编程还不熟悉,可以参考相关资料。

参考代码

上述示例实现代码均已上传到github,点击查看

由于理解还不够深入,错漏之处请不吝指出。

参考资料

AMQP-CPP

Logo

更多推荐