K8S实践从0到1 进阶篇:K8S 部署 logstash 解析 nginx log 接入ELK
使用k8s 部署 logstash 解析 nginx access log 并接入到 ELK中
最近为了完善监控,将nginx access log 接入到了ELK中(ES、kibana部署之前有介绍过前情提要)
本来觉得应该挺简单的,部署个logstash容器,把日志文件挂载上就完事,结果很是折腾了一番,里面涉及到一些细节点(主要也是我做研发出身对这块不熟),网上找了很多文章,都没有能一次解决的,今天将我部署的过程分享一下,希望能帮到需要的同学。
--------------------- 原创不易,如果大家看完觉得有帮助,希望能多多点赞关注,感谢各位的支持 ----------------------
容器部署
容器部署这块相对来说比较简单,官网的镜像可以直接使用,logstash:8.12.2
由于容器分布在不同节点上,每个node都要部署一个容器,正常来说用DaemonSet比较合适,不过由于logstash本身还要配置PVC来存储数据,插件,最终我还是选用了StatefulSet,通过volumeClaimTemplates: 动态创建pvc。
logstash默认使用9600端口提供http访问,主要是做监控的,所以还是声明1个service备用
Service
# api service
apiVersion: v1
kind: Service
metadata:
name: logstash-svc
namespace: es
annotations:
desc : logstash集群 服务访问入口
spec:
selector:
k8s-app: logstash
type: ClusterIP #type: ClusterIP【默认】 | NodePort | LoadBalancer(外部负载均衡) | ExternalName (外部DNS解析)
#这里我依然使用了固定ip,便于给负载的nginx管理upstream
clusterIP: 10.106.220.231
ports:
- port: 9600
targetPort: 9600
name: httpport
#nodePort: 32000
---
StaefulSet
要点说明:
- 由于我的ES开启了SSL验证,因此logstah要使用部署ES时创建的 elasticsearch-ca.pem(详见前情提要);
- 要解析访问ip的归属地,需要用到geoip的地理位置数据库,这里使用的开源免费maxmind的GeoLite2-City.mmdb,下载地址 需要注册用户,有邮箱就行;
- 为了便于自动部署,我借用jenkions做了文件存储,基于jenkins的账号验证提供初级的文件安全保证,使用initContainers在启动容器前,下载所需的证书以及ip地理位置数据,通过配置ENV给容器获取账号、密码,此外还增加禁止下载的ENV变量,以便于在特殊情况下屏蔽下载(比如jenkins 宕机了);
- 由于是把相同配置的logstash容器调度到不同的node上,会存在logstash的pipelines中配置的任务对应的日志信息在本地存储中不存在的情况,这并不会影响logstahs执行
- 配置中涉及的{jenkins_hosts} 与 {job_name}为部署的jenkins访问域名与对应的job名称,需要给job配置文件参数,可以指定上传后的文件名与路径{jenkins_upload_path},这样在build的时候就可以进行上传了。此外下载后的存储地址{path}也是可以根据需要来修改的,不过要注意在volume中挂在对应的存储目录,以上这些参数都需要根据实际情况自行配置(当然不用jenkins,使用其他存储也可以比如 nfs,每次手动通过kubectl cp 上传也是可以的)
- 使用podAntiAffinity来确保pod会调度到不同的node上;
K8S部署yml如下
apiVersion: apps/v1
kind: StatefulSet # Deployment | StatefulSet | DaemonSet | JobSet
metadata:
name: logstash
namespace: es
spec:
replicas: 1 #运行副本数 每台node部署1个与node数量相同
selector:
matchLabels:
k8s-app: logstash #与下方template节点中的 labels 保持一致
revisionHistoryLimit: 10 #设定保留最近的几个revision 用于回滚,默认10
updateStrategy: #更新策略 [Statefulset]
#strategy: #更新策略 [Deployment]
type: RollingUpdate # RollingUpdate (滚动更新) | OnDelete (删除时更新)
rollingUpdate:
#maxSurge: 1 #[Deployment]支持-升级过程中可以启动超过原先设置的POD数量的上限:数量 或 百分比 1 | 20%
#maxUnavailable: 1 #[Deployment]支持-升级过程中无法提供服务的POD数量的上限:数量 或 百分比 1 | 20%,最好与maxSurge保持一致,这样能确保更新过程中的服务能力不会下降
partition: 0 #[Statefulset] 灰度发布控制器,每次只更新部署的pod序号 >= partition的pod,如果有5个pod[0-4],0=更新所有,4=更新1pod,3=更新2pod
persistentVolumeClaimRetentionPolicy: # Retain | Delete
whenDeleted: Delete
whenScaled: Retain
volumeClaimTemplates: #statefulset 专属动态创建pod的存储
- metadata:
name: logstash-volume
spec:
storageClassName: "local-path" ## 基于PV的动态创建
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
template:
metadata:
labels:
k8s-app: logstash
annotations:
#"cni.projectcalico.org/ipAddrs": "[\"10.244.220.10\"]" #pod绑定固定ip,依赖于calico ipam插件,必须使用calico 3.24.1以上的版本才可以
spec:
restartPolicy: Always
affinity:
nodeAffinity: # node 亲和度
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100 #软亲和匹配条件1,权重100优先级
preference:
matchExpressions:
- key: podtype
operator: In
values:
- web
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution: #反硬亲和,不调度到同一个node上
- labelSelector: #标签选择
matchExpressions: #正则匹配
- key: k8s-app
operator: In
values:
- logstash
topologyKey: kubernetes.io/hostname
namespaces:
- es
terminationGracePeriodSeconds: 20 #容器被删除变为Terminating状态的等待时间,默认是30s,以便于做一些容器删除前的处理工作
#imagePullSecrets: #私服认证信息
#- name: local-registry-key #私服账号secret资源名称,需要单独创建:kubectl create secret generic... 详见:https://kubernetes.io/zh-cn/docs/tasks/configure-pod-container/pull-image-private-registry/
initContainers:
- name: init-logstash
image: logstash:8.12.2
#securityContext:
# runAsUser: 0 #由于需要更改挂在目录的归属用户,需要以root运行
command:
- "bash"
- "-c"
# 可执行多行命令
- >
echo $DOWN_LOAD_URLS;
IFS=',' read -r -a my_array <<< "$DOWN_LOAD_URLS";
for URL in "${my_array[@]}"; do
echo 'URL:'$URL;
FILE=$(echo $URL | awk -F '/' '{print $NF}');
SAVE_PATH='/local-certs';
if [ ! -f $SAVE_PATH"/"$FILE ] || [ "$DOWN_LOAD" = "true" ];then
res_status=$(curl -u $DOWNLOAD_ACCESS_USER:$DOWNLOAD_ACCESS_PASS -I -m 10 -o /dev/null -s -w %{http_code} $URL);
echo 'res_status: ' $res_status;
if [ "$res_status" != "200" ];then
echo "===================== "$FILE"不可下载 =======================";
else
echo "===================== "$FILE"可下载,执行下载 =======================";
cd $SAVE_PATH;
curl -u $DOWNLOAD_ACCESS_USER:$DOWNLOAD_ACCESS_PASS $URL -O;
ls $SAVE_PATH;
fi
else
echo "===================== "$FILE"已存在 或 ENV:DOWN_LOAD=false =======================";
fi
done
env: #环境变量配置
- name: DOWN_LOAD
value: "true"
- name: "DOWN_LOAD_URLS"
value: "http://{jenkins_hosts}/job/{job_name}/ws/{jenkins_upload_path}/elasticsearch-ca.pem,http://{jenkins_hosts}/job/{job_name}/ws/{jenkins_upload_path}/GeoLite2-City.mmdb"
- name: DOWNLOAD_ACCESS_USER
value: ""
- name: DOWNLOAD_ACCESS_PASS
value: ""
- name: POD_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
volumeMounts:
- name: logstash-volume #挂载存储目录
mountPath: /local-certs
subPath: local-certs
containers:
- name: logstash
image: logstash:8.12.2
imagePullPolicy: IfNotPresent # IfNotPresent | Always | Never
securityContext: ##开启特权,因为要调整系统内核
# runAsUser: 0 #由于需要更改挂在目录的归属用户,需要以root运行
# privileged: true
resources:
requests:
memory: "600Mi" #Gi=G Mi=M 只支持整数
cpu: "250m" #1000m=1cpu (cpu物理线程)
limits:
memory: "1256Mi" #Gi=G Mi=M 只支持整数
cpu: "1000m" #1000m=1cpu (cpu物理线程)
#securityContext: ###添加参数启用容器root权限
# privileged: true
ports:
- containerPort: 9600
protocol: TCP
command: ["/bin/sh","-c"]
args: #可以设置多行命令,不过启动后初始化还是推荐使用postStart钩子函数来执行,不能有#注释符
#将挂载的配置文件同步到默认的ES配置文件中,因为elastic的安全机制,软连接无法生效
#将${POD_NAME}'.es.ndcto.com添加到本机hosts中,以便于与http.p12中的授信主机名适配
- |
cat /config/logstash.yml > /usr/share/logstash/config/logstash.yml;
cat /config/jvm.options > /usr/share/logstash/config/jvm.options;
/usr/local/bin/docker-entrypoint
#sleep 3600;
env: #环境变量配置
#- name: xxx
# value: xxx
- name: POD_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: logstash-volume #logstash数据
mountPath: /usr/share/logstash/data/
subPath: data
- name: logstash-volume #挂载部署目录
mountPath: /logstash/logs
subPath: logs
#- name: logstash-volume #挂载部署目录
# mountPath: /usr/share/logstash/pipeline
# subPath: pipeline
- name: logstash-pipelines #挂载配置文件
mountPath: /usr/share/logstash/pipeline
- name: logstash-volume #挂载存储目录
mountPath: /local-certs
subPath: local-certs
- name: logstash-config #挂载配置文件
mountPath: /config
- name: target-logs
mountPath: /data/logs
readOnly: true
- name: host-time #挂载本地时区
mountPath: /etc/localtime
readOnly: true
volumes:
- name: logstash-config #使用pvc
configMap: #使用configMap
name: logstash-config
defaultMode: 420 #420-644 493-755
- name: logstash-pipelines #使用pvc
configMap: #使用configMap
name: logstash-pipelines
defaultMode: 420 #420-644 493-755
- name: host-time
hostPath: #挂载本地时区
path: /etc/localtime
type: ""
- name: target-logs
hostPath: #本地日志收集目录
path: /data/logs
type: ""
---
配置文件
一共3个配置文件
logstash.yml logstash的主程序配置
jvm.options logstash主程序jvn运行环境的配置
logstash-xxx.conf 收集任务配置文件,可以有多个.cnf文件,统一挂载到容器的/usr/share/logstash/pipeline目录下即可
logstash.yml 和 jvm.options
注意:设置config.reload.interval时,一定要带上s(秒),否则会变成以毫秒间隔运行(官方文档还强调这是以秒为单位运行的,结果一开始没有设置单位,cpu瞬间爆满)
同样的pipeline.batch.delay 固定以毫秒间隔运行,也不宜设置过短,否则会频繁线程切换
apiVersion: v1
kind: ConfigMap #配置信息
metadata:
name: logstash-config #es-010配置
namespace: es
data:
logstash.yml: |
http.host: "0.0.0.0"
#数据存放位置
#path.data: /logstash/data
#logstash 使用elastic 做监控(非必需)
#xpack.monitoring.enabled: true
#xpack.monitoring.elasticsearch.username: logstash_system
#xpack.monitoring.elasticsearch.password: 1q2w3e
#这里必须用 https
#xpack.monitoring.elasticsearch.hosts: "https://es-01-svc.es:9200"
#你的ca.pem 的所在路径
#xpack.monitoring.elasticsearch.ssl.verification_mode: certificate
#xpack.monitoring.elasticsearch.ssl.certificate_authority: "/local-certs/elasticsearch-ca.pem"
# 探嗅 es节点,设置为 false
#xpack.monitoring.elasticsearch.sniffing: false
#并行执行管道的过滤器和输出阶段的worker数量。
pipeline.workers: 2
#单个工作线程在尝试执行其筛选器和输出之前将从输入中收集的事件的最大数量。较大的批量大小通常更有效,但是要以增加的内存开销为代价。
# 4000 -> jvm 1536m 默认256
pipeline.batch.size: 512
#在创建 pipeline 事件批处理时,以ms为单位等待每个事件多长时间,然后才向 pipeline 工作者分发小批处理。
pipeline.batch.delay: 500
#pipeline.batch.size 或 pipeline.batch.delay 达到任意一项,则会提交数据给output
#当设置为true时,定期检查配置是否已更改,并在更改时重新加载配置。
config.reload.automatic: true
#Logstash以秒为单位检查配置文件的更改频率,一定要带上单位S,否则会按照单位毫秒去执行
config.reload.interval: 15s
#当设置为true时,将完全编译的配置显示为调试日志消息。还必须设置log.level:debug。
#config.debug: true
#日志级别。fatal,error,warn,info,debug,trace。
log.level: info
#日志格式。设置为json以JSON格式登录,或者直接使用Object#.inspect。
#log.format:
#日志目录位置。
path.logs: /logstash/logs
#哪里可以找到自定义插件。您可以多次指定此设置以包括多个路径。插件应该位于特定的目录层次结构中PATH/logstash/TYPE/NAME.rb,其中TYPE是输入、过滤器、输出或编解码器,而NAME是插件的名称。
#path.plugins:
jvm.options: |
## JVM configuration
-Xms1g
-Xmx1g
################################################################
## Expert settings
################################################################
##
## All settings below this section are considered
## expert settings. Don't tamper with them unless
## you understand what you are doing
##
################################################################
## GC configuration
11-13:-XX:+UseConcMarkSweepGC
11-13:-XX:CMSInitiatingOccupancyFraction=75
11-13:-XX:+UseCMSInitiatingOccupancyOnly
-Djava.awt.headless=true
# ensure UTF-8 encoding by default (e.g. filenames)
-Dfile.encoding=UTF-8
-Djruby.compile.invokedynamic=true
-XX:+HeapDumpOnOutOfMemoryError
# Entropy source for randomness
-Djava.security.egd=file:/dev/urandom
# Copy the logging context from parent threads to children
-Dlog4j2.isThreadContextMapInheritable=true
-Dlogstash.jackson.stream-read-constraints.max-string-length=200000000
-Dlogstash.jackson.stream-read-constraints.max-number-length=10000
---
logstash-nginx-log.conf
要点说明
- 声明一个目标索引名称的变量,这样多个任务可以通过变量指定不同的目标索引,而共享同个ES连接池,降低对ES连接的消耗
mutate { add_field => { "[@metadata][target_index]" =>"nginx-log-%{+YYYY.MM}" } }
- 我在nginx中配置的access log模板为
'$remote_addr $host $server_addr [$time_local] "$request" $request_time $upstream_response_time $status $body_bytes_sent "$http_referer" "$http_user_agent" "$upstream_addr" "$http_x_forwarded_for" $http_device';
- 对应logstah的匹配规则为
'%{IPORHOST:remote_addr} %{HOSTNAME:http_host} %{IPORHOST:server_addr} \[%{HTTPDATE:timestamp}\] \"%{WORD:http_method} %{URIPATH:request_path}?%{GREEDYDATA:request_param} %{DATA:http_protocol}\" %{NUMBER:response_time} %{NUMBER:upstream_time} %{NUMBER:http_status} %{NUMBER:body_bytes_sent} \"%{DATA:http_referer}\" \"%{DATA:user_agent}\" \"%{HOSTPORT:upstream_addr}\" \"%{DATA:http_x_forward}\" (%{WORD:http_device}|-)
- 可以在kibana的Dev Tools 的 Grok Debugger中进行匹配调试
- 这里我没有关键固定索引,而是使用了kibana创建了索引模板+lifecycle生命周期,避免单一索引过大并随日期滚动清理,创建索引模板时注意设定@timstamp的date格式时要包含时区信息,这样logstash提交日期数据(ngnix log中会带上时区:[16/Mar/2024:20:00:00 +0800])到@timstamp中时才能保存成咱们东8区的时间,否则会变成UTC时间,晚了8个小时,kibana 中设置如图
- - 索引模板一定要设置别名alias,以便于life-cycle做索引滚动,alias跟索引前缀保持同意即可,比如我的索引模板匹配的 nginx-log-* (只要有这名称索引要创建,都可以根据模板做自动创建,不需要手动一个一个配置),alias我也设置成了nginx-log;
- 为了保存客户端真实ip,需要在logstash中split处理 http_x_forward,截取第一个ip信息保存,并把这个ip赋值给geoip已解析地理位置信息;
- logstash默认会丢弃为空的字段,由于我使用的是动态映射的索引模板(只配了个string字段动态映射),为避免第一个请求字段不全导致索引创建出来少字段,因此特意针对可能为空的字段进行处理,确保生成好的索引字段都是全的。
- 在业务中使用ES,推荐还是使用静态索引模板,手动写全字段,不要依赖动态映射创建,避免出现类似问题,这里是日志收集,格式固定不会变因此影响不大。
这里我是用动态映射配置如下:
配置文件内容如下:
apiVersion: v1
kind: ConfigMap #配置信息
metadata:
name: logstash-pipelines #es-010配置
namespace: es
data:
logstash-nginx-log.conf: |
input {
file {
path => "/data/logs/nginx/ndcto_access.log"
#start_position => "beginning" # 此参数表示在第一次读取日志时从头读取
start_position => "beginning" # 此参数表示在第一次读取日志时从末尾读取
stat_interval => "3" # 采集频率为 3 秒
# sincedb_path => "自定义位置" # 此参数记录了读取日志的位置,默认在 data/plugins/inputs/file/.sincedb*
}
}
filter {
# 定义保存索引目标,注意全程小写.
mutate { add_field => { "[@metadata][target_index]" => "nginx-log-%{+YYYY.MM}" } }
grok {
match => { "message" => '%{IPORHOST:remote_addr} %{HOSTNAME:http_host} %{IPORHOST:server_addr} \[%{HTTPDATE:timestamp}\] \"%{WORD:http_method} %{URIPATH:request_path}?%{GREEDYDATA:request_param} %{DATA:http_protocol}\" %{NUMBER:response_time} %{NUMBER:upstream_time} %{NUMBER:http_status} %{NUMBER:body_bytes_sent} \"%{DATA:http_referer}\" \"%{DATA:user_agent}\" \"%{HOSTPORT:upstream_addr}\" \"%{DATA:http_x_forward}\" (%{WORD:http_device}|-)'}
}
mutate {
split => ["http_x_forward", ","]
add_field => { "real_client_ip" => "%{[http_x_forward][0]}" }
#remove_field => ["uid_info"]
}
#借助geoip 将日志中的IP地址映射出具体的地理位置信息
geoip {
#将real_client_ip字段设置为源IP
source => "real_client_ip"
database => "/local-certs/GeoLite2-City.mmdb"
target => "geoip"
}
date {
match => [ "@timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
}
ruby {
code => "event.set('milliseconds', event.get('@timestamp').to_i * 1000)"
}
mutate {
convert => [ "body_bytes_sent","integer" ]
convert => [ "http_status","integer" ]
convert => [ "response_time","float" ]
convert => [ "upstream_time", "float" ]
}
#保留可能为空的字段
if ![request_param] {
mutate { add_field => { "request_param" => "" } }
}
#保留可能为空的字段
if ![http_device] {
mutate { add_field => { "http_device" => "" } }
}
mutate {
#移除多余字段 ["message","timestamp","event"]
remove_field => ["message","event"]
}
}
output {
#将最终处理的结果输出到调试面板(控制台),您可以开启,先观察处理结果是否是您期待的,确保正确之后,注释掉即可
#stdout { codec => rubydebug }
# 官方说,这里每出现一个 elasticsearch 都是一个数据库客户端连接,建议用一个连接一次性输出多个日志内容到 elk ,像如下这样
# 这样配置可以最大减少 elk 服务器的连接数,减小压力,因为 elk 今后将管理所有项目的日志,数据处理压力会非常大
# 在每个任务的filter中配置 mutate { add_field => { "[@metadata][target_index]" => "索引名" } } 即可
elasticsearch {
hosts => ["https://es-01-svc.es:9200"]
index => "%{[@metadata][target_index]}"
user => ""
password => ""
ssl_enabled => true
#pem 证书的 所在路径
ssl_certificate_authorities => "/local-certs/elasticsearch-ca.pem"
}
}
部署启动容器后,logstash将会自动加载/usr/share/logstash/pipeline下的任务配文件,不断扫描文件将新数据解析出来提交给ES(可以打开配置中的 stdout { codec => rubydebug } 查看解析结果),建议任务按类别放置到同个配置文件中。
--------------------- 原创不易,如果大家看完觉得有帮助,希望能多多点赞关注,感谢各位的支持 ----------------------
更多推荐
所有评论(0)