AMQP-CPP在Windows下的使用及网络层Boost Asio实现
背景AMQP-CPP 是c++连接RabbitMq的客户端库,它完全异步,需要c11的支持。关于 AMQP-CPP 的基础使用,请参考 c++使用amqp-cpp库连接RabbitMq 。源码下载请稳步github: AMQP-CPP 。AMQP-CPP采用分层的架构,网络层与AMQP协议层分离,而仓库中只提供了Linux下的TCP模块,所以在Windows上使用时,需要自行实现网络层I...
背景
AMQP-CPP 是c++连接RabbitMq的客户端库,它完全异步,需要c11的支持。
关于 AMQP-CPP 的基础使用,请参考 c++使用amqp-cpp库连接RabbitMq 。
源码下载请移步github: AMQP-CPP 。
AMQP-CPP采用分层的架构,网络层与AMQP协议层分离,而仓库中只提供了Linux下的TCP模块,所以在Windows上使用时,需要自行实现网络层IO进行数据收发。
本文主要介绍在Windows上实现过程及注意事项,并附实现网络层的代码(使用Boost Asio库)。
在Windows下编译
注意,Windows平台不支持编译动态库,只能编译静态库。
克隆代码到本地,使用cmake .
编译(建议使用外部构建,这个目录会更清晰,关于cmake)。
我使用的是vs2015,当前目录生成sln工程,打开显示如下:
选择需要的目标平台及版本,我编译的是 Win32平台的Release版本。
设置属性。在预编译宏中添加 NOMINMAX
。
右键右侧的项目名称:amqpcpp,开始build。编译完成后,生成 amqpcpp.lib。
把 amqpcpp.lib 及 include 目录下的拷贝到自己的项目下,并添加相应属性,准备工作完成。
使用amqpcpp
添加头文件:#include "amqpcpp.h"
,设置好include目录,及链接库,就可以开始写自己的代码了。
由于没有TCP模块可用,就要自己实现网络IO功能。
- 创建自己的连接管理类
连接管理类负责所有底层IO操作,包括发送数据到RabbitMq,处理发生的错误等,都在连接管理类中处理。
代码如下:
// 处理TCP连接的类
class MyConnectionHandler : public AMQP::ConnectionHandler
{
private:
boost::shared_ptr<TcpClient> m_pTcpClient; // 处理底层TCP操作的类,进行具体的数据发送和接收
public:
MyConnectionHandler(boost::shared_ptr<TcpClient> pTcpClient);
~MyConnectionHandler();
// 数据待发送出去
virtual void onData(AMQP::Connection *connection, const char *data, size_t size); // 调用TCP操作,把数据发送出去
// Rmq登录成功
virtual void onReady(AMQP::Connection *connection);
// 发生错误,一般发生此错误后连接不再可用
virtual void onError(AMQP::Connection *connection, const char *message);
// 对端关闭连接
virtual void onClosed(AMQP::Connection *connection);
};
其中,TcpClient 是由Boost Asio库实现的用于处理TCP连接和数据的类,作为 MyConnectionHandler 的成员变量,供 MyConnectionHandler 调用。
- 使用流程
有了这个自行实现的类,就可以使用了:
// create an instance of your own connection handler
MyConnectionHandler myHandler;
// create a AMQP connection object
AMQP::Connection connection(&myHandler, AMQP::Login("guest","guest"), "/"); // 连接到RabbitMq,会进行AMQP协议交互,连接真正建立会有耗时
// and create a channel
AMQP::Channel channel(&connection);
// use the channel object to call the AMQP method you like
channel.declareExchange("my-exchange", AMQP::fanout);
channel.declareQueue("my-queue");
channel.bindQueue("my-exchange", "my-queue", "my-routing-key");
所有对象都可以建立在堆上,这里只是为了演示使用流程。
正如注释所言,连接真正建立起来可能会耗费一定时间,连接未建立起来时,RabbitMq不会接受任何指令。
但后续的建立通道、声明组件操作并不受到影响,因为AMQP库会缓存这些指令,并在连接建立后发送出去。
- 解析收到的数据
发送数据由 MyConnectionHandler::onData()
实现,接收数据由 TcpClient 类中方法实现,那么接收到的数据如何处理呢?
解析数据需要使用 Connection::Parse()
来实现,其原型如下:
/**
* Parse data that was received from RabbitMQ
*
* Every time that data comes in from RabbitMQ, you should call this method to parse
* the incoming data, and let it handle by the AMQP-CPP library. This method returns
* the number of bytes that were processed.
*
* If not all bytes could be processed because it only contained a partial frame,
* you should call this same method later on when more data is available. The
* AMQP-CPP library does not do any buffering, so it is up to the caller to ensure
* that the old data is also passed in that later call.
*
* @param buffer buffer to decode
* @param size size of the buffer to decode
* @return number of bytes that were processed
*/
size_t parse(char *buffer, size_t size)
{
return _implementation.parse(buffer, size);
}
在接收到数据之后,调用该函数解析即可。有以下几点需要注意:
- 数据缓冲区自行维护,如接收到100字节,以它调用parse,返回60,则调用者负责将未解析的40个字节再次调用parse
- 可以使用 Connection::expected() 返回AMQP库下次需要的字节数,然后以此长度数据调用parse,这样可以减少调用次数(以小于该长度字节数调用parse无意义)
- 可以使用 Connection::maxFrame() 返回AMQP消息的最大字节数,以此值初始化消息缓冲区,就不会因为消息过长而重新分配内存
关于TCP连接
使用Boost Asio库实现,跨平台,异步,高性能,易扩展。
示例代码如下:
class TcpClient : public boost::enable_shared_from_this<TcpClient>, boost::noncopyable
{
private:
using error_code = boost::system::error_code;
bool IsSocketStarted(); // socket是否已经启动
void OnConnect(const error_code &err); // TCP连接就绪
void OnReadData(const error_code &err, size_t bytes); // 接收到数据
void OnWrite(const error_code &err, size_t bytes); // 已发送数据
void Run();
void RecvData(); // 接收数据
void parse(); // 解析数据
public:
TcpClient();
~TcpClient();
void Start(); // 开始运行
void CloseSocket();
void SendData(const uint8_t *pDataInfo, const size_t len); // 发送数据数据
void Stop(); // 停止
private:
boost::asio::io_service m_ios;
boost::asio::io_service::work m_work;
boost::asio::ip::tcp::socket m_sock;
int m_numOfWorkThreads;
boost::thread_group m_threads;
bool m_socketStarted;
std::string m_writeBuf; // 发送数据缓冲区
char m_readBuf[1]; // 接收数据缓冲区
std::vector<char> m_recvedBuf; //用于调用parse的数据缓冲区
boost::shared_ptr<MyConnectionHandler> m_pHandler;
boost::shared_ptr<AMQP::Connection> m_pConnect;
std::mutex m_lock;
bool m_bparse;
};
如果对 Boost Asio库及网络编程还不熟悉,可以参考相关资料。
参考代码
上述示例实现代码均已上传到github,点击查看
由于理解还不够深入,错漏之处请不吝指出。
参考资料
更多推荐
所有评论(0)