背景

之前已经收集了k8s的日志,但是现在又想通过filebeat收集物理机的程序日志,传输到kafka,并通过logstash输出到es,有个小问题就是,如何在logstash中区分开不同的topic,或者相同topic中不同程序日志,假设多种程序日志的日志格式相同,并且已经配置了grok过滤

思考

  1. 解决收集本地日志,并且输出到kafka
  2. 处理java异常日志
  3. logstash从topic中过滤出需要的日志,并输出到es
  4. kibana展示

过程

既然要按照我们想的去实现,那就一步步来吧

1. 物理机安装filebeat

安装过程忽略,不多说

2. filebeat配置 ,先解决问题1,2

input配置

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /data/apps/yourapp/logs/yourlog.log
  multiline.pattern: '^\[|^[0-9]{4}-[0-9]{2}-[0-9]{2}|^[0-9]{1,3}\.[0-9]{1,3}'
  multiline.negate: true
  multiline.match: after
  multiline.timeout: 15s

output配置

output.kafka:
  hosts: ["node1:9092", "node2:9092", "node3:9092"]
  topic: "yourapp"
  required_acks: 1
  compression: gzip
  max_message_bytes: 100000000
  partition.hash:
    reachable_only: true

这样,收集本地日志,合并异常日志,输出到kafka就算完成了

3. logstash配置

正常情况下配置不多说,input是kafka,output是es,问题是我怎么拿到我需要的日志
目前输入有多个topic:topics => [“filebeat”,“logstash”,“yourapp”]
输出好像不能写if topic == “yourapp”,因为找不到
继续思考如何解决,直到看到filebeat配置文件中有一个选项

4. 修改filebeat配置

filebeat.yml配置中有如下选项,可在日志中添加自定义字段

  # Optional additional fields. These fields can be freely picked
  # to add additional information to the crawled log files for filtering
  #fields:
  #  level: debug
  #  review: 1

既然如此,就不客气了,添加

  fields:
    logtopic: yourapp

5. logstash添加判断

输出加上判断添加的字段,这样就可以对需要的日志进行单独创建索引了

output {
  if [fields][logtopic] == "yourapp" {
      elasticsearch {
        hosts => [ "node1:9200", "node2:9200", "node3:9200" ]
        index => "logstash-yourapp-%{+YYYY.MM.dd}"
        template_overwrite => true
      }
  }
}

6.kibana

这个也不多说了,添加索引就能看到日志了

放到最后

添加fields这种方法,也可以使用相同的topic
如果遇到相同格式的日志,并且已经有grok过滤规则,那么可以在已经存在的配置中添加一个判断,比如input.type == “docker”,这样可以过滤掉本地日志,取本地日志的input.type为“log”

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐