配置

对kafka做相应的配置application.properties

# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000

配置server、消费者的组id(注意:修改之后需要重启服务)、配置是否自动提交(偏移量)、配置自动提交的频率

演示代码

@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {
    @Autowired
    private KafkaProducer kafkaProducer;
    @Test
    public void testKafka() throws InterruptedException {
        kafkaProducer.sendMessage("test","你好");
        kafkaProducer.sendMessage("test","在吗");
        Thread.sleep(1000*10);
    }
}
@Component
class KafkaProducer{
    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topics, String content){
        kafkaTemplate.send(topics, content);
    }
}
@Component
class KafkaConsumer{
    @KafkaListener(topics={"test"}) //关注需要监听的主题,处理监听到消息的方法
    public void handleMessage(ConsumerRecord record){
        System.out.println(record.value());
    }
}

生产者发送消息是主动的,但消费者消费消息是被动的。(所以可能会有一点点的延迟)

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐