【C++ 脚手架】amqp-cpp 的介绍与使用
AMQP-CPP 使用指南:C++ 消息队列客户端实战
摘要:本文详细介绍了如何在 C++ 项目中基于 AMQP-CPP 实现高性能消息队列客户端。内容涵盖 AMQP 协议基础、AMQP-CPP 客户端 SDK 的核心 API 使用、两种网络模式(自定义 TcpHandler 与 libev 扩展模式)、死信队列机制,并提供了完整的面向对象封装方案。通过 MQClient、Publisher 和 Subscriber 三个核心组件,实现了消息的可靠发布与订阅、延迟队列等功能,为分布式系统提供了可靠的消息通信基础。这种封装使得开发者能够以简洁的 API 快速集成消息队列能力,同时保持高性能和可维护性。
C++脚手架仓库地址: https://gitee.com/chen-weifeng-cwf/developing-scaffolding-for-c
📑 目录
- 1. AMQP 简介
- 2. AMQP-CPP 简介
- 3. 默认 TCP 模式:自定义 TcpHandler
- 4. 扩展模式:对接 libev
- 5. 入门示例:简单消息传输
- 6. 死信队列(延迟队列)
- 7. 面向对象封装
- 8. 封装后的使用示例
- 9. 总结
AMQP-CPP 是一个强大的 C++ 消息队列客户端库,完全异步、非阻塞,支持与 RabbitMQ 等 AMQP 中间件交互。本文从 AMQP 协议基础讲起,覆盖客户端 API、死信队列、以及面向对象的服务端封装,带你全面掌握 C++ 消息队列开发。
1. AMQP 简介
1.1 什么是 AMQP?
AMQP(Advanced Message Queuing Protocol)是一个提供统一消息服务的应用层标准高级消息队列协议。基于此协议的客户端与消息中间件可传递消息,不受产品、开发语言等条件的限制。Erlang 中的典型实现有 RabbitMQ。
它规定了消息系统中三大组件的通信规范:
- 消息服务器/代理节点(Server/Broker)
- 生产者/发布者(Producer/Publisher)
- 消费者/订阅者(Consumer/Subscriber)
1.2 MQ 核心要素
| 要素 | 说明 |
|---|---|
| Broker | 消息代理服务器(如 RabbitMQ) |
| Virtual Host | 虚拟主机,环境隔离 |
| Exchange(交换机) | 消息的分发中心,接收生产者消息并根据规则路由到队列 |
| Queue(队列) | 消息的容器,存储消息直到消费者处理 |
| Binding-Key | 交换机和队列之间的绑定键,定义路由规则 |
| Producer(生产者) | 消息的发送方 |
| Consumer(消费者) | 消息的接收方 |
四种交换机类型:
| 类型 | 说明 |
|---|---|
| Direct Exchange(直连) | 根据 Routing-Key 将消息发送到特定队列 |
| Topic Exchange(主题) | 根据 Routing-Key 和通配符匹配发送到多个队列 |
| Fanout Exchange(广播) | 将消息广播到与交换机绑定的所有队列 |
| Headers Exchange(头) | 根据消息的自定义头部属性进行匹配路由 |
1.3 工作流程

生产者 → Broker(交换机 → 根据路由键 → 队列) → 消费者
- 生产者连接 Broker 发布消息,指定交换机和路由键
- 交换机根据路由键将消息路由到一个或多个队列
- 消费者从队列中获取消息并处理
1.4 MQ 的十大作用
- 解耦:生产者和消费者不需要同时在线或直接交互
- 异步处理:生产者发送消息后不必等待处理结果,提高吞吐量
- 削峰填谷:将短时间高并发的消息存储在队列中,后端按能力消费
- 负载均衡:多个消费者间自动分配消息
- 灵活性和可扩展性:支持水平扩展
- 增强系统可维护性:消息传递与业务逻辑分离
- 容错性:处理失败时可重新路由或放入死信队列
- 支持复杂业务流程:工作流和任务调度
- 数据分发:跨分布式系统传输数据
- 跨语言和平台:支持多种编程语言和平台
2. AMQP-CPP 简介
2.1 项目介绍
AMQP-CPP 是一个强大的开源库,实现了 AMQP 协议的客户端,允许使用 C++ 与 RabbitMQ、Qpid 等支持 AMQP 的消息中间件交互。
核心特性:
| 特性 | 说明 |
|---|---|
| 异步非阻塞 | 完全异步,没有阻塞式系统调用,不使用线程即可应用于高性能场景 |
| 分层架构 | 允许用户按需实现网络层,或使用预定义的 TCP/TLS 模块 |
| 跨平台 | 支持 Linux、macOS、Windows |
| C++17 | 充分利用现代 C++ 标准,要求编译器支持 C++17 或以上 |
2.2 安装
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make && sudo make install
2.3 两种使用模式
| 模式 | 说明 |
|---|---|
| 默认 TCP 模式 | 自定义 TcpHandler 子类,自己管理 monitor 事件循环(select/epoll) |
| 扩展模式 | 对接 libev、libevent、libuv、asio 等成熟网络库,自动管理事件循环 |
3. 默认 TCP 模式:自定义 TcpHandler
继承 AMQP::TcpHandler,重写一系列回调函数,其中最核心的是 monitor 函数——在该函数中需要将 fd 放入事件循环(select/epoll)中监控,当 fd 可读/可写时调用 connection->process(fd, flags)。
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>
class MyTcpHandler : public AMQP::TcpHandler {
public:
// 创建新连接时调用(第一次回调)
virtual void onAttached(AMQP::TcpConnection *connection) override { }
// TCP 连接建立时调用(与 onLost 成对出现)
virtual void onConnected(AMQP::TcpConnection *connection) override { }
// TLS 安全连接建立时调用(仅 amqps:// 连接)
virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override {
return true;
}
// 登录成功,连接就绪
virtual void onReady(AMQP::TcpConnection *connection) override {
// 在这里创建 Channel,开始发布或消费
}
// 协商心跳间隔(返回接受的间隔值)
virtual uint16_t onNegotiate(AMQP::TcpConnection *connection, uint16_t interval) override {
if (interval < 60) interval = 60;
// 建议在事件循环中设置定时器,每 interval 秒调用 connection->heartbeat()
return interval;
}
// 发生致命错误时调用
virtual void onError(AMQP::TcpConnection *connection, const char *message) override { }
// AMQP 协议正常结束时调用(调用 connection.close() 的对应)
virtual void onClosed(AMQP::TcpConnection *connection) override { }
// TCP 连接关闭或丢失时调用
virtual void onLost(AMQP::TcpConnection *connection) override { }
// 最终回调,表示不再有关于此连接的进一步调用
virtual void onDetached(AMQP::TcpConnection *connection) override { }
// ★ 核心方法:事件循环交互
// 当 AMQP-CPP 想要注册 fd 到事件循环时调用
// 当 fd 可读/可写时,需调用 connection->process(fd, flags) 通知库
virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags) override {
// 将 fd 加入 select/epoll 事件循环
// 就绪后调用: connection->process(fd, flags)
}
};
4. 扩展模式:对接 libev
使用 AMQP::LibEvHandler 可以直接对接 libev 网络库,无需自己实现 monitor。
4.1 头文件
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>
4.2 核心类:Connection
// RabbitMQ 服务器地址格式
class Address {
Address(const std::string &address); // "amqp://user:pass@host:port/"
};
// TCP 连接
class TcpConnection : private ConnectionHandler {
TcpConnection(TcpHandler *handler, const Address &address);
};
// 登录凭证
class Login {
Login(std::string user, std::string password);
};
// 连接对象
class Connection {
Connection(ConnectionHandler *handler, const Login &login, const std::string &vhost);
bool close(); // 关闭连接
};
4.3 核心类:Channel
Channel 是一个虚拟连接,一个连接上可以建立多个通道。所有 RabbitMQ 指令都通过 Channel 传输。操作是异步的,返回值是 Deferred 类,用于安装回调处理函数。
回调函数类型:
namespace AMQP {
using SuccessCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char *message)>;
using FinalizeCallback = std::function<void()>;
// 队列声明成功回调
using QueueCallback = std::function<void(
const std::string &name, uint32_t messagecount, uint32_t consumercount)>;
// 队列删除回调
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
// 消息处理回调
using MessageCallback = std::function<void(
const Message &message, uint64_t deliveryTag, bool redelivered)>;
// 发布确认回调
using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;
using PublishAckCallback = std::function<void()>;
using PublishNackCallback = std::function<void()>;
using PublishLostCallback = std::function<void()>;
}
交换机类型枚举:
enum ExchangeType {
fanout, // 广播交换
direct, // 直接交换
topic, // 主题交换
headers,
consistent_hash,
message_deduplication
};
Channel 核心 API:
class Channel {
Channel(Connection *connection);
bool connected(); // 连接是否可用
Deferred &close(); // 关闭当前 channel
// 声明交换机
// flags: durable(持久化) | autodelete(自动删除) | passive(仅检查) | internal(内部)
Deferred &declareExchange(const std::string_view &name,
ExchangeType type, int flags);
// 声明队列
// flags: durable | autodelete | passive | exclusive(连接独占,断连自动删除)
DeferredQueue &declareQueue(const std::string_view &name, int flags);
// 绑定队列到交换机
Deferred &bindQueue(const std::string_view &exchange,
const std::string_view &queue,
const std::string_view &bindingkey);
Deferred &unbindQueue(const std::string_view &exchange,
const std::string_view &queue,
const std::string_view &bindingkey);
// 删除队列(flags: ifunused | ifempty)
DeferredDelete &removeQueue(const std::string_view &name, int flags = 0);
// 发布消息(flags: mandatory | immediate)
bool publish(const std::string_view &exchange,
const std::string_view &routingKey,
const std::string &message, int flags = 0);
// 召回不可路由的消息(配合 mandatory/immediate 使用)
DeferredRecall &recall();
// 订阅队列消息
// flags: nolocal | noack(自动确认) | exclusive
DeferredConsumer &consume(const std::string_view &queue,
const std::string_view &tag, int flags = 0);
// 取消订阅
DeferredCancel &cancel(const std::string_view &tag);
// 检索单条消息
DeferredGet &get(const std::string_view &queue, int flags = 0);
// 确认消息(消费者必须确认,除非使用 noack)
// flags: multiple(确认多条,包括之前所有未确认的消息)
bool ack(uint64_t deliveryTag, int flags = 0);
// 拒绝消息
// flags: multiple(拒绝多条) | requeue(放回队列,否则删除)
bool reject(uint64_t deliveryTag, int flags = 0);
// 回复所有未确认的消息
Deferred &recover(int flags = 0);
};
4.4 Reliable — 可靠发布
template <typename BASE = Tagger>
class Reliable : public BASE {
template <typename ...Args>
Reliable(Args &&...args);
DeferredPublish &publish(const std::string_view &exchange,
const std::string_view &routingKey,
const std::string_view &message, int flags = 0);
};
// 使用示例
AMQP::TcpChannel mychannel(connection);
AMQP::Reliable reliable(mychannel);
reliable.publish("my-exchange", "my-key", "my first message")
.onAck()
.onNack()
.onLost()
.onError();
4.5 Message — 消息处理
class Envelope : public MetaData {
const char *body(); // 消息体指针
uint64_t bodySize(); // 消息体大小
};
class Message : public Envelope {
const std::string &exchange(); // 来源交换机
const std::string &routingkey(); // 路由键
};
4.6 Deferred — 异步回调链
Deferred 类及其派生类是处理异步操作的核心机制,用于设置异步调用的回调函数,支持链式调用。
class Deferred {
Deferred &onSuccess(const SuccessCallback& callback);
Deferred &onError(const ErrorCallback& callback);
Deferred &onFinalize(const FinalizeCallback& callback);
};
声明交换机示例:
channel.declareExchange(exchange, AMQP::ExchangeType::direct)
.onSuccess([&]() {
std::cout << "声明交换机成功:" << exchange << std::endl;
})
.onError([&](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
});
4.7 DeferredQueue — 队列操作回调
class DeferredQueue : public Deferred {
DeferredQueue &onSuccess(const QueueCallback& callback);
DeferredQueue &onSuccess(const SuccessCallback& callback);
};
using QueueCallback = std::function<void(const std::string &name,
uint32_t messageCount, uint32_t consumerCount)>;
声明队列示例:
channel.declareQueue(queue)
.onSuccess([&](const std::string &name, uint32_t messagecount, uint32_t consumercount) {
std::cout << "声明队列成功:" << queue << std::endl;
std::cout << "队列中有 " << messagecount << " 个消息" << std::endl;
std::cout << "队列中有 " << consumercount << " 个消费者" << std::endl;
})
.onError([&](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
abort();
});
4.8 DeferredConsumer — 订阅回调
class DeferredConsumer : public Deferred {
DeferredConsumer &onSuccess(const ConsumeCallback& callback);
DeferredConsumer &onReceived(const MessageCallback& callback); // 收到消息
DeferredConsumer &onMessage(const MessageCallback& callback); // onReceived 别名
DeferredConsumer &onCancelled(const CancelCallback& callback); // 取消订阅
};
订阅队列消息示例:
channel.consume(queue)
.onMessage([&](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::string body(message.body(), message.bodySize());
std::cout << "收到消息:" << body << std::endl;
channel.ack(deliveryTag); // 不要忘了确认消息!
})
.onError([&](const char *message) {
std::cout << "订阅队列消息失败:" << message << std::endl;
abort();
})
.onSuccess([&](const std::string_view &tag) {
std::cout << "订阅队列消息成功" << std::endl;
});
4.9 libev 网络库对接
#include <amqpcpp/libev.h>
#include <ev.h>
class LibEvHandler : public TcpHandler {
LibEvHandler(struct ev_loop *loop, int priority = 0);
};
// libev 核心操作
struct ev_loop *ev_default_loop(unsigned int flags); // 创建事件循环
int ev_run(struct ev_loop *loop); // 启动事件循环(阻塞)
void ev_break(struct ev_loop *loop, int32_t break_type); // 退出事件循环
// 异步任务(在 ev_loop 线程中执行)
void ev_async_init(ev_async *w, callback cb);
void ev_async_start(struct ev_loop *loop, ev_async *w);
void ev_async_send(struct ev_loop *loop, ev_async *w);
5. 入门示例:简单消息传输
5.1 目录结构
.
├── makefile
├── simple_publish.cc
├── simple_subscribe.cc
5.2 simple_publish.cc — 发布消息
#include <amqpcpp.h>
#include <ev.h>
#include <amqpcpp/libev.h>
#include <iostream>
int main() {
const std::string url = "amqp://admin:123456@192.168.65.128:5672/";
const std::string exchange = "my-exchange";
const std::string queue = "my-queue";
const std::string binding_key = "my-binding-key";
// 1. 实例化 libev 网络通信模块
auto *ev_loop = EV_DEFAULT;
AMQP::LibEvHandler handler(ev_loop);
// 2. 创建连接
AMQP::TcpConnection connection(&handler, AMQP::Address(url));
// 3. 创建 Channel
AMQP::TcpChannel channel(&connection);
// 4. 声明交换机 → 声明队列 → 绑定 → 发布消息(异步回调链)
channel.declareExchange(exchange, AMQP::ExchangeType::direct)
.onSuccess([&]() {
std::cout << "声明交换机成功:" << exchange << std::endl;
channel.declareQueue(queue)
.onSuccess([&](const std::string &name, uint32_t messagecount,
uint32_t consumercount) {
std::cout << "声明队列成功:" << queue << std::endl;
channel.bindQueue(exchange, queue, binding_key)
.onSuccess([&]() {
std::cout << "绑定交换机和队列成功" << std::endl;
channel.publish(exchange, binding_key, "Hello World");
})
.onError([&](const char *message) {
std::cout << "绑定失败:" << message << std::endl;
abort();
});
})
.onError([&](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
abort();
});
})
.onError([&](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
abort();
});
// 5. 启动事件循环
ev_run(ev_loop);
return 0;
}
5.3 simple_subscribe.cc — 订阅消息
订阅端的结构与发布端类似,区别在于绑定成功后调用 channel.consume(queue) 订阅消息,并在 onMessage 回调中处理消息并 ack 确认。
// ... 前 6 步与 publish 相同(声明交换机 → 队列 → 绑定)
// 7. 订阅队列消息
channel.consume(queue)
.onMessage([&](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::string body(message.body(), message.bodySize());
std::cout << "收到消息:" << body << std::endl;
channel.ack(deliveryTag); // ★ 必须确认!
})
.onError([&](const char *message) {
std::cout << "订阅失败:" << message << std::endl;
abort();
})
.onSuccess([&]() {
std::cout << "订阅队列消息成功" << std::endl;
});
ev_run(ev_loop);
5.4 Makefile
all: simple_publish simple_subscribe
simple_publish: simple_publish.cc
g++ -std=c++17 $^ -o $@ -lamqpcpp -lev -lpthread
simple_subscribe: simple_subscribe.cc
g++ -std=c++17 $^ -o $@ -lamqpcpp -lev -lpthread
clean:
rm -rf simple_publish simple_subscribe
5.5 运行演示
# 先启动订阅端
$ ./simple_subscribe
声明交换机成功:my-exchange
声明队列成功:my-queue
队列中有0个消息
队列中有0个消费者
绑定交换机和队列成功:
订阅队列消息成功:
收到消息:Hello World
# 再启动发布端
$ ./simple_publish
声明交换机成功:my-exchange
声明队列成功:my-queue
队列中有0个消息
队列中有1个消费者
绑定交换机和队列成功:
6. 死信队列(延迟队列)
6.1 核心概念
| 要素 | 说明 |
|---|---|
| 死信消息 | 满足特定条件(拒绝、过期、队列满)的消息 |
| 死信交换机 (DLX) | 普通交换机,死信消息会被重新发布到此交换机 |
| 死信队列 (DLQ) | 绑定到死信交换机上的普通队列,存储死信消息 |
6.2 延迟队列使用流程
- 创建普通交换机 A 和队列 A,作为死信交换机/队列
- 创建普通交换机 B 和队列 B,创建时设置死信关联参数和消息 TTL
- 向队列 B 发布消息 → TTL 过期 → 消息投递到死信交换机 A → 进入死信队列 A
- 消费者实际订阅的是死信队列 A,从而实现延时消费
6.3 死信队列参数设置
class Table : public Field {
public:
Table &set(const std::string &name, bool value);
// ... 一系列常见基础类型的重载 ...
AssociativeFieldProxy operator[](const std::string& name);
};
// 设置死信队列参数
AMQP::Table args;
args["x-dead-letter-exchange"] = "dlx-exchange"; // 死信交换机名称
args["x-dead-letter-routing-key"] = "dlx-binding-key"; // 死信路由键
args["x-message-ttl"] = 5000; // 消息过期时间(毫秒)
6.4 dlx_publish.cc — 延迟发布
void declaredComponent(AMQP::TcpChannel &channel,
const std::string &exchange, const std::string &queue,
const std::string &binding_key) {
// 普通的 交换机声明 → 队列声明 → 绑定 流程
// ...
}
int main() {
const std::string url = "amqp://admin:123456@192.168.65.130:5672/";
// 命名规范:死信组件加上 dlx_ 前缀
const std::string delayed_exchange = "delayed-exchange";
const std::string delayed_queue = "delayed-queue";
const std::string delayed_binding_key = "delayed-binding-key";
const std::string dlx_exchange = "dlx-exchange";
const std::string dlx_queue = "dlx-queue";
const std::string dlx_binding_key = "dlx-binding-key";
auto *ev_loop = EV_DEFAULT;
AMQP::LibEvHandler handler(ev_loop);
AMQP::TcpConnection connection(&handler, AMQP::Address(url));
AMQP::TcpChannel channel(&connection);
// 1. 声明死信交换机和队列
declaredComponent(channel, dlx_exchange, dlx_queue, dlx_binding_key);
// 2. 声明常规交换机和队列(带死信和 TTL 参数)
channel.declareExchange(delayed_exchange, AMQP::ExchangeType::direct)
.onSuccess([&]() {
AMQP::Table args;
args["x-dead-letter-exchange"] = dlx_exchange;
args["x-dead-letter-routing-key"] = dlx_binding_key;
args["x-message-ttl"] = 5000; // 5秒过期
channel.declareQueue(delayed_queue, args)
.onSuccess([&](...) {
channel.bindQueue(delayed_exchange, delayed_queue, delayed_binding_key)
.onSuccess([&]() {
// 3. 发布消息到常规队列,TTL 到期后自动路由到死信队列
channel.publish(delayed_exchange, delayed_binding_key, "hello world");
});
});
});
ev_run(ev_loop);
return 0;
}
6.5 dlx_subscribe.cc — 延迟订阅
订阅端只需要直接订阅死信队列即可,无需关心消息来自哪个原始队列:
int main() {
// ... 初始化连接 ...
// 声明死信交换机和队列,并订阅死信队列
declaredComponent(channel, dlx_exchange, dlx_queue, dlx_binding_key);
ev_run(ev_loop);
return 0;
}
6.6 运行演示
$ ./dlx_subscribe
声明交换机成功:dlx-exchange
声明队列成功:dlx-queue
绑定交换机和队列成功:
订阅队列消息成功:
收到消息:hello world # 延迟 5 秒后才收到
$ ./dlx_publish
声明交换机成功:dlx-exchange
声明交换机成功:delayed-exchange
声明队列成功:dlx-queue
绑定交换机和队列成功,发布消息到延时队列
7. 面向对象封装
基于 AMQP-CPP 的五个基本操作(声明交换机、声明队列、绑定、发布、订阅),封装层次化类以简化外部使用。
7.1 架构设计
MQFactory (工厂类)
├── MQClient (消息队列客户端,封装底层连接和 Channel)
│ ├── ev_loop 事件循环(异步线程)
│ ├── TcpConnection + TcpChannel
│ └── declare / publish / consume
├── Publisher (发布者,基于 MQClient)
│ └── publish()
└── Subscriber (订阅者,基于 MQClient)
└── consume()
7.2 declare_settings — 套件配置结构体
// 交换机类型常量
const std::string DIRECT = "direct"; // 直接交换
const std::string FANOUT = "fanout"; // 广播交换
const std::string TOPIC = "topic"; // 主题交换
const std::string HEADERS = "headers"; // 头部交换
const std::string DELAYED = "delayed"; // 自定义:延迟交换(使用死信队列)
// 死信交换机&队列名称默认前缀
const std::string DLX_PREFIX = "dlx_";
struct declare_settings {
std::string exchange; // 交换机名称
std::string exchange_type; // 交换机类型
std::string queue; // 队列名称
std::string binding_key; // 绑定关键字
size_t delayed_ttl = 0; // 延迟时间(0 表示非延迟队列)
// 自动生成死信组件名称
std::string dlx_exchange() const;
std::string dlx_queue() const;
std::string dlx_binding_key() const;
};
7.3 MQClient — 消息队列客户端
class MQClient {
public:
using ptr = std::shared_ptr<MQClient>;
MQClient(const std::string &url);
~MQClient();
// 声明交换机、队列并绑定;延迟队列会自动创建死信组件
void declare(const declare_settings &settings);
// 发布消息
bool publish(const std::string &exchange,
const std::string &routing_key,
const std::string &body);
// 订阅队列消息
void consume(const std::string &queue, const MessageCallback &callback);
// 阻塞等待事件循环退出
void wait();
private:
// 核心声明逻辑(支持普通和死信模式)
void _declared(const declare_settings &settings, AMQP::Table &args, bool is_dlx = false);
private:
std::mutex _declared_mtx;
std::mutex _mtx;
std::condition_variable _cv; // 阻塞/唤醒机制
struct ev_loop *_ev_loop;
struct ev_async _ev_async;
AMQP::LibEvHandler _handler;
AMQP::TcpConnection _connection;
AMQP::TcpChannel _channel;
std::thread _async_thread; // 事件循环运行在独立线程
};
构造函数与析构函数:
MQClient::MQClient(const std::string &url)
: _ev_loop(EV_DEFAULT)
, _handler(_ev_loop)
, _connection(&_handler, AMQP::Address(url))
, _channel(&_connection)
, _async_thread(std::thread([this]() { ev_run(_ev_loop); })) {
}
MQClient::~MQClient() {
ev_async_init(&_ev_async, MQClient::callback);
ev_async_start(_ev_loop, &_ev_async);
ev_async_send(_ev_loop, &_ev_async); // 异步通知退出事件循环
}
void MQClient::callback(struct ev_loop *loop, ev_async *watcher, int32_t revents) {
ev_break(loop, EVBREAK_ALL); // 必须在 ev_loop 线程内调用
}
declare 实现(条件变量同步等待):
void MQClient::declare(const declare_settings &settings) {
AMQP::Table args;
if (settings.exchange_type == DELAYED) {
// 先声明死信交换机和队列
_declared(settings, args, true);
// 设置死信关联参数
args[DEAD_LETTER_EXCHANGE] = settings.dlx_exchange();
args[DEAD_LETTER_BINDING_KEY] = settings.dlx_binding_key();
args[MESSAGE_TTL] = settings.delayed_ttl;
}
// 声明常规交换机和队列
_declared(settings, args, false);
}
publish 实现:
bool MQClient::publish(const std::string &exchange,
const std::string &routing_key,
const std::string &body) {
return _channel.publish(exchange, routing_key, body);
}
consume 实现:
void MQClient::consume(const std::string &queue, const MessageCallback &callback) {
std::unique_lock<std::mutex> declared_lock(_declared_mtx);
std::unique_lock<std::mutex> lock(_mtx);
_channel.consume(queue)
.onMessage([=, this](const AMQP::Message &message,
uint64_t deliveryTag, bool redelivered) {
callback(message.body(), message.bodySize());
_channel.ack(deliveryTag); // 自动确认
})
.onError([&](const char *message) {
ERR("订阅队列消息失败: {}", message);
_cv.notify_all();
abort();
})
.onSuccess([&]() {
DBG("订阅队列消息成功: {}", queue);
_cv.notify_all();
});
_cv.wait(lock); // 阻塞直到订阅成功或失败
}
7.4 Publisher — 发布客户端
class Publisher {
public:
using ptr = std::shared_ptr<Publisher>;
// 构造时自动调用 declare 确保交换机和队列存在
Publisher(const MQClient::ptr &mq_client, const declare_settings &settings);
bool publish(const std::string &body);
private:
MQClient::ptr _mq_client;
declare_settings _settings;
};
Publisher::Publisher(const MQClient::ptr &mq_client, const declare_settings &settings)
: _mq_client(mq_client), _settings(settings) {
_mq_client->declare(_settings);
}
bool Publisher::publish(const std::string &body) {
return _mq_client->publish(_settings.exchange, _settings.binding_key, body);
}
7.5 Subscriber — 订阅客户端
class Subscriber {
public:
using ptr = std::shared_ptr<Subscriber>;
Subscriber(const MQClient::ptr &mq_client, const declare_settings &settings);
void consume(MessageCallback &&cb);
private:
MQClient::ptr _mq_client;
declare_settings _settings;
MessageCallback _callback;
};
Subscriber::Subscriber(const MQClient::ptr &mq_client, const declare_settings &settings)
: _mq_client(mq_client), _settings(settings) {
_mq_client->declare(_settings);
}
void Subscriber::consume(MessageCallback &&cb) {
_callback = std::move(cb);
// 延迟队列实际订阅的是死信队列
if (_settings.exchange_type == DELAYED) {
_mq_client->consume(_settings.dlx_queue(), _callback);
} else {
_mq_client->consume(_settings.queue, _callback);
}
}
7.6 MQFactory — 工厂类
class MQFactory {
public:
template<typename R, typename... Args>
static std::shared_ptr<R> create(Args&&... args) {
return std::make_shared<R>(std::forward<Args>(args)...);
}
};
8. 封装后的使用示例
8.1 目录结构
.
├── CMakeLists.txt
├── build
├── delayed_publish.cc
├── delayed_subscribe.cc
├── simple_publish.cc
└── simple_subscribe.cc
8.2 simple_publish.cc
#include "bite_scaffold/mq.h"
#include "bite_scaffold/log.h"
int main() {
bitelog::bitelog_init();
const std::string url = "amqp://admin:123456@192.168.65.128:5672/";
bitemq::declare_settings settings = {
.exchange = "test-exchange",
.exchange_type = "direct",
.queue = "test-queue",
.binding_key = "test-binding-key"
};
auto mq_client = bitemq::MQFactory::create<bitemq::MQClient>(url);
auto publisher = bitemq::MQFactory::create<bitemq::Publisher>(mq_client, settings);
publisher->publish("hello world");
return 0;
}
8.3 simple_subscribe.cc
#include "bite_scaffold/mq.h"
#include "bite_scaffold/log.h"
int main() {
bitelog::bitelog_init();
const std::string url = "amqp://admin:123456@192.168.65.128:5672/";
bitemq::declare_settings settings = {
.exchange = "test-exchange",
.exchange_type = "direct",
.queue = "test-queue",
.binding_key = "test-binding-key"
};
auto mq_client = bitemq::MQFactory::create<bitemq::MQClient>(url);
auto subscriber = bitemq::MQFactory::create<bitemq::Subscriber>(mq_client, settings);
subscriber->consume([](const char* msg, size_t len) {
std::cout << "receive msg: " << std::string(msg, len) << std::endl;
});
mq_client->wait();
return 0;
}
8.4 delayed_publish.cc — 延迟发布
int main() {
bitelog::bitelog_init();
const std::string url = "amqp://admin:123456@192.168.65.128:5672/";
bitemq::declare_settings settings = {
.exchange = "delayed-exchange",
.exchange_type = "delayed", // ★ 延迟类型
.queue = "delayed-queue",
.binding_key = "delayed-binding-key",
.delayed_ttl = 3000 // ★ 3 秒延迟
};
auto mq_client = bitemq::MQFactory::create<bitemq::MQClient>(url);
auto publisher = bitemq::MQFactory::create<bitemq::Publisher>(mq_client, settings);
publisher->publish("hello world");
return 0;
}
8.5 CMakeLists.txt
cmake_minimum_required(VERSION 3.1.3)
project(simple_publish VERSION 1.0)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -std=c++17")
set(simple_publish_target "simple_publish")
add_executable(${simple_publish_target} ${CMAKE_CURRENT_SOURCE_DIR}/simple_publish.cc)
set(simple_subscribe_target "simple_subscribe")
add_executable(${simple_subscribe_target} ${CMAKE_CURRENT_SOURCE_DIR}/simple_subscribe.cc)
set(delayed_publish_target "delayed_publish")
add_executable(${delayed_publish_target} ${CMAKE_CURRENT_SOURCE_DIR}/delayed_publish.cc)
set(delayed_subscribe_target "delayed_subscribe")
add_executable(${delayed_subscribe_target} ${CMAKE_CURRENT_SOURCE_DIR}/delayed_subscribe.cc)
find_package(bite_scaffold REQUIRED)
target_link_libraries(${simple_publish_target} PRIVATE bite_scaffold::bite_scaffold)
target_link_libraries(${simple_subscribe_target} PRIVATE bite_scaffold::bite_scaffold)
target_link_libraries(${delayed_publish_target} PRIVATE bite_scaffold::bite_scaffold)
target_link_libraries(${delayed_subscribe_target} PRIVATE bite_scaffold::bite_scaffold)
8.6 运行演示
# 订阅端
$ ./simple_subscribe
[debug]: 订阅队列消息成功: test-queue
receive msg: hello world
# 发布端
$ ./simple_publish
9. 总结
本文从零开始系统介绍了基于 AMQP-CPP 的 C++ 消息队列开发:
| 层次 | 内容 |
|---|---|
| 协议基础 | AMQP 协议概念、MQ 核心要素(Broker、Exchange、Queue、Binding-Key) |
| 库的安装 | AMQP-CPP 源码编译、依赖安装 |
| 两种模式 | 自定义 TcpHandler(灵活但复杂)vs libev 扩展模式(开箱即用) |
| 核心 API | Connection、Channel、Deferred 链式回调、Reliable 可靠发布 |
| 死信队列 | DLX/DLQ 实现延迟消息,TTL + 死信交换机参数配置 |
| 面向对象封装 | MQClient(异步线程 + 条件变量同步) → Publisher / Subscriber → MQFactory |
| 完整示例 | 简单消息传输 + 延迟队列,5 行代码即可完成发布/订阅 |
核心设计要点:
- 异步线程模型:libev 事件循环运行在独立线程,通过
ev_async_send跨线程通信 - 条件变量同步:
declare和consume操作通过条件变量等待异步回调完成 - 延迟队列策略:死信交换机 + TTL 实现延迟消费;订阅端自动订阅死信队列,对外透明
- 工厂模式:通过
MQFactory::create<T>()统一创建,便于依赖注入
更多推荐



所有评论(0)