Deploying Flume on k8s and shipping data to Kafka
1: Image preparation
As in the previous post, you first need to build an image with Flume installed in it.
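The previous post covers the image itself; for orientation only, here is a minimal sketch of such an image, assuming a JDK 8 base and the Apache Flume 1.9.0 tarball (the versions and file names are assumptions; the unpack path matches the /opt/flume/flume paths used by the Pod spec below).
FROM openjdk:8-jdk
# ADD auto-extracts a local tarball; put Flume where the Pod spec below expects it
ADD apache-flume-1.9.0-bin.tar.gz /opt/flume/
RUN mv /opt/flume/apache-flume-1.9.0-bin /opt/flume/flume
# drop the custom interceptor jar (see the note after the config file) into Flume's lib directory
COPY mult-interceptor.jar /opt/flume/flume/lib/
Build and tag it to match the image name referenced in the Pod spec: docker build -t myflume:1.4 .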
2: Preparing the Flume configuration file
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
# For each one of the sources, the type is defined
# The channel can be defined as follows.
# Each sink's type must be defined
#Specify the channel the sink should use
# Each channel's type is defined.
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agentTest.sources = src
agentTest.channels = ch1
agentTest.sinks = sin1
agentTest.sources.src.type = TAILDIR
agentTest.sources.src.positionFile = /position/taildir_position.json
agentTest.sources.src.filegroups = logs
agentTest.sources.src.filegroups.logs = /log/.*log
agentTest.sources.src.fileHeader = true
agentTest.sources.src.interceptors = i1
agentTest.sources.src.interceptors.i1.type = Interceptor.MultInterceptor$Builder
agentTest.sources.src.interceptors.i1.regex=(((?!0000)[0-9]{4}-((0[1-9]|1[0-2])-(0[1-9]|1[0-9]|2[0-8])|(0[13-9]|1[0-2])-(29|30)|(0[13578]|1[02])-31)|([0-9]{2}(0[48]|[2468][048]|[13579][26])|(0[48]|[2468][048]|[13579][26])00)-02-29))
agentTest.channels.ch1.type = memory
# dataDirs/checkpointDir only take effect with a file channel; the memory channel ignores them
agentTest.channels.ch1.dataDirs = /opt/flume/tmpData/filechannle/dataDirs_1
agentTest.channels.ch1.checkpointDir = /opt/flume/tmpData/filechannle/checkpointDir_1
agentTest.channels.ch1.capacity = 1000
agentTest.channels.ch1.transactionCapacity = 100
agentTest.sinks.sin1.type = org.apache.flume.sink.kafka.KafkaSink
#agentTest.sinks.sin1.sink.directory = /var/log/flume
#agentTest.sinks.sin1.sink.rollInterval = 0
agentTest.sinks.sin1.kafka.topic = test
agentTest.sinks.sin1.kafka.bootstrap.servers = 10.10.0.2:9091
agentTest.sinks.sin1.kafka.flumeBatchSize = 20
agentTest.sinks.sin1.kafka.producer.acks = 1
agentTest.sinks.sin1.kafka.producer.linger.ms = 1
agentTest.sinks.sin1.kafka.producer.compression.type = snappy
agentTest.sources.src.channels = ch1
agentTest.sinks.sin1.channel = ch1
Note: the src.interceptors.i1.type = ... value is a custom interceptor I wrote against the Flume API and packaged into the Flume installation inside the image; look up Flume custom interceptors if you are not familiar with them. Also, the 10.10.0.2:9091 in the Kafka settings is the ClusterIP Service that fronts the Kafka service from my previous post.
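I don't have the author's interceptor source, but for orientation, here is a minimal sketch of what such an interceptor could look like, assuming it simply keeps events whose body matches the configured regex (the regex in the config above looks like the standard pattern for valid YYYY-MM-DD dates) and drops everything else. The real MultInterceptor may do more; only the package and class names need to line up with the interceptors.i1.type value.
package Interceptor;  // matches the "Interceptor.MultInterceptor$Builder" value in the config

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class MultInterceptor implements Interceptor {

    private final Pattern pattern;

    private MultInterceptor(Pattern pattern) {
        this.pattern = pattern;
    }

    @Override
    public void initialize() {
        // nothing to set up
    }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // returning null drops the event from the batch
        return pattern.matcher(body).find() ? event : null;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> kept = new ArrayList<>();
        for (Event e : events) {
            Event out = intercept(e);
            if (out != null) {
                kept.add(out);
            }
        }
        return kept;
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    // referenced from the config as Interceptor.MultInterceptor$Builder
    public static class Builder implements Interceptor.Builder {

        private Pattern pattern;

        @Override
        public void configure(Context context) {
            // "regex" is the property set on interceptors.i1 in the config file above
            pattern = Pattern.compile(context.getString("regex", ".*"));
        }

        @Override
        public Interceptor build() {
            return new MultInterceptor(pattern);
        }
    }
}
Package this as a jar and put it into Flume's lib directory inside the image, which is what the Dockerfile sketch above does.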
3: The YAML file
apiVersion: v1
kind: Pod
metadata:
  name: flume
spec:
  nodeName: worker01
  containers:
  - name: flume
    image: myflume:1.4
    args:
    - /bin/sh
    - -c
    - >
      bash /opt/flume/flume/bin/flume-ng agent
      -c /opt/flume/flume/conf/
      -f /opt/flume/flume/conf/test1.conf
      -n agentTest
      -Dflume.root.logger=DEBUG,console
    imagePullPolicy: IfNotPresent
    volumeMounts:
    - mountPath: "/log"
      name: flume
    - mountPath: "/position"
      name: position
    - mountPath: "/opt/flume/flume/conf"
      name: conf
    - mountPath: "/var/log/flume"
      name: out
    - mountPath: "/opt/flume/tmpData"
      name: tmp
  restartPolicy: Always
  volumes:
  - name: flume
    hostPath:
      path: "/opt/flume/log"
  - name: position
    hostPath:
      path: "/opt/flume/position"
  - name: conf
    hostPath:
      path: "/opt/flume/conf"
  - name: out
    hostPath:
      path: "/opt/flume/out"
  - name: tmp
    hostPath:
      path: "/opt/flume/tmp"
Mount every directory the configuration file needs as a volume, otherwise the data is gone as soon as the pod restarts. Of course, this is all a local environment; in other environments you certainly would not use hostPath for the mounts, and the controller type should be a DaemonSet rather than a bare Pod (a rough sketch follows below).
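A minimal sketch of that DaemonSet wrapper, so one Flume agent runs on every node; the labels and selector are assumptions, and the template spec is the container, volumeMounts, and volumes from the Pod above with nodeName removed (and ideally something more durable than hostPath for the volumes).
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: flume
spec:
  selector:
    matchLabels:
      app: flume
  template:
    metadata:
      labels:
        app: flume
    spec:
      containers:
      - name: flume
        image: myflume:1.4
        imagePullPolicy: IfNotPresent
        # args and volumeMounts as in the Pod spec above
      # volumes as in the Pod spec above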