实现双写一致性,通常会选择先更新数据库,然后再删除缓存的策略,并结合 RabbitMQ 的消息队列机制,主要有以下几个原因:

  1. 保证数据一致性。在应用程序中,同一个数据可能存在于多个缓存服务节点中,这样会对数据的一致性造成很大影响。为了避免出现数据不一致的情况,在进行修改和删除操作时,先进行数据库操作,确保数据正确地被持久化到了数据库中,然后再去异步地更新相应的缓存。这可以保证无论是什么故障发生,最终数据都会以正确的方式被持久化。

  2. 提高性能和可扩展性。在并发和大规模的场景下,缓存被频繁地读写访问,而数据库则更适合做稳定、高效的读访问。另外,在大型的分布式系统中,因为涉及到多个数据节点,直接重建缓存是很难执行的,因此需要使用异步机制,将某个节点上的更新事件通知给其他节点去更新缓存,从而提高了整体性能和可伸缩性。

  3. 实现高可靠性系统。在高可靠性应用场景中(例如电商网站等),通过双写一致性策略,即使在缓存更新出现异常的情况下,通过数据库数据仍然可以保证可靠性,而采用消息队列可让系统抗事件(比如高并发等)能力提高得更好,因此消息队列非常适合这类场景。

  4. RabbitMQ 是流行的消息队列中间件。异步通信需要队列和消息传递,RabbitMQ 是一款功能强大、稳定可靠,并且已经被广泛使用的开源消息队列中间件,它支持多种协议(例如,AMQP、STOMP 等),以及各种基于不同语言编写的客户端,满足了多样化的开发需求。

导入依赖

<!-- MyBatis-Spring 整合包 -->
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

<!-- Redis 相关依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.5.5</version>
</dependency>

<!-- RabbitMQ 相关依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.5.5</version>
</dependency>

配置yml

spring:
  rabbitmq:
    host: localhost
    port: 5123
    username: qql
    password: qql

配置交换机与消息队列及绑定关系

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitMQConfig {
 
    @Bean
    public Queue myQueue() {
        return new Queue("qqlQueue");
    }
 
    @Bean
    public DirectExchange myExchange() {
        return new DirectExchange("qqlExchange");
    }
 
    @Bean
    public Binding binding(Queue myQueue, DirectExchange myExchange) {
        return BindingBuilder.bind(myQueue).to(myExchange).with("qqlRoutingKey");
    }
}

创建消息生产者

@Component
public class MQSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;
    
    public void send(String message) {
        rabbitTemplate.convertAndSend("qqlExchange", "qqlRoutingKey", message);
    }

}

创建消息消费者

@Component
@RabbitListener(queues = "qqlQueue")
public class MQReceiver {

    @Autowired
    private UserService userService;

    @RabbitHandler
    public void onMessage(String message) {
        Long id = Long.parseLong(message);
        userService.deleteUserCache(id);
    }
    
}

Mapper类

@Mapper
public interface UserMapper {

    User selectById(Long id);

    User selectByUsername(String username);

    int insert(User user);

    int update(User user);

    int delete(Long id);

}

修改和删除时实现双写一致性

@Service
public class UserService {

    private final UserMapper userMapper;
    private final RedisTemplate<String, Object> redisTemplate;
    private final MQSender mqSender;

    @Autowired
    public UserService(UserMapper userMapper,
                       RedisTemplate<String, Object> redisTemplate,
                       MQSender mqSender) {
        this.userMapper = userMapper;
        this.redisTemplate = redisTemplate;
        this.mqSender = mqSender;
    }

    // 更新用户,并删除缓存和通知其他业务系统更新对应的缓存
    public void updateUser(User user) {
        // 先更新到数据库
        userMapper.update(user);
        // 删除用户已有的缓存
        String key = "user:" + user.getId();
        if (redisTemplate.hasKey(key)) {
            redisTemplate.delete(key);
        }
        // 发送消息到 MQ,通知其他业务系统更新缓存
        sendMessage(user.getId());
       redisTemplate.opsForValue().set(key, user);
    }

    // 删除用户,并删除缓存和通知其他业务系统删除相应的缓存
    public void deleteUser(Long id) {
        userMapper.delete(id);
        String key = "user:" + id;
        if (redisTemplate.hasKey(key)) {
            redisTemplate.delete(key);
        }
        sendMessage(id);
    }

    // 通知其他业务系统更新缓存
    private void sendMessage(Long id) {
        mqSender.send(id.toString());
    }

    // 处理缓存删除操作的方法
    public void deleteUserCache(Long id) {
        String key = "user:" + id;
        if (redisTemplate.hasKey(key)) {
            redisTemplate.delete(key);
        }
    }

}

redisTemplate.hasKey(key)是RedisTemplate中的一种方法,作用就是判断在缓存中是否存在key这个主键。

mqSender.send(id.toString()) 方法的作用是在缓存被更新之后向消息队列中发送一条消息,以通知其他业务系统清除其本地缓存。该方法接收一个字符串类型的参数,该参数表示用户 ID。

mqSender.send(id.toString());也是把消息注入到对应的消息队列中去。

更多推荐