Answer a question

I'm trying to set an Spring Boot Application listening to Kafka.

I'm using Kafka Streams Binder.

With one simple @EnableBinding

@EnableBinding(StreamExample.StreamProcessor.class)
public class StreamExample {

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {

        logger.info("Stream listening");

        return input
                .peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
    }

    interface StreamProcessor {

        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}

and in application.yml

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: localhost:29092
      bindings:
         input_1:
           destination: mytopic1
           group: readgroup
         output_1:
           destination: mytopic2
         input_2:
           destination: mytopic3
           group: readgroup
         output_2:
           destination: mytopic4
  application:
    name: stream_s1000_app

Everything works fine.

But if I try to add a second class with other binding, the following error occurs:

The following subscribed topics are not assigned to any members: [mytopic1]

Example of the second binding:

@EnableBinding(StreamExampleBindingTwo.StreamProcessor.class)
public class StreamExampleBindingTwo {

    @StreamListener(StreamProcessor.INPUT)
    @SendTo(StreamProcessor.OUTPUT)
    public KStream<String, String> process(KStream<String, String> input) {

        logger.info("Stream listening binding two");

        return input
                .peek(((key, value) -> logger.info("key = {} value = {}", key, value)));
    }

    interface StreamProcessor {

        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }
}

What I'm missing? Can't I use multiple input topics and multiple output in the same application? There is something related to application.name?

Answers

I just tried an app and that worked. When you have multiple processors in the same application, you need to make sure that each processor gets its own application id. See below how I have 2 distinct application id's for both inputs in the application.yml.

I saw both processors are getting logged on the console. Also, saw the messages on the output topics.

@SpringBootApplication
@EnableBinding({So54522918Application.StreamProcessor1.class, So54522918Application.StreamProcessor2.class})
public class So54522918Application {

    public static void main(String[] args) {
        SpringApplication.run(So54522918Application.class, args);
    }

    @StreamListener(StreamProcessor1.INPUT)
    @SendTo(StreamProcessor1.OUTPUT)
    public KStream<String, String> process1(KStream<String, String> input) {

        System.out.println("Stream listening");

        return input
                .peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
    }

    @StreamListener(StreamProcessor2.INPUT)
    @SendTo(StreamProcessor2.OUTPUT)
    public KStream<String, String> process2(KStream<String, String> input) {

        System.out.println("Stream listening binding two");

        return input
                .peek(((key, value) -> System.out.println("key = " + key +", value = " + value)));
    }

    interface StreamProcessor1 {

        String INPUT = "input_1";
        String OUTPUT = "output_1";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }

    interface StreamProcessor2 {

        String INPUT = "input_2";
        String OUTPUT = "output_2";

        @Input(INPUT)
        KStream<String, String> input();

        @Output(OUTPUT)
        KStream<String, String> outputProcessed();
    }

}

Relevant part of application.yml

spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams:
  binder.configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  bindings.input_1.consumer.application-id: process-1
  bindings.input_2.consumer.application-id: process-2
spring.cloud.stream.bindings.input_1:
  destination: mytopic1
spring.cloud.stream.bindings.output_1:
  destination: mytopic2
spring.cloud.stream.bindings.input_2:
  destination: mytopic3
spring.cloud.stream.bindings.output_2:
  destination: mytopic4
Logo

华为、百度、京东云现已入驻,来创建你的专属开发者社区吧!

更多推荐