前言

之前接到一个异步转同步的需求:(下一行先从左到右看,再下一行从右到左看)
上游系统 --> A系统 --> mqtt(请求由B系消费) ;
上游系统 <-- A系统 <-- mqtt(B系消费后,把结果放入mqtt);

但是我负责的A系统,且不是集群。所以直接使用的ConcurrentHashMap + synchronized / ReentrantLock 来实现的。但如果A系统是集群又应该怎样去实现?


一、集群与集群实现异步转同步的难点在哪儿?

如:A集群又三个实例 ,B集群又三个实例,AB之间要实现异步转同步(AB通过消息中间件通信)
难点在于:如何避免重复处理?
即:
1:A集群 A1 发了一个请求到B集群,如何保证B集群只有一个实例 B1 处理该请求。
2:B集群 B1 返回的结果,如何保证是原来的 A1 被唤醒并进行处理。

二、看图说实现方法

在这里插入图片描述

1.分析请求

a:上游请求经过网关分发,只会有一个A系统实例收到请求,如 A1

b:A1处理完业务逻辑,向kafka 的 topic 1 中放入消息
c:然后 A1开启分布式锁,以唯一约束作为分布式锁的value,同时往redis中写一条数,并让线程await
----------redis中写一条数,用于后面判断该由哪个的实例A处理响应结果:key为 prefix:A1的ip:A1的port:请求的唯一约束,value为 condition
----------让线程await:如果20秒内收到 期望的响应(该自己处理的响应) 则唤醒线程;没收到,则自定义处理逻辑(可以返回上游失败\处理中,也可以继续等待10s)。

d:B集群的三个服务实例属于同一个消费者组,所以 topic 1 内的消息只会有一个服务实例收到请求,如 B1

RLock lock = redissonClient.getLock("唯一约束");
Boolean isLock = lock.tryLock();
if (isLock) {
            //获取到锁之后 创建Condition 用于指定线程 wait 和 signal
            Condition condition = lock.newCondition();  
            //redis 里存上 condition 用于唤醒线程 condition.signal()
			redissonUtil.setObject("prefix:A1的ip:A1的port:请求的唯一约束",condition);
            condition.await(20, TimeUnit.SECONDS);
            
            //查询redis中是否存在返回结果
            Object object = redissonUtil.getObject("唯一约束");
            if(null != object){
                //获取到值,进行业务逻辑处理
                //业务逻辑处理完后,删除不再使用的缓存数据
				redissonUtil.delObject("唯一约束");
            }else{
            	//没获取到值,就是超时了,自定义超时处理
			}

2.分析响应

a:B1 处理完业务逻辑后,将结果放入topic 2中,结果中需要包含:prefix:A1的ip:A1的port:请求的唯一约束 中的 唯一约束值

b: 因为A1、A2、A3是三个不同组的消费者,所以A1、A2、A3都能收到 topic 2中的消息。

c: A1、A2、A3解析 B1 返回的结果,去 redis 查询该消息 是否应该由自己处理,如 应由A1处理。
----------在redis中根据 prefix:自己的ip:自己的port:请求的唯一约束 查询是否存在

d: 存在 则将B1返回的结果放入 redis 中,然后取出 prefix:自己的ip:自己的port:请求的唯一约束的value,唤醒之前的await的线程。
---------结果放入 redis 中:key 为 result:请求的唯一约束,value为B1返回的结果

e:被唤醒的线程,取出redis中 key为 result:请求的唯一约束 的value,进行后面的逻辑处理。

 // 解析B1返回结果,获取唯一标识 判断是否应该当前服务实例处理
Object conditionValue = redissonUtil.getObject("prefix:自己的ip:自己的port:请求的唯一约束");
if(null != conditionValue){
	//把B1的返回结果放入redis中
	redissonUtil.setObject("唯一标识",B1_ResultData);
	Condition condition = (Condition)conditionValue;
	condition.signal();
	//唤醒线程后,删除不再使用的缓存数据
	redissonUtil.delObject("prefix:自己的ip:自己的port:请求的唯一约束");
}

总结

以上就是我的设想,另外总结下:

1:基于kafka 消费者分组的概念,可以实现 精准唤醒等待线程

2:使用redis代替了ConcurrentHashMap ,因为ConcurrentHashMap 是存在内存中的,防止宕机后数据无法找回。

3:redis 存了两种数据,
第一种是:prefix:自己的ip:自己的port:请求的唯一约束 :Condition
------------ key 用来判断是哪个实例A处理B1的响应
------------ value 用来精准唤醒线程
第二种是: 唯一标识 :B1_ResultData
----------- 用来获取B1返回的结果

4:不止kafka有消费者分组的概念,mqtt 和rocket都有分组的概念。

有不足的地方,请大佬们指正。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐