1: Preparing the image

As in the previous post, you first need to build an image with Flume installed.

2: Preparing the Flume configuration file

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
# For each one of the sources, the type is defined
# The channel can be defined as follows.
# Each sink's type must be defined
#Specify the channel the sink should use
# Each channel's type is defined.
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel

agentTest.sources = src
agentTest.channels = ch1
agentTest.sinks = sin1

agentTest.sources.src.type = TAILDIR
agentTest.sources.src.positionFile = /position/taildir_position.json
agentTest.sources.src.filegroups = logs
agentTest.sources.src.filegroups.logs = /log/.*log
agentTest.sources.src.fileHeader = true
agentTest.sources.src.interceptors = i1
agentTest.sources.src.interceptors.i1.type = Interceptor.MultInterceptor$Builder
agentTest.sources.src.interceptors.i1.regex=(((?!0000)[0-9]{4}-((0[1-9]|1[0-2])-(0[1-9]|1[0-9]|2[0-8])|(0[13-9]|1[0-2])-(29|30)|(0[13578]|1[02])-31)|([0-9]{2}(0[48]|[2468][048]|[13579][26])|(0[48]|[2468][048]|[13579][26])00)-02-29))


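# Note: dataDirs and checkpointDir are file-channel properties; the memory
# channel ignores them. Use type = file if events must survive a restart.
agentTest.channels.ch1.type = memory
agentTest.channels.ch1.dataDirs = /opt/flume/tmpData/filechannle/dataDirs_1
agentTest.channels.ch1.checkpointDir = /opt/flume/tmpData/filechannle/checkpointDir_1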
agentTest.channels.ch1.capacity = 1000
agentTest.channels.ch1.transactionCapacity = 100

agentTest.sinks.sin1.type = org.apache.flume.sink.kafka.KafkaSink
#agentTest.sinks.sin1.sink.directory = /var/log/flume
#agentTest.sinks.sin1.sink.rollInterval = 0
agentTest.sinks.sin1.kafka.topic = test
agentTest.sinks.sin1.kafka.bootstrap.servers = 10.10.0.2:9091
agentTest.sinks.sin1.kafka.flumeBatchSize = 20
agentTest.sinks.sin1.kafka.producer.acks = 1
agentTest.sinks.sin1.kafka.producer.linger.ms = 1
agentTest.sinks.sin1.kafka.producer.compression.type = snappy


agentTest.sources.src.channels = ch1
agentTest.sinks.sin1.channel = ch1

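Notes: the class in src.interceptors.i1.type = ... is an interceptor I wrote myself against the Flume API and added to the Flume installation inside the image; if that is unfamiliar, search online for how to write a custom Flume interceptor. Also, the 10.10.0.2:9091 address in the Kafka settings is the ClusterIP Service from my previous post that fronts the Kafka service.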

3: The YAML file

apiVersion: v1
kind: Pod
metadata:
  name: flume
spec:
  nodeName: worker01
  containers:
    - name: flume
      image: myflume:1.4
      args:
        - /bin/sh
        - -c
        - >
          bash /opt/flume/flume/bin/flume-ng agent
          -c /opt/flume/flume/conf/
          -f /opt/flume/flume/conf/test1.conf
          -n agentTest
          -Dflume.root.logger=DEBUG,console
      imagePullPolicy: IfNotPresent
      volumeMounts:
        - mountPath: "/log"
          name: flume
        - mountPath: "/position"
          name: position
        - mountPath: "/opt/flume/flume/conf"
          name: conf
        - mountPath: "/var/log/flume"
          name: out
        - mountPath: "/opt/flume/tmpData"
          name: tmp
  restartPolicy: Always
  volumes:
    - name: flume
      hostPath:
        path: "/opt/flume/log"
    - name: position
      hostPath:
        path: "/opt/flume/position"
    - name: conf
      hostPath:
        path: "/opt/flume/conf"
    - name: out
      hostPath:
        path: "/opt/flume/out"
    - name: tmp
      hostPath:
        path: "/opt/flume/tmp"

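Mount every directory that the configuration file refers to, otherwise the data is gone as soon as the pod restarts. Of course, this is only a local environment. In other environments you would certainly not use hostPath for the mounts, and the controller type should be a DaemonSet.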
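As a rough illustration of the DaemonSet variant mentioned above, here is a minimal sketch, not a tested manifest. The image, agent name and flume-ng command are taken from the Pod above. Everything else is an assumption made for illustration: the ConfigMap name flume-conf, the app: flume label, and the /etc/flume-agent mount path. It also assumes the image ships its own default files under /opt/flume/flume/conf. The log and position volumes are left as hostPath only as placeholders; swap in whatever volume type your environment provides.

```yaml
# Hypothetical ConfigMap that carries the agent configuration from section 2.
apiVersion: v1
kind: ConfigMap
metadata:
  name: flume-conf                  # assumed name
data:
  test1.conf: |
    # paste the full agentTest configuration from section 2 here
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: flume
spec:
  selector:
    matchLabels:
      app: flume                    # assumed label
  template:
    metadata:
      labels:
        app: flume
    spec:
      containers:
        - name: flume
          image: myflume:1.4
          imagePullPolicy: IfNotPresent
          args:
            - /bin/sh
            - -c
            - >
              bash /opt/flume/flume/bin/flume-ng agent
              -c /opt/flume/flume/conf/
              -f /etc/flume-agent/test1.conf
              -n agentTest
              -Dflume.root.logger=DEBUG,console
          volumeMounts:
            - mountPath: "/etc/flume-agent"   # assumed path, kept apart from the image's conf dir
              name: conf
            - mountPath: "/log"
              name: log
            - mountPath: "/position"
              name: position
      volumes:
        - name: conf
          configMap:
            name: flume-conf
        # Placeholders carried over from the Pod above; replace with the
        # volume type your environment actually uses.
        - name: log
          hostPath:
            path: "/opt/flume/log"
        - name: position
          hostPath:
            path: "/opt/flume/position"
```

A DaemonSet schedules one such pod on every eligible node, so the nodeName pin from the single Pod is no longer needed. Mounting the ConfigMap at a separate path avoids hiding any files the image keeps in its own conf directory.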

 

 

 

 
