
I am using kafka-connect-jdbc-4.0.0.jar and postgresql-9.4-1206-jdbc41.jar.

Configuration of the Kafka Connect source connector:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "mode": "timestamp",
  "timestamp.column.name": "updated_at",
  "topic.prefix": "streaming.data.v2",
  "connection.password": "password",
  "connection.user": "user",
  "schema.pattern": "test",
  "query": "select * from view_source",
  "connection.url": "jdbc:postgresql://host:5432/test?currentSchema=test"
}
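In "timestamp" mode the configured query is not run as-is. Judging by the "prepared SQL query" line in the log below, the connector appends a range filter and an ordering on "timestamp.column.name", with two placeholders for the window start and end. The sketch below reproduces that string; "build_timestamp_query" is a hypothetical helper that only mimics the logged output, not the connector's actual implementation.

```python
def build_timestamp_query(query: str, timestamp_column: str) -> str:
    """Hypothetical helper: rebuild the SQL that timestamp mode logs as
    'prepared SQL query'. The two '?' placeholders are later bound to the
    window start (last offset) and window end (current DB time)."""
    col = f'"{timestamp_column}"'  # the logged SQL quotes the column name
    return (
        f"{query} WHERE {col} > ? AND {col} < ? "
        f"ORDER BY {col} ASC"
    )


# Values taken from the connector configuration above.
config = {
    "mode": "timestamp",
    "timestamp.column.name": "updated_at",
    "query": "select * from view_source",
}

print(build_timestamp_query(config["query"], config["timestamp.column.name"]))
# select * from view_source WHERE "updated_at" > ? AND "updated_at" < ? ORDER BY "updated_at" ASC
```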

I have configured two connectors, one source and one sink, using the JDBC driver against a PostgreSQL database ("PostgreSQL 9.6.9"). Everything works correctly.

I have doubts about how the connector collects the source data. Looking at the log, I see a gap between the time windows covered by consecutive queries:

11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} prepared SQL query: select * from view_source WHERE "updated_at" > ? AND "updated_at" < ? ORDER BY "updated_at" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:17:07.000 end time = 2019-01-11 08:20:18.985 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
11/1/2019 9:20:19[2019-01-11 08:20:19,070] DEBUG Resetting querier TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)

11/1/2019 9:20:49[2019-01-11 08:20:49,499] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} prepared SQL query: select * from view_source WHERE "updated_at" > ? AND "updated_at" < ? ORDER BY "updated_at" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG executing query select CURRENT_TIMESTAMP; to get current time from database (io.confluent.connect.jdbc.util.JdbcUtils)
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:20:39.000 end time = 2019-01-11 08:20:49.500 (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)

The first query collects data between 08:17:07.000 and 08:20:18.985, but the second gathers data between 08:20:39.000 and 08:20:49.500. Between the two windows there are roughly 20 seconds (08:20:18.985 to 08:20:39.000) in which there may be records that are never read:
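The size of the uncovered span can be checked directly from the two "Executing prepared statement" log lines. This small sketch uses only Python's standard datetime module; the timestamps are copied verbatim from the log.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S.%f"  # %f accepts the 3-digit milliseconds in the log

# End of the first window and start of the second, copied from the log.
first_end = datetime.strptime("2019-01-11 08:20:18.985", FMT)
second_begin = datetime.strptime("2019-01-11 08:20:39.000", FMT)

gap = (second_begin - first_end).total_seconds()
print(f"uncovered span between the two windows: {gap:.3f} s")
# uncovered span between the two windows: 20.015 s
```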

11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:17:07.000 end time = 2019-01-11 08:20:18.985 
11/1/2019 9:20:49[2019-01-11 08:20:49,500] DEBUG Executing prepared statement with timestamp value = 2019-01-11 08:20:39.000 end time = 2019-01-11 08:20:49.500 

I assume that one of the values is the timestamp of the last record obtained and the other is the current timestamp at the moment of the query.
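Under that assumption, the windowing can be sketched as two small functions: the window start is the stored offset, the end is the database's CURRENT_TIMESTAMP minus "timestamp.delay.interval.ms" (a real JDBC source option, default 0, which matches the log where the end equals the database time). The function names and the 08:20:39 row are hypothetical, for illustration only. If this model is right, a second window starting at 08:20:39.000 implies that some poll not shown in the excerpt had already returned a row stamped 08:20:39, since the first window ended at 08:20:18.985.

```python
from datetime import datetime, timedelta


def next_window(last_offset, db_now, delay_ms=0):
    """Hypothetical model of timestamp-mode windowing.

    last_offset: updated_at of the last record already published.
    db_now:      result of 'select CURRENT_TIMESTAMP' on the database.
    delay_ms:    models timestamp.delay.interval.ms (default 0).
    Returns the (begin, end) pair bound to the two '?' placeholders.
    """
    return last_offset, db_now - timedelta(milliseconds=delay_ms)


def advance_offset(last_offset, rows_updated_at):
    """After a poll the offset moves to the newest updated_at returned;
    if the poll returned nothing, it stays where it was."""
    return max(rows_updated_at, default=last_offset)


offset = datetime(2019, 1, 11, 8, 17, 7)
begin, end = next_window(offset, datetime(2019, 1, 11, 8, 20, 18, 985000))

# Hypothetical: a later poll (not in the excerpt) returned a row stamped 08:20:39.
offset = advance_offset(offset, [datetime(2019, 1, 11, 8, 20, 39)])
begin2, _ = next_window(offset, datetime(2019, 1, 11, 8, 20, 49, 500000))
print(begin2)  # 2019-01-11 08:20:39
```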

I cannot find an explanation for this. Is this the normal behavior of the connector? Should I assume that it will not always collect all the data?

Answers

The JDBC connector is not guaranteed to retrieve every message. For that, you need log-based Change Data Capture (CDC). For Postgres, that is provided by Debezium running on Kafka Connect. You can read more about this here.
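One common way a query-based poller loses rows can be shown with a small simulation, under stated assumptions: "updated_at" is filled when a transaction starts (in PostgreSQL, now() and CURRENT_TIMESTAMP return the transaction start time), but the row only becomes visible to the connector when the transaction commits. If a newer row commits first and the offset moves past it, the older-stamped row is never selected. The rows, times, and the "poll" helper are hypothetical; the filter mirrors the prepared SQL in the question's log.

```python
from dataclasses import dataclass


@dataclass
class Row:
    name: str
    updated_at: float    # value written into the column (transaction start)
    committed_at: float  # moment the row becomes visible to other sessions


def poll(rows, offset, now):
    """One poll: updated_at > offset AND updated_at < now, ordered by
    updated_at, evaluated only over rows already committed at `now`."""
    visible = [r for r in rows if r.committed_at <= now]
    batch = sorted(
        (r for r in visible if offset < r.updated_at < now),
        key=lambda r: r.updated_at,
    )
    new_offset = batch[-1].updated_at if batch else offset
    return batch, new_offset


rows = [
    Row("slow_tx", updated_at=10.0, committed_at=25.0),  # long transaction
    Row("fast_tx", updated_at=20.0, committed_at=21.0),  # commits first
]

offset, seen = 0.0, []
for now in (22.0, 30.0, 40.0):  # successive poll times
    batch, offset = poll(rows, offset, now)
    seen.extend(r.name for r in batch)

print(seen)  # ['fast_tx']  (slow_tx is never published)
```

Log-based CDC reads changes from the write-ahead log in commit order, so it does not depend on a timestamp column in this way. Raising "timestamp.delay.interval.ms" above the longest transaction narrows the window for this particular failure but is not a guarantee.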

Disclaimer: I work for Confluent and wrote the above blog.

Edit: There is now also a recording of the above blog's material from ApacheCon 2020: 🎥 https://rmoff.dev/no-more-silos
