RabbitMQ中的事务与confirmSelect模式
好久没写技术文章了,由于公司马上要做消息相关的业务,所以最近在Docker上搭了一台RabbitMQ并研究研究。从网易蜂巢上拉取的镜像:docker pull hub.c.163.com/library/rabbitmq:latest启动容器:docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672...
好久没写技术文章了,由于公司马上要做消息相关的业务,所以最近在Docker上搭了一台RabbitMQ并研究研究。
从网易蜂巢上拉取的镜像:
docker pull hub.c.163.com/library/rabbitmq:latest
启动容器:
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
查看容器启动情况:
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
05fb983beef4 rabbitmq:3-management "docker-entrypoint.s…" 3 days ago Up About an hour 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp my-rabbit1
我们可以看到, 主机的15672映射到docker的15672端口,主机的5672映射到docker的5672端口.
在浏览器中输入网址:http://ip:5672/,输入用户名/密码:guest/guest,即可进入RabbitMQ的主界面。
简单的搭建过程就是这样,废话不多说,接下来介绍RabbitMQ事务方面的问题(本文的部分截图来自于网络)。
让我们先看一个RabbitMQ的小例子:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerDemo {
public static void main(String[] args)throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("172.16.41.232");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
try{
String exchangeName = "exchangeName";
String routingKey = "routingKey";
String queueName = "queueName";
channel.exchangeDeclare(exchangeName,"direct",true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);
byte [] messageBodyBytes = "Hello World!" .getBytes();
for(int i = 0;i<100;i++) {
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
}
}catch (Exception e){
e.printStackTrace();
}finally {
channel.close();
conn.close();
}
}
}
通过上面的代码,消息生产者Producer向Broker发送100条消息(什么是Broker本文暂不做解释,请自行百度),然而生产环境异常复杂,我们怎么确定Broker收到Producer的消息了呢??类似于JDBC中的事务:①开启事务--> ②update/insert/delete-->3成功commit失败rollback,我们来看RabbitMQ对事务的控制。
1、txSelect()、txCommit()与txRollback()
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerDemo {
public static void main(String[] args)throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("172.16.41.232");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
try{
String exchangeName = "exchangeName";
String routingKey = "routingKey";
String queueName = "queueName";
channel.exchangeDeclare(exchangeName,"direct",true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);
byte [] messageBodyBytes = "Hello World!" .getBytes();
channel.txSelect(); //开启事务
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.txCommit(); //提交事务
}catch (Exception e){
e.printStackTrace();
channel.txRollback(); //回滚
}finally {
channel.close();
conn.close();
}
}
}
在通过txSelect开启事务之后,我们便可以发布消息给broker服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。
通过wireshark抓包,我们可以看到事务对RabbitMQ性能的影响。
在事务中,整个过程如下:
Tx.Select-->Tx.Select-OK-->Basic.Publish-->Tx.Commit-->Tx.Commit-OK(注意这里的Tx.Commit与Tx.Commit-Ok之间的时间间隔294ms,由此可见事务还是很耗时的。)
我们再来看看没有事务时的通信是怎样的:
只有Basic.Publish
最后我们看看事务回滚时的通信:
try{
channel.txSelect(); //开启事务
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
int i = 1/0;
channel.txCommit(); //提交事务
}catch (Exception e){
e.printStackTrace();
channel.txRollback(); //回滚
}finally {
channel.close();
conn.close();
}
Tx.Select-->Tx.Select-OK-->Basic.Publish-->Tx.Rollback-->Tx.Rollback-OK
事务确实可以判断producer向Broker发送消息是否成功,只有Broker接受到消息,才会commit,但是使用事务机制的话会降低RabbitMQ的性能,那么有没有更好的方法既能保障producer知道消息已经正确送到,又能基本上不带来性能上的损失呢?从AMQP协议的层面看是没有更好的方法,但是RabbitMQ提供了一个更好的方案,即将channel信道设置成confirm模式。
2、Comfirm模式
生产者将Channel设置成confirm模式,一旦Channel进入confirm模式,所有在该Channel上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理;
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等Channel返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息;
开启Comfire模式的方法:
channel.confirmSelect();
这里注意一下:txSelect与Confirm模式不能共存。
Confirm模式的三种编程方式:
- 串行confirm模式:peoducer每发送一条消息后,调用waitForConfirms()方法,等待broker端confirm。
- 批量confirm模式:producer每发送一批消息后,调用waitForConfirms()方法,等待broker端confirm。
- 异步confirm模式:提供一个回调方法,broker confirm了一条或者多条消息后producer端会回调这个方法。
我们分别来看看这三种confirm模式
1、串行confirm模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerDemo {
public static void main(String[] args)throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("172.16.41.232");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
String exchangeName = "exchangeName";
String routingKey = "routingKey";
String queueName = "queueName";
channel.exchangeDeclare(exchangeName,"direct",true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);
byte [] messageBodyBytes = "Hello World!" .getBytes();
channel.confirmSelect(); //开启confirm模式
try{
for(int i = 0;i<50;i++) {
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
if (channel.waitForConfirms()) { //broker confirm后producer调用
System.out.println("发送成功");
} else {
System.out.println("发送失败");
}
}
}catch (Exception e){
e.printStackTrace();
}finally {
channel.close();
conn.close();
}
}
}
通过循环,发送了50条消息,在channel.waitForConfirms()等待broker发送ack或nack,这种模式每发送一条消息就会等待broker代理服务器返回消息,通过抓包我们可以看到:
2、批量confirm模式:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ProducerDemo {
public static void main(String[] args)throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("172.16.41.232");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
String exchangeName = "exchangeName";
String routingKey = "routingKey";
String queueName = "queueName";
channel.exchangeDeclare(exchangeName,"direct",true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);
byte [] messageBodyBytes = "Hello World!" .getBytes();
channel.confirmSelect(); //开启confirm模式
try{
for(int i = 0;i<50;i++) {
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
}
if (channel.waitForConfirms()) { //broker confirm后producer调用
System.out.println("发送成功");
} else {
System.out.println("发送失败");
}
}catch (Exception e){
e.printStackTrace();
}finally {
channel.close();
conn.close();
}
}
}
通过循环批量发送50条消息,但只在控制台输出了一行“发送成功”,该方法会等到最后一条消息得到ack或者得到nack才会结束,也就是说在waitForConfirms处会造成当前程序的阻塞,这点我们看出broker端默认情况下是进行批量回复的,并不是针对每条消息都发送一条ack消息;
3、异步confirm模式:
通过添加监听器,如果broker返回ack,producer回调handleAck,返回nack,producer回调handleNack
import com.rabbitmq.client.*;
import java.io.IOException;
public class Demo01_ConnectionMQ_Provider {
public static void main(String[] args)throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("172.16.41.232");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
String exchangeName = "exchangeNamex";
String routingKey = "routingKeyx";
String queueName = "queueNamex";
channel.exchangeDeclare(exchangeName,"direct",true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);
byte [] messageBodyBytes = "你好,世界!" .getBytes();
try{
channel.confirmSelect(); // 开启confirm模式
long start = System.currentTimeMillis();
//设置监听器
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("ack:deliveryTag:"+deliveryTag+",multiple:"+multiple);
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("nack:deliveryTag:"+deliveryTag+",multiple:"+multiple);
}
});
for(int i = 0;i<100;i++) { //循环发消息
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
}
}catch (Exception e){
e.printStackTrace();
}finally {
channel.close();
conn.close();
}
}
}
在控制台输出结果:
ack:deliveryTag:8,multiple:true
ack:deliveryTag:1,multiple:false
ack:deliveryTag:3,multiple:true
ack:deliveryTag:6,multiple:true
Process finished with exit code 0
可以看到,发送100条消息,收到的ack个数不一样。你多次运行程序会发现每次发送回来的ack消息中的deliveryTag域的值并不是一样的,说明broker端批量回传给发送者的ack消息并不是以固定的批量大小回传的;
由于是异步的,producer不需要等待broker返回ack任可以继续发送消息,比channel.waitForConfirms()速度快很多。
3、性能测试
Client端机器和RabbitMQ机器配置:CPU:24核,2600MHZ, 64G内存,1TB硬盘。
Client端发送消息体大小10B,线程数为1即单线程,消息都持久化处理(deliveryMode:2)。
分别采用事务模式、普通confirm模式,批量confirm模式和异步confirm模式进行producer实验,比对各个模式下的发送性能。
发送平均速率:
- 事务模式(tx):1637.484
- 普通confirm模式(common):1936.032
- 批量confirm模式(batch):10432.45
- 异步confirm模式(async):10542.06
可以看到事务模式性能是最差的,普通confirm模式性能比事务模式稍微好点,但是和批量confirm模式还有异步confirm模式相比,还是小巫见大巫。批量confirm模式的问题在于confirm之后返回false之后进行重发这样会使性能降低,异步confirm模式(async)编程模型较为复杂,至于采用哪种方式,看情况喽。
4、Consumer端的消息确认
与producer端类似,为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制。consumer在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。
采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。
当noAck=false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者ack信号的消息。如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
让我们来看代码:
1、consumer自动向broker发送ack
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
public static void main(String[] args)throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("172.16.41.190");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
String queueName = "queueNamex";
QueueingConsumer consumer = new QueueingConsumer(channel);
//设置为true,consumer自动向broker发送ack
channel.basicConsume(queueName, true, consumer);
for(int i=0;i<100;i++){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println(msg); //打印消息
}
channel.close();
conn.close();
}
}
假设有100条消息,consumer 调用
channel.basicConsume(queueName, true, consumer);
设置为true自动向broker发送ack,最后关闭链接。读者可以在rabbitmq的管理界面看到消息从100条减少到0条。
2、consumer手动向broker发送ack
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
public static void main(String[] args)throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("172.16.41.190");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
String queueName = "queueNamex";
QueueingConsumer consumer = new QueueingConsumer(channel);
//设置为false,consumer手动向broker发送ack
channel.basicConsume(queueName, false, consumer);
for(int i=0;i<100;i++){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
//consumer手动向broker发送ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(msg);
}
channel.close();
conn.close();
}
}
在consumer端,调用
channel.basicConsume(queueName, false, consumer);
和
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
手动向broker发送ack确认消息被接受,随后关闭链接。
3、consumer不发送ack并且consumer断开链接:这一点要注意让我们来看下面的代码和rabbitmq的管理界面
我用producer发送了100条消息,可以看到,Ready=100,Unacked=0,Total=100;
如果我在Consumer端,设置为手动发送ack方式但最后一直没有发送ack,并且在读取消息后立刻关闭链接
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
public static void main(String[] args)throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("172.16.41.190");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
String queueName = "queueNamex";
QueueingConsumer consumer = new QueueingConsumer(channel);
//设置为false,consumer手动向broker发送ack
channel.basicConsume(queueName, false, consumer);
for(int i=0;i<100;i++){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
//不发送ack给broker
// channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(msg);
}
//关闭连接
channel.close();
conn.close();
}
}
我们再来运行Consumer,来看看输出的结果和rabbitmq的管理界面:
打印了100条消息,但是从rabbitmq的管理界面来看,消息数任仍为100条,并没有被消费掉,这就验证了我前面的话:
如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
4、consumer不发送ack,并且没有关闭连接
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
public static void main(String[] args)throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("172.16.41.190");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
String queueName = "queueNamex";
QueueingConsumer consumer = new QueueingConsumer(channel);
//设置为false,consumer手动向broker发送ack
channel.basicConsume(queueName, false, consumer);
for(int i=0;i<100;i++){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
//不发送ack给broker
// channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(msg);
}
}
}
我们可以看到,Ready变为0,而Unacked变为100,表示consumer没有向broker发送ack,前面我们说过,只有consumer向broker发送了ack,broker才会删除消息,所以此时broker并没有删除消息,如果消费者再次正常消费,依然可以获得消息。
这就是我这几天来对RabbitMQ事务方面的理解,谢谢大家,欢迎转载。
更多推荐
所有评论(0)