配置语法

logstash主要配置 inputfilteroutput

区段

Logstash用{}来定义区域。区域内可以包括插件去预定义,可以在一个区域内定义多个插件。插件区域则可以定义键值对来设置。示例:

input {    stdin{}    syslog{}}

数据类型

Logstash支持少量的数据值类型:

•bool

debug => true

string

host => "localhost"

number

port => 2020

array

match => ["datetime","UNIX"]

hash

options => {    key1 => "value1"    key2 => "value2"}

注意: 如果你的Logstash的版本低于1.2.0,哈希的语法跟数组一样。

match => {"key1","value1","key2","value2"}

字段引用

如果你想在Logstash配置中使用字段的值,只需要把字段的名字写在中括号[]里就行了。 对于嵌套字段,每层字段都写在[]中就可以了。

例如:

[geoip][location][0]

Logstash中也支持倒序下标,[array][-1]

Logstash还支持变量内插,在字符串中使用字段引用,可以这样使用:

"My name is %{[name]}"

条件判断

Logstash从1.3.0开始支持条件判断和表达式。 主要支持下列操作符:

== 等于,!=不等于,<小于,>大于,<=>==~ 匹配正则,!~不匹配正则•in 包含,not in不包含•and,or,nand 非与,xor非或•()复合表达式,!()对复合表达式取反

命令行参数

-e

意即执行(exec)。我们在 "Hello World" 的时候已经用过这个参数了。事实上你可以不写任何具体配置,直接运行 bin/logstash -e '' 达到相同效果。这个参数的默认值是下面这样:

input {    stdin { }}output {    stdout { }}

这种配置文件的形式,是我们工作中最常用的形式

--conf或者-f

意即文件。真实运用中,我们会写很长的配置,甚至可能超过 shell 所能支持的 1024 个字符长度。所以我们必把配置固化到文件里,然后通过 bin/logstash -f agent.conf 这样的形式来运行。

如果我们想运行一个文件夹下的所有配置文件,logstash 还提供一个方便我们规划和书写配置的小功能。你可以直接用 bin/logstash -f /etc/logstash.d/ 来运行。logstash 会自动读取 /etc/logstash.d/ 目录下所有 *.conf 的文本文件,然后在自己内存里拼接成一个完整的大配置文件,再去执行。 但是,ogstash 列出目录下所有文件时,是字母排序的。而 logstash 配置段的 filter 和 output 都是顺序执行,所以顺序非常重要。采用多文件管理的用户,推荐采用数字编号方式命名配置文件,同时在配置中,严谨采用 if 判断限定不同日志的动作。

-t或者--configtest

意即测试。用来测试 Logstash 读取到的配置文件语法是否能正常解析。Logstash 配置语法是用 grammar.treetop 定义的。尤其是使用了上一条提到的读取目录方式的读者,尤其要提前测试。

--log或者-l

意即日志。Logstash 默认输出日志到标准错误。生产环境下你可以通过 bin/logstash -l logs/logstash.log 命令来统一存储日志。

--pipeline-workers 或 -w

运行 filter 和 output 的 pipeline 线程数量。默认是 CPU 核数。

--pipeline-batch-size 或 -b

每个 Logstash pipeline 线程,在执行具体的 filter 和 output 函数之前,最多能累积的日志条数。默认是 125 条。越大性能越好,同样也会消耗越多的 JVM 内存。

--pipeline-batch-delay 或 -u

每个 Logstash pipeline 线程,在打包批量日志的时候,最多等待几毫秒。默认是 5 ms。

--pluginpath 或 -P

可以写自己的插件,然后用 bin/logstash --pluginpath /path/to/own/plugins 加载它们。

--verbose

输出一定的调试日志。

--debug

输出更多的调试日志。

Logstash配置详解

input

标准的控制台输入

input {    stdin { }}

接收来自文件的内容

Logstash 使用一个名叫 FileWatch 的 Ruby Gem库来监听文件变化。这个库支持 glob 展开文件路径,而且会记录一个叫.sincedb 的数据库文件来跟踪被监听的日志文件的当前读取位置。所以,不要担心 logstash 会漏过你的数据。

sincedb 文件中记录了每个被监听的文件的inode, major number, minor number 和 pos 。

input {    file {        path => ["/var/log/*.log", "/var/log/message"]        type => "system"        start_position => "beginning"    }}

配置说明:

discover_interval logstash 每隔多久去检查一次被监听的 path 下是否有新文件。默认值是 15 秒。

exclude 剔除不想监听的文件,这里和path一样支持glob。

close_older 一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认为3600s,一个小时。

ignore_older 在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认为86400s,一天。

sincedb_path 设置sincedb的文件存储位置,windows默认在C:\Windows\System32\config\systemprofile\.sincedb

sincedb_write_interval logstash 每隔多久写一次 sincedb 文件,默认是 15 秒

stat_interval logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。

start_position logstash 从什么位置开始读取文件数据,默认是结束位置,也就是说 logstash 进程会以类似 tail -F 的形式运行。如果你是要导入原有数据,把这个设定改成 "beginning",logstash 进程就从头开始读取,类似 less +F 的形式运行。

注意:

1.如果想修改@timestamp,那么需要通过配置过滤器来设置。2.FileWatch仅支持文件的绝对路径,不会自动的递归目录,所以如果监听父目录和子目录,都需要在数组中明确地写出来。3.如果想递归多级目录可以这样写,/path/to/**/*.log,用 ** 来缩写表示递归全部子目录。4.start_position 仅在该文件从未被监听过的时候起作用。如果 sincedb 文件中已经有这个文件的 inode 记录了,那么 logstash 依然会从记录过的 pos 开始读取数据。所以重复测试的时候每回需要删除 sincedb 文件(官方博客上提供了另一个巧妙的思路:将 sincedb_path 定义为 /dev/null,则每次重启自动从头开始读)。

接收来自beat 的数据

例如filebeat

input {    # 配置filebeat    beats {        port => 10515    }}

结合filebeat使用

** 一个完整的示例**

1.设置filebeat的配置文件输出到logstash

filebeat.yml 添加/修改

#----------------------------- Logstash output --------------------------------output.logstash:  # The Logstash hosts  hosts: ["127.0.0.1:10515"]

1.启动filebeat

./filebeat -e -c filebeat.yml -d "publish"

1.创建logstash启动配置文件

filebeat10515.conf

input {    # 配置filebeat    beats {        port => 10515    }}filter {}output {    # 配置输出到文件中    file{       path=>"/home/shaofei/output/output.log"    }    #配置输出到控制台    stdout{        codec=>rubydebug    }}

1.启动logstash

bin/logstash -f config/filebeat10515.conf

1.测试

手动模拟向filebeat监听的文件输入关键词信息,观察 filebeat控制台输出 和 logstash控制台输出

filebeat控制台输出 2021-07-07T22:27:42.534+0800    DEBUG    [publish]    pipeline/processor.go:308    Publish event: {  "@timestamp": "2021-07-07T14:27:37.529Z",  "@metadata": {    "beat": "filebeat",    "type": "doc",    "version": "6.4.3"  },  "host": {    "name": "node01"  },  "offset": 0,  "source": "/home/shaofei/logs/javaException.log",  "tags": [    "tags1",    "tags2"  ],  "prospector": {    "type": "log"  },  "input": {    "type": "log"  },  "message": "2020-08-04 23:39:27.213 ERROR [francis,76fc4531346b63e2,76fc4531346b63e2,false] 16121 --- [nio-8089-exec-8] com.example.demo.aspect.LogAspect        : com.example.demo.controller.StudentController.testError 请求异常,原因:\n\njava.lang.ArithmeticException: / by zero\n        at com.example.demo.controller.StudentController.testError(StudentController.java:48)\n        at com.example.demo.controller.StudentController8368FastClassBySpringCGLIB83687356fabf.invoke(<generated>)\n        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)\n        at org.springframework.aop.framework.CglibAopProxy.invokeJoinpoint(CglibAopProxy.java:769)\n        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)\n        at org.springframework.aop.framework.CglibAopProxy.proceed(CglibAopProxy.java:747)\n",  "fields": {    "review": 1,    "name": "shaofei",    "level": "debug",    "env": "staging"  },  "beat": {    "name": "node01",    "hostname": "node01",    "version": "6.4.3"  }}logstash控制台输出{       "message" => "2020-08-04 23:39:27.213 ERROR [francis,76fc4531346b63e2,76fc4531346b63e2,false] 16121 --- [nio-8089-exec-8] com.example.demo.aspect.LogAspect        : com.example.demo.controller.StudentController.testError 请求异常,原因:\n\njava.lang.ArithmeticException: / by zero\n        at com.example.demo.controller.StudentController.testError(StudentController.java:48)\n        at com.example.demo.controller.StudentController8368FastClassBySpringCGLIB83687356fabf.invoke(<generated>)\n        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)\n        at org.springframework.aop.framework.CglibAopProxy.invokeJoinpoint(CglibAopProxy.java:769)\n        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)\n        at org.springframework.aop.framework.CglibAopProxy.proceed(CglibAopProxy.java:747)\n",        "fields" => {         "level" => "debug",        "review" => 1,          "name" => "shaofei",           "env" => "staging"    },        "source" => "/home/shaofei/logs/javaException.log",          "beat" => {         "version" => "6.4.3",            "name" => "node01",        "hostname" => "node01"    },         "input" => {        "type" => "log"    },          "tags" => [        [0] "tags1",        [1] "tags2",        [2] "beats_input_codec_plain_applied"    ],    "prospector" => {        "type" => "log"    },          "host" => {        "name" => "node01"    },    "@timestamp" => 2021-07-07T14:27:37.529Z,      "@version" => "1",        "offset" => 0}

filter

过滤器插件(Filter)

丰富的过滤器插件的存在是 logstash 威力如此强大的重要因素。 名为过滤器,其实提供的不单单是过滤的功能。在本章我们就会重点介绍几个插件,它们扩展了进入过滤器的原始数据,进行复杂的逻辑处理, 甚至可以无中生有的添加新的 logstash 事件到后续的流程中去!

json

有些日志可能是一种复合的数据结构,其中只是一部分记录是 JSON 格式的。这时候,我们依然需要在 filter 阶段,单独启用 JSON 解码插件。

参数类型是否必须默认值
sourcestring
targetstring

配置示例

filter {    json {        source => "message"        target => "jsoncontent"    }}

当我们输入一下内容时

{"id":"1","ITV":"5312000222215"}

运行结果为

{    "jsoncontent" => {        "ITV" => "5312000222215",         "id" => "1"    },     "@timestamp" => 2021-07-07T16:14:08.665Z,        "message" => "{\"id\":\"1\",\"ITV\":\"5312000222215\"}",       "@version" => "1",           "host" => "node01"}

注意

如果不打算使用多层结构的话,删掉 target 配置即可。新的结果如下:

这样可以实现数据完全透传,不需要加一层message

{    "@timestamp" => 2021-07-07T16:16:27.986Z,      "@version" => "1",       "message" => "{\"id\":\"1\",\"ITV\":\"5312000222215\"}",           "ITV" => "5312000222215",          "host" => "node01",            "id" => "1"}

kv

参数类型是否必须默认值说明
field_splitstring
分割字符
field_split_patternstring
分割字符匹配正则

Key-Value 切分

在很多情况下,日志内容本身都是一个类似于 key-value 的格式,但是格式具体的样式却是多种多样的。logstash 提供 filters/kv 插件,帮助处理不同样式的 key-value 日志,变成实际的 LogStash::Event 数据。

配置示例

input {    # 配置filebeat    beats {        port => 10515    }    stdin { }}filter {    kv {        field_split => "&?"    }}output {    # 配置输出到文件中    file{       path=>"/home/shaofei/output/output.log"    }    #配置输出到控制台    stdout{        codec=>rubydebug    }}

使用示例

输入

http://127.0.0.1:9020/main/shandong/user/userorderquery/queryorder_init.action?UserID=531200442112&UserToken=03172910597510641278820924115047

输出

{        "UserID" => "531200442112",     "UserToken" => "03172910597510641278820924115047",    "@timestamp" => 2021-07-07T16:24:15.996Z,      "@version" => "1",          "host" => "node01",       "message" => "http://127.0.0.1:9020/main/shandong/user/userorderquery/queryorder_init.action?UserID=531200442112&UserToken=03172910597510641278820924115047"}

时间处理(Date)

filters/date 插件可以按指定的时间格式读取事件的指定字段值后,赋值给指定的字段(默认为@timestamp)。

参数类型是否必须默认值
matcharray[]
targetstring@timestamp
timezonestring

match

按指定的时间格式格式化指定字段,可以指定多个时间格式,参数为[ field, formats... ]

match => [ "logdate", "yyyy-MM-dd HH:mm:ss",  "yyyy-MM-dd", "ISO8601" ]

target

将匹配的时间戳赋值给定的目标字段中。默认为赋值给@timestamp字段。

timezone

指定日期解析的shiqu,有效的ID值域在http://joda-time.sourceforge.net/timezones.html[1]可用时区页面上。

logstash会将时间格式化为UTC时间,即比北京时间早8小时。如果非要校准该8小时时间差,可指定timezone字段为UTC+0的时区。但是不建议这么做,因为elasticsearch内部也是使用UTC时间储存时间,使用错误的时区会导致需要做额外的时间转换处理。

配置示例

filter {    grok {        # 匹配正则表达式,将符合条件的数据赋值给timestamp字段        match => {  "message" => "(?<timestamp>%{TIMESTAMP_ISO8601})"  }    }    date {        # 匹配时间格式,将timestamp值赋值给 @timestamp ,后面的规则可以写多个,示例:[ "logdate", "MMM dd yyyy HH:mm:ss","MMM  d yyyy HH:mm:ss", "ISO8601" ]        match => [ "timestamp", "ISO8601" ]    }}

输入

2021-02-26 15:48:32.708-[INFO ] main RestfulApiProvider - initializing restful api provider

打印

{          "host" => "node01",      "@version" => "1",    "@timestamp" => 2021-02-26T07:48:32.708Z,       "message" => "2021-02-26 15:48:32.708-[INFO ] main RestfulApiProvider - initializing restful api provider",     "timestamp" => "2021-02-26 15:48:32.708"}

注意

logstash 中filter中date多个字段需要格式时间,只能一个date里边只能一个match和一个target

filter {    date {        match => ["message", "yyyyMMddHHmmss"]          add_field =>["starTime","%{starTime}"]      }    date {        match => ["message", "yyyyMMddHHmmss"]          add_field =>["endTime","%{endTime}"]      }}

grok

Grok 是 Logstash 最重要的插件。你可以在 grok 里预定义好命名正则表达式,在稍后(grok参数或者其他正则表达式里)引用它。

1.grok中的match属性,它的作用是从message字段中把符合正则表达式的数据赋值给另外一个字段,所有文本数据都是在Logstash的message字段中,我们要在过滤器里操作的数据就是message。2.grok插件是一个十分耗费资源的插件3.grok有超级多的预装正则表达式;(可以参考: https://quxuecx.blog.csdn.net/article/details/118559962)

可以在 grok 里写标准的正则,像下面这样:

\s+(?<request_time>\d+(?:\.\d+)?)\s+

修改配置文件进行使用

input {stdin{}}filter {    grok {        match => {            "message" => "\s+(?<request_time>\d+(?:\.\d+)?)\s+"        }    }}output {stdout{codec => rubydebug}}

运行 logstash 进程然后输入 "begin 123.456 end",你会看到类似下面这样的输出:

{    "request_time" => "123.456",         "message" => "begin 123.456 end",        "@version" => "1",      "@timestamp" => 2021-07-08T02:43:30.392Z,            "host" => "node01"}

实际运用注意点

实际运用中,我们需要处理各种各样的日志文件,如果你都是在配置文件里各自写一行自己的表达式,就完全不可管理了。所以,我们建议是把所有的 grok 表达式统一写入到一个地方。 然后用 filter/grok 的 patterns_dir 选项来指明。

如果你把 "message" 里所有的信息都 grok 到不同的字段了,数据实质上就相当于是重复存储了两份。所以你可以用 remove_field 参数来删除掉 message 字段,或者用 overwrite 参数来重写默认的 message 字段,只保留最重要的部分。

重写参数的示例如下:

filter {    grok {        patterns_dir => ["/path/to/your/own/patterns"]        match => {            "message" => "%{SYSLOGBASE} %{DATA:message}"        }        overwrite => ["message"]    }}

多项选择

有时候我们会碰上一个日志有多种可能格式的情况。这时候要写成单一正则就比较困难,或者全用 | 隔开又比较丑陋。 logstash 的语法提供给我们一个解决方式,可以传递多个正则来匹配同一个字段:

match => [    "message", "(?<request_time>\d+(?:\.\d+)?)",    "message", "%{SYSLOGBASE} %{DATA:message}",    "message", "(?m)%{WORD}"]

logstash 会按照这个定义次序依次尝试匹配,到匹配成功为止。虽说效果跟用 | 分割写个大大的正则是一样的,但是可阅读性好了很多。

mutate

filters/mutate 插件是 Logstash 另一个重要插件。它提供了丰富的基础类型数据处理能力。包括类型转换,字符串处理和字段处理等。

类型转换

可以设置的转换类型包括:"integer","float" 和 "string"。示例如下:

input {stdin{}}filter {    grok {        match => {            "message" => "\s+(?<request_time>\d+(?:\.\d+)?)\s+"        }    }    # 该配置可以把 request_time 转换为float类型    mutate {        convert => ["request_time", "float"]    }}output {stdout{codec => rubydebug}}

输出结果:

{        "@version" => "1",         "message" => "begin 123.456 end",      "@timestamp" => 2021-07-08T02:54:37.539Z,            "host" => "node01",    "request_time" => 123.456}

注意: mutate 除了转换简单的字符值,还支持对数组类型的字段进行转换,即将 ["1","2"] 转换成 [1,2]。但不支持对哈希类型的字段做类似处理。

字符串处理

•split

filter {    mutate {        split => ["message", "|"]    }}

随意输入一串以|分割的字符,比如 "userId|110|addTime|2021-07-18",可以看到如下输出:

{       "message" => [        [0] "userId",        [1] "110",        [2] "addTime",        [3] "2021-07-18"    ],    "@timestamp" => 2021-07-08T03:11:50.024Z,          "host" => "node01",          "tags" => [        [0] "_grokparsefailure"    ],      "@version" => "1"}

•join

仅对数组类型字段有效

我们在之前已经用 split 割切的基础再 join 回去。

配置改成:

filter {    mutate {        split => ["message", "|"]    }    mutate {        join => ["message", ","]    }}

filter 区段之内,是顺序执行的。所以我们最后看到的输出结果是:

{          "tags" => [        [0] "_grokparsefailure"    ],       "message" => "userId,110,addTime,2021-07-18",    "@timestamp" => 2021-07-08T03:15:19.398Z,      "@version" => "1",          "host" => "node01"}

•merge

合并两个数组或者哈希字段。

在之前 split 的基础上继续修改配置:

filter {    mutate {        split => ["message", "|"]    }    mutate {        merge => ["message", "message"]    }}

我们会看到输出:

{       "message" => [        [0] "userId",        [1] "110",        [2] "addTime",        [3] "2021-07-18",        [4] "userId",        [5] "110",        [6] "addTime",        [7] "2021-07-18"    ],    "@timestamp" => 2021-07-08T03:21:09.248Z,          "tags" => [        [0] "_grokparsefailure"    ],      "@version" => "1",          "host" => "node01"}

如果 src 字段是字符串,会自动先转换成一个单元素的数组再合并。把上一示例中的来源字段改成 "host":

filter {    mutate {        split => ["message", "|"]    }    mutate {        merge => ["message", "host"]    }}

结果变成:

{       "message" => [        [0] "userId",        [1] "110",        [2] "addTime",        [3] "2021-07-18",        [4] "node01"    ],    "@timestamp" => 2021-07-08T03:21:09.248Z,          "tags" => [        [0] "_grokparsefailure"    ],      "@version" => "1",          "host" => "node01"}

还有更多的插件字段,不再一一表述,请参考官方文档 https://www.elastic.co/guide/en/logstash/6.4/plugins-filters-mutate.html

执行次序

需要注意的是,filter/mutate 内部是有执行次序的。其次序如下:

rename(event) if @rename    update(event) if @update    replace(event) if @replace    convert(event) if @convert    gsub(event) if @gsub    uppercase(event) if @uppercase    lowercase(event) if @lowercase    strip(event) if @strip    remove(event) if @remove    split(event) if @split    join(event) if @join    merge(event) if @merge    filter_matched(event)

而 filter_matched 这个 filters/base.rb 里继承的方法也是有次序的。

@add_field.each do |field, value|  end  @remove_field.each do |field|  end  @add_tag.each do |tag|  end  @remove_tag.each do |tag|  end

ruby

最灵活的插件,可以以ruby语言来随心所欲的修改Logstash Event对象

配置示例

filter {    ruby {        init => "@kname = ['client','servername','url','status','time','size','upstream','upstreamstatus','upstreamtime','referer','xff','useragent']"        code => "            new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('|'))])            new_event.remove('@timestamp')            event.append(new_event)"    }}

解释

通常我们都是用 filters/grok 插件来捕获字段的,但是正则耗费大量的 CPU 资源,很容易成为 Logstash 进程的瓶颈。 而实际上,很多流经 Logstash 的数据都是有自己预定义的特殊分隔符的,我们可以很简单的直接切割成多个字段。 filters/mutate 插件里的 "split" 选项只能切成数组,后续很不方便使用和识别。 而在 filters/ruby 里,我们可以通过 "init" 参数预定义好由每个新字段的名字组成的数组, 然后在 "code" 参数指定的 Ruby 语句里通过两个数组的 zip 操作生成一个哈希并添加进数组里。 短短一行 Ruby 代码,可以减少 50% 以上的 CPU 使用率。

注1: 从 Logstash-2.3 开始,LogStash::Event.append 不再直接接受 Hash 对象,而必须是 LogStash::Event 对象。所以示例变成要先初始化一个新 event,再把无用的 @timestamp 移除,再 append 进去。否则会把 @timestamp 变成有两个时间的数组了!

注2: 从 Logstash-5.0 开始,LogStash::Event 改为 Java 实现,直接使用 event["parent"]["child"] 形式获取的不是原事件的引用而是复制品。需要改用 event.get('[parent][child]') 和 event.set('[parent][child]', 'value') 的方法。

ruby 官网教程:https://www.elastic.co/guide/en/logstash/6.4/plugins-filters-ruby.html

split

split 拆分事件

我们可以通过 multiline 插件将多行数据合并进一个事件里,那么反过来,也可以把一行数据,拆分成多个事件。这就是 split 插件。

配置示例

filter {    split {        field => "message"        terminator => "#"    }}

运行结果

输入一行数据:"test1#test2",结果看到输出两个事件:

{       "message" => "test1",    "@timestamp" => 2021-07-08T06:49:19.722Z,          "host" => "node01",      "@version" => "1"}{       "message" => "test2",    "@timestamp" => 2021-07-08T06:49:19.722Z,          "host" => "node01",      "@version" => "1"}

重要提示

split 插件中使用的是 yield 功能,其结果是 split 出来的新事件,会直接结束其在 filter 阶段的历程, 也就是说写在 split 后面的其他 filter 插件都不起作用,进入到 output 阶段。 所以,一定要保证 split 配置写在全部 filter 配置的最后

output

elasticsearch

output {    elasticsearch {        hosts => ["192.168.0.2:9200"]        index => "logstash-%{type}-%{+YYYY.MM.dd}"        document_type => "%{type}"        sniffing => true        template_overwrite => true    }}

解释

•索引名

写入的 ES 索引的名称,这里可以使用变量。为了更贴合日志场景,Logstash 提供了 %{+YYYY.MM.dd} 这种写法。在语法解析的时候,看到以 + 号开头的,就会自动认为后面是时间格式,尝试用时间格式来解析后续字符串。所以,之前处理过程中不要给自定义字段取个加号开头的名字……

注意索引名中不能有大写字母,否则 ES 在日志中会报 InvalidIndexNameException,但是 Logstash 不会报错,这个错误比较隐晦,也容易掉进这个坑中。

•模板

Elasticsearch 支持给索引预定义设置和 mapping(前提是你用的 elasticsearch 版本支持这个 API,不过估计应该都支持)。Logstash 自带有一个优化好的模板,内容如下:

{    "template" : "logstash-*",    "version" : 50001,    "settings" : {        "index.refresh_interval" : "5s"    },    "mappings" : {        "_default_" : {            "_all" : {"enabled" : true, "norms" : false},            "dynamic_templates" : [ {                "message_field" : {                    "path_match" : "message",                    "match_mapping_type" : "string",                    "mapping" : {                        "type" : "text",                        "norms" : false                    }                }            }, {                "string_fields" : {                    "match" : "*",                    "match_mapping_type" : "string",                    "mapping" : {                        "type" : "text", "norms" : false,                        "fields" : {                            "keyword" : { "type": "keyword"  }                        }                    }                }            }  ],            "properties" : {                "@timestamp": { "type": "date", "include_in_all": false  },                "@version": { "type": "keyword", "include_in_all": false  },                "geoip"  : {                    "dynamic": true,                    "properties" : {                        "ip": { "type": "ip"  },                        "location" : { "type" : "geo_point"  },                        "latitude" : { "type" : "half_float"  },                        "longitude" : { "type" : "half_float"  }                    }                }            }        }    }}

这其中的关键设置包括:

template for index-pattern

只有匹配 logstash-*的索引才会应用这个模板。有时候我们会变更 Logstash 的默认索引名称,通过 PUT 方法上传可以匹配你自定义索引名的模板。建议的做法是,把你自定义的名字放在 "logstash-" 后面,变成 index => "logstash-custom-%{+yyyy.MM.dd}" 这样。

refresh_interval for indexing

Elasticsearch 是一个近实时搜索引擎。它实际上是每 1 秒钟刷新一次数据。对于日志分析应用,我们用不着这么实时,所以 logstash 自带的模板修改成了 5 秒钟。你还可以根据需要继续放大这个刷新间隔以提高数据写入性能。

multi-field with keyword

Elasticsearch 会自动使用自己的默认分词器(空格,点,斜线等分割)来分析字段。分词器对于搜索和评分是非常重要的,但是大大降低了索引写入和聚合请求的性能。所以 logstash 模板定义了一种叫"多字段"(multi-field)类型的字段。这种类型会自动添加一个 ".keyword" 结尾的字段,并给这个字段设置为不启用分词器。简单说,你想获取 url 字段的聚合结果的时候,不要直接用 "url" ,而是用 "url.keyword" 作为字段名。

email

126 邮箱发送到 qq 邮箱 示例如下

output {    email {        port           =>    "25"        address        =>    "smtp.126.com"        username       =>    "test@126.com"        password       =>    ""        authentication =>    "plain"        use_tls        =>    true        from           =>    "test@126.com"        subject        =>    "Warning: %{title}"        to             =>    "test@qq.com"        via            =>    "smtp"        body           =>    "%{message}"    }}

解释

outputs/email 插件支持 SMTP 协议和 sendmail 两种方式,通过 via 参数设置。SMTP 方式有较多的 options 参数可配置。

exec

调用命令执行(Exec) outputs/exec 插件的运用也非常简单,如下所示,将 logstash 切割成的内容作为参数传递给命令。这样,在每个事件到达该插件的时候,都会触发这个命令的执行。

output {    exec {        command => "sendsms.pl \"%{message}\" -t %{user}"    }}

需要注意的是。这种方式是每次都重新开始执行一次命令并退出。本身是比较慢速的处理方式(程序加载,网络建联等都有一定的时间消耗)。最好只用于少量的信息处理场景,比如不适用 nagios 的其他报警方式。示例就是通过短信发送消息。

file

保存成文件(File) 通过日志收集系统将分散在数百台服务器上的数据集中存储在某中心服务器上,这是运维最原始的需求。Logstash 当然也能做到这点。 和 LogStash::Inputs::File 不同, LogStash::Outputs::File 里可以使用 sprintf format 格式来自动定义输出到带日期命名的路径。

配置示例

output {    file {        path => "/path/to/%{+yyyy}/%{+MM}/%{+dd}/%{host}.log.gz"        gzip => true    }}

stdout

标准输出(Stdout) 和之前 inputs/stdin 插件一样,outputs/stdout 插件也是最基础和简单的输出插件。同样在这里简单介绍一下,作为输出插件的一个共性了解。

配置示例

output {    stdout {        codec => rubydebug        workers => 2    }}

解释 输出插件统一具有一个参数是 workers。Logstash 为输出做了多线程的准备。 单就 outputs/stdout 插件来说,其最重要和常见的用途就是调试。所以在不太有效的时候,加上命令行参数 -vv 运行,查看更多详细调试信息。

学习更多:https://www.elastic.co/guide/en/logstash/6.4/index.html

References

[1] http://joda-time.sourceforge.net/timezones.html: Joda.org

73c7c785e369f24e26c662ab059a5aef.jpeg

关注查看更多有趣内容

扫描二维码关注我们

阅读全文
AI总结
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐