基于logback自定义一个kafka appender,以实现日志异步自动格式化为JSON字符串并推送到kafka
基于Logback自定义一个Kafka Appender,可以实现日志异步自动格式化为JSON字符串并推送到Kafka Broker
·
基于logback自定义一个kafka appender,以实现日志异步自动格式化为JSON字符串并推送到kafka
加载xml中的kafka的配置信息和项目的一些其它参数
继承UnsynchronizedAppenderBase
public abstract class KafkaAppenderConfig<E> extends UnsynchronizedAppenderBase<E> {
String topic="errorMsgTopic";
boolean neverBlock = false;
int queueSize = 256;
int maxTimeout = 5000;
Formatter formatter=new MessageFormatter();
protected Map<String,Object> producerConfig = new HashMap<String, Object>();
// 在XML配置中设置属性值
public void setTopic(String topic) {
this.topic = topic;
}
public void setNeverBlock(boolean neverBlock) {
this.neverBlock = neverBlock;
}
public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}
public void setMaxTimeout(int maxTimeout) {
this.maxTimeout = maxTimeout;
}
// 在XML配置中设置多个属性
public void addProducerConfig(String keyValue) {
String[] split = keyValue.split("=", 2);
addInfo(keyValue);
if(split.length == 2) {
addProducerConfigValue(split[0], split[1]);
}
}
public void addProducerConfigValue(String key, Object value) {
this.producerConfig.put(key, value);
}
public void setFormatter(Formatter formatter) {
this.formatter = formatter;
}
}
懒加载的方式实现kafka producer
public abstract class LazyKafkaProducer extends KafkaAppenderConfig<ILoggingEvent> {
private volatile Producer<String, String> producer;
/**
* Lazy initializer for producer, patterned after commons-lang.
* @see <a href="https://commons.apache.org/proper/commons-lang/javadocs/api-3.4/org/apache/commons/lang3/concurrent/LazyInitializer.html">LazyInitializer</a>
*/
public void initProducer() {
if (this.producer == null) {
synchronized(this) {
if(this.producer == null) {
this.producer = new KafkaProducer<>(new HashMap<>(producerConfig));
}
}
}
}
public void send(String topic, String key, String msg){
try {
producer.send(new ProducerRecord<>(topic, key , msg));
} catch (AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
addError("kafka authorization exception", e);
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
addError("kafka exception", e);
producer.close();
} catch (Exception e) {
addError("kafka execute exception", e);
producer.close();
}
}
public void close() {
if (producer!=null) {
producer.close();
}
}
}
自定义的kafka appender
此处选择在日志走到重写的append方法后发送到一个内存队列中,而不是直接推送到Kafka是因为在Kafka Broker不可用的时候会导致阻塞。然后启动一个Worker线程异步消费这个内存队列中的日志,Worker线程每拿到一条日志首先格式化为JSON字符串再推送到Kafka。并且在程序关闭之前会等待一定时间让Worker线程去处理剩余任务。
public class KafkaAppender extends LazyKafkaProducer {
private ArrayBlockingQueue<ILoggingEvent> blockingQueue;;
private volatile boolean started = false;
Log2KafkaWorker worker = new Log2KafkaWorker();
@Override
public void setContext(Context context) {
// 设置控制台输出
OnConsoleStatusListener onConsoleStatusListener = new OnConsoleStatusListener();
onConsoleStatusListener.start();
context.getStatusManager().add(onConsoleStatusListener);
super.setContext(context);
}
public KafkaAppender() {
// 默认的参数
addProducerConfigValue(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
addProducerConfigValue(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
}
@Override
public void start() {
addInfo("启动自定义Kafka Appender");
blockingQueue = new ArrayBlockingQueue<>(queueSize);
super.start();
worker.start();
// 不加这一行 stop不会调用
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
}
@Override
public void stop() {
addInfo("停止自定义Kafka Appender");
worker.interrupt();
addInfo("通知 log2KafkaWorker 中断");
try {
// 在设定的超时时间内 等待worker处理完
worker.join(maxTimeout);
// check to see if the thread ended and if not add a warning message
if (worker.isAlive()) {
addWarn("Max queue flush timeout (" + maxTimeout + " ms) exceeded. Approximately " + blockingQueue.size()
+ " queued events were possibly discarded.");
} else {
addInfo("Queue flush finished successfully within timeout.");
}
} catch (InterruptedException e) {
int remaining = blockingQueue.size();
addError("Failed to join worker thread. " + remaining + " queued events may be discarded.", e);
} finally {
}
KafkaAppender.super.close();
super.stop();
}
@Override
protected void append(ILoggingEvent eventObject) {
// 发送到kafka之前确保初始化producer
super.initProducer();
put(eventObject);
}
private void put(ILoggingEvent eventObject) {
if (neverBlock) {
blockingQueue.offer(eventObject);
} else {
putUninterruptibly(eventObject);
}
}
private void putUninterruptibly(ILoggingEvent eventObject) {
boolean interrupted = false;
try {
while (true) {
try {
blockingQueue.put(eventObject);
break;
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
class Log2KafkaWorker extends Thread {
@Override
public synchronized void start() {
KafkaAppender.this.started = true;
super.start();
}
@Override
public void interrupt() {
KafkaAppender.this.started = false;
super.interrupt();
}
@Override
public void run() {
while (started) {
try {
ILoggingEvent e = blockingQueue.take();
send(e);
} catch (InterruptedException e) {
// 收到中断信号之后,终止循环,
// 当前阻塞任务被丢弃,剩余任务交给下面遍历处理
break;
}
}
for (ILoggingEvent e : blockingQueue) {
send(e);
blockingQueue.remove(e);
}
}
private void send(ILoggingEvent event) {
addInfo(">>>>>>发送到kafka:"+event.getFormattedMessage());
String message = KafkaAppender.super.formatter.format(event);
KafkaAppender.super.send(topic, event.getThreadName(), message);
addInfo("<<<<<<发送到kafka");
}
}
}
格式化为JSON字符串
public class JsonFormatter implements Formatter {
private static final String QUOTE = "\"";
private static final String COLON = ":";
private static final String COMMA = ",";
private int maxDepth;
private boolean expectJson = false;
private final ObjectMapper mapper = new ObjectMapper();
private final ThrowableProxyJsonSerializer throwableProxyJsonSerializer = new ThrowableProxyJsonSerializer();
private final SimpleModule module = new SimpleModule();
{
module.addSerializer(IThrowableProxy.class, throwableProxyJsonSerializer);
mapper.registerModule(module);
}
public String format(ILoggingEvent event) {
StringBuilder sb = new StringBuilder();
sb.append("{");
fieldName("project", sb);
quote(event.getLoggerContextVO().getName(), sb);
sb.append(COMMA);
fieldName("level", sb);
quote(event.getLevel().levelStr, sb);
sb.append(COMMA);
fieldName("logger", sb);
quote(event.getLoggerName(), sb);
sb.append(COMMA);
fieldName("thread", sb);
quote(event.getThreadName(), sb);
sb.append(COMMA);
fieldName("timestamp", sb);
sb.append(event.getTimeStamp());
sb.append(COMMA);
fieldName("message", sb);
if (this.expectJson) {
sb.append(event.getFormattedMessage());
} else {
quote(event.getFormattedMessage(), sb);
}
sb.append(COMMA);
fieldName("detail", sb);
try {
sb.append(mapper.writeValueAsString(event.getThrowableProxy()));
} catch (JsonProcessingException e) {
sb.append(mapper.nullNode());
}
sb.append("}");
return sb.toString();
}
private static void fieldName(String name, StringBuilder sb) {
quote(name, sb);
sb.append(COLON);
}
private static void quote(String value, StringBuilder sb) {
sb.append(QUOTE);
sb.append(value);
sb.append(QUOTE);
}
public boolean isExpectJson() {
return expectJson;
}
public void setExpectJson(boolean expectJson) {
this.expectJson = expectJson;
}
public void setMaxDepth(int maxDepth) {
this.maxDepth = maxDepth;
throwableProxyJsonSerializer.setMaxDepth(maxDepth);
}
}
格式化为JSON的时候异常堆栈信息根据配置的遍历深度来进行输出
public class ThrowableProxyJsonSerializer extends JsonSerializer<IThrowableProxy> {
private int maxDepth = 10;
public void setMaxDepth(int maxDepth) {
this.maxDepth = maxDepth;
}
@Override
public void serialize(IThrowableProxy throwableProxy, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeStartObject();
jsonGenerator.writeStringField("class", throwableProxy.getClassName());
jsonGenerator.writeStringField("message", throwableProxy.getMessage());
StackTraceElementProxy[] stackTrace = throwableProxy.getStackTraceElementProxyArray();
if (stackTrace != null) {
jsonGenerator.writeArrayFieldStart("stackTrace");
int depth = 0;
for (StackTraceElementProxy element : stackTrace) {
if (depth++ == maxDepth) {
break;
}
jsonGenerator.writeStartObject();
jsonGenerator.writeStringField("class", element.getStackTraceElement().getClassName());
jsonGenerator.writeStringField("method", element.getStackTraceElement().getMethodName());
jsonGenerator.writeStringField("file", element.getStackTraceElement().getFileName());
jsonGenerator.writeNumberField("line", element.getStackTraceElement().getLineNumber());
jsonGenerator.writeEndObject();
}
jsonGenerator.writeEndArray();
}
IThrowableProxy cause = throwableProxy.getCause();
if (cause != null) {
jsonGenerator.writeFieldName("cause");
serializerProvider.defaultSerializeValue(cause, jsonGenerator);
}
jsonGenerator.writeEndObject();
}
}
源代码
github:https://github.com/shuaiyu-wang/logback-kafka-appender
gitee:https://gitee.com/shuaiyu-wang/logback-kafka-appender
参考项目
更多推荐
已为社区贡献1条内容
所有评论(0)