分布式事务之rabbitMQ最终一致性
一般的大型电商网站都会面临的问题:分布式事务,在面临分布式微服务等项目使用传统的单一事务已经无法满足,解决分布式事务的方案也比较多,有TCC事务补偿(基于2PC的实现)、2PC(两阶段提交)、3PC(三阶段提交)等,框架有JTA atomiks等。很多公司也有自己的分布式事务解决方案,比如最开始支付宝的XTS等像JTA atomiks等2PC的方案效率并不高,中间需要一个协调者,并且是同步的,..
一般的大型电商网站都会面临的问题:分布式事务,在面临分布式微服务等项目使用传统的单一事务已经无法满足,解决分布式事务的方案也比较多,有TCC事务补偿(基于2PC的实现)、2PC(两阶段提交)、3PC(三阶段提交)等,框架有JTA atomiks等。很多公司也有自己的分布式事务解决方案,比如最开始支付宝的XTS等
像JTA atomiks等2PC的方案效率并不高,中间需要一个协调者,并且是同步的,性能低下。如果对性能要求并不高的实际业务可以选用
在分布式中,前辈们已经很早就挖掘了一套理论,比如CAP理论,BASE定理 https://www.cnblogs.com/duanxz/p/5229352.html
没有用过rabbitmq,拿来练手了解了解
用自己学习的项目结构,使用的spring cloud
首先向product插入一个商品,但是必须生成一条订单,需要数据是一致的。
product服务:启动类
package com.chwl.cn;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.chwl.cn.ribbon.CustomRibbon;
@SpringBootApplication
@MapperScan(basePackages="com.chwl.cn.mapper")
@EnableEurekaClient
@EnableCircuitBreaker//开启hystrix服务熔断回调
@EnableCaching//开启本地缓存
@RibbonClient(name="CHWL-PROVIDER-ORDER",configuration=CustomRibbon.class)//自定义的负载均衡算法,针对当前服务按照自己的实际业务进行编写负载均衡算法
@EnableFeignClients//feign声明式API调用(RestTemplate+Ribbon负载均衡)
@EnableScheduling//开启定时任务
public class ProductApplication {
public static void main(String[] args) {
SpringApplication.run(ProductApplication.class, args);
}
}
pom.xml需要引入:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
server:
port: 8003
#开启断路器,Feign接口API的实现类方法处理
feign:
hystrix:
enabled: true
hystrix:
command:
default: #default全局有效,service id指定应用有效
execution:
timeout:
#如果enabled设置为false,则请求超时交给ribbon控制,为true,则超时作为熔断根据
enabled: true
isolation:
thread:
timeoutInMilliseconds: 3000 #断路器超时时间,默认1000ms
spring:
application:
name: chwl-provider-product #很重要,很重要,很重要,这是微服务向外部暴露的微服务的名字
datasource:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
platform: mysql
url: jdbc:mysql://xxx.xxx.xxx.xx:5306/chwl?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false
username: root
password: admin
redis:
# database: 1
host: xxxxxxx
port: 6379
password:
timeout: 10000
lettuce:
pool:
minIdle: 0
maxIdle: 10
maxWait: 10000
max-active: 10
sentinel:
master: master-6379
nodes: 127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381
# cluster:
# nodes:
# - 192.168.91.5:9001
# - 192.168.91.5:9002
# - 192.168.91.5:9003
# - 192.168.91.5:9004
# - 192.168.91.5:9005
# - 192.168.91.5:9006
rabbitmq:
host: 120.79.81.103
port: 5672
username: xxx
password: xxxxxx
publisher-confirms: true #开启消息确认机制
publisher-returns: true #开启发送失败退回
virtual-host: / #虚拟主机(一个RabbitMQ服务可以配置多个虚拟主机,每一个虚拟机主机之间是相互隔离,相互独立的,授权用户到指定的virtual-host就可以发送消息到指定队列
template:
mandatory: true #保证监听有效
listener:
simple:
acknowledge-mode: manual #消费者的ack方式为手动 auto自动 none不会发送ACK(与channelTransacted=true不兼容)
concurrency: 1 #最小消费者数量
max-concurrency: 10 #最大消费者数量
retry:
enabled: true #支持重试/重发
mybatis:
config-location: classpath:mybatis/mybatis.cfg.xml
#typeAliasesPackage: com.ypp.springcloud.entites
mapper-locations: classpath:mybatis/mapper/**/*Mapper.xml
mapper:
mappers: com.chwl.cn.basemapper.BaseMapper
identity: mysql
eureka:
client: #客户端注册进eureka服务列表内
service-url:
#defaultZone: http://localhost:2001/eureka #这个地址就是EurekaServer注册中心的地址
defaultZone: http://ypp:admin@eureka2001.com:2001/eureka/,http://ypp:admin@eureka2002.com:2002/eureka/
instance:
instance-id: chwl-provider-product
prefer-ip-address: true #访问路径可以显示IP地址
一般每个消息队列配置一个,比如订单队列、产品队列
MQOrderQueueConfig:
package com.chwl.cn.config.mq;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQOrderQueueConfig {
public static final String ORDER_QUEUE_NAME="orderQueue";
public static final String ORDER_ROUTING_KEY="order_routing_key";
public static final String ORDER_EXCHANGE_NAME="order_exchange";
/**
* 将普通队列绑定到交换机上
* 声明一个持久化队列 第二个参数true为持久化,在下次重启后自动加载队列,不设置也是默认持久化
* @return
*/
@Bean
public Queue orderQueue() {
return new Queue(ORDER_QUEUE_NAME,true);
}
@Bean
public DirectExchange orderExchange(){
return new DirectExchange(ORDER_EXCHANGE_NAME);
}
@Bean
public Binding bindingExchange(@Qualifier("orderQueue")Queue queue,@Qualifier("orderExchange")DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with(ORDER_ROUTING_KEY);
}
/**
* 定制化amqp模版
*
* ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack
* ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack
*/
// @Bean
// public RabbitTemplate rabbitTemplate() {
// Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
//
// // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
// rabbitTemplate.setMandatory(true);
//
// // 消息返回, yml需要配置 publisher-returns: true
// rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// String correlationId = message.getMessageProperties().getCorrelationId();
// log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange,
// routingKey);
// });
//
// // 消息确认, yml需要配置 publisher-confirms: true
// rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
// if (ack) {
// // log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
// } else {
// log.debug("消息发送到exchange失败,原因: {}", cause);
// }
// });
// return rabbitTemplate;
// }
}
package com.chwl.cn.service.product.impl;
import java.util.UUID;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.alibaba.fastjson.JSONObject;
import com.chwl.cn.api.OrderClientService;
import com.chwl.cn.config.mq.MQOrderQueueConfig;
import com.chwl.cn.entity.localmessage.LocalMessageEntity;
import com.chwl.cn.entity.order.OrderEntity;
import com.chwl.cn.entity.product.ProductEntity;
import com.chwl.cn.mapper.ProductMapper;
import com.chwl.cn.service.localmessageservice.IMQLocalMessageService;
import com.chwl.cn.service.mq.SenderService;
import com.chwl.cn.service.product.IProductService;
@Service
@Transactional
public class ProductService implements IProductService{
@Autowired
private OrderClientService orderClientService;
@Autowired
private SenderService senderService;
@Autowired
private IMQLocalMessageService mqLocalMessageService;
@Autowired
private ProductMapper mapper;
/**
* 本地消息表和产品插入必须在同一个事务,保证原子操作
*/
@Override
public ProductEntity add(ProductEntity product) {
mapper.insert(product);
OrderEntity order = new OrderEntity().setOrderNumber(UUID.randomUUID().toString()).setSerialNumber(UUID.randomUUID().toString());
String context = JSONObject.toJSONString(order);
// 初始化本地消息为发送失败,mq回调确认收到消息后修改为发送成功
//本地消息服务一般抽离出来做成一个本地消息服务系统,因为其他服务也会用到,也要进行实现最终一致性进行存储本地消息
LocalMessageEntity localMessage = new LocalMessageEntity().setContext(context)
.setSerialNumber(UUID.randomUUID().toString()) // 序列号
.setState(LocalMessageEntity.L_M_STATE_FAIL);
// 添加本地消息服务
mqLocalMessageService.insert(localMessage);
senderService.send(context,String.valueOf(localMessage.getId()));
return product;
}
}
统一的发送消息服务SenderService:
package com.chwl.cn.service.mq;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.alibaba.fastjson.JSONObject;
import com.chwl.cn.config.mq.MQOrderQueueConfig;
import com.chwl.cn.entity.localmessage.LocalMessageEntity;
import com.chwl.cn.service.localmessageservice.IMQLocalMessageService;
@Component
@Transactional
public class SenderService implements ReturnCallback, ConfirmCallback {
private final Logger log= LoggerFactory.getLogger(SenderService.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private IMQLocalMessageService mqLocalMessageService;
// private AmqpTemplate rabbitTemplate;
public void send(String context,String localMessageId) {
System.out.println("Sender发送内容 : " + context);
this.rabbitTemplate.setMandatory(true);// 当Mandatory参数设为true时,如果目的不可达,会发送消息给生产者,生产者通过一个回调函数来获取该信息。
this.rabbitTemplate.setConfirmCallback(this);//确认回调
this.rabbitTemplate.setReturnCallback(this);//失败回退
CorrelationData correlationData = new CorrelationData(localMessageId);//用于确认之后更改本地消息状态或删除--本地消息id
this.rabbitTemplate.convertAndSend(MQOrderQueueConfig.ORDER_EXCHANGE_NAME,MQOrderQueueConfig.ORDER_ROUTING_KEY, context,correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String localMessageId = correlationData.getId();
if (ack) {// 消息发送成功,更新本地消息为已成功发送状态或者直接删除该本地消息记录,剩余的由MQ投递到消费者端,消费者端需要进行幂等,避免产生脏数据
LocalMessageEntity message = new LocalMessageEntity();
message.setId(Long.valueOf(localMessageId));
message.setState(LocalMessageEntity.L_M_STATE_SUCCESS);
mqLocalMessageService.updateById(message);
// mqLocalMessageService.deleteById(localMessageId);
} else {
//失败处理
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return--message:" + new String(message.getBody()) + ",replyCode:" + replyCode
+ ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
}
}
兜底方案,防止MQ中途出现故障,保证每个消息都可以发送到MQ,轮训本地消息表对没有发送消息重发,每隔30秒轮训一次
CheckMQLocalMessage:
package com.chwl.cn.config.mq;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.chwl.cn.entity.localmessage.LocalMessageEntity;
import com.chwl.cn.service.localmessageservice.IMQLocalMessageService;
import com.chwl.cn.service.mq.SenderService;
@Component
public class CheckMQLocalMessage {
@Autowired
private SenderService senderService;
@Autowired
private IMQLocalMessageService messageService;
/**
* 兜底方案:必须保证每个消息都发送到MQ消费端进行消费,保证数据最终一致
* 每隔30秒检查本地消息表没有发送成功的消息,进行重试再次发送到MQ
*/
@Scheduled(fixedDelay=1000*30L)
public void checkMQLocalMessage(){
LocalMessageEntity lme = new LocalMessageEntity().setState(LocalMessageEntity.L_M_STATE_FAIL);
List<LocalMessageEntity> failStates = messageService.select(lme);
if(failStates!=null&&failStates.size()>0){
failStates.stream().forEach(messageFailstate->{
senderService.send(messageFailstate.getContext(), String.valueOf(messageFailstate.getId()));
});
}
}
}
订单服务:不同的项目
消费者,可以定义多个消费者,但某一条消息只会有一个消费成功的消费者
OrderConsumer:
package com.chwl.cn.config.mq;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.alibaba.fastjson.JSONObject;
import com.chwl.cn.entity.order.OrderEntity;
import com.chwl.cn.service.order.IOrderService;
import com.rabbitmq.client.Channel;
@Component
@Transactional
public class OrderConsumer {
@Autowired
private IOrderService orderService;
// @RabbitHandler
@RabbitListener(queues = "orderQueue")// @RabbitListener注解用于监听RabbitMQ,queues指定监听哪些队列
public void process(Channel channel, Message message) {
System.err.println("order收到 : " + message.getBody() +"消费时间"+new Date());
try {
OrderEntity orderEntity = JSONObject.parseObject(message.getBody(), OrderEntity.class);
//状态和订单号进行幂等性判断防止应用中途挂掉或异常,MQ没有收到ACK确认导致重发消息数据库重复添加
List<OrderEntity> list = orderService.select(orderEntity);//通用mapper查询--根据实体中的属性值进行查询,查询条件使用等号
if(list!=null&&list.size()>0){
orderService.updateByIdSelective(orderEntity);
}else {
orderService.insert(orderEntity);
}
//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//deliveryTag是tag的id,由生产者生成
} catch (IOException e) {
e.printStackTrace();
//丢弃这条消息
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
System.out.println("receiver fail");
//对同一订单异常次数统计,多次都失败,这里人工干预
}
}
}
测试:
添加商品
添加成功
本地消息添加成功,状态是2,还没有发送到MQ(打了断点)
MQ已经收到消息,并异步确认回调
断点在确认这里,MQ确认回调,然后修改本地消息表状态为已发送表示MQ已经收到消息
更改为已发送状态
启动order服务:
order服务监听了orderQueue队列,对消息已经消费
order数据库添加数据,流程完毕。
连贯走一遍:
中途在打断点的时候定时任务轮训了本地消息表,对打断点的消息也发送到了MQ队列,意味着MQ有两条消息(实际是同一个order数据),这里仍然只添加了一条,需要做幂等处理,防止脏数据。
product和order等表都是不同的数据库
总结:
队列消息模型的特点:
1、消息生产者将消息发送到Queue中,然后消费者监听Queue并接受消息
2、消息被确认消费(ACK机制)之后,就会从Queue中移除,消费者不会接受到已经被消费过的数据
3、Queue支持多个消费者,但对于某一个消息而言,只会存在一个成功消费此消息的消费者
队列生产与消费的流程:
1、Producer生成消息并发送给MQ(同步/异步)
2、MQ接受消息并将消息持久化(持久化为可选操作配置)
3、MQ向生产者返回消息的接受结果(确认/返回值/异常)
4、Consumer监听MQ中的消息
5、Consumer获取到消息执行相应的业务逻辑
6、Consumer进行消费后,对成功消费的消息向MQ进行确认(ACK)
7、MQ得到ACK确认后将消息从MQ中移除
每一步都有可能异常或者出现意外,比如在插入到数据库中出现异常、发送到MQ中途异常、MQ接受消息后返回确认时异常、MQ向消费者投递消息异常、消费者消费消息后向MQ进行ACK确认时异常等。每一步的异常都要进行相应的处理,不然就有可能导致脏数据。
插入到数据库中出现异常:当product插入到数据库异常,这一步还没到MQ,直接回滚。本地消息表的作用在于不过度依赖MQ,MQ中途也有可能挂掉。
发送到MQ中途异常:轮训本地消息表对没有发送到MQ的消息进行重发
MQ接受消息后返回确认时异常:和上步一样,重发到MQ,等MQ给确认收到更改本地消息
MQ向消费者投递消息异常:MQ会进行重试向消费者投递消息,只要消费者没有给MQ ACK确认,MQ就会进行重试,重试配置可根据实际业务配置
消费者消费消息后向MQ进行ACK确认时异常:和上步相同,MQ没收到ACK确认还是会重发
MQ重发时,在消费者需要进行幂等设计处理,防止脏数据
对于本地消息表,可以抽离出来做成消息服务系统,毕竟不只是一个服务在使用,抽离出来可以作为共用
更多推荐
所有评论(0)