Answer a question

I'm trying to configure a Debezium connector for multiple tables in a MySQL database (I'm using Debezium 1.4 with MySQL 8.0). My company has a naming convention for Kafka topics that does not allow underscores (_), so I had to replace them with hyphens (-).

So, my topic names are:

Topic 1

fjf.db.top-domain.domain.sub-domain.transaction-search.order-status
WHERE
- transaction-search = schema "transaction_search"
- order-status = table "order_status"
- All changes in that table must go to that topic.

Topic 2

fjf.db.top-domain.domain.sub-domain.transaction-search.shipping-tracking
WHERE
- transaction-search = schema "transaction_search"
- shipping-tracking = table "shipping_tracking"
- All changes in that table must go to that topic.

Topic 3

fjf.db.top-domain.domain.sub-domain.transaction-search.proposal
WHERE
- transaction-search = schema "transaction_search"
- proposal = table "proposal"
- All changes in that table must go to that topic.

I'm trying to use the "ByLogicalTableRouter" transform, but I can't find a regex that solves my case.

{ "name": "debezium.connector",
 "config":
    { 
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "myhostname",
"database.port": "3306",
"database.user": "debezium", 
"database.password": "password", 
"database.server.id": "1000", 
"database.server.name": "fjf.db.top-domain.domain.sub-domain.transaction-search",
"schema.include.list": "transaction_search",
"table.include.list": "transaction_search.order_status,transaction_search.shipping_tracking,transaction_search.proposal",
"database.history.kafka.bootstrap.servers": "kafka.intranet:9097",
"database.history.kafka.topic": "fjf.db.top-domain.domain.sub-domain.transaction-search.schema-history",
"snapshot.mode": "schema_only",
"transforms":"RerouteName,RerouteUnderscore",
"transforms.RerouteName.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.RerouteName.topic.regex":"(.*)transaction_search(.*)",
"transforms.RerouteName.topic.replacement": "$1$2",
"transforms.RerouteUnderscore.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.RerouteUnderscore.topic.regex":"(.*)_(.*)",
"transforms.RerouteUnderscore.topic.replacement": "$1-$2" 
    }
}
  • In the first transform, I'm trying to remove the duplicated schema name from the routed topic name.
  • In the second transform, I'm trying to replace all remaining underscores (_) with hyphens (-).

But with that, I'm getting the error below, which seems to indicate that it is trying to send everything to the same topic:

Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Cannot create field because of field name duplication __dbz__physicalTableIdentifier

How can I make a transform that forwards the events of each table to its respective topic?

Answers

  1. Removing the schema name

In the first transform, I'm trying to remove the duplicated schema name from the routed topic name.

With your regex, the transformed topic name ends up with two consecutive dots, so you need to match the dots around the schema name and put a single one back:

"transforms.RerouteName.topic.regex":"([^.]+)\\.transaction_search\\.([^.]+)",
"transforms.RerouteName.topic.replacement": "$1.$2" 
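
To see what each pattern does to a topic name, here is a minimal Java sketch using plain java.util.regex. The `reroute` helper is hypothetical and only mimics a router that applies the replacement when the regex matches the whole topic name; whether your Debezium version behaves that way is an assumption worth checking. Under that assumption `[^.]+` cannot span the dotted prefix of this topic name, so the sketch also tries a `(.*)` variant that keeps the same idea of consuming both dots.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SchemaNameRouting {

    // Hypothetical helper: rewrite the topic only if the regex matches the
    // whole topic name (an assumption about how the router applies it).
    public static String reroute(String topic, String regex, String replacement) {
        Matcher m = Pattern.compile(regex).matcher(topic);
        return m.matches() ? m.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        // Default topic name: <database.server.name>.<schema>.<table>
        String topic =
            "fjf.db.top-domain.domain.sub-domain.transaction-search.transaction_search.order_status";

        // Pattern from the question: the dots on both sides of the schema name survive.
        System.out.println(reroute(topic, "(.*)transaction_search(.*)", "$1$2"));
        // fjf.db.top-domain.domain.sub-domain.transaction-search..order_status

        // Dots consumed by the pattern, a single dot put back by the replacement.
        System.out.println(reroute(topic, "(.*)\\.transaction_search\\.(.*)", "$1.$2"));
        // fjf.db.top-domain.domain.sub-domain.transaction-search.order_status

        // The [^.]+ form does not cover the whole name, so this helper leaves it unchanged.
        System.out.println(reroute(topic, "([^.]+)\\.transaction_search\\.([^.]+)", "$1.$2"));
    }
}
```

If the `[^.]+` pattern leaves your topic names untouched, the `(.*)` variant is the first thing I would try.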
  2. Replacing underscores with hyphens

You can try the ChangeCase SMT from Kafka Connect Common Transformations.
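
Whichever SMT ends up doing the renaming, it helps to know why the `(.*)_(.*)` / `$1-$2` pair from the question is fragile. As a rough Java sketch: one pass of that pattern rewrites only a single underscore (the last one, because the first group is greedy). That is enough for order_status and shipping_tracking, but not for a name with two or more underscores. The table name `order_status_history` and both helper methods are hypothetical, used only for illustration.

```java
public class UnderscoreToHyphen {

    // One pass of the question's regex: only the last underscore is rewritten.
    public static String singlePass(String topic) {
        return topic.replaceFirst("^(.*)_(.*)$", "$1-$2");
    }

    // Target behaviour: every underscore becomes a hyphen.
    public static String allUnderscores(String topic) {
        return topic.replace('_', '-');
    }

    public static void main(String[] args) {
        String one = "transaction-search.order_status";
        String two = "transaction-search.order_status_history"; // hypothetical table

        System.out.println(singlePass(one));     // transaction-search.order-status
        System.out.println(singlePass(two));     // transaction-search.order_status-history
        System.out.println(allUnderscores(two)); // transaction-search.order-status-history
    }
}
```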
