k8s Notes 11: Log Collection with fluentd, Kafka, and Elasticsearch
1 Introduction
This article uses fluentd to collect logs from selected namespaces of a k8s cluster into Kafka, filters selected records into per-service sub-topics as needed, and finally consumes the sub-topics with Logstash and writes the logs into an ES cluster.
The advantages of this approach: each service's logs are separated, it can absorb very large log volumes, the settings of each individual index can be tuned independently, and it makes it easy to configure log alerting later.
2 Collection Pipeline
2.1 Create the index template and the rollover index
Create the index template:
PUT _template/sre-test-busybox-log
{
  "order": 0,
  "index_patterns": [
    "sre-test-busybox-log-*"
  ],
  "settings": {
    "index": {
      "number_of_shards": "2",
      "number_of_replicas": "0"
    }
  },
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "kubernetes": {
        "properties": {
          "container_name": {
            "type": "keyword"
          },
          "host": {
            "type": "keyword"
          },
          "namespace_name": {
            "type": "keyword"
          },
          "pod_name": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          }
        }
      },
      "message": {
        "type": "text"
      },
      "stream": {
        "type": "keyword"
      },
      "tag": {
        "type": "text"
      },
      "time": {
        "type": "date"
      }
    }
  },
  "aliases": {}
}
Create the rollover index:
PUT /%3Csre-test-busybox-log-%7Bnow%2Fd%7D-000001%3E
{
  "aliases": {
    "sre-test-busybox-log_write": {}
  }
}
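The request path above is just the URL-encoded form of the date-math index name `<sre-test-busybox-log-{now/d}-000001>`; a quick Python sketch (the helper name is made up for illustration) shows how the encoded path can be produced:

```python
from urllib.parse import quote

def rollover_index_path(prefix: str) -> str:
    """Build the percent-encoded date-math index name used in the PUT above.

    Date-math names like <prefix-{now/d}-000001> must be URL-encoded,
    because <, >, {, } and / are not valid raw characters in a URL path.
    """
    return "/" + quote(f"<{prefix}-{{now/d}}-000001>", safe="")

print(rollover_index_path("sre-test-busybox-log"))
# -> /%3Csre-test-busybox-log-%7Bnow%2Fd%7D-000001%3E
```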
2.2 Configure & deploy fluentd
Since the author runs self-hosted ES and Kafka clusters, it is enough to download three files from the kubernetes GitHub repo under cluster/addons/fluentd-elasticsearch: create-logging-namespace.yaml, fluentd-es-configmap.yaml, and fluentd-es-ds.yaml.
Adjust the match section in output.conf so that logs are sent to the Kafka cluster correctly, then apply the three files in order. The author's configuration follows; in practice only the Kafka IP needs to be changed.
vim logging-namespace.yaml
kind: Namespace
apiVersion: v1
metadata:
  name: logging
  labels:
    k8s-app: logging
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
vim fluentd-es-configmap.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-es-config-v0.2.1
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>
  containers.input.conf: |-
    # Json Log Example:
    # {"log":"[info:2016-02-16T16:04:05.930-08:00] Some log text here\n","stream":"stdout","time":"2016-02-17T00:04:05.931087621Z"}
    # CRI Log Example:
    # 2016-02-17T00:04:05.931087621Z stdout F [info:2016-02-16T16:04:05.930-08:00] Some log text here
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      tag raw.kubernetes.**
      read_from_head true
      <parse>
        @type multi_format
        <pattern>
          format json
          time_key time
          keep_time_key true
          time_format %Y-%m-%dT%H:%M:%S.%NZ
        </pattern>
        <pattern>
          format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
          time_format %Y-%m-%dT%H:%M:%S.%N%:z
        </pattern>
      </parse>
    </source>
    # Detect exceptions in the log output and forward them as one log entry.
    # <match raw.kubernetes.var.log.containers.*_sre-test_**>
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>
    # Concatenate multi-line logs
    <filter **>
      @id filter_concat
      @type concat
      key message
      multiline_end_regexp /\n$/
      separator ""
    </filter>
    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @id filter_kubernetes_metadata
      @type kubernetes_metadata
      skip_container_metadata true
      skip_labels true
      skip_master_url true
    </filter>
    # Fixes json fields in Elasticsearch
    <filter kubernetes.**>
      @id filter_parser
      @type parser
      key_name log
      reserve_data true
      remove_key_name_field true
      <parse>
        @type multi_format
        <pattern>
          format json
        </pattern>
        <pattern>
          format none
        </pattern>
      </parse>
    </filter>
    <filter kubernetes.**>
      @type record_transformer
      enable_ruby true
      remove_keys $["kubernetes"]["namespace_id"],$["kubernetes"]["pod_id"],$["docker"]
      <record>
        tag ${tag}
      </record>
    </filter>
  monitoring.conf: |-
    # Prometheus Exporter Plugin
    # input plugin that exports metrics
    <source>
      @id prometheus
      @type prometheus
    </source>
    <source>
      @id monitor_agent
      @type monitor_agent
    </source>
    # input plugin that collects metrics from MonitorAgent
    <source>
      @id prometheus_monitor
      @type prometheus_monitor
      <labels>
        host ${hostname}
      </labels>
    </source>
    # input plugin that collects metrics for output plugin
    <source>
      @id prometheus_output_monitor
      @type prometheus_output_monitor
      <labels>
        host ${hostname}
      </labels>
    </source>
    # input plugin that collects metrics for in_tail plugin
    <source>
      @id prometheus_tail_monitor
      @type prometheus_tail_monitor
      <labels>
        host ${hostname}
      </labels>
    </source>
  output.conf: |-
    # <match **>
    <match kubernetes.var.**_sre-test_** kubernetes.var.**_logging_** kubernetes.var.**_kube-admin_**>
      @id kafka2_k8s
      @type kafka2
      @log_level warn
      # list of seed brokers
      # brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>
      brokers 10.192.148.109:9092
      use_event_time true
      # buffer settings
      <buffer topic>
        @type file
        path /var/log/td-agent/buffer/td
        flush_interval 3s
      </buffer>
      # data type settings
      <format>
        @type json
      </format>
      # topic settings
      topic_key k8s-test
      default_topic k8s-test
      get_kafka_client_log true
      # producer settings
      required_acks -1
      compression_codec gzip
    </match>
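The second `<pattern>` in the tail source above parses CRI-format log lines with a regex. A quick Python sketch of the same expression (illustration only, not part of the deployment) makes it easy to see what each capture group extracts:

```python
import re

# Same pattern as the fluentd <parse> block, in Python named-group syntax.
CRI_RE = re.compile(r"^(?P<time>.+) (?P<stream>stdout|stderr) [^ ]* (?P<log>.*)$")

line = "2016-02-17T00:04:05.931087621Z stdout F [info] Some log text here"
m = CRI_RE.match(line)
assert m is not None
print(m.group("time"))    # 2016-02-17T00:04:05.931087621Z
print(m.group("stream"))  # stdout
print(m.group("log"))     # [info] Some log text here
```

The `[^ ]*` segment swallows the CRI partial/full flag (`F` or `P`), which fluentd does not keep.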
vim fluentd-es-ds.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
  - ""
  resources:
  - "namespaces"
  - "pods"
  verbs:
  - "get"
  - "watch"
  - "list"
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    addonmanager.kubernetes.io/mode: Reconcile
subjects:
- kind: ServiceAccount
  name: fluentd-es
  namespace: logging
  apiGroup: ""
roleRef:
  kind: ClusterRole
  name: fluentd-es
  apiGroup: ""
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-es-v3.1.1
  namespace: logging
  labels:
    k8s-app: fluentd-es
    version: v3.1.1
    addonmanager.kubernetes.io/mode: Reconcile
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-es
      version: v3.1.1
  template:
    metadata:
      labels:
        k8s-app: fluentd-es
        version: v3.1.1
    spec:
      securityContext:
        seccompProfile:
          type: RuntimeDefault
      priorityClassName: system-node-critical
      serviceAccountName: fluentd-es
      containers:
      - name: fluentd-es
        image: sre_fluentd:v1.0
        env:
        - name: FLUENTD_ARGS
          value: --no-supervisor -q
        resources:
          limits:
            memory: 500Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: config-volume
          mountPath: /etc/fluent/config.d
        ports:
        - containerPort: 24231
          name: prometheus
          protocol: TCP
        livenessProbe:
          tcpSocket:
            port: prometheus
          initialDelaySeconds: 5
          timeoutSeconds: 10
        readinessProbe:
          tcpSocket:
            port: prometheus
          initialDelaySeconds: 5
          timeoutSeconds: 10
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: config-volume
        configMap:
          name: fluentd-es-config-v0.2.1
2.3 Add a Kafka job that filters logs into sub-topics
The following is a simple example that filters the logs of the busybox-log deployment in the sre-test namespace.
In practice, a single module should handle all the filtering jobs rather than starting one consumer per log stream (which would generate considerable traffic); that module should therefore expose an interface through which the filter parameters of the different modules can be updated dynamically.
#!/usr/bin/python
# -*- coding=utf-8 -*-
from kafka import KafkaConsumer
from kafka import KafkaProducer
# https://kafka-python.readthedocs.io/en/latest/usage.html

URL_LIST = ['localhost:9092']

def kafka_consumer(topic_name, ns_name, deploy_name):
    print(f"new topic: {topic_name}-ns-{ns_name}-deploy-{deploy_name}")
    # To consume latest messages and auto-commit offsets
    consumer = KafkaConsumer(topic_name,
                             group_id='ns_' + ns_name + '_deploy_' + deploy_name,
                             bootstrap_servers=URL_LIST)
    # consumer.subscribe(pattern='.*_sre-test_.*')
    for message in consumer:
        # message value and key are raw bytes -- decode if necessary!
        # e.g., for unicode: `message.value.decode('utf-8')`
        value_str = message.value.decode('utf-8')
        if value_str.find(ns_name) != -1 and value_str.find(deploy_name) != -1:
            print(f'ns: {ns_name}, deploy: {deploy_name}, \n{value_str}')
            kafka_producer(topic_name, ns_name, deploy_name, message.value)
    consumer.close()

def kafka_producer(topic_name, ns_name, deploy_name, raw_message):
    producer = KafkaProducer(bootstrap_servers=URL_LIST, retries=5)
    # Asynchronous by default
    future = producer.send(topic_name + '-' + 'ns-' + ns_name + '-deploy-' + deploy_name, raw_message)

if __name__ == '__main__':
    kafka_consumer('k8s-test', 'sre-test', 'busybox-log')
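The sub-topic name the producer builds here must match the topic that Logstash consumes in the next step. A small helper (hypothetical, for illustration) keeps the naming convention in one place:

```python
def sub_topic(topic_name: str, ns_name: str, deploy_name: str) -> str:
    """Derive the per-deployment sub-topic name shared by the producer and Logstash."""
    return f"{topic_name}-ns-{ns_name}-deploy-{deploy_name}"

print(sub_topic("k8s-test", "sre-test", "busybox-log"))
# -> k8s-test-ns-sre-test-deploy-busybox-log
```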
2.4 Configure Logstash to consume the logs and write them to the corresponding index
Configure the following Logstash parameters and start an instance;
for Logstash usage, see the author's post elk笔记5–logstash使用.
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    group_id => "logstash_k8s-test-ns-sre-test-deploy-busybox-log"  # update the group_id here
    topics => "k8s-test-ns-sre-test-deploy-busybox-log"
    #auto_offset_reset => earliest
    #max_partition_fetch_bytes => "30000000"
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  #stdout{}
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "sre-test-busybox-log_write"
    user => "elastic"
    password => "elastic"
    #document_type => logs
  }
}
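The `json` filter above expands the JSON string that fluentd's kafka2 output placed in the `message` field into top-level event fields. A minimal Python illustration of that step (the sample record below is made up, in the shape fluentd produces):

```python
import json

# A made-up record in the shape fluentd's kafka2 json output produces.
message = ('{"stream":"stdout","message":"hello from busybox",'
           '"kubernetes":{"namespace_name":"sre-test","pod_name":"busybox-log-abc"}}')

# Roughly what the Logstash json filter does with source => "message".
event = json.loads(message)
print(event["kubernetes"]["namespace_name"])  # sre-test
print(event["stream"])                        # stdout
```

After this step the `kubernetes.*` fields are queryable in ES, which is what the index template in section 2.1 maps.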
2.5 Create an index pattern and view the logs
Create an index pattern sre-test-busybox-log-*, choosing the @timestamp field as the time field.
View the logs:
3 Caveats
- fluentd cannot resolve the Kafka broker hostname
The hostname of the Kafka host is xg. When answering a consumer or producer, Kafka uses java.net.InetAddress.getCanonicalHostName() to look up the broker address and returns it to the client, so the client then fails to reach Kafka via xg:9092, and fluentd keeps logging:
2021-07-12 06:16:11 +0000 [info]: [kafka2_k8s] Fetching cluster metadata from kafka://10.192.148.109:9092
2021-07-12 06:16:11.083010991 +0000 fluent.info: {"message":"[kafka2_k8s] Fetching cluster metadata from kafka://10.192.148.109:9092"}
2021-07-12 06:16:11.088556690 +0000 fluent.info: {"message":"[kafka2_k8s] Discovered cluster metadata; nodes: xg:9092 (node_id=0)"}
2021-07-12 06:16:11 +0000 [info]: [kafka2_k8s] Discovered cluster metadata; nodes: xg:9092 (node_id=0)
2021-07-12 06:16:11.090692501 +0000 fluent.info: {"message":"[kafka2_k8s] Sending 3137 messages to xg:9092 (node_id=0)"}
2021-07-12 06:16:11 +0000 [info]: [kafka2_k8s] Sending 3137 messages to xg:9092 (node_id=0)
2021-07-12 06:16:11.094056285 +0000 fluent.error: {"message":"[kafka2_k8s] [produce] Failed to connect to xg:9092: getaddrinfo: Name or service not known"}
2021-07-12 06:16:11 +0000 [error]: [kafka2_k8s] [produce] Failed to connect to xg:9092: getaddrinfo: Name or service not known
2021-07-12 06:16:11.094828878 +0000 fluent.error: {"message":"[kafka2_k8s] Could not connect to broker xg:9092 (node_id=0): getaddrinfo: Name or service not known"}
2021-07-12 06:16:11 +0000 [error]: [kafka2_k8s] Could not connect to broker xg:9092 (node_id=0): getaddrinfo: Name or service not known
2021-07-12 06:16:11.095125101 +0000 fluent.error: {"message":"[kafka2_k8s] Failed to send all messages; keeping remaining messages in buffer"}
2021-07-12 06:16:11 +0000 [error]: [kafka2_k8s] Failed to send all messages; keeping remaining messages in buffer
Solution: before starting Kafka, set advertised.listeners in server.properties, e.g. advertised.listeners=PLAINTEXT://10.192.148.109:9092; after this, fluentd no longer reports the error continuously.
- Adjusting the collected log level
Setting the log_level option in the relevant plugins (see deployment/logging#log-level) avoids collecting logs that are not needed;
for example, adding @log_level warn to output.conf suppresses trace, debug, and info messages.
- On k8s 1.12, error: SSL_connect returned=1 errno=0 state=error: certificate verify failed (unable to get issuer certificate)
After deploying fluentd on an Alibaba Cloud 1.12 cluster, the author found that the DaemonSet would not start, reporting the error below.
The cause is that the kubernetes_metadata plugin fetches metadata from the api-server, and by default the SSL certificate is verified; on the old cluster this verification fails, so none of the pods can start.
2021-07-19 05:41:46 +0000 [error]: config error file="/etc/fluent/fluent.conf" error_class=Fluent::ConfigError error="Invalid Kubernetes API v1 endpoint https://192.168.0.192:6443/api: SSL_connect returned=1 errno=0 state=error: certificate verify failed (unable to get issuer certificate)"
Solutions:
Option 1: add client_cert, client_key, and ca_file to the kubernetes_metadata filter; this requires syncing the certificates to every node.
Option 2: add verify_ssl false to the kubernetes_metadata filter; the author's configuration:
# Enriches records with Kubernetes metadata
<filter kubernetes.**>
  @id filter_kubernetes_metadata
  @type kubernetes_metadata
  verify_ssl false
  skip_container_metadata true
  skip_labels true
  skip_master_url true
</filter>
- fluentd reports Connection error EOFError: EOFError when writing to Kafka
The documentation shows this is a version mismatch between Kafka and the fluentd/ruby-kafka gems; pick matching versions and rebuild the image.
The author hit this with Kafka 0.10 and the latest fluentd.
https://github.com/fluent/fluent-plugin-kafka/issues/295
Error:
2021-07-22 11:28:08 +0000 [error]: [kafka2_k8s] Could not connect to broker 172.19.173.105:9092 (node_id=101): Connection error EOFError: EOFError
2021-07-22 11:28:08.551748627 +0000 fluent.error: {"message":"[kafka2_k8s] Could not connect to broker 172.19.173.105:9092 (node_id=101): Connection error EOFError: EOFError"}
Solution:
FROM quay.io/fluentd_elasticsearch/fluentd:v3.1.0
RUN gem uninstall ruby-kafka --version 1.3.0
RUN gem uninstall fluent-plugin-kafka --version 0.16.3
RUN gem install ruby-kafka --version 0.5.3
RUN gem install fluent-plugin-kafka --version 0.7.6
For version compatibility when producing to Kafka, see:
zendesk/ruby-kafka#compatibility
4 References
使用fluentd收集kubernetes集群日志
kafka-python ‒ kafka-python 2.0.2-dev documentation
https://kafka.apache.org/documentation
fluentd报错 – certificate verify failed
fluent-plugin-kubernetes_metadata_filter