黑马RabbitMQ初级学习笔记
1、MQ的基本概念1.1、MQ概述MQ 全称Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。分布式系统子系统间两种通信方式远程调用通过第三方传递消息小结:1、MQ,消息队列,存储消息的中间件;2、分布式系统通信两种方式:直接远程调用 和 借助第三方 完成间接通信3、发送方成为生产者,接收方称为消费者1.2、MQ的优势和劣势优势:- 应用解耦
本文章根据b站黑马教育-rabbitMQ消息中间件视频总结所写,如有需求,请通过如下链接观看视频
2020黑马——消息中间件RabbitMQ
1、MQ的基本概念
1.1、MQ概述
MQ 全称Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
分布式系统子系统间两种通信方式
- 远程调用
- 通过第三方传递消息
小结:
1、MQ,消息队列,存储消息的中间件;
2、分布式系统通信两种方式:直接远程调用 和 借助第三方 完成间接通信
3、发送方成为生产者,接收方称为消费者
1.2、MQ的优势和劣势
优势:
- 应用解耦
- 异步提速
- 削峰填谷
劣势:
- 系统可用性降低
- 系统复杂度提高
- 一致性问题
1.3、MQ的优势
1、应用解耦
- 直接远程调用耦合度高
系统的耦合性越高,容错性就越低,可维护性就越低。
- 通过MQ实现解耦
使用MQ使得应用间解耦,提升容错性和可维护性。
2、异步提速
- 远程调用方式:同步方式
一个下单操作耗时:20+300+300+300 = 920ms
用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!
- MQ方式通信:异步
用户点击完下单按钮后,只需等待25ms就能得到下单响应(20+5 = 25ms)
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。
3、削峰填谷
-
远程调用
请求瞬间增多,导致A系统压力过大而宕机。 -
MQ传递
使用了MQ之后,限制消费小溪的速度为1000,这样一来,高峰期产生的数据势必会挤压在MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。
使用MQ后,可以提高系统稳定性。
小结:
1、应用解耦:提高系统容错性和可维护性
2、异步提速:提升用户体验和系统吞吐量
3、削峰填谷:提高系统稳定性。
1.4、MQ的劣势
-
系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。如何保证MQ的高可用? -
系统复杂度提高
MQ的加入大大增加了系统的复杂度,以前系统间时同步的远程调用,现在是通过MQ异步调用。如何保证消息没有被重复消
费?怎样处理消息丢失情况?怎么保证消息传递的顺序性? -
一致性问题
A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。如何保证消息数据处理的一致性?
小结:
既然MQ有优势也有劣势,那么使用MQ需要满足什么条件呢?
① 生产者不需要从消费者出获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为null,这才让明白下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
②容许短暂的不一致性。
③确实用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。
1.5、常见的MQ产品
目前业界有很多的MQ产品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ,MetaMq等,也有直接使用Redis充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ产品特征,综合考虑。
由于RabbitMQ综合能力强劲,所以接下来的课程中,我们将主要学习RabbitMQ。
1.6、RabbitMQ简介
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。给予此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比http
2007年,Rabbit计数公司给予AMQP标准开发的RabbitMQ 1.0 发布。RabbitMQ采用Erlang语言开发。
Erlang语言由Ericson设计,专门为开发高并发和分布式系统的一种语言,在典型领域使用广泛。
RabbitMQ基础架构如下图:
RabbitMQ中的相关概念:
- Broker:接收和分发消息的应用,RabbitMQ server 就是Message Broker
- Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等
- Connection:publisher/consumer 和 broker之间的TCP连接
- Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候简历TCP Connection的开销将是巨大的,效率也低。Channel是在Connection内部简历的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel进行通讯,AMQP method包含了channel id 帮组客户端和message broker 识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统简历TCP connectino的开销
- Exchange:message到达broker的第一站,根据分发规则,配匹查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point),topic(publish-subscribe)and fanout(multicast)
- Queue:消息最终被送到这里等待consumer取走
- Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。
RabbitMQ提供了6中工作模式:简单模式、work queues、Publish/Subscribe发布与订阅模式、Routing路由模式、Topics主题模式、RPC远程调用模式(远程调用,不太算MQ;暂不做介绍)。
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
1.7、JMS
- JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API
- JMS是JavaEE规范中的一种,类比JDBC
- 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ官方没有提供JMS的实现包,但是开源社区有
小结:
1、RabbitMQ是基于AMQP协议使用Erlang语言开发的一款消息队列产品。
2、RabbitMQ提供了6中工作模式,我们学习5中。这是今天的重点。
3、AMQP是协议,类比HTTP。
JMS是API规范接口,类比JDBC。
2、RabbitMQ的安装和配置
- RabbitMQ的官方地址:http://www.rabbitmq.com/
- 安装文档:资料/软件/安装RabbitMQ.md
3、RabbitMQ快速入门
3.1、入门程序
需求:使用简单模式完成消息传递
步骤:
① 创建工程(生产者、消费者)
② 分别添加依赖
③ 编写生产者发送消息
④ 编写消费者接收消息
Producer_HelloWorld.java
package com.ph.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itcast");//虚拟机 ,默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认 guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、常见Channel
Channel channel = connection.createChannel();
//5、创建队列Queue
/*
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* 参数:
* 1.queue:队列名称
* 2.durable:是否持久化,当mq重启之后,还在
* 3。exclusive:
* *是否独占。只能有一个消费者监听这队列
* *当Connection关闭时,是否删除队列
* 4、autoDelete:是否自动删除。当没有Consumer时,自动删除掉
* 5、arguments:参数
* */
//入门没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("hello_world",true,false,false,null);
//6、发送消息
/*
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
* 参数:
* 1、exchange:交换机名称。简单模式下交换机会使用默认的""
* 2、routingKey:路由名称
* 3、props:配置信息
* 4、body:发送消息数据
* */
String body = "hello rabbitMQ!";
channel.basicPublish("","hello_world",null,body.getBytes());
//7、释放资源
channel.close();
connection.close();
}
}
Consumer_HelloWorld.java
package org.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itcast");//虚拟机 ,默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认 guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、常见Channel
Channel channel = connection.createChannel();
//5、创建队列Queue
/*
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* 参数:
* 1.queue:队列名称
* 2.durable:是否持久化,当mq重启之后,还在
* 3。exclusive:
* *是否独占。只能有一个消费者监听这队列
* *当Connection关闭时,是否删除队列
* 4、autoDelete:是否自动删除。当没有Consumer时,自动删除掉
* 5、arguments:参数
* */
//入门没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("hello_world",true,false,false,null);
//接收消息
/*
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数:
* 1、queue:队列名称
* 2、autoAck:是否自动确认
* 3、callback:回调对象
* */
Consumer consumer = new DefaultConsumer(channel){
/*
* 回调方法,当收到消息后,会自动执行该方法
* 1、consumerTag:标识
* 2、envelope:获取一些信息,交换机、路由key...
* 3、properties:配置信息
* 4、body:数据
* */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag"+consumerTag);
System.out.println("Exchange"+envelope.getExchange());
System.out.println("RoutingKey"+envelope.getRoutingKey());
System.out.println("properties"+properties);
System.out.println("body"+new String(body));
}
};
channel.basicConsume("hello_world",true,consumer);
//消费者不用关闭资源链接,消费者相当于监听程序
}
}
3.2、小结
上述的入门案例中其实使用的是如下的简单模式:
在上图的模型中,有以下概念:
① P:生产者,也就是要发送消息的程序
② C:消费者:消息的接收者,会一直等待消息到来
③ queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息
4、RabbitMQ的工作模式
4.1、Work queues工作队列模式
1、模式说明(一条消息只能被一个消费者消费)
- Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
- 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。=
代码:
Producer_WorkQueues.java
package com.ph.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itcast");//虚拟机 ,默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认 guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、常见Channel
Channel channel = connection.createChannel();
//5、创建队列Queue
/*
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* 参数:
* 1.queue:队列名称
* 2.durable:是否持久化,当mq重启之后,还在
* 3。exclusive:
* *是否独占。只能有一个消费者监听这队列
* *当Connection关闭时,是否删除队列
* 4、autoDelete:是否自动删除。当没有Consumer时,自动删除掉
* 5、arguments:参数
* */
//如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);
//6、发送消息
/*
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
* 参数:
* 1、exchange:交换机名称。简单模式下交换机会使用默认的""
* 2、routingKey:路由名称
* 3、props:配置信息
* 4、body:发送消息数据
* */
for (int i = 0; i <= 10; i++) {
String body = i+"=>hello rabbitMQ work_queues!";
channel.basicPublish("","work_queues",null,body.getBytes());
}
//7、释放资源
channel.close();
connection.close();
}
}
Consumer_WorkQueues1.java
package org.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itcast");//虚拟机 ,默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认 guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、常见Channel
Channel channel = connection.createChannel();
//接收消息
/*
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数:
* 1、queue:队列名称
* 2、autoAck:是否自动确认
* 3、callback:回调对象
* */
Consumer consumer = new DefaultConsumer(channel){
/*
* 回调方法,当收到消息后,会自动执行该方法
* 1、consumerTag:标识
* 2、envelope:获取一些信息,交换机、路由key...
* 3、properties:配置信息
* 4、body:数据
* */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/*System.out.println("consumerTag"+consumerTag);
System.out.println("Exchange"+envelope.getExchange());
System.out.println("RoutingKey"+envelope.getRoutingKey());
System.out.println("properties"+properties);*/
System.out.println("body"+new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
//消费者不用关闭资源链接,消费者相当于监听程序
}
}
Consumer_WorkQueues2.java
package org.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_WorkQueues2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itcast");//虚拟机 ,默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认 guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、常见Channel
Channel channel = connection.createChannel();
//接收消息
/*
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数:
* 1、queue:队列名称
* 2、autoAck:是否自动确认
* 3、callback:回调对象
* */
Consumer consumer = new DefaultConsumer(channel){
/*
* 回调方法,当收到消息后,会自动执行该方法
* 1、consumerTag:标识
* 2、envelope:获取一些信息,交换机、路由key...
* 3、properties:配置信息
* 4、body:数据
* */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/*System.out.println("consumerTag"+consumerTag);
System.out.println("Exchange"+envelope.getExchange());
System.out.println("RoutingKey"+envelope.getRoutingKey());
System.out.println("properties"+properties);*/
System.out.println("body"+new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
//消费者不用关闭资源链接,消费者相当于监听程序
}
}
小结:
1、在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争关系。
2、Work Queues对于任务过重或任务较多情况使用工作队列模式可以提高任务处理的速度。例如:短信服务部署多个,需要有一个节点发送即可。
4.2、Pub/Sub订阅模式
1、模式说明(一条消息可以被多个消费者消费)
在订阅模型中,多了一个Exchange角色,而且过程略有变化:
- P:生产者,也就是发送消息的程序,但是不在发送到队列中,而是发给X(交换机)
- C:消费者,消息的接收者,会一直等待消息到来
- Queue:消息队列,接收消息、缓存消息
- Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,直到如何处理消息,例如:递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3中类型:
1. Fanout:广播,将消息交给所有绑定到交换机的队列
2. Direct:定向,把消息交给符合指定routing key的队列
3. Topic:通配符,把消息交个符合routing pattern(路由模式)的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
Producer_SubPub .java
package com.ph.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_SubPub {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itcast");//虚拟机 ,默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认 guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、创建Channel
Channel channel = connection.createChannel();
//5、创建交换机
/*exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments)
参数:
1、exchange:交换机名称
2、type:交换机类型
(1) DIRECT("direct"):定向
(2)FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定的队列。
(3)TOPIC("topic"):通配符的形式
(4)HEADERS("headers"):参数匹配
3、durable:是否持久化
4、autoDelete:自动删除
5、internal:内部使用。一般为false
6、arguments:参数
*/
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,false,false,null);
//6、创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7、绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
参数:
1、queue:队列名称;
2、exchange:交换机名称;
3、routingKey:路由键,绑定规则:
如果交换机的类型为fanout,routingKey设置为“”字符串,表示消息会发给每一个与之绑定的queue
4、arguments:参数
*/
channel.queueBind(queue1Name,exchangeName,"",null);
channel.queueBind(queue2Name,exchangeName,"",null);
//8、发送消息
String body = "日志信息:张三调用了findAll方法,日志级别:info";
channel.basicPublish(exchangeName,"",null,body.getBytes());
//9、释放资源
channel.close();
connection.close();
}
}
Consumer_SubPub1 .java
package org.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_SubPub1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itcast");//虚拟机 ,默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认 guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、常见Channel
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
//接收消息
/*
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数:
* 1、queue:队列名称
* 2、autoAck:是否自动确认
* 3、callback:回调对象
* */
Consumer consumer = new DefaultConsumer(channel){
/*
* 回调方法,当收到消息后,会自动执行该方法
* 1、consumerTag:标识
* 2、envelope:获取一些信息,交换机、路由key...
* 3、properties:配置信息
* 4、body:数据
* */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/*System.out.println("consumerTag"+consumerTag);
System.out.println("Exchange"+envelope.getExchange());
System.out.println("RoutingKey"+envelope.getRoutingKey());
System.out.println("properties"+properties);*/
System.out.println("body"+new String(body));
System.out.println("将日志信息打印到控制台");
}
};
channel.basicConsume(queue1Name,true,consumer);
//消费者不用关闭资源链接,消费者相当于监听程序
}
}
Consumer_SubPub2 .java
package org.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_SubPub2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itcast");//虚拟机 ,默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认 guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、常见Channel
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
//接收消息
/*
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数:
* 1、queue:队列名称
* 2、autoAck:是否自动确认
* 3、callback:回调对象
* */
Consumer consumer = new DefaultConsumer(channel){
/*
* 回调方法,当收到消息后,会自动执行该方法
* 1、consumerTag:标识
* 2、envelope:获取一些信息,交换机、路由key...
* 3、properties:配置信息
* 4、body:数据
* */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/*System.out.println("consumerTag"+consumerTag);
System.out.println("Exchange"+envelope.getExchange());
System.out.println("RoutingKey"+envelope.getRoutingKey());
System.out.println("properties"+properties);*/
System.out.println("body"+new String(body));
System.out.println("将日志信息保存数据库");
}
};
channel.basicConsume(queue2Name,true,consumer);
//消费者不用关闭资源链接,消费者相当于监听程序
}
}
4.3、Routing路由模式
1、模式说明:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个routingKey(路由key)
- 消息的发送方在向exchange发送消息时,也必须指定消息的routingKey
- Exchange不在把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收到消息
图解:
- p:生产者,向exchange发送消息,发送消息时,会制定一个routing key
- X:exchange(交换机),接收生产者消息,然后把消息递交给routing key完全匹配的队列
- C1:消费者,其所在队列指定需要routing key为error的消息
- C2:消费者,其所在队列指定需要routing key为info、error、warning的消息
小结:
Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列
Producer_Routing .java
package com.ph.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itcast");//虚拟机 ,默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认 guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、创建Channel
Channel channel = connection.createChannel();
//5、创建交换机
/*exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments)
参数:
1、exchange:交换机名称
2、type:交换机类型
(1) DIRECT("direct"):定向
(2)FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定的队列。
(3)TOPIC("topic"):通配符的形式
(4)HEADERS("headers"):参数匹配
3、durable:是否持久化
4、autoDelete:自动删除
5、internal:内部使用。一般为false
6、arguments:参数
*/
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);
//6、创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7、绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
参数:
1、queue:队列名称;
2、exchange:交换机名称;
3、routingKey:路由键,绑定规则:
如果交换机的类型为fanout,routingKey设置为“”字符串,表示消息会发给每一个与之绑定的queue
4、arguments:参数
*/
//队列1的绑定 error
channel.queueBind(queue1Name,exchangeName,"error",null);
//队列2的绑定 info error warning
channel.queueBind(queue2Name,exchangeName,"info",null);
channel.queueBind(queue2Name,exchangeName,"error",null);
channel.queueBind(queue2Name,exchangeName,"warning",null);
//8、发送消息
String body = "日志信息:张三调用了findAll方法,日志级别:info";
channel.basicPublish(exchangeName,"info",null,body.getBytes());
//9、释放资源
channel.close();
connection.close();
}
}
Consumer_Routing1 .java
package org.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Routing1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itcast");//虚拟机 ,默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认 guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、常见Channel
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
//接收消息
/*
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数:
* 1、queue:队列名称
* 2、autoAck:是否自动确认
* 3、callback:回调对象
* */
Consumer consumer = new DefaultConsumer(channel){
/*
* 回调方法,当收到消息后,会自动执行该方法
* 1、consumerTag:标识
* 2、envelope:获取一些信息,交换机、路由key...
* 3、properties:配置信息
* 4、body:数据
* */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/*System.out.println("consumerTag"+consumerTag);
System.out.println("Exchange"+envelope.getExchange());
System.out.println("RoutingKey"+envelope.getRoutingKey());
System.out.println("properties"+properties);*/
System.out.println("body"+new String(body));
System.out.println("将日志信息打印到控制台");
}
};
channel.basicConsume(queue1Name,true,consumer);
//消费者不用关闭资源链接,消费者相当于监听程序
}
}
Consumer_Routing2 .java
package org.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Routing2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itcast");//虚拟机 ,默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认 guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、常见Channel
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
//接收消息
/*
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数:
* 1、queue:队列名称
* 2、autoAck:是否自动确认
* 3、callback:回调对象
* */
Consumer consumer = new DefaultConsumer(channel){
/*
* 回调方法,当收到消息后,会自动执行该方法
* 1、consumerTag:标识
* 2、envelope:获取一些信息,交换机、路由key...
* 3、properties:配置信息
* 4、body:数据
* */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/*System.out.println("consumerTag"+consumerTag);
System.out.println("Exchange"+envelope.getExchange());
System.out.println("RoutingKey"+envelope.getRoutingKey());
System.out.println("properties"+properties);*/
System.out.println("body"+new String(body));
System.out.println("将日志信息打印到控制台");
}
};
channel.basicConsume(queue2Name,true,consumer);
//消费者不用关闭资源链接,消费者相当于监听程序
}
}
4.4、Topics通配符模式
1、模式说明
图解:
- 红色Queue:绑定的usa.#,因此凡是以usa.开头的routing key 都会被匹配到;
- 黄色Queue:绑定的是#.news,因此凡是以.news结尾的routing key都会被匹配到;
- *代表1个单词,#代表0到多个单词。
小结:
Topics模式可以实现Pub/Sub发布与订阅模式和Routing路由模式的功能,只是Topics在配置routing key的时候可以使用通配符,显得更加灵活
Producer_Topics .java
package com.ph.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itcast");//虚拟机 ,默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认 guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、创建Channel
Channel channel = connection.createChannel();
//5、创建交换机
/*exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments)
参数:
1、exchange:交换机名称
2、type:交换机类型
(1) DIRECT("direct"):定向
(2)FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定的队列。
(3)TOPIC("topic"):通配符的形式
(4)HEADERS("headers"):参数匹配
3、durable:是否持久化
4、autoDelete:自动删除
5、internal:内部使用。一般为false
6、arguments:参数
*/
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.TOPIC,true,false,false,null);
//6、创建队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7、绑定队列和交换机
/*
queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
参数:
1、queue:队列名称;
2、exchange:交换机名称;
3、routingKey:路由键,绑定规则:
如果交换机的类型为fanout,routingKey设置为“”字符串,表示消息会发给每一个与之绑定的queue
4、arguments:参数
*/
//routing key 系统的名称.日志的级别
//需求:所有error级别的日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name,exchangeName,"#.error",null);
channel.queueBind(queue1Name,exchangeName,"order.*",null);
channel.queueBind(queue2Name,exchangeName,"*.*",null);
//8、发送消息
String body = "日志信息:张三调用了findAll方法,日志级别:info";
channel.basicPublish(exchangeName,"order.error",null,body.getBytes());
//9、释放资源
channel.close();
connection.close();
}
}
Consumer_Topics1 .java
package org.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Topics1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itcast");//虚拟机 ,默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认 guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、常见Channel
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
//接收消息
/*
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数:
* 1、queue:队列名称
* 2、autoAck:是否自动确认
* 3、callback:回调对象
* */
Consumer consumer = new DefaultConsumer(channel){
/*
* 回调方法,当收到消息后,会自动执行该方法
* 1、consumerTag:标识
* 2、envelope:获取一些信息,交换机、路由key...
* 3、properties:配置信息
* 4、body:数据
* */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/*System.out.println("consumerTag"+consumerTag);
System.out.println("Exchange"+envelope.getExchange());
System.out.println("RoutingKey"+envelope.getRoutingKey());
System.out.println("properties"+properties);*/
System.out.println("body"+new String(body));
System.out.println("将日志信息打印到控制台");
}
};
channel.basicConsume(queue1Name,true,consumer);
//消费者不用关闭资源链接,消费者相当于监听程序
}
}
Consumer_Topics2 .java
package org.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Topics2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("172.16.98.133");//ip 默认值 localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/itcast");//虚拟机 ,默认值/
factory.setUsername("heima");//用户名 默认 guest
factory.setPassword("heima");//密码 默认 guest
//3、创建连接 Connection
Connection connection = factory.newConnection();
//4、常见Channel
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
//接收消息
/*
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数:
* 1、queue:队列名称
* 2、autoAck:是否自动确认
* 3、callback:回调对象
* */
Consumer consumer = new DefaultConsumer(channel){
/*
* 回调方法,当收到消息后,会自动执行该方法
* 1、consumerTag:标识
* 2、envelope:获取一些信息,交换机、路由key...
* 3、properties:配置信息
* 4、body:数据
* */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/*System.out.println("consumerTag"+consumerTag);
System.out.println("Exchange"+envelope.getExchange());
System.out.println("RoutingKey"+envelope.getRoutingKey());
System.out.println("properties"+properties);*/
System.out.println("body"+new String(body));
System.out.println("将日志信息打印到控制台");
}
};
channel.basicConsume(queue2Name,true,consumer);
//消费者不用关闭资源链接,消费者相当于监听程序
}
}
5、Spring整合RabbitMQ
5.1、Spring整合RabbitMQ
需求:使用spring整合RabbitMQ
步骤:
生产者:
①创建生产者工程
②添加依赖
③配置整合
④编写代码发送消息
消费者
①创建消费者工程
②添加依赖
③配置整合
④编写消息监听器
生产者端代码
1、Maven文件pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>spring-rabbitmq-producers</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
2、资源文件rabbitmq.properties
rabbitmq.host=172.16.98.133
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itcast
3、spring配置文件spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
默认交换机类型为direct,名字为:"",路由键为队列的名称
-->
<!--
id:bean的名称
name:queue的名称
auto-declare:自动创建
auto-delete:自动删除。最后一个消费者和该队列断开连接后,自动删除队列
exclusive:是否独占(排他)
durable:是否持久化
-->
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~广播;所有队列都能收到消息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
<!--定义广播类型交换机;并绑定上述两个队列-->
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1"/>
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一个单词,#匹配多个单词 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
<!--定义广播交换机中的持久化队列,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="heima.*" queue="spring_topic_queue_star"/>
<rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/>
<rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
4、测试文件ProducerTest.java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
//1、注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testHelloWorld(){
//2、发送消息
rabbitTemplate.convertAndSend("spring_queue","hello world spring....");
}
@Test
public void testFanout(){
//2、发送消息
rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout");
}
public void testTopic(){
//2、发送消息
rabbitTemplate.convertAndSend("spring_topic_exchange","heima.hehe.haha","spring topics");
}
}
消费者端代码
1、Maven文件pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>spring-rabbitmq-consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
2、资源文件rabbitmq.properties
rabbitmq.host=172.16.98.133
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itcast
3、spring配置文件spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<bean id="springQueueListener" class="com.itheima.rabbitmq.listener.SpringQueueListener"/>
<!--<bean id="fanoutListener1" class="com.itheima.rabbitmq.listener.FanoutListener1"/>
<bean id="fanoutListener2" class="com.itheima.rabbitmq.listener.FanoutListener2"/>
<bean id="topicListenerStar" class="com.itheima.rabbitmq.listener.TopicListenerStar"/>
<bean id="topicListenerWell" class="com.itheima.rabbitmq.listener.TopicListenerWell"/>
<bean id="topicListenerWell2" class="com.itheima.rabbitmq.listener.TopicListenerWell2"/>-->
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
<!--<rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
<rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
<rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
<rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
<rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>-->
</rabbit:listener-container>
</beans>
4、监听类SpringQueueListener.java
package com.itheima.rabbitmq.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class SpringQueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
5、测试文件ConsumerTest.java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void Test1(){
boolean flag = true;
while(flag){
}
}
}
5.2、SpringBoot整合RabbitMQ
生产者
1、创建生产者SpringBoot工程
2、引入依赖坐标
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>producer-springboot</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
3、编写yml配置,基本信息配置application.yml
# 配置RabbitMQ的基本信息 ip、端口、username password
spring:
rabbitmq:
host: 172.16.98.133 # ip
port: 5672
username: heima
password: heima
virtual-host: /
4、定义交换机,队列以及绑定关系的配置类
RabbitMQConfig.java
package com.itheima.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue";
//1、交换机
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2、Queue 队列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//3、队列和交换机绑定关系 Binding
/*
1、知道是哪个队列
2、知道哪个交换机
3、routing key
*/
@Bean
public Binding bootQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
5、注入RabbitTemplate,调用方法,完成消息发送,测试类
ProducerTest.java
import com.itheima.rabbitmq.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
//1、注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haaha","这是一条发给topics模式的消息!");
}
}
消费者
1、创建消费者springBoot工程
2、引入依赖坐标pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>consumer-springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>consumer-springboot</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
3、编写yml配置,基本信息配置application.yml
# 配置RabbitMQ的基本信息 ip、端口、username password
spring:
rabbitmq:
host: 172.16.98.133 # ip
port: 5672
username: heima
password: heima
virtual-host: /
4、创建监听类RabbitMQListener.java
package com.example.consumerspringboot;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message){
System.out.println(new String(message.getBody()));
}
}
5、启动测试
更多推荐
所有评论(0)