Building a custom Kafka appender on top of logback, so that log events are asynchronously formatted as JSON strings and pushed to Kafka

Load the Kafka settings and a few other project parameters from the XML configuration.
Extend UnsynchronizedAppenderBase:

import java.util.HashMap;
import java.util.Map;

import ch.qos.logback.core.UnsynchronizedAppenderBase;

public abstract class KafkaAppenderConfig<E> extends UnsynchronizedAppenderBase<E> {

    String topic = "errorMsgTopic";
    boolean neverBlock = false;
    int queueSize = 256;
    int maxTimeout = 5000;
    Formatter formatter = new MessageFormatter();

    protected Map<String, Object> producerConfig = new HashMap<String, Object>();

    // Single property values set from the XML configuration
    public void setTopic(String topic) {
        this.topic = topic;
    }
    public void setNeverBlock(boolean neverBlock) {
        this.neverBlock = neverBlock;
    }
    public void setQueueSize(int queueSize) {
        this.queueSize = queueSize;
    }
    public void setMaxTimeout(int maxTimeout) {
        this.maxTimeout = maxTimeout;
    }

    // Producer properties set from the XML configuration as "key=value" strings
    public void addProducerConfig(String keyValue) {
        String[] split = keyValue.split("=", 2);
        addInfo(keyValue);
        if (split.length == 2) {
            addProducerConfigValue(split[0], split[1]);
        }
    }

    public void addProducerConfigValue(String key, Object value) {
        this.producerConfig.put(key, value);
    }

    public void setFormatter(Formatter formatter) {
        this.formatter = formatter;
    }
}
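
These setters and adders are what logback's Joran configurator invokes when it reads the appender's XML element. A minimal logback.xml sketch that exercises them is shown below; the fully qualified class names (com.example.logging.*) and the broker address are assumptions for illustration only:

<configuration>
    <appender name="KAFKA" class="com.example.logging.KafkaAppender">
        <topic>errorMsgTopic</topic>
        <neverBlock>false</neverBlock>
        <queueSize>256</queueSize>
        <maxTimeout>5000</maxTimeout>
        <!-- each <producerConfig> element is handed to addProducerConfig as a key=value string -->
        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
        <producerConfig>acks=1</producerConfig>
        <!-- complex property: Joran instantiates the class and calls setFormatter -->
        <formatter class="com.example.logging.JsonFormatter">
            <maxDepth>10</maxDepth>
            <expectJson>false</expectJson>
        </formatter>
    </appender>

    <root level="ERROR">
        <appender-ref ref="KAFKA"/>
    </root>
</configuration>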

Lazily initializing the Kafka producer

import java.util.HashMap;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;

import ch.qos.logback.classic.spi.ILoggingEvent;

public abstract class LazyKafkaProducer extends KafkaAppenderConfig<ILoggingEvent> {

    private volatile Producer<String, String> producer;

    /**
     * Lazy initializer for the producer (double-checked locking on a volatile field),
     * patterned after commons-lang.
     * @see <a href="https://commons.apache.org/proper/commons-lang/javadocs/api-3.4/org/apache/commons/lang3/concurrent/LazyInitializer.html">LazyInitializer</a>
     */
    public void initProducer() {
        if (this.producer == null) {
            synchronized (this) {
                if (this.producer == null) {
                    this.producer = new KafkaProducer<>(new HashMap<>(producerConfig));
                }
            }
        }
    }

    public void send(String topic, String key, String msg) {
        try {
            producer.send(new ProducerRecord<>(topic, key, msg));
        } catch (AuthorizationException e) {
            // We can't recover from an authorization failure, so close the producer.
            addError("kafka authorization exception", e);
            closeAndReset();
        } catch (KafkaException e) {
            // For other Kafka exceptions, close the producer as well.
            addError("kafka exception", e);
            closeAndReset();
        } catch (Exception e) {
            addError("kafka execute exception", e);
            closeAndReset();
        }
    }

    // Reset the reference to null so initProducer() can lazily create a fresh producer after a failure.
    private void closeAndReset() {
        producer.close();
        producer = null;
    }

    public void close() {
        if (producer != null) {
            producer.close();
        }
    }
}

The custom Kafka appender
Once a log event reaches the overridden append method, it is placed on an in-memory queue rather than pushed to Kafka directly, because a direct push would block whenever the Kafka broker is unavailable. A worker thread consumes this in-memory queue asynchronously: for each event it takes, it first formats the event as a JSON string and then pushes it to Kafka. Before the program shuts down, the appender also waits a bounded amount of time so the worker thread can process the remaining events.

import java.util.concurrent.ArrayBlockingQueue;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Context;
import ch.qos.logback.core.status.OnConsoleStatusListener;

public class KafkaAppender extends LazyKafkaProducer {

    private ArrayBlockingQueue<ILoggingEvent> blockingQueue;
    private volatile boolean started = false;
    Log2KafkaWorker worker = new Log2KafkaWorker();

    @Override
    public void setContext(Context context) {
        // Print logback status messages to the console
        OnConsoleStatusListener onConsoleStatusListener = new OnConsoleStatusListener();
        onConsoleStatusListener.start();
        context.getStatusManager().add(onConsoleStatusListener);
        super.setContext(context);
    }

    public KafkaAppender() {
        // Default serializer settings
        addProducerConfigValue(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        addProducerConfigValue(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    }

    @Override
    public void start() {
        addInfo("Starting the custom Kafka appender");

        blockingQueue = new ArrayBlockingQueue<>(queueSize);

        super.start();

        worker.start();

        // Without this shutdown hook, stop() is never invoked on JVM exit
        Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
    }

    @Override
    public void stop() {
        addInfo("Stopping the custom Kafka appender");

        worker.interrupt();
        addInfo("Signalled Log2KafkaWorker to interrupt");

        try {
            // Wait up to the configured timeout for the worker to finish
            worker.join(maxTimeout);

            // Check whether the thread actually ended; if not, add a warning
            if (worker.isAlive()) {
                addWarn("Max queue flush timeout (" + maxTimeout + " ms) exceeded. Approximately " + blockingQueue.size()
                        + " queued events were possibly discarded.");
            } else {
                addInfo("Queue flush finished successfully within timeout.");
            }
        } catch (InterruptedException e) {
            int remaining = blockingQueue.size();
            addError("Failed to join worker thread. " + remaining + " queued events may be discarded.", e);
            Thread.currentThread().interrupt();
        }

        super.close();

        super.stop();
    }

    @Override
    protected void append(ILoggingEvent eventObject) {
        // Make sure the producer is initialized before the event is enqueued for Kafka
        super.initProducer();
        put(eventObject);
    }

    private void put(ILoggingEvent eventObject) {
        if (neverBlock) {
            // Drop the event if the queue is full rather than blocking the logging thread
            blockingQueue.offer(eventObject);
        } else {
            putUninterruptibly(eventObject);
        }
    }

    private void putUninterruptibly(ILoggingEvent eventObject) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    blockingQueue.put(eventObject);
                    break;
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    class Log2KafkaWorker extends Thread {

        @Override
        public synchronized void start() {
            KafkaAppender.this.started = true;
            super.start();
        }

        @Override
        public void interrupt() {
            KafkaAppender.this.started = false;
            super.interrupt();
        }

        @Override
        public void run() {
            while (started) {
                try {
                    ILoggingEvent e = blockingQueue.take();
                    send(e);
                } catch (InterruptedException e) {
                    // On interrupt, exit the loop; any events still
                    // queued are drained by the loop below
                    break;
                }
            }

            // Drain whatever is left in the queue before the thread exits
            ILoggingEvent e;
            while ((e = blockingQueue.poll()) != null) {
                send(e);
            }
        }

        private void send(ILoggingEvent event) {
            addInfo(">>>>>> sending to Kafka: " + event.getFormattedMessage());
            String message = formatter.format(event);
            KafkaAppender.this.send(topic, event.getThreadName(), message);
            addInfo("<<<<<< sent to Kafka");
        }
    }
}
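
To exercise the whole pipeline end to end, here is a minimal usage sketch. The demo class name is hypothetical; it assumes the appender is attached to the root logger via a logback.xml like the one sketched earlier, and that slf4j is the logging facade:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical demo class; assumes the KAFKA appender is attached to the root logger
public class KafkaAppenderDemo {

    private static final Logger log = LoggerFactory.getLogger(KafkaAppenderDemo.class);

    public static void main(String[] args) {
        // Each call goes through append() -> blocking queue -> worker thread -> JsonFormatter -> Kafka
        log.error("plain error message");
        log.error("error with exception", new IllegalStateException("no such order"));
        // On JVM exit the shutdown hook invokes stop(), which waits up to maxTimeout for the queue to drain
    }
}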

Formatting events as JSON strings

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;

public class JsonFormatter implements Formatter {
    private static final String QUOTE = "\"";
    private static final String COLON = ":";
    private static final String COMMA = ",";

    private int maxDepth;
    private boolean expectJson = false;
    private final ObjectMapper mapper = new ObjectMapper();
    private final ThrowableProxyJsonSerializer throwableProxyJsonSerializer = new ThrowableProxyJsonSerializer();
    private final SimpleModule module = new SimpleModule();

    {
        module.addSerializer(IThrowableProxy.class, throwableProxyJsonSerializer);
        mapper.registerModule(module);
    }

    public String format(ILoggingEvent event) {
        StringBuilder sb = new StringBuilder();
        sb.append("{");
        fieldName("project", sb);
        quote(event.getLoggerContextVO().getName(), sb);
        sb.append(COMMA);
        fieldName("level", sb);
        quote(event.getLevel().levelStr, sb);
        sb.append(COMMA);
        fieldName("logger", sb);
        quote(event.getLoggerName(), sb);
        sb.append(COMMA);
        fieldName("thread", sb);
        quote(event.getThreadName(), sb);
        sb.append(COMMA);
        fieldName("timestamp", sb);
        sb.append(event.getTimeStamp());
        sb.append(COMMA);
        fieldName("message", sb);
        if (this.expectJson) {
            sb.append(event.getFormattedMessage());
        } else {
            quote(event.getFormattedMessage(), sb);
        }
        sb.append(COMMA);
        fieldName("detail", sb);
        try {
            sb.append(mapper.writeValueAsString(event.getThrowableProxy()));
        } catch (JsonProcessingException e) {
            // Fall back to a JSON null literal if serialization fails
            sb.append("null");
        }
        sb.append("}");
        return sb.toString();
    }

    private static void fieldName(String name, StringBuilder sb) {
        quote(name, sb);
        sb.append(COLON);
    }

    // NOTE: the value is not JSON-escaped; quotes or control characters
    // in a message would produce invalid JSON
    private static void quote(String value, StringBuilder sb) {
        sb.append(QUOTE);
        sb.append(value);
        sb.append(QUOTE);
    }

    public boolean isExpectJson() {
        return expectJson;
    }

    public void setExpectJson(boolean expectJson) {
        this.expectJson = expectJson;
    }

    public void setMaxDepth(int maxDepth) {
        this.maxDepth = maxDepth;
        throwableProxyJsonSerializer.setMaxDepth(maxDepth);
    }
}
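
For an ERROR event carrying an exception, format() produces a single-line JSON object shaped like the following sample (pretty-printed here for readability; all field values are hypothetical). Without a throwable, detail is the literal null, and with expectJson=true the message value is embedded unquoted:

{
  "project": "demo",
  "level": "ERROR",
  "logger": "com.example.DemoService",
  "thread": "main",
  "timestamp": 1620000000000,
  "message": "error with exception",
  "detail": {
    "class": "java.lang.IllegalStateException",
    "message": "no such order",
    "stackTrace": [
      {"class": "com.example.DemoService", "method": "lookup", "file": "DemoService.java", "line": 42}
    ]
  }
}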

When formatting to JSON, the exception stack trace is truncated according to the configured traversal depth

import java.io.IOException;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;

import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.classic.spi.StackTraceElementProxy;

public class ThrowableProxyJsonSerializer extends JsonSerializer<IThrowableProxy> {

    private int maxDepth = 10;

    public void setMaxDepth(int maxDepth) {
        this.maxDepth = maxDepth;
    }

    @Override
    public void serialize(IThrowableProxy throwableProxy, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
        jsonGenerator.writeStartObject();
        jsonGenerator.writeStringField("class", throwableProxy.getClassName());
        jsonGenerator.writeStringField("message", throwableProxy.getMessage());
        StackTraceElementProxy[] stackTrace = throwableProxy.getStackTraceElementProxyArray();
        if (stackTrace != null) {
            jsonGenerator.writeArrayFieldStart("stackTrace");
            int depth = 0;
            for (StackTraceElementProxy element : stackTrace) {
                // Stop emitting frames once the configured maximum depth is reached
                if (depth++ == maxDepth) {
                    break;
                }
                jsonGenerator.writeStartObject();
                jsonGenerator.writeStringField("class", element.getStackTraceElement().getClassName());
                jsonGenerator.writeStringField("method", element.getStackTraceElement().getMethodName());
                jsonGenerator.writeStringField("file", element.getStackTraceElement().getFileName());
                jsonGenerator.writeNumberField("line", element.getStackTraceElement().getLineNumber());
                jsonGenerator.writeEndObject();
            }
            jsonGenerator.writeEndArray();
        }

        IThrowableProxy cause = throwableProxy.getCause();
        if (cause != null) {
            // Recursively serialize the cause chain using this same serializer
            jsonGenerator.writeFieldName("cause");
            serializerProvider.defaultSerializeValue(cause, jsonGenerator);
        }

        jsonGenerator.writeEndObject();
    }
}

Source code
GitHub: https://github.com/shuaiyu-wang/logback-kafka-appender
Gitee: https://gitee.com/shuaiyu-wang/logback-kafka-appender

Reference projects

logback-kafka-appender
logback-kafka
logback
