1 Introduction

This article uses fluentd to collect the logs of specified Kubernetes namespaces into Kafka, filters them as needed into per-service sub-topics, and finally consumes the sub-topics with Logstash and writes them into an Elasticsearch cluster.

The advantages of this approach: the logs of individual services are kept separate, the pipeline can absorb very large log volumes, the settings of each index can be tuned independently, and it is easy to configure log-based alerting later on.

2 Collection pipeline

2.1 Create the index template and rollover index

Create the index template:
PUT _template/sre-test-busybox-log
{
    "order":0,
    "index_patterns":[
        "sre-test-busybox-log-*"
    ],
    "settings":{
        "index":{
            "number_of_shards":"2",
            "number_of_replicas":"0"
        }
    },
    "mappings":{
        "properties":{
            "@timestamp":{
                "type":"date"
            },
            "kubernetes":{
                "properties":{
                    "container_name":{
                        "type":"keyword"
                    },
                    "host":{
                        "type":"keyword"
                    },
                    "namespace_name":{
                        "type":"keyword"
                    },
                    "pod_name":{
                        "type":"text",
                        "fields":{
                            "keyword":{
                                "type":"keyword",
                                "ignore_above":256
                            }
                        }
                    }
                }
            },
            "message":{
                "type":"text"
            },
            "stream":{
                "type":"keyword"
            },
            "tag":{
                "type":"text"
            },
            "time":{
                "type":"date"
            }
        }
    },
    "aliases":{
    }
}
Create the rollover index:
PUT /%3Csre-test-busybox-log-%7Bnow%2Fd%7D-000001%3E
{
    "aliases": {
        "sre-test-busybox-log_write": {}
    }
}
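The write alias only switches to a new backing index when a rollover is actually triggered (by ILM, a cron job, or a manual request). Purely as an illustration (the thresholds below are hypothetical and not part of the original setup), a rollover of sre-test-busybox-log_write could be requested in Python, assuming Elasticsearch on localhost:9200 with the elastic/elastic credentials used later in the Logstash output:
# Illustration only: roll sre-test-busybox-log_write over to a new index when the
# current one is older than 1 day or holds more than 10 million documents
# (hypothetical thresholds).
import requests

resp = requests.post(
    'http://localhost:9200/sre-test-busybox-log_write/_rollover',
    auth=('elastic', 'elastic'),
    json={'conditions': {'max_age': '1d', 'max_docs': 10000000}},
)
print(resp.json())  # e.g. {"rolled_over": true, "new_index": "sre-test-busybox-log-...-000002", ...}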

2.2 Configure & deploy fluentd

Since the author runs self-hosted Elasticsearch and Kafka clusters, it is enough to download three files from cluster/addons/fluentd-elasticsearch in the Kubernetes GitHub repository: create-logging-namespace.yaml, fluentd-es-configmap.yaml and fluentd-es-ds.yaml.
Modify the match section of output.conf as needed so that logs are sent to the Kafka cluster correctly, then apply the three files in order. The author's configuration follows; in practice, just adjust the Kafka broker IP.

vim logging-namespace.yaml
kind: Namespace
apiVersion: v1
metadata:
  name: logging
  labels:
    k8s-app: logging
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
vim fluentd-es-configmap.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-es-config-v0.2.1
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>

  containers.input.conf: |-
    # Json Log Example:
    # {"log":"[info:2016-02-16T16:04:05.930-08:00] Some log text here\n","stream":"stdout","time":"2016-02-17T00:04:05.931087621Z"}
    # CRI Log Example:
    # 2016-02-17T00:04:05.931087621Z stdout F [info:2016-02-16T16:04:05.930-08:00] Some log text here
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      tag raw.kubernetes.**
      read_from_head true
      <parse>
        @type multi_format
        <pattern>
          format json
          time_key time
          keep_time_key true
          time_format %Y-%m-%dT%H:%M:%S.%NZ
        </pattern>
        <pattern>
          format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
          time_format %Y-%m-%dT%H:%M:%S.%N%:z
        </pattern>
      </parse>
    </source>

    # Detect exceptions in the log output and forward them as one log entry.
    # <match raw.kubernetes.var.log.containers.*_sre-test_**>
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>

    # Concatenate multi-line logs
    <filter **>
      @id filter_concat
      @type concat
      key message
      multiline_end_regexp /\n$/
      separator ""
    </filter>

    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @id filter_kubernetes_metadata
      @type kubernetes_metadata
      skip_container_metadata true
      skip_labels true
      skip_master_url true
    </filter>

    # Fixes json fields in Elasticsearch
    <filter kubernetes.**>
      @id filter_parser
      @type parser
      key_name log
      reserve_data true
      remove_key_name_field true
      <parse>
        @type multi_format
        <pattern>
          format json
        </pattern>
        <pattern>
          format none
        </pattern>
      </parse>
    </filter>
    
    <filter kubernetes.**>
      @type record_transformer
      enable_ruby true
      remove_keys $["kubernetes"]["namespace_id"],$["kubernetes"]["pod_id"],$["docker"]
      <record>
      tag ${tag}
      </record>
    </filter>

  monitoring.conf: |-
    # Prometheus Exporter Plugin
    # input plugin that exports metrics
    <source>
      @id prometheus
      @type prometheus
    </source>

    <source>
      @id monitor_agent
      @type monitor_agent
    </source>

    # input plugin that collects metrics from MonitorAgent
    <source>
      @id prometheus_monitor
      @type prometheus_monitor
      <labels>
        host ${hostname}
      </labels>
    </source>

    # input plugin that collects metrics for output plugin
    <source>
      @id prometheus_output_monitor
      @type prometheus_output_monitor
      <labels>
        host ${hostname}
      </labels>
    </source>

    # input plugin that collects metrics for in_tail plugin
    <source>
      @id prometheus_tail_monitor
      @type prometheus_tail_monitor
      <labels>
        host ${hostname}
      </labels>
    </source>
 
  output.conf: |-
    # <match **>
    <match kubernetes.var.**_sre-test_** kubernetes.var.**_logging_** kubernetes.var.**_kube-admin_**>
      @id kafka2_k8s
      @type kafka2
      @log_level warn
      # list of seed brokers
      # brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>
      brokers 10.192.148.109:9092
      use_event_time true

      # buffer settings
      <buffer topic>
        @type file
        path /var/log/td-agent/buffer/td
        flush_interval 3s
      </buffer>
      
      # data type settings
      <format>
        @type json
      </format>

      # topic settings
      topic_key k8s-test
      default_topic k8s-test
      get_kafka_client_log true

      # producer settings
      required_acks -1
      compression_codec gzip
    </match>
vim fluentd-es-ds.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
  - ""
  resources:
  - "namespaces"
  - "pods"
  verbs:
  - "get"
  - "watch"
  - "list"
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    addonmanager.kubernetes.io/mode: Reconcile
subjects:
- kind: ServiceAccount
  name: fluentd-es
  namespace: logging
  apiGroup: ""
roleRef:
  kind: ClusterRole
  name: fluentd-es
  apiGroup: ""
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-es-v3.1.1
  namespace: logging
  labels:
    k8s-app: fluentd-es
    version: v3.1.1
    addonmanager.kubernetes.io/mode: Reconcile
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-es
      version: v3.1.1
  template:
    metadata:
      labels:
        k8s-app: fluentd-es
        version: v3.1.1
    spec:
      securityContext:
        seccompProfile:
          type: RuntimeDefault
      priorityClassName: system-node-critical
      serviceAccountName: fluentd-es
      containers:
      - name: fluentd-es
        image: sre_fluentd:v1.0
        env:
        - name: FLUENTD_ARGS
          value: --no-supervisor -q
        resources:
          limits:
            memory: 500Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: config-volume
          mountPath: /etc/fluent/config.d
        ports:
        - containerPort: 24231
          name: prometheus
          protocol: TCP
        livenessProbe:
          tcpSocket:
            port: prometheus
          initialDelaySeconds: 5
          timeoutSeconds: 10
        readinessProbe:
          tcpSocket:
            port: prometheus
          initialDelaySeconds: 5
          timeoutSeconds: 10
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: config-volume
        configMap:
          name: fluentd-es-config-v0.2.1
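After applying the three manifests with kubectl apply -f, it is worth checking that records are actually arriving in the k8s-test topic before wiring up the downstream consumers. A minimal kafka-python sketch, assuming the broker address configured in output.conf above:
# Quick check that fluentd is producing to the k8s-test topic.
from kafka import KafkaConsumer

consumer = KafkaConsumer('k8s-test',
                         bootstrap_servers=['10.192.148.109:9092'],
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=10000)  # give up after 10 s without messages
for i, message in enumerate(consumer):
    print(message.value.decode('utf-8'))
    if i >= 4:  # show the first five records only
        break
consumer.close()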

2.3 Add a Kafka job that filters logs into sub-topics as required

Below is a simple example that filters the logs of the busybox-log deployment in the sre-test namespace into its own sub-topic.
In practice, a single module should perform all of the filtering instead of starting one consumer per log source (which would waste considerable bandwidth); it therefore needs to expose an interface through which the filter parameters of the different services can be updated dynamically. A more robust version of the matching logic is sketched after the script below.

#!/usr/bin/python
# -*- coding=utf-8 -*-

from kafka import KafkaConsumer
from kafka import KafkaProducer


# https://kafka-python.readthedocs.io/en/latest/usage.html

URL_LIST = ['localhost:9092']

def kafka_consumer(topic_name, ns_name, deploy_name):
    print(f"new topic: {topic_name}-ns-{ns_name}-deploy-{deploy_name}")
    # To consume latest messages and auto-commit offsets
    consumer = KafkaConsumer(topic_name, group_id='ns_'+ns_name+'_deploy_'+deploy_name, bootstrap_servers=URL_LIST)
    # print(dir(consumer))
    # time.sleep(10)
    # consumer.subscribe(pattern='.*_sre-test_.*')
    for message in consumer:
        # message value and key are raw bytes -- decode if necessary!
        # e.g., for unicode: `message.value.decode('utf-8')`
        # print(f"{message.topic}:{message.partition}:{message.offset}: key={message.key} value={message.value}")
        value_str = message.value.decode('utf-8')
        if value_str.find(ns_name)!=-1 and value_str.find(deploy_name)!=-1 :
            print(f'ns: {ns_name}, deploy: {deploy_name}, \n{value_str}')
            kafka_producer(topic_name, ns_name, deploy_name, message.value)
    consumer.close()


def kafka_producer(topic_name, ns_name, deploy_name, raw_message):
    # NOTE: creating a new producer for every message is only acceptable in a demo;
    # reuse a single producer instance in real code.
    producer = KafkaProducer(bootstrap_servers=URL_LIST, retries=5)
    # send() is asynchronous by default; flush so the message is actually delivered before returning
    producer.send(topic_name+'-ns-'+ns_name+'-deploy-'+deploy_name, raw_message)
    producer.flush()


if __name__ == '__main__':
    kafka_consumer('k8s-test', 'sre-test', 'busybox-log')
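For reference, a slightly more robust variant (a sketch under the same broker list and topic-naming convention): instead of substring matching on the raw payload, it parses the JSON record produced by fluentd and matches on the kubernetes.namespace_name and kubernetes.pod_name fields, reusing one producer for all messages.

import json

from kafka import KafkaConsumer, KafkaProducer

URL_LIST = ['localhost:9092']


def filter_by_metadata(topic_name, ns_name, deploy_name):
    sub_topic = f"{topic_name}-ns-{ns_name}-deploy-{deploy_name}"
    consumer = KafkaConsumer(topic_name,
                             group_id=f"ns_{ns_name}_deploy_{deploy_name}",
                             bootstrap_servers=URL_LIST)
    producer = KafkaProducer(bootstrap_servers=URL_LIST, retries=5)  # one producer, reused
    try:
        for message in consumer:
            try:
                record = json.loads(message.value.decode('utf-8'))
            except (ValueError, UnicodeDecodeError):
                continue  # skip records that are not valid JSON
            k8s = record.get('kubernetes', {})
            # pods of a deployment are named <deployment>-<replicaset-hash>-<pod-hash>
            if (k8s.get('namespace_name') == ns_name
                    and k8s.get('pod_name', '').startswith(deploy_name)):
                producer.send(sub_topic, message.value)
    finally:
        producer.flush()
        consumer.close()


if __name__ == '__main__':
    filter_by_metadata('k8s-test', 'sre-test', 'busybox-log')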

2.4 Configure Logstash to consume the logs and write them into the corresponding index

Configure Logstash with the parameters below and start an instance;
for general Logstash usage, see the author's post elk笔记5–logstash使用.

input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    group_id => "logstash_k8s-test-ns-sre-test-deploy-busybox-log"    # update the group_id here
    topics => "k8s-test-ns-sre-test-deploy-busybox-log"
    #auto_offset_reset => earliest
    #max_partition_fetch_bytes => "30000000"
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  #stdout{}
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "sre-test-busybox-log_write"
    user => "elastic"
    password => "elastic"
    #document_type => logs
  }
}
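Once Logstash is running, a quick way to confirm that documents are reaching the index is to count them through the write alias (an illustration only; host and credentials match the Logstash output above):
# Illustration: count the documents behind the write alias to confirm the pipeline works.
import requests

resp = requests.get('http://127.0.0.1:9200/sre-test-busybox-log_write/_count',
                    auth=('elastic', 'elastic'))
print(resp.json())  # e.g. {"count": 1234, ...}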

2.5 Create an index pattern and view the logs

Create an index pattern for sre-test-busybox-log-*, choosing the @timestamp field as the time field.
Then view the logs in Kibana.

3 Notes

  1. fluentd name-resolution errors when sending to Kafka
    The host running Kafka has the hostname xg. When returning broker metadata to consumers and producers, Kafka uses java.net.InetAddress.getCanonicalHostName(), so clients receive xg:9092 and cannot resolve it.
    Solution:
    Before starting Kafka, set advertised.listeners in server.properties, e.g. advertised.listeners=PLAINTEXT://10.192.148.109:9092. With this in place fluentd no longer keeps reporting the errors below.
    2021-07-12 06:16:11 +0000 [info]: [kafka2_k8s] Fetching cluster metadata from kafka://10.192.148.109:9092
    2021-07-12 06:16:11.083010991 +0000 fluent.info: {"message":"[kafka2_k8s] Fetching cluster metadata from kafka://10.192.148.109:9092"}
    2021-07-12 06:16:11.088556690 +0000 fluent.info: {"message":"[kafka2_k8s] Discovered cluster metadata; nodes: xg:9092 (node_id=0)"}
    2021-07-12 06:16:11 +0000 [info]: [kafka2_k8s] Discovered cluster metadata; nodes: xg:9092 (node_id=0)
    2021-07-12 06:16:11.090692501 +0000 fluent.info: {"message":"[kafka2_k8s] Sending 3137 messages to xg:9092 (node_id=0)"}
    2021-07-12 06:16:11 +0000 [info]: [kafka2_k8s] Sending 3137 messages to xg:9092 (node_id=0)
    2021-07-12 06:16:11.094056285 +0000 fluent.error: {"message":"[kafka2_k8s] [produce] Failed to connect to xg:9092: getaddrinfo: Name or service not known"}
    2021-07-12 06:16:11 +0000 [error]: [kafka2_k8s] [produce] Failed to connect to xg:9092: getaddrinfo: Name or service not known
    2021-07-12 06:16:11.094828878 +0000 fluent.error: {"message":"[kafka2_k8s] Could not connect to broker xg:9092 (node_id=0): getaddrinfo: Name or service not known"}
    2021-07-12 06:16:11 +0000 [error]: [kafka2_k8s] Could not connect to broker xg:9092 (node_id=0): getaddrinfo: Name or service not known
    2021-07-12 06:16:11.095125101 +0000 fluent.error: {"message":"[kafka2_k8s] Failed to send all messages; keeping remaining messages in buffer"}
    2021-07-12 06:16:11 +0000 [error]: [kafka2_k8s] Failed to send all messages; keeping remaining messages in buffer
    
  2. Adjust the collected log level
    Setting the log_level option on the relevant plugins (see the fluentd docs, deployment/logging#log-level) avoids collecting an excess of unwanted logs;
    for example, adding @log_level warn in output.conf suppresses trace, debug and info messages.
  3. Kubernetes 1.12 reports SSL_connect returned=1 errno=0 state=error: certificate verify failed (unable to get issuer certificate)
    After deploying fluentd on an Alibaba Cloud 1.12 cluster, the author found that the DaemonSet pods failed to start with the error below.
    The cause is that the kubernetes_metadata plugin queries the api-server to fetch metadata; by default this requires SSL verification, which fails on the old cluster, so the pods cannot start.
    2021-07-19 05:41:46 +0000 [error]: config error file="/etc/fluent/fluent.conf" error_class=Fluent::ConfigError error="Invalid Kubernetes API v1 endpoint https://192.168.0.192:6443/api: SSL_connect returned=1 errno=0 state=error: certificate verify failed (unable to get issuer certificate)"
    
    Solutions:
    Option 1: add client_cert, client_key and ca_file to the kubernetes_metadata filter; this requires distributing the certificates to every node.
    Option 2: add verify_ssl false to the kubernetes_metadata filter; the author's configuration is shown below.
    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @id filter_kubernetes_metadata
      @type kubernetes_metadata
      verify_ssl false
      skip_container_metadata true
      skip_labels true
      skip_master_url true
    </filter>
    
  4. fluentd fails to write to Kafka with Connection error EOFError: EOFError
    The documentation shows this is caused by a version mismatch between Kafka and the fluentd/ruby-kafka gems; pick compatible versions and rebuild the image.
    The author hit this error with Kafka 0.10 and the latest fluentd image.
    Error:
    2021-07-22 11:28:08 +0000 [error]: [kafka2_k8s] Could not connect to broker 172.19.173.105:9092 (node_id=101): Connection error EOFError: EOFError
    2021-07-22 11:28:08.551748627 +0000 fluent.error: {"message":"[kafka2_k8s] Could not connect to broker 172.19.173.105:9092 (node_id=101): Connection error EOFError: EOFError"}
    
    Solution (rebuild the fluentd image with compatible gem versions):
    FROM quay.io/fluentd_elasticsearch/fluentd:v3.1.0
    RUN gem uninstall ruby-kafka --version 1.3.0
    RUN gem uninstall fluent-plugin-kafka --version 0.16.3
    RUN gem install ruby-kafka  --version 0.5.3
    RUN gem install fluent-plugin-kafka  --version 0.7.6
    
    https://github.com/fluent/fluent-plugin-kafka/issues/295
    Version issues when collecting data into Kafka
    zendesk/ruby-kafka#compatibility

4 References

Collecting Kubernetes cluster logs with fluentd
kafka-python ‒ kafka-python 2.0.2-dev documentation
https://kafka.apache.org/documentation
fluentd error – certificate verify failed
fluent-plugin-kubernetes_metadata_filter
