谷粒商城 - 个人笔记(高级篇四)
电商后台管理系统+电商系统,掌握微服务的全套方案。该系统有商品服务、仓储服务、订单服务、购物车服务、检索服务、认证中心服务、网关服务、用户服务、秒杀服务、第三方服务。使用SpringBoot+SpringCloud并配套SpringCloud Alibaba系列,引入全套微服务治理方案:Nacos注册中心/配置中心、Feign远程调用、Gateway网关、Redisson缓存、基于ElasticS
前言
- 学习视频:Java项目《谷粒商城》架构师级Java项目实战,对标阿里P6-P7,全网最强
- 学习文档:
-
接口文档:谷粒商城接口文档
- 本内容仅用于个人学习笔记,如有侵扰,联系删除
十、消息队列
RabbitMQ参考文档:中间件系列 - RabbitMQ
1、MQ简介
1)、为什么要用MQ:
- 异步处理
- 应用解耦
- 流量控制
2)、概述:
- 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力
- 消息服务中两个重要概念:
- 消息代理(message broker)和目的地(destination)
- 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
- 消息队列主要有两种形式的目的地
- 队列(queue):点对点消息通信(point-to-point)
- 主题(topic):发布(publish)/订阅(subscribe)消息通信
- 点对点式:
- 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列
- 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者
- 发布订阅式
- 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息
- JMS(Java Message Service)JAVA消息服务:
- 基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
- AMQP(Advanced Message Queuing Protocol)
- 高级消息队列协议,也是一个消息代理的规范,兼容JMS
- RabbitMQ是AMQP的实现
- Spring支持
- spring-jms提供了对JMS的支持
- spring-rabbit提供了对AMQP的支持
- 需要ConnectionFactory的实现来连接消息代理
- 提供JmsTemplate、RabbitTemplate来发送消息
-
@JmsListener( JMS )、 @RabbitListener ( AMQP)注解在方法上监听消息代理发布的消息
-
@EnableJms、 @EnableRabbit开启支持
- Spring Boot自动配置
- JmsAutoConfiguration
- RabbitAutoConfiguration
- 市面的MQ产品
- ActiveMQ、RabbitMQ、RocketMQ、Kafka
2、RabbitMQ概念
1)、RabbitMQ简介:
3、docker安装RabbitMQ
不下载镜像,直接安装。默认会帮你下载
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
- 4369, 25672 (Erlang发现&集群端口)
- 5672, 5671 (AMQP端口)
- 15672 (web管理后台端口)
- 61613, 61614 (STOMP协议端口)
- 1883, 8883 (MQTT协议端口)
官网地址:https://www.rabbitmq.com/networking.html
修改只要启动docker自动重启rabbitMQ
docker update rabbitmq --restart=always
登录rabbitmq控制台: http://192.168.119.127:15672
账号:guest
密码:guest
1)、创建一个交换机
2)、创建队列
3)、交换机绑定队列
删除交换机,先双击点击要删除的交换机,接着
4、Exchange类型
1)、RabbitMQ运行机制
AMQP中消息的路由过程和 Java 开 发者熟悉的 JMS 存在一些差别,AMQP中增加了 Exchange和 Binding 的角色。生产者把消息发布 到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
2)、Exchange 类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、 fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接 看另外三种类型:
- Direct Exchange
- Fanout Exchange
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
- Topic Exchange
3)、测试
根据下图要求我们创建交换机和队列
1、exchange.direct
创建交换机
创建队列
绑定队列
发送消息
消费消息
2、exchange.fanout
创建交换机
绑定队列
发送消息
消费消息
3、exchange.topic
创建交换机
绑定队列
发送消息
(1)发送hello.news只匹配*.news的队列
(2)发送atguigu.news的匹配atguigu.#和*.news的队列
5、SpringBoot整合RabbitMQ
RabbitMQ的使用
1、引入amqp;RabbitAutoConfiguration就会自动生效
2、给容器中自动配置了RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate
所有的属性都是
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties
3、给配置文件中配置 spring.rabbitmq 信息
4、@EnableRabbit 开启功能
gulimall-order
1)、pom导入amqp依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2)、添加配置(@ConfigurationProperties(prefix = "spring.rabbitmq"))注意配置前缀一定是spring.rabbitmq
spring.rabbitmq.host=192.168.119.127
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
3)、主启动类添加@EnableRabbit注解
@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {
public static void main(String[] args) {
SpringApplication.run(GulimallOrderApplication.class, args);
}
}
6、AmqpAdmin使用
@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {
@Autowired
AmqpAdmin amqpAdmin;
/**
* 1、如何创建Exchange[hello-java-exchange]、Queue、Binding
* 1)、使用AmqpAdmin进行创建
* 2、如何收发消息
*/
@Test
void contextLoads() {
DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
log.info("Exchange[{}]创建成功","hello-java-exchange");
}
@Test
public void createQueue(){
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]创建成功","hello-java-queue");
}
@Test
public void createBinding(){
Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);
amqpAdmin.declareBinding(binding);
log.info("Binding[{}]创建成功","hello-java-binding");
}
}
7、RabbitTemplate使用
@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {
@Autowired
AmqpAdmin amqpAdmin;
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
@Test
public void sendMessageTest() {
OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
orderReturnApplyEntity.setId(1L);
orderReturnApplyEntity.setCreateTime(new Date());
orderReturnApplyEntity.setReturnName("哈哈哈");
//1、发送消息,如果发送的消息是个对象,我们会使用序列化机制,将对象写出去。对象必须实现Serializable
String msg = "hello word";
//2、配置MyRabbitConfig,让发送的对象类型的消息,可以是一个json
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderReturnApplyEntity);
log.info("消息发送完成{}", orderReturnApplyEntity);
}
}
注意:
配置MyRabbitConfig,让发送的对象类型的消息,可以是一个json
添加“com.atguigu.gulimall.order.config.MyRabbitConfig”类,代码如下:
@Configuration
public class MyRabbitConfig {
/**
* 使用JSON序列化机制,进行消息转换
*
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
8、RabbitListener&RabbitHandler接收消息
监听消息:使用@RabbitListener; 主启动类必须有@EnableRabbit
@RabbitListener:类+方法上(监听哪些队列即可)
@RabbitHandler:标在方法上(重载区分不同的消息)
RabbitListener用法
queues:声明需要监听的所有队列
org.springframework.amqp.core.Message
参数可以写以下内容:
1、Message message:原生消息详细信息。头+体
2、T<发送的消息类型> OrderReturnApplyEntity content
3、Channel channel 当前传输数据的通道
Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息
场景:
1)、订单服务启动多个;同一个消息,只能有一个客户端收到
2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息
修改“com.atguigu.gulimall.order.GulimallOrderApplicationTests”类,代码如下:
/**
* 同一个队列发送不同消息
*/
@Test
public void sendUniqueMessageTest() {
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
orderReturnApplyEntity.setId(1L);
orderReturnApplyEntity.setCreateTime(new Date());
orderReturnApplyEntity.setReturnName("哈哈哈");
// 配置MyRabbitConfig,让发送的对象类型的消息,可以是一个json
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderReturnApplyEntity, new CorrelationData(UUID.randomUUID().toString()));
} else {
OrderEntity entity = new OrderEntity();
entity.setOrderSn(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", entity, new CorrelationData(UUID.randomUUID().toString()));
}
}
}
修改“com.atguigu.gulimall.order.service.impl.OrderItemServiceImpl”类,代码如下:
package com.atguigu.gulimall.order.service.impl;
import com.atguigu.gulimall.order.entity.OrderEntity;
import com.atguigu.gulimall.order.entity.OrderReturnApplyEntity;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.util.Map;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.atguigu.common.utils.PageUtils;
import com.atguigu.common.utils.Query;
import com.atguigu.gulimall.order.dao.OrderItemDao;
import com.atguigu.gulimall.order.entity.OrderItemEntity;
import com.atguigu.gulimall.order.service.OrderItemService;
import javax.swing.*;
@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
/**
* queues:声明需要监听的所有队列
*
* org.springframework.amqp.core.Message
* @param message
*
* 参数可以写以下内容
* 1、Message message:原生消息详细信息。头+体
* 2、T<发送的消息类型> OrderReturnApplyEntity content
* 3、Channel channel 当前传输数据的通道
*
* Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息
* 场景:
* 1)、订单服务启动多个;同一个消息,只能有一个客户端收到
* 2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息
*
*/
//@RabbitListener(queues = {"hello-java-queue"})
@RabbitHandler
public void receiverMessage(Message message,OrderReturnApplyEntity content,
Channel channel) throws InterruptedException {
//消息体
byte[] body = message.getBody();
//消息头属性信息
MessageProperties properties = message.getMessageProperties();
System.out.println("接收到消息...内容:" + content);
// Thread.sleep(3000);
System.out.println("消息处理完成=》"+content.getReturnName());
}
@RabbitHandler
public void receiverMessage(OrderEntity orderEntity){
System.out.println("接收到消息...内容:" + orderEntity);
}
}
9、消息确认机制-发送端确认
- 保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制
- publisher confirmCallback 确认模式
- publisher returnCallback 未投递到 queue 退回模式
- consumer ack机制
1)、ConfirmCallback
- spring.rabbitmq.publisher-confirms=true
- 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启confirmcallback 。
- CorrelationData:用来表示当前消息唯一性。
- 消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback。
- 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback 。
2)、ReturnCallback
- spring.rabbitmq.publisher-returns=true
- spring.rabbitmq.template.mandatory=true
- confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有 些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到return 退回模式。
- 这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。
定制RabbitTemplate
服务器收到消息就回调
1、开启发送端确认
1、spring.rabbitmq.publisher-confirms=true
2、设置确认回调
2、消息抵达队列就回调
1、#开启发送端抵达队列确认
spring.rabbitmq.publisher-returns=true
#只要抵达队列,以异步发送优先回调我们这个returnConfirm
spring.rabbitmq.template.mandatory=true
2、设置确认回调ReturnCallback
3、消费端确认(保证每个消息被正确消费,此时才可以保证broker删除这个消息)
修改application.properties
#开启发送端确认
#spring.rabbitmq.publisher-confirms=true #老版本springboot配置写法
spring.rabbitmq.publisher-confirm-type=correlated #新版本springboot配置写法
#开启发送端抵达队列确认
spring.rabbitmq.publisher-returns=true
#只要抵达队列,以异步发送优先回调我们这个returnConfirm
spring.rabbitmq.template.mandatory=true
添加“com.atguigu.gulimall.order.config.MyRabbitConfig”类,代码如下:
package com.atguigu.gulimall.order.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 使用JSON序列化机制,进行消息转换
*
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
* 服务器收到消息就回调
* 1、开启发送端确认
* 1、spring.rabbitmq.publisher-confirms=true
* 2、设置确认回调
* 2、消息抵达队列就回调
* 1、#开启发送端抵达队列确认
* spring.rabbitmq.publisher-returns=true
* #只要抵达队列,以异步发送优先回调我们这个returnConfirm
* spring.rabbitmq.template.mandatory=true
* 2、设置确认回调ReturnCallback
* 3、消费端确认(保证每个消息被正确消费,此时才可以保证broker删除这个消息)
*/
@PostConstruct //MyRabbitConfig对象创建完以后,执行这个方法
public void initRabbitTemplate(){
//设置确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 只要消息抵达Broker就b = true
* @param correlationData 当前消息的唯一关联数据(这个消息的唯一id)
* @param b 消息是否成功收到
* @param s 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("confirm...correlationData["+correlationData+"]==>b["+b+"]s==>["+s+"]");
}
});
//设置消息抵达队列的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* @param message 投递失败的消息详细信息
* @param i 回复的状态码
* @param s 回复的文本内容
* @param s1 当时这个消息发给哪个交换机
* @param s2 当时这个消息用哪个路由键
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("Fail Message["+message+"]==>i["+i+"]==>s["+s+"]==>s1["+s1+"]==>s2["+s2+"]");
}
});
}
}
bug:启动发现出现了循环依赖现象
***************************
APPLICATION FAILED TO START
***************************
Description:
The dependencies of some of the beans in the application context form a cycle:
┌─────┐
| myRabbitConfig (field private org.springframework.amqp.rabbit.core.RabbitTemplate com.atguigu.gulimall.order.config.MyRabbitConfig.rabbitTemplate)
↑ ↓
| rabbitTemplate defined in class path resource [org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration$RabbitTemplateConfiguration.class]
↑ ↓
| rabbitTemplateConfigurer defined in class path resource [org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration$RabbitTemplateConfiguration.class]
└─────┘
我们可以看到自定义的mq配置中,注入了一个 MessageConverter
再来看RabbitTemplateConfiguration也有一个 MessageConverter
于是乎形成了
MyRabbitConfig 需要 RabbitTemplate
RabbitTemplate 需要 MessageConverter
MyRabbitConfig 又定义了 MessageConverter
反反复复,就形成了循环依赖
解决方法:
只要把 自定义 MessageConverter 与自定义 RabbitTemplate 不写在同一个类MyRabbitConfig 就行了
添加“com.atguigu.gulimall.order.config.MyMessageConverterConfig”类,代码如下:
@Configuration
public class MyMessageConverterConfig {
/**
* 使用JSON序列化机制,进行消息转换
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
把MyRabbitConfig配置类中的消息转换配置删掉即可。
发送消息打印结果
confirm...correlationData[CorrelationData [id=17a39c15-d309-45a9-b93d-a0e67151a878]]==>ack[true]s==>[null]
confirm...correlationData[CorrelationData [id=37d1b2dc-ddb9-40eb-933f-96729cb4cf36]]==>ack[true]s==>[null]
confirm...correlationData[CorrelationData [id=4a86e085-6785-4c86-8b0f-22ac68b2d333]]==>ack[true]s==>[null]
confirm...correlationData[CorrelationData [id=82026297-a94b-410c-a592-70f7709acad2]]==>ack[true]s==>[null]
confirm...correlationData[CorrelationData [id=81ae0e70-156f-4afe-8f16-af16c1b9ff31]]==>ack[true]s==>[null]
confirm...correlationData[CorrelationData [id=e97032ed-e460-474f-b518-e3bc472f3a6e]]==>ack[true]s==>[null]
confirm...correlationData[CorrelationData [id=f88d5c09-c881-4193-b902-e3974895687e]]==>ack[true]s==>[null]
confirm...correlationData[CorrelationData [id=84289b43-9025-4088-98b7-d7fb8f53355b]]==>ack[true]s==>[null]
confirm...correlationData[CorrelationData [id=7cb6b25d-67e9-426f-9947-1cd6826f2dc4]]==>ack[true]s==>[null]
confirm...correlationData[CorrelationData [id=8cd46d86-2e73-4747-8192-e4d725173b02]]==>ack[true]s==>[null]
10、消息确认机制-消费端确认
Ack消息确认机制
- 消费者获取到消息,成功处理,可以回复Ack给Broker
- basic.ack用于肯定确认;broker将移除此消息
- basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
- basic.reject用于否定确认;同上,但不能批量
- 默认自动ack,消息被消费者收到,就会从broker的queue中移除
- queue无消费者,消息依然会被存储,直到消费者消费
- 消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式
- 消息处理成功,ack(),接受下一个消息,此消息broker就会移除
- 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
-
消息一直没有调用 ack/nack 方法, broker 认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker 移除,会投递给别人
消费端确认(保证每个消息被正确消费,此时才可以保证broker删除这个消息)
1、默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息
问题:
我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,宕机了。发生消息丢失
手动确认模式。只要我们没有明确告诉MQ,货物被签收,没有ACK,消息就一直unacked状态,即使Consumer宕机。消息不会丢失,会重新变为Ready,下一次有新的Consumer连接进来就发给他。
2、
1)、#手动确认收货(ack)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
2)、channel.basicAck(deliveryTag,false);签收;业务成功完成就应该签收
channel.basicNack(deliveryTag,false,true);拒签:业务失败,拒签
添加application.properties
#手动确认收货(ack)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RabbitHandler
public void receiverMessage(Message message, OrderReturnApplyEntity content, Channel channel) {
// 消息体
byte[] body = message.getBody();
// 消息头属性信息
MessageProperties properties = message.getMessageProperties();
System.out.println("接收到消息...内容:" + content);
// Thread.sleep(3000);
System.out.println("消息处理完成=》" + content.getReturnName());
// channel内按顺序自增的
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag:" + deliveryTag);
// 签收货物,非批量模式
try {
if (deliveryTag % 2 == 0) {
// 收货
channel.basicAck(deliveryTag, false);
System.out.println("签收了货物。。。" + deliveryTag);
} else {
// 退货requeue=false 丢弃 requeue=true发挥服务器,服务器重新入队。
channel.basicNack(deliveryTag, false, true);
System.out.println("没有签收货物..." + deliveryTag);
}
} catch (Exception e) {
//网络中断
}
}
接收到消息...内容:OrderReturnApplyEntity(id=1, orderId=null, skuId=null, orderSn=null, createTime=Mon May 20 16:31:49 CST 2024, memberUsername=null, returnAmount=null, returnName=哈哈哈0, returnPhone=null, status=null, handleTime=null, skuImg=null, skuName=null, skuBrand=null, skuAttrsVals=null, skuCount=null, skuPrice=null, skuRealPrice=null, reason=null, description述=null, descPics=null, handleNote=null, handleMan=null, receiveMan=null, receiveTime=null, receiveNote=null, receivePhone=null, companyAddress=null)
消息处理完成=》哈哈哈0
deliveryTag:1
没有签收货物...1
接收到消息...内容:OrderReturnApplyEntity(id=1, orderId=null, skuId=null, orderSn=null, createTime=Mon May 20 16:31:49 CST 2024, memberUsername=null, returnAmount=null, returnName=哈哈哈2, returnPhone=null, status=null, handleTime=null, skuImg=null, skuName=null, skuBrand=null, skuAttrsVals=null, skuCount=null, skuPrice=null, skuRealPrice=null, reason=null, description述=null, descPics=null, handleNote=null, handleMan=null, receiveMan=null, receiveTime=null, receiveNote=null, receivePhone=null, companyAddress=null)
消息处理完成=》哈哈哈2
deliveryTag:2
签收了货物。。。2
接收到消息...内容:OrderReturnApplyEntity(id=1, orderId=null, skuId=null, orderSn=null, createTime=Mon May 20 16:31:49 CST 2024, memberUsername=null, returnAmount=null, returnName=哈哈哈4, returnPhone=null, status=null, handleTime=null, skuImg=null, skuName=null, skuBrand=null, skuAttrsVals=null, skuCount=null, skuPrice=null, skuRealPrice=null, reason=null, description述=null, descPics=null, handleNote=null, handleMan=null, receiveMan=null, receiveTime=null, receiveNote=null, receivePhone=null, companyAddress=null)
消息处理完成=》哈哈哈4
deliveryTag:3
没有签收货物...3
接收到消息...内容:OrderReturnApplyEntity(id=1, orderId=null, skuId=null, orderSn=null, createTime=Mon May 20 16:31:49 CST 2024, memberUsername=null, returnAmount=null, returnName=哈哈哈6, returnPhone=null, status=null, handleTime=null, skuImg=null, skuName=null, skuBrand=null, skuAttrsVals=null, skuCount=null, skuPrice=null, skuRealPrice=null, reason=null, description述=null, descPics=null, handleNote=null, handleMan=null, receiveMan=null, receiveTime=null, receiveNote=null, receivePhone=null, companyAddress=null)
消息处理完成=》哈哈哈6
deliveryTag:4
签收了货物。。。4
接收到消息...内容:OrderReturnApplyEntity(id=1, orderId=null, skuId=null, orderSn=null, createTime=Mon May 20 16:31:49 CST 2024, memberUsername=null, returnAmount=null, returnName=哈哈哈8, returnPhone=null, status=null, handleTime=null, skuImg=null, skuName=null, skuBrand=null, skuAttrsVals=null, skuCount=null, skuPrice=null, skuRealPrice=null, reason=null, description述=null, descPics=null, handleNote=null, handleMan=null, receiveMan=null, receiveTime=null, receiveNote=null, receivePhone=null, companyAddress=null)
消息处理完成=》哈哈哈8
deliveryTag:5
没有签收货物...5
十一、商城业务-订单服务
1、订单中心
电商系统涉及到 3 流,分别时信息流,资金流,物流,而订单系统作为中枢将三者有机的集合起来。
订单模块是电商系统的枢纽,在订单这个环节上需求获取多个模块的数据和信息,同时对这 些信息进行加工处理后流向下个环节,这一系列就构成了订单的信息流通。
1.1、订单构成
- 订单类型包括实体商品订单和虚拟订单商品等,这个根据商城商品和服务类型进行区分。
- 同时订单都需要做父子订单处理,之前在初创公司一直只有一个订单,没有做父子订单处理后期需要进行拆单的时候就比较麻烦,尤其是多商户商场,和不同仓库商品的时候,父子订单就是为后期做拆单准备的。
- 订单编号不多说了,需要强调的一点是父子订单都需要有订单编号,需要完善的时候可以对订单编号的每个字段进行统一定义和诠释。
- 订单状态记录订单每次流转过程,后面会对订单状态进行单独的说明。
- 订单流转时间需要记录下单时间,支付时间,发货时间,结束时间/关闭时间等等
3)、商品信息
- 支付流水单号,这个流水单号是在唤起网关支付后支付通道返回给电商业务平台的支付流水号,财务通过订单号和流水单号与支付通道进行对账使用。
- 支付方式用户使用的支付方式,比如微信支付、支付宝支付、钱包支付、快捷支付等。支付方式有时候可能有两个——余额支付+第三方支付。
- 商品总金额,每个商品加总后的金额;运费,物流产生的费用;优惠总金额,包括促销活动的优惠金额,优惠券优惠金额,虚拟积分或者虚拟币抵扣的金额,会员折扣的金额等之和;实付金额,用户实际需要付款的金额。用户实付金额=商品总金额+运费-优惠总金额
1.2、订单状态
2、订单流程
1、订单创建与支付
- 订单创建前需要预览订单,选择收货信息等
- 订单创建需要锁定库存,库存有才可创建,否则不能创建
- 订单创建后超时未支付需要解锁库存
- 支付成功后,需要进行拆单,根据商品打包方式,所在仓库,物流等进行拆单
- 支付的每笔流水都需要记录,以待查账
- 订单创建,支付成功等状态都需要给 MQ 发送消息,方便其他系统感知订阅
- 修改订单,用户没有提交订单,可以对订单一些信息进行修改,比如配送信息,优惠信息,及其他一些订单可修改范围的内容,此时只需对数据进行变更即可。
- 订单取消,用户主动取消订单和用户超时未支付,两种情况下订单都会取消订单,而超时情况是系统自动关闭订单,所以在订单支付的响应机制上面要做支付的限时处理,尤其是在前面说的下单减库存的情形下面,可以保证快速的释放库存。
另外需要需要处理的是促销优惠中使用的优惠券,权益等视平台规则,进行相应补回给用户。
-
退款,在待发货订单状态下取消订单时,分为缺货退款和用户申请退款。如果是 全部退款则订单更新为关闭状态,若只是做部分退款则订单仍需进行进行,同时生 成一条退款的售后订单,走退款流程。退款金额需原路返回用户的账户。
-
发货后的退款,发生在仓储货物配送,在配送过程中商品遗失,用户拒收,用户 收货后对商品不满意,这样情况下用户发起退款的售后诉求后,需要商户进行退款 的审核,双方达成一致后,系统更新退款状态,对订单进行退款操作,金额原路返 回用户的账户,同时关闭原订单数据。仅退款情况下暂不考虑仓库系统变化。如果 发生双方协调不一致情况下,可以申请平台客服介入。在退款订单商户不处理的情 况下,系统需要做限期判断,比如 5 天商户不处理,退款单自动变更同意退款。
3、搭建页面环境
1、把静态资源放到虚拟机的nginx里,在/mydata/nginx/html/static/目录先创建order文件夹,在创建detail文件夹,并把静态资源上传到这个文件夹
把index.html放到gulimall-order服务,改名为detail.html
2、在/mydata/nginx/html/static/order目录下创建list文件夹,并把静态资源上传到这个文件夹
把index.html放到gulimall-order服务,改名为list.html
3、在/mydata/nginx/html/static/order目录下创建confirm文件夹,并把静态资源上传到这个文件夹
把index.html放到gulimall-order服务,改名为confirm.html
4、在/mydata/nginx/html/static/order目录下创建pay文件夹,并把静态资源上传到这个文件夹
把index.html放到gulimall-order服务,改名为pay.html
5、重启nginx
docker restart nginx
6、在C:\Windows\System32\drivers\etc\hosts文件里添加域名(把属性只读模式去掉,用记事本打开)
#----------gulimall----------
192.168.119.127 gulimall.com
192.168.119.127 search.gulimall.com
192.168.119.127 item.gulimall.com
192.168.119.127 auth.gulimall.com
192.168.119.127 cart.gulimall.com
192.168.119.127 order.gulimall.com
7、在gulimal-gateway添加路由
- id: gulimall_order_route
uri: lb://gulimall-order
predicates:
- Host=order.gulimall.com
8、修改每个html的资源访问路径
9、引入thymeleaf模板引擎
<!--模板引擎 thymeleaf-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
各个页面加上thymeleaf模板空间
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
关闭thymeleaf缓存
spring:
thymeleaf:
cache: false
10、测试
创建“com.atguigu.gulimall.order.web.HelloController”类,代码如下
@Controller
public class HelloController {
@GetMapping("{page}.html")
public String listPage(@PathVariable("page") String page) {
return page;
}
}
启动gulimall-order和guliall-gateway
问题:访问http://order.gulimall.com/confirm.html访问失败,报503
原因:
gulimall-order服务没有加入到注册中心
解决:
- pom文件已经导入gulimall-common依赖,说明gulimall-order服务包含注册中心nacos的依
- 在主启动类添加@EnableDiscoveryClient注解
- 配置应用名和注册中心地址(如果不配置应用名,注册到注册中心不会成功)
spring:
application:
name: gulimall-order
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
访问路径:http://order.gulimall.com/confirm.html
发现confirm.html页面报错,搜素/*把它去掉即可
正常效果
4、订单确认页
4.1、订单确认页流程
接下来我们开发订单确认页的功能,以该流程图为基础进行下面的开发
4.2、整合SpringSession
1、pom添加依赖
<!--属性配置的提示工具-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!--整合SpringSession完成session共享问题-->
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
</dependency>
<!--引入redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
2、修改application.properties
#redis
spring.redis.host=192.168.119.127
spring.redis.port=6379
#SpringSession的存储类型
spring.session.store-type=redis
#线程池属性的配置
gulimall.thread.core= 20
gulimall.thread.max-size= 200
gulimall.thread.keep-alive-time= 10
3、添加SpringSession的配置
添加“com.atguigu.gulimall.order.config.GulimallSessionConfig”类,代码如下
@Configuration
public class GulimallSessionConfig {
@Bean
public CookieSerializer cookieSerializer(){
DefaultCookieSerializer cookieSerializer = new DefaultCookieSerializer();
cookieSerializer.setDomainName("gulimall.com");
cookieSerializer.setCookieName("GULISESSION");
return cookieSerializer;
}
@Bean
public RedisSerializer<Object> springSessionDefaultRedisSerializer(){
return new GenericJackson2JsonRedisSerializer();
}
}
4、添加线程池的配置
添加“com.atguigu.gulimall.order.config.MyThreadConfig”类,代码如下
@Configuration
public class MyThreadConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool){
return new ThreadPoolExecutor(pool.getCore(),
pool.getMaxSize(),
pool.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}
}
5、线程池配置需要的属性
添加“com.atguigu.gulimall.order.config.ThreadPoolConfigProperties”类,代码如下
@ConfigurationProperties(prefix = "gulimall.thread")
@Component
@Data
public class ThreadPoolConfigProperties {
private Integer core;
private Integer maxSize;
private Integer keepAliveTime;
}
6、主启动类是上添加SpingSession自动启动的注解
修改“com.atguigu.gulimall.order.GulimallOrderApplication”类,代码如下:
@EnableRedisHttpSession
@EnableDiscoveryClient
@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {
public static void main(String[] args) {
SpringApplication.run(GulimallOrderApplication.class, args);
}
}
7、页面调整
修改商城首页我的订单地链接地址
修改gulimall-product模块的index.html页面,代码如下:
<ul>
<li>
<a th:if="${session.loginUser != null}">欢迎:[[${session.loginUser == null ? '' : session.loginUser.nickname}]]</a>
<a href="http://auth.gulimall.com/login.html" th:if="${session.loginUser == null}">欢迎,请登录</a>
</li>
<li>
<a th:if="${session.loginUser == null}" href="http://auth.gulimall.com/reg.html">免费注册</a>
</li>
<span>|</span>
<li>
<a href="http://order.gulimall.com/list.html">我的订单</a>
</li>
</ul>
8、获取用户信息
修改gulimall-order模块的detail.html页面,代码如下:
<ul class="header_ul_right">
<li style="width: 120px">
<a th:if="${session.loginUser != null}">欢迎:[[${session.loginUser == null ? '' :
session.loginUser.nickname}]]</a>
<a href="http://auth.gulimall.com/login.html" th:if="${session.loginUser == null}">你好,请登录</a>
</li>
<li th:if="${session.loginUser == null}">
<a href="http://auth.gulimall.com/reg.html">免费注册</a>
</li>
修改gulimall-order模块的confirm.html页面,代码如下:
<ul class="header-right">
<li>[[${session.loginUser == null ? '' : session.loginUser.nickname}]]<img src="/static/order/confirm/img/03.png" style="margin-bottom: 0px;margin-left3: 3px;" /><img src="/static/order/confirm/img/06.png" /></li>
<li>|</li>
修改gulimall-order模块的pay.html页面,代码如下:
<div class="Jdbox_head">
<img src="/static/order/confirm/img/logo1.jpg" alt=""><span class="bank">收银台</span>
<ul>
<li><span>[[${session.loginUser == null ? '' : session.loginUser.nickname}]]</span><span>退出</span></li>
<li>我的订单</li>
<li>支付帮助</li>
</ul>
</div>
4.3、订单登录拦截
订单流程
订单生成 -> 支付订单 -> 卖家发货 -> 确认收货 -> 交易成功
修改购物车页面的“去结算”的链接地址
修改gulimall-cart模块的cartLis.html页面,代码如下:
function toTrade() {
window.location.href = "http://order.gulimall.com/toTrade";
}
添加“com.atguigu.gulimall.order.web.OrderWebController”类,代码如下:
@Controller
public class OrderWebController {
@GetMapping("/toTrade")
public String toTrade(){
return "confirm";
}
}
添加登录拦截器类“com.atguigu.gulimall.order.interceptor.LoginUserInterceptor”,代码如下:
@Component
public class LoginUserInterceptor implements HandlerInterceptor {
public static ThreadLocal<MemberResponseVO> loginUser = new ThreadLocal<>();
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
MemberResponseVO attribute = (MemberResponseVO) request.getSession().getAttribute(AuthServerConstant.LOGIN_USER);
if (attribute != null){
loginUser.set(attribute);
return true;
}else {
// 没登录就去登录
request.getSession().setAttribute("msg", "请先进行登录");
response.sendRedirect("http://auth.gulimall.com/login.html");
return false;
}
}
}
添加拦截器的配置“com.atguigu.gulimall.order.config.OrderWebConfiguration”类,代码如下:
@Configuration
public class OrderWebConfiguration implements WebMvcConfigurer {
@Autowired
LoginUserInterceptor loginUserInterceptor;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(loginUserInterceptor).addPathPatterns("/**");
}
}
修改gulimall-auth-server模块的login.html页面,代码如下:
<div class="si_top">
<p>
<span>谷粒商城不会以任何理由要求您转账汇款,谨防诈骗。</span>
<span style="color: red" th:if="${session.msg != null}"><br>[[${session.msg}]]</span>
</p>
</div>
4.4、订单确认页数据获取
可以发现订单结算页,包含以下信息:
- 收货人信息:有更多地址,即有多个收货地址,其中有一个默认收货地址
- 支付方式:货到付款、在线支付,不需要后台提供
- 送货清单:配送方式(不做)及商品列表(根据购物车选中的 skuId 到数据库中查询)
- 发票:不做
- 优惠:查询用户领取的优惠券(不做)及可用积分(京豆)
添加“com.atguigu.gulimall.order.vo.OrderConfirmVo”类,代码如下:
package com.atguigu.gulimall.order.vo;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
/**
* @Description: 订单确认页需要用的数据
* @Date: 2024/5/20 22:21
* @Version 1.0
*/
public class OrderConfirmVo {
//收获地址,ums_member_receive_address表
List<MemberAddressVo> address;
//所有选中的购物项
List<OrderItemVo> items;
//发票。。。
//优惠券信息。。。
//积分
Integer integration;
//订单总额
BigDecimal total;
//应付价格
BigDecimal payPrice;
//防重令牌
String orderToken;
Integer count;
//总件数
public Integer getCount() {
Integer i = 0;
if (items != null) {
for (OrderItemVo item : items) {
i += item.getCount();
}
}
return i;
}
public List<MemberAddressVo> getAddress() {
return address;
}
public void setAddress(List<MemberAddressVo> address) {
this.address = address;
}
public List<OrderItemVo> getItems() {
return items;
}
public void setItems(List<OrderItemVo> items) {
this.items = items;
}
public Integer getIntegration() {
return integration;
}
public void setIntegration(Integer integration) {
this.integration = integration;
}
public BigDecimal getTotal() {
BigDecimal total = new BigDecimal("0");
if (items != null) {
for (OrderItemVo item : items) {
BigDecimal multiply = item.getPrice().multiply(new BigDecimal(item.getCount().toString()));
total = total.add(multiply);
}
}
return total;
}
public BigDecimal getPayPrice() {
return getTotal();
}
public String getOrderToken() {
return orderToken;
}
public void setOrderToken(String orderToken) {
this.orderToken = orderToken;
}
}
添加“com.atguigu.gulimall.order.vo.MemberAddressVo”类,代如下:
package com.atguigu.gulimall.order.vo;
import lombok.Data;
/**
* @Description: 用户收获地址列表
* @Date: 2024/5/20 22:22
* @Version 1.0
*/
@Data
public class MemberAddressVo {
private Long id;
/**
* member_id
*/
private Long memberId;
/**
* 收货人姓名
*/
private String name;
/**
* 电话
*/
private String phone;
/**
* 邮政编码
*/
private String postCode;
/**
* 省份/直辖市
*/
private String province;
/**
* 城市
*/
private String city;
/**
* 区
*/
private String region;
/**
* 详细地址(街道)
*/
private String detailAddress;
/**
* 省市区代码
*/
private String areacode;
/**
* 是否默认
*/
private Integer defaultStatus;
}
添加“com.atguigu.gulimall.order.vo.OrderItemVo”类,代如下:
package com.atguigu.gulimall.order.vo;
import lombok.Data;
import java.math.BigDecimal;
import java.util.List;
/**
* @Description: 选中的购物项
* @Date: 2024/5/20 22:22
* @Version 1.0
*/
@Data
public class OrderItemVo {
private Long skuId;
//标题
private String title;
//图片
private String image;
//商品套餐属性
private List<String> skuAttr;
//价格
private BigDecimal price;
//数量
private Integer count;
//总价
private BigDecimal totalPrice;
// TODO 查询库存状态
private Boolean hasStock;
//重量
private BigDecimal weight;
}
修改“com.atguigu.gulimall.order.web.OrderWebController”类,代码如下:
@Controller
public class OrderWebController {
@Autowired
OrderService orderService;
@GetMapping("/toTrade")
public String toTrade(Model model){
OrderConfirmVo confirmVo = orderService.confirmOrder();
//展示订单确认的数据
model.addAttribute("orderConfirmData",confirmVo);
return "confirm";
}
}
修改“com.atguigu.gulimall.order.service.OrderService”类,代码如下:
/**
* 订单确认页返回需要用到的数据
*
* @return
*/
OrderConfirmVo confirmOrder();
修改“com.atguigu.gulimall.order.service.impl.OrderServiceImpl”类,代码如下:
@Override
public OrderConfirmVo confirmOrder() {
OrderConfirmVo confirmVo = new OrderConfirmVo();
MemberResponseVO memberResponseVO = LoginUserInterceptor.loginUser.get();
// 1、远程查询所有的收货地址列表
List<MemberAddressVo> address = memberFeignService.getAddress(memberResponseVO.getId());
confirmVo.setAddress(address);
// 2、远程查询购物车所有选中的购物项
List<OrderItemVo> items = cartFeignService.getCurrentUserCartItems();
confirmVo.setItems(items);
// 3、查询用户积分
Integer integration = memberResponseVO.getIntegration();
confirmVo.setIntegration(integration);
// 4、其他数据自动计算
return confirmVo;
}
远程调用要开启fegin客户端
@EnableFeignClients
@EnableRedisHttpSession
@EnableDiscoveryClient
@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {
public static void main(String[] args) {
SpringApplication.run(GulimallOrderApplication.class, args);
}
}
1、远程查询所有的收货地址列表
添加“com.atguigu.gulimall.order.feign.MemberFeignService”类,代码如下:
@FeignClient("gulimall-member")
public interface MemberFeignService {
@GetMapping("/member/memberreceiveaddress/{memberId}/addresses")
List<MemberAddressVo> getAddress(@PathVariable("memberId") Long memberId);
}
gulimall-member
修改“com.atguigu.gulimall.member.controller.MemberReceiveAddressController”类,代码如下
@GetMapping("/{memberId}/addresses")
public List<MemberReceiveAddressEntity> getAddress(@PathVariable("memberId") Long memberId){
return memberReceiveAddressService.getAddress(memberId);
}
修改“com.atguigu.gulimall.member.service.MemberReceiveAddressService”类,代码如下:
List<MemberReceiveAddressEntity> getAddress(Long memberId);
修改“com.atguigu.gulimall.member.service.impl.MemberReceiveAddressServiceImpl”类,代码如下:
@Override
public List<MemberReceiveAddressEntity> getAddress(Long memberId) {
List<MemberReceiveAddressEntity> memberAddress = this.list(new QueryWrapper<MemberReceiveAddressEntity>().eq("member_id", memberId));
return memberAddress;
}
2、远程查询购物车所有选中的购物项
添加“com.atguigu.gulimall.order.feign.CartFeignService”类,代码如下:
@FeignClient("gulimall-cart")
public interface CartFeignService {
@GetMapping("/currentUserCartItems")
List<OrderItemVo> getCurrentUserCartItems();
}
gulimall-cart
修改“com.atguigu.gulimall.cart.controller.CartController”类,代码如下
@GetMapping("/currentUserCartItems")
@ResponseBody
public List<CartItem> getCurrentUserCartItems(){
return cartService.getUserCartItems();
}
修改“com.atguigu.gulimall.cart.service.CartService”类,代码如下:
/**
* 获取用户购物车里购物项的所有数据
*
* @return
*/
List<CartItem> getUserCartItems();
修改“com.atguigu.gulimall.cart.service.impl.CartServiceImpl”类,代码如下
@Override
public List<CartItem> getUserCartItems() {
UserInfoTo userInfoTo = CartInterceptor.threadLocal.get();
if (userInfoTo.getUserId() == null){
return null;
}else {
String cartKey = CART_PREFIX + userInfoTo.getUserId();
List<CartItem> cartItems = getCartItems(cartKey);
// 获取所有被选中的购物项
List<CartItem> collect = cartItems.stream().filter(item -> item.getCheck())
.map(item->{
R price = productFeignService.getPrice(item.getSkuId());
// 更新为最新价格
String data = (String) price.get("data");
item.setPrice(new BigDecimal(data));
return item;})
.collect(Collectors.toList());
return collect;
}
}
添加“com.atguigu.gulimall.cart.feign.ProductFeignService”类,代码如下:
@GetMapping("/product/skuinfo/{skuId}/price")
R getPrice(@PathVariable("skuId") Long skuId);
gulimall-product
修改“com.atguigu.gulimall.product.app.SkuInfoController”类,代码如下:
@GetMapping("/{skuId}/price")
public R getPrice(@PathVariable("skuId") Long skuId){
SkuInfoEntity byId = skuInfoService.getById(skuId);
return R.ok().setData(byId.getPrice().toString());
}
4.5、Feign远程调用丢失请求头问题
feign
远程调用的请求头中没有含有JSESSIONID
的cookie
,所以也就不能得到服务端的session
数据,cart认为没登录,获取不了用户信息- 但在
feign
的调用过程中,会使用容器中的RequestInterceptor
对RequestTemplate
进行处理,因此我们可以通过向容器中导入定制的RequestInterceptor
为请求加上cookie
。
RequestContextHolder
为SpingMVC中共享request
数据的上下文,底层由ThreadLocal
实现。经过RequestInterceptor
处理后的请求如下,已经加上了请求头的Cookie
信息
添加“com.atguigu.gulimall.order.config.GulimallFeignConfig”类,代码如下:
package com.atguigu.gulimall.order.config;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
/**
* @Description: 请求拦截器
* @Date: 2024/5/21 9:26
* @Version 1.0
*/
@Configuration
public class GulimallFeignConfig {
@Bean("requestInterceptor")
public RequestInterceptor requestInterceptor() {
return new RequestInterceptor() {
@Override
public void apply(RequestTemplate requestTemplate) {
System.out.println("RequestInterceptor线程..." + Thread.currentThread().getId());
// 1、RequestContextHolder拿到刚进来的请求
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (attributes != null) {
// 老请求
HttpServletRequest request = attributes.getRequest();
if (request != null) {
// 同步请求头数据。Cookie
String cookie = request.getHeader("Cookie");
// 给新请求同步了老请求的cookie
requestTemplate.header("Cookie", cookie);
System.out.println("feign远程之前先执行RequestInterceptor.apply()");
}
}
}
};
}
}
4.6、Feign异步调用丢失请求头问题
- 查询购物项、库存和收货地址都要调用远程服务,串行会浪费大量时间,因此我们使用
CompletableFuture
进行异步编排
- 由于
RequestContextHolder
使用ThreadLocal
共享数据,所以在开启异步时获取不到老请求的信息,自然也就无法共享cookie
了。在这种情况下,我们需要在开启异步的时候将老请求的RequestContextHolder
的数据设置进去
修改“com.atguigu.gulimall.order.service.impl.OrderServiceImpl”类,代码如下
@Override
public OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException {
OrderConfirmVo confirmVo = new OrderConfirmVo();
MemberResponseVO memberResponseVO = LoginUserInterceptor.loginUser.get();
System.out.println("主线程..." + Thread.currentThread().getId());
// 获取之前的请求
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
// 异步任务编排
// 1、远程查询所有的收货地址列表
CompletableFuture<Void> getAddressFuture = CompletableFuture.runAsync(() -> {
System.out.println("member线程..." + Thread.currentThread().getId());
// 每一个线程都来共享之前的请求数据
RequestContextHolder.setRequestAttributes(requestAttributes);
List<MemberAddressVo> address = memberFeignService.getAddress(memberResponseVO.getId());
confirmVo.setAddress(address);
}, executor);
// 2、远程查询购物车所有选中的购物项
CompletableFuture<Void> cartFuture = CompletableFuture.runAsync(() -> {
System.out.println("cart线程..." + Thread.currentThread().getId());
// 每一个线程都来共享之前的请求数据
RequestContextHolder.setRequestAttributes(requestAttributes);
List<OrderItemVo> items = cartFeignService.getCurrentUserCartItems();
confirmVo.setItems(items);
// feign在远程调用之前要构造请求,调用很多拦截器RequestInterceptor interceptor: requestInterceptors
}, executor);
// 3、查询用户积分
Integer integration = memberResponseVO.getIntegration();
confirmVo.setIntegration(integration);
// 4、其他数据自动计算
// 5、TODO 防重令牌想·
CompletableFuture.allOf(getAddressFuture, cartFuture).get();
return confirmVo;
}
4.7、订单确认页页面调整
<!--主体部分-->
<p class="p1">填写并核对订单信息</p>
<div class="section">
<!--收货人信息-->
<div class="top-2">
<span>收货人信息</span>
<span>新增收货地址</span>
</div>
<!--地址-->
<div class="top-3" th:each="addr:${orderConfirmData.address}">
<p>[[${addr.name}]]</p><span>[[${addr.name}]] [[${addr.province}]] [[${addr.city}]] [[${addr.detailAddress}]] [[${addr.phone}]]</span>
</div>
<p class="p2">更多地址︾</p>
<div class="hh1"/></div>
<!--********************************************************************************************-->
<!--谷粒学院自提-->
<div class="top-4">
<p>谷粒学院自提</p>
<p>省运费·无续重·随时取</p>
<p class="xiang">详情</p>
</div>
<!--地址-->
<!--支付方式-->
<h4 class="h4">支付方式</h4>
<div class="top-6">
<p>货到付款</p>
<p><span>惠</span>在线支付</p>
</div>
<div class="hh1"></div>
<!--送货清单-->
<h4 class="h4" style="margin-top: 5px;">送货清单</h4>
<div class="top_1">
<div class="to_left">
<h5><span class="peisong">配送方式</span><span class="dui"><img src="/static/order/confirm/img/i_03.png"/> 对应商品</span></h5>
<div class="box">
谷粒学院快递
</div>
<p class="biao">
<span class="til">标 准 达 :</span>
<span class="con">预计 12月16日[今天] 15:00-19:00 送达</span>
<a href="/static/order/confirm/#">修改</a>
</p>
<div class="updata-1">
<img src="/static/order/confirm/img/im_06.png" />
<span>京准达 标准达</span>
<span style="color: black;"> 配送服务全面升级</span>
</div>
<div class="hh1"></div>
<p class="tui">
<span class="til">退换无忧:</span>
<span class="con">
<input type="checkbox" />
自签收后7天内退货,15天内换<span style="font-size: 12px;margin-left: 5px"> ¥ 0.50</span><br />
<span class="nul">货,</span>可享1次上门取件服务 ﹀
</span>
<div class="updata-2">
<img src="/static/order/confirm/img/im_11.png" />
<span>京准达运费大促(限自营中小件)</span>
</div>
</p>
<p class="kg" style="color:#666666;margin-top: 13px;font-size: 12px">总重量 :<span style="color:#999999;font-size: 12px">0.095kg</span></p>
</div>
<div class="to_right">
<h5>商家:谷粒学院自营</h5>
<div><button>换购</button><span>已购满20.00元,再加49.90元,可返回购物车领取赠品</span></div>
<!--图片-->
<div class="yun1" th:each="item:${orderConfirmData.items}">
<img style="width: 150px;height: 100px;" th:src="${item.image}" class="yun"/>
<div class="mi">
<p>[[${item.title}]] <span style="color: red;"> ¥[[${#numbers.formatDecimal(item.price, 1, 2)}]]</span> <span> x[[${item.count}]]</span> <span>[[${item.hasStock?"有货":"无货"}]]</span></p>
<p><span>0.095kg</span></p>
<p class="tui-1"><img src="/static/order/confirm/img/i_07.png" />支持7天无理由退货</p>
</div>
</div>
<div class="hh1"></div>
<p>退换无忧 <span class="money">¥ 0.00</span></p>
</div>
</div>
<div class="bto">
<div class="hh2"></div>
<h4 class="float">发票信息</h4>
<div class="note float"><img src="/static/order/confirm/img/i_11.png" /> <span>开企业抬头发票须填写纳税人识别号,以免影响报销</span></div>
<ul style="clear: both;">
<li>电子普通发票 <img src="/static/order/confirm/img/i_14.png" /></li>
<li>个人</li>
<li>商品明细</li>
<li>
<a href="/static/order/confirm/">修改</a>
</li>
</ul>
<div class="hh3"></div>
<h4 class="clear">使用优惠/礼品卡/抵用 ^</h4>
<ul>
<li class="red">优惠卡</li>
<li>礼品卡</li>
<li>京豆</li>
<li>余额</li>
<li>领奖码</li>
</ul>
<div class="tuijian clear">
<input type="checkbox" />
<span>优惠组合推荐</span>
</div>
</div>
<div class="xia">
<div class="qian">
<p class="qian_y">
<span>[[${orderConfirmData.count}]]</span>
<span>件商品,总商品金额:</span>
<span class="rmb">¥[[${#numbers.formatDecimal(orderConfirmData.total, 1, 2)}]]</span>
</p>
<p class="qian_y">
<span>返现:</span>
<span class="rmb"> -¥0.00</span>
</p>
<p class="qian_y">
<span>运费: </span>
<span class="rmb">   ¥0.00</span>
</p>
<p class="qian_y">
<span>服务费: </span>
<span class="rmb">   ¥0.00</span>
</p>
<p class="qian_y">
<span>退换无忧: </span>
<span class="rmb">   ¥0.00</span>
</p>
</div>
<div class="yfze">
<p class="yfze_a"><span class="z">应付总额:</span><span class="hq">¥[[${#numbers.formatDecimal(orderConfirmData.payPrice, 1, 2)}]]</span></p>
<p class="yfze_b">寄送至: 北京 朝阳区 三环到四环之间 朝阳北路复兴国际大厦23层麦田房产 IT-中心研发二部 收货人:赵存权 188****5052</p>
</div>
<button class="tijiao">提交订单</button>
</div>
</div>
http://order.gulimall.com/toTrade
4.8、订单确认页库存查询
修改“com.atguigu.gulimall.order.service.impl.OrderServiceImpl”类,代码如下:
@Override
public OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException {
OrderConfirmVo confirmVo = new OrderConfirmVo();
MemberResponseVO memberResponseVO = LoginUserInterceptor.loginUser.get();
System.out.println("主线程..."+Thread.currentThread().getId());
//获取之前的请求
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
//异步任务编排
CompletableFuture<Void> getAddressFuture = CompletableFuture.runAsync(() -> {
//1、远程查询所有的收货地址列表
System.out.println("member线程..."+Thread.currentThread().getId());
//每一个线程都来共享之前的请求数据
RequestContextHolder.setRequestAttributes(requestAttributes);
List<MemberAddressVo> address = memberFeignService.getAddress(memberResponseVO.getId());
confirmVo.setAddress(address);
}, executor);
CompletableFuture<Void> cartFuture = CompletableFuture.runAsync(() -> {
//2、远程查询购物车所有选中的购物项
System.out.println("cart线程..."+Thread.currentThread().getId());
//每一个线程都来共享之前的请求数据
RequestContextHolder.setRequestAttributes(requestAttributes);
List<OrderItemVo> items = cartFeignService.getCurrentUserCartItems();
confirmVo.setItems(items);
//feign在远程调用之前要构造请求,调用很多拦截器RequestInterceptor interceptor: requestInterceptors
}, executor).thenRunAsync(()->{
//查询库存信息
List<OrderItemVo> items = confirmVo.getItems();
List<Long> collect = items.stream().map(item -> item.getSkuId()).collect(Collectors.toList());
R hasStock = wmsFeignService.getSkusHasStock(collect);
List<SkuStockVo> data = hasStock.getData(new TypeReference<List<SkuStockVo>>() {
});
if (data != null){
Map<Long, Boolean> map = data.stream().collect(Collectors.toMap(SkuStockVo::getSkuId, SkuStockVo::getHasStock));
confirmVo.setStocks(map);
}
},executor);
添加“com.atguigu.gulimall.order.vo.SkuStockVo” 类,代码如下:
@Data
public class SkuStockVo {
private Long skuId;
private Boolean hasStock;
}
修改“com.atguigu.gulimall.order.vo.OrderConfirmVo”类,添加stocks参数,并添加getter,setter方法
修改“com.atguigu.gulimall.order.vo.OrderItemVo”类,去掉hasStock属性
远程调用库存,查询是否有库存
gulimall-ware
添加“com.atguigu.gulimall.order.feign.WmsFeignService”类,代码如下
@FeignClient("gulimall-ware")
public interface WmsFeignService {
//查询sku是否有库存
@PostMapping("/ware/waresku/hasStock")
public R getSkusHasStock(@RequestBody List<Long> skuIds);
}
修改gulimall-order模块的confirm.html页面
<!--图片-->
<div class="yun1" th:each="item:${orderConfirmData.items}">
<img style="width: 150px;height: 100px;" th:src="${item.image}" class="yun"/>
<div class="mi">
<p>[[${item.title}]] <span style="color: red;"> ¥[[${#numbers.formatDecimal(item.price, 1, 2)}]]</span> <span> x[[${item.count}]]</span> <span>[[${orderConfirmData.stocks[item.skuId]?"有货":"无货"}]]</span></p>
<p><span>0.095kg</span></p>
<p class="tui-1"><img src="/static/order/confirm/img/i_07.png" />支持7天无理由退货</p>
</div>
</div>
效果
4.9、订单确认页模拟运费效果
修改gulimall-order模块的confirm.html页面,代码如下:
<!--地址-->
<div class="top-3 addr-item" th:each="addr:${orderConfirmData.address}">
<p th:attr="def=${addr.defaultStatus},addrId=${addr.id}">[[${addr.name}]]</p><span>[[${addr.name}]] [[${addr.province}]] [[${addr.city}]] [[${addr.detailAddress}]] [[${addr.phone}]]</span>
</div>
<p class="qian_y">
<span>运费: </span>
<span class="rmb">   ¥<b id="fareEle"></b></span>
</p>
<div class="yfze">
<p class="yfze_a"><span class="z">应付总额:</span><span class="hq">¥<b id="payPriceEle">[[${#numbers.formatDecimal(orderConfirmData.payPrice, 1, 2)}]]</b></span></p>
<p class="yfze_b">寄送至:<span id="reciveAddressEle"></span> 收货人:<span id="reveiverEle"></span></p>
</div>
/**
* 默认地址高亮
*/
function highlight() {
// 1.先初始化收获地址都不高亮
$(".addr-item p").css({"border": "2px solid gray"})
// 2.默认地址(def=1),我们让它高亮
$(".addr-item p[def='1']").css({"border": "2px solid red"})
}
/**
* 收货地址被点击重新高亮对应的地址
*/
$(".addr-item p").click(function () {
$(".addr-item p").attr("def", "0");
$(this).attr("def", "1");
highlight();
// 获取当前地址id
// alert($(this).attr("addrId"));
var addrId = $(this).attr("addrId");
// 发送ajax获取运费信息
getFare(addrId);
});
/**
* 获取运费信息
*/
function getFare(addrId) {
// 给表单回填选择的地址
$("#addrIdInput").val(addrId);
$.get("http://gulimall.com/api/ware/wareinfo/fare?addrId=" + addrId, function (resp) {
console.log(resp);
$("#fareEle").text(resp.data.fare);
var total = [[${orderConfirmData.total}]];
var payPrice = total * 1 + resp.data.fare * 1;
// 设置运费等
$("#payPriceInput").val(payPrice);
$("#payPriceEle").text(payPrice);
// 设置收货人信息
$("#reciveAddressEle").text(resp.data.address.province + " " + resp.data.address.city + " " + resp.data.address.detailAddress);
$("#reveiverEle").text(resp.data.address.name);
});
}
添加“com.atguigu.gulimall.ware.vo.FareVo”类,代码如下
@Data
public class FareVo {
//收货人地址信息
private MemberAddressVo address;
//费用
private BigDecimal fare;
}
添加“com.atguigu.gulimall.ware.vo.MemberAddressVo”类,代码如下:
@Data
public class MemberAddressVo {
private Long id;
/**
* member_id
*/
private Long memberId;
/**
* 收货人姓名
*/
private String name;
/**
* 电话
*/
private String phone;
/**
* 邮政编码
*/
private String postCode;
/**
* 省份/直辖市
*/
private String province;
/**
* 城市
*/
private String city;
/**
* 区
*/
private String region;
/**
* 详细地址(街道)
*/
private String detailAddress;
/**
* 省市区代码
*/
private String areacode;
/**
* 是否默认
*/
private Integer defaultStatus;
}
修改“com.atguigu.gulimall.ware.controller.WareInfoController”类,代码如下:
@GetMapping("/fare")
public R getFare(@RequestParam("addrId") Long addrId){
FareVo fare = wareInfoService.getFare(addrId);
return R.ok().setData(fare);
}
修改“com.atguigu.gulimall.ware.service.WareInfoService”类,代码如下:
/**
* 根据用户的收获地址计算运费
* @param attrId
* @return
*/
FareVo getFare(Long attrId);
修改“com.atguigu.gulimall.ware.service.impl.WareInfoServiceImpl”类,代码如下:
@Override
public FareVo getFare(Long attrId) {
FareVo fareVo = new FareVo();
R r = memberFeignService.addrInfo(attrId);
MemberAddressVo data = r.getData("memberReceiveAddress",new TypeReference<MemberAddressVo>() {
});
if (data != null){
//模拟计算运费
String phone = data.getPhone();
String substring = phone.substring(phone.length() - 1, phone.length());
BigDecimal bigDecimal = new BigDecimal(substring);
fareVo.setAddress(data);
fareVo.setFare(bigDecimal);
return fareVo;
}
return null;
}
添加“com.atguigu.gulimall.ware.feign.MemberFeignService”类,代码如下:
@FeignClient("gulimall-member")
public interface MemberFeignService {
@RequestMapping("/member/memberreceiveaddress/info/{id}")
R addrInfo(@PathVariable("id") Long id);
}
5、接口幂等性
5.1、什么是接口幂等性
5.2、哪些情况需要防止
- 用户多次点击按钮
- 用户页面回退再次提交
- 微服务互相调用,由于网络问题,导致请求失败。feign 触发重试机制
- 其他业务情况
5.3、什么情况下需要幂等
- SELECT * FROM table WHER id=?,无论执行多少次都不会改变状态,是天然的幂等。
- UPDATE tab1 SET col1=1 WHERE col2=2,无论执行成功多少次状态都是一致的,也是幂等操作。
- delete from user where userid=1,多次操作,结果一样,具备幂等性
- insert into user(userid,name) values(1,'a') 如 userid 为唯一主键,即重复操作上面的业务,只 会插入一条用户数据,具备幂等性。
- UPDATE tab1 SET col1=col1+1 WHERE col2=2,每次执行的结果都会发生变化,不是幂等的。
- insert into user(userid,name) values(1,'a') 如 userid 不是主键,可以重复,那上面业务多次操作,数据都会新增多条,不具备幂等性。
5.4、幂等性解决方案
5.4.1、token 机制
- 服务端提供了发送 token 的接口。我们在分析业务的时候,哪些业务是存在幂等问题的, 就必须在执行业务前,先去获取 token,服务器会把 token 保存到 redis 中。
- 然后调用业务接口请求时,把 token 携带过去,一般放在请求头部。
- 服务器判断 token 是否存在 redis 中,存在表示第一次请求,然后删除 token,继续执行业 务。
- 如果判断 token 不存在 redis 中,就表示是重复操作,直接返回重复标记给 client,这样 就保证了业务代码,不被重复执行。
- 先删除 token 还是后删除 token;
- 先删除可能导致,业务确实没有执行,重试还带上之前 token,由于防重设计导致,
请求还是不能执行。
-
后删除可能导致,业务处理成功,但是服务闪断,出现超时,没有删除 token ,别人继续重试,导致业务被执行两遍。
-
我们最好设计为先删除 token,如果业务调用失败,就重新获取 token 再次请求。
- 先删除可能导致,业务确实没有执行,重试还带上之前 token,由于防重设计导致,
- Token 获取、比较和删除必须是原子性
- redis.get(token) 、token.equals、redis.del(token)如果这两个操作不是原子,可能导
致,高并发下,都 get 到同样的数据,判断都成功,继续业务并发执行
-
可以在 redis 使用 lua 脚本完成这个操作 if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
- redis.get(token) 、token.equals、redis.del(token)如果这两个操作不是原子,可能导
5.4.2、各种锁机制
1)、数据库悲观锁
5.4.3、各种唯一约束
5.4.4、防重表
5.4.5、全局请求唯一 id
6、添加防重令牌
订单确认页添加防重令牌
修改“com.atguigu.gulimall.order.service.impl.OrderServiceImpl”类,代码如下:
@Override
public OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException {
OrderConfirmVo confirmVo = new OrderConfirmVo();
MemberResponseVO memberResponseVO = LoginUserInterceptor.loginUser.get();
System.out.println("主线程..." + Thread.currentThread().getId());
// 获取之前的请求
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
// 异步任务编排
// 1、远程查询所有的收货地址列表
CompletableFuture<Void> getAddressFuture = CompletableFuture.runAsync(() -> {
System.out.println("member线程..." + Thread.currentThread().getId());
// 每一个线程都来共享之前的请求数据
RequestContextHolder.setRequestAttributes(requestAttributes);
List<MemberAddressVo> address = memberFeignService.getAddress(memberResponseVO.getId());
confirmVo.setAddress(address);
}, executor);
// 2、远程查询购物车所有选中的购物项
CompletableFuture<Void> cartFuture = CompletableFuture.runAsync(() -> {
System.out.println("cart线程..." + Thread.currentThread().getId());
// 每一个线程都来共享之前的请求数据
RequestContextHolder.setRequestAttributes(requestAttributes);
List<OrderItemVo> items = cartFeignService.getCurrentUserCartItems();
confirmVo.setItems(items);
// feign在远程调用之前要构造请求,调用很多拦截器RequestInterceptor interceptor: requestInterceptors
}, executor).thenRunAsync(()->{
// 查询库存信息
List<OrderItemVo> items = confirmVo.getItems();
List<Long> collect = items.stream().map(item -> item.getSkuId()).collect(Collectors.toList());
R hasStock = wmsFeignService.getSkusHasStock(collect);
List<SkuStockVo> data = hasStock.getData(new TypeReference<List<SkuStockVo>>() {
});
if (!CollectionUtils.isEmpty(data)){
Map<Long, Boolean> map = data.stream().collect(Collectors.toMap(SkuStockVo::getSkuId, SkuStockVo::getHasStock));
confirmVo.setStocks(map);
}
},executor);
// 3、查询用户积分
Integer integration = memberResponseVO.getIntegration();
confirmVo.setIntegration(integration);
// 4、其他数据自动计算
// 5、防重令牌想
String token = UUID.randomUUID().toString().replace("-", "");
stringRedisTemplate.opsForValue().set(OrderConstant.USER_ORDER_TOKEN_PREFIX+memberResponseVO.getId(),token,30, TimeUnit.MINUTES);
confirmVo.setOrderToken(token);
CompletableFuture.allOf(getAddressFuture, cartFuture).get();
return confirmVo;
}
添加“com.atguigu.gulimall.order.constant.OrderConstant”类,代码如下:
package com.atguigu.gulimall.order.constant;
public class OrderConstant {
public static final String USER_ORDER_TOKEN_PREFIX = "order:token";
}
7、提交订单
7.1、下单流程
7.2、整体代码实现
修改gulimall-order模块得confirm.html页面,代码如下:
<form action="http://order.gulimall.com/submitOrder" method="post">
<input id="addrIdInput" name="addrId" type="hidden" />
<input id="payPriceInput" name="payPrice" type="hidden" />
<input type="hidden" name="orderToken" th:value="${orderConfirmData.orderToken}" />
<button class="tijiao" type="submit">提交订单</button>
</form>
页面提交数据 添加“com.atguigu.gulimall.order.vo.OrderSubmitVo”类,代码如下:
/**
* @Description: 封装订单提交的数据
* @Date: 2020/11/26 22:27
* @Version 1.0
*/
@Data
public class OrderSubmitVo {
//收获地址的id
private Long addrId;
//支付方式
private Integer payType;
//无需提交需要购买的商品,去购物车在获取一遍
//防重令牌
private String orderToken;
//应付价格 验价
private BigDecimal payPrice;
//订单备注
private String note;
}
添加“com.atguigu.gulimall.order.vo.SubmitOrderResponseVo”类,代码如下:
@Data
public class SubmitOrderResponseVo {
private OrderEntity order;
private Integer code;
}
- 提交订单成功,则携带返回数据转发至支付页面
- 提交订单失败,则携带错误信息重定向至确认页
添加“com.atguigu.gulimall.order.web.OrderWebController”类,代码如下:
/**
* 下单功能
*/
@PostMapping("/submitOrder")
public String submitOrder(OrderSubmitVo submitVo, Model model, RedirectAttributes redirectAttributes) {
// 下单 去创建订单 验证令牌 核算价格 锁定库存
try {
SubmitOrderResponseVo responseVo = orderService.submitOrder(submitVo);
if (responseVo.getCode() == 0) {
// 下单成功到选择支付方式页面
model.addAttribute("submitOrderResp", responseVo);
return "pay";
} else {
// 订单失败返回到订单确认页面
String msg = "下订单失败: ";
switch (responseVo.getCode()) {
case 1:
msg += "订单信息过期, 请刷新后再次提交.";
break;
case 2:
msg += "订单中的商品价格发生变化, 请刷新后再次提交.";
break;
case 3:
msg += "库存锁定失败, 商品库存不足.";
break;
}
redirectAttributes.addFlashAttribute("msg", msg);
return "redirect:http://order.gulimall.com/toTrade";
}
} catch (Exception e) {
if (e instanceof NoStockException) {
String message = e.getMessage();
redirectAttributes.addFlashAttribute("msg", message);
}
return "redirect:http://order.gulimall.com/toTrade";
}
}
当报错的时候,要在页面显示
修改gulimall-order模块的confim.html页面,代码如下:
<p class="p1">填写并核对订单信息 <span style="color: red" th:if="${session.msg != null}" th:text="${session.msg}"></span></p>
添加“com.atguigu.gulimall.order.service.OrderService”类,代码如下:
/**
* 下单
*
* @param submitVo
* @return
*/
SubmitOrderResponseVo submitOrder(OrderSubmitVo submitVo);
1、验证令牌【令牌的对比和删除必须保证原子性】
2、令牌验证成功
3、下单逻辑:去创建订单 --> 验证令牌 --> 核算价格 --> 锁定库存
添加“com.atguigu.gulimall.order.service.impl.OrderServiceImpl”类,代码如下:
private ThreadLocal<OrderSubmitVo> confirmVoThreadLocal = new ThreadLocal<>();
/**
* 下单逻辑 --> 去创建订单 --> 验证令牌 --> 核算价格 --> 锁定库存
*/
@Transactional
@Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo submitVo) {
confirmVoThreadLocal.set(submitVo);
SubmitOrderResponseVo response = new SubmitOrderResponseVo();
// 0、获取用户登录信息
MemberResponseVO memberResponseVO = LoginUserInterceptor.loginUser.get();
response.setCode(0);
// 1、验证令牌【令牌的对比和删除必须保证原子性】
// lua脚本返回结果: 0-令牌失败; 1-删除成功
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
String orderToken = submitVo.getOrderToken();
// 原子验证令牌和删除令牌
Long result = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseVO.getId()), orderToken);
/* 使用上面的lua脚本代替,这种方式不能保证原子性
String redisToken = stringRedisTemplate.opsForValue().get(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseVO.getId());
if (orderToken != null && orderToken.equals(redisToken)){
// 令牌验证通过
stringRedisTemplate.delete(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseVO.getId());
}else {
// 不通过
}*/
if (result == 0L) {
// 1.1、令牌验证失败
response.setCode(1);
return response;
} else {
// 1.2、令牌验证成功
// 下单逻辑 --> 去创建订单 --> 验证令牌 --> 核算价格 --> 锁定库存
// 1.2.1、创建订单,订单项,计算价格、积分等信息
OrderCreateTo order = createOrder();
// 1.2.2、验价
BigDecimal payAmount = order.getOrder().getPayAmount();
BigDecimal payPrice = submitVo.getPayPrice();
if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) {
// 1)、金额对比成功
// 1.2.3、保存订单
saveOrder(order);
// 1.2.4、库存锁定,只要有异常回滚订单数据。订单号,订单项信息(skuId,skuName,num)
WareSkuLockVo wareSkuLockVo = new WareSkuLockVo();
wareSkuLockVo.setOrderSn(order.getOrder().getOrderSn());
List<OrderItemVo> orderItemVos = order.getOrderItems().stream().map(item -> {
OrderItemVo orderItemVo = new OrderItemVo();
orderItemVo.setSkuId(item.getSkuId());
orderItemVo.setCount(item.getSkuQuantity());
orderItemVo.setTitle(item.getSkuName());
return orderItemVo;
}).collect(Collectors.toList());
wareSkuLockVo.setLocks(orderItemVos);
// 远程锁库存
R r = wmsFeignService.orderLockStock(wareSkuLockVo);
if (r.getCode() == 0) {
// 锁成功了
response.setOrder(order.getOrder());
return response;
} else {
// 锁定失败
throw new NoStockException((String) r.get("msg"));
}
} else {
// 2)、金额对比失败
response.setCode(2);
return response;
}
}
}
创建订单返回数据
添加“com.atguigu.gulimall.order.to.OrderCreateTo”类,代码如下:
@Data
public class OrderCreateTo {
// 订单
private OrderEntity order;
// 订单项
private List<OrderItemEntity> orderItems;
// 订单应付的价格
private BigDecimal payPrice;
//运费
private BigDecimal fare;
}
7.3、具体代码实现
7.3.1、创建订单
/**
* 创建订单
*
* @return
*/
public OrderCreateTo createOrder() {
OrderCreateTo createTo = new OrderCreateTo();
// 0、生成订单号
String orderSn = IdWorker.getTimeId();
// 1、构建订单
OrderEntity orderEntity = buildOrder(orderSn);
createTo.setOrder(orderEntity);
// 2、构建所有订单项数据
List<OrderItemEntity> itemEntities = buildOrderItems(orderSn);
createTo.setOrderItems(itemEntities);
// 3、计算价格、积分等相关
computePrice(orderEntity, itemEntities);
return createTo;
}
1)、构建订单
/**
* 构建订单
*
* @param orderSn 订单号
* @return
*/
private OrderEntity buildOrder(String orderSn) {
OrderEntity orderEntity = new OrderEntity();
MemberResponseVO memberResponseVo = LoginUserInterceptor.loginUser.get();
// 1、设置订单号
orderEntity.setOrderSn(orderSn);
// 2、设置用户id
orderEntity.setMemberId(memberResponseVo.getId());
OrderSubmitVo orderSubmitVo = confirmVoThreadLocal.get();
// 远程获取收获地址信息
R r = wmsFeignService.getFare(orderSubmitVo.getAddrId());
FareVo fareResp = r.getData(new TypeReference<FareVo>() {
});
// 3、设置运费信息
orderEntity.setFreightAmount(fareResp.getFare());
// 4、设置收货人信息
orderEntity.setReceiverCity(fareResp.getAddress().getCity());
orderEntity.setReceiverDetailAddress(fareResp.getAddress().getDetailAddress());
orderEntity.setReceiverName(fareResp.getAddress().getName());
orderEntity.setReceiverPhone(fareResp.getAddress().getPhone());
orderEntity.setReceiverPostCode(fareResp.getAddress().getPostCode());
orderEntity.setReceiverRegion(fareResp.getAddress().getRegion());
// 5、设置订单的相关状态信息
orderEntity.setStatus(OrderStatusEnum.CREATE_NEW.getCode());
// 6、设置自动确认时间
orderEntity.setAutoConfirmDay(7);
return orderEntity;
}
远程获取收获地址信息(gulimall-ware)
修改“com.atguigu.gulimall.order.feign.WmsFeignService”类,代码如下:
@GetMapping("/ware/wareinfo/fare")
public R getFare(@RequestParam("addrId") Long addrId);
封装远程返回的数据
添加“com.atguigu.gulimall.order.vo.FareVo”类,代码如下:
@Data
public class FareVo {
//收货人地址信息
private MemberAddressVo address;
//费用
private BigDecimal fare;
}
2)、构建所有订单项数据
/**
* 构建所有订单项数据
*
* @return
*/
private List<OrderItemEntity> buildOrderItems(String orderSn) {
// 最后确定每个购物项的价格
List<OrderItemVo> currentUserCartItems = cartFeignService.getCurrentUserCartItems(); //获取所有的订单项
if (currentUserCartItems != null && currentUserCartItems.size() > 0) {
List<OrderItemEntity> itemEntities = currentUserCartItems.stream().map(cartItem -> {
// 构建某一个订单项
OrderItemEntity itemEntity = buildOrderItem(cartItem);
itemEntity.setOrderSn(orderSn);
return itemEntity;
}).collect(Collectors.toList());
return itemEntities;
}
return null;
}
2.1)、构建某一个订单项
/**
* 构建某一个订单项
*
* @param cartItem
* @return
*/
private OrderItemEntity buildOrderItem(OrderItemVo cartItem) {
OrderItemEntity orderItemEntity = new OrderItemEntity();
// 1、订单信息: 订单号
// 2、商品的SPU信息
Long skuId = cartItem.getSkuId();
R r = productFeignService.getSpuInfoBySkuId(skuId);
SpuInfoVo spuInfoVo = r.getData(new TypeReference<SpuInfoVo>() {
});
orderItemEntity.setSpuId(spuInfoVo.getId());
orderItemEntity.setSpuBrand(spuInfoVo.getBrandId().toString());
orderItemEntity.setSpuName(spuInfoVo.getSpuName());
orderItemEntity.setCategoryId(spuInfoVo.getCatalogId());
// 3、商品的SKU信息
orderItemEntity.setSkuId(cartItem.getSkuId());
orderItemEntity.setSkuName(cartItem.getTitle());
orderItemEntity.setSkuPic(cartItem.getImage());
orderItemEntity.setSkuPrice(cartItem.getPrice());
String skuAttrs = StringUtils.collectionToDelimitedString(cartItem.getSkuAttr(), ";"); //将集合转换成字符串
orderItemEntity.setSkuAttrsVals(skuAttrs);
orderItemEntity.setSkuQuantity(cartItem.getCount());
// 4、优惠信息 [不做]
// 5、积分信息
orderItemEntity.setGiftGrowth(cartItem.getPrice().multiply(new BigDecimal(cartItem.getCount().toString())).intValue());
orderItemEntity.setGiftIntegration(cartItem.getPrice().multiply(new BigDecimal(cartItem.getCount().toString())).intValue());
// 6、订单项的价格信息
orderItemEntity.setPromotionAmount(new BigDecimal("0"));
orderItemEntity.setIntegrationAmount(new BigDecimal("0"));
orderItemEntity.setCouponAmount(new BigDecimal("0"));
// 6.1、当前订单项的实际金额
BigDecimal origin = orderItemEntity.getSkuPrice().multiply(new BigDecimal(orderItemEntity.getSkuQuantity().toString()));
// 6.2、总额减去各种优惠后的价格
BigDecimal subtract = origin.subtract(orderItemEntity.getCouponAmount()).subtract(orderItemEntity.getIntegrationAmount()).subtract(orderItemEntity.getPromotionAmount());
orderItemEntity.setRealAmount(subtract);
return orderItemEntity;
}
远程获取spu信息
封装远程spu的返回信息
添加“com.atguigu.gulimall.order.vo.SpuInfoVo”类,代码如下
@Data
public class SpuInfoVo {
/**
* 商品id
*/
private Long id;
/**
* 商品名称
*/
private String spuName;
/**
* 商品描述
*/
private String spuDescription;
/**
* 所属分类id
*/
private Long catalogId;
/**
* 品牌id
*/
private Long brandId;
/**
*
*/
private BigDecimal weight;
/**
* 上架状态[0 - 下架,1 - 上架]
*/
private Integer publishStatus;
/**
*
*/
private Date createTime;
/**
*
*/
private Date updateTime;
}
添加“com.atguigu.gulimall.order.feign.ProductFeignService”类,代码如下“:
@FeignClient("gulimall-product")
public interface ProductFeignService {
@GetMapping("/product/spuinfo/skuId/{id}")
R getSpuInfoBySkuId(@PathVariable("id") Long skuId);
}
gulimall-product
修改“com.atguigu.gulimall.product.app.SpuInfoController”类,代码如下:
@GetMapping("skuId/{id}")
public R getSpuInfoBySkuId(@PathVariable("id") Long skuId){
SpuInfoEntity entity = spuInfoService.getSpuInfoBySkuId(skuId);
return R.ok().setData(entity);
}
添加“com.atguigu.gulimall.product.service.SpuInfoService”类,代码如下:
SpuInfoEntity getSpuInfoBySkuId(Long skuId);
修改“com.atguigu.gulimall.product.service.impl.SpuInfoServiceImpl”类,代码如下:
@Override
public SpuInfoEntity getSpuInfoBySkuId(Long skuId) {
SkuInfoEntity byId = skuInfoService.getById(skuId);
Long spuId = byId.getSpuId();
SpuInfoEntity spuInfoEntity = getById(spuId);
return spuInfoEntity;
}
3)、计算价格、积分等相关
/**
* 计算价格、积分等相关
*
* @param orderEntity
* @param itemEntities
*/
private void computePrice(OrderEntity orderEntity, List<OrderItemEntity> itemEntities) {
BigDecimal total = new BigDecimal("0.0"); //订单总额
BigDecimal coupon = new BigDecimal("0.0");//优惠券抵扣总额
BigDecimal integration = new BigDecimal("0.0");//积分抵扣总额
BigDecimal promotion = new BigDecimal("0.0");//促销优惠总额
BigDecimal gift = new BigDecimal("0.0");//积分总值
BigDecimal growth = new BigDecimal("0.0");//成长总值
// 0、订单的总额,叠加每一个订单项的总额信息。
for (OrderItemEntity entity : itemEntities) {
coupon = coupon.add(entity.getCouponAmount());
integration = integration.add(entity.getIntegrationAmount());
promotion = promotion.add(entity.getPromotionAmount());
total = total.add(entity.getRealAmount());
gift = gift.add(new BigDecimal(entity.getGiftIntegration().toString()));
growth = growth.add(new BigDecimal(entity.getGiftGrowth().toString()));
}
// 1、订单价格相关
orderEntity.setTotalAmount(total);
// 2、应付金额
orderEntity.setPayAmount(total.add(orderEntity.getFreightAmount()));
orderEntity.setPromotionAmount(promotion);
orderEntity.setIntegrationAmount(integration);
orderEntity.setCouponAmount(coupon);
// 3、设置积分信息
orderEntity.setIntegration(gift.intValue());
orderEntity.setGrowth(growth.intValue());
// 4、设置删除状态 0未删除
orderEntity.setDeleteStatus(0);
}
7.3.2、保存订单
/**
* 保存订单数据
*
* @param order
*/
private void saveOrder(OrderCreateTo order) {
OrderEntity orderEntity = order.getOrder();
orderEntity.setModifyTime(new Date());
this.save(orderEntity);
List<OrderItemEntity> orderItems = order.getOrderItems();
orderItemService.saveBatch(orderItems);
}
7.3.3、库存锁定
只要有异常回滚订单数据。订单号,订单项信息(skuId,skuName,num)
- 找出所有库存大于商品数的仓库
- 遍历所有满足条件的仓库,逐个尝试锁库存,若锁库存成功则退出遍历
锁库存流程
修改“com.atguigu.gulimall.order.feign.WmsFeignService”类,代码如下:
@PostMapping("/ware/waresku/lock/order")
public R orderLockStock(@RequestBody WareSkuLockVo vo);
gulimall-ware
修改“com.atguigu.gulimall.ware.controller.WareSkuController”类,代码如下:
@PostMapping("/lock/order")
public R orderLockStock(@RequestBody WareSkuLockVo vo){
try{
Boolean stock = wareSkuService.orderLockStock(vo);
return R.ok();
}catch (NoStockException e){
return R.error(BizCodeEnume.NO_STOCK_EXCEPTION.getCode(),BizCodeEnume.NO_STOCK_EXCEPTION.getMsg());
}
}
修改“com.atguigu.common.exception.BizCodeEnume”类,代码如下:
修改“com.atguigu.gulimall.ware.service.WareSkuService”类,代码如下:
Boolean orderLockStock(WareSkuLockVo vo);
添加“com.atguigu.gulimall.ware.service.impl.WareSkuServiceImpl”类,代码如下:
/**
* 为某个订单锁定库存
* (rollbackFor = NoStockException.class)
* 默认只要都是运行异常都会回滚
* @param vo
* @return
*/
@Transactional
@Override
public Boolean orderLockStock(WareSkuLockVo vo) {
// 1、按照下单的收货地址,找到一个就近仓库,锁定库存
// 1、找到每个商品在哪个仓库都有库存
List<OrderItemVo> locks = vo.getLocks();
List<SkuWareHasStock> collect = locks.stream().map(item -> {
SkuWareHasStock stock = new SkuWareHasStock();
Long skuId = item.getSkuId();
stock.setSkuId(skuId);
stock.setNum(item.getCount());
// 查询这个商品在哪个仓库有库存
List<Long> wareIds = wareSkuDao.listWareIdHasSkuStock(skuId);
stock.setWareId(wareIds);
return stock;
}).collect(Collectors.toList());
// 2、锁定库存
for (SkuWareHasStock hasStock : collect) {
Boolean skuStocked = false;
Long skuId = hasStock.getSkuId();
List<Long> wareIds = hasStock.getWareId();
if (wareIds == null || wareIds.size() == 0){
// 没有任何库存有这个商品的库存
throw new NoStockException(skuId);
}
for (Long wareId : wareIds) {
// 成功返回1;否则就是0
Long count = wareSkuDao.lockSkuStock(skuId,wareId,hasStock.getNum());
if (count == 1){
skuStocked = true;
break;
// 当仓库锁失败,重试下一个仓库
}
}
if (skuStocked == false){
// 当前商品所有仓库都没有锁住
throw new NoStockException(skuId);
}
}
// 3、肯定全部都是锁定成功的
return true;
}
@Data
class SkuWareHasStock{
private Long skuId;
private Integer num;
private List<Long> wareId;
}
}
这里通过异常机制控制事务回滚,如果在锁定库存失败则抛出NoStockException
s,订单服务和库存服务都会回滚。
修改“com.atguigu.gulimall.ware.dao.WareSkuDao”类,代码如下:
List<Long> listWareIdHasSkuStock(@Param("skuId") Long skuId);
Long lockSkuStock(@Param("skuId") Long skuId, @Param("wareId") Long wareId, @Param("num") Integer num);
<select id="listWareIdHasSkuStock" resultType="java.lang.Long">
select ware_id from wms_ware_sku where sku_id =#{skuId} and stock - stock_locked > 0
</select>
<update id="lockSkuStock">
update wms_ware_sku set stock_locked = stock_locked + #{num}
where sku_id = #{skuId} and ware_id = #{wareId} and stock - stock_locked >= #{num}
</update>
添加“com.atguigu.common.exception.NoStockException”类,代码如下:
package com.atguigu.common.exception;
public class NoStockException extends RuntimeException {
private Long skuId;
public NoStockException(String msg) {
super(msg);
}
public NoStockException(Long skuId){
super("商品id:" + skuId + "没有足够的库存了");
}
public void setSkuId(Long skuId) {
this.skuId = skuId;
}
public Long getSkuId() {
return skuId;
}
}
修改gulimall-order模块的pay.html页面,代码如下:
<div class="Jdbox_BuySuc">
<dl>
<dt><img src="/static/order/confirm/img/saoyisao.png" alt=""></dt>
<dd>
<span>订单提交成功,请尽快付款!订单号:[[${submitOrderResp.order.orderSn}]]</span>
<span>应付金额<font>[[${#numbers.formatDecimal(submitOrderResp.order.payAmount,1,2)}]]</font>元</span>
</dd>
<dd>
<span>推荐使用</span>
<span>扫码支付请您在<font>24小时</font>内完成支付,否则订单会被自动取消(库存紧订单请参见详情页时限)</span>
<span>订单详细</span>
</dd>
</dl>
</div>
修改`gulimall_oms`数据库含order_sn字段的相关表,长度扩充为64位
ALTER TABLE `gulimall_oms`.`oms_order`
MODIFY COLUMN `order_sn` char(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '订单号' AFTER `member_id`;
ALTER TABLE `gulimall_oms`.`oms_order_item`
MODIFY COLUMN `order_sn` char(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'order_sn' AFTER `order_id`;
ALTER TABLE `gulimall_oms`.`oms_order_return_apply`
MODIFY COLUMN `order_sn` char(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '订单编号' AFTER `sku_id`;
ALTER TABLE `gulimall_oms`.`oms_payment_info`
MODIFY COLUMN `order_sn` char(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '订单号(对外业务号)' AFTER `id`;
8、分布式事务
8.1、本地事务
- 原子性:一系列的操作整体不可拆分,要么同时成功,要么同时失败
- 一致性:数据在事务的前后,业务整体一致。
- 转账。A:1000;B:1000; 转 200 事务成功; A:800 B:1200
- 隔离性:事务之间互相隔离。
- 持久性:一旦事务成功,数据一定会落盘在数据库。
Business:我们具体的业务代码
Storage:库存业务代码;扣库存
Order:订单业务代码;保存订单
Account:账号业务代码;减账户余额
比如买东西业务,扣库存,下订单,账户扣款,是一个整体;必须同时成功或者失败一个事务开始,代表以下的所有操作都在同一个连接里面;
2、事务的隔离级别
- READ UNCOMMITTED(读未提交)
该隔离级别的事务会读到其它未提交事务的数据,此现象也称之为脏读。
- READ COMMITTED(读已提交)
- REPEATABLE READ(可重复读)
- SERIALIZABLE(序列化)
3、事务的传播行为
- PROPAGATION_REQUIRED:如果当前没有事务,就创建一个新事务,如果当前存在事务,就加入该事务,该设置是最常用的设置。
- PROPAGATION_SUPPORTS:支持当前事务,如果当前存在事务,就加入该事务,如果当 前不存在事务,就以非事务执行。
- PROPAGATION_MANDATORY:支持当前事务,如果当前存在事务,就加入该事务,如果 当前不存在事务,就抛出异常。
- PROPAGATION_REQUIRES_NEW:创建新事务,无论当前存不存在事务,都创建新事务。
- PROPAGATION_NOT_SUPPORTED:以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
- PROPAGATION_NEVER:以非事务方式执行,如果当前存在事务,则抛出异常。
- PROPAGATION_NESTED:如果当前存在事务,则在嵌套事务内执行。如果当前没有务, 则执行与 PROPAGATION_REQUIRED 类似的操作。
在同一个类里面,编写两个方法,内部调用的时候,会导致事务设置失效。原因是没有用到代理对象的缘故。
- 导入 spring-boot-starter-aop
- @EnableTransactionManagement(proxyTargetClass = true)
- @EnableAspectJAutoProxy(exposeProxy=true)
- AopContext.currentProxy() 调用方法
8.2、分布式事务
分布式情况下,可能出现一些服务事务不一致的情况
- 一致性(Consistency):
- 在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)
- 可用性(Availability)
- 在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)
- 分区容错性(Partition tolerance)
- 大多数分布式系统都分布在多个子网络。每个子网络就叫做一个区(partition)。 分区容错的意思是,区间通信可能失败。比如,一台服务器放在中国,另一台服务 器放在美国,这就是两个区,它们之间可能无法通信。
一般来说,分区容错无法避免,因此可以认为 CAP 的 P 总是成立。CAP 定理告诉我们,剩下的 C 和 A 无法同时做到。
2.2、面临的问题
- 基本可用(Basically Available)
- 基本可用是指分布式系统在出现故障的时候,允许损失部分可用性(例如响应时间、功能上的可用性),允许损失部分可用性。需要注意的是,基本可用绝不等价于系统不可用。
- 响应时间上的损失:正常情况下搜索引擎需要在 0.5 秒之内返回给用户相应的查询结果,但由于出现故障(比如系统部分机房发生断电或断网故障),查询结果的响应时间增加到了 1~2 秒。
- 功能上的损失:购物网站在购物高峰(如双十一)时,为了保护系统的稳定性,部分消费者可能会被引导到一个降级页面。
- 基本可用是指分布式系统在出现故障的时候,允许损失部分可用性(例如响应时间、功能上的可用性),允许损失部分可用性。需要注意的是,基本可用绝不等价于系统不可用。
- 软状态( Soft State)
- 软状态是指允许系统存在中间状态,而该中间状态不会影响系统整体可用性。分布式存储中一般一份数据会有多个副本,允许不同副本同步的延时就是软状态的体现。mysql replication 的异步复制也是一种体现。
- 最终一致性( Eventual Consistency)
- 最终一致性是指系统中的所有数据副本经过一定时间后,最终能够达到一致的状态。弱一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况。
2.4、强一致性、弱一致性、最终一致性
3、分布式事务几种方案
- XA 协议比较简单,而且一旦商业数据库实现了 XA 协议,使用分布式事务的成本也比较低。
- XA 性能不理想,特别是在交易下单链路,往往并发量很高,XA 无法满足高并发场景
- XA 目前在商业数据库支持的比较理想,在 mysql 数据库中支持的不太理想,mysql 的XA 实现,没有记录 prepare 阶段日志,主备切换回导致主库与备库数据不一致。
- 许多 nosql 也没有支持 XA,这让 XA 的应用场景变得非常狭隘。
- 也有 3PC,引入了超时机制(无论协调者还是参与者,在向对方发送请求后,若长时间未收到回应则做出相应处理)
2)、柔性事务-TCC 事务补偿型方案
一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
二阶段 commit 行为:调用 自定义 的 commit 逻辑。
二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
3)、柔性事务-最大努力通知型方案
防止消息丢失:
/**
* 1、做好消息确认机制(pulisher,consumer【手动 ack】)
* 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一遍
*/
CREATE TABLE `mq_message` (
`message_id` char(32) NOT NULL,
`content` text,
`to_exchane` varchar(255) DEFAULT NULL,
`routing_key` varchar(255) DEFAULT NULL,
`class_type` varchar(255) DEFAULT NULL,
`message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已发送 2-错误抵达 3-已抵达',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
9、Seata
9.1、概念
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
官网地址:https://seata.apache.org/zh-cn/
9.1.1、AT 模式
前提
- 基于支持本地 ACID 事务的关系型数据库。
- Java 应用,通过 JDBC 访问数据库。
整体机制
两阶段提交协议的演变:
-
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
-
二阶段:
- 提交异步化,非常快速地完成。
- 回滚通过一阶段的回滚日志进行反向补偿。
写隔离
- 一阶段本地事务提交前,需要确保先拿到 全局锁 。
- 拿不到 全局锁 ,不能提交本地事务。
- 拿 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。
以一个示例来说明:
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
读隔离
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。
如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
工作机制
以一个示例来说明整个 AT 分支的工作过程。
业务表:product
AT 分支事务的业务逻辑:
update product set name = 'GTS' where name = 'TXC';
一阶段
过程:
- 解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = 'TXC')等相关的信息。
- 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
select id, name, since from product where name = 'TXC';
得到前镜像:
- 执行业务 SQL:更新这条记录的 name 为 'GTS'。
- 查询后镜像:根据前镜像的结果,通过 主键 定位数据。
select id, name, since from product where id = 1;
得到后镜像:
- 插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到
UNDO_LOG
表中。{ "branchId": 641789253, "undoItems": [{ "afterImage": { "rows": [{ "fields": [{ "name": "id", "type": 4, "value": 1 }, { "name": "name", "type": 12, "value": "GTS" }, { "name": "since", "type": 12, "value": "2014" }] }], "tableName": "product" }, "beforeImage": { "rows": [{ "fields": [{ "name": "id", "type": 4, "value": 1 }, { "name": "name", "type": 12, "value": "TXC" }, { "name": "since", "type": 12, "value": "2014" }] }], "tableName": "product" }, "sqlType": "UPDATE" }], "xid": "xid:xxx" }
- 提交前,向 TC 注册分支:申请
product
表中,主键值等于 1 的记录的 全局锁 。 - 本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
- 将本地事务提交的结果上报给 TC。
二阶段-回滚
- 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
- 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
- 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。
- 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
update product set name = 'TXC' where id = 1;
- 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。
二阶段-提交
- 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
- 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。
附录
回滚日志表
UNDO_LOG Table:不同数据库在类型上会略有差别。
以 MySQL 为例:
-- 注意此处0.7.0+ 增加字段 context
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
9.1.2、TCC 模式
回顾总览中的描述:一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:
- 一阶段 prepare 行为
- 二阶段 commit 或 rollback 行为
根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction Mode 和 Manual (Branch) Transaction Mode.
AT 模式基于 支持本地 ACID 事务 的 关系型数据库:
- 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
- 二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
- 二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。
相应的,TCC 模式,不依赖于底层数据资源的事务支持:
- 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
- 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
- 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。
9.1.3、Saga 模式
Saga模式是SEATA提供的长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。
理论基础:Hector & Kenneth 发表论⽂ Sagas (1987)
适用场景:
- 业务流程长、业务流程多
- 参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口
优势:
- 一阶段提交本地事务,无锁,高性能
- 事件驱动架构,参与者可异步执行,高吞吐
- 补偿服务易于实现
缺点:
- 不保证隔离性(应对方案见用户文档)
9.2、Seata术语
- TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
- TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
- RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
9.3、快速开始
让我们从一个微服务示例开始。
用例
用户购买商品的业务逻辑。整个业务逻辑由 3 个微服务提供支持:
- 仓储服务:对给定的商品扣除仓储数量。
- 订单服务:根据采购需求创建订单。
- 帐户服务:从用户帐户中扣除余额。
架构图
仓储服务
public interface StorageService {
/**
* 扣除存储数量
*/
void deduct(String commodityCode, int count);
}
订单服务
public interface OrderService {
/**
* 创建订单
*/
Order create(String userId, String commodityCode, int orderCount);
}
帐户服务
public interface AccountService {
/**
* 从用户账户中借出
*/
void debit(String userId, int money);
}
主要业务逻辑
public class BusinessServiceImpl implements BusinessService {
private StorageService storageService;
private OrderService orderService;
/**
* 采购
*/
public void purchase(String userId, String commodityCode, int orderCount) {
storageService.deduct(commodityCode, orderCount);
orderService.create(userId, commodityCode, orderCount);
}
}
public class OrderServiceImpl implements OrderService {
private OrderDAO orderDAO;
private AccountService accountService;
public Order create(String userId, String commodityCode, int orderCount) {
int orderMoney = calculate(commodityCode, orderCount);
accountService.debit(userId, orderMoney);
Order order = new Order();
order.userId = userId;
order.commodityCode = commodityCode;
order.count = orderCount;
order.money = orderMoney;
// INSERT INTO orders ...
return orderDAO.insert(order);
}
}
SEATA 的分布式交易解决方案
我们只需要使用一个 @GlobalTransactional
注解在业务方法上:
@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
......
}
由 Dubbo + SEATA 提供支持的示例
步骤 1:建立数据库
- 要求:具有 InnoDB 引擎的 MySQL。
注意: 实际上,在示例用例中,这 3 个服务应该有 3 个数据库。 但是,为了简单起见,我们只创建一个数据库并配置 3 个数据源。
使用您刚创建的数据库 URL/username/password 修改 Spring XML。
dubbo-account-service.xml dubbo-order-service.xml dubbo-storage-service.xml
<property name="url" value="jdbc:mysql://x.x.x.x:3306/xxx" />
<property name="username" value="xxx" />
<property name="password" value="xxx" />
步骤 2:创建 UNDO_LOG 表
SEATA AT 模式需要 UNDO_LOG
表。你可以通过 github 获取到指定版本的undo log SQL 脚本.
CREATE TABLE IF NOT EXISTS `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
ALTER TABLE `undo_log` ADD INDEX `ix_log_created` (`log_created`);
步骤 3:为示例业务创建表
DROP TABLE IF EXISTS `storage_tbl`;
CREATE TABLE `storage_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
PRIMARY KEY (`id`),
UNIQUE KEY (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `account_tbl`;
CREATE TABLE `account_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
步骤 4: 启动服务
- 从 https://github.com/apache/incubator-seata/releases ,下载服务器软件包,将其解压缩。
Usage: sh seata-server.sh(for linux and mac) or cmd seata-server.bat(for windows) [options]
Options:
--host, -h
The address is expose to registration center and other service can access seata-server via this ip
Default: 0.0.0.0
--port, -p
The port to listen.
Default: 8091
--storeMode, -m
log store mode : file、db
Default: file
--help
e.g.
sh seata-server.sh -p 8091 -h 127.0.0.1 -m file
步骤 5: 运行示例
示例仓库: seata-samples/at-samples。找到合适的依赖项设置,按顺序启动 Account
, Storage
, Order
, Business
服务。
9.4、环境搭建
0)、Seata控制分布式事务搭建流程:
1、每一个微服务先必须创建undo_logo表;
2、安装事务协调器(seata-server):https://github.com/seata/seata/releases
3、整合:
1)、导入依赖 spring-cloud-starter-alibaba-seata seata-all-1.0.0.jar
2)、解压并启动seata-server
registry.conf 注册中心相关的配置,修改registry type=nacos
file.conf seata配置中心
3)、所有想要用到分布式事务的微服务使用seata DatasourceProxy代理自己的数据源
4)、每个微服务,都必须导入registry.cof
file.conf vgroup_mapping.{application.name}-fescar-service-group = "default"
5)、启动测试
6)、给分布式大事务的路口标注@GlobalTransactional
7)、每一个远程的小事务用 @Transactional
1)、每个微服务数据库添加回滚日志表undo_log
CREATE TABLE IF NOT EXISTS `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
ALTER TABLE `undo_log` ADD INDEX `ix_log_created` (`log_created`);
3.1)、gulimall-common模块的pom引入依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
3.2)、安装事务协调器-seata-server
根据maven下载的seata版本如上图所示,从 https://github.com/seata/seata/releases下载服务器软件包,将其解压缩。
下载senta-server-1.0.0并修改register.conf
,使用nacos作为注册中心(这里根据自己maven版本下载)
修改registry.conf,把nacos作为seata的注册中
注意:
高版本的安装配置方式有所改变,可以参考文档:https://blog.csdn.net/qq_39654841/article/details/129385582
3.3)、所有想要用到分布式事务的微服务,使用seata的DatasourceProxy代理自己的数据源
给gulimall-order和gulimall-ware添加seata的DatasourceProxy代理自己的数据源
添加“com.atguigu.gulimall.order.config.MySeataConfig”类,代码如下:
添加“com.atguigu.gulimall.ware.config.MySeataConfig”类,代码如下:
import com.zaxxer.hikari.HikariDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import javax.sql.DataSource;
@Configuration
public class MySeataConfig {
@Autowired
DataSourceProperties dataSourceProperties;
@Bean
public DataSource dataSource(DataSourceProperties dataSourceProperties) {
// 得到数据源
HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
if (StringUtils.hasText(dataSourceProperties.getName())) {
dataSource.setPoolName(dataSourceProperties.getName());
}
return new DataSourceProxy(dataSource);
}
}
gulimall-ware
修改“com.atguigu.gulimall.ware.config.WareMybatisConfig”类,代码如下:
@Bean
public DataSource dataSource(DataSourceProperties dataSourceProperties){
//得到数据源
HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
if (StringUtils.hasText(dataSourceProperties.getName())){
dataSource.setPoolName(dataSourceProperties.getName());
}
return new DataSourceProxy(dataSource);
}
分别给gulimall-order和gulimall-ware加上file.conf和registry.conf这两个配置,并修改file.conf
给分布式大事务的路口标注@GlobalTransactional; 每一个远程的小事务用 @Transactional
@GlobalTransactional
@Transactional
@Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo submitVo)
10、RabbitMQ延时队列(实现定时任务)
场景:
消息的TTL(Time To Live)
- 消息的TTL就是消息的存活时间。
- RabbitMQ可以对队列和消息分别设置TTL。
- 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的 设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
- 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队 列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的 TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x- message-ttl属性来设置时间,两者是一样的效果。
Dead Letter Exchanges(DLX)
- 一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。(什么是死信)
- 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不
会被再次放在队列里,被其他消费者使用。 ( basic.reject/ basic.nack ) requeue=false
-
上面的消息的 TTL 到了,消息过期了。
-
队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
- 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不
-
Dead Letter Exchange 其实就是一种普通的 exchange ,和创建其他exchange没有两样。只是在某一个设置 Dead Letter Exchange 的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange 中去。
-
我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列
- 手动ack&异常消息统一放在一个队列处理建议的两种方式
- catch异常后,手动发送到指定队列,然后使用channel给rabbitmq确认消息已消费
- 给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败
- 延时队列实现-1
- 延时队列实现-2
带队列的下单流程
带队列的下单流程-升级
添加“com.atguigu.gulimall.order.config.MyMQConfig”类,代码如下:
@Configuration
public class MyMQConfig {
/**
* 容器中的 Binding Queue Exchange 都会自动创建(RabbitMQ没有的情况)
* RabbitMQ 只要有。@Bean声明属性发生变化也不会覆盖
*/
/**
* 订单延时队列
*/
@Bean
public Queue orderDelayQueue() {
// 1、参数
Map<String, Object> arguments = new HashMap<>();
// 1.1、设置死信队列的交换机
arguments.put("x-dead-letter-exchange", "order-event-exchange");
// 1.2、设置死信队列的路由键
arguments.put("x-dead-letter-routing-key", "order.release.order");
// 1.3、设置死信延时时间:1分钟
arguments.put("x-message-ttl", 60000);
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
return queue;
}
/**
* 订单释放队列
*/
@Bean
public Queue orderReleaseOrderQueue() {
Queue queue = new Queue("order.release.order.queue", true, false, false);
return queue;
}
/**
* 交换机
*/
@Bean
public Exchange orderEventExchange() {
return new TopicExchange("order-event-exchange", true, false);
}
/**
* 绑定延时队列
*/
@Bean
public Binding orderCreateOrderBinding() {
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
/**
* 绑定释放队列
*/
@Bean
public Binding orderReleaseOrderBinding() {
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
}
修改“com.atguigu.gulimall.order.web.HelloController”类代码如下:
@ResponseBody
@GetMapping("/test/createOrder")
public String createOrderTest(){
//订单下单成功
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
orderEntity.setModifyTime(new Date());
//给MQ发送消息
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);
return "ok";
}
修改“com.atguigu.gulimall.order.config.MyMQConfig”类,代码如下:
@RabbitListener(queues = "order.release.order.queue")
public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
System.out.println("收到过期的订单信息,准备关闭订单:" + orderEntity.getOrderSn());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
访问url进行测试:http://order.gulimall.com/test/createOrder
10.1、消息队列下单整体流程
设计建议规范:(基于事件模型的交换机设计)
- 交换机命名:业务+exchange;交换机为Topic
- 路由键:事件.需要感知的业务(可以不写)
- 队列命名:事件+想要监听服务名+queue
- 绑定关系:事件.感知的业务(#)
11、库存自动解锁
库存解锁的场景
- 下订单成功,订单过期没有支付被系统自动取消,被用户手动取消。都要解锁库存
- 下订单成功,库存锁定成功,接下来的业务调用失效,导致订单回滚。之前锁定的库存就要自动回滚
11.1、锁库存流程
11.2、搭建消息队列环境
gulimall-ware 服务添加RabbitMQ
1、pom导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、添加配置
spring:
rabbitmq: # rabbitmq配置
host: 192.168.119.127
virtual-host: /
listener:
simple:
acknowledge-mode: manual #手动确认机制
3、主启动类添加注解
@EnableRabbit
@EnableFeignClients
@EnableDiscoveryClient
@SpringBootApplication
public class GulimallWareApplication {
public static void main(String[] args) {
SpringApplication.run(GulimallWareApplication.class, args);
}
}
4、添加“com.atguigu.gulimall.ware.config.MyRabbitConfig”类,代码如下:
@Configuration
public class MyRabbitConfig {
/**
* 库存交换机
*/
@Bean
public Exchange stockEventExchange() {
return new TopicExchange("stock-event-exchange", true, false);
}
/**
* 库存释放队列
*/
@Bean
public Queue stockReleaseStockQueue() {
return new Queue("stock.release.stock.queue", true, false, false);
}
/**
* 库存延迟队列
*/
@Bean
public Queue stockDelayQueue() {
// 1、参数
Map<String, Object> arguments = new HashMap<>();
// 1.1、设置死信队列交换机
arguments.put("x-dead-letter-exchange", "stock-event-exchange");
// 1.2、设置死信队列路由键
arguments.put("x-dead-letter-routing-key", "stock.release");
// 1.3、设置死信队列消息延时时间
arguments.put("x-message-ttl", 120000);
return new Queue("stock.delay.queue", true, false, false, arguments);
}
/**
* 绑定库存释放队列
*/
@Bean
public Binding stockReleaseStockBinding() {
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#",
new HashMap<>());
}
/**
* 绑定库存锁定队列
*/
@Bean
public Binding stockLockedBinding() {
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked",
new HashMap<>());
}
}
5、 添加“com.atguigu.gulimall.ware.config.MyMessageConverterConfig”类,代码如下:
@Configuration
public class MyMessageConverterConfig {
/**
* 使用JSON序列化机制,进行消息转换
*
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
11.3、库存锁定
通过锁库存流程,我们可以在库存锁定是添加以下逻辑:
- 由于可能订单回滚的情况,所以为了能够得到库存锁定的信息,在锁定时需要记录库存工作单,其中包括订单信息和锁定库存时的信息(仓库id,商品id,锁了几件...)
- 在锁定成功后,向延迟队列发消息,带上库存锁定的相关信息
修改表wms_ware_order_task_detail
修改“com.atguigu.gulimall.ware.entity.WareOrderTaskDetailEntity”类,代码如下:
@AllArgsConstructor
@NoArgsConstructor
@Data
@TableName("wms_ware_order_task_detail")
public class WareOrderTaskDetailEntity implements Serializable {
/**
* id
*/
@TableId
private Long id;
/**
* sku_id
*/
private Long skuId;
/**
* sku_name
*/
private String skuName;
/**
* 购买个数
*/
private Integer skuNum;
/**
* 工作单id
*/
private Long taskId;
/**
* 仓库id
*/
private long wareId;
/**
* 锁定状态
*/
private Integer lockStatus;
}
修改WareOrderTaskDetailDao.xml,代码如下:
<resultMap type="com.atguigu.gulimall.ware.entity.WareOrderTaskDetailEntity" id="wareOrderTaskDetailMap">
<result property="id" column="id"/>
<result property="skuId" column="sku_id"/>
<result property="skuName" column="sku_name"/>
<result property="skuNum" column="sku_num"/>
<result property="taskId" column="task_id"/>
<result property="wareId" column="ware_id"/>
<result property="lockStatus" column="lock_status"/>
</resultMap>
新增“com.atguigu.common.to.mq.StockDetailTO”类,代码如下:
@Data
public class StockDetailTO {
private Long id;
/**
* sku_id
*/
private Long skuId;
/**
* sku_name
*/
private String skuName;
/**
* 购买个数
*/
private Integer skuNum;
/**
* 工作单id
*/
private Long taskId;
/**
* 仓库id
*/
private long wareId;
/**
* 锁定状态
*/
private Integer lockStatus;
}
修改“com.atguigu.gulimall.ware.service.impl.WareSkuServiceImpl”类,代码如下:
@Transactional
@Override
public Boolean orderLockStock(WareSkuLockVo vo) {
// 保存库存工作单的详情。追溯
WareOrderTaskEntity taskEntity = new WareOrderTaskEntity();
taskEntity.setOrderSn(vo.getOrderSn());
wareOrderTaskService.save(taskEntity);
// 1、按照下单地收货地址,找到一个就近仓库,锁定库存
// 1、找到每个商品在哪个仓库都有库存
List<OrderItemVo> locks = vo.getLocks();
List<SkuWareHasStock> collect = locks.stream().map(item -> {
SkuWareHasStock stock = new SkuWareHasStock();
Long skuId = item.getSkuId();
stock.setSkuId(skuId);
stock.setNum(item.getCount());
// 查询这个商品在哪个仓库有库存
List<Long> wareIds = wareSkuDao.listWareIdHasSkuStock(skuId);
stock.setWareId(wareIds);
return stock;
}).collect(Collectors.toList());
// 2、锁定库存
for (SkuWareHasStock hasStock : collect) {
Boolean skuStocked = false;
Long skuId = hasStock.getSkuId();
List<Long> wareIds = hasStock.getWareId();
if (wareIds == null || wareIds.size() == 0){
// 没有任何库存有这个商品的库存
throw new NoStockException(skuId);
}
// 2.1、如果每一个商品都锁定成功,将当前商品锁定了几件的工作单记录发给MQ
// 2.2、锁定失败。前面保存的工作单信息就回滚了。发送出去的消息,即使要解锁记录,由于去数据库查不到id,就不用解锁
for (Long wareId : wareIds) {
// 锁库存: 成功返回1;否则就是0
Long count = wareSkuDao.lockSkuStock(skuId,wareId,hasStock.getNum());
if (count == 1){
skuStocked = true;
// 告诉MQ库存锁定成功
WareOrderTaskDetailEntity detailEntity = new WareOrderTaskDetailEntity(null, skuId, "", hasStock.getNum(), taskEntity.getId(), wareId, 1);
wareOrderTaskDetailService.save(detailEntity);
StockLockedTO stockLockedTO = new StockLockedTO();
StockDetailTO stockDetailTO = new StockDetailTO();
BeanUtils.copyProperties(detailEntity, stockDetailTO);
stockLockedTO.setId(taskEntity.getId());
// 防止回滚以后找不到数据
stockLockedTO.setDetail(stockDetailTO);
rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", stockLockedTO);
break;
} else {
// 当前仓库锁定库存失败 重试下一个仓库
}
}
if (skuStocked == false){
// 当前商品所有仓库都没有锁住
throw new NoStockException(skuId);
}
}
// 2.3、肯定全部都是锁定成功的
return true;
}
@Data
class SkuWareHasStock{
private Long skuId;
private Integer num;
private List<Long> wareId;
}
11.4、监听队列
- 延迟队列会将过期的消息路由至
"stock.release.stock.queue"
,通过监听该队列实现库存的解锁- 为保证消息的可靠到达,我们使用手动确认消息的模式,在解锁成功后确认消息,若出现异常则重新归队
@Slf4j
@Service
@RabbitListener(queues = "stock.release.stock.queue")
public class StockReleaseListener {
@Autowired
private WareSkuService wareSkuService;
/**
* 1、库存自动解锁
* 库存解锁的场景
* 1)、下订单成功,库存锁定成功,接下来的业务调用失效,导致订单回滚。之前锁定的库存就要自动回滚
* 2)、订单失败
* 锁库存失败
*
* 只要解锁库存的消息失败。一定要告诉服务解锁失败。
*
*/
@RabbitHandler
public void handleStockLockedRelease(StockLockedTO to, Message message, Channel channel) throws IOException {
log.info("************************收到库存解锁的消息********************************");
try {
wareSkuService.unLockStock(to);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
11.5、库存解锁
- 如果工作单详情不为空,说明该库存锁定成功
- 查询最新的订单状态,如果订单不存在,说明订单提交出现异常回滚,或者订单处于已取消的状态,我们都对已锁定的库存进行解锁
- 如果工作单详情为空,说明库存未锁定,自然无需解锁
- 为保证幂等性,我们分别对订单的状态和工作单的状态都进行了判断,只有当订单过期且工作单显示当前库存处于锁定的状态时,才进行库存的解锁
/**
* 解锁库存流程:
* 1、查询数据库关于这个订单的锁定库存信息
* 1.1、有:证明库存锁定成功了
* 解锁:要看订单情况
* 1.1.1、没有这个订单。必须解锁
* 1.1.2、有这个订单。不是直接解锁库存,要看订单状态
* 订单状态:已取消:解锁库存
* 没取消,不能解锁
* 1.2、没有:库存锁定失败了,库存回滚了
* @param to
*/
@Override
public void unLockStock(StockLockedTO to) {
StockDetailTO detail = to.getDetail();
Long detailId = detail.getId();
// 1、查询数据库关于这个订单的锁定库存信息
WareOrderTaskDetailEntity byId = wareOrderTaskDetailService.getById(detailId);
// 1.1、解锁
if (byId != null) {
Long id = to.getId();
WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);
// 根据订单号查询订单的状态
String orderSn = taskEntity.getOrderSn();
R r = orderFeignService.getOrderStatus(orderSn);
if (r.getCode() == 0) {
// 订单数据返回成功
OrderVo data = r.getData(new TypeReference<OrderVo>() {
});
// 订单不存在 || 订单已经取消了:才能解锁库存
if (data == null || data.getStatus() == 4) {
// 为保证幂等性,只有当库存工作单详情处于被锁定的情况下(lockStatus = 1)才进行解锁
if (byId.getLockStatus() == 1) {
// 解锁库存具体实现
unLockStock(detail.getSkuId(), detail.getWareId(), detail.getSkuNum(), detailId);
}
}
} else {
// 消息拒绝以后重新放到队列里面,让别人继续消费解锁。
throw new RuntimeException("远程调用订单服务失败");
}
} else {
// 1.2、没有:库存锁定失败了,库存回滚了,无需解锁
}
}
/**
* 解锁库存具体实现
*
* @param skuId 商品ID
* @param wareId 库存Id
* @param num 数量
* @param taskDetailId 工作单详情Id
*/
private void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) {
// 解锁库存
wareSkuDao.unlockStock(skuId, wareId, num, taskDetailId);
// 更新库存工作单的状态
WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity();
entity.setId(taskDetailId);
entity.setLockStatus(2);// 变为已解锁
wareOrderTaskDetailService.updateById(entity);
}
远程查询订单状态 gulimall-order
添加“com.atguigu.gulimall.ware.feign.OrderFeignService”类,代码如下:
@FeignClient("gulimall-order")
public interface OrderFeignService {
@GetMapping("/order/order/status/{orderSn}")
R getOrderStatus(@PathVariable("orderSn") String orderSn);
}
添加“com.atguigu.gulimall.order.controller.OrderController”类,代码如下:
@GetMapping("/status/{orderSn}")
public R getOrderStatus(@PathVariable("orderSn") String orderSn){
OrderEntity orderEntity = orderService.getOrderByOrderSn(orderSn);
return R.ok().setData(orderEntity);
}
修改“com.atguigu.gulimall.order.service.OrderService”类,代码如下:
OrderEntity getOrderByOrderSn(String orderSn);
修改“com.atguigu.gulimall.order.service.impl.OrderServiceImpl”类,代码如下:
@Override
public OrderEntity getOrderByOrderSn(String orderSn) {
OrderEntity order_sn = this.getOne(new QueryWrapper<OrderEntity>().eq("order_sn", orderSn));
return order_sn;
}
由于gulimall-order添加了拦截器,只要使用该服务必须登录才行。因为这边需要远程调用订单,但不需要登录,所以给这个路径放行
@Component
public class LoginUserInterceptor implements HandlerInterceptor {
public static ThreadLocal<MemberResponseVO> loginUser = new ThreadLocal<>();
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String uri = request.getRequestURI();
// 该路径只是远程调用,你不需要登录拦截
boolean match = new AntPathMatcher().match("/order/order/status/**", uri);
if (match){
return true;
}
MemberResponseVO attribute = (MemberResponseVO) request.getSession().getAttribute(AuthServerConstant.LOGIN_USER);
if (attribute != null){
loginUser.set(attribute);
return true;
}else {
// 没登录就去登录
request.getSession().setAttribute("msg","请先进行登录");
response.sendRedirect("http://auth.gulimall.com/login.html");
return false;
}
}
}
修改“com.atguigu.gulimall.ware.dao.WareSkuDao”类,代码如下:
void unlockStock(@Param("skuId") Long skuId, @Param("wareId") Long wareId, @Param("num") Integer num, @Param("taskDetailId") Long taskDetailId);
<update id="unlockStock">
update wms_ware_sku set stock_locked = stock_locked - #{num}
where sku_id = #{skuId} and ware_id = #{wareId}
</update>
测试的时候我们在下订单方法修改代码,故意让锁定库存成功之后,报错,进行回滚
12、定时关单
12.1、提交订单
添加“com.atguigu.gulimall.order.web.OrderWebController”类,代码如下:
/**
* 下单功能
* @param submitVo
* @param model
* @param redirectAttributes
* @return
*/
@PostMapping("/submitOrder")
public String submitOrder(OrderSubmitVo submitVo, Model model, RedirectAttributes redirectAttributes) {
// 下单 去创建订单 验证令牌 核算价格 锁定库存
try {
SubmitOrderResponseVo responseVo = orderService.submitOrder(submitVo);
System.out.println("============================="+responseVo.getCode());
if (responseVo.getCode() == 0) {
// 下单成功到选择支付方式页面
model.addAttribute("submitOrderResp", responseVo);
return "pay";
} else {
// 订单失败返回到订单确认页面
String msg = "下订单失败: ";
switch (responseVo.getCode()) {
case 1 : msg += "订单信息过期, 请刷新后再次提交."; break;
case 2 : msg += "订单中的商品价格发生变化, 请刷新后再次提交."; break;
case 3 : msg += "库存锁定失败, 商品库存不足."; break;
}
redirectAttributes.addFlashAttribute("msg", msg);
return "redirect:http://order.gulimall.com/toTrade";
}
} catch (Exception e) {
if (e instanceof NoStockException) {
String message = e.getMessage();
redirectAttributes.addFlashAttribute("msg", message);
}
return "redirect:http://order.gulimall.com/toTrade";
}
12.2、监听队列
创建订单的消息会进入延迟队列,最终发送至队列order.release.order.queue
,因此我们对该队列进行监听,进行订单的关闭
@Service
@RabbitListener(queues = "order.release.order.queue")
public class OrderCloseListener {
@Autowired
private OrderService orderService;
@RabbitHandler
public void listener(OrderEntity entity, Channel channel, Message message) throws IOException {
try {
// 定时关单
orderService.closeOrder(entity);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 修改失败 拒绝消息 使消息重新入队
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
12.3、关闭订单
- 由于要保证幂等性,因此要查询最新的订单状态判断是否需要关单
- 关闭订单后也需要解锁库存,因此发送消息进行库存、会员服务对应的解锁
添加“com.atguigu.gulimall.order.service.OrderService”类,代码如下:
void closeOrder(OrderEntity entity);
修改“com.atguigu.gulimall.order.service.impl.OrderServiceImpl”,代码如下:
@Override
public void closeOrder(OrderEntity entity) {
// 查询当前这个订单地最新状态
OrderEntity orderEntity = this.getById(entity.getId());
// 待付款的状态才可以关单
if (orderEntity.getStatus() == OrderStatusEnum.CREATE_NEW.getCode()) {
OrderEntity update = new OrderEntity();
update.setId(entity.getId());
update.setStatus(OrderStatusEnum.CANCLED.getCode());
this.updateById(update);
OrderTo orderTo = new OrderTo();
BeanUtils.copyProperties(orderEntity, orderTo);
// 发给MQ一个
try {
// 保证消息一定会发送出去,每一个消息都可以做好日志记录(给数据库保存每一个消息的详细信息)
// 定期扫描数据库将失败的消息在发送一遍
rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
} catch (Exception e) {
// 将没发送成功的消息进行重试发送
}
}
}
添加“com.atguigu.common.to.mq.OrderTo”类,代码如下:
@Data
public class OrderTo {
private Long id;
/**
* member_id
*/
private Long memberId;
/**
* 订单号
*/
private String orderSn;
/**
* 使用的优惠券
*/
private Long couponId;
/**
* create_time
*/
private Date createTime;
/**
* 用户名
*/
private String memberUsername;
/**
* 订单总额
*/
private BigDecimal totalAmount;
/**
* 应付总额
*/
private BigDecimal payAmount;
/**
* 运费金额
*/
private BigDecimal freightAmount;
/**
* 促销优化金额(促销价、满减、阶梯价)
*/
private BigDecimal promotionAmount;
/**
* 积分抵扣金额
*/
private BigDecimal integrationAmount;
/**
* 优惠券抵扣金额
*/
private BigDecimal couponAmount;
/**
* 后台调整订单使用的折扣金额
*/
private BigDecimal discountAmount;
/**
* 支付方式【1->支付宝;2->微信;3->银联; 4->货到付款;】
*/
private Integer payType;
/**
* 订单来源[0->PC订单;1->app订单]
*/
private Integer sourceType;
/**
* 订单状态【0->待付款;1->待发货;2->已发货;3->已完成;4->已关闭;5->无效订单】
*/
private Integer status;
/**
* 物流公司(配送方式)
*/
private String deliveryCompany;
/**
* 物流单号
*/
private String deliverySn;
/**
* 自动确认时间(天)
*/
private Integer autoConfirmDay;
/**
* 可以获得的积分
*/
private Integer integration;
/**
* 可以获得的成长值
*/
private Integer growth;
/**
* 发票类型[0->不开发票;1->电子发票;2->纸质发票]
*/
private Integer billType;
/**
* 发票抬头
*/
private String billHeader;
/**
* 发票内容
*/
private String billContent;
/**
* 收票人电话
*/
private String billReceiverPhone;
/**
* 收票人邮箱
*/
private String billReceiverEmail;
/**
* 收货人姓名
*/
private String receiverName;
/**
* 收货人电话
*/
private String receiverPhone;
/**
* 收货人邮编
*/
private String receiverPostCode;
/**
* 省份/直辖市
*/
private String receiverProvince;
/**
* 城市
*/
private String receiverCity;
/**
* 区
*/
private String receiverRegion;
/**
* 详细地址
*/
private String receiverDetailAddress;
/**
* 订单备注
*/
private String note;
/**
* 确认收货状态[0->未确认;1->已确认]
*/
private Integer confirmStatus;
/**
* 删除状态【0->未删除;1->已删除】
*/
private Integer deleteStatus;
/**
* 下单时使用的积分
*/
private Integer useIntegration;
/**
* 支付时间
*/
private Date paymentTime;
/**
* 发货时间
*/
private Date deliveryTime;
/**
* 确认收货时间
*/
private Date receiveTime;
/**
* 评价时间
*/
private Date commentTime;
/**
* 修改时间
*/
private Date modifyTime;
}
12.4、解锁库存
订单释放&库存解锁流程
- 库存不能解锁场景
- 防止订单服务卡顿,导致订单状态一直改变不了,库存消息优先到期,查订单状态新建状态,什么都不做就走了
- 导致卡顿的订单,永远不能解锁库存
- 解决办法
- 再添加一个订单关闭准备解锁库存队列
修改“com.atguigu.gulimall.ware.listener.StockReleaseListener”类,代码如下:
@Slf4j
@Service
@RabbitListener(queues = "stock.release.stock.queue")
public class StockReleaseListener {
@Autowired
private WareSkuService wareSkuService;
/**
* 1、库存自动解锁
* 库存解锁的场景
* 1)、下订单成功,库存锁定成功,接下来的业务调用失效,导致订单回滚。之前锁定的库存就要自动回滚
* 2)、订单失败
* 锁库存失败
*
* 只要解锁库存的消息失败。一定要告诉服务解锁失败。
*
*/
@RabbitHandler
public void handleStockLockedRelease(StockLockedTO to, Message message, Channel channel) throws IOException {
log.info("************************收到库存解锁的消息********************************");
try {
// 解锁库存
wareSkuService.unLockStock(to);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
@RabbitHandler
public void handleOrderCloseRelease(OrderTo to, Message message, Channel channel) throws IOException {
log.info("************************订单关闭准备解锁库存********************************");
try {
wareSkuService.unLockStockForOrder(to);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
修改“com.atguigu.gulimall.ware.service.WareSkuService”类,代码如下:
void unLockStockForOrder(OrderTo to);
修改“com.atguigu.gulimall.order.service.impl.OrderServiceImpl”类,代码如下:
/**
* 防止订单服务卡顿,导致订单状态一直改变不了,库存消息优先到期,查订单状态新建状态,什么都不做就走了
* 导致卡顿的订单,永远不能解锁库存
*/
@Transactional
@Override
public void unLockStockForOrder(OrderTo to) {
String orderSn = to.getOrderSn();
// 查一下最新的库存解锁状态,防止重复解锁库存
// 1、根据订单号查询库存工作单
WareOrderTaskEntity task = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);
Long id = task.getId();
// 2、按照工作单找到所有 没有解锁的库存,进行解锁
List<WareOrderTaskDetailEntity> entities = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", id).eq("lock_status", 1));
for (WareOrderTaskDetailEntity entity : entities) {
// 3、解锁库存具体实现
unLockStock(entity.getSkuId(), entity.getWareId(), entity.getSkuNum(), entity.getId());
}
}
修改“com.atguigu.gulimall.ware.service.WareOrderTaskService”类,代码如下:
WareOrderTaskEntity getOrderTaskByOrderSn(String orderSn);
修改“com.atguigu.gulimall.ware.service.impl.WareOrderTaskServiceImpl”类,代码如下:
@Override
public WareOrderTaskEntity getOrderTaskByOrderSn(String orderSn) {
WareOrderTaskEntity orderTaskEntity = this.getOne(new QueryWrapper<WareOrderTaskEntity>().eq("order_sn", orderSn));
return orderTaskEntity;
}
总结:
- 做好消息确认机制(pulisher,consumer【手动 ack】)
- 每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一遍
CREATE TABLE `mq_message` (
`message_id` char(32) NOT NULL, `content` text, `to_exchane` varchar(255) DEFAULT NULL,
`routing_key` varchar(255) DEFAULT NULL, `class_type` varchar(255) DEFAULT NULL,
`message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已发送 2-错误抵达 3-已抵达',
`create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL,
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
13、消息丢失、积压、重复等解决方案
如何保证消息可靠性:
13.1、消息丢失
- 消息发送出去,由于网络问题没有抵达服务器
- 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,采用定期扫描重发的方式
- 做好日志记录,每个消息状态是否都被服务器收到都应该记录
- 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发
- 消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。
- publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。
- 自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机
- 一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重新入队
解决方案:
1、 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,采用定期扫描重发的方式
2、做好消息确认机制(publisher,consumer【手动ack】)
修改“com.atguigu.gulimall.order.config.MyRabbitConfig”类,代码如下:
@Configuration
public class MyRabbitConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct // MyRabbitConfig对象创建完以后,执行这个方法
public void initRabbitTemplate(){
// 设置确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 只要消息抵达Broker就b = true
* @param correlationData 当前消息的唯一关联数据(这个消息的唯一id)
* @param ack 消息是否成功收到
* @param s 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
/**
* 1、做好消息确认机制(publisher,consumer【手动ack】)
* 2、每一个发送的消息都在数据库做好记录。定期将失效的消息再次发送
*/
// 服务器收到了
log.info("confirm...correlationData["+correlationData+"]==>ack["+ack+"]s==>["+s+"]");
}
});
// 设置消息抵达队列的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* @param message 投递失败的消息详细信息
* @param i 回复的状态码
* @param s 回复的文本内容
* @param s1 当时这个消息发给哪个交换机
* @param s2 当时这个消息用哪个路由键
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
// 报错误了。修改数据库当前消息的错误状态-》错误
log.error("Fail Message["+message+"]==>i["+i+"]==>s["+s+"]==>s1["+s1+"]==>s2["+s2+"]");
}
});
}
}
13.2、消息重复
- 消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费者
- 消息消费失败,由于重试机制,自动又将消息发送出去
- 成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送
- 消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标志
- 使用防重表(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用处理
- rabbitMQ的每一个消息都有redelivered字段,可以获取是否是被重新投递过来的,而不是第一次投递过来的
解决方案:
1、消费者的业务消费接口应该设计为幂等性的
13.3、消息积压
- 消费者宕机积压
- 消费者消费能力不足积压
- 发送者发送流量太大
- 上线更多的消费者,进行正常消费
- 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理
更多推荐
所有评论(0)