写数据库同时发mq消息事务一致性的一种解决方案
写数据库同时发mq消息事务一致性的一种解决方案事件驱动(event driven)的系统设计,服务之间的交互大多数都是通过消息队列中间件,那么我们都会面临一个微服务之间数据一致性的问题。假设如下场景:服务A在一个事务中包含数据库更新操作,然后发送消息给MQ通知服务B一般做法就是将数据库的操作以及发送消息放到一个事务中。如果数据库操作或者发送消息失败,则回滚事务即可。如果事务提交成功,消息发出去了以
写数据库同时发mq消息事务一致性的一种解决方案
事件驱动(event driven)的系统设计,服务之间的交互大多数都是通过消息队列中间件,那么我们都会面临一个微服务之间数据一致性的问题。
假设如下场景:服务A在一个事务中包含数据库更新操作,然后发送消息给MQ通知服务B
一般做法就是将数据库的操作以及发送消息放到一个事务中。如果数据库操作或者发送消息失败,则回滚事务即可。如果事务提交成功,消息发出去了以后,服务B处理消息出现异常,则我们只需要在fix调问题以后retry message就能够保证数据最终一致性。(当然在发送消息之前我们会将这个消息存储下来,redis或者数据库都可以,之后再做housekeep删除)。
但是我们会面临这样一个问题,假如服务B需要拿到服务A数据库更新操作以后的数据状态,但是我们的数据库操作跟发送消息是在一个事务里面的,那就可能存在服务B已经在处理消息了,但是服务A这边的事务还未提交,那么服务B从数据库中拿到的数据状态还是原来的数据状态。
所以对于这个问题,个人觉得比较好的做法就是,发送消息不要放在数据库事务中,等到数据库提交事务以后,会有一个回调事件(ATER_COMMIT),然后在这个回调事件中发送消息即可。
- begin tx 开启本地事务
- do work 执行业务操作
- insert message 向同实例消息库插入消息
- end tx 事务提交
- send message 网络向 server 发送消息
- reponse server 回应消息
- delete message 如果 server 回复成功则删除消息
- scan messages 补偿任务扫描未发送消息
- send message 补偿任务补偿消息
- delete messages 补偿任务删除补偿成功的消息
跟人觉得以上是一种比较好的解决方案,发送消息前回将消息存储下来,如果服务消费方出现异常,只需要retry即可。
该方案在这篇文章有详细描述:去哪儿QMQ
这是一种设计了,那如何落实到代码实现呢?在这里我记录的是使用spring的事件机制以及提供的注解来实现,在事务提交以后回调事件发送消息。
- 定义一个evnet (假设就是要发给MQ的event)
public class MyTransactionEvent {
private String name;
public MyTransactionEvent2(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
- 在数据库事务中发布定义的 MyTransactionEvent
@Service
@Slf4j
@RequiredArgsConstructor
public class FooService {
private final ApplicationEventPublisher publisher;
private final FooRepository fooRepository;
@Transactional
public boolean saveFoo(FooEntity fooEntity) throws InterruptedException {
log.error("start insert foo");
fooRepository.save(fooEntity);
publisher.publishEvent(new MyTransactionEvent(fooEntity.getFooName()));
log.error("end insert foo");
Thread.currentThread().sleep(2000);
log.error("to commit insert");
return true;
}
}
- 订阅 MyTransactionEvent
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void afterCommit(MyTransactionEvent event) {
log.error("after commit then send event {}", event);
log.error("after commit then send event {}", event.getName());
}
可以在同一个class中加入这个订阅方法,也可以将这个订阅方法放在另一个class,要交给spring去管理。
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) 告诉spring是在事务提交以后触发这个方法,并且事务中发布的事件是 MyTransactionEvent ,该方法才会触发。
完整代码查看:github
更多推荐
所有评论(0)