This article is reproduced from the WeChat official account StreamCloudNative. The author, Xue Song, is a senior software engineer at New World Software.
Editor: chicken chop, StreamNative.
About Apache Pulsar
Apache Pulsar is a top-level project of the Apache Software Foundation. It is a next-generation, cloud-native distributed message streaming platform that integrates messaging, storage, and lightweight function computing. It adopts a compute-storage separation architecture and supports multi-tenancy, persistent storage, and cross-datacenter, cross-region data replication, with stream data storage features such as strong consistency, high throughput, low latency, and high scalability.
At present, many large Internet and traditional-industry companies in China and abroad have adopted Apache Pulsar. Its use cases span artificial intelligence, finance, telecom operators, live and short video, the Internet of Things, retail and e-commerce, online education, and other industries, with adopters such as the American cable TV giant Comcast, Yahoo!, Tencent, China Telecom, China Mobile, BIGO, and VIPKID.
Background introduction
As a cloud-native distributed messaging system, Apache Pulsar comprises ZooKeeper, bookie, broker, functions worker, proxy, and other components, all deployed in a distributed manner across multiple hosts. The log files of each component are therefore scattered over many machines. When a component misbehaves, checking every service for error information means logging into the hosts one by one, which is tedious. Normally we can grep, awk, and otherwise interrogate a log file directly to extract what we need, but as the number of applications, services, and supporting nodes grows, this traditional approach exposes many problems: it is inefficient, large log volumes are hard to archive, plain-text search becomes slow, and multi-dimensional queries are impractical. We therefore want log aggregation and monitoring that lets us quickly locate error information in the Pulsar services and troubleshoot rapidly, making operations more purposeful, targeted, and direct.
To solve the log retrieval problem, our team considered a centralized log collection system that uniformly collects, manages, and accesses the logs on all Pulsar nodes.
A complete centralized log system needs to include the following main features:
- Collection - collect log data from multiple sources;
- Transmission - transmit log data stably to the central system;
- Storage - store the log data;
- Analysis - support analysis through a UI;
- Alerting - provide error reporting and monitoring mechanisms.
ELK provides a complete solution built entirely from open-source software. Its components work together seamlessly and efficiently cover many scenarios, making it a mainstream log system today. Our company has a self-developed big data management platform through which ELK is deployed and managed, and ELK already provides support services for multiple business systems in production. ELK is the abbreviation of three open-source projects: Elasticsearch, Logstash, and Kibana. The suite has since been renamed Elastic Stack and gained the Beats project, which includes Filebeat, a lightweight log collection and processing agent. Filebeat occupies few resources and is well suited to collecting logs on each server and forwarding them to Logstash.

As the figure above shows, if Pulsar adopted this log collection mode, there would be two problems:
- Every host running a Pulsar service would also have to deploy a Filebeat service;
- The Pulsar service logs would first have to be written to disk as files, consuming disk IO on the host.
For this reason, we decided to implement fast log retrieval for Apache Pulsar based on Log4j2 + Kafka + ELK. Log4j2 supports sending logs to Kafka out of the box: by configuring its Kafka appender in the log4j2 configuration file, the logs produced by Log4j2 are sent to Kafka in real time.
As shown in the figure below:

Implementation process
Taking Pulsar 2.6.2 as an example, the following describes the detailed implementation of Apache Pulsar's fast log retrieval solution based on Log4j2 + Kafka + ELK.
1. Preparation
First, determine the fields that will be used to retrieve logs in Kibana; these fields can be aggregated and queried across multiple dimensions. Elasticsearch then tokenizes the configured retrieval fields and builds the index.

As shown in the figure above, we define eight retrieval fields for the Pulsar logs: cluster name, host name, host IP, component name, log content, system time, log level, and cluster instance.
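For reference, the corresponding Elasticsearch index mapping could look roughly like the sketch below. This is our own illustration, not taken from the article: the template name, index pattern, and field names are assumptions (in our setup the index is managed by the in-house big data platform), and it targets the legacy _template API of Elasticsearch 7.x.

# Hypothetical index template for the eight retrieval fields.
curl -s -X PUT -H 'Content-Type: application/json' \
  'http://192.168.0.1:9200/_template/pulsar-log' -d '{
    "index_patterns": ["pulsar-log-*"],
    "mappings": {
      "properties": {
        "cluster":    { "type": "keyword" },
        "hostname":   { "type": "keyword" },
        "hostip":     { "type": "ip" },
        "module":     { "type": "keyword" },
        "instanceid": { "type": "keyword" },
        "timestamp":  { "type": "date", "format": "yyyy-MM-dd HH:mm:ss.SSS" },
        "level":      { "type": "keyword" },
        "msg":        { "type": "text" }
      }
    }
  }'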
2. Implementation process
Note: to keep the structure of Pulsar's native configuration files and scripts intact, we implement this scheme by adding new configuration and script files.
1. Add configuration files
Add the following two configuration files in the {PULSAR_HOME}/conf directory:
1) logenv.sh — this file passes the JVM options required at startup to the Java process of each Pulsar component. An example of its content:
KAFKA_CLUSTER=192.168.0.1:9092,192.168.0.2:9092,192.168.0.2:9092
PULSAR_CLUSTER=pulsar_cluster
PULSAR_TOPIC=pulsar_topic
HOST_IP=192.168.0.1
PULSAR_MODULE_INSTANCE_ID=1
The meanings of the above fields are:
- KAFKA_CLUSTER: Kafka broker list address;
- PULSAR_CLUSTER: cluster name of Pulsar;
- PULSAR_TOPIC: the Kafka topic that receives the Pulsar service logs;
- HOST_IP: IP of Pulsar host;
- PULSAR_MODULE_INSTANCE_ID: the instance ID of the Pulsar service. Multiple Pulsar clusters may be deployed on one host; instances are distinguished by this ID.
2) log4j2-kafka.yaml
This configuration file is copied from log4j2.yaml, with the following modifications (note: in the screenshots below, log4j2.yaml is on the left and log4j2-kafka.yaml is on the right):
• add the Kafka cluster broker list and define the record format of the messages Log4j2 writes to Kafka — the eight retrieval fields within a message are separated by spaces, and Elasticsearch splits them out using the space as separator;

• add the Kafka appender;

• add the Failover appender;

• switch the Root and Logger entries under Loggers to asynchronous mode;

• the complete content of the log4j2-kafka.yaml configuration file is as follows:
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
Configuration:
  status: INFO
  monitorInterval: 30
  name: pulsar
  packages: io.prometheus.client.log4j2

  Properties:
    Property:
      - name: "pulsar.log.dir"
        value: "logs"
      - name: "pulsar.log.file"
        value: "pulsar.log"
      - name: "pulsar.log.appender"
        value: "RoutingAppender"
      - name: "pulsar.log.root.level"
        value: "info"
      - name: "pulsar.log.level"
        value: "info"
      - name: "pulsar.routing.appender.default"
        value: "Console"
      - name: "kafkaBrokers"
        value: "${sys:kafka.cluster}"
      - name: "pattern"
        value: "${sys:pulsar.cluster} ${sys:pulsar.hostname} ${sys:pulsar.hostip} ${sys:pulsar.module.type} ${sys:pulsar.module.instanceid} %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] [%c{10}] %level , %msg%n"

  # Example: logger-filter script
  Scripts:
    ScriptFile:
      name: filter.js
      language: JavaScript
      path: ./conf/log4j2-scripts/filter.js
      charset: UTF-8

  Appenders:
    # Kafka
    Kafka:
      name: "pulsar_kafka"
      topic: "${sys:pulsar.topic}"
      ignoreExceptions: "false"
      PatternLayout:
        pattern: "${pattern}"
      Property:
        - name: "bootstrap.servers"
          value: "${kafkaBrokers}"
        - name: "max.block.ms"
          value: "2000"

    # Console
    Console:
      name: Console
      target: SYSTEM_OUT
      PatternLayout:
        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"

    Failover:
      name: "Failover"
      primary: "pulsar_kafka"
      retryIntervalSeconds: "600"
      Failovers:
        AppenderRef:
          ref: "RollingFile"

    # Rolling file appender configuration
    RollingFile:
      name: RollingFile
      fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}"
      filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
      immediateFlush: false
      PatternLayout:
        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
      Policies:
        TimeBasedTriggeringPolicy:
          interval: 1
          modulate: true
        SizeBasedTriggeringPolicy:
          size: 1 GB
      # Delete file older than 30days
      DefaultRolloverStrategy:
        Delete:
          basePath: ${sys:pulsar.log.dir}
          maxDepth: 2
          IfFileName:
            glob: "*/${sys:pulsar.log.file}*log.gz"
          IfLastModified:
            age: 30d

    Prometheus:
      name: Prometheus

    # Routing
    Routing:
      name: RoutingAppender
      Routes:
        pattern: "$${ctx:function}"
        Route:
          -
            Routing:
              name: InstanceRoutingAppender
              Routes:
                pattern: "$${ctx:instance}"
                Route:
                  -
                    RollingFile:
                      name: "Rolling-${ctx:function}"
                      fileName: "${sys:pulsar.log.dir}/functions/${ctx:function}/${ctx:functionname}-${ctx:instance}.log"
                      filePattern: "${sys:pulsar.log.dir}/functions/${sys:pulsar.log.file}-${ctx:instance}-%d{MM-dd-yyyy}-%i.log.gz"
                      PatternLayout:
                        Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] [instance: %X{instance}] %logger{1} - %msg%n"
                      Policies:
                        TimeBasedTriggeringPolicy:
                          interval: 1
                          modulate: true
                        SizeBasedTriggeringPolicy:
                          size: "20MB"
                        # Trigger every day at midnight that also scan
                        # roll-over strategy that deletes older file
                        CronTriggeringPolicy:
                          schedule: "0 0 0 * * ?"
                      # Delete file older than 30days
                      DefaultRolloverStrategy:
                        Delete:
                          basePath: ${sys:pulsar.log.dir}
                          maxDepth: 2
                          IfFileName:
                            glob: "*/${sys:pulsar.log.file}*log.gz"
                          IfLastModified:
                            age: 30d
                  - ref: "${sys:pulsar.routing.appender.default}"
                    key: "${ctx:function}"
          - ref: "${sys:pulsar.routing.appender.default}"
            key: "${ctx:function}"

  Loggers:
    # Default root logger configuration
    AsyncRoot:
      level: "${sys:pulsar.log.root.level}"
      additivity: true
      AppenderRef:
        - ref: "Failover"
          level: "${sys:pulsar.log.level}"
        - ref: Prometheus
          level: info
    AsyncLogger:
      - name: org.apache.bookkeeper.bookie.BookieShell
        level: info
        additivity: false
        AppenderRef:
          - ref: Console
      - name: verbose
        level: info
        additivity: false
        AppenderRef:
          - ref: Console
    # Logger to inject filter script
    # - name: org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl
    #   level: debug
    #   additivity: false
    #   AppenderRef:
    #     ref: "${sys:pulsar.log.appender}"
    #     ScriptFilter:
    #       onMatch: ACCEPT
    #       onMisMatch: DENY
    #       ScriptRef:
    #         ref: filter.js
Notes:
- Log shipping must be asynchronous and must not affect service performance;
- A system with strict response-time requirements must decouple itself from the third-party systems it writes to. The Failover appender here decouples us from Kafka: when Kafka goes down, logging fails over and is written to the local file;
- The default retryIntervalSeconds of the Log4j2 Failover appender is 1 minute. Since failover is triggered by exceptions, the interval can be increased appropriately, for example to the 10 minutes used above;
- The Kafka appender's ignoreExceptions must be set to false, otherwise failover cannot be triggered;
- A big pitfall is the max.block.ms producer property, whose default in the Kafka client is 60000 ms. When Kafka is down, each write attempt takes 1 minute to return an exception before failover kicks in; under a high request volume the Log4j2 async queue fills up quickly and logging calls start to block, seriously affecting the main service. Therefore max.block.ms should be set short enough (2000 ms above) and the async queue sized generously.
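On top of that, Log4j2 can be told to drop events instead of blocking the caller when the async queue does fill up. This is our own safeguard rather than part of the original configuration; the two properties are standard Log4j2 settings, and appending them to PULSAR_EXTRA_OPTS works because the pulsar-kafka script passes that variable to the JVM:

# Hypothetical addition to conf/logenv.sh: when the async logger queue is
# full, discard events at INFO level and below rather than blocking.
PULSAR_EXTRA_OPTS="${PULSAR_EXTRA_OPTS} -Dlog4j2.asyncQueueFullPolicy=Discard -Dlog4j2.discardThreshold=INFO"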
2. Add script files
Add the following two script files in the {PULSAR_HOME}/bin directory:
1) pulsar-kafka
This script is copied from the pulsar script, with the following modifications (note: in the screenshots below, pulsar is on the left and pulsar-kafka is on the right):
• specify log4j2-kafka.yaml as the default log configuration;

• read the contents of logenv.sh;

• add the OPTS entries that pass the JVM options to the Java process when a Pulsar component starts (present in both the pulsar-kafka and pulsar-daemon-kafka scripts);

• the complete content of the pulsar-kafka script is as follows:
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
BINDIR=$(dirname "$0")
export PULSAR_HOME=`cd -P $BINDIR/..;pwd`
DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf
DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf
DEFAULT_ZK_CONF=$PULSAR_HOME/conf/zookeeper.conf
DEFAULT_CONFIGURATION_STORE_CONF=$PULSAR_HOME/conf/global_zookeeper.conf
DEFAULT_DISCOVERY_CONF=$PULSAR_HOME/conf/discovery.conf
DEFAULT_PROXY_CONF=$PULSAR_HOME/conf/proxy.conf
DEFAULT_STANDALONE_CONF=$PULSAR_HOME/conf/standalone.conf
DEFAULT_WEBSOCKET_CONF=$PULSAR_HOME/conf/websocket.conf
DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2-kafka.yaml
DEFAULT_PULSAR_PRESTO_CONF=${PULSAR_HOME}/conf/presto
# functions related variables
FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions
DEFAULT_WORKER_CONF=$PULSAR_HOME/conf/functions_worker.yml
DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}
DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR=$PULSAR_HOME/instances/deps
FUNCTIONS_EXTRA_DEPS_DIR=${PULSAR_FUNCTIONS_EXTRA_DEPS_DIR:-"${DEFAULT_FUNCTIONS_EXTRA_DEPS_DIR}"}
SQL_HOME=$PULSAR_HOME/pulsar-sql
PRESTO_HOME=${PULSAR_HOME}/lib/presto
# Check bookkeeper env and load bkenv.sh
if [ -f "$PULSAR_HOME/conf/bkenv.sh" ]
then
. "$PULSAR_HOME/conf/bkenv.sh"
fi
# Check pulsar env and load pulser_env.sh
if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
then
. "$PULSAR_HOME/conf/pulsar_env.sh"
fi
if [ -f "$PULSAR_HOME/conf/logenv.sh" ]
then
. "$PULSAR_HOME/conf/logenv.sh"
fi
# Check for the java to use
if [[ -z $JAVA_HOME ]]; then
JAVA=$(which java)
if [ $? != 0 ]; then
echo "Error: JAVA_HOME not set, and no java executable found in $PATH." 1>&2
exit 1
fi
else
JAVA=$JAVA_HOME/bin/java
fi
# exclude tests jar
RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ $? == 0 ]; then
PULSAR_JAR=$RELEASE_JAR
fi
# exclude tests jar
BUILT_JAR=`ls $PULSAR_HOME/pulsar-broker/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then
echo "\nCouldn't find pulsar jar.";
echo "Make sure you've run 'mvn package'\n";
exit 1;
elif [ -e "$BUILT_JAR" ]; then
PULSAR_JAR=$BUILT_JAR
fi
#
# find the instance locations for pulsar-functions
#
# find the java instance location
if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
# didn't find a released jar, then search the built jar
BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime-all/target/java-instance.jar"
if [ -z "${BUILT_JAVA_INSTANCE_JAR}" ]; then
echo "\nCouldn't find pulsar-functions java instance jar.";
echo "Make sure you've run 'mvn package'\n";
exit 1;
fi
JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
fi
# find the python instance location
if [ ! -f "${PY_INSTANCE_FILE}" ]; then
# didn't find a released python instance, then search the built python instance
BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/instance/target/python-instance/python_instance_main.py"
if [ -z "${BUILT_PY_INSTANCE_FILE}" ]; then
echo "\nCouldn't find pulsar-functions python instance.";
echo "Make sure you've run 'mvn package'\n";
exit 1;
fi
PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
fi
# find pulsar sql presto distribution location
check_presto_libraries() {
if [ ! -d "${PRESTO_HOME}" ]; then
BUILT_PRESTO_HOME="${SQL_HOME}/presto-distribution/target/pulsar-presto-distribution"
if [ ! -d "${BUILT_PRESTO_HOME}" ]; then
echo "\nCouldn't find presto distribution.";
echo "Make sure you've run 'mvn package'\n";
exit 1;
fi
PRESTO_HOME=${BUILT_PRESTO_HOME}
fi
}
pulsar_help() {
cat <<EOF
Usage: pulsar <command>
where command is one of:
broker Run a broker server
bookie Run a bookie server
zookeeper Run a zookeeper server
configuration-store Run a configuration-store server
discovery Run a discovery server
proxy Run a pulsar proxy
websocket Run a web socket proxy server
functions-worker Run a functions worker server
sql-worker Run a sql worker server
sql Run sql CLI
standalone Run a broker server with local bookies and local zookeeper
initialize-cluster-metadata One-time metadata initialization
delete-cluster-metadata Delete a cluster's metadata
initialize-transaction-coordinator-metadata One-time transaction coordinator metadata initialization
initialize-namespace namespace initialization
compact-topic Run compaction against a topic
zookeeper-shell Open a ZK shell client
broker-tool CLI to operate a specific broker
tokens Utility to create authentication tokens
help This help message
or command is the full name of a class with a defined main() method.
Environment variables:
PULSAR_LOG_CONF Log4j configuration file (default $DEFAULT_LOG_CONF)
PULSAR_BROKER_CONF Configuration file for broker (default: $DEFAULT_BROKER_CONF)
PULSAR_BOOKKEEPER_CONF Configuration file for bookie (default: $DEFAULT_BOOKKEEPER_CONF)
PULSAR_ZK_CONF Configuration file for zookeeper (default: $DEFAULT_ZK_CONF)
PULSAR_CONFIGURATION_STORE_CONF Configuration file for global configuration store (default: $DEFAULT_CONFIGURATION_STORE_CONF)
PULSAR_DISCOVERY_CONF Configuration file for discovery service (default: $DEFAULT_DISCOVERY_CONF)
PULSAR_WEBSOCKET_CONF Configuration file for websocket proxy (default: $DEFAULT_WEBSOCKET_CONF)
PULSAR_PROXY_CONF Configuration file for Pulsar proxy (default: $DEFAULT_PROXY_CONF)
PULSAR_WORKER_CONF Configuration file for functions worker (default: $DEFAULT_WORKER_CONF)
PULSAR_STANDALONE_CONF Configuration file for standalone (default: $DEFAULT_STANDALONE_CONF)
PULSAR_PRESTO_CONF Configuration directory for Pulsar Presto (default: $DEFAULT_PULSAR_PRESTO_CONF)
PULSAR_EXTRA_OPTS Extra options to be passed to the jvm
PULSAR_EXTRA_CLASSPATH Add extra paths to the pulsar classpath
PULSAR_PID_DIR Folder where the pulsar server PID file should be stored
PULSAR_STOP_TIMEOUT Wait time before forcefully kill the pulsar server instance, if the stop is not successful
These variable can also be set in conf/pulsar_env.sh
EOF
}
add_maven_deps_to_classpath() {
MVN="mvn"
if [ "$MAVEN_HOME" != "" ]; then
MVN=${MAVEN_HOME}/bin/mvn
fi
# Need to generate classpath from maven pom. This is costly so generate it
# and cache it. Save the file into our target dir so a mvn clean will get
# clean it up and force us create a new one.
f="${PULSAR_HOME}/distribution/server/target/classpath.txt"
if [ ! -f "${f}" ]
then
${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
fi
PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
}
if [ -d "$PULSAR_HOME/lib" ]; then
PULSAR_CLASSPATH=$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*
ASPECTJ_AGENT_PATH=`ls -1 $PULSAR_HOME/lib/org.aspectj-aspectjweaver-*.jar`
else
add_maven_deps_to_classpath
ASPECTJ_VERSION=`grep '<aspectj.version>' $PULSAR_HOME/pom.xml | awk -F'>' '{print $2}' | awk -F'<' '{print $1}'`
ASPECTJ_AGENT_PATH="$HOME/.m2/repository/org/aspectj/aspectjweaver/$ASPECTJ_VERSION/aspectjweaver-$ASPECTJ_VERSION.jar"
fi
ASPECTJ_AGENT="-javaagent:$ASPECTJ_AGENT_PATH"
# if no args specified, show usage
if [ $# = 0 ]; then
pulsar_help;
exit 1;
fi
# get arguments
COMMAND=$1
shift
if [ -z "$PULSAR_WORKER_CONF" ]; then
PULSAR_WORKER_CONF=$DEFAULT_WORKER_CONF
fi
if [ -z "$PULSAR_BROKER_CONF" ]; then
PULSAR_BROKER_CONF=$DEFAULT_BROKER_CONF
fi
if [ -z "$PULSAR_BOOKKEEPER_CONF" ]; then
PULSAR_BOOKKEEPER_CONF=$DEFAULT_BOOKKEEPER_CONF
fi
if [ -z "$PULSAR_ZK_CONF" ]; then
PULSAR_ZK_CONF=$DEFAULT_ZK_CONF
fi
if [ -z "$PULSAR_GLOBAL_ZK_CONF" ]; then
PULSAR_GLOBAL_ZK_CONF=$DEFAULT_GLOBAL_ZK_CONF
fi
if [ -z "$PULSAR_CONFIGURATION_STORE_CONF" ]; then
PULSAR_CONFIGURATION_STORE_CONF=$DEFAULT_CONFIGURATION_STORE_CONF
fi
if [ -z "$PULSAR_DISCOVERY_CONF" ]; then
PULSAR_DISCOVERY_CONF=$DEFAULT_DISCOVERY_CONF
fi
if [ -z "$PULSAR_PROXY_CONF" ]; then
PULSAR_PROXY_CONF=$DEFAULT_PROXY_CONF
fi
if [ -z "$PULSAR_WEBSOCKET_CONF" ]; then
PULSAR_WEBSOCKET_CONF=$DEFAULT_WEBSOCKET_CONF
fi
if [ -z "$PULSAR_STANDALONE_CONF" ]; then
PULSAR_STANDALONE_CONF=$DEFAULT_STANDALONE_CONF
fi
if [ -z "$PULSAR_LOG_CONF" ]; then
PULSAR_LOG_CONF=$DEFAULT_LOG_CONF
fi
if [ -z "$PULSAR_PRESTO_CONF" ]; then
PULSAR_PRESTO_CONF=$DEFAULT_PULSAR_PRESTO_CONF
fi
PULSAR_CLASSPATH="$PULSAR_JAR:$PULSAR_CLASSPATH:$PULSAR_EXTRA_CLASSPATH"
PULSAR_CLASSPATH="`dirname $PULSAR_LOG_CONF`:$PULSAR_CLASSPATH"
OPTS="$OPTS -Dlog4j.configurationFile=`basename $PULSAR_LOG_CONF`"
# Ensure we can read bigger content from ZK. (It might be
# rarely needed when trying to list many z-nodes under a
# directory)
OPTS="$OPTS -Djute.maxbuffer=10485760 -Djava.net.preferIPv4Stack=true"
OPTS="-cp $PULSAR_CLASSPATH $OPTS"
OPTS="$OPTS $PULSAR_EXTRA_OPTS $PULSAR_MEM $PULSAR_GC"
# log directory & file
PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
PULSAR_LOG_ROOT_LEVEL=${PULSAR_LOG_ROOT_LEVEL:-"info"}
PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}
#Configure log configuration system properties
OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"
# Functions related logging
OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
# instance
OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
OPTS="$OPTS -Dpulsar.functions.extra.dependencies.dir=${FUNCTIONS_EXTRA_DEPS_DIR}"
OPTS="$OPTS -Dpulsar.functions.instance.classpath=${PULSAR_CLASSPATH}"
OPTS="$OPTS -Dpulsar.module.instanceid=${PULSAR_MODULE_INSTANCE_ID} -Dpulsar.module.type=$COMMAND -Dkafka.cluster=${KAFKA_CLUSTER} -Dpulsar.hostname=${HOSTNAME} -Dpulsar.hostip=${HOST_IP} -Dpulsar.cluster=${PULSAR_CLUSTER} -Dpulsar.topic=${PULSAR_TOPIC}"
ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=* -Dzookeeper.snapshot.trust.empty=true"
#Change to PULSAR_HOME to support relative paths
cd "$PULSAR_HOME"
if [ $COMMAND == "broker" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-broker.log"}
exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarBrokerStarter --broker-conf $PULSAR_BROKER_CONF $@
elif [ $COMMAND == "bookie" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"bookkeeper.log"}
# Pass BOOKIE_EXTRA_OPTS option defined in pulsar_env.sh
OPTS="$OPTS $BOOKIE_EXTRA_OPTS"
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.bookkeeper.proto.BookieServer --conf $PULSAR_BOOKKEEPER_CONF $@
elif [ $COMMAND == "zookeeper" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"zookeeper.log"}
exec $JAVA ${ZK_OPTS} $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ZooKeeperStarter $PULSAR_ZK_CONF $@
elif [ $COMMAND == "global-zookeeper" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"global-zookeeper.log"}
# Allow global ZK to turn into read-only mode when it cannot reach the quorum
OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_GLOBAL_ZK_CONF $@
elif [ $COMMAND == "configuration-store" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"configuration-store.log"}
# Allow global ZK to turn into read-only mode when it cannot reach the quorum
OPTS="${OPTS} ${ZK_OPTS} -Dreadonlymode.enabled=true"
exec $JAVA $OPTS $ASPECTJ_AGENT -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_CONFIGURATION_STORE_CONF $@
elif [ $COMMAND == "discovery" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"discovery.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.discovery.service.server.DiscoveryServiceStarter $PULSAR_DISCOVERY_CONF $@
elif [ $COMMAND == "proxy" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-proxy.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.proxy.server.ProxyServiceStarter --config $PULSAR_PROXY_CONF $@
elif [ $COMMAND == "websocket" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-websocket.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.websocket.service.WebSocketServiceStarter $PULSAR_WEBSOCKET_CONF $@
elif [ $COMMAND == "functions-worker" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-functions-worker.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.functions.worker.FunctionWorkerStarter -c $PULSAR_WORKER_CONF $@
elif [ $COMMAND == "standalone" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-standalone.log"}
exec $JAVA $OPTS $ASPECTJ_AGENT ${ZK_OPTS} -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
elif [ $COMMAND == "initialize-cluster-metadata" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataSetup $@
elif [ $COMMAND == "delete-cluster-metadata" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataTeardown $@
elif [ $COMMAND == "initialize-transaction-coordinator-metadata" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup $@
elif [ $COMMAND == "initialize-namespace" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarInitialNamespaceSetup $@
elif [ $COMMAND == "zookeeper-shell" ]; then
exec $JAVA $OPTS org.apache.zookeeper.ZooKeeperMain $@
elif [ $COMMAND == "broker-tool" ]; then
exec $JAVA $OPTS org.apache.pulsar.broker.tools.BrokerTool $@
elif [ $COMMAND == "compact-topic" ]; then
exec $JAVA $OPTS org.apache.pulsar.compaction.CompactorTool --broker-conf $PULSAR_BROKER_CONF $@
elif [ $COMMAND == "sql" ]; then
check_presto_libraries
exec $JAVA -cp "${PRESTO_HOME}/lib/*" io.prestosql.cli.Presto --server localhost:8081 "${@}"
elif [ $COMMAND == "sql-worker" ]; then
check_presto_libraries
exec ${PRESTO_HOME}/bin/launcher --etc-dir ${PULSAR_PRESTO_CONF} "${@}"
elif [ $COMMAND == "tokens" ]; then
exec $JAVA $OPTS org.apache.pulsar.utils.auth.tokens.TokensCliUtils $@
elif [ $COMMAND == "help" -o $COMMAND == "--help" -o $COMMAND == "-h" ]; then
pulsar_help;
else
echo ""
echo "-- Invalid command '$COMMAND' -- Use '$0 help' to get a list of valid commands"
echo ""
exit 1
fi
2) pulsar-daemon-kafka
This script is copied from the pulsar-daemon script, with the following modifications (note: in the screenshots below, pulsar-daemon is on the left and pulsar-daemon-kafka is on the right):
• read the contents of logenv.sh;

• invoke bin/pulsar-kafka instead of bin/pulsar;

• the complete content of the pulsar-daemon-kafka script is as follows:
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
usage() {
cat <<EOF
Usage: pulsar-daemon (start|stop) <command> <args...>
where command is one of:
broker Run a broker server
bookie Run a bookie server
zookeeper Run a zookeeper server
configuration-store Run a configuration-store server
discovery Run a discovery server
websocket Run a websocket proxy server
functions-worker Run a functions worker server
standalone Run a standalone Pulsar service
proxy Run a Proxy Pulsar service
where argument is one of:
-force (accepted only with stop command): Decides whether to stop the server forcefully if not stopped by normal shutdown
EOF
}
BINDIR=$(dirname "$0")
PULSAR_HOME=$(cd -P $BINDIR/..;pwd)
# Check bookkeeper env and load bkenv.sh
if [ -f "$PULSAR_HOME/conf/bkenv.sh" ]
then
. "$PULSAR_HOME/conf/bkenv.sh"
fi
if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
then
. "$PULSAR_HOME/conf/pulsar_env.sh"
fi
if [ -f "$PULSAR_HOME/conf/logenv.sh" ]
then
. "$PULSAR_HOME/conf/logenv.sh"
fi
PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RollingFile"}
PULSAR_STOP_TIMEOUT=${PULSAR_STOP_TIMEOUT:-30}
PULSAR_PID_DIR=${PULSAR_PID_DIR:-$PULSAR_HOME/bin}
if [ $# = 0 ]; then
usage
exit 1
elif [ $# = 1 ]; then
if [ $1 == "--help" -o $1 == "-h" ]; then
usage
exit 1
else
echo "Error: no enough arguments provided."
usage
exit 1
fi
fi
startStop=$1
shift
command=$1
shift
case $command in
(broker)
echo "doing $startStop $command ..."
;;
(bookie)
echo "doing $startStop $command ..."
;;
(zookeeper)
echo "doing $startStop $command ..."
;;
(global-zookeeper)
echo "doing $startStop $command ..."
;;
(configuration-store)
echo "doing $startStop $command ..."
;;
(discovery)
echo "doing $startStop $command ..."
;;
(websocket)
echo "doing $startStop $command ..."
;;
(functions-worker)
echo "doing $startStop $command ..."
;;
(standalone)
echo "doing $startStop $command ..."
;;
(proxy)
echo "doing $startStop $command ..."
;;
(*)
echo "Error: unknown service name $command"
usage
exit 1
;;
esac
export PULSAR_LOG_DIR=$PULSAR_LOG_DIR
export PULSAR_LOG_APPENDER=$PULSAR_LOG_APPENDER
export PULSAR_LOG_FILE=pulsar-$command-$HOSTNAME.log
pid=$PULSAR_PID_DIR/pulsar-$command.pid
out=$PULSAR_LOG_DIR/pulsar-$command-$HOSTNAME.out
logfile=$PULSAR_LOG_DIR/$PULSAR_LOG_FILE
rotate_out_log ()
{
log=$1;
num=5;
if [ -n "$2" ]; then
num=$2
fi
if [ -f "$log" ]; then # rotate logs
while [ $num -gt 1 ]; do
prev=`expr $num - 1`
[ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
num=$prev
done
mv "$log" "$log.$num";
fi
}
mkdir -p "$PULSAR_LOG_DIR"
case $startStop in
(start)
if [ -f $pid ]; then
if kill -0 `cat $pid` > /dev/null 2>&1; then
echo $command running as process `cat $pid`. Stop it first.
exit 1
fi
fi
rotate_out_log $out
echo starting $command, logging to $logfile
echo Note: Set immediateFlush to true in conf/log4j2-kafka.yaml will guarantee the logging event is flushing to disk immediately. The default behavior is switched off due to performance considerations.
pulsar=$PULSAR_HOME/bin/pulsar-kafka
nohup $pulsar $command "$@" > "$out" 2>&1 < /dev/null &
echo $! > $pid
sleep 1; head $out
sleep 2;
if ! ps -p $! > /dev/null ; then
exit 1
fi
;;
(stop)
if [ -f $pid ]; then
TARGET_PID=$(cat $pid)
if kill -0 $TARGET_PID > /dev/null 2>&1; then
echo "stopping $command"
kill $TARGET_PID
count=0
location=$PULSAR_LOG_DIR
while ps -p $TARGET_PID > /dev/null;
do
echo "Shutdown is in progress... Please wait..."
sleep 1
count=`expr $count + 1`
if [ "$count" = "$PULSAR_STOP_TIMEOUT" ]; then
break
fi
done
if [ "$count" != "$PULSAR_STOP_TIMEOUT" ]; then
echo "Shutdown completed."
fi
if kill -0 $TARGET_PID > /dev/null 2>&1; then
fileName=$location/$command.out
$JAVA_HOME/bin/jstack $TARGET_PID > $fileName
echo "Thread dumps are taken for analysis at $fileName"
if [ "$1" == "-force" ]
then
echo "forcefully stopping $command"
kill -9 $TARGET_PID >/dev/null 2>&1
echo Successfully stopped the process
else
echo "WARNNING : $command is not stopped completely."
exit 1
fi
fi
else
echo "no $command to stop"
fi
rm $pid
else
echo no "$command to stop"
fi
;;
(*)
usage
exit 1
;;
esac
3. Add the jars that the Kafka producer depends on
Add the following three jars to the {PULSAR_HOME}/lib directory on every node of the Pulsar cluster:
connect-api-2.0.1.jar
disruptor-3.4.2.jar
kafka-clients-2.0.1.jar
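If the jars are not already at hand, they can be fetched from Maven Central, for example as sketched below (the URLs follow the standard Maven repository layout; verify the versions against your own Kafka cluster):

# Download the three Kafka producer dependencies into Pulsar's lib directory.
cd ${PULSAR_HOME}/lib
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.0.1/kafka-clients-2.0.1.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/connect-api/2.0.1/connect-api-2.0.1.jar
wget https://repo1.maven.org/maven2/com/lmax/disruptor/3.4.2/disruptor-3.4.2.jar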
4. Start the Pulsar service
- To make sure the Pulsar service logs are written to Kafka correctly, first start the service in the foreground via bin/pulsar-kafka, and only when no exceptions appear start it in the background via bin/pulsar-daemon-kafka.
- Taking the broker as an example, execute the following command:
bin/pulsar-daemon-kafka start broker
- View the broker process through the ps command:
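A quick check along these lines confirms the injected options (our own sketch; PulsarBrokerStarter is the broker main class launched by bin/pulsar-kafka):

# List the -D options injected from logenv.sh into the broker JVM.
ps -ef | grep PulsarBrokerStarter | grep -v grep | tr ' ' '\n' \
  | grep -E '^-D(pulsar|kafka)\.'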

As the output above shows, the OPTS configured in logenv.sh have been passed to the broker process. The ${sys:...} lookups in log4j2-kafka.yaml resolve these property values to instantiate a Kafka producer, and the broker's logs are then sent to the Kafka brokers through that producer.
5. Test whether the Pulsar logs are written to the Kafka brokers successfully
Start a Kafka consumer and subscribe to the topic that Log4j2 sends the messages to.
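For example, with the console consumer that ships with Kafka (run from the Kafka installation directory, using the broker list configured in logenv.sh):

# Read the Pulsar service logs from the topic that Log4j2 writes to.
bin/kafka-console-consumer.sh \
  --bootstrap-server 192.168.0.1:9092 \
  --topic pulsar_topic \
  --from-beginning

A message read from the topic looks like the following; the retrieval fields come first and are separated by spaces: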
pulsar-cluster dapp21 192.168.0.1 broker 1 2020-12-26 17:40:14.363 [prometheus-stats-43-1] [org.eclipse.jetty.server.RequestLog] INFO - 192.168.0.1 - - [26/Dec/2020:17:40:14 +0800] "GET /metrics/ HTTP/1.1" 200 23445 "http://192.168.0.1:8080/metrics" "Prometheus/2.22.1" 4
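Our ELK stack is deployed and managed by the in-house big data platform, so the Kafka-to-Elasticsearch leg is not shown in this article. For completeness, a minimal Logstash pipeline along the following lines could consume the topic and split out the space-separated fields; this is a sketch, and the dissect mapping, field names, and index name are our own assumptions:

# Hypothetical Logstash pipeline: Kafka topic -> field split -> Elasticsearch.
cat > /etc/logstash/conf.d/pulsar.conf <<'EOF'
input {
  kafka {
    bootstrap_servers => "192.168.0.1:9092,192.168.0.2:9092"
    topics            => ["pulsar_topic"]
    group_id          => "logstash-pulsar"
  }
}
filter {
  dissect {
    mapping => {
      "message" => "%{cluster} %{hostname} %{hostip} %{module} %{instanceid} %{ts} %{+ts} [%{thread}] [%{class}] %{level} , %{msg}"
    }
  }
  date {
    match => ["ts", "yyyy-MM-dd HH:mm:ss.SSS"]
  }
}
output {
  elasticsearch {
    hosts => ["http://192.168.0.1:9200"]
    index => "pulsar-log-%{+YYYY.MM.dd}"
  }
}
EOF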
6. Log retrieval
Open the Kibana page and search on the tokenized fields. A search condition looks like: cluster: "pulsar-cluster" AND hostname: "XXX" AND module: "broker" AND level: "INFO"
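The same filter can also be issued directly against Elasticsearch; a sketch, with the index name and @timestamp field assumed from the hypothetical Logstash pipeline above:

# Query the most recent matching log records via the search API.
curl -s -H 'Content-Type: application/json' \
  'http://192.168.0.1:9200/pulsar-log-*/_search?pretty' -d '{
    "query": {
      "query_string": {
        "query": "cluster:\"pulsar-cluster\" AND module:\"broker\" AND level:\"INFO\""
      }
    },
    "size": 10,
    "sort": [ { "@timestamp": { "order": "desc" } } ]
  }'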

The figure above shows the log retrieval results for a given time window; Available fields can be added to the result view as needed. In this way, developers and operators can quickly and effectively analyze the causes of Pulsar service exceptions from multiple dimensions in Kibana. This completes Apache Pulsar's solution for fast log retrieval based on Log4j2 + Kafka + ELK.
Summary
Distributed systems and microservices are popular technical directions today. In production, as business grows and the number of applications and services expands rapidly, moving from a monolithic/vertical architecture to a distributed/microservice architecture is a natural choice, mainly for the reduced complexity, fault tolerance, independent deployment, and horizontal scalability it brings. It also raises new challenges, however, such as the efficiency of troubleshooting and the convenience of operations monitoring. Taking Apache Pulsar as an example, this article has shared how Java processes can use Log4j2 + Kafka + ELK to achieve fast retrieval of distributed and microservice logs, and thereby improve service governance.
Related reading
- Collect logs to Pulsar using Elastic Beats
- How to send log data to Apache Pulsar using Apache Flume
- KoP is officially open source: it supports the native Kafka protocol on Apache Pulsar