5.3 Spring整合Kafka
配置对kafka做相应的配置application.properties# KafkaPropertiesspring.kafka.bootstrap-servers=localhost:9092spring.kafka.consumer.group-id=community-consumer-groupspring.kafka.consumer.enable-auto-commit=truesp
·
配置
对kafka做相应的配置application.properties
# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000
配置server、消费者的组id(注意:修改之后需要重启服务)、配置是否自动提交(偏移量)、配置自动提交的频率
演示代码
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka() throws InterruptedException {
kafkaProducer.sendMessage("test","你好");
kafkaProducer.sendMessage("test","在吗");
Thread.sleep(1000*10);
}
}
@Component
class KafkaProducer{
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topics, String content){
kafkaTemplate.send(topics, content);
}
}
@Component
class KafkaConsumer{
@KafkaListener(topics={"test"}) //关注需要监听的主题,处理监听到消息的方法
public void handleMessage(ConsumerRecord record){
System.out.println(record.value());
}
}
生产者发送消息是主动的,但消费者消费消息是被动的。(所以可能会有一点点的延迟)
更多推荐
已为社区贡献2条内容
所有评论(0)