ELFK + Kafka + Zabbix Log Collection and Alerting
Environment Preparation
- Role assignment:
OS: CentOS 7
es master node / es data node / kibana / head  192.168.1.253
kafka / zookeeper / logstash  192.168.1.253
log test host / filebeat  192.168.1.253
- Disable the firewall and SELinux:
# systemctl stop firewalld && systemctl disable firewalld
# sed -i 's/=enforcing/=disabled/g' /etc/selinux/config && setenforce 0
- Configure system limits:
# vim /etc/security/limits.conf
* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096
# vim /etc/sysctl.conf
vm.max_map_count=655360
# sysctl -p
- Install the Java environment:
# tar zxf jdk-8u191-linux-x64.tar.gz && mv jdk1.8.0_191/ /usr/local/jdk
# vim /etc/profile
JAVA_HOME=/usr/local/jdk
PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib
export JAVA_HOME PATH CLASSPATH
# source !$
# java -version
# ln -s /usr/local/jdk/bin/java /usr/local/bin/java
Integrating ELFK with Kafka
In the early stage of production, with few services and low traffic, an ELFK cluster alone met the requirements. As the business grew, log volume multiplied, so a message queue is added in front of the ES cluster to relieve the pressure on it.
Should Redis or Kafka be used as the message queue? Consider the following three points:
* Reliability of message delivery:
Redis message pushing (distributed Pub/Sub) is mostly used for low-latency, real-time notifications and does not guarantee delivery.
Redis Pub/Sub loses its data on power failure, and even a Redis list used as a queue, although persisted, is not fully reliable against message loss.
Kafka adds some latency but guarantees reliable delivery.
* Consumer grouping on subscriptions:
Redis Pub/Sub only distinguishes topics; it has no concept of consumer groups.
In Kafka, consumers of a topic can be organized into groups; within a group only one consumer receives each message, which can be used for load balancing (see the sketch after this list).
* Cluster resource consumption:
Redis 3.0 and later offers a cluster HA mechanism, but every master needs one or more replicas that pull data from it; when a master fails a replica takes over, which is wasteful in resources.
As a message queue, Kafka makes full use of cluster resources: each application maps to a topic, a topic can have multiple partitions, partitions are distributed round-robin across the nodes, and producers spread data evenly over the partitions.
Even with only one application on top, the Kafka cluster's resources are fully used, avoiding the data-skew problem of Redis clusters; Kafka also has HDFS-like replication, so losing one broker does not affect the cluster as a whole.
Here we choose Kafka as the message queue, set up a Kafka cluster, and combine it with the ELFK cluster to collect application logs.
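As a minimal sketch of the grouping behavior described above (assuming the test topic created later in this article; the group names are only examples), two console consumers started with the same --group share the topic's partitions, so each message is delivered to only one of them, while a consumer in a different group receives the full stream again:
# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.253:9092 --topic test --group demo-group #run in two terminals: messages are balanced between them
# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.253:9092 --topic test --group another-group --from-beginning #a different group receives every message again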
The installation of Elasticsearch, Kibana, and Head is omitted here.
Installing the Kafka Cluster
Apache Kafka is a distributed publish/subscribe message system. It provides a unified, high-throughput, low-latency platform for handling real-time data, with a distributed, partitioned, replicated, and durable log service.
- Environment:
kafka/zookeeper/logstash 192.168.1.253
- Download and install Kafka:
# cd /software
# wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz
# tar zxf kafka_2.11-2.2.0.tgz && mv kafka_2.11-2.2.0 /usr/local/kafka
- Edit the ZooKeeper configuration:
# vim /usr/local/kafka/config/zookeeper.properties
dataDir=/usr/local/kafka/zookeeper
clientPort=2181
maxClientCnxns=1024
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.1.253:2888:3888
Notes:
tickTime: the heartbeat interval, in milliseconds, between ZooKeeper servers or between clients and servers; a heartbeat is sent every tickTime.
Port 2888: used by this server to exchange data with the cluster Leader.
Port 3888: used for leader election traffic between servers if the current Leader fails.
- Create the required directory and file:
Create a myid file under the zookeeper data directory; it contains a number that identifies the host, and ZooKeeper will not start without it.
# mkdir /usr/local/kafka/zookeeper
# echo 1 > /usr/local/kafka/zookeeper/myid #in a cluster, each node just needs a unique id
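For reference, a hypothetical three-node cluster (the extra IPs below are placeholders) would list every member in zookeeper.properties, and the myid on each host must match its server.N number:
server.1=192.168.1.253:2888:3888
server.2=192.168.1.254:2888:3888
server.3=192.168.1.255:2888:3888
# on 192.168.1.254: echo 2 > /usr/local/kafka/zookeeper/myid
# on 192.168.1.255: echo 3 > /usr/local/kafka/zookeeper/myid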
- Edit the Kafka configuration:
# vim /usr/local/kafka/config/server.properties
broker.id=1 #unique numeric ID
port=9092
host.name=192.168.1.253 #this host's IP
log.dirs=/data/kafka/kafka-logs #data directory, not an application log directory
num.partitions=16 #default number of partitions per topic
log.retention.hours=168 #retention time, one week by default
zookeeper.connect=192.168.1.253:2181 #zookeeper IP and port
# mkdir -p /data/kafka
- Configure the ZooKeeper/Kafka service script:
# vim /usr/bin/zk_kafka
#!/bin/bash
#chkconfig: 2345 55 24
#description: zookeeper and kafka service manager
BASE_DIR=/usr/local/kafka
SERVICE=$1
START_ZK()
{
cd $BASE_DIR
nohup $BASE_DIR/bin/zookeeper-server-start.sh $BASE_DIR/config/zookeeper.properties &>/dev/null &
}
STOP_ZK()
{
cd $BASE_DIR
nohup $BASE_DIR/bin/zookeeper-server-stop.sh &>/dev/null &
}
START_KAFKA()
{
cd $BASE_DIR
nohup $BASE_DIR/bin/kafka-server-start.sh $BASE_DIR/config/server.properties &>/dev/null &
}
STOP_KAFKA()
{
cd $BASE_DIR
nohup $BASE_DIR/bin/kafka-server-stop.sh &>/dev/null &
}
if [ -z "$1" ];
then
echo $"Usage: $0 {zookeeper|kafka} {start|stop|restart}"
exit 0
else
if [ "$1" != "zookeeper" ] && [ "$1" != "kafka" ];
then
echo $"Usage: $0 {zookeeper|kafka} {start|stop|restart}"
exit 1
fi
fi
START()
{
if [ "$SERVICE" = "zookeeper" ];
then
START_ZK
if [ $? -eq 0 ];
then
echo -e "\033[32m Start $SERVICE OK. \033[0m"
fi
else
START_KAFKA
if [ $? -eq 0 ];
then
echo -e "\033[32m Start $SERVICE OK. \033[0m"
fi
fi
}
STOP()
{
if [ "$SERVICE" = "zookeeper" ];
then
STOP_ZK
if [ $? -eq 0 ];
then
echo -e "\033[32m Stop $SERVICE OK. \033[0m"
fi
else
STOP_KAFKA
if [ $? -eq 0 ];
then
echo -e "\033[32m Stop $SERVICE OK. \033[0m"
fi
fi
}
case "$2" in
start)
START
;;
stop)
STOP
;;
restart)
STOP
sleep 2
START
;;
*)
echo $"Usage: $0 {zookeeper|kafka} {start|stop|restart}"
exit 1
;;
esac
exit 0
- Start ZooKeeper:
ZooKeeper must be started before Kafka.
# chmod +x /usr/bin/zk_kafka
# zk_kafka zookeeper start
# netstat -lntp | grep -E "2181|2888|3888" #check the ports; the node listening on 2888 is the leader
tcp6 0 0 192.168.1.253:2888 :::* LISTEN 6787/java
tcp6 0 0 192.168.1.253:3888 :::* LISTEN 6787/java
tcp6 0 0 :::2181 :::* LISTEN 6787/java
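Optionally, ZooKeeper's four-letter-word commands give a quick health check (a sanity check only; it assumes nc is installed, and the ZooKeeper 3.4.x bundled with this Kafka release answers these commands by default):
# echo ruok | nc 192.168.1.253 2181 #should answer imok
# echo stat | nc 192.168.1.253 2181 #shows the server mode and connection statistics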
- Start Kafka:
With ZooKeeper up, Kafka can now be started.
# zk_kafka kafka start
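To confirm the broker is up, check that port 9092 is listening and that the broker has registered itself in ZooKeeper (the zookeeper-shell check is an optional sketch):
# netstat -lntp | grep 9092
# /usr/local/kafka/bin/zookeeper-shell.sh 192.168.1.253:2181 ls /brokers/ids #should list [1]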
- Create a topic:
# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.1.253:2181 --replication-factor 1 --partitions 1 --topic test #note: the replication factor cannot exceed the number of brokers
Created topic test.
- List the created topics:
# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.1.253:2181
test
Show the details of topic test:
# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.1.253:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
In the output above:
#topic name: test
#Partition: only one, numbered from 0
#Leader: the broker with id 1
#Replicas: the broker ids holding replicas of this partition (here only broker 1)
#Isr: the in-sync (active) replicas
- Send messages with the console producer:
# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.1.253:9092 --topic test
>This is a test message for kafka producer
>welcome
>let's go
- Receive messages with the console consumer:
Open another session:
# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.253:9092 --topic test --from-beginning # --from-beginning reads from the start of the topic; omit it to receive only new messages
This is a test message for kafka producer
welcome
let's go
The producer and consumer tests above show that Kafka and its underlying ZooKeeper setup are working correctly.
Installing Logstash and Filebeat
- To be installed:
192.168.1.253 logstash/filebeat
- Install Logstash:
# cd /software
# tar zxf logstash-6.7.1.tar.gz && mv logstash-6.7.1/ /usr/local/logstash
# mkdir /usr/local/logstash/conf.d
# useradd elk
# vim /usr/local/logstash/config/logstash.yml
http.host: "192.168.1.253" #this host's IP
http.port: 9600
#omit the section below if x-pack is not deployed
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: logstash_system
xpack.monitoring.elasticsearch.password: elk-2019
xpack.monitoring.elasticsearch.hosts: ["http://192.168.100.128:9200","http://192.168.100.129:9200","http://192.168.100.130:9200"]
xpack.monitoring.collection.interval: 10s
- Configure the Logstash service:
Service environment file:
# vim /etc/default/logstash
LS_HOME="/usr/local/logstash"
LS_SETTINGS_DIR="/usr/local/logstash"
LS_PIDFILE="/usr/local/logstash/run/logstash.pid"
LS_USER="elk"
LS_GROUP="elk"
LS_GC_LOG_FILE="/usr/local/logstash/logs/gc.log"
LS_OPEN_FILES="16384"
LS_NICE="19"
SERVICE_NAME="logstash"
SERVICE_DESCRIPTION="logstash"
Unit file:
# vim /etc/systemd/system/logstash.service
[Unit]
Description=logstash
[Service]
Type=simple
User=elk
Group=elk
# Load env vars from /etc/default/ and /etc/sysconfig/ if they exist.
# Prefixing the path with '-' makes it try to load, but if the file doesn't
# exist, it continues onward.
EnvironmentFile=-/etc/default/logstash
EnvironmentFile=-/etc/sysconfig/logstash
ExecStart=/usr/local/logstash/bin/logstash "--path.settings" "/usr/local/logstash/config" "--path.config" "/usr/local/logstash/conf.d"
Restart=always
WorkingDirectory=/
Nice=19
LimitNOFILE=16384
[Install]
WantedBy=multi-user.target
Manage the service:
# mkdir /usr/local/logstash/{run,logs} && touch /usr/local/logstash/run/logstash.pid
# touch /usr/local/logstash/logs/gc.log && chown -R elk:elk /usr/local/logstash
# yum install -y bash-completion && source /etc/profile #command auto-completion
# systemctl daemon-reload
# systemctl enable logstash
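Before wiring up the real pipelines, a minimal stdin-to-stdout pipeline is a simple way to verify the Logstash installation itself (a sanity check only; stop it with Ctrl+C):
# /usr/local/logstash/bin/logstash -e 'input { stdin { } } output { stdout { } }'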
- Install Filebeat:
# tar zxf filebeat-6.7.1-linux-x86_64.tar.gz && mv filebeat-6.7.1-linux-x86_64 /usr/local/filebeat
- Configure the Filebeat service:
Unit file:
# vim /usr/lib/systemd/system/filebeat.service
[Unit]
Description=Filebeat sends log files to Logstash or directly to Elasticsearch.
Documentation=https://www.elastic.co/products/beats/filebeat
Wants=network-online.target
After=network-online.target
[Service]
ExecStart=/usr/local/filebeat/filebeat -c /usr/local/filebeat/filebeat.yml -path.home /usr/local/filebeat -path.config /usr/local/filebeat -path.data /usr/local/filebeat/data -path.logs /usr/local/filebeat/logs
Restart=always
[Install]
WantedBy=multi-user.target
Manage the service:
# mkdir /usr/local/filebeat/{data,logs}
# systemctl daemon-reload
# systemctl enable filebeat
Integrating with Kafka and Zabbix
Here we use the logs /home/logs/ciphermachine/ciphermachine.log and /home/logs/webservice/webservice.log as examples to test the integration of ELFK with Kafka.
- Configure Filebeat:
Filebeat is configured with one topic per log file:
# vim /usr/local/filebeat/filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /home/logs/webservice/webservice.log
fields:
serverip: 192.168.1.253
logtopic: 253-webservice
- type: log
enabled: true
paths:
- /home/logs/ciphermachine/ciphermachine.log
fields:
serverip: 192.168.1.253
logtopic: 253-ciphermachine
output.kafka:
enabled: true
hosts: ["192.168.1.253:9092"]
topic: '%{[fields.logtopic]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip #message compression codec
max_message_bytes: 1000000
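With the file in place, Filebeat's built-in test commands can check the syntax and the connection to the Kafka output before the service is started (a quick sanity check using the paths above):
# /usr/local/filebeat/filebeat test config -c /usr/local/filebeat/filebeat.yml
# /usr/local/filebeat/filebeat test output -c /usr/local/filebeat/filebeat.yml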
- Create the Kafka topics:
# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.1.253:2181 --replication-factor 1 --partitions 1 --topic 253-ciphermachine
Created topic 253-ciphermachine.
# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.1.253:2181 --replication-factor 1 --partitions 1 --topic 253-webservice
Created topic 253-webservice.
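The new topics can be checked the same way as the earlier test topic (optional):
# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.1.253:2181 --topic 253-ciphermachine
# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.1.253:2181 --topic 253-webservice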
- Configure Logstash:
Kafka and Logstash run on the same machine, so Logstash is configured on the Kafka host.
# vim /usr/local/logstash/conf.d/ciphermachine.conf
input {
kafka {
bootstrap_servers => "192.168.1.253:9092"
group_id => "ciphermachine"
client_id => "ciphermachine-1" #client_id must be set, otherwise Logstash fails to start
topics => "253-ciphermachine"
auto_offset_reset => "latest" #start consuming from the latest offset
codec => "json"
consumer_threads => 5 #number of consumer threads
decorate_events => false #do not add Kafka metadata (message size, source topic, consumer group) to the output events
type => "253_ciphermachine"
}
}
filter {
if [type] == "253_ciphermachine" {
ruby {
code => "event.set('log_time',event.get('@timestamp').time.localtime + 8*60*60)"
}
#grok{
# match => [ "log_time","(?<thisdate>^\d{4}-\d{1,2}-\d{1,2})" ]
#}
grok {
match => [ "message", "%{TIME:thistime} %{NOTSPACE:level}" ]
}
mutate {
add_field => [ "[zabbix_key]", "ciphermachine_error" ]
add_field => [ "[zabbix_host]", "192.168.1.253" ]
}
#ruby {
# code => "event.set('logtime',event.get('thisdate') + ' ' + event.get('thistime') )"
#}
date {
match => [ "logtime","yyyy-MM-dd HH:mm:ss,SSS",'ISO8601' ]
target => "@timestamp"
}
ruby {
code => "event.set('logtime',event.get('@timestamp').time.localtime + 8*60*60)"
}
mutate {
remove_field => [ "@version", "host", "path", "_type", "_score", "_id", "thread-id",
"log_time", "thisdate", "thistime", "score", "id", "name", "beat", "fields",
"input.type", "log.file.path", "offset", "prospector.type", "source" ]
}
}
}
output {
if [type] == "253_ciphermachine" {
elasticsearch {
hosts => ["192.168.1.253:9200"]
user => "elastic"
password => "changeme"
index => "webservice.log-%{+YYYY.MM.dd}"
}
if [level] =~ /(ERR|error|ERROR|Failed)/ {
zabbix {
zabbix_host => "[zabbix_host]"
zabbix_key => "[zabbix_key]"
zabbix_server_host => "192.168.1.252"
zabbix_server_port => "10051"
zabbix_value => "message"
}
}
}
}
# vim /usr/local/logstash/conf.d/webservice.conf
input {
kafka {
bootstrap_servers => "192.168.1.253:9092"
group_id => "webservice"
client_id => "webservice-1"
topics => "253-webservice"
auto_offset_reset => "latest"
codec => "json"
consumer_threads => 5
decorate_events => false
type => "253_webservice"
}
}
filter {
if [type] == "253_webservice" {
ruby {
code => "event.set('log_time',event.get('@timestamp').time.localtime + 8*60*60)"
}
#grok{
# match => [ "log_time","(?<thisdate>^\d{4}-\d{1,2}-\d{1,2})" ]
#}
grok {
match => [ "message", "%{TIME:thistime} %{NOTSPACE:thread-id}\[%{DATA:name}\] %{LOGLEVEL:level}" ]
}
mutate {
add_field => [ "[zabbix_key]", "webservice_error" ]
add_field => [ "[zabbix_host]", "192.168.1.253" ]
}
#ruby {
# code => "event.set('logtime',event.get('thisdate') + ' ' + event.get('thistime') )"
#}
date {
match => [ "logtime","yyyy-MM-dd HH:mm:ss,SSS",'ISO8601' ]
target => "@timestamp"
}
ruby {
code => "event.set('logtime',event.get('@timestamp').time.localtime + 8*60*60)"
}
mutate {
remove_field => [ "@version", "host", "path", "_type", "_score", "_id", "thread-id",
"log_time", "thisdate", "thistime", "score", "id", "name", "beat", "fields",
"input.type", "log.file.path", "offset", "prospector.type", "source" ]
}
}
}
output {
if [type] == "253_webservice" {
elasticsearch {
hosts => ["192.168.1.253:9200"]
user => "elastic"
password => "changeme"
index => "webservice.log-%{+YYYY.MM.dd}"
}
if [level] =~ /(ERR|error|ERROR|Failed)/ {
zabbix {
zabbix_host => "[zabbix_host]"
zabbix_key => "[zabbix_key]"
zabbix_server_host => "192.168.1.252"
zabbix_server_port => "10051"
zabbix_value => "message"
}
}
}
}
# chown -R elk:elk /usr/local/logstash
# /usr/local/logstash/bin/logstash-plugin install logstash-input-kafka
# /usr/local/logstash/bin/logstash-plugin install logstash-output-zabbix
# /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf.d/ciphermachine.conf -t #"Configuration OK" means the config file is valid
# /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf.d/webservice.conf -t
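Independently of Logstash, the Zabbix trapper items behind the ciphermachine_error and webservice_error keys can be exercised with zabbix_sender (this assumes the zabbix-sender package is installed and that trapper items with these keys exist on host 192.168.1.253 in the Zabbix frontend):
# zabbix_sender -z 192.168.1.252 -p 10051 -s "192.168.1.253" -k ciphermachine_error -o "test message from zabbix_sender"
# zabbix_sender -z 192.168.1.252 -p 10051 -s "192.168.1.253" -k webservice_error -o "test message from zabbix_sender"
A response of "processed: 1; failed: 0" means the items are ready to receive values from the logstash-output-zabbix plugin.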
- Start the services:
# systemctl start filebeat
# systemctl start logstash
The Logstash startup log can be followed with:
# tail -f /usr/local/logstash/logs/logstash-plain.log
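To confirm that Filebeat is actually publishing to Kafka, one of the topics can be consumed directly; the events appear as JSON documents produced by Filebeat (optional check):
# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.253:9092 --topic 253-webservice --from-beginning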
If Filebeat fails to start with an error like:
# systemctl status filebeat
Failed to start Filebeat sends log files to Logstash or directly to Elasticsearch..
# /usr/local/filebeat/filebeat -c /usr/local/filebeat/filebeat.yml -path.home /usr/local/filebeat -path.config /usr/local/filebeat -path.data /usr/local/filebeat/data -path.logs /usr/local/filebeat/logs
Exiting: error unpacking config data: more than one namespace configured accessing 'output' (source:'/usr/local/filebeat/filebeat.yml')
then edit the Filebeat configuration file /usr/local/filebeat/filebeat.yml, keeping only the Kafka output section and commenting out all the other default output sections:
# vim /usr/local/filebeat/filebeat.yml
#output.elasticsearch:
# Array of hosts to connect to.
# hosts: ["localhost:9200"]
After Filebeat is started, restart Logstash.
Simulate new entries in /home/logs/ciphermachine/ciphermachine.log:
# echo '10:47:52.225 TRSID[%PARSER_ERROR[wDPI]] - Unable to read additional data from server sessionid 0x164fdd3863600e6, likely server has closed socket, closing socket connection and attempting reconnect' >> /home/logs/ciphermachine/ciphermachine.log
# echo 'This message from ciphermachine reached the ES cluster via the Kafka queue!!!' >> /home/logs/ciphermachine/ciphermachine.log
Then check the results in head, Kibana, Zabbix, and the alert mailbox.
Simulate new entries in /home/logs/webservice/webservice.log:
# echo '10:58:26.612 TRSID[DubboShutdownHook] INFO - [DUBBO] Close all registries [], dubbo version: 2.6.2, current host: 172.17.0.1' >> /home/logs/webservice/webservice.log
# echo '10:59:26.612 TRSID[DubboShutdownHook] ERROR - [DUBBO] Close all registries [], dubbo version: 2.6.2, current host: 172.17.0.1' >> /home/logs/webservice/webservice.log
# echo "该消息是webservice通过Kafka队列到达ES集群!!!" >> /home/logs/webservice/webservice.log
Then check the results in head, Kibana, Zabbix, and the alert mailbox.
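The same result can be confirmed from the command line by checking that the daily indices exist and are receiving documents (assuming the elastic credentials used in the Logstash outputs):
# curl -u elastic:changeme 'http://192.168.1.253:9200/_cat/indices?v' | grep -E "webservice|ciphermachine"
# curl -u elastic:changeme 'http://192.168.1.253:9200/webservice.log-*/_count?pretty'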
At this point, log collection with ELFK plus Kafka and error alerting through Zabbix are complete.
When log volume is small, a single-node deployment is usually enough, but for high availability and load balancing an ELFK cluster combined with a Kafka cluster is recommended. As a bonus, adding Kafka allows logs to be compressed in transit.