Golang 实战 ELK 日志系统全流程教程(六):把 Logstash 加进日志链路

上一篇我们已经跑通了一条最短链路:

Go -> logs/app.log -> Filebeat -> Elasticsearch -> Kibana

这条链路很适合刚开始做日志系统。组件少,问题也容易定位。Go 服务只要稳定输出一行一条 JSON,Filebeat 负责采集,Elasticsearch 负责存,Kibana 负责查,整个闭环就能跑起来。

这一篇要往前走一步,把 Logstash 加进来:

Go -> logs/app.log -> Filebeat -> Logstash -> Elasticsearch -> Kibana

这次的目标很明确:

Filebeat 不再直写 Elasticsearch
Filebeat 把日志发给 Logstash
Logstash 做字段加工、脱敏、类型转换
最后再写入 go-app-log-* 索引

先说清楚边界:这不是说所有项目都必须上 Logstash。如果日志格式已经很干净,只需要采集和查询,Filebeat 直写 ES 反而更轻。Logstash 的价值,通常是在日志开始变复杂以后才体现出来。

比如真实项目里经常会遇到这些情况:

手机号、token、邮箱要脱敏
老服务日志格式不统一
某些字段类型打错了,需要转换
不同 service_name 想写到不同索引
ERROR 日志想单独路由
Filebeat 带来的元数据字段太多,想清理一部分

这些事情放在 Go 应用里做,会把业务代码弄脏;放在 Filebeat 里做,又不是特别顺手。Logstash 更像日志链路中间的加工层,适合集中处理这类规则。

日志链路演进图:第一阶段 Filebeat 直写 Elasticsearch,第二阶段 Filebeat 发送到 Logstash,Logstash 在 filter 阶段完成脱敏、字段转换和路由后再写入 Elasticsearch

先理解 Logstash 在哪一层

Logstash 的模型可以先记成三段:

input -> filter -> output

input 负责接收数据。

filter 负责加工数据。

output 负责把数据发出去。

这三个词看起来像概念,但写配置时真的就是这三块。Filebeat 把事件发到 Logstash 的 input,Logstash 在 filter 里改字段、删字段、转类型,最后在 output 里写入 Elasticsearch。

这一篇先不追求复杂规则,先跑通一个最小可用版本。链路通了以后,再一点点补脱敏、类型转换和路由。

准备目录

沿用上一篇的 demo,目录可以整理成这样:

go-elk-demo
├── docker-compose.yml
├── logs
│   └── app.log
├── filebeat
│   └── filebeat.yml
└── logstash
    └── pipeline.conf

logs/app.log 还是 Go 服务写出来的 JSON 日志。

filebeat/filebeat.yml 负责采集日志文件。

logstash/pipeline.conf 是这篇新增的 Logstash 管道配置。

这里默认 Elasticsearch 和 Kibana 仍然使用上一篇 demo 里的服务。

compose 里加 Logstash

先在 docker-compose.yml 里追加 Logstash 服务:

services:
  logstash:
    image: docker.elastic.co/logstash/logstash:${STACK_VERSION}
    container_name: go-elk-logstash
    ports:
      - "5044:5044"
    volumes:
      - ./logstash/pipeline.conf:/usr/share/logstash/pipeline/logstash.conf:ro
    environment:
      - LS_JAVA_OPTS=-Xms512m -Xmx512m
    depends_on:
      elasticsearch:
        condition: service_healthy

这里有几个点要注意。

5044 是 Beats input 常用端口。Filebeat 后面会把日志发到 logstash:5044

pipeline.conf 挂载到容器里的 /usr/share/logstash/pipeline/logstash.conf。Logstash 默认会读取这个目录下的 pipeline 配置。

LS_JAVA_OPTS=-Xms512m -Xmx512m 是本地 demo 的写法。Logstash 是 JVM 应用,内存给太小容易慢,给太大本地电脑又吃紧。学习环境先给 512m,足够观察链路。

加完以后可以先校验 compose:

docker compose config

如果 YAML 缩进错了,这一步会比启动容器更早暴露问题。

Filebeat 改成输出到 Logstash

上一篇 Filebeat 是直写 Elasticsearch:

output.elasticsearch:
  hosts: ["http://elasticsearch:9200"]
  index: "go-app-log-%{+yyyy.MM.dd}"

现在要把输出改成 Logstash:

output.logstash:
  hosts: ["logstash:5044"]

完整的 filebeat.yml 可以先这样写:

filebeat.inputs:
  - type: filestream
    id: go-app-log
    enabled: true
    paths:
      - /logs/*.log
    parsers:
      - ndjson:
          target: ""
          overwrite_keys: true
          add_error_key: true

output.logstash:
  hosts: ["logstash:5044"]

这里还保留 filestream + ndjson,因为 Go 服务输出的仍然是一行一条 JSON。Filebeat 在采集阶段就把 JSON 展开成字段,然后把事件交给 Logstash。

注意一个容易漏的点:用了 Logstash output 以后,Filebeat 就不再直接负责写 ES 索引了。也就是说,上一篇里 setup.template.namesetup.template.pattern 这些配置不再是这条链路的重点。模板和索引治理后面会单独讲,这一篇先把链路跑通。

写第一版 pipeline.conf

先写一个最小版本,几乎不做加工,只加一个字段,方便确认日志确实经过了 Logstash:

input {
  beats {
    port => 5044
  }
}

filter {
  mutate {
    add_field => {
      "log_source" => "go-app"
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "go-app-log-%{+YYYY.MM.dd}"
  }

  stdout {
    codec => rubydebug
  }
}

beats { port => 5044 } 用来接收 Filebeat 发来的事件。

add_field 给每条日志加一个 log_source 字段。这个字段没什么复杂含义,就是为了验证 Logstash 的 filter 真的执行了。

stdout { codec => rubydebug } 是调试用的。我建议学习阶段先保留它。Logstash 收到什么、加工后变成什么样,能直接在容器日志里看到。

启动服务:

docker compose up -d logstash filebeat

看 Logstash 日志:

docker logs -f go-elk-logstash

再请求 Go 服务生成一条日志:

curl -X POST http://localhost:8080/api/orders

如果 Logstash 日志里打印出了 rubydebug 格式的事件,并且能看到 service_nametrace_idlevelmessagelog_source 这些字段,就说明 Filebeat 到 Logstash 已经通了。

Logstash rubydebug 输出截图:控制台打印一条 Go 应用日志事件,字段包括 service_name、trace_id、level、message、path、status、cost_ms、log_source

再查 Elasticsearch:

curl "http://localhost:9200/_cat/indices/go-app-log-*?v"

如果能看到 go-app-log-2026.xx.xx 这类索引,说明 Logstash 到 Elasticsearch 也通了。

做一个字段脱敏

链路通了以后,再加一个真实一点的规则:手机号脱敏。

假设业务日志里有这样的字段:

{
  "phone": "13800138000"
}

这种字段不应该原样进日志系统。很多团队一开始没注意,后面才发现 Kibana 里一搜全是敏感信息,再回头处理就很麻烦。

Logstash 可以用 mutate gsub 做一个简单脱敏:

filter {
  if [phone] {
    mutate {
      gsub => [
        "phone", "(\d{3})\d{4}(\d{4})", "\1****\2"
      ]
    }
  }
}

脱敏前:

13800138000

脱敏后:

138****8000

这只是 demo 级别的规则。真实项目里还会遇到身份证、邮箱、token、银行卡号,甚至嵌套 JSON 里的敏感字段。这里先不用一次写全,先确认 Logstash 适合做这类集中处理。

手机号脱敏前后对比图:左侧 Logstash 收到 phone=13800138000,右侧经过 mutate gsub 后写入 Elasticsearch 的 phone=138****8000
手机号脱敏前后对比图:左侧 Logstash 收到 phone=13800138000,右侧经过 mutate gsub 后写入 Elasticsearch 的 phone=138****8000

我个人不太喜欢把脱敏完全寄托在应用代码上。应用层当然要注意,但日志链路中间再兜一层,会更稳一点。

删除不想进入 ES 的字段

Filebeat 会带一些元数据字段,例如 agentecsinputhost。有些字段对排查有用,有些字段在本地 demo 里只是让结果看起来很乱。

如果想删掉一部分字段,可以这样写:

filter {
  mutate {
    remove_field => ["agent", "ecs", "input", "host"]
  }
}

这个地方不要为了“干净”就一股脑全删。

host 在多机器排查时很有用,agent 能帮你确认采集器版本。学习环境可以删得干净一点,生产环境要按查询、排障、审计这些实际需求来决定。

做字段类型转换

有些旧服务打出来的日志字段是字符串:

{
  "status": "500",
  "cost_ms": "1200"
}

如果 Elasticsearch 动态映射把它们识别成 keyword,后面做范围查询会很难受:

cost_ms >= 1000

Logstash 可以在 filter 里转类型:

filter {
  mutate {
    convert => {
      "status" => "integer"
      "cost_ms" => "integer"
      "user_id" => "integer"
    }
  }
}

不过这里也要保持清醒:更好的做法是应用侧一开始就打对类型。Logstash 可以兜底,但不应该变成上游日志随便打的借口。

按服务名路由到不同索引

前期所有服务都写一个索引最简单:

go-app-log-2026.05.28

然后用 service_name 过滤:

service_name: "order-api"

但服务多了以后,有些团队会希望按服务拆索引:

go-order-api-log-2026.05.28
go-user-api-log-2026.05.28
go-payment-worker-log-2026.05.28

Logstash 可以在 output 里判断字段:

output {
  if [service_name] == "order-api" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "go-order-api-log-%{+YYYY.MM.dd}"
    }
  } else if [service_name] == "user-api" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "go-user-api-log-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "go-app-log-%{+YYYY.MM.dd}"
    }
  }
}

我不建议一开始就把索引拆得太碎。

拆索引有好处:权限、保留周期、查询范围都能更细。

代价也很明显:模板、ILM、Data View、Dashboard 都会变复杂。日志量不大的系统,用统一索引加 service_name 字段过滤通常就够了。

ERROR 日志要不要单独写一份

还有一种常见需求:错误日志写到单独索引,方便告警和排查。

配置可以这样写:

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "go-app-log-%{+YYYY.MM.dd}"
  }

  if [level] == "ERROR" {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "go-app-error-log-%{+YYYY.MM.dd}"
    }
  }
}

这会让 ERROR 日志写两份。

优点是错误索引很清爽,后面做告警也方便。

缺点是存储会变多,而且一条日志存在两个索引里,后面统计时要避免重复计算。

我现在更倾向于前期先不复制,直接在 Kibana 里用条件过滤:

level: "ERROR"

等错误量真的大了、告警需求也明确了,再考虑单独索引。

Logstash 日志路由示意图:普通日志写入 go-app-log-_,ERROR 日志根据 level 条件额外写入 go-app-error-log-_,旁边标注重复写入带来的存储和统计成本

这一篇先用的完整 pipeline

把前面的几个规则合起来,这一篇可以先用下面这版:

input {
  beats {
    port => 5044
  }
}

filter {
  mutate {
    add_field => {
      "log_source" => "go-app"
    }
  }

  if [phone] {
    mutate {
      gsub => [
        "phone", "(\d{3})\d{4}(\d{4})", "\1****\2"
      ]
    }
  }

  mutate {
    convert => {
      "status" => "integer"
      "cost_ms" => "integer"
      "user_id" => "integer"
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "go-app-log-%{+YYYY.MM.dd}"
  }

  stdout {
    codec => rubydebug
  }
}

这版做了三件事:

给日志加 log_source
如果有 phone 字段就脱敏
把 status、cost_ms、user_id 转成 integer

它没有做复杂路由,也没有删除 Filebeat 元数据字段。原因很简单:这一篇的重点是把 Logstash 放进链路,并确认它能稳定加工日志。其他策略可以后面再加。

验证链路

启动:

docker compose up -d elasticsearch kibana logstash filebeat

启动 Go 服务:

go run .

请求接口生成日志:

curl -X POST http://localhost:8080/api/orders

看本地日志文件:

tail -n 5 logs/app.log

Windows PowerShell 可以用:

Get-Content .\logs\app.log -Tail 5

看 Logstash 是否收到事件:

docker logs -f go-elk-logstash

查 Elasticsearch 索引:

curl "http://localhost:9200/_cat/indices/go-app-log-*?v"

查具体文档:

curl "http://localhost:9200/go-app-log-*/_search?pretty"

如果结果里能看到 log_source,说明日志经过了 Logstash 的 filter。

如果能看到 statuscost_ms 这些字段正常存在,说明 Go 日志里的业务字段也被保留下来了。

【图片占位】【Elasticsearch 查询结果截图:go-app-log-* 索引中的文档包含 service_name、trace_id、level、message、status、cost_ms、log_source,说明 Logstash 已经完成加工并写入 ES】

排查 Logstash 时先看三段

Logstash 出问题时,不要一上来就改配置。先按管道三段排查。

第一段:input 有没有收到。

docker logs -f go-elk-logstash

如果 stdout rubydebug 完全没输出,先看 Filebeat 到 Logstash 的网络和端口。重点检查 output.logstash.hosts 是否写成了 logstash:5044,以及 Logstash 容器是否正常启动。

第二段:filter 有没有写坏。

Logstash 配置对括号、引号、字段路径都比较敏感。有时候只是少了一个 },容器就会启动失败。先看 Logstash 日志,不要凭感觉改。

第三段:output 有没有写进 ES。

curl "http://localhost:9200/_cat/indices/go-app-log-*?v"

如果 Logstash 已经打印了事件,但 ES 里没有索引,重点看 Elasticsearch output 的报错,比如连接失败、索引名不合法、字段映射冲突。

这一篇的边界

到这里,日志链路已经从:

Go -> Filebeat -> Elasticsearch

演进成:

Go -> Filebeat -> Logstash -> Elasticsearch

这个变化的意义不是“组件更多了”,而是日志系统开始有了统一加工层。

但我还是想强调:不是所有项目都必须上 Logstash。

如果你的日志是标准 JSON,只需要采集、查询、简单看板,Filebeat 直写 ES 更轻。

Logstash 更适合这些场景:

日志格式不统一
需要集中脱敏
需要复杂路由
需要字段转换
需要兼容旧系统日志

下一篇要处理一个更容易被低估的问题:Elasticsearch 索引治理。

日志能写进去,不代表日志系统就稳定了。字段类型错了、索引无限增长、磁盘没有保留策略,这些问题一般不会第一天爆,但它会在你以为系统已经没问题的时候回来找你。

参考资料:

更多推荐