Configure a Debezium connector for multiple tables in a database
I'm trying to configure a Debezium connector for multiple tables in a MySQL database (I'm using Debezium 1.4 with MySQL 8.0). My company has a naming convention for Kafka topics, and this convention does not allow underscores (_), so I have to replace them with hyphens (-).
So my topic names are:
Topic 1
fjf.db.top-domain.domain.sub-domain.transaction-search.order-status
WHERE
- transaction-search = schema "transaction_search"
- order-status = table "order_status"
- All changes in that table must go to that topic.
Topic 2
fjf.db.top-domain.domain.sub-domain.transaction-search.shipping-tracking
WHERE
- transaction-search = schema "transaction_search"
- shipping-tracking = table "shipping_tracking"
- All changes in that table must go to that topic.
Topic 3
fjf.db.top-domain.domain.sub-domain.transaction-search.proposal
WHERE
- transaction-search = schema "transaction_search"
- proposal = table "proposal"
- All changes in that table must go to that topic.
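For reference, Debezium's MySQL connector names each table's topic <database.server.name>.<database>.<table> by default. With the server name used in this question, the schema therefore appears twice in the topic name: once as transaction-search and once as transaction_search. The Python sketch below only spells out the mapping that the three topics above require. The helpers default_topic and desired_topic are hypothetical illustrations, not connector code.

```python
# Hypothetical helpers illustrating the target naming; not part of Debezium.
SERVER_NAME = "fjf.db.top-domain.domain.sub-domain.transaction-search"


def default_topic(database: str, table: str) -> str:
    """Default Debezium MySQL topic: <database.server.name>.<database>.<table>."""
    return f"{SERVER_NAME}.{database}.{table}"


def desired_topic(default: str) -> str:
    """Drop the duplicated database segment and turn underscores into hyphens."""
    prefix = SERVER_NAME + "."
    if not default.startswith(prefix):
        raise ValueError(f"unexpected topic: {default}")
    _database, table = default[len(prefix):].split(".", 1)
    # The server name already ends in "transaction-search", so only the table is kept.
    return f"{prefix}{table.replace('_', '-')}"


for table in ("order_status", "shipping_tracking", "proposal"):
    src = default_topic("transaction_search", table)
    print(src, "->", desired_topic(src))
```

Two separate changes are needed: drop a whole dotted segment, and rewrite characters inside the table segment. This is why a single regex replacement struggles to express both.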
I'm trying to use the ByLogicalTableRouter transform, but I can't find a regex that solves my case.
{ "name": "debezium.connector",
"config":
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "myhostname",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "1000",
"database.server.name": "fjf.db.top-domain.domain.sub-domain.transaction-search",
"database.include.list": "transaction_search",
"table.include.list": "transaction_search.order_status,transaction_search.shipping_tracking,transaction_search.proposal",
"database.history.kafka.bootstrap.servers": "kafka.intranet:9097",
"database.history.kafka.topic": "fjf.db.top-domain.domain.sub-domain.transaction-search.schema-history",
"snapshot.mode": "schema_only",
"transforms":"RerouteName,RerouteUnderscore",
"transforms.RerouteName.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.RerouteName.topic.regex":"(.*)transaction_search(.*)",
"transforms.RerouteName.topic.replacement": "$1$2",
"transforms.RerouteUnderscore.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.RerouteUnderscore.topic.regex":"(.*)_(.*)",
"transforms.RerouteUnderscore.topic.replacement": "$1-$2"
}
}
- In the first transform, I'm trying to remove the duplicated schema name from the topic name.
- In the second transform, I'm trying to replace all remaining underscores (_) with hyphens (-).
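The Python sketch below shows what those two regexes do to a topic name. It models a regex topic router under one stated assumption: like Kafka's RegexRouter, the router rewrites a topic only when the regex matches the whole name, and then fills in the capture groups. Python replacement templates write \1 where the connector config writes $1. The route helper is hypothetical.

```python
import re

TOPIC = ("fjf.db.top-domain.domain.sub-domain.transaction-search"
         ".transaction_search.order_status")


def route(topic: str, regex: str, replacement: str) -> str:
    """Rough model of a regex topic router: rewrite only on a full-name match."""
    # Assumption: a non-matching topic passes through unchanged.
    m = re.fullmatch(regex, topic)
    return m.expand(replacement) if m else topic


# Python's \1\2 corresponds to "$1$2" in the connector config.
step1 = route(TOPIC, r"(.*)transaction_search(.*)", r"\1\2")
# Python's \1-\2 corresponds to "$1-$2".
step2 = route(step1, r"(.*)_(.*)", r"\1-\2")
print(step1)
print(step2)

# Greedy (.*) swaps only the last underscore (hypothetical table name):
print(route("x.order_status_history", r"(.*)_(.*)", r"\1-\2"))
```

In this model, the first transform leaves two consecutive dots (...transaction-search..order_status). The second transform then produces ...transaction-search..order-status. Because the greedy (.*) replaces only one underscore per pass, a table name with several underscores would keep all but the last.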
With this configuration, however, I get the error below, which seems to indicate that it is trying to send everything to the same topic:
Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Cannot create field because of field name duplication __dbz__physicalTableIdentifier
How can I write a transform that forwards the events of each table to its respective topic?
Answers
- Removing the schema name
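In the first transform, I'm trying to remove the duplicated schema name from the topic name.
After the transformation with your regex, you'll be left with two consecutive dots (fjf.db.top-domain.domain.sub-domain.transaction-search..order_status), so the regex also has to match the dots around the schema name: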
"transforms.RerouteName.topic.regex":"([^.]+)\\.transaction_search\\.([^.]+)",
"transforms.RerouteName.topic.replacement": "$1.$2"
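The doubled backslashes above are JSON escaping, so the actual regex is ([^.]+)\.transaction_search\.([^.]+). The Python check below shows that this regex works when the router searches for a match inside the topic name. Suppose instead that the router requires the whole topic name to match, as Kafka's RegexRouter does (an assumption here; verify it for Debezium's router). In that case [^.]+ cannot cover the dotted prefix, and the topic is left unchanged. A hypothetical variant with (.*) on both sides works under either behavior. Python writes \1.\2 for the replacement "$1.$2".

```python
import re

TOPIC = ("fjf.db.top-domain.domain.sub-domain.transaction-search"
         ".transaction_search.order_status")
ANSWER_REGEX = r"([^.]+)\.transaction_search\.([^.]+)"  # regex from the answer
GREEDY_REGEX = r"(.*)\.transaction_search\.(.*)"         # hypothetical variant
REPLACEMENT = r"\1.\2"  # Python spelling of "$1.$2"

# Substring semantics: only the matched part of the topic is replaced.
print(re.sub(ANSWER_REGEX, REPLACEMENT, TOPIC, count=1))

# Whole-name semantics: the answer's regex does not match the full topic (None)...
print(re.fullmatch(ANSWER_REGEX, TOPIC))
# ...while the greedy variant does and yields the same rewritten name.
print(re.fullmatch(GREEDY_REGEX, TOPIC).expand(REPLACEMENT))
```

Under either behavior, the rewritten topic still contains the underscore in order_status. The second step below is still needed to turn it into a hyphen.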
- Replacing underscores with hyphens
You can try the ChangeCase SMT from the Kafka Connect Common Transformations project.
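The __dbz__physicalTableIdentifier error in the question has a likely cause in the key schema rather than in topic routing. ByLogicalTableRouter has a key.enforce.uniqueness option, which defaults to true. With that default, the router adds a field with that name to the key of every record it reroutes. Two chained router instances therefore try to add the same key field twice for any record that both of them reroute. The Python below is a toy model of that failure mode only. The class and function names are hypothetical, and it does not use Kafka Connect's SchemaBuilder.

```python
from typing import List

# Toy model, hypothetical names: NOT the Kafka Connect API. A key schema is a
# list of field names, and adding a name that already exists fails, mirroring
# the SchemaBuilderException in the question.
KEY_FIELD = "__dbz__physicalTableIdentifier"  # the field named in the error


class DuplicateFieldError(Exception):
    pass


def add_field(fields: List[str], name: str) -> List[str]:
    if name in fields:
        raise DuplicateFieldError(
            f"Cannot create field because of field name duplication {name}")
    return fields + [name]


def reroute_key(fields: List[str], enforce_uniqueness: bool = True) -> List[str]:
    """One router instance rerouting a record: optionally add the key field."""
    return add_field(fields, KEY_FIELD) if enforce_uniqueness else fields


after_first = reroute_key(["id"])  # first router (RerouteName)
try:
    reroute_key(after_first)  # second router (RerouteUnderscore)
except DuplicateFieldError as err:
    print(err)

# With the extra key field disabled on the second router, the model no longer fails.
print(reroute_key(after_first, enforce_uniqueness=False))
```

If this is the cause, two changes should avoid the clash: setting transforms.RerouteUnderscore.key.enforce.uniqueness to false, or doing the whole rename in a single transform. Check this against the Debezium 1.4 documentation before relying on it.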