关于异步转同步(微服务集群的实现设想,使用消息中间件)
微服务集群如何实现异步转同步,使用消息中间件前言一、集群与集群实现异步转同步的难点在哪儿?二、看图说实现方法1.分析请求2.分析响应总结前言之前接到一个异步转同步的需求:(下一行先从左到右看,再下一行从右到左看)上游系统--> A系统 --> mqtt(请求由B系消费) ;上游系统<-- A系统 <-- mqtt(B系消费后,把结果放入mqtt);但是我负责的A系统,且不是
前言
之前接到一个异步转同步的需求:(下一行先从左到右看,再下一行从右到左看)
上游系统 --> A系统 --> mqtt(请求由B系消费) ;
上游系统 <-- A系统 <-- mqtt(B系消费后,把结果放入mqtt);
但是我负责的A系统,且不是集群。所以直接使用的ConcurrentHashMap + synchronized / ReentrantLock 来实现的。但如果A系统是集群又应该怎样去实现?
一、集群与集群实现异步转同步的难点在哪儿?
如:A集群又三个实例 ,B集群又三个实例,AB之间要实现异步转同步(AB通过消息中间件通信)
难点在于:如何避免重复处理?
即:
1:A集群 A1 发了一个请求到B集群,如何保证B集群只有一个实例 B1 处理该请求。
2:B集群 B1 返回的结果,如何保证是原来的 A1 被唤醒并进行处理。
二、看图说实现方法
1.分析请求
a:上游请求经过网关分发,只会有一个A系统实例收到请求,如 A1。
b:A1处理完业务逻辑,向kafka 的 topic 1 中放入消息
c:然后 A1开启分布式锁,以唯一约束作为分布式锁的value,同时往redis中写一条数,并让线程await
----------redis中写一条数,用于后面判断该由哪个的实例A处理响应结果:key为 prefix:A1的ip:A1的port:请求的唯一约束,value为 condition
----------让线程await:如果20秒内收到 期望的响应(该自己处理的响应) 则唤醒线程;没收到,则自定义处理逻辑(可以返回上游失败\处理中,也可以继续等待10s)。
d:B集群的三个服务实例属于同一个消费者组,所以 topic 1 内的消息只会有一个服务实例收到请求,如 B1。
RLock lock = redissonClient.getLock("唯一约束");
Boolean isLock = lock.tryLock();
if (isLock) {
//获取到锁之后 创建Condition 用于指定线程 wait 和 signal
Condition condition = lock.newCondition();
//redis 里存上 condition 用于唤醒线程 condition.signal()
redissonUtil.setObject("prefix:A1的ip:A1的port:请求的唯一约束",condition);
condition.await(20, TimeUnit.SECONDS);
//查询redis中是否存在返回结果
Object object = redissonUtil.getObject("唯一约束");
if(null != object){
//获取到值,进行业务逻辑处理
//业务逻辑处理完后,删除不再使用的缓存数据
redissonUtil.delObject("唯一约束");
}else{
//没获取到值,就是超时了,自定义超时处理
}
)
2.分析响应
a:B1 处理完业务逻辑后,将结果放入topic 2中,结果中需要包含:prefix:A1的ip:A1的port:请求的唯一约束 中的 唯一约束值
b: 因为A1、A2、A3是三个不同组的消费者,所以A1、A2、A3都能收到 topic 2中的消息。
c: A1、A2、A3解析 B1 返回的结果,去 redis 查询该消息 是否应该由自己处理,如 应由A1处理。
----------在redis中根据 prefix:自己的ip:自己的port:请求的唯一约束 查询是否存在
d: 存在 则将B1返回的结果放入 redis 中,然后取出 prefix:自己的ip:自己的port:请求的唯一约束的value,唤醒之前的await的线程。
---------结果放入 redis 中:key 为 result:请求的唯一约束,value为B1返回的结果
e:被唤醒的线程,取出redis中 key为 result:请求的唯一约束 的value,进行后面的逻辑处理。
// 解析B1返回结果,获取唯一标识 判断是否应该当前服务实例处理
Object conditionValue = redissonUtil.getObject("prefix:自己的ip:自己的port:请求的唯一约束");
if(null != conditionValue){
//把B1的返回结果放入redis中
redissonUtil.setObject("唯一标识",B1_ResultData);
Condition condition = (Condition)conditionValue;
condition.signal();
//唤醒线程后,删除不再使用的缓存数据
redissonUtil.delObject("prefix:自己的ip:自己的port:请求的唯一约束");
}
总结
以上就是我的设想,另外总结下:
1:基于kafka 消费者分组的概念,可以实现 精准唤醒等待线程
2:使用redis代替了ConcurrentHashMap ,因为ConcurrentHashMap 是存在内存中的,防止宕机后数据无法找回。
3:redis 存了两种数据,
第一种是:prefix:自己的ip:自己的port:请求的唯一约束 :Condition
------------ key 用来判断是哪个实例A处理B1的响应
------------ value 用来精准唤醒线程
第二种是: 唯一标识 :B1_ResultData
----------- 用来获取B1返回的结果
4:不止kafka有消费者分组的概念,mqtt 和rocket都有分组的概念。
有不足的地方,请大佬们指正。
更多推荐
所有评论(0)