• jdk:1.8
• springboot-version:2.6.3
• redis:5.0.1(Redis 5.0 及以上版本才支持 Stream 数据类型)

准备工作

1、pom

lombok 与 redis 依赖包(未显式声明版本,由 spring-boot 2.6.3 统一管理)

1

2

3

4

5

6

7

8

<!-- Lombok: supplies the @Slf4j / @RequiredArgsConstructor annotations used by the classes in this article. -->
<!-- No <version> is declared; presumably managed by the Spring Boot parent/BOM (confirm in your pom). -->
<dependency>

    <groupId>org.projectlombok</groupId>

    <artifactId>lombok</artifactId>

</dependency>

<!-- Spring Data Redis starter: brings in the org.springframework.data.redis classes (RedisTemplate, stream listener container) used below. -->
<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-data-redis</artifactId>

</dependency>

2、 yml

1

2

3

4

# Redis connection settings for the application.
spring:

  redis:

    # Index of the Redis logical database to use.
    database: 0

    # Address of the Redis server (local instance here); no port is configured in this snippet.
    host: 127.0.0.1

3、 RedisStreamUtil工具类

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.connection.stream.MapRecord;

import org.springframework.data.redis.connection.stream.StreamInfo;

import org.springframework.data.redis.connection.stream.StreamOffset;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.stereotype.Component;

import java.util.List;

import java.util.Map;

@Component

public class RedisStreamUtil {

    @Autowired

    private RedisTemplate<String, Object> redisTemplate;

    /**

     * 创建消费组

     *

     * @param key   键名称

     * @param group 组名称

     * @return {@link String}

     */

    public String oup(String key, String group) {

        return redisTemplate.opsForStream().createGroup(key, group);

    }

    /**

     * 获取消费者信息

     *

     * @param key   键名称

     * @param group 组名称

     * @return {@link StreamInfo.XInfoConsumers}

     */

    public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {

        return redisTemplate.opsForStream().consumers(key, group);

    }

    /**

     * 查询组信息

     *

     * @param key 键名称

     * @return

     */

    public StreamInfo.XInfoGroups queryGroups(String key) {

        return redisTemplate.opsForStream().groups(key);

    }

    // 添加Map消息

    public String addMap(String key, Map<String, Object> value) {

        return redisTemplate.opsForStream().add(key, value).getValue();

    }

    // 读取消息

    public List<MapRecord<String, Object, Object>> read(String key) {

        return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));

    }

    // 确认消费

    public Long ack(String key, String group, String... recordIds) {

        return redisTemplate.opsForStream().acknowledge(key, group, recordIds);

    }

    // 删除消息。当一个节点的所有消息都被删除,那么该节点会自动销毁

    public Long del(String key, String... recordIds) {

        return redisTemplate.opsForStream().delete(key, recordIds);

    }

    // 判断是否存在key

    public boolean hasKey(String key) {

        Boolean aBoolean = redisTemplate.hasKey(key);

        return aBoolean != null && aBoolean;

    }

}

代码实现

生产者发送消息

生产者发送消息,在Service层创建addMessage方法,往队列中发送消息。

代码中 addMap() 方法的第一个参数为 stream 的 key(本例为 mystream),第二个参数为消息内容 value。该 key 必须与后文 RedisConfig 配置类中监听的 streamName 保持一致,请先记住这个 key。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

@Service

@Slf4j

@RequiredArgsConstructor

public class RedisStreamMqServiceImpl implements RedisStreamMqService {

    private final RedisStreamUtil redisStreamUtil;

    /**

     * 发送一个消息

     *

     * @return {@code Object}

     */

    @Override

    public Object addMessage() {

        RedisUser redisUser = new RedisUser();

        redisUser.setAge(18);

        redisUser.setName("hcr");

        redisUser.setEmail("156ef561@gmail.com");

        Map<String, Object> message = new HashMap<>();

        message.put("user", redisUser);

        String recordId = redisStreamUtil.addMap("mystream", message);

        return recordId;

    }

}

controller接口方法

1

2

3

4

5

6

7

8

9

10

11

12

13

@RestController

@RequestMapping("/redis")

@Slf4j

@RequiredArgsConstructor

public class RedisController {

    private final RedisStreamMqService redisStreamMqService;

    @GetMapping("/addMessage")

    public Object addMessage() {

        return redisStreamMqService.addMessage();

    }

}

调用测试,查看redis中是否正常添加数据。

接口返回数据

1702622585248-0

查看redis中的数据

消费者监听消息进行消费

创建RedisConsumersListener监听器

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

import cn.hcr.utils.RedisStreamUtil;

import lombok.RequiredArgsConstructor;

import lombok.extern.slf4j.Slf4j;

import org.springframework.data.redis.connection.stream.MapRecord;

import org.springframework.data.redis.connection.stream.RecordId;

import org.springframework.data.redis.stream.StreamListener;

import org.springframework.stereotype.Component;

import java.util.Map;

@Component

@Slf4j

@RequiredArgsConstructor

public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> {

    public final RedisStreamUtil redisStreamUtil;

    /**

     * 监听器

     *

     * @param message

     */

    @Override

    public void onMessage(MapRecord<String, String, String> message) {

        // stream的key值

        String streamKey = message.getStream();

        //消息ID

        RecordId recordId = message.getId();

        //消息内容

        Map<String, String> msg = message.getValue();

        log.info("【streamKey】= " + streamKey + ",【recordId】= " + recordId + ",【msg】=" + msg);

        //处理逻辑

        //逻辑处理完成后,ack消息,删除消息,group为消费组名称

        StreamInfo.XInfoGroups xInfoGroups = redisStreamUtil.queryGroups(streamKey);

        xInfoGroups.forEach(xInfoGroup -> redisStreamUtil.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));

        redisStreamUtil.del(streamKey, recordId.getValue());

    }

}

创建RedisConfig配置类,配置监听

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

package cn.hcr.config;

import cn.hcr.listener.RedisConsumersListener;

import cn.hcr.utils.RedisStreamUtil;

import com.fasterxml.jackson.annotation.JsonAutoDetect;

import com.fasterxml.jackson.annotation.PropertyAccessor;

import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.extern.slf4j.Slf4j;

import lombok.var;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.data.redis.connection.RedisConnectionFactory;

import org.springframework.data.redis.connection.stream.Consumer;

import org.springframework.data.redis.connection.stream.MapRecord;

import org.springframework.data.redis.connection.stream.ReadOffset;

import org.springframework.data.redis.connection.stream.StreamOffset;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

import org.springframework.data.redis.serializer.StringRedisSerializer;

import org.springframework.data.redis.stream.StreamMessageListenerContainer;

import org.springframework.data.redis.stream.Subscription;

import javax.annotation.Resource;

import java.time.Duration;

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.LinkedBlockingDeque;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;

@Configuration

@Slf4j

public class RedisConfig {

    @Resource

    private RedisStreamUtil redisStreamUtil;

    /**

     * redis序列化

     *

     * @param redisConnectionFactory

     * @return {@code RedisTemplate<String, Object>}

     */

    @Bean

    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {

        RedisTemplate<String, Object> template = new RedisTemplate<>();

        template.setConnectionFactory(redisConnectionFactory);

        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper om = new ObjectMapper();

        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);

        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        jackson2JsonRedisSerializer.setObjectMapper(om);

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        template.setKeySerializer(stringRedisSerializer);

        template.setHashKeySerializer(stringRedisSerializer);

        template.setValueSerializer(jackson2JsonRedisSerializer);

        template.setHashValueSerializer(jackson2JsonRedisSerializer);

        template.afterPropertiesSet();

        return template;

    }

    @Bean

    public Subscription subscription(RedisConnectionFactory factory) {

        AtomicInteger index = new AtomicInteger(1);

        int processors = Runtime.getRuntime().availableProcessors();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,

                new LinkedBlockingDeque<>(), r -> {

            Thread thread = new Thread(r);

            thread.setName("async-stream-consumer-" + index.getAndIncrement());

            thread.setDaemon(true);

            return thread;

        });

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =

                StreamMessageListenerContainer

                        .StreamMessageListenerContainerOptions

                        .builder()

                        // 一次最多获取多少条消息

                        .batchSize(5)

                        .executor(executor)

                        .pollTimeout(Duration.ofSeconds(1))

                        .errorHandler(throwable -> {

                            log.error("[MQ handler exception]", throwable);

                            throwable.printStackTrace();

                        })

                        .build();

         

        //该key和group可根据需求自定义配置

        String streamName = "mystream";

        String groupname = "mygroup";

        initStream(streamName, groupname);

        var listenerContainer = StreamMessageListenerContainer.create(factory, options);

        // 手动ask消息

        Subscription subscription = listenerContainer.receive(Consumer.from(groupname, "zhuyazhou"),

                StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumersListener(redisStreamUtil));

        // 自动ask消息

           /* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),

                    StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/

        listenerContainer.start();

        return subscription;

    }

    private void initStream(String key, String group) {

        boolean hasKey = redisStreamUtil.hasKey(key);

        if (!hasKey) {

            Map<String, Object> map = new HashMap<>(1);

            map.put("field", "value");

            //创建主题

            String result = redisStreamUtil.addMap(key, map);

            //创建消费组

            redisStreamUtil.oup(key, group);

            //将初始化的值删除掉

            redisStreamUtil.del(key, result);

            log.info("stream:{}-group:{} initialize success", key, group);

        }

    }

}

redisTemplate:该bean用于配置redis序列化

subscription:配置监听

initStream:在 stream 不存在时创建 stream,并初始化消费组

监听测试

使用addMessage()方法投送一条消息后,查看控制台输出信息。

1

2

3

4

5

6

7

8

【streamKey】= mystream,

【recordId】= 1702623008044-0,

【msg】=

{user=[

    "cn.hcr.pojo.RedisUser",

    {"name":"hcr","age":18,"email":"156ef561@gmail.com"}

    ]

}

更多推荐