rabbitmq相关总结
这里只记录每次怎么在本地开启服务,不涉及具体安装细节,工作时一般由运维人员安装在linux环境上开启服务时,需要切到本地的rabbitmq的\sbin目录下管理页面入口(测试是否正常启动)默认账号:guest默认密码:guest创建账号设置用户角色设置用户权限当前用户和角色rabbitmq依赖队列模型简单队列生产者生产者首先获得连接,之后获得信道,信道初始化队列,然后发送信息消费者消费者首先获得连
文章目录
Windows服务启动
这里只记录每次怎么在本地开启服务,不涉及具体安装细节,工作时一般由运维人员安装在linux环境上
服务开启命令
开启服务时,需要切到本地的rabbitmq的\sbin目录下
Rabbitmq-server
如果遇到端口占用的问题,windows开始面板上搜索rabbitmq-stop,点击后再次输入开启服务命令即可
管理页面入口(测试是否正常启动)
http://127.0.0.1:15672/
默认账号:guest
默认密码:guest
创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
当前用户和角色
rabbitmqctl list_users
rabbitmq依赖
<!--指定 jdk 编译版本-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
队列模型
简单队列
生产者
生产者首先获得连接,之后获得信道,信道初始化队列,然后发送信息
public class Producer {
public static final String QUENE_NAME = "QUEUE_SIMPLE";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123");
//连接
Connection connection = connectionFactory.newConnection();
//信道
Channel channel = connection.createChannel();
channel.queueDeclare(QUENE_NAME,false,false,false,null);
String message = "hello world rabbitmq";
channel.basicPublish("",QUENE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
}
}
消费者
消费者首先获得连接,产生信道,
public class Consumer {
public static final String QUEUE_NAME = "QUEUE_SIMPLE";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123");
//连接
Connection connection = connectionFactory.newConnection();
//信道
Channel channel = connection.createChannel();
System.out.println("等待接受消息");
推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (counsumerTag)->{
System.out.println("消息消费被中断");
};
//1. 消费哪个队列
//2. 消费成功后是否要自动应答
//3. 消费者未成功消费的回调
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
工作队列
核心思想就是轮训分发消息
抽取工具类
首先对获取信道的方法抽取成工具类
调用静态方法getChanel就可以获取一个信道
/**
* 此类为连接工厂创建信道的工具类
*/
public class RabbitMqUtils {
//得到一个连接的 channel
public static Channel getChannel() throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
启动两个工作线程
工作线程c1
/**
* 工作线程(相当于消费者)
*/
public class Worker01 {
//队列的名称
public static final String QUEUE_NAME = "hello";
//接收消息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("接收:"+new String(message.getBody()));
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag + "消息者取消消费接口回调逻辑");
};
//接收
System.out.println("c1启动...");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
工作线程c2
/**
* 工作线程(相当于消费者)
*/
public class Worker02 {
//队列的名称
public static final String QUEUE_NAME = "hello";
//接收消息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("接收:"+new String(message.getBody()));
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag + "消息者取消消费接口回调逻辑");
};
//接收
System.out.println("c2启动...");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
启动发送线程
采用scanner输入流的方式从控制台去输入数据
/**
* 生产者 发送大量的消息
*/
public class Task01 {
public static final String QUEUE_NAME = "hello";
//发送大量的消息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//队列的声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//从控制台当中接受消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息完成:"+message);
}
}
}
发送结果:
接收结果:
work01
work02
总结:生产者一共发送四条消息,两个消费者各接收两条消息,并且是有序的
消息应答
rabbitmq引入消息应答机制,保证消息在发送过程中不丢失,消息应答就是消费者在接收到消息并处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把消息删除了
自动应答
容易丢失消息
手动应答
Channel.basicAck(用于肯定确认)
rabbitmq已知道该消息并且成功的处理消息,可以将其丢弃了
Multiple
Multiple参数一般设置为false
true
false
消息自动重新入队
若消费者失去连接(宕机、连接关闭等),导致消息未发送ACK确认,Rabbitmq将了解到消息未完全处理,并将对其重新排队,此时会分发给别的消费者。这样,即使某个消费者偶尔死亡,也可以确认不会丢失任何消息
此图代表了消息自动重新入队的具体执行流程
代码实现
生产者
/**
* 消息在手动应答时不丢失,放回队列中重新消费
*/
public class Task2 {
//队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明队列
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
//从控制台输入
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
System.out.println("生产者发出消息:"+ message);
}
}
}
消费者
消费者c1
沉睡时间1秒,模拟快速处理业务
public class Work03 {
//队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("c1等待接受消息处理较短");
DeliverCallback deliverCallback =(consumerTag,message)->{
//沉睡1s
SleepUtils.sleep(1);
System.out.println("接收到的消息:"+new String(message.getBody()));
//手动应答
/**
* 1. 消息的标记 tag
* 2. 是否批量应答 false:不批量应答信道中的消息
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag -> {
System.out.println("消费者取消消费接口回调");
}));
}
}
消费者c2
沉睡时间30秒,模拟慢速处理业务
public class Work04 {
//队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("c2等待接受消息处理较长");
DeliverCallback deliverCallback =(consumerTag,message)->{
//沉睡1s
SleepUtils.sleep(30);
System.out.println("接收到的消息:"+new String(message.getBody()));
//手动应答
/**
* 1. 消息的标记 tag
* 2. 是否批量应答 false:不批量应答信道中的消息
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag -> {
System.out.println("消费者取消消费接口回调");
}));
}
}
睡眠工具类
public class SleepUtils {
public static void sleep(int second){
try {
Thread.sleep(1000*second);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
效果
发送消息a,b,测试可知c1接收后,c2隔一段时间接收,此时发送c,d消息,c1接收了c消息,然后把c2服务停掉,模拟宕机,此时c1会去接收d消息,消息并没有丢失,会重新发送
发送消息:
c1
d消息成功接收到
c2
接收到b消息后模拟宕机
Exchanges类型
fagout-发布订阅模式
不处理routingkey,每个队列都会得到
direct-路由模式
根据routingkey去转发消息给指定的队列
topic-主题模式
根据特殊符号去匹配多个,类似于正则表达式
发布订阅模式
- 一个生产者对应多个消费者
- 生产者将消息发送到X(Exchange),每个消费者需要绑定一个队列
- Exchage:接收生产者的消息 推送给消费者
生产者
/**
* 发布订阅模式-生产者
*/
public class producer{
//交换机
public static final String EXCHAGE_NAME = "exchange_ps";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHAGE_NAME,"fanout");
String msg = "hello fbdy";
//发送消息到交换机
channel.basicPublish(EXCHAGE_NAME,"",null,msg.getBytes());
System.out.println("publish msg:"+msg);
}
}
消费者
消费者1
/**
* 消费者1
*/
public class cousumer1 {
//队列
public static final String QUEUE_NAME = "queue_mail";
//交换机
public static final String EXCHAGE_NAME = "exchange_ps";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHAGE_NAME,"");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("cousumer1:"+msg);
}
};
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
消费者2
/**
* 消费者2
*/
public class cousumer2 {
//队列
public static final String QUEUE_NAME = "queue_mail";
//交换机
public static final String EXCHAGE_NAME = "exchange_ps";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHAGE_NAME,"");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("cousumer2:"+msg);
}
};
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
路由模式
- 生产者将消息发送到交换机,消息附带routingkey
- 消费者声明队列与交换机绑定,因为生产者附带routingKey,所以消费者如果想要接收来自交换机的消息,也需要绑定一致的routingKey,这样才可以接收来自生产者的消息。
- 只要消费者的绑定的交换机的路由键与交换机的路由键一致,则可以收到生产者的消息
生产者
- 生产者发布消息给交换机时需要指定routingKey
- 交换机的初始化是由生产者进行的
/**
* 路由模式
*/
public class producer {
public static final String EXCHAGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHAGE_NAME,"direct");
String rountingKey = "warning";
channel.basicPublish(EXCHAGE_NAME,rountingKey,null,"hello,gjj".getBytes());
System.out.println("发送消息:hello,gjj,routingKey:"+rountingKey);
}
}
消费者
消费者1
- 队列的初始化是需要消费者进行的
- 每个队列需要绑定到交换机上的routingKey上,当与交换机的routingKey一致,则可以消费消息。
public class consumer1 {
//声明交换机和队列类型
public static final String EXCHANGE_NAME = "direct_exchange";
public static final String QUEUE_NAME = "direct_queue1";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//绑定队列至交换机,可以通过多个routingKey绑定一个队列
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("counsumer1:"+msg);
}
};
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
消费者2
public class consumer2{
//声明交换机和队列类型
public static final String EXCHANGE_NAME = "direct_exchange";
public static final String QUEUE_NAME = "direct_queue2";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//绑定队列至交换机,可以通过多个routingKey绑定一个队列
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("counsumer2:"+msg);
}
};
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
死信队列
延迟队列
更多推荐
所有评论(0)