Answer a question

I have a legacy Kafka topic that receives different types of messages. Each message carries a custom header, under a specific key, that discriminates the record type. In a given application I have multiple methods that I would like to annotate with a custom annotation such as @CustomKafkaListener(discriminator="xxx"), which would itself be annotated with @KafkaListener.

How can I filter the messages so that, if two messages are sent to the central topic, the method annotated with discriminator "xxx" reads only the "xxx" messages and the method annotated with discriminator "yyy" reads only the "yyy" ones?

For example

    @CustomKafkaListener(discriminator="com.mypackage.subpackage", topic="central-topic")
    public void consumerMessagesXXX(ConsumerRecord r){
        // reads only XXX messages, skips all others
    }

    @CustomKafkaListener(discriminator="com.mypackage", topic="central-topic")
    public void consumerMessagesYYY(ConsumerRecord r){
        // reads only YYY messages, skips all others
    }

I would like the filter to be able to read the discriminator property of the target listener and decide dynamically whether a message should be processed by that listener, either by reflection or through some metadata provided to the filter. For example:

    public boolean filter(ConsumerRecord<Long, Event> consumerRecord, Consumer<Long, Event> consumer) {
        // getMetadataXXX() is a placeholder: retrieve the discriminator pattern by reflection or from metadata
        var discriminatorPattern = consumer.getMetadataXXX();
        var header = consumerRecord.headers().lastHeader("discriminator");
        return header != null
                && discriminatorPattern.matches(new String(header.value(), StandardCharsets.UTF_8));
    }

Answers

Creating custom annotations is a pretty advanced topic; you would need to subclass the annotation bean post processor and come up with some mechanism to customize the endpoint by adding the filter strategy bean.
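To illustrate only the reflection part of that idea, here is a minimal, library-free sketch. It is not Spring Kafka code: the `CustomKafkaListener` annotation, the `Handlers` class and the `scan` helper are all hypothetical names, and a real solution would still need the bean post processor work described above to attach a filter to each endpoint.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

public class DiscriminatorScanSketch {

    // Hypothetical annotation; in a real application it would also be
    // meta-annotated with @KafkaListener.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface CustomKafkaListener {
        String discriminator();
    }

    // Hypothetical listener class used only for this demonstration.
    static class Handlers {
        @CustomKafkaListener(discriminator = "xxx")
        public void consumeXxx(String in) { }

        @CustomKafkaListener(discriminator = "yyy")
        public void consumeYyy(String in) { }

        public void notAListener(String in) { }
    }

    // Maps each discriminator value to the name of the method that declares it.
    static Map<String, String> scan(Class<?> type) {
        Map<String, String> byDiscriminator = new TreeMap<>();
        for (Method method : type.getDeclaredMethods()) {
            CustomKafkaListener ann = method.getAnnotation(CustomKafkaListener.class);
            if (ann != null) {
                byDiscriminator.put(ann.discriminator(), method.getName());
            }
        }
        return byDiscriminator;
    }

    public static void main(String[] args) {
        System.out.println(scan(Handlers.class)); // {xxx=consumeXxx, yyy=consumeYyy}
    }
}
```

The annotation must have `RetentionPolicy.RUNTIME`, otherwise `getAnnotation` returns null and nothing is found.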

Feel free to open a new feature request on GitHub: https://github.com/spring-projects/spring-kafka/issues

We could add a new property to pass the bean name of a RecordFilterStrategy bean from the @KafkaListener.

EDIT

I see you opened an issue; thanks.

Here is a workaround that adds the filters after the listener containers are created but before they are started:

@SpringBootApplication
public class So71237300Application {

    public static void main(String[] args) {
        SpringApplication.run(So71237300Application.class, args);
    }

    @KafkaListener(id = "xxx", topics = "so71237300", autoStartup = "false")
    void listen1(String in) {
        System.out.println("1:" + in);
    }

    @KafkaListener(id = "yyy", topics = "so71237300", autoStartup = "false")
    void listen2(String in) {
        System.out.println("2:" + in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so71237300").partitions(1).replicas(1).build();
    }

    @Bean
    RecordFilterStrategy<String, String> xxx() {
        return rec -> {
            Header which = rec.headers().lastHeader("which");
            return which == null || !Arrays.equals(which.value(), "xxx".getBytes());
        };
    }

    @Bean
    RecordFilterStrategy<String, String> yyy() {
        return rec -> {
            Header which = rec.headers().lastHeader("which");
            return which == null || !Arrays.equals(which.value(), "yyy".getBytes());
        };
    }

    @Bean
    ApplicationRunner runner(RecordFilterStrategy<String, String> xxx, RecordFilterStrategy<String, String> yyy,
            KafkaListenerEndpointRegistry registry, KafkaTemplate<String, String> template) {

        return args -> {
            ProducerRecord<String, String> record = new ProducerRecord<>("so71237300", "test.to.xxx");
            record.headers().add("which", "xxx".getBytes());
            template.send(record);
            record = new ProducerRecord<>("so71237300", "test.to.yyy");
            record.headers().add("which", "yyy".getBytes());
            template.send(record);

            updateListener("xxx", xxx, registry);
            updateListener("yyy", yyy, registry);
            registry.start();
        };
    }

    private void updateListener(String id, RecordFilterStrategy<String, String> filter,
            KafkaListenerEndpointRegistry registry) {

        MessageListener listener = (MessageListener) registry.getListenerContainer(id).getContainerProperties()
                .getMessageListener();
        registry.getListenerContainer(id).getContainerProperties()
                .setMessageListener(new FilteringMessageListenerAdapter<>(listener, filter));
    }

}

Output:

1:test.to.xxx
2:test.to.yyy
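
The two filter beans differ only in the header value they expect, and a filter returns true when the record should be discarded. A library-free sketch of that predicate follows; the class and method names are hypothetical, a missing header is modelled as a null value, and the charset is passed explicitly (an assumption of this sketch, where the code above relies on the platform default).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DiscriminatorFilterSketch {

    // Mirrors the lambda bodies above: true means "discard this record".
    // A null headerValue stands in for a record without the "which" header.
    static boolean shouldDiscard(byte[] headerValue, String discriminator) {
        return headerValue == null
                || !Arrays.equals(headerValue, discriminator.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] xxx = "xxx".getBytes(StandardCharsets.UTF_8);
        System.out.println(shouldDiscard(xxx, "xxx"));  // false: delivered to the "xxx" listener
        System.out.println(shouldDiscard(xxx, "yyy"));  // true: skipped by the "yyy" listener
        System.out.println(shouldDiscard(null, "xxx")); // true: records without the header are skipped
    }
}
```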

EDIT2

This version uses a single filter and uses the consumer's group.id as the discriminator (by default, the @KafkaListener id is also used as the group.id):

@SpringBootApplication
public class So71237300Application {

    public static void main(String[] args) {
        SpringApplication.run(So71237300Application.class, args);
    }

    @KafkaListener(id = "xxx", topics = "so71237300")
    void listen1(String in) {
        System.out.println("1:" + in);
    }

    @KafkaListener(id = "yyy", topics = "so71237300")
    void listen2(String in) {
        System.out.println("2:" + in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so71237300").partitions(1).replicas(1).build();
    }

    @Bean
    RecordFilterStrategy<String, String> discriminator(
            ConcurrentKafkaListenerContainerFactory<String, String> factory) {

        RecordFilterStrategy<String, String> filter = rec -> {
            Header which = rec.headers().lastHeader("which");
            return which == null || !Arrays.equals(which.value(), KafkaUtils.getConsumerGroupId().getBytes());
        };
        factory.setRecordFilterStrategy(filter);
        return filter;
    }

    @Bean
    ApplicationRunner runner(RecordFilterStrategy<String, String> discriminator,
            KafkaListenerEndpointRegistry registry, KafkaTemplate<String, String> template) {

        return args -> {
            ProducerRecord<String, String> record = new ProducerRecord<>("so71237300", "test.to.xxx");
            record.headers().add("which", "xxx".getBytes());
            template.send(record);
            record = new ProducerRecord<>("so71237300", "test.to.yyy");
            record.headers().add("which", "yyy".getBytes());
            template.send(record);
        };
    }

}

Output:

1:test.to.xxx
2:test.to.yyy
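
A single shared filter can tell the listeners apart because KafkaUtils.getConsumerGroupId() returns the group id of the consumer bound to the calling thread. Here is a library-free sketch of that idea, using a plain ThreadLocal as a stand-in; the class, field and method names are hypothetical, and the simulated threads only approximate what the listener containers do.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class GroupIdFilterSketch {

    // Stand-in for the per-thread group id that KafkaUtils.getConsumerGroupId() exposes.
    static final ThreadLocal<String> GROUP_ID = new ThreadLocal<>();

    // One filter for every listener: true means "discard this record".
    static boolean shouldDiscard(byte[] headerValue) {
        String groupId = GROUP_ID.get();
        return headerValue == null || groupId == null
                || !Arrays.equals(headerValue, groupId.getBytes(StandardCharsets.UTF_8));
    }

    // Simulates one listener thread: binds its group id, runs the shared filter
    // over the given header values and returns the ones that were accepted.
    static List<String> consume(String groupId, List<String> headerValues) {
        List<String> accepted = new CopyOnWriteArrayList<>();
        Thread consumerThread = new Thread(() -> {
            GROUP_ID.set(groupId);
            try {
                for (String value : headerValues) {
                    if (!shouldDiscard(value.getBytes(StandardCharsets.UTF_8))) {
                        accepted.add(value);
                    }
                }
            } finally {
                GROUP_ID.remove();
            }
        });
        consumerThread.start();
        try {
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> headers = List.of("xxx", "yyy");
        System.out.println(consume("xxx", headers)); // [xxx]
        System.out.println(consume("yyy", headers)); // [yyy]
    }
}
```

Because the group id is looked up when the filter is called, the same filter instance gives a different answer on each consumer thread.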