Question

Currently, two types of Kafka Connect logs are being collected:

  • connect-rest.log.2018-07-01-21, connect-rest.log.2018-07-01-22 ...
  • connectDistributed.out

The problem is that I don't know how to configure the connectDistributed.out file in Kafka Connect. Here is some sample output from that file:

[2018-07-11 08:42:40,798] INFO WorkerSinkTask{id=elasticsearch-sink-connector-0} Committing offsets asynchronously using sequence number 216: {test-1=OffsetAndMetadata{offset=476028, metadata=''}, test-0=OffsetAndMetadata{offset=478923, metadata=''}, test-2=OffsetAndMetadata{offset=477944, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:325)
[2018-07-11 08:43:40,798] INFO WorkerSinkTask{id=elasticsearch-sink-connector-0} Committing offsets asynchronously using sequence number 217: {test-1=OffsetAndMetadata{offset=476404, metadata=''}, test-0=OffsetAndMetadata{offset=479241, metadata=''}, test-2=OffsetAndMetadata{offset=478316, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:325)

Since I haven't configured any logging options for it, the file just keeps growing; today it reached 20 GB and I had to manually empty it. So my question is: how do I configure connectDistributed.out? I'm already configuring log options for the other components, such as the Kafka broker logs.
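(To empty it I truncate it in place with something like the command below, since deleting a file the JVM still holds open would not free the disk space.)

$ truncate -s 0 connectDistributed.out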


Here are some of the Kafka Connect-related logging configurations under confluent-4.1.0/etc/kafka that I'm using.

log4j.properties

log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

# Change the two lines below to adjust ZK client logging
log4j.logger.org.I0Itec.zkclient.ZkClient=INFO
log4j.logger.org.apache.zookeeper=INFO

# Change the two lines below to adjust the general broker logging level (output to server.log and stdout)
log4j.logger.kafka=INFO
log4j.logger.org.apache.kafka=INFO

# Change to DEBUG or TRACE to enable request logging
log4j.logger.kafka.request.logger=WARN, requestAppender
log4j.additivity.kafka.request.logger=false

# Uncomment the lines below and change log4j.logger.kafka.network.RequestChannel$ to TRACE for additional output
# related to the handling of requests
#log4j.logger.kafka.network.Processor=TRACE, requestAppender
#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
#log4j.additivity.kafka.server.KafkaApis=false
log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false

log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false

log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
log4j.additivity.kafka.log.LogCleaner=false

log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=false

# Access denials are logged at INFO level, change to DEBUG to also log allowed accesses
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false

connect-log4j.properties

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR
log4j.logger.org.reflections=ERROR


log4j.appender.kafkaConnectRestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaConnectRestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.kafkaConnectRestAppender.File=/home/ec2-user/logs/connect-rest.log
log4j.appender.kafkaConnectRestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaConnectRestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.org.apache.kafka.connect.runtime.rest=INFO, kafkaConnectRestAppender
log4j.additivity.org.apache.kafka.connect.runtime.rest=false

Answers

The connectDistributed.out file only exists if you use daemon mode, e.g.

connect-distributed -daemon connect-distributed.properties

Reason: in the kafka-run-class script, the console output is redirected to CONSOLE_OUTPUT_FILE, which is set to connectDistributed.out:

# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
    nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else... 
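The file name itself comes from the -name argument that the connect-distributed wrapper passes along when -daemon is given; paraphrasing the relevant lines from those two scripts (exact wording may vary by version):

# connect-distributed script: default daemon name
export EXTRA_ARGS=${EXTRA_ARGS-'-name connectDistributed'}

# kafka-run-class script: -name determines the .out file
-name)
    DAEMON_NAME=$2
    CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
    shift 2
    ;;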

Option 1: Load a custom log4j property file

You can set the KAFKA_LOG4J_OPTS environment variable to point at any log4j properties file you want before starting Connect (see the example below):

$ export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:///path/to/connect-log4j-new.properties"
$ connect-distributed connect-distributed.properties

Notice: -daemon is not used here.

If you no longer have a ConsoleAppender in the log4j properties, this will output next to nothing and just appear to hang, so it would be good to run it with nohup.
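For example, a minimal way to background it (assuming the custom properties file above no longer writes to the console):

$ nohup connect-distributed connect-distributed.properties > /dev/null 2>&1 &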


The default log4j config is named connect-log4j.properties; in Confluent Platform it is in the etc/kafka/ folder. By default it looks like this:

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender

In order to set a max log file size, you'll need to change the root logger to write to a FileAppender rather than a ConsoleAppender; personally, I prefer using a DailyRollingFileAppender, which rolls the file by date instead of by size.

Here is an example:

log4j.rootLogger=INFO, stdout, FILE

log4j.appender.FILE=org.apache.log4j.DailyRollingFileAppender
log4j.appender.FILE.DatePattern='.'yyyy-MM-dd
log4j.appender.FILE.File=/var/log/kafka-connect/connect.log
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR
log4j.logger.org.reflections=ERROR
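With this config, INFO logs still go to stdout (and therefore to connectDistributed.out in daemon mode) as well as to the dated file. If you want a hard size cap rather than date-based rotation, log4j 1.x also provides RollingFileAppender with MaxFileSize and MaxBackupIndex settings; a minimal sketch (the path and limits are placeholders to adjust):

log4j.rootLogger=INFO, FILE

log4j.appender.FILE=org.apache.log4j.RollingFileAppender
log4j.appender.FILE.File=/var/log/kafka-connect/connect.log
log4j.appender.FILE.MaxFileSize=100MB
log4j.appender.FILE.MaxBackupIndex=10
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=[%d] %p %m (%c)%n

Dropping stdout from the root logger here also keeps connectDistributed.out from growing.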