大家好,欢迎来到RabbitMQ系列的第四篇文章!上一篇我们已经吃透了RabbitMQ的六大核心组件,还动手在管理界面创建了交换机、队列并完成绑定,搞懂了消息流转的全流程。今天,我们正式进入“代码实操”环节——编写第一个RabbitMQ Hello World程序(Java版),手把手教大家用Java代码创建生产者、消费者,实现消息的发送和接收,把上一篇学到的理论知识,真正落地到代码中。

核心目标:通过最简单的Java程序,掌握RabbitMQ消息发送和接收的核心流程,理解生产者如何连接RabbitMQ、发送消息到交换机,消费者如何连接RabbitMQ、监听队列并接收消息。全程代码可复制、步骤可落地,新手跟着操作,就能成功运行第一个RabbitMQ程序,彻底告别“只会看管理界面,不会写代码”的困境。

前置准备

  • 已完成RabbitMQ环境搭建(参考第二篇文章),确保RabbitMQ服务正常运行,能访问管理界面(http://localhost:15672)。

  • 开发环境:IDEA(或Eclipse)、JDK 8及以上(推荐JDK 8,兼容性最好)、Maven(用于导入RabbitMQ依赖)。

  • 提前在RabbitMQ管理界面,创建好交换机、队列并绑定(参考第三篇实操步骤):交换机名称test_exchange(Direct类型)、队列名称test_queue、绑定键test_key(后续代码会用到,可直接复用)。

提示:如果没有提前创建交换机、队列和绑定,也可以通过代码创建(后续会补充代码创建的方式),但新手建议先手动在管理界面创建,更直观地理解消息流转。

一、第一步:创建Maven项目,导入RabbitMQ依赖

我们先创建一个普通的Maven项目,然后导入RabbitMQ的Java客户端依赖(amqp-client),这是编写RabbitMQ程序的核心依赖,无需手动下载jar包,Maven会自动导入。

步骤1:创建Maven项目

  1. 打开IDEA,点击“File”→“New”→“Project”,选择“Maven”,点击“Next”。

  2. 填写项目信息:GroupId(自定义,比如com.rabbitmq.demo)、ArtifactId(比如rabbitmq-hello-world)、Version(默认1.0-SNAPSHOT),点击“Finish”,等待项目初始化完成。

步骤2:导入RabbitMQ依赖
打开项目的pom.xml文件,在标签中,添加以下依赖(直接复制粘贴即可),然后点击“Reload Maven Project”(刷新依赖),等待依赖导入完成。

<!-- RabbitMQ Java客户端依赖 -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.18.0</version> <!-- 稳定版,适配我们搭建的RabbitMQ 3.12.10 -->
</dependency>

<!-- 日志依赖(可选,用于打印日志,方便调试) -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.36</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.36</version>
</dependency>

避坑提示:依赖版本不要随意修改,本文使用的5.18.0版本,与我们搭建的RabbitMQ 3.12.10版本完全兼容;如果导入依赖失败,检查网络连接,或更换Maven镜像源(比如阿里云镜像)。

二、第二步:编写生产者(Producer)——发送消息到交换机

生产者的核心任务:连接RabbitMQ Broker,创建消息,指定路由键,将消息发送到我们提前创建的test_exchange交换机中。我们创建一个Producer类,全程带详细注释,新手能轻松看懂每一行代码的作用。

完整生产者代码(可直接复制)

package com.rabbitmq.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * RabbitMQ生产者:发送消息到交换机
 */
public class Producer {
    // 1. 交换机名称(与管理界面创建的一致)
    private static final String EXCHANGE_NAME = "test_exchange";
    // 2. 路由键(与管理界面绑定的绑定键一致)
    private static final String ROUTING_KEY = "test_key";
    // 3. RabbitMQ服务地址(本地部署,默认localhost)
    private static final String RABBITMQ_HOST = "localhost";
    // 4. RabbitMQ默认端口(5672,消息通信端口,不是管理界面的15672)
    private static final int RABBITMQ_PORT = 5672;
    // 5. RabbitMQ默认账号密码(guest/guest,仅本地访问可用)
    private static final String RABBITMQ_USERNAME = "guest";
    private static final String RABBITMQ_PASSWORD = "guest";

    public static void main(String[] args) {
        // 1. 创建连接工厂(用于创建与RabbitMQ的连接)
        ConnectionFactory factory = new ConnectionFactory();
        // 2. 配置连接工厂参数(核心参数,缺一不可)
        factory.setHost(RABBITMQ_HOST); // 服务地址
        factory.setPort(RABBITMQ_PORT); // 通信端口
        factory.setUsername(RABBITMQ_USERNAME); // 账号
        factory.setPassword(RABBITMQ_PASSWORD); // 密码

        Connection connection = null; // 连接对象
        Channel channel = null; // 信道对象(消息发送/接收的核心载体)

        try {
            // 3. 创建与RabbitMQ的连接
            connection = factory.newConnection();
            // 4. 创建信道(RabbitMQ的消息操作都通过信道完成,不直接操作连接)
            channel = connection.createChannel();

            // 5. (可选)通过代码创建交换机(如果之前没在管理界面创建,可执行这行代码)
            // 参数说明:交换机名称、交换机类型、是否持久化
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

            // 6. (可选)通过代码创建队列(如果之前没在管理界面创建,可执行这行代码)
            // 参数说明:队列名称、是否持久化、是否独占、是否自动删除、额外参数
            channel.queueDeclare("test_queue", true, false, false, null);

            // 7. (可选)通过代码绑定交换机和队列(如果之前没在管理界面绑定,可执行这行代码)
            // 参数说明:队列名称、交换机名称、路由键
            channel.queueBind("test_queue", EXCHANGE_NAME, ROUTING_KEY);

            // 8. 准备要发送的消息(消息体为字节数组,可发送字符串、JSON等)
            String message = "Hello RabbitMQ! 这是我的第一个RabbitMQ消息";
            // 9. 发送消息到交换机
            // 参数说明:交换机名称、路由键、消息属性(null表示默认)、消息体(字节数组)
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());

            System.out.println("生产者发送消息成功!发送的消息内容:" + message);

        } catch (IOException | TimeoutException e) {
            // 异常处理(新手可直接打印异常,后续可优化为日志输出)
            e.printStackTrace();
        } finally {
            // 10. 关闭资源(先关信道,再关连接,避免资源泄露)
            try {
                if (channel != null && channel.isOpen()) {
                    channel.close();
                }
                if (connection != null && connection.isOpen()) {
                    connection.close();
                }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}

核心代码解析(新手必看)
很多新手看不懂代码,这里重点解析核心步骤,结合上一篇的概念,帮大家串联起来:

  1. 连接工厂(ConnectionFactory):用于创建与RabbitMQ Broker的连接,需要配置服务地址、端口、账号密码,这些参数与我们搭建的环境一致(本地部署默认localhost:5672,账号guest/guest)。

  2. 连接(Connection):生产者与RabbitMQ Broker的连接,是消息通信的基础,创建后需要关闭(finally中关闭,避免资源泄露)。

  3. 信道(Channel):RabbitMQ的核心操作载体,所有消息的发送、接收、队列/交换机的创建,都通过信道完成,不直接操作连接(因为创建连接成本高,信道可复用,提升效率)。

  4. 交换机/队列/绑定的代码创建:注释中的6、7、8行代码,是可选的——如果之前已经在管理界面创建过,可不用执行;如果没创建,执行这些代码,会自动创建对应的交换机、队列和绑定,与管理界面操作效果一致。

  5. 发送消息(basicPublish方法):核心方法,参数分别是“交换机名称、路由键、消息属性、消息体”,消息体必须是字节数组(所以用String的getBytes()方法转换),路由键必须与绑定键一致(test_key),否则消息无法转发到队列。

三、第三步:编写消费者(Consumer)——监听队列,接收消息

消费者的核心任务:连接RabbitMQ Broker,监听我们创建的test_queue队列,当队列中有消息时,主动获取消息并处理,处理完成后发送ACK确认信号,告知RabbitMQ消息已处理,可删除消息。

注意:消费者是“持续监听”队列的,不会执行一次就结束,与生产者(执行一次发送消息后结束)不同。

完整消费者代码(可直接复制)

package com.rabbitmq.demo;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * RabbitMQ消费者:监听队列,接收并处理消息
 */
public class Consumer {
    // 1. 队列名称(与管理界面/生产者代码中创建的一致)
    private static final String QUEUE_NAME = "test_queue";
    // 2. RabbitMQ服务地址、端口、账号密码(与生产者一致)
    private static final String RABBITMQ_HOST = "localhost";
    private static final int RABBITMQ_PORT = 5672;
    private static final String RABBITMQ_USERNAME = "guest";
    private static final String RABBITMQ_PASSWORD = "guest";

    public static void main(String[] args) {
        // 1. 创建连接工厂(与生产者配置一致)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(RABBITMQ_HOST);
        factory.setPort(RABBITMQ_PORT);
        factory.setUsername(RABBITMQ_USERNAME);
        factory.setPassword(RABBITMQ_PASSWORD);

        Connection connection = null;
        Channel channel = null;

        try {
            // 2. 创建连接、创建信道(与生产者步骤一致)
            connection = factory.newConnection();
            channel = connection.createChannel();

            // 3. (可选)通过代码创建队列(如果之前没创建,可执行,与生产者代码一致)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // 4. 创建消费者,设置消息处理逻辑(核心步骤)
            // DefaultConsumer是RabbitMQ提供的默认消费者类,需重写handleDelivery方法处理消息
            Consumer consumer = new DefaultConsumer(channel) {
                /**
                 * 当队列中有消息时,会自动调用该方法处理消息
                 * @param consumerTag 消费者标签(可忽略)
                 * @param envelope 消息信封(包含消息的路由键、交换机等信息)
                 * @param properties 消息属性(可忽略)
                 * @param body 消息体(字节数组,需转换为String)
                 * @throws IOException 异常
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 5. 处理消息:将字节数组转换为String
                    String message = new String(body, "UTF-8");
                    System.out.println("消费者接收消息成功!接收的消息内容:" + message);

                    // 6. 发送ACK确认信号(关键步骤,避免消息丢失)
                    // 参数说明:消息标识(envelope.getDeliveryTag())、是否批量确认(false表示单条确认)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };

            // 7. 监听队列,开始接收消息
            // 参数说明:队列名称、是否自动ACK(false表示手动ACK,必须手动发送ACK信号)、消费者对象
            channel.basicConsume(QUEUE_NAME, false, consumer);

            System.out.println("消费者已启动,正在监听队列:" + QUEUE_NAME + ",等待接收消息...");

            // 8. 让消费者持续运行(避免程序执行完就结束)
            System.in.read();

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 9. 关闭资源(与生产者一致,先关信道,再关连接)
            try {
                if (channel != null && channel.isOpen()) {
                    channel.close();
                }
                if (connection != null && connection.isOpen()) {
                    connection.close();
                }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}

核心代码解析(新手必看)
重点解析消费者与生产者的差异,以及核心注意点:

  1. 消费者的持续监听:代码中第8行System.in.read();的作用是让程序持续运行,避免执行完监听代码后就结束,这样消费者才能一直监听队列中的消息。

  2. 消息处理逻辑(handleDelivery方法):这是消费者的核心方法,当队列中有消息时,RabbitMQ会自动调用该方法,我们只需在方法中编写消息处理逻辑(比如打印消息、存储到数据库等)。

  3. 手动ACK确认(关键避坑点):代码中第7行basicConsume方法的第二个参数是false,表示“手动ACK”,必须在处理完消息后,调用basicAck方法发送确认信号。如果设置为true(自动ACK),RabbitMQ会在消息发送给消费者后,立即删除消息,无论消费者是否处理完成,容易导致消息丢失,新手不推荐。

  4. 队列监听:消费者监听的是“队列”(test_queue),而不是交换机,这与上一篇的概念一致——消费者只能从队列中获取消息,不能直接从交换机获取。

四、第四步:运行程序,测试消息发送与接收

代码编写完成后,我们按照“先启动消费者、再启动生产者”的顺序运行程序,测试消息是否能成功发送和接收,全程带测试步骤,新手跟着操作即可。

测试步骤

  1. 启动RabbitMQ服务:确保RabbitMQ服务已启动(Windows看服务,Linux看进程,Docker看容器),能访问管理界面。

  2. 启动消费者:在IDEA中,找到Consumer类,右键“Run Consumer.main()”,启动消费者程序,控制台会输出:“消费者已启动,正在监听队列:test_queue,等待接收消息…”,表示消费者已成功监听队列。

  3. 启动生产者:找到Producer类,右键“Run Producer.main()”,启动生产者程序,控制台会输出:“生产者发送消息成功!发送的消息内容:Hello RabbitMQ! 这是我的第一个RabbitMQ消息”,表示生产者已成功发送消息。

  4. 查看消费者控制台:此时消费者控制台会自动输出接收的消息:“消费者接收消息成功!接收的消息内容:Hello RabbitMQ! 这是我的第一个RabbitMQ消息”,表示消息接收成功。

  5. 管理界面验证:打开RabbitMQ管理界面,点击“Queues”→“test_queue”,查看队列状态:“Ready”(待消费消息数)为0,“Total consumed”(已消费消息数)为1,说明消息已被成功消费,且已删除。

测试成功效果

  • 生产者控制台:输出发送成功的消息内容。

  • 消费者控制台:输出接收成功的消息内容,且程序持续运行(等待下一条消息)。

  • 管理界面:test_queue队列的待消费消息数为0,已消费消息数为1。

五、新手常见问题排查(避坑汇总)

很多新手运行程序时会遇到各种问题,这里汇总高频问题及解决方案,遇到问题直接对照排查:

1. 问题1:生产者/消费者启动失败,提示“Connection refused: connect”
解决方案:RabbitMQ服务未启动,或服务地址、端口配置错误;检查RabbitMQ服务是否运行,确认代码中host是localhost、port是5672(不是15672)。

2. 问题2:生产者发送消息成功,但消费者接收不到消息
解决方案:

  • 检查交换机、队列、绑定是否存在,名称是否与代码一致(大小写敏感)。

  • 检查生产者的路由键(ROUTING_KEY)与绑定键是否一致(Direct交换机需完全一致)。

  • 检查消费者监听的队列名称是否正确,是否与生产者转发的队列一致。

3. 问题3:消费者接收消息后,管理界面“Ready”消息数仍为1,未删除
解决方案:未发送ACK确认信号,检查消费者代码中是否调用了basicAck方法,且basicConsume方法的第二个参数是false(手动ACK)。

4. 问题4:依赖导入失败,提示“Cannot resolve com.rabbitmq:amqp-client:5.18.0”
解决方案:检查网络连接,或更换Maven镜像源(阿里云镜像),重新刷新Maven依赖。

六、补充:代码创建交换机、队列和绑定(可选)

上一篇我们是手动在管理界面创建交换机、队列和绑定,本文生产者代码中也提供了代码创建的方式(注释中的6、7、8行),如果不想手动操作,可取消注释,运行生产者代码,会自动创建:

  • channel.exchangeDeclare:创建交换机,参数依次为“交换机名称、类型、是否持久化”。

  • channel.queueDeclare:创建队列,参数依次为“队列名称、是否持久化、是否独占、是否自动删除、额外参数”。

  • channel.queueBind:绑定交换机和队列,参数依次为“队列名称、交换机名称、路由键”。

提示:代码创建的组件,与手动在管理界面创建的效果完全一致,后续实操中,我们会更多使用代码创建,实现“代码一键部署”。

七、下一篇预告

恭喜你!今天我们成功编写并运行了第一个RabbitMQ Hello World程序,掌握了生产者发送消息、消费者接收消息的核心流程,还学会了排查常见问题,实现了“理论落地实操”。

下一篇文章,我们将深入学习RabbitMQ的核心特性——四种交换机类型,详细讲解Direct、Topic、Fanout、Headers四种交换机的原理、匹配规则和实操场景,结合Java代码,让你彻底分清每种交换机的用法,灵活应对不同的业务场景(比如一对一通信、广播通知等)。

如果这篇实操教程对你有帮助,欢迎点赞、收藏,如果你在运行代码时遇到其他问题,欢迎在评论区留言,我会一一解答,后续我们继续吃透RabbitMQ!

更多推荐