RabbitMQ系列文章(第四篇):RabbitMQ入门实操——第一个Hello World程序(Java版)
大家好,欢迎来到RabbitMQ系列的第四篇文章!上一篇我们已经吃透了RabbitMQ的六大核心组件,还动手在管理界面创建了交换机、队列并完成绑定,搞懂了消息流转的全流程。今天,我们正式进入“代码实操”环节——编写第一个RabbitMQ Hello World程序(Java版),手把手教大家用Java代码创建生产者、消费者,实现消息的发送和接收,把上一篇学到的理论知识,真正落地到代码中。
核心目标:通过最简单的Java程序,掌握RabbitMQ消息发送和接收的核心流程,理解生产者如何连接RabbitMQ、发送消息到交换机,消费者如何连接RabbitMQ、监听队列并接收消息。全程代码可复制、步骤可落地,新手跟着操作,就能成功运行第一个RabbitMQ程序,彻底告别“只会看管理界面,不会写代码”的困境。
前置准备:
-
已完成RabbitMQ环境搭建(参考第二篇文章),确保RabbitMQ服务正常运行,能访问管理界面(http://localhost:15672)。
-
开发环境:IDEA(或Eclipse)、JDK 8及以上(推荐JDK 8,兼容性最好)、Maven(用于导入RabbitMQ依赖)。
-
提前在RabbitMQ管理界面,创建好交换机、队列并绑定(参考第三篇实操步骤):交换机名称test_exchange(Direct类型)、队列名称test_queue、绑定键test_key(后续代码会用到,可直接复用)。
提示:如果没有提前创建交换机、队列和绑定,也可以通过代码创建(后续会补充代码创建的方式),但新手建议先手动在管理界面创建,更直观地理解消息流转。
一、第一步:创建Maven项目,导入RabbitMQ依赖
我们先创建一个普通的Maven项目,然后导入RabbitMQ的Java客户端依赖(amqp-client),这是编写RabbitMQ程序的核心依赖,无需手动下载jar包,Maven会自动导入。
步骤1:创建Maven项目
-
打开IDEA,点击“File”→“New”→“Project”,选择“Maven”,点击“Next”。
-
填写项目信息:GroupId(自定义,比如com.rabbitmq.demo)、ArtifactId(比如rabbitmq-hello-world)、Version(默认1.0-SNAPSHOT),点击“Finish”,等待项目初始化完成。
步骤2:导入RabbitMQ依赖
打开项目的pom.xml文件,在标签中,添加以下依赖(直接复制粘贴即可),然后点击“Reload Maven Project”(刷新依赖),等待依赖导入完成。
<!-- RabbitMQ Java客户端依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.18.0</version> <!-- 稳定版,适配我们搭建的RabbitMQ 3.12.10 -->
</dependency>
<!-- 日志依赖(可选,用于打印日志,方便调试) -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.36</version>
</dependency>
避坑提示:依赖版本不要随意修改,本文使用的5.18.0版本,与我们搭建的RabbitMQ 3.12.10版本完全兼容;如果导入依赖失败,检查网络连接,或更换Maven镜像源(比如阿里云镜像)。
二、第二步:编写生产者(Producer)——发送消息到交换机
生产者的核心任务:连接RabbitMQ Broker,创建消息,指定路由键,将消息发送到我们提前创建的test_exchange交换机中。我们创建一个Producer类,全程带详细注释,新手能轻松看懂每一行代码的作用。
完整生产者代码(可直接复制)
package com.rabbitmq.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* RabbitMQ生产者:发送消息到交换机
*/
public class Producer {
// 1. 交换机名称(与管理界面创建的一致)
private static final String EXCHANGE_NAME = "test_exchange";
// 2. 路由键(与管理界面绑定的绑定键一致)
private static final String ROUTING_KEY = "test_key";
// 3. RabbitMQ服务地址(本地部署,默认localhost)
private static final String RABBITMQ_HOST = "localhost";
// 4. RabbitMQ默认端口(5672,消息通信端口,不是管理界面的15672)
private static final int RABBITMQ_PORT = 5672;
// 5. RabbitMQ默认账号密码(guest/guest,仅本地访问可用)
private static final String RABBITMQ_USERNAME = "guest";
private static final String RABBITMQ_PASSWORD = "guest";
public static void main(String[] args) {
// 1. 创建连接工厂(用于创建与RabbitMQ的连接)
ConnectionFactory factory = new ConnectionFactory();
// 2. 配置连接工厂参数(核心参数,缺一不可)
factory.setHost(RABBITMQ_HOST); // 服务地址
factory.setPort(RABBITMQ_PORT); // 通信端口
factory.setUsername(RABBITMQ_USERNAME); // 账号
factory.setPassword(RABBITMQ_PASSWORD); // 密码
Connection connection = null; // 连接对象
Channel channel = null; // 信道对象(消息发送/接收的核心载体)
try {
// 3. 创建与RabbitMQ的连接
connection = factory.newConnection();
// 4. 创建信道(RabbitMQ的消息操作都通过信道完成,不直接操作连接)
channel = connection.createChannel();
// 5. (可选)通过代码创建交换机(如果之前没在管理界面创建,可执行这行代码)
// 参数说明:交换机名称、交换机类型、是否持久化
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
// 6. (可选)通过代码创建队列(如果之前没在管理界面创建,可执行这行代码)
// 参数说明:队列名称、是否持久化、是否独占、是否自动删除、额外参数
channel.queueDeclare("test_queue", true, false, false, null);
// 7. (可选)通过代码绑定交换机和队列(如果之前没在管理界面绑定,可执行这行代码)
// 参数说明:队列名称、交换机名称、路由键
channel.queueBind("test_queue", EXCHANGE_NAME, ROUTING_KEY);
// 8. 准备要发送的消息(消息体为字节数组,可发送字符串、JSON等)
String message = "Hello RabbitMQ! 这是我的第一个RabbitMQ消息";
// 9. 发送消息到交换机
// 参数说明:交换机名称、路由键、消息属性(null表示默认)、消息体(字节数组)
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
System.out.println("生产者发送消息成功!发送的消息内容:" + message);
} catch (IOException | TimeoutException e) {
// 异常处理(新手可直接打印异常,后续可优化为日志输出)
e.printStackTrace();
} finally {
// 10. 关闭资源(先关信道,再关连接,避免资源泄露)
try {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}
核心代码解析(新手必看)
很多新手看不懂代码,这里重点解析核心步骤,结合上一篇的概念,帮大家串联起来:
-
连接工厂(ConnectionFactory):用于创建与RabbitMQ Broker的连接,需要配置服务地址、端口、账号密码,这些参数与我们搭建的环境一致(本地部署默认localhost:5672,账号guest/guest)。
-
连接(Connection):生产者与RabbitMQ Broker的连接,是消息通信的基础,创建后需要关闭(finally中关闭,避免资源泄露)。
-
信道(Channel):RabbitMQ的核心操作载体,所有消息的发送、接收、队列/交换机的创建,都通过信道完成,不直接操作连接(因为创建连接成本高,信道可复用,提升效率)。
-
交换机/队列/绑定的代码创建:注释中的6、7、8行代码,是可选的——如果之前已经在管理界面创建过,可不用执行;如果没创建,执行这些代码,会自动创建对应的交换机、队列和绑定,与管理界面操作效果一致。
-
发送消息(basicPublish方法):核心方法,参数分别是“交换机名称、路由键、消息属性、消息体”,消息体必须是字节数组(所以用String的getBytes()方法转换),路由键必须与绑定键一致(test_key),否则消息无法转发到队列。
三、第三步:编写消费者(Consumer)——监听队列,接收消息
消费者的核心任务:连接RabbitMQ Broker,监听我们创建的test_queue队列,当队列中有消息时,主动获取消息并处理,处理完成后发送ACK确认信号,告知RabbitMQ消息已处理,可删除消息。
注意:消费者是“持续监听”队列的,不会执行一次就结束,与生产者(执行一次发送消息后结束)不同。
完整消费者代码(可直接复制)
package com.rabbitmq.demo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* RabbitMQ消费者:监听队列,接收并处理消息
*/
public class Consumer {
// 1. 队列名称(与管理界面/生产者代码中创建的一致)
private static final String QUEUE_NAME = "test_queue";
// 2. RabbitMQ服务地址、端口、账号密码(与生产者一致)
private static final String RABBITMQ_HOST = "localhost";
private static final int RABBITMQ_PORT = 5672;
private static final String RABBITMQ_USERNAME = "guest";
private static final String RABBITMQ_PASSWORD = "guest";
public static void main(String[] args) {
// 1. 创建连接工厂(与生产者配置一致)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RABBITMQ_HOST);
factory.setPort(RABBITMQ_PORT);
factory.setUsername(RABBITMQ_USERNAME);
factory.setPassword(RABBITMQ_PASSWORD);
Connection connection = null;
Channel channel = null;
try {
// 2. 创建连接、创建信道(与生产者步骤一致)
connection = factory.newConnection();
channel = connection.createChannel();
// 3. (可选)通过代码创建队列(如果之前没创建,可执行,与生产者代码一致)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 4. 创建消费者,设置消息处理逻辑(核心步骤)
// DefaultConsumer是RabbitMQ提供的默认消费者类,需重写handleDelivery方法处理消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* 当队列中有消息时,会自动调用该方法处理消息
* @param consumerTag 消费者标签(可忽略)
* @param envelope 消息信封(包含消息的路由键、交换机等信息)
* @param properties 消息属性(可忽略)
* @param body 消息体(字节数组,需转换为String)
* @throws IOException 异常
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5. 处理消息:将字节数组转换为String
String message = new String(body, "UTF-8");
System.out.println("消费者接收消息成功!接收的消息内容:" + message);
// 6. 发送ACK确认信号(关键步骤,避免消息丢失)
// 参数说明:消息标识(envelope.getDeliveryTag())、是否批量确认(false表示单条确认)
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 7. 监听队列,开始接收消息
// 参数说明:队列名称、是否自动ACK(false表示手动ACK,必须手动发送ACK信号)、消费者对象
channel.basicConsume(QUEUE_NAME, false, consumer);
System.out.println("消费者已启动,正在监听队列:" + QUEUE_NAME + ",等待接收消息...");
// 8. 让消费者持续运行(避免程序执行完就结束)
System.in.read();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
// 9. 关闭资源(与生产者一致,先关信道,再关连接)
try {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}
核心代码解析(新手必看)
重点解析消费者与生产者的差异,以及核心注意点:
-
消费者的持续监听:代码中第8行System.in.read();的作用是让程序持续运行,避免执行完监听代码后就结束,这样消费者才能一直监听队列中的消息。
-
消息处理逻辑(handleDelivery方法):这是消费者的核心方法,当队列中有消息时,RabbitMQ会自动调用该方法,我们只需在方法中编写消息处理逻辑(比如打印消息、存储到数据库等)。
-
手动ACK确认(关键避坑点):代码中第7行basicConsume方法的第二个参数是false,表示“手动ACK”,必须在处理完消息后,调用basicAck方法发送确认信号。如果设置为true(自动ACK),RabbitMQ会在消息发送给消费者后,立即删除消息,无论消费者是否处理完成,容易导致消息丢失,新手不推荐。
-
队列监听:消费者监听的是“队列”(test_queue),而不是交换机,这与上一篇的概念一致——消费者只能从队列中获取消息,不能直接从交换机获取。
四、第四步:运行程序,测试消息发送与接收
代码编写完成后,我们按照“先启动消费者、再启动生产者”的顺序运行程序,测试消息是否能成功发送和接收,全程带测试步骤,新手跟着操作即可。
测试步骤
-
启动RabbitMQ服务:确保RabbitMQ服务已启动(Windows看服务,Linux看进程,Docker看容器),能访问管理界面。
-
启动消费者:在IDEA中,找到Consumer类,右键“Run Consumer.main()”,启动消费者程序,控制台会输出:“消费者已启动,正在监听队列:test_queue,等待接收消息…”,表示消费者已成功监听队列。
-
启动生产者:找到Producer类,右键“Run Producer.main()”,启动生产者程序,控制台会输出:“生产者发送消息成功!发送的消息内容:Hello RabbitMQ! 这是我的第一个RabbitMQ消息”,表示生产者已成功发送消息。
-
查看消费者控制台:此时消费者控制台会自动输出接收的消息:“消费者接收消息成功!接收的消息内容:Hello RabbitMQ! 这是我的第一个RabbitMQ消息”,表示消息接收成功。
-
管理界面验证:打开RabbitMQ管理界面,点击“Queues”→“test_queue”,查看队列状态:“Ready”(待消费消息数)为0,“Total consumed”(已消费消息数)为1,说明消息已被成功消费,且已删除。
测试成功效果
-
生产者控制台:输出发送成功的消息内容。
-
消费者控制台:输出接收成功的消息内容,且程序持续运行(等待下一条消息)。
-
管理界面:test_queue队列的待消费消息数为0,已消费消息数为1。
五、新手常见问题排查(避坑汇总)
很多新手运行程序时会遇到各种问题,这里汇总高频问题及解决方案,遇到问题直接对照排查:
1. 问题1:生产者/消费者启动失败,提示“Connection refused: connect”
解决方案:RabbitMQ服务未启动,或服务地址、端口配置错误;检查RabbitMQ服务是否运行,确认代码中host是localhost、port是5672(不是15672)。
2. 问题2:生产者发送消息成功,但消费者接收不到消息
解决方案:
-
检查交换机、队列、绑定是否存在,名称是否与代码一致(大小写敏感)。
-
检查生产者的路由键(ROUTING_KEY)与绑定键是否一致(Direct交换机需完全一致)。
-
检查消费者监听的队列名称是否正确,是否与生产者转发的队列一致。
3. 问题3:消费者接收消息后,管理界面“Ready”消息数仍为1,未删除
解决方案:未发送ACK确认信号,检查消费者代码中是否调用了basicAck方法,且basicConsume方法的第二个参数是false(手动ACK)。
4. 问题4:依赖导入失败,提示“Cannot resolve com.rabbitmq:amqp-client:5.18.0”
解决方案:检查网络连接,或更换Maven镜像源(阿里云镜像),重新刷新Maven依赖。
六、补充:代码创建交换机、队列和绑定(可选)
上一篇我们是手动在管理界面创建交换机、队列和绑定,本文生产者代码中也提供了代码创建的方式(注释中的6、7、8行),如果不想手动操作,可取消注释,运行生产者代码,会自动创建:
-
channel.exchangeDeclare:创建交换机,参数依次为“交换机名称、类型、是否持久化”。
-
channel.queueDeclare:创建队列,参数依次为“队列名称、是否持久化、是否独占、是否自动删除、额外参数”。
-
channel.queueBind:绑定交换机和队列,参数依次为“队列名称、交换机名称、路由键”。
提示:代码创建的组件,与手动在管理界面创建的效果完全一致,后续实操中,我们会更多使用代码创建,实现“代码一键部署”。
七、下一篇预告
恭喜你!今天我们成功编写并运行了第一个RabbitMQ Hello World程序,掌握了生产者发送消息、消费者接收消息的核心流程,还学会了排查常见问题,实现了“理论落地实操”。
下一篇文章,我们将深入学习RabbitMQ的核心特性——四种交换机类型,详细讲解Direct、Topic、Fanout、Headers四种交换机的原理、匹配规则和实操场景,结合Java代码,让你彻底分清每种交换机的用法,灵活应对不同的业务场景(比如一对一通信、广播通知等)。
如果这篇实操教程对你有帮助,欢迎点赞、收藏,如果你在运行代码时遇到其他问题,欢迎在评论区留言,我会一一解答,后续我们继续吃透RabbitMQ!
更多推荐
所有评论(0)