Redis 消息队列和发布/订阅
上篇文章介绍了Springboot集成redis的用法,这篇文章简单介绍下,Redis作为消息队列和发布订阅的简单的应用;如果系统中需要简单的订阅发布功能而系统中没有mq的话,可以考虑使用Redis;1.订阅/发布在redis-cli中可以使用publish来发布消息,使用subscribe来订阅消息;我们可以进行试验一下,在上篇文章中是使用docker启动了redis服务器的容器,可以启...
上篇文章介绍了Springboot集成redis的用法,这篇文章简单介绍下,Redis作为消息队列和发布订阅的简单的应用;如果系统中需要简单的订阅发布功能而系统中没有mq的话,可以考虑使用Redis;
1.订阅/发布
在redis-cli中可以使用publish来发布消息,使用subscribe来订阅消息;我们可以进行试验一下,在上篇文章中是使用docker启动了redis服务器的容器,可以启动两个客户端,一个用来发布消息,一个用来订阅消息,观察下;
docker exec -it redis bash //进入redis容器中
redis-cli //启动一个redis客户端
publish chat aaa //发布一个chat主题的消息,内容为aaa
subscribe chat //订阅一个chat主题的消息
监听的客户端可以收到下面的消息
1) "message"
2) "chat"
3) "aaaaaaa"
下面说说怎么在Springboot中集成;
1>定义一个RedisMessageListener类实现MessageListener接口,做消息订阅的处理
@Component
public class RedisMessageListener implements MessageListener {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] body = message.getBody();// 请使用valueSerializer
byte[] channel = message.getChannel();
// 请参考配置文件,本例中key,value的序列化方式均为string。
// 其中key必须为stringSerializer。和redisTemplate.convertAndSend对应
String msgContent = (String) redisTemplate.getValueSerializer().deserialize(body);
String topic = (String) redisTemplate.getStringSerializer().deserialize(channel);
System.out.println("topic:" + topic + "msgContent:" + msgContent);
}
}
2>注册消息监听
在RedisConfig中添加如下代码,这里要设定监听的主题
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(RedisMessageListener receiver) {
return new MessageListenerAdapter(receiver);
}
这是客户端订阅消息之后的处理;
2>发布消息
通过redisTemplate.convertAndSend(channel, message)方法来发布消息;定义一个RedisService,代码如下
@Service
public class RedisService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* redis发布消息
*
* @param channel
* @param message
*/
public void sendMessage(String channel, String message) {
redisTemplate.convertAndSend(channel, message);
}
}
之后可以通过简单的方法调用测试下
@RequestMapping("/public-message")
public String publicMessage() {
redisService.sendMessage("chat","233444");
return "success";
}
调用下就可以看到消息订阅方法打印出来的信息;具体的实现也可以参考https://spring.io/guides/gs/messaging-redis/
2.消息队列
消息队列主要使用List的数据结构;思路是往队列里面发送消息,然后启动另外一个线程不停的去处理队列里面的消息;做个简单的演示;首先在controller中代码,每调用一次往队列里面插入一条消息,代码如下
@RequestMapping("/pushQueue")
public String pushToQueue() {
listOperator.leftPush("KEY", System.currentTimeMillis());
return "finish";
}
然后定义一个service里面进行消息的处理,代码如下所示
@Service
public class RedisQueueConsumer {
@Autowired
private ListOperations listOperator;
@PostConstruct
public void init() {
Thread thread = new Thread(() -> {
while (true) {
Object key = listOperator.leftPop("KEY");
if (key != null) {
System.out.println(key.toString());
}
}
});
thread.start();
}
}
之后访问http://localhost:8888/pushQueue,模拟外部的请求,之后消息队列打印出来请求的时间戳;具体的应用可能会比较复杂一些;
参考资料:
更多推荐
所有评论(0)