I'm subscribing to Kafka topics using a pattern with a wildcard, as shown below. The wildcard represents a dynamic customer ID.
consumer.subscribe(pattern='customer.*.validations')
This works well, because I can pluck the customer ID from the topic string. But now I need to extend the functionality to listen to a similar topic for a slightly different purpose. Let's call it customer.*.additional-validations. The code needs to live in the same project because so much functionality is shared, but I need to be able to take a different path based on the type of topic.
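For context, here is a minimal sketch of the routing I have in mind once a message arrives. The handler names and the `route` function are hypothetical, made up for illustration; only the topic layout comes from my setup. It also assumes the customer ID never contains a dot.

```python
import re

# Topic layout: customer.<customer_id>.<kind>
# Assumption: the customer ID contains no dots.
TOPIC_RE = re.compile(
    r"^customer\.(?P<customer_id>[^.]+)\."
    r"(?P<kind>validations|additional-validations)$"
)


def handle_validation(customer_id):
    # Hypothetical placeholder for the existing code path.
    return f"validation for {customer_id}"


def handle_additional_validation(customer_id):
    # Hypothetical placeholder for the new code path.
    return f"additional validation for {customer_id}"


HANDLERS = {
    "validations": handle_validation,
    "additional-validations": handle_additional_validation,
}


def route(topic):
    """Extract the customer ID and dispatch on the topic's last segment."""
    match = TOPIC_RE.match(topic)
    if match is None:
        raise ValueError(f"unexpected topic: {topic}")
    return HANDLERS[match["kind"]](match["customer_id"])
```

In the consumer loop this would be called with `msg.topic`, so the shared code stays in one project and only the last step differs per topic type.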
In the Kafka documentation I can see that it is possible to subscribe to a list of topics. However, these are hard-coded strings, not patterns that allow for flexibility.
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)
So I'm wondering if it is possible to somehow do a combination of the two? Kind of like this (non-working):
consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])
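One idea I have not verified against a live broker: if the `pattern` argument is treated as a regular expression string (which is my understanding of kafka-python, but I'm treating it as an assumption), the two patterns could be joined into a single one with alternation. The sketch below only checks the regex itself with Python's `re` module; the `subscribe` call is left as a comment.

```python
import re

# The two topic families, written as explicit regexes with escaped dots.
PATTERNS = [
    r"^customer\..*\.validations$",
    r"^customer\..*\.additional-validations$",
]

# Join into one regex string: (?:first)|(?:second)
COMBINED = "|".join(f"(?:{p})" for p in PATTERNS)

# Assumption, untested against a broker:
# consumer.subscribe(pattern=COMBINED)


def matches(topic):
    """Return True if the topic belongs to either family."""
    return re.match(COMBINED, topic) is not None
```

As a side note, under plain `re.match` semantics my original unescaped pattern 'customer.*.validations' also matches customer.42.additional-validations, because each unescaped dot matches any character. So if the library really uses regex matching, I may already be receiving both topic types and would only need to branch on the topic name.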