k8s elk之logstash日志数据筛选、合并、字段匹配、索引区分
【代码】k8s elk之logtash日志数据筛选、合并、字段匹配、索引区分。
·
申明:
- 我想把我收集的日志中不需要的数据,在kibana上不展示。
- 我想索引名字按照日志中某个字段定义名称
- 我想把日志几行合并为一行在kibana上展示
- 我想把几个索引的某个关键字匹配的内容写入一个新的索引
1. 安装基础环境
- 安装es:
#cat elasticsearch-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: es-cluster
namespace: kube-logging
spec:
serviceName: elasticsearch
replicas: 1
selector:
matchLabels:
app: elasticsearch
template:
metadata:
labels:
app: elasticsearch
spec:
# nodeName: k8s-node3 指定node
containers:
- name: elasticsearch
image: elasticsearch:7.12.1
imagePullPolicy: IfNotPresent
resources:
limits:
cpu: 1000m
requests:
cpu: 100m
ports:
- containerPort: 9200
name: rest
protocol: TCP
- containerPort: 9300
name: inter-node
protocol: TCP
volumeMounts:
- name: data
mountPath: /usr/share/elasticsearch/data
env:
- name: cluster.name
value: k8s-logs
- name: node.name
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: discovery.seed_hosts
value: "es-cluster-0.elasticsearch"
- name: cluster.initial_master_nodes
value: "es-cluster-0"
- name: ES_JAVA_OPTS
value: "-Xms2g -Xmx2g"
initContainers:
- name: fix-permissions
image: busybox
imagePullPolicy: IfNotPresent
command: ["sh", "-c", "chown -R 1000:1000 /usr/share/elasticsearch/data"]
securityContext:
privileged: true
volumeMounts:
- name: data
mountPath: /usr/share/elasticsearch/data
- name: increase-vm-max-map
image: busybox
imagePullPolicy: IfNotPresent
command: ["sysctl", "-w", "vm.max_map_count=262144"]
securityContext:
privileged: true
- name: increase-fd-ulimit
image: busybox
imagePullPolicy: IfNotPresent
command: ["sh", "-c", "ulimit -n 65536"]
securityContext:
privileged: true
volumes:
- name: data
hostPath:
path: /home/es/
- es-svc
#cat elasticsearch_svc.yaml
apiVersion: v1
kind: Service
metadata:
name: elasticsearch
namespace: kube-logging
labels:
app: elasticsearch
spec:
selector:
app: elasticsearch
clusterIP: None
ports:
- port: 9200
name: rest
- port: 9300
name: inter-node
- filebeat部署
# cat flebete.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: filebeat
namespace: kube-logging
labels:
k8s-app: filebeat
spec:
selector:
matchLabels:
k8s-app: filebeat
template:
metadata:
labels:
k8s-app: filebeat
spec:
serviceAccountName: filebeat
terminationGracePeriodSeconds: 30
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
containers:
- name: filebeat
image: elastic/filebeat:7.6.2
args: [
"-c", "/etc/filebeat.yml",
"-e",
]
env:
- name: ELASTICSEARCH_HOST
value: elasticsearch
- name: ELASTICSEARCH_PORT
value: "9200"
- name: ELASTICSEARCH_USERNAME
value: elastic
- name: ELASTICSEARCH_PASSWORD
value: changeme
- name: ELASTIC_CLOUD_ID
value:
- name: ELASTIC_CLOUD_AUTH
value:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
securityContext:
runAsUser: 0
# If using Red Hat OpenShift uncomment this: privileged: true
resources:
limits:
cpu: 500m
memory: 500Mi
requests:
cpu: 100m
memory: 100Mi
volumeMounts:
- name: config
mountPath: /etc/filebeat.yml
readOnly: true
subPath: filebeat.yml
- name: data
mountPath: /usr/share/filebeat/data
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
- name: varlog
mountPath: /var/log
readOnly: true
#nodeSelector:
# logging: es
volumes:
- name: config
configMap:
defaultMode: 0600
name: filebeat-config
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
- name: varlog
hostPath:
path: /var/log
# data folder stores a registry of read status for all files, so we don't send everything again on a Filebeat pod restart
- name: data
hostPath:
path: /var/lib/filebeat-data
type: DirectoryOrCreate
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: filebeat
subjects:
- kind: ServiceAccount
name: filebeat
namespace: kube-logging
roleRef:
kind: ClusterRole
name: filebeat
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: filebeat
labels:
k8s-app: filebeat
rules:
- apiGroups: [""]
# "" indicates the core API group
resources:
- namespaces
- pods
verbs:
- get
- watch
- list
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: filebeat
namespace: kube-logging
labels:
k8s-app: filebeat
- filebeat-config文件
#cat flebete-config.yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
name: filebeat-config
namespace: kube-logging
labels:
k8s-app: filebeat
data:
filebeat.yml: |-
filebeat.inputs:
- type: container
paths: #多日志路径匹配
- /var/log/containers/*.log
- /var/log/test/*.log
output.logstash: #logstash地址
hosts: ['logstash.kube-logging.svc.cluster.local:5044']
- logstash部署
#cat logs.yaml
---
kind: Deployment
apiVersion: apps/v1
metadata:
name: logstash
namespace: kube-logging
spec:
selector:
matchLabels:
app: logstash
template:
metadata:
labels:
app: logstash
spec:
hostname: logstash
containers:
- name: logstash
ports:
- containerPort: 5044
name: logstash
image: logstash:v1 #这儿我自己打了一个镜像,安装了multiline模块。基础镜像(logstash:7.5.0)没有这个模块,进入容器执行(bin/logstash-plugin install logstash-filter-multiline),查看(bin/logstash-plugin install logstash-filter-multiline)
volumeMounts:
- name: logstash-config
mountPath: /usr/share/logstash/pipeline/
command:
- logstash
volumes:
# Previously defined ConfigMap object.
- name: logstash-config
configMap:
name: logstash-config
items:
- key: logstash.conf
path: logstash.conf
---
kind: Service
apiVersion: v1
metadata:
name: logstash
namespace: kube-logging
spec:
#type: NodePort
clusterIP: None
selector:
app: logstash
ports:
- protocol: TCP
port: 5044
name: logstash
- logtash 配置
# cat logs-config.yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
name: logstash-config
namespace: kube-logging
data:
logstash.conf: |-
input {
beats {
port => "5044"
}
}
filter {
if [agent][id] == "7e857655-7475-4eb2-866e-d2939fe1ba03" { #此行可以删除,因为我是调试单个项目。解释日志字段agetn.id等于"7e857655-7475-4eb2-866e-d2939fe1ba03"并且删除包含info或者test的行,写入es,索引名字为my_index-7e857655-7475-4eb2-866e-d2939fe1ba03-2023.04.22。因为日志太多,所以无关紧要的日志不写入es
if [message] =~ /info|test/ { #如果message里面包含了info或者test字符,就删除此行,不写入es索引。
drop {}
}
}
multiline { #此模块需要安装,正则匹配,把同一时间戳的合并为一行日志,多个正则用'|'来隔开,写入到es。
pattern => "^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})|^(.*)(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})\s(\d{2}):(\d{1,2})|^{(.*)}|^\d{4}|^(.*)(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})\s(\d{2}):(\d{1,2})"
negate => true #在 multiline filter 中,negate 是一个布尔类型的参数。如果将其设置为 true,则该插件将只处理不匹配正则表达式的行。换句话说,如果 negate 值为 true,则只有在输入中找到不与 pattern 相匹配的行时,该插件才会开始合并日志行到上一个事件中。
what => "previous" #"previous"(默认值):将当前行添加到前一个事件的结尾。"next":将当前行添加到下一个事件的开头
}
}
output {
elasticsearch { #es地址
hosts => "elasticsearch.kube-logging.svc.cluster.local:9200"
index => "my_index-%{[agent][id]}-%{+yyyy.MM.dd}" #新建索引名字为agetn.id的值,此功能用于,区分不同的项目日志,如app1日志索引名字my_index-app1-20230423,app2日志索引名字my_index-app2-20230423,一次内推。
}
}
- 启动
kubectl apply -f ./
- 多匹配条件
apiVersion: v1
kind: ConfigMap
metadata:
name: logstash-config
namespace: logging
data:
logstash.conf: |-
input {
beats {
port => "5044"
}
}
filter {
if [kubernetes][labels][app_kubernetes_io/name] == "dtk-go-open-api" {
#字段=dtk-go-open-api中,删除message中包含---的行
if [message] =~ /---/ {
drop {}
}
}
if [kubernetes][labels][app_kubernetes_io/name] == "dtk-go-taobao-api" or [kubernetes][labels][app_kubernetes_io/name] == "dtk-go-taobao-api-h" {
#字段等于dtk-go-taobao-api或者等于dtk-go-taobao-api-h中,message中包含CheckNewGoodsSign或者CheckNewGoodsSign2的行添加字段new_index=CheckNewGoodsSign-index
if [message] =~ /CheckNewGoodsSign|CheckNewGoodsSign2/ {
mutate {
add_field => { "new_index" => "CheckNewGoodsSign-index" }
}
}
}
}
output {
#如果new_index字段等于CheckNewGoodsSign-index就写入checknewgoodssign-%{+yyyy.MM.dd}索引,其他的走下面的索引。
if [new_index] == "CheckNewGoodsSign-index" {
elasticsearch {
hosts => "elasticsearch.logging.svc.cluster.local:9200"
index => "checknewgoodssign-%{+yyyy.MM.dd}"
}
}
else {
elasticsearch {
hosts => "elasticsearch.logging.svc.cluster.local:9200"
index => "%{[kubernetes][labels][app_kubernetes_io/name]}-%{+yyyy.MM.dd}"
}
}
}
- 更新
apiVersion: v1
kind: ConfigMap
metadata:
name: logstash-config
namespace: logging
data:
logstash.conf: |-
input {
beats {
port => "5044"
}
}
filter {
if [kubernetes][labels][app_kubernetes_io/name] == "dtk-go-open-api" {
if [message] =~ /---/ {
drop {}
}
}
if [kubernetes][labels][app_kubernetes_io/name] == "dtk-go-taobao-api-h" {
#多匹配删除
if [message] =~ /cloud.go:2279|cloud.go:2226/ {
drop {}
}
}
if [kubernetes][labels][app_kubernetes_io/name] == "dtk-go-taobao-api" {
if [message] =~ /-----auth_id-|-------11111----/{
drop {}
}
}
if [kubernetes][labels][app_kubernetes_io/name] == "dtk-php-api-android-cms" {
drop {}
}
if ![kubernetes][labels][app_kubernetes_io/name] {
#空索引删除
drop {}
}
if [kubernetes][labels][app_kubernetes_io/name] == "dtk-go-taobao-api" or [kubernetes][labels][app_kubernetes_io/name] == "dtk-go-taobao-api-h" {
if [message] =~ /CheckNewGoodsSign|CheckNewGoodsSign2/ {
mutate {
add_field => { "new_index" => "CheckNewGoodsSign-index" }
}
}
}
}
output {
if [new_index] == "CheckNewGoodsSign-index" {
elasticsearch {
hosts => "elasticsearch.logging.svc.cluster.local:9200"
index => "checknewgoodssign-%{+yyyy.MM.dd}"
}
}
else if ![kubernetes][labels][app_kubernetes_io/name] {
elasticsearch {
hosts => "elasticsearch.logging.svc.cluster.local:9200"
index => "index-unknown-%{+yyyy.MM.dd}"
}
}
else {
elasticsearch {
hosts => "elasticsearch.logging.svc.cluster.local:9200"
index => "%{[kubernetes][labels][app_kubernetes_io/name]}-%{+yyyy.MM.dd}"
}
}
}
更多推荐
已为社区贡献14条内容
所有评论(0)