背景

采用的是标准的ELK+filebeat架构

ES版本:7.17.15

logstash版本:7.17.15

filebeat版本: 7.17.15

helm版本:7.17.3,官方地址:elastic/helm-charts

说一下为什么会想到使用多管道的原因

我们刚开始部署的是单管道,里面有多种类型的日志需要传输,比如埋点日志、系统日志、日志推送至kafka、日志推送至阿里云sls

后来在系统运行中,开发人员不够细心,在配置埋点日志的时候,出现了部分语法错误,导致整个日志系统受到影响,logstash整个推送都不可用,相当于没有隔离性,所以,在调研的时候,看到了多管道,多实例,所以来进行研究一下。

为什么使用多管道?

优点

  • 隔离性:每个管道可以独立处理不同类型的日志或数据流,有助于数据的分类和管理。
  • 灵活性:可以在同一实例中灵活配置和管理多个管道,以适应不同的需求。
  • 资源共享:多个管道共享同一个Logstash实例的资源,提高资源利用率。

缺点

  • 复杂性:配置和管理多个管道会增加复杂性,特别是在调试和维护时。
  • 性能瓶颈:当管道数量过多或数据量很大时,可能导致性能瓶颈,影响整体处理效率。

适用于需要在同一Logstash实例中处理不同类型数据的场景,提高资源利用率,但可能增加配置和管理的复杂性。

相较于单管道

单管道配置简单,适用于数据流较少或需求简单的场景,并且他的处理流程单一,性能更容易预测和管理

同时,他的缺点也很明显,不具备隔离性,正如我背景里面遇到的问题,灵活性,拓展性也比较差。

适用于简单数据流的处理

相较于多实例

优点:

  1. 高可用性:通过部署多个Logstash实例,可以提高系统的高可用性,减少单点故障的风险。
  2. 扩展性强:可以根据需求增加实例,水平扩展系统处理能力。
  3. 独立性:每个实例可以独立处理特定的数据流或任务,减少相互影响。

缺点:

  1. 资源开销大:需要更多的资源(内存、CPU等)来运行多个实例,增加运维成本。
  2. 管理复杂:需要配置和管理多个实例,增加了运维的复杂性。

适用于高可用性和高扩展性需求的场景,独立性强,但资源开销大,管理复杂

helm的方式配置

整体分为两部分,主管道和子管道

主管道用来接受数据来源,以及公共的处理,通过filter处理之后,在将信息分发给子管道,子管道来控制输出源

在logstash需要配置两个地方:

  • 一个是pipeline.yml文件,配置需要加载的管道文件以及id;
  • 一个是管道文件配置内容,确定输入源以及输出源头;

下面的方式采用的helm的方式,docker方式或者其他方式可参考一下

示例如下:

filebeat配置

filebeat.inputs:
- type: log
  paths:
  - /tmp/logs/biz/*.log
  fields:
    fb_collect_app: xxx-xxx-test-biz
    fb_collect_type: bizlog
    system_env: dev  # 如果不包含 "prod",设置为test
    send_kafka: "false"
  fields_under_root: true
- type: log
  paths:
  - /tmp/logs/sys/*.log
  multiline.pattern: '^\s|^"|^Caused by:'
  multiline.match: after
  fields:
    fb_collect_app: xxx-xxx-test-sys
    fb_collect_type: syslog
    system_env: dev  # 如果不包含 "prod",设置为test
  fields_under_root: true
output.logstash:
  hosts:
  - "xxx"

logstash配置

values.yaml

# 配置文件
logstashConfig:
  logstash.yml: |
    # 如果处理的字符中含有\t\n等字符,是不生效的,我们需要开启logstash的字符转义功能,config.support_escapes: true
    config.support_escapes: true
    http.host: 0.0.0.0
    pipeline.ecs_compatibility: v1
  pipelines.yml: |
    - pipeline.id: base-processing
      path.config: "/usr/share/logstash/pipeline/base-processing.conf"
    - pipeline.id: syslog-processing
      path.config: "/usr/share/logstash/pipeline/syslog-processing.conf"
    - pipeline.id: bizlog-processing
      path.config: "/usr/share/logstash/pipeline/bizlog-processing.conf"
      
# 管道内容
  base-processing.conf: |
    input {
       beats{
         port => "5055"
       }
    }

    output {
      if [fb_collect_type] == "bizlog" {
        pipeline {
          send_to => bizlogs
        }
        if [send_kafka] == "true" {
          stdout { codec => rubydebug }
          pipeline {
            send_to => kafkalogs
          }
        }
      }else if [fb_collect_type] == "syslog" {
        pipeline {
          send_to => syslogs
        }
      }
    }
  bizlog-processing.conf: |
    input {
       pipeline {
         address => bizlogs
       }
    }
    filter {
        ruby {
          code => "event.cancel if (Time.now.to_f - event.get('@timestamp').to_f) > (60 * 60 * 24 * 3)"
        }
        json {
          source => "message"
          skip_on_invalid_json => true
        }
        date {
          match => ["business_time","yyyy-MM-dd HH:mm:ss.SSS"]
          target => "@timestamp"
        }
    }  
    output {
        elasticsearch {
          hosts => ["elasticsearch-master.business:9200"]
          index => "%{fb_collect_app}-%{+YYYY.MM.dd}"
          user => elastic
          password => "xxx"
       }
    }

  syslog-processing.conf: |
    input {
       pipeline {
         address => syslogs
       }
    }
    filter {
        ruby {
          code => "event.cancel if (Time.now.to_f - event.get('@timestamp').to_f) > (60 * 60 * 24 * 3)"
        }
        mutate{
          strip => ["message"]
          gsub => [ "message", "\r", " " ]
          gsub => [ "message", "\t", " " ]
          gsub => [ "message", "\n", " " ]
          gsub => [ "message", "\u0000", " " ]
        }
        json {
          source => "message"
          skip_on_invalid_json => true
        }
        date {
          match => ["timestamp","yyyy-MM-dd HH:mm:ss.SSS"]
          target => "@timestamp"
        }
    }  
    output {
        elasticsearch {
          hosts => ["elasticsearch-master.business:9200"]
          index => "%{fb_collect_app}-%{+YYYY.MM.dd}"
          user => elastic
          password => "xxx"
       }
    }
  kafka-processing.conf: |
      input {
           pipeline {
             address => kafkalogs
           }
        }
      output {
        stdout { codec => rubydebug }
      }

其中,base-processing.conf为主管道,用来确定接收源的,然后在根据条件,将数据输入到某个pipeline中,

最后pipeline来觉得输出到es那个索引下

最后重启logstash下即可生效

踩坑点

ECS Compatibility mode

部署之后,logstash一直报错:

Relying on default value of pipeline.ecs_compatibility, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode

这个是因为Logstash 正在使用默认的 ECS(Elastic Common Schema)兼容性模式,这可能在未来的版本中发生变化。为了避免升级时出现意外变化,你可以在 Logstash 的配置文件中显式声明所需的 ECS 兼容性模式。

解决方案:

logstash.yml 文件中添加或修改 pipeline.ecs_compatibility 参数。你可以选择以下几种模式之一:

  • disabled: 不使用 ECS 兼容性模式。
  • v1: 使用 ECS 1.0 兼容性模式。
  • v8: 使用 ECS 8.0 兼容性模式(如果你的 Logstash 版本支持)。

例如,设置 ECS 兼容性模式为 disabled

pipeline.ecs_compatibility: disabled

或者设置为 ECS 1.0 兼容性模式:

pipeline.ecs_compatibility: v1

我的logstash版本是7.17.15,选择的是ECS 1.0兼容模式

配置完成后,重启logstash

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐