AMQP-CPP 使用指南:C++ 消息队列客户端实战

摘要:本文详细介绍了如何在 C++ 项目中基于 AMQP-CPP 实现高性能消息队列客户端。内容涵盖 AMQP 协议基础、AMQP-CPP 客户端 SDK 的核心 API 使用、两种网络模式(自定义 TcpHandler 与 libev 扩展模式)、死信队列机制,并提供了完整的面向对象封装方案。通过 MQClient、Publisher 和 Subscriber 三个核心组件,实现了消息的可靠发布与订阅、延迟队列等功能,为分布式系统提供了可靠的消息通信基础。这种封装使得开发者能够以简洁的 API 快速集成消息队列能力,同时保持高性能和可维护性。

C++脚手架仓库地址: https://gitee.com/chen-weifeng-cwf/developing-scaffolding-for-c

📑 目录


AMQP-CPP 是一个强大的 C++ 消息队列客户端库,完全异步、非阻塞,支持与 RabbitMQ 等 AMQP 中间件交互。本文从 AMQP 协议基础讲起,覆盖客户端 API、死信队列、以及面向对象的服务端封装,带你全面掌握 C++ 消息队列开发。


1. AMQP 简介

1.1 什么是 AMQP?

AMQP(Advanced Message Queuing Protocol)是一个提供统一消息服务的应用层标准高级消息队列协议。基于此协议的客户端与消息中间件可传递消息,不受产品、开发语言等条件的限制。Erlang 中的典型实现有 RabbitMQ

它规定了消息系统中三大组件的通信规范:

  • 消息服务器/代理节点(Server/Broker)
  • 生产者/发布者(Producer/Publisher)
  • 消费者/订阅者(Consumer/Subscriber)

1.2 MQ 核心要素

要素 说明
Broker 消息代理服务器(如 RabbitMQ)
Virtual Host 虚拟主机,环境隔离
Exchange(交换机) 消息的分发中心,接收生产者消息并根据规则路由到队列
Queue(队列) 消息的容器,存储消息直到消费者处理
Binding-Key 交换机和队列之间的绑定键,定义路由规则
Producer(生产者) 消息的发送方
Consumer(消费者) 消息的接收方

四种交换机类型:

类型 说明
Direct Exchange(直连) 根据 Routing-Key 将消息发送到特定队列
Topic Exchange(主题) 根据 Routing-Key 和通配符匹配发送到多个队列
Fanout Exchange(广播) 将消息广播到与交换机绑定的所有队列
Headers Exchange(头) 根据消息的自定义头部属性进行匹配路由

1.3 工作流程

在这里插入图片描述

生产者 → Broker(交换机 → 根据路由键 → 队列) → 消费者
  1. 生产者连接 Broker 发布消息,指定交换机和路由键
  2. 交换机根据路由键将消息路由到一个或多个队列
  3. 消费者从队列中获取消息并处理

1.4 MQ 的十大作用

  1. 解耦:生产者和消费者不需要同时在线或直接交互
  2. 异步处理:生产者发送消息后不必等待处理结果,提高吞吐量
  3. 削峰填谷:将短时间高并发的消息存储在队列中,后端按能力消费
  4. 负载均衡:多个消费者间自动分配消息
  5. 灵活性和可扩展性:支持水平扩展
  6. 增强系统可维护性:消息传递与业务逻辑分离
  7. 容错性:处理失败时可重新路由或放入死信队列
  8. 支持复杂业务流程:工作流和任务调度
  9. 数据分发:跨分布式系统传输数据
  10. 跨语言和平台:支持多种编程语言和平台

2. AMQP-CPP 简介

2.1 项目介绍

AMQP-CPP 是一个强大的开源库,实现了 AMQP 协议的客户端,允许使用 C++ 与 RabbitMQ、Qpid 等支持 AMQP 的消息中间件交互。

核心特性:

特性 说明
异步非阻塞 完全异步,没有阻塞式系统调用,不使用线程即可应用于高性能场景
分层架构 允许用户按需实现网络层,或使用预定义的 TCP/TLS 模块
跨平台 支持 Linux、macOS、Windows
C++17 充分利用现代 C++ 标准,要求编译器支持 C++17 或以上

2.2 安装

git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make && sudo make install

2.3 两种使用模式

模式 说明
默认 TCP 模式 自定义 TcpHandler 子类,自己管理 monitor 事件循环(select/epoll)
扩展模式 对接 libev、libevent、libuv、asio 等成熟网络库,自动管理事件循环

3. 默认 TCP 模式:自定义 TcpHandler

继承 AMQP::TcpHandler,重写一系列回调函数,其中最核心的是 monitor 函数——在该函数中需要将 fd 放入事件循环(select/epoll)中监控,当 fd 可读/可写时调用 connection->process(fd, flags)

#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>

class MyTcpHandler : public AMQP::TcpHandler {
public:
    // 创建新连接时调用(第一次回调)
    virtual void onAttached(AMQP::TcpConnection *connection) override { }

    // TCP 连接建立时调用(与 onLost 成对出现)
    virtual void onConnected(AMQP::TcpConnection *connection) override { }

    // TLS 安全连接建立时调用(仅 amqps:// 连接)
    virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override {
        return true;
    }

    // 登录成功,连接就绪
    virtual void onReady(AMQP::TcpConnection *connection) override {
        // 在这里创建 Channel,开始发布或消费
    }

    // 协商心跳间隔(返回接受的间隔值)
    virtual uint16_t onNegotiate(AMQP::TcpConnection *connection, uint16_t interval) override {
        if (interval < 60) interval = 60;
        // 建议在事件循环中设置定时器,每 interval 秒调用 connection->heartbeat()
        return interval;
    }

    // 发生致命错误时调用
    virtual void onError(AMQP::TcpConnection *connection, const char *message) override { }

    // AMQP 协议正常结束时调用(调用 connection.close() 的对应)
    virtual void onClosed(AMQP::TcpConnection *connection) override { }

    // TCP 连接关闭或丢失时调用
    virtual void onLost(AMQP::TcpConnection *connection) override { }

    // 最终回调,表示不再有关于此连接的进一步调用
    virtual void onDetached(AMQP::TcpConnection *connection) override { }

    // ★ 核心方法:事件循环交互
    // 当 AMQP-CPP 想要注册 fd 到事件循环时调用
    // 当 fd 可读/可写时,需调用 connection->process(fd, flags) 通知库
    virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags) override {
        // 将 fd 加入 select/epoll 事件循环
        // 就绪后调用: connection->process(fd, flags)
    }
};

4. 扩展模式:对接 libev

使用 AMQP::LibEvHandler 可以直接对接 libev 网络库,无需自己实现 monitor

4.1 头文件

#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>

4.2 核心类:Connection

// RabbitMQ 服务器地址格式
class Address {
    Address(const std::string &address);  // "amqp://user:pass@host:port/"
};

// TCP 连接
class TcpConnection : private ConnectionHandler {
    TcpConnection(TcpHandler *handler, const Address &address);
};

// 登录凭证
class Login {
    Login(std::string user, std::string password);
};

// 连接对象
class Connection {
    Connection(ConnectionHandler *handler, const Login &login, const std::string &vhost);
    bool close();  // 关闭连接
};

4.3 核心类:Channel

Channel 是一个虚拟连接,一个连接上可以建立多个通道。所有 RabbitMQ 指令都通过 Channel 传输。操作是异步的,返回值是 Deferred 类,用于安装回调处理函数。

回调函数类型:

namespace AMQP {
using SuccessCallback  = std::function<void()>;
using ErrorCallback    = std::function<void(const char *message)>;
using FinalizeCallback = std::function<void()>;

// 队列声明成功回调
using QueueCallback = std::function<void(
    const std::string &name, uint32_t messagecount, uint32_t consumercount)>;

// 队列删除回调
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;

// 消息处理回调
using MessageCallback = std::function<void(
    const Message &message, uint64_t deliveryTag, bool redelivered)>;

// 发布确认回调
using AckCallback   = std::function<void(uint64_t deliveryTag, bool multiple)>;
using PublishAckCallback  = std::function<void()>;
using PublishNackCallback = std::function<void()>;
using PublishLostCallback = std::function<void()>;
}

交换机类型枚举:

enum ExchangeType {
    fanout,   // 广播交换
    direct,   // 直接交换
    topic,    // 主题交换
    headers,
    consistent_hash,
    message_deduplication
};

Channel 核心 API:

class Channel {
    Channel(Connection *connection);
    bool connected();      // 连接是否可用
    Deferred &close();     // 关闭当前 channel

    // 声明交换机
    // flags: durable(持久化) | autodelete(自动删除) | passive(仅检查) | internal(内部)
    Deferred &declareExchange(const std::string_view &name,
                               ExchangeType type, int flags);

    // 声明队列
    // flags: durable | autodelete | passive | exclusive(连接独占,断连自动删除)
    DeferredQueue &declareQueue(const std::string_view &name, int flags);

    // 绑定队列到交换机
    Deferred &bindQueue(const std::string_view &exchange,
                         const std::string_view &queue,
                         const std::string_view &bindingkey);

    Deferred &unbindQueue(const std::string_view &exchange,
                           const std::string_view &queue,
                           const std::string_view &bindingkey);

    // 删除队列(flags: ifunused | ifempty)
    DeferredDelete &removeQueue(const std::string_view &name, int flags = 0);

    // 发布消息(flags: mandatory | immediate)
    bool publish(const std::string_view &exchange,
                  const std::string_view &routingKey,
                  const std::string &message, int flags = 0);

    // 召回不可路由的消息(配合 mandatory/immediate 使用)
    DeferredRecall &recall();

    // 订阅队列消息
    // flags: nolocal | noack(自动确认) | exclusive
    DeferredConsumer &consume(const std::string_view &queue,
                                const std::string_view &tag, int flags = 0);

    // 取消订阅
    DeferredCancel &cancel(const std::string_view &tag);

    // 检索单条消息
    DeferredGet &get(const std::string_view &queue, int flags = 0);

    // 确认消息(消费者必须确认,除非使用 noack)
    // flags: multiple(确认多条,包括之前所有未确认的消息)
    bool ack(uint64_t deliveryTag, int flags = 0);

    // 拒绝消息
    // flags: multiple(拒绝多条) | requeue(放回队列,否则删除)
    bool reject(uint64_t deliveryTag, int flags = 0);

    // 回复所有未确认的消息
    Deferred &recover(int flags = 0);
};

4.4 Reliable — 可靠发布

template <typename BASE = Tagger>
class Reliable : public BASE {
    template <typename ...Args>
    Reliable(Args &&...args);

    DeferredPublish &publish(const std::string_view &exchange,
                               const std::string_view &routingKey,
                               const std::string_view &message, int flags = 0);
};

// 使用示例
AMQP::TcpChannel mychannel(connection);
AMQP::Reliable reliable(mychannel);
reliable.publish("my-exchange", "my-key", "my first message")
    .onAck()
    .onNack()
    .onLost()
    .onError();

4.5 Message — 消息处理

class Envelope : public MetaData {
    const char *body();     // 消息体指针
    uint64_t bodySize();    // 消息体大小
};

class Message : public Envelope {
    const std::string &exchange();    // 来源交换机
    const std::string &routingkey();  // 路由键
};

4.6 Deferred — 异步回调链

Deferred 类及其派生类是处理异步操作的核心机制,用于设置异步调用的回调函数,支持链式调用。

class Deferred {
    Deferred &onSuccess(const SuccessCallback& callback);
    Deferred &onError(const ErrorCallback& callback);
    Deferred &onFinalize(const FinalizeCallback& callback);
};

声明交换机示例:

channel.declareExchange(exchange, AMQP::ExchangeType::direct)
    .onSuccess([&]() {
        std::cout << "声明交换机成功:" << exchange << std::endl;
    })
    .onError([&](const char *message) {
        std::cout << "声明交换机失败:" << message << std::endl;
    });

4.7 DeferredQueue — 队列操作回调

class DeferredQueue : public Deferred {
    DeferredQueue &onSuccess(const QueueCallback& callback);
    DeferredQueue &onSuccess(const SuccessCallback& callback);
};

using QueueCallback = std::function<void(const std::string &name,
                     uint32_t messageCount, uint32_t consumerCount)>;

声明队列示例:

channel.declareQueue(queue)
    .onSuccess([&](const std::string &name, uint32_t messagecount, uint32_t consumercount) {
        std::cout << "声明队列成功:" << queue << std::endl;
        std::cout << "队列中有 " << messagecount << " 个消息" << std::endl;
        std::cout << "队列中有 " << consumercount << " 个消费者" << std::endl;
    })
    .onError([&](const char *message) {
        std::cout << "声明队列失败:" << message << std::endl;
        abort();
    });

4.8 DeferredConsumer — 订阅回调

class DeferredConsumer : public Deferred {
    DeferredConsumer &onSuccess(const ConsumeCallback& callback);
    DeferredConsumer &onReceived(const MessageCallback& callback);    // 收到消息
    DeferredConsumer &onMessage(const MessageCallback& callback);     // onReceived 别名
    DeferredConsumer &onCancelled(const CancelCallback& callback);     // 取消订阅
};

订阅队列消息示例:

channel.consume(queue)
    .onMessage([&](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
        std::string body(message.body(), message.bodySize());
        std::cout << "收到消息:" << body << std::endl;
        channel.ack(deliveryTag);  // 不要忘了确认消息!
    })
    .onError([&](const char *message) {
        std::cout << "订阅队列消息失败:" << message << std::endl;
        abort();
    })
    .onSuccess([&](const std::string_view &tag) {
        std::cout << "订阅队列消息成功" << std::endl;
    });

4.9 libev 网络库对接

#include <amqpcpp/libev.h>
#include <ev.h>

class LibEvHandler : public TcpHandler {
    LibEvHandler(struct ev_loop *loop, int priority = 0);
};

// libev 核心操作
struct ev_loop *ev_default_loop(unsigned int flags);  // 创建事件循环
int  ev_run(struct ev_loop *loop);                     // 启动事件循环(阻塞)
void ev_break(struct ev_loop *loop, int32_t break_type); // 退出事件循环

// 异步任务(在 ev_loop 线程中执行)
void ev_async_init(ev_async *w, callback cb);
void ev_async_start(struct ev_loop *loop, ev_async *w);
void ev_async_send(struct ev_loop *loop, ev_async *w);

5. 入门示例:简单消息传输

5.1 目录结构

.
├── makefile
├── simple_publish.cc
├── simple_subscribe.cc

5.2 simple_publish.cc — 发布消息

#include <amqpcpp.h>
#include <ev.h>
#include <amqpcpp/libev.h>
#include <iostream>

int main() {
    const std::string url = "amqp://admin:123456@192.168.65.128:5672/";
    const std::string exchange = "my-exchange";
    const std::string queue = "my-queue";
    const std::string binding_key = "my-binding-key";

    // 1. 实例化 libev 网络通信模块
    auto *ev_loop = EV_DEFAULT;
    AMQP::LibEvHandler handler(ev_loop);

    // 2. 创建连接
    AMQP::TcpConnection connection(&handler, AMQP::Address(url));

    // 3. 创建 Channel
    AMQP::TcpChannel channel(&connection);

    // 4. 声明交换机 → 声明队列 → 绑定 → 发布消息(异步回调链)
    channel.declareExchange(exchange, AMQP::ExchangeType::direct)
        .onSuccess([&]() {
            std::cout << "声明交换机成功:" << exchange << std::endl;
            channel.declareQueue(queue)
                .onSuccess([&](const std::string &name, uint32_t messagecount,
                                uint32_t consumercount) {
                    std::cout << "声明队列成功:" << queue << std::endl;
                    channel.bindQueue(exchange, queue, binding_key)
                        .onSuccess([&]() {
                            std::cout << "绑定交换机和队列成功" << std::endl;
                            channel.publish(exchange, binding_key, "Hello World");
                        })
                        .onError([&](const char *message) {
                            std::cout << "绑定失败:" << message << std::endl;
                            abort();
                        });
                })
                .onError([&](const char *message) {
                    std::cout << "声明队列失败:" << message << std::endl;
                    abort();
                });
        })
        .onError([&](const char *message) {
            std::cout << "声明交换机失败:" << message << std::endl;
            abort();
        });

    // 5. 启动事件循环
    ev_run(ev_loop);
    return 0;
}

5.3 simple_subscribe.cc — 订阅消息

订阅端的结构与发布端类似,区别在于绑定成功后调用 channel.consume(queue) 订阅消息,并在 onMessage 回调中处理消息并 ack 确认

// ... 前 6 步与 publish 相同(声明交换机 → 队列 → 绑定)

// 7. 订阅队列消息
channel.consume(queue)
    .onMessage([&](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
        std::string body(message.body(), message.bodySize());
        std::cout << "收到消息:" << body << std::endl;
        channel.ack(deliveryTag);  // ★ 必须确认!
    })
    .onError([&](const char *message) {
        std::cout << "订阅失败:" << message << std::endl;
        abort();
    })
    .onSuccess([&]() {
        std::cout << "订阅队列消息成功" << std::endl;
    });

ev_run(ev_loop);

5.4 Makefile

all: simple_publish simple_subscribe

simple_publish: simple_publish.cc
	g++ -std=c++17 $^ -o $@ -lamqpcpp -lev -lpthread

simple_subscribe: simple_subscribe.cc
	g++ -std=c++17 $^ -o $@ -lamqpcpp -lev -lpthread

clean:
	rm -rf simple_publish simple_subscribe

5.5 运行演示

# 先启动订阅端
$ ./simple_subscribe
声明交换机成功:my-exchange
声明队列成功:my-queue
队列中有0个消息
队列中有0个消费者
绑定交换机和队列成功:
订阅队列消息成功:
收到消息:Hello World

# 再启动发布端
$ ./simple_publish
声明交换机成功:my-exchange
声明队列成功:my-queue
队列中有0个消息
队列中有1个消费者
绑定交换机和队列成功:

6. 死信队列(延迟队列)

6.1 核心概念

要素 说明
死信消息 满足特定条件(拒绝、过期、队列满)的消息
死信交换机 (DLX) 普通交换机,死信消息会被重新发布到此交换机
死信队列 (DLQ) 绑定到死信交换机上的普通队列,存储死信消息

6.2 延迟队列使用流程

  1. 创建普通交换机 A 和队列 A,作为死信交换机/队列
  2. 创建普通交换机 B 和队列 B,创建时设置死信关联参数和消息 TTL
  3. 向队列 B 发布消息 → TTL 过期 → 消息投递到死信交换机 A → 进入死信队列 A
  4. 消费者实际订阅的是死信队列 A,从而实现延时消费

6.3 死信队列参数设置

class Table : public Field {
public:
    Table &set(const std::string &name, bool value);
    // ... 一系列常见基础类型的重载 ...
    AssociativeFieldProxy operator[](const std::string& name);
};

// 设置死信队列参数
AMQP::Table args;
args["x-dead-letter-exchange"]    = "dlx-exchange";    // 死信交换机名称
args["x-dead-letter-routing-key"] = "dlx-binding-key";  // 死信路由键
args["x-message-ttl"]             = 5000;               // 消息过期时间(毫秒)

6.4 dlx_publish.cc — 延迟发布

void declaredComponent(AMQP::TcpChannel &channel,
    const std::string &exchange, const std::string &queue,
    const std::string &binding_key) {
    // 普通的 交换机声明 → 队列声明 → 绑定 流程
    // ...
}

int main() {
    const std::string url = "amqp://admin:123456@192.168.65.130:5672/";

    // 命名规范:死信组件加上 dlx_ 前缀
    const std::string delayed_exchange    = "delayed-exchange";
    const std::string delayed_queue       = "delayed-queue";
    const std::string delayed_binding_key = "delayed-binding-key";
    const std::string dlx_exchange    = "dlx-exchange";
    const std::string dlx_queue       = "dlx-queue";
    const std::string dlx_binding_key = "dlx-binding-key";

    auto *ev_loop = EV_DEFAULT;
    AMQP::LibEvHandler handler(ev_loop);
    AMQP::TcpConnection connection(&handler, AMQP::Address(url));
    AMQP::TcpChannel channel(&connection);

    // 1. 声明死信交换机和队列
    declaredComponent(channel, dlx_exchange, dlx_queue, dlx_binding_key);

    // 2. 声明常规交换机和队列(带死信和 TTL 参数)
    channel.declareExchange(delayed_exchange, AMQP::ExchangeType::direct)
        .onSuccess([&]() {
            AMQP::Table args;
            args["x-dead-letter-exchange"]    = dlx_exchange;
            args["x-dead-letter-routing-key"] = dlx_binding_key;
            args["x-message-ttl"]             = 5000;  // 5秒过期

            channel.declareQueue(delayed_queue, args)
                .onSuccess([&](...) {
                    channel.bindQueue(delayed_exchange, delayed_queue, delayed_binding_key)
                        .onSuccess([&]() {
                            // 3. 发布消息到常规队列,TTL 到期后自动路由到死信队列
                            channel.publish(delayed_exchange, delayed_binding_key, "hello world");
                        });
                });
        });

    ev_run(ev_loop);
    return 0;
}

6.5 dlx_subscribe.cc — 延迟订阅

订阅端只需要直接订阅死信队列即可,无需关心消息来自哪个原始队列:

int main() {
    // ... 初始化连接 ...

    // 声明死信交换机和队列,并订阅死信队列
    declaredComponent(channel, dlx_exchange, dlx_queue, dlx_binding_key);

    ev_run(ev_loop);
    return 0;
}

6.6 运行演示

$ ./dlx_subscribe
声明交换机成功:dlx-exchange
声明队列成功:dlx-queue
绑定交换机和队列成功:
订阅队列消息成功:
收到消息:hello world    # 延迟 5 秒后才收到

$ ./dlx_publish
声明交换机成功:dlx-exchange
声明交换机成功:delayed-exchange
声明队列成功:dlx-queue
绑定交换机和队列成功,发布消息到延时队列

7. 面向对象封装

基于 AMQP-CPP 的五个基本操作(声明交换机、声明队列、绑定、发布、订阅),封装层次化类以简化外部使用。

7.1 架构设计

MQFactory (工厂类)
    ├── MQClient (消息队列客户端,封装底层连接和 Channel)
    │       ├── ev_loop 事件循环(异步线程)
    │       ├── TcpConnection + TcpChannel
    │       └── declare / publish / consume
    ├── Publisher (发布者,基于 MQClient)
    │       └── publish()
    └── Subscriber (订阅者,基于 MQClient)
            └── consume()

7.2 declare_settings — 套件配置结构体

// 交换机类型常量
const std::string DIRECT  = "direct";   // 直接交换
const std::string FANOUT  = "fanout";   // 广播交换
const std::string TOPIC   = "topic";    // 主题交换
const std::string HEADERS = "headers";  // 头部交换
const std::string DELAYED = "delayed";  // 自定义:延迟交换(使用死信队列)

// 死信交换机&队列名称默认前缀
const std::string DLX_PREFIX = "dlx_";

struct declare_settings {
    std::string exchange;        // 交换机名称
    std::string exchange_type;   // 交换机类型
    std::string queue;           // 队列名称
    std::string binding_key;     // 绑定关键字
    size_t delayed_ttl = 0;      // 延迟时间(0 表示非延迟队列)

    // 自动生成死信组件名称
    std::string dlx_exchange() const;
    std::string dlx_queue() const;
    std::string dlx_binding_key() const;
};

7.3 MQClient — 消息队列客户端

class MQClient {
public:
    using ptr = std::shared_ptr<MQClient>;

    MQClient(const std::string &url);
    ~MQClient();

    // 声明交换机、队列并绑定;延迟队列会自动创建死信组件
    void declare(const declare_settings &settings);

    // 发布消息
    bool publish(const std::string &exchange,
                 const std::string &routing_key,
                 const std::string &body);

    // 订阅队列消息
    void consume(const std::string &queue, const MessageCallback &callback);

    // 阻塞等待事件循环退出
    void wait();

private:
    // 核心声明逻辑(支持普通和死信模式)
    void _declared(const declare_settings &settings, AMQP::Table &args, bool is_dlx = false);

private:
    std::mutex _declared_mtx;
    std::mutex _mtx;
    std::condition_variable _cv;      // 阻塞/唤醒机制
    struct ev_loop *_ev_loop;
    struct ev_async _ev_async;
    AMQP::LibEvHandler _handler;
    AMQP::TcpConnection _connection;
    AMQP::TcpChannel _channel;
    std::thread _async_thread;        // 事件循环运行在独立线程
};

构造函数与析构函数:

MQClient::MQClient(const std::string &url)
    : _ev_loop(EV_DEFAULT)
    , _handler(_ev_loop)
    , _connection(&_handler, AMQP::Address(url))
    , _channel(&_connection)
    , _async_thread(std::thread([this]() { ev_run(_ev_loop); })) {
}

MQClient::~MQClient() {
    ev_async_init(&_ev_async, MQClient::callback);
    ev_async_start(_ev_loop, &_ev_async);
    ev_async_send(_ev_loop, &_ev_async);  // 异步通知退出事件循环
}

void MQClient::callback(struct ev_loop *loop, ev_async *watcher, int32_t revents) {
    ev_break(loop, EVBREAK_ALL);  // 必须在 ev_loop 线程内调用
}

declare 实现(条件变量同步等待):

void MQClient::declare(const declare_settings &settings) {
    AMQP::Table args;
    if (settings.exchange_type == DELAYED) {
        // 先声明死信交换机和队列
        _declared(settings, args, true);
        // 设置死信关联参数
        args[DEAD_LETTER_EXCHANGE] = settings.dlx_exchange();
        args[DEAD_LETTER_BINDING_KEY] = settings.dlx_binding_key();
        args[MESSAGE_TTL] = settings.delayed_ttl;
    }
    // 声明常规交换机和队列
    _declared(settings, args, false);
}

publish 实现:

bool MQClient::publish(const std::string &exchange,
                        const std::string &routing_key,
                        const std::string &body) {
    return _channel.publish(exchange, routing_key, body);
}

consume 实现:

void MQClient::consume(const std::string &queue, const MessageCallback &callback) {
    std::unique_lock<std::mutex> declared_lock(_declared_mtx);
    std::unique_lock<std::mutex> lock(_mtx);

    _channel.consume(queue)
        .onMessage([=, this](const AMQP::Message &message,
                              uint64_t deliveryTag, bool redelivered) {
            callback(message.body(), message.bodySize());
            _channel.ack(deliveryTag);  // 自动确认
        })
        .onError([&](const char *message) {
            ERR("订阅队列消息失败: {}", message);
            _cv.notify_all();
            abort();
        })
        .onSuccess([&]() {
            DBG("订阅队列消息成功: {}", queue);
            _cv.notify_all();
        });

    _cv.wait(lock);  // 阻塞直到订阅成功或失败
}

7.4 Publisher — 发布客户端

class Publisher {
public:
    using ptr = std::shared_ptr<Publisher>;

    // 构造时自动调用 declare 确保交换机和队列存在
    Publisher(const MQClient::ptr &mq_client, const declare_settings &settings);
    bool publish(const std::string &body);

private:
    MQClient::ptr _mq_client;
    declare_settings _settings;
};

Publisher::Publisher(const MQClient::ptr &mq_client, const declare_settings &settings)
    : _mq_client(mq_client), _settings(settings) {
    _mq_client->declare(_settings);
}

bool Publisher::publish(const std::string &body) {
    return _mq_client->publish(_settings.exchange, _settings.binding_key, body);
}

7.5 Subscriber — 订阅客户端

class Subscriber {
public:
    using ptr = std::shared_ptr<Subscriber>;

    Subscriber(const MQClient::ptr &mq_client, const declare_settings &settings);
    void consume(MessageCallback &&cb);

private:
    MQClient::ptr _mq_client;
    declare_settings _settings;
    MessageCallback _callback;
};

Subscriber::Subscriber(const MQClient::ptr &mq_client, const declare_settings &settings)
    : _mq_client(mq_client), _settings(settings) {
    _mq_client->declare(_settings);
}

void Subscriber::consume(MessageCallback &&cb) {
    _callback = std::move(cb);
    // 延迟队列实际订阅的是死信队列
    if (_settings.exchange_type == DELAYED) {
        _mq_client->consume(_settings.dlx_queue(), _callback);
    } else {
        _mq_client->consume(_settings.queue, _callback);
    }
}

7.6 MQFactory — 工厂类

class MQFactory {
public:
    template<typename R, typename... Args>
    static std::shared_ptr<R> create(Args&&... args) {
        return std::make_shared<R>(std::forward<Args>(args)...);
    }
};

8. 封装后的使用示例

8.1 目录结构

.
├── CMakeLists.txt
├── build
├── delayed_publish.cc
├── delayed_subscribe.cc
├── simple_publish.cc
└── simple_subscribe.cc

8.2 simple_publish.cc

#include "bite_scaffold/mq.h"
#include "bite_scaffold/log.h"

int main() {
    bitelog::bitelog_init();

    const std::string url = "amqp://admin:123456@192.168.65.128:5672/";
    bitemq::declare_settings settings = {
        .exchange = "test-exchange",
        .exchange_type = "direct",
        .queue = "test-queue",
        .binding_key = "test-binding-key"
    };

    auto mq_client = bitemq::MQFactory::create<bitemq::MQClient>(url);
    auto publisher = bitemq::MQFactory::create<bitemq::Publisher>(mq_client, settings);
    publisher->publish("hello world");
    return 0;
}

8.3 simple_subscribe.cc

#include "bite_scaffold/mq.h"
#include "bite_scaffold/log.h"

int main() {
    bitelog::bitelog_init();

    const std::string url = "amqp://admin:123456@192.168.65.128:5672/";
    bitemq::declare_settings settings = {
        .exchange = "test-exchange",
        .exchange_type = "direct",
        .queue = "test-queue",
        .binding_key = "test-binding-key"
    };

    auto mq_client = bitemq::MQFactory::create<bitemq::MQClient>(url);
    auto subscriber = bitemq::MQFactory::create<bitemq::Subscriber>(mq_client, settings);

    subscriber->consume([](const char* msg, size_t len) {
        std::cout << "receive msg: " << std::string(msg, len) << std::endl;
    });

    mq_client->wait();
    return 0;
}

8.4 delayed_publish.cc — 延迟发布

int main() {
    bitelog::bitelog_init();

    const std::string url = "amqp://admin:123456@192.168.65.128:5672/";
    bitemq::declare_settings settings = {
        .exchange = "delayed-exchange",
        .exchange_type = "delayed",       // ★ 延迟类型
        .queue = "delayed-queue",
        .binding_key = "delayed-binding-key",
        .delayed_ttl = 3000               // ★ 3 秒延迟
    };

    auto mq_client = bitemq::MQFactory::create<bitemq::MQClient>(url);
    auto publisher = bitemq::MQFactory::create<bitemq::Publisher>(mq_client, settings);
    publisher->publish("hello world");
    return 0;
}

8.5 CMakeLists.txt

cmake_minimum_required(VERSION 3.1.3)
project(simple_publish VERSION 1.0)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -std=c++17")

set(simple_publish_target "simple_publish")
add_executable(${simple_publish_target} ${CMAKE_CURRENT_SOURCE_DIR}/simple_publish.cc)

set(simple_subscribe_target "simple_subscribe")
add_executable(${simple_subscribe_target} ${CMAKE_CURRENT_SOURCE_DIR}/simple_subscribe.cc)

set(delayed_publish_target "delayed_publish")
add_executable(${delayed_publish_target} ${CMAKE_CURRENT_SOURCE_DIR}/delayed_publish.cc)

set(delayed_subscribe_target "delayed_subscribe")
add_executable(${delayed_subscribe_target} ${CMAKE_CURRENT_SOURCE_DIR}/delayed_subscribe.cc)

find_package(bite_scaffold REQUIRED)
target_link_libraries(${simple_publish_target} PRIVATE bite_scaffold::bite_scaffold)
target_link_libraries(${simple_subscribe_target} PRIVATE bite_scaffold::bite_scaffold)
target_link_libraries(${delayed_publish_target} PRIVATE bite_scaffold::bite_scaffold)
target_link_libraries(${delayed_subscribe_target} PRIVATE bite_scaffold::bite_scaffold)

8.6 运行演示

# 订阅端
$ ./simple_subscribe
[debug]: 订阅队列消息成功: test-queue
receive msg: hello world

# 发布端
$ ./simple_publish

9. 总结

本文从零开始系统介绍了基于 AMQP-CPP 的 C++ 消息队列开发:

层次 内容
协议基础 AMQP 协议概念、MQ 核心要素(Broker、Exchange、Queue、Binding-Key)
库的安装 AMQP-CPP 源码编译、依赖安装
两种模式 自定义 TcpHandler(灵活但复杂)vs libev 扩展模式(开箱即用)
核心 API Connection、Channel、Deferred 链式回调、Reliable 可靠发布
死信队列 DLX/DLQ 实现延迟消息,TTL + 死信交换机参数配置
面向对象封装 MQClient(异步线程 + 条件变量同步) → Publisher / Subscriber → MQFactory
完整示例 简单消息传输 + 延迟队列,5 行代码即可完成发布/订阅

核心设计要点:

  1. 异步线程模型:libev 事件循环运行在独立线程,通过 ev_async_send 跨线程通信
  2. 条件变量同步declareconsume 操作通过条件变量等待异步回调完成
  3. 延迟队列策略:死信交换机 + TTL 实现延迟消费;订阅端自动订阅死信队列,对外透明
  4. 工厂模式:通过 MQFactory::create<T>() 统一创建,便于依赖注入

更多推荐