微服务调用失败的一种解决方案
一.简介今天微服务和远程调用已经使用的很广泛了,可以解决我们很多的问题,不过由于远程调用不可控因素更多,失败的可能性更大,但是一些接口可能需要很高的要求,需要每一次调用都需要成功。比如订单流转。调用失败订单就丢失了,影响很大。我这里介绍一种简单的解决办法,采用消息队列解构接口调用,定时器重新发送,mysql持久化。配合人工处理可以较好的解决这个问题。二.流程...
一.简介
今天微服务和远程调用已经使用的很广泛了,可以解决我们很多的问题,不过由于远程调用不可控因素更多,失败的可能性更大,但是一些接口可能需要很高的要求,需要每一次调用都需要成功。比如订单流转。调用失败订单就丢失了,影响很大。我这里介绍一种简单的解决办法,采用消息队列解构接口调用,定时器重新发送,mysql持久化。配合人工处理可以较好的解决这个问题。
二.时序图
三.简介
3.1创建springboot 父子工程
service-rpc-kafka
├── service-api -- 系统共用api模块
├── service-consumer -- rpc服务消费模块(8062)
├── service-producter-- rpc服务生产模块(8061)
├── service-rpc-fail-dispose -- rpc失败服务处理模块(8063)
├── sql -- 数据库初始化脚本
3.2父工程pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<modules>
<module>service-api</module>
<module>service-consumer</module>
<module>service-producer</module>
<module>service-rpc-fail-dispose</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.yangzheng</groupId>
<artifactId>service-rpc-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>service-rpc-kafka</name>
<description>远程调用kafka解耦,处理远程调用失败</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--引入dubbo环境-->
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>0.2.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.29</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.28</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
<!-- hutool 工具集合包 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.5.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3.3 api接口依赖
TestService 在service-api 模块中给其他服务依赖使用
package com.yangzheng.service;
import com.yangzheng.vo.UserVo;
/**
* @author yangzheng
* @Description: //TODO
* @Title: TestService
* @ProjectName service-rpc-kafka
* @date 2020/6/3/003 11:30
*/
public interface TestService {
String test(UserVo userVo);
}
UserVo
import lombok.Data;
import java.io.Serializable;
/**
* @author yangzheng
* @Description: //TODO
* @Title: UserVo
* @ProjectName service-rpc-kafka
* @date 2020/6/3/003 15:59
*/
@Data
public class UserVo implements Serializable {
private String name;
private Integer age;
}
3.4 producer 服务生产者
启动类加上开启dubbo
@EnableDubbo
yml配置文件
dubbo:
application:
name: service-producer
registry:
address: localhost:2181
protocol: zookeeper
check: false
protocol:
name: dubbo
port: 30003
monitor:
protocol: register
consumer:
check: false
timeout: 3000
server:
port: 8061
spring:
datasource:
name: user
driver-class-name: com.mysql.cj.jdbc.Driver
###################以下为druid增加的配置###########################
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://localhost:3306/service_rpc_kafka?serverTimezone=Asia/Chongqing&useUnicode=true&characterEncoding=utf8&characterSetResults=utf8&useSSL=false&verifyServerCertificate=false&autoReconnct=true&autoReconnectForPools=true&allowMultiQueries=true
username: root
password: 123456
# 下面为连接池的补充设置,应用到上面所有数据源中
# 初始化大小,最小,最大
initialSize: 5
minIdle: 5
maxActive: 100
# 配置获取连接等待超时的时间
maxWait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
timeBetweenEvictionRunsMillis: 60000
# 配置一个连接在池中最小生存的时间,单位是毫秒
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT 1 FROM DUAL
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
# 打开PSCache,并且指定每个连接上PSCache的大小
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
# 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
filters: stat,wall,logback
# 通过connectProperties属性来打开mergeSql功能;慢SQL记录
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
# 合并多个DruidDataSource的监控数据
useGlobalDataSourceStat: true
# druid recycle
removeAbandoned: true
removeAbandonedTimeout: 300
logAbandoned: true
###############以上为配置druid添加的配置########################################
transaction:
rollback-on-commit-failure: true
import com.alibaba.dubbo.config.annotation.Service;
import com.yangzheng.service.TestService;
import com.yangzheng.vo.UserVo;
import lombok.extern.slf4j.Slf4j;
/**
* @author yangzheng
* @Description: //TODO
* @Title: TestServiceImpl
* @ProjectName service-rpc-kafka
* @date 2020/6/3/003 11:34
*/
@Service
@Slf4j
public class TestServiceImpl implements TestService {
@Override
public String test(UserVo userVo) {
log.info("dubbo服务调用成功,服务调用者为"+userVo.getName());
return "dubbo服务调用成功,服务调用者为"+userVo.getName();
}
}
3.5 服务消费者
controller 先发送到 业务 topic
import com.alibaba.fastjson.JSON;
import com.yangzheng.vo.UserVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author yangzheng
* @Description: //TODO
* @Title: TestController
* @ProjectName service-rpc-kafka
* @date 2020/6/3/003 13:17
*/
@RestController
public class TestController {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@RequestMapping("/test")
public String test(){
UserVo userVo = new UserVo();
userVo.setName("yangzheng");
userVo.setAge(18);
kafkaTemplate.send("test", JSON.toJSONString(userVo, true));
return "success";
}
}
kafka listener监听topic,消费消息调用服务生产者的接口,成功就调用成功的工具类方法,失败就调用失败的工具类方法
import cn.hutool.json.JSONUtil;
import com.alibaba.druid.util.StringUtils;
import com.alibaba.dubbo.config.annotation.Reference;
import com.yangzheng.service.TestService;
import com.yangzheng.serviceconsumer.util.MqMessageUtil;
import com.yangzheng.vo.UserVo;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @author yangzheng
* @Description: //TODO
* @Title: KafkaTestListener
* @ProjectName service-rpc-kafka
* @date 2020/6/3/003 13:51
*/
@Component
@Slf4j
public class KafkaTestListener {
@Reference(check=false, timeout = 60000)
TestService testService;
@KafkaListener(topics = "test",groupId = "test")
public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
JSONObject json = JSONObject.fromObject(record.value());
log.info("test 消费了: Topic:" + topic + ",Message:" + json);
try {
UserVo userVo = JSONUtil.toBean(json.toString(), UserVo.class);
String result = testService.test(userVo);
if (!StringUtils.isEmpty(result)) {
MqMessageUtil.handleSuccessMsg(record);
} else {
MqMessageUtil.handleFailMsg(record);
}
} catch (Exception e) {
MqMessageUtil.handleFailMsg(record);
log.error("",e);
}
ack.acknowledge();
}
}
}
MqMessageUtil工具类,发送消息到调用接口失败或成功的topic,用于失败重试
import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.core.KafkaTemplate;
/**
* @author yudong
* @date 2019/12/31
*/
public class MqMessageUtil {
public static final String RESEND_FLAG = "mq_message_resend_flag";
private static KafkaTemplate<String, Object> kafkaTemplate = SpringUtil.getApplicationContext().getBean(KafkaTemplate.class);
public static void handleSuccessMsg(ConsumerRecord<?, ?> record) {
JSONObject json = JSONObject.fromObject(record.value());
if (json.has(RESEND_FLAG)) {
int id = json.getInt(RESEND_FLAG);
JSONObject object = new JSONObject();
object.put("id", id);
kafkaTemplate.send("mall_moonmall_statistical_success", object.toString());
}
}
public static void handleFailMsg(ConsumerRecord<?, ?> record) {
String key = record.key() != null ? String.valueOf(record.key()) : "";
JSONObject json = new JSONObject();
json.put("topic", record.topic());
json.put("key", key);
json.put("value", record.value());
kafkaTemplate.send("mall_moonmall_statistical_fail", json.toString());
}
}
SpringUtil
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class SpringUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if(SpringUtil.applicationContext == null){
SpringUtil.applicationContext = applicationContext;
}
System.out.println("========ApplicationContext配置成功,在普通类可以通过调用SpringUtils.getAppContext()获取applicationContext对象,applicationContext="+SpringUtil.applicationContext+"========");
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
}
3.6调用消息处理
监听 MqMessageUtil 发送过来的消息,第一次失败就插入数据库,其他失败就直接返回
import com.yangzheng.servicerpcfaildispose.dao.MallMqLogMapper;
import com.yangzheng.servicerpcfaildispose.model.MallMqLog;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @author yangzheng
* @Description: //TODO
* @Title: RpcDisposeListener
* @ProjectName service-rpc-kafka
* @date 2020/6/3/003 15:44
*/
@Component
@Slf4j
public class RpcDisposeListener {
@Autowired
private MallMqLogMapper mallMqLogMapper;
/**
* 监听成功调用
* @param record
* @param ack
* @param topic
*/
@KafkaListener(topics = "mall_moonmall_statistical_success",groupId = "test")
public void RpcSuccess(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("mall_moonmall_statistical_success 消费了: Topic:" + topic + ",Message:" + msg);
JSONObject json = JSONObject.fromObject(record.value());
int id = json.getInt("id");
MallMqLog mallMqLog = new MallMqLog();
mallMqLog.setId(id);
mallMqLog.setMqStatus(2);
int affect = mallMqLogMapper.updateById(mallMqLog);
log.info("updateSuccessMq affect:{},{}", id, affect);
ack.acknowledge();
}
}
/**
* 监听失败调用
* @param record
* @param ack
* @param topic
*/
@KafkaListener(topics = "mall_moonmall_statistical_fail",groupId = "test")
public void RpcFail(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("mall_moonmall_statistical_fail 消费了: Topic:" + topic + ",Message:" + msg);
JSONObject json = JSONObject.fromObject(record.value());
String topicRpc = json.getString("topic");
String key = json.getString("key");
String valueStr = json.getString("value");
JSONObject valueJson = JSONObject.fromObject(valueStr);
if (valueJson.has("mq_message_resend_flag")) {
return;
}
MallMqLog mallMqLog = new MallMqLog();
mallMqLog.setTopic(topicRpc);
mallMqLog.setMqKey(key);
mallMqLog.setMqValue(valueStr);
mallMqLog.setResendTimes(0);
mallMqLog.setMqStatus(2);
mallMqLogMapper.insert(mallMqLog);
ack.acknowledge();
}
}
}
定时器读取数据库里面的需要重发的消息,重新发给业务topic调用服务
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.baomidou.mybatisplus.mapper.Wrapper;
import com.baomidou.mybatisplus.toolkit.StringUtils;
import com.google.common.collect.Lists;
import com.yangzheng.servicerpcfaildispose.dao.MallMqLogMapper;
import com.yangzheng.servicerpcfaildispose.model.MallMqLog;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.Date;
import java.util.List;
/**
* @author yangzheng
* @Description: //TODO
* @Title: RpcFailSchedule
* @ProjectName service-rpc-kafka
* @date 2020/6/3/003 16:35
*/
@Component
@Slf4j
public class RpcFailSchedule {
@Autowired
MallMqLogMapper mallMqLogMapper;
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Scheduled(cron = "* */1 * * * ?")
public void handleFailRpc() {
List<String> resList = Lists.newArrayList();
Wrapper<MallMqLog> wrapper = new EntityWrapper<>();
wrapper.and("mq_status",1);
wrapper.le("resend_times",6);
// 查询所有处理失败的消息
List<MallMqLog> list = mallMqLogMapper.selectList(wrapper);
for (MallMqLog mallMqLog : list) {
resList.add(mallMqLog.getId() + "," + mallMqLog.getTopic() + "," + mallMqLog.getMqKey());
Integer resendTimes = mallMqLog.getResendTimes();
String value = mallMqLog.getMqValue();
JSONObject json = JSONObject.fromObject(value);
// 添加重发标志
json.put("mq_message_resend_flag", mallMqLog.getId());
value = json.toString();
String key = StringUtils.isNotEmpty(mallMqLog.getMqKey()) ? mallMqLog.getMqKey() : null;
// 重新发送处理失败的消息
kafkaTemplate.send(mallMqLog.getTopic(), key, value)
.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
// do nothing
}
@Override
public void onSuccess(SendResult<String, Object> result) {
// 发送成功,重发次数加1
MallMqLog log = new MallMqLog();
log.setId(mallMqLog.getId());
Integer times = (resendTimes + 1);
log.setResendTimes(times);
log.setOpTime(new Date());
mallMqLogMapper.updateById(log);
}
});
}
log.info("rehandleFailMq:{}", resList);
}
}
四.总结
这个方法在我们公司的实际生产环境中有用到,效果还不错,因为大版本上线或者系统切换的时候,有可能会有很多原因导致服务调用失败,所以我们就做了这个重试机制,自动重试一定次数,超过最大次数就短信报警人工解决问题,解决问题后,把数据库中失败消息的重试次数改小一点,这些失败的调用又可以自动重试了,不用再人工处理。这个方法优点使用范围比较广,做好之后其他服务想用只有用工具类发消息即可,但是也有一定的缺点,就是流程变得更长了。
五.源码
更多推荐
所有评论(0)