目录

一、背景

二、设计

三、具体实现

Filebeat配置

K8S SideCar yaml

Logstash配置


一、背景

    将容器中服务的trace日志和应用日志收集到KAFKA,需要注意的是 trace 日志和app 日志需要存放在同一个KAFKA两个不同的topic中。分别为APP_TOPIC和TRACE_TOPIC

二、设计

流程图如下:

日志采集流程​​

说明:

        APP_TOPIC:主要存放服务的应用日志

        TRACE_TOPIC:存放程序输出的trace日志,用于排查某一个请求的链路

文字说明:

     filebeat 采集容器中的日志(这里需要定义一些规范,我们定义的容器日志路径如下),filebeat会采集两个不同目录下的日志,然后输出到对应的topic中,之后对kafka 的topic进行消费、存储。最终展示出来

/home/service/
└── logs
    ├── app
    │   └── pass
    │       ├── 10.246.84.58-paas-biz-784c68f79f-cxczf.log
    │       ├── 1.log
    │       ├── 2.log
    │       ├── 3.log
    │       ├── 4.log
    │       └── 5.log
    └── trace
        ├── 1.log
        ├── 2.log
        ├── 3.log
        ├── 4.log
        ├── 5.log
        └── trace.log

4 directories, 13 files

三、具体实现

上干货~

Filebeat配置

配置说明:

        其中我将filebeat的一些配置设置成了变量,在接下来的k8s yaml文件中需要定义变量和设置变量的value。

        需要特别说明的是我这里是使用了  tags: ["trace-log"]结合when.contains来匹配,实现将对应intput中的日志输出到对应kafka的topic中

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /home/service/logs/trace/*.log
  fields_under_root: true
  fields:
    topic: "${TRACE_TOPIC}"
  json.keys_under_root: true
  json.add_error_key: true
  json.message_key: message
  scan_frequency: 10s
  max_bytes: 10485760
  harvester_buffer_size: 1638400
  ignore_older: 24h
  close_inactive: 1h
  tags: ["trace-log"]
  processors:
    - decode_json_fields:
        fields: ["message"]
        process_array: false
        max_depth: 1
        target: ""
        overwrite_keys: true

- type: log
  enabled: true
  paths:
    - /home/service/logs/app/*/*.log
  fields:
    topic: "${APP_TOPIC}"
  scan_frequency: 10s
  max_bytes: 10485760
  harvester_buffer_size: 1638400
  close_inactive: 1h
  tags: ["app-log"]

output.kafka:
  enabled: true
  codec.json:
    pretty: true  # 是否格式化json数据,默认false
  compression: gzip
  hosts: "${KAFKA_HOST}"
  topics:
    - topic: "${TRACE_TOPIC}"
      bulk_max_duration: 2s
      bulk_max_size: 2048
      required_acks: 1
      max_message_bytes: 10485760
      when.contains:
        tags: "trace-log"

    - topic: "${APP_TOPIC}"
      bulk_flush_frequency: 0
      bulk_max_size: 2048
      compression: gzip
      compression_level: 4
      group_id: "k8s_filebeat"
      grouping_enabled: true
      max_message_bytes: 10485760
      partition.round_robin:
        reachable_only: true
      required_acks: 1
      workers: 2
      when.contains:
        tags: "app-log"

K8S SideCar yaml

配置说明:

        该yaml中定一个两个容器,容器1为nginx(示例)容器2为filebeat容器。定义了一个名称为logs的emptryDir类型的卷,将logs卷同时挂载在了容器1和容器2的/home/service/logs目录

        接下来又在filebeat容器中自定义了三个环境变量,这样我们就可以通过修改yaml的方式很灵活的来配置filebeat

                TRACE_TOPIC: Trace日志的topic

                APP_TOPIC:App日志的topic

                KAFKA_HOST:KAFKA地址

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: nginx
  name: nginx
  namespace: default
spec:
  replicas: 2
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      imagePullSecrets:
      - name: uhub-registry
      containers:
      - image: uhub.service.ucloud.cn/sre-paas/nginx:v1
        imagePullPolicy: IfNotPresent
        name: nginx
        ports:
        - name: nginx
          containerPort: 80
        - mountPath: /home/service/logs
          name: logs
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /home/service/logs
          name: logs
      - env:
        - name: TRACE_TOPIC
          value: pro_platform_monitor_log
        - name: APP_TOPIC
          value: platform_logs
        - name: KAFKA_HOST
          value: '["xxx.xxx.xxx.xxx:9092","xx.xxx.xxx.xxx:9092","xx.xxx.xxx.xxx:9092"]'
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        image: xxx.xxx.xxx.cn/sre-paas/filebeat-v2:8.11.2
        imagePullPolicy: Always
        name: filebeat
        resources:
          limits:
            cpu: 150m
            memory: 200Mi
          requests:
            cpu: 50m
            memory: 100Mi
        securityContext:
          privileged: true
          runAsUser: 0
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /home/service/logs
          name: logs
      dnsPolicy: ClusterFirst
      imagePullSecrets:
      - name: xxx-registry
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
      volumes:
      - emptyDir: {}
        name: logs                                                                                                                                                                              

Logstash配置

input {
  kafka {
    type => "platform_logs"
    bootstrap_servers => "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092"
    topics => ["platform_logs"]
    group_id => 'platform_logs'
    client_id => 'open-platform-logstash-logs'
  }
  kafka {
    type => "platform_pre_log"
    bootstrap_servers => "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092"
    topics => ["pre_platform_logs"]
    group_id => 'pre_platform_logs'
    client_id => 'open-platform-logstash-pre'
  }
  kafka {
    type => "platform_nginx_log"
    bootstrap_servers => "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092"
    topics => ["platform_nginx_log"]
    group_id => 'platform_nginx_log'
    client_id => 'open-platform-logstash-nginx'
  }
}
filter {
  if [type] == "platform_pre_log" {
    grok {
      match => { "message" => "\[%{IP}-(?<service>[a-zA-Z-]+)-%{DATA}\]" }
    }
  }
  if [type] == "platform_logs" {
    grok {
      match => { "message" => "\[%{IP}-(?<service>[a-zA-Z-]+)-%{DATA}\]" }
    }
  }
}
output {
  if [type] == "platform_logs" {
    elasticsearch {
      id => "platform_logs"
      hosts => ["http://xxx.xxx.xxx.xxx:9200","http://xxx.xxx.xxx.xxx:9200","http://xxx.xxx.xxx.xxx:9200"]
      index => "log-xxx-prod-%{service}-%{+yyyy.MM.dd}"
      user => "logstash_transformer"
      password => "xxxxxxx"
      template_name => "log-xxx-prod"
      manage_template => "true"
      template_overwrite => "true"
    }
  }
  if [type] == "platform_pre_log" {
    elasticsearch {
      id => "platform_pre_logs"
      hosts => ["http://xxx.xxx.xxx.xxx:9200","http://xxx.xxx.xxx.xxx:9200","http://xxx.xxx.xxx.xxx:9200"]
      index => "log-xxx-pre-%{service}-%{+yyyy.MM.dd}"
      user => "logstash_transformer"
      password => "xxxxxxx"
      template_name => "log-xxx-pre"
      manage_template => "true"
      template_overwrite => "true"
    }
  }
  if [type] == "platform_nginx_log" {
    elasticsearch {
      id => "platform_nginx_log"
      hosts => ["http://xxx.xxx.xxx.xxx:9200","http://xxx.xxx.xxx.xxx:9200","http://xxx.xxx.xxx.xxx:9200"]
      index => "log-platform-nginx-%{+yyyy.MM.dd}"
      user => "logstash_transformer"
      password => "xxxxxxx"
      template_name => "log-platform-nginx"
      manage_template => "true"
      template_overwrite => "true"
    }
  }
}

        如果有帮助到你麻烦给个或者收藏一下~,有问题可以随时私聊我或者在评论区评论,我看到会第一时间回复

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐