fluentd收集K8S日志并以K8S的container_name作为索引名存入Elasticsearch中

参考地址
https://faun.pub/how-to-create-custom-indices-based-on-kubernetes-metadata-using-fluentd-beed062faa5d

ElasticsearchDocker镜像

elasticsearch:7.17.4

fluentd DaemonSet

apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd-es
  namespace: kube-efk
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
rules:
  - apiGroups:
      - ""
    resources:
      - "namespaces"
      - "pods"
    verbs:
      - "get"
      - "watch"
      - "list"
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
subjects:
  - kind: ServiceAccount
    name: fluentd-es
    namespace: kube-efk
    apiGroup: ""
roleRef:
  kind: ClusterRole
  name: fluentd-es
  apiGroup: ""
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-es-v1
  namespace: kube-efk
  labels:
    k8s-app: fluentd-es
    version: v1
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-es
      version: v1
  template:
    metadata:
      labels:
        k8s-app: fluentd-es
        kubernetes.io/cluster-service: "true"
        version: v1
      # This annotation ensures that fluentd does not get evicted if the node
      # supports critical pod annotation based priority scheme.
      # Note that this does not guarantee admission on the nodes (#40573).
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ''
        seccomp.security.alpha.kubernetes.io/pod: 'docker/default'
    spec:
      priorityClassName: system-node-critical
      serviceAccountName: fluentd-es
      containers:
        - name: fluentd-es
#          image: registry.cn-shenzhen.aliyuncs.com/jonluo/k8s.gcr.io_fluentd-elasticsearch:v2.2.0
          image: fluent/fluentd-kubernetes-daemonset:v1-debian-elasticsearch7
#          env:
#            - name: FLUENT_ELASTICSEARCH_HOST
#              value: "elasticsearch-logging"
#            - name: FLUENT_ELASTICSEARCH_PORT
#              value: "9200"
#            - name: FLUENT_ELASTICSEARCH_SCHEME
#              value: "http"
#            - name: FLUENT_ELASTICSEARCH_LOGSTASH_PREFIX
#              value: fluent-k8s
#            - name: FLUENT_ELASTICSEARCH_LOGSTASH_FORMAT
#              value: "true"
          resources:
            limits:
              memory: 500Mi
            requests:
              cpu: 100m
              memory: 200Mi
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: config-volume
              #mountPath: /etc/fluent/config.d
              mountPath: /fluentd/etc
            - name: config-volume-kubernetes
              #mountPath: /etc/fluent/config.d
              mountPath: /fluentd/etc/kubernetes
          #nodeSelector:
          #beta.kubernetes.io/fluentd-ds-ready: "true"
      terminationGracePeriodSeconds: 30
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: varlibdockercontainers
          hostPath:
            path: /var/lib/docker/containers
        - name: config-volume
          configMap:
            name: fluentd-es-fluent-config
        - name: config-volume-kubernetes
          configMap:
            name: fluentd-es-kubernetes

fluentd ConfigMap

kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-es-fluent-config
  namespace: kube-efk
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
#  原始的fluent.conf配置
#  fluent.conf: |-
#    # AUTOMATICALLY GENERATED
#    # DO NOT EDIT THIS FILE DIRECTLY, USE /templates/conf/fluent.conf.erb
#
#    @include "#{ENV['FLUENTD_SYSTEMD_CONF'] || 'systemd'}.conf"
#    @include "#{ENV['FLUENTD_PROMETHEUS_CONF'] || 'prometheus'}.conf"
#    @include kubernetes.conf
#    @include conf.d/*.conf
#
#    <match **>
#      # @type elasticsearch
#      @type elasticsearch_dynamic
#      @id out_es
#      @log_level info
#      include_tag_key true
#      host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
#      port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
#      path "#{ENV['FLUENT_ELASTICSEARCH_PATH']}"
#      scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
#      ssl_verify "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERIFY'] || 'true'}"
#      ssl_version "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERSION'] || 'TLSv1_2'}"
#      user "#{ENV['FLUENT_ELASTICSEARCH_USER'] || use_default}"
#      password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD'] || use_default}"
#      reload_connections "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_CONNECTIONS'] || 'false'}"
#      reconnect_on_error "#{ENV['FLUENT_ELASTICSEARCH_RECONNECT_ON_ERROR'] || 'true'}"
#      reload_on_failure "#{ENV['FLUENT_ELASTICSEARCH_RELOAD_ON_FAILURE'] || 'true'}"
#      log_es_400_reason "#{ENV['FLUENT_ELASTICSEARCH_LOG_ES_400_REASON'] || 'false'}"
#      logstash_prefix "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_PREFIX'] || 'logstash'}"
#      logstash_dateformat "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_DATEFORMAT'] || '%Y.%m.%d'}"
#      logstash_format "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_FORMAT'] || 'true'}"
#      index_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_INDEX_NAME'] || 'logstash'}"
#      target_index_key "#{ENV['FLUENT_ELASTICSEARCH_TARGET_INDEX_KEY'] || use_nil}"
#      type_name "#{ENV['FLUENT_ELASTICSEARCH_LOGSTASH_TYPE_NAME'] || 'fluentd'}"
#      include_timestamp "#{ENV['FLUENT_ELASTICSEARCH_INCLUDE_TIMESTAMP'] || 'false'}"
#      template_name "#{ENV['FLUENT_ELASTICSEARCH_TEMPLATE_NAME'] || use_nil}"
#      template_file "#{ENV['FLUENT_ELASTICSEARCH_TEMPLATE_FILE'] || use_nil}"
#      template_overwrite "#{ENV['FLUENT_ELASTICSEARCH_TEMPLATE_OVERWRITE'] || use_default}"
#      sniffer_class_name "#{ENV['FLUENT_SNIFFER_CLASS_NAME'] || 'Fluent::Plugin::ElasticsearchSimpleSniffer'}"
#      request_timeout "#{ENV['FLUENT_ELASTICSEARCH_REQUEST_TIMEOUT'] || '5s'}"
#      application_name "#{ENV['FLUENT_ELASTICSEARCH_APPLICATION_NAME'] || use_default}"
#      suppress_type_name "#{ENV['FLUENT_ELASTICSEARCH_SUPPRESS_TYPE_NAME'] || 'true'}"
#      enable_ilm "#{ENV['FLUENT_ELASTICSEARCH_ENABLE_ILM'] || 'false'}"
#      ilm_policy_id "#{ENV['FLUENT_ELASTICSEARCH_ILM_POLICY_ID'] || use_default}"
#      ilm_policy "#{ENV['FLUENT_ELASTICSEARCH_ILM_POLICY'] || use_default}"
#      ilm_policy_overwrite "#{ENV['FLUENT_ELASTICSEARCH_ILM_POLICY_OVERWRITE'] || 'false'}"
#      <buffer>
#        flush_thread_count "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_THREAD_COUNT'] || '8'}"
#        flush_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_FLUSH_INTERVAL'] || '5s'}"
#        chunk_limit_size "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_CHUNK_LIMIT_SIZE'] || '2M'}"
#        queue_limit_length "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_QUEUE_LIMIT_LENGTH'] || '32'}"
#        retry_max_interval "#{ENV['FLUENT_ELASTICSEARCH_BUFFER_RETRY_MAX_INTERVAL'] || '30'}"
#        retry_forever true
#      </buffer>
#    </match>
  fluent.conf: |
    <match fluent.**>
        # 这告诉 fluentd 不要在标准输出上输出它的日志
        @type null
    </match>
    # 这里我们从 Docker 的容器中读取日志并解析它们
    <source>
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/containers.log.pos
      tag kubernetes.*
      read_from_head true
      <parse>
        @type json
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>
    # we 我们使用 Kubernetes 元数据插件将元数据添加到日志中
    <filter kubernetes.**>
        @type kubernetes_metadata
    </filter>
    <match kubernetes.var.log.containers.**kube-logging**.log>
      @type null
    </match>
    <match kubernetes.var.log.containers.**kube-system**.log>
      @type null
    </match>
    <match kubernetes.var.log.containers.**monitoring**.log>
      @type null
    </match>
    <match kubernetes.var.log.containers.**infra**.log>
      @type null
    </match>
     # 我们将日志发送到 Elasticsearch
    <match kubernetes.**>
      @type elasticsearch_dynamic
      @log_level info
      include_tag_key true
      host "#{ENV['FLUENT_ELASTICSEARCH_HOST'] || 'elasticsearch-logging'}"
      port "#{ENV['FLUENT_ELASTICSEARCH_PORT'] || '9200'}"
      user "#{ENV['FLUENT_ELASTICSEARCH_USER']}"
      password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD']}"
      scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
      ssl_verify "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERIFY'] || 'true'}"
      reload_connections true
      logstash_format true
      logstash_prefix ${record['kubernetes']['container_name']}
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever true
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 32
        overflow_action block
      </buffer>
    </match>

  kubernetes.conf: |-
    # AUTOMATICALLY GENERATED
    # DO NOT EDIT THIS FILE DIRECTLY, USE /templates/conf/kubernetes.conf.erb

    <label @FLUENT_LOG>
      <match fluent.**>
        @type null
        @id ignore_fluent_logs
      </match>
    </label>

    @include kubernetes/cluster-autoscaler.conf
    @include kubernetes/containers.conf
    @include kubernetes/docker.conf
    @include kubernetes/etcd.conf
    @include kubernetes/glbc.conf
    @include kubernetes/kube-apiserver-audit.conf
    @include kubernetes/kube-apiserver.conf
    @include kubernetes/kube-controller-manager.conf
    @include kubernetes/kube-proxy.conf
    @include kubernetes/kube-scheduler.conf
    @include kubernetes/kubelet.conf
    @include kubernetes/rescheduler.conf
    @include kubernetes/salt.conf
    @include kubernetes/startupscript.conf
    <filter kubernetes.**>
      @type kubernetes_metadata
      @id filter_kube_metadata
      kubernetes_url "#{ENV['FLUENT_FILTER_KUBERNETES_URL'] || 'https://' + ENV.fetch('KUBERNETES_SERVICE_HOST') + ':' + ENV.fetch('KUBERNETES_SERVICE_PORT') + '/api'}"
      verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
      ca_file "#{ENV['KUBERNETES_CA_FILE']}"
      skip_labels "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_LABELS'] || 'false'}"
      skip_container_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_CONTAINER_METADATA'] || 'false'}"
      skip_master_url "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_MASTER_URL'] || 'false'}"
      skip_namespace_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_NAMESPACE_METADATA'] || 'false'}"
      watch "#{ENV['FLUENT_KUBERNETES_WATCH'] || 'true'}"
    </filter>
    

  prometheus.conf: |-
    # AUTOMATICALLY GENERATED
    # DO NOT EDIT THIS FILE DIRECTLY, USE /templates/conf/prometheus.conf.erb

    # Prometheus metric exposed on 0.0.0.0:24231/metrics
    <source>
      @type prometheus
      @id in_prometheus
      bind "#{ENV['FLUENTD_PROMETHEUS_BIND'] || '0.0.0.0'}"
      port "#{ENV['FLUENTD_PROMETHEUS_PORT'] || '24231'}"
      metrics_path "#{ENV['FLUENTD_PROMETHEUS_PATH'] || '/metrics'}"
    </source>

    <source>
      @type prometheus_output_monitor
      @id in_prometheus_output_monitor
    </source>
  

  systemd.conf: |-
    # AUTOMATICALLY GENERATED
    # DO NOT EDIT THIS FILE DIRECTLY, USE /templates/conf/systemd.conf.erb

    # Logs from systemd-journal for interesting services.
    <source>
      @type systemd
      @id in_systemd_kubelet
      matches [{ "_SYSTEMD_UNIT": "kubelet.service" }]
      <storage>
        @type local
        persistent true
        path /var/log/fluentd-journald-kubelet-cursor.json
      </storage>
      <entry>
        fields_strip_underscores true
      </entry>
      read_from_head true
      tag kubelet
    </source>

    # Logs from docker-systemd
    <source>
      @type systemd
      @id in_systemd_docker
      matches [{ "_SYSTEMD_UNIT": "docker.service" }]
      <storage>
        @type local
        persistent true
        path /var/log/fluentd-journald-docker-cursor.json
      </storage>
      <entry>
        fields_strip_underscores true
      </entry>
      read_from_head true
      tag docker.systemd
    </source>

    # Logs from systemd-journal for interesting services.
    <source>
      @type systemd
      @id in_systemd_bootkube
      matches [{ "_SYSTEMD_UNIT": "bootkube.service" }]
      <storage>
        @type local
        persistent true
        path /var/log/fluentd-journald-bootkube-cursor.json
      </storage>
      <entry>
        fields_strip_underscores true
      </entry>
      read_from_head true
      tag bootkube
    </source>
    

  tail_container_parse.conf: |-
    <parse>
      @type "#{ENV['FLUENT_CONTAINER_TAIL_PARSER_TYPE'] || 'json'}"
      time_format "#{ENV['FLUENT_CONTAINER_TAIL_PARSER_TIME_FORMAT'] || '%Y-%m-%dT%H:%M:%S.%NZ'}"
    </parse>
---
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-es-kubernetes
  namespace: kube-efk
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  cluster-autoscaler.conf: |-
    <source>
      @type tail
      @id in_tail_cluster_autoscaler
      multiline_flush_interval 5s
      path /var/log/cluster-autoscaler.log
      pos_file /var/log/fluentd-cluster-autoscaler.log.pos
      tag cluster-autoscaler
      <parse>
        @type kubernetes
      </parse>
    </source>

  docker.conf: |-
    <source>
      @type tail
      @id in_tail_docker
      path /var/log/docker.log
      pos_file /var/log/fluentd-docker.log.pos
      tag docker
      <parse>
        @type regexp
        expression /^time="(?<time>[^)]*)" level=(?<severity>[^ ]*) msg="(?<message>[^"]*)"( err="(?<error>[^"]*)")?( statusCode=($<status_code>\d+))?/
      </parse>
    </source>

  glbc.conf: |-
    <source>
      @type tail
      @id in_tail_glbc
      multiline_flush_interval 5s
      path /var/log/glbc.log
      pos_file /var/log/fluentd-glbc.log.pos
      tag glbc
      <parse>
        @type kubernetes
      </parse>
    </source>

  kube-apiserver.conf: |-
    <source>
      @type tail
      @id in_tail_kube_apiserver
      multiline_flush_interval 5s
      path /var/log/kube-apiserver.log
      pos_file /var/log/fluentd-kube-apiserver.log.pos
      tag kube-apiserver
      <parse>
        @type kubernetes
      </parse>
    </source>

  kube-proxy.conf: |-
    <source>
      @type tail
      @id in_tail_kube_proxy
      multiline_flush_interval 5s
      path /var/log/kube-proxy.log
      pos_file /var/log/fluentd-kube-proxy.log.pos
      tag kube-proxy
      <parse>
        @type kubernetes
      </parse>
    </source>

  kubelet.conf: |-
    <source>
      @type tail
      @id in_tail_kubelet
      multiline_flush_interval 5s
      path /var/log/kubelet.log
      pos_file /var/log/fluentd-kubelet.log.pos
      tag kubelet
      <parse>
        @type kubernetes
      </parse>
    </source>

  salt.conf: |-
    <source>
      @type tail
      @id in_tail_minion
      path /var/log/salt/minion
      pos_file /var/log/fluentd-salt.pos
      tag salt
      <parse>
        @type regexp
        expression /^(?<time>[^ ]* [^ ,]*)[^\[]*\[[^\]]*\]\[(?<severity>[^ \]]*) *\] (?<message>.*)$/
        time_format %Y-%m-%d %H:%M:%S
      </parse>
    </source>

  containers.conf: |-
    <source>
      @type tail
      @id in_tail_container_logs
      path "#{ENV['FLUENT_CONTAINER_TAIL_PATH'] || '/var/log/containers/*.log'}"
      pos_file /var/log/fluentd-containers.log.pos
      tag "#{ENV['FLUENT_CONTAINER_TAIL_TAG'] || 'kubernetes.*'}"
      exclude_path "#{ENV['FLUENT_CONTAINER_TAIL_EXCLUDE_PATH'] || use_default}"
      read_from_head true
      @include ../tail_container_parse.conf
    </source>

  etcd.conf: |-
    <source>
      @type tail
      @id in_tail_etcd
      path /var/log/etcd.log
      pos_file /var/log/fluentd-etcd.log.pos
      tag etcd
      <parse>
        @type none
      </parse>
    </source>
  

  kube-apiserver-audit.conf: |-
    # Example:
    # 2017-02-09T00:15:57.992775796Z AUDIT: id="90c73c7c-97d6-4b65-9461-f94606ff825f" ip="104.132.1.72" method="GET" user="kubecfg" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"
    # 2017-02-09T00:15:57.993528822Z AUDIT: id="90c73c7c-97d6-4b65-9461-f94606ff825f" response="200"
    <source>
      @type tail
      @id in_tail_kube_apiserver_audit
      multiline_flush_interval 5s
      path /var/log/kubernetes/kube-apiserver-audit.log
      pos_file /var/log/kube-apiserver-audit.log.pos
      tag kube-apiserver-audit
      <parse>
        @type multiline
        format_firstline /^\S+\s+AUDIT:/
        # Fields must be explicitly captured by name to be parsed into the record.
        # Fields may not always be present, and order may change, so this just looks
        # for a list of key="\"quoted\" value" pairs separated by spaces.
        # Unknown fields are ignored.
        # Note: We can't separate query/response lines as format1/format2 because
        # they don't always come one after the other for a given query.
        format1 /^(?<time>\S+) AUDIT:(?: (?:id="(?<id>(?:[^"\\]|\\.)*)"|ip="(?<ip>(?:[^"\\]|\\.)*)"|method="(?<method>(?:[^"\\]|\\.)*)"|user="(?<user>(?:[^"\\]|\\.)*)"|groups="(?<groups>(?:[^"\\]|\\.)*)"|as="(?<as>(?:[^"\\]|\\.)*)"|asgroups="(?<asgroups>(?:[^"\\]|\\.)*)"|namespace="(?<namespace>(?:[^"\\]|\\.)*)"|uri="(?<uri>(?:[^"\\]|\\.)*)"|response="(?<response>(?:[^"\\]|\\.)*)"|\w+="(?:[^"\\]|\\.)*"))*/
        time_format %Y-%m-%dT%T.%L%Z
      </parse>
    </source>

  kube-controller-manager.conf: |-
    <source>
      @type tail
      @id in_tail_kube_controller_manager
      multiline_flush_interval 5s
      path /var/log/kube-controller-manager.log
      pos_file /var/log/fluentd-kube-controller-manager.log.pos
      tag kube-controller-manager
      <parse>
        @type kubernetes
      </parse>
    </source>

  kube-scheduler.conf: |-
    <source>
      @type tail
      @id in_tail_kube_scheduler
      multiline_flush_interval 5s
      path /var/log/kube-scheduler.log
      pos_file /var/log/fluentd-kube-scheduler.log.pos
      tag kube-scheduler
      <parse>
        @type kubernetes
      </parse>
    </source>

  rescheduler.conf: |-
    <source>
      @type tail
      @id in_tail_rescheduler
      multiline_flush_interval 5s
      path /var/log/rescheduler.log
      pos_file /var/log/fluentd-rescheduler.log.pos
      tag rescheduler
      <parse>
        @type kubernetes
      </parse>
    </source>

  startupscript.conf: |-
    <source>
      @type tail
      @id in_tail_startupscript
      path /var/log/startupscript.log
      pos_file /var/log/fluentd-startupscript.log.pos
      tag startupscript
      <parse>
        @type syslog
      </parse>
    </source>
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐