|
package cn.hcr.config;
import cn.hcr.listener.RedisConsumersListener;
import cn.hcr.utils.RedisStreamUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Configuration
@Slf4j
public class RedisConfig {
@Resource
private RedisStreamUtil redisStreamUtil;
/**
* redis序列化
*
* @param redisConnectionFactory
* @return {@code RedisTemplate<String, Object>}
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
@Bean
public Subscription subscription(RedisConnectionFactory factory) {
AtomicInteger index = new AtomicInteger(1);
int processors = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), r -> {
Thread thread = new Thread(r);
thread.setName("async-stream-consumer-" + index.getAndIncrement());
thread.setDaemon(true);
return thread;
});
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
// 一次最多获取多少条消息
.batchSize(5)
.executor(executor)
.pollTimeout(Duration.ofSeconds(1))
.errorHandler(throwable -> {
log.error("[MQ handler exception]", throwable);
throwable.printStackTrace();
})
.build();
//该key和group可根据需求自定义配置
String streamName = "mystream";
String groupname = "mygroup";
initStream(streamName, groupname);
var listenerContainer = StreamMessageListenerContainer.create(factory, options);
// 手动ask消息
Subscription subscription = listenerContainer.receive(Consumer.from(groupname, "zhuyazhou"),
StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumersListener(redisStreamUtil));
// 自动ask消息
/* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),
StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/
listenerContainer.start();
return subscription;
}
private void initStream(String key, String group) {
boolean hasKey = redisStreamUtil.hasKey(key);
if (!hasKey) {
Map<String, Object> map = new HashMap<>(1);
map.put("field", "value");
//创建主题
String result = redisStreamUtil.addMap(key, map);
//创建消费组
redisStreamUtil.oup(key, group);
//将初始化的值删除掉
redisStreamUtil.del(key, result);
log.info("stream:{}-group:{} initialize success", key, group);
}
}
}
|
所有评论(0)