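Flume NG 1.x is a ground-up rewrite of Flume 0.9.x and is barely recognizable: the Master and ZooKeeper are gone, the collector is gone, the web console is gone. Only three components remain:

  • source (avro: very easy to use; exec: runs a shell command)
  • sink (I used hdfs)
  • channel
With only these three components, it has gone from a distributed system to what looks like a transport tool.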

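Below is an example (with tuned parameters) that uses avro as the source, hdfs as the sink, and memory as the channel.
1. Configure the Hadoop client so that hadoop fs -ls / runs without errors.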
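
A minimal sketch of that pre-flight check, so the failure shows up before Flume is started rather than in the sink's log. The preflight function is a hypothetical helper, not part of Hadoop or Flume; it only wraps the hadoop fs -ls / command from step 1.

```shell
#!/bin/sh
# preflight CMD [ARGS...]: run a command quietly and report whether it succeeded.
# (Hypothetical helper for illustration.)
preflight() {
    if "$@" >/dev/null 2>&1; then
        echo "OK: $*"
    else
        echo "FAILED: $*" >&2
        return 1
    fi
}

# Only attempt the HDFS listing when a hadoop client is actually on the PATH.
if command -v hadoop >/dev/null 2>&1; then
    preflight hadoop fs -ls / ||
        echo "fix the Hadoop client configuration before starting Flume" >&2
fi
```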

2. Edit the configuration file flume.conf:
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 100000
agent1.channels.ch1.keep-alive = 30

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 127.0.0.1:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 127.0.0.1
agent1.sources.avro-source1.port = 41414
agent1.sources.avro-source1.threads = 5

# Define an HDFS sink (named log-sink1) that writes all events it receives
# to HDFS, and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = hdfs
agent1.sinks.log-sink1.hdfs.path = hdfs://162.105.80.1:9000/flume/
agent1.sinks.log-sink1.hdfs.writeFormat = Text
agent1.sinks.log-sink1.hdfs.fileType = DataStream
agent1.sinks.log-sink1.hdfs.rollInterval = 0
agent1.sinks.log-sink1.hdfs.rollSize = 60554432
agent1.sinks.log-sink1.hdfs.rollCount = 0
agent1.sinks.log-sink1.hdfs.batchSize = 1000
agent1.sinks.log-sink1.hdfs.txnEventMax = 1000
agent1.sinks.log-sink1.hdfs.callTimeout = 60000
agent1.sinks.log-sink1.hdfs.appendTimeout = 60000

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1
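
Since several of these numbers depend on each other, a small consistency check can catch a bad combination before the agent starts. The sketch below assumes (this is my reading of Flume's behavior, not something tested here) that the sink's per-transaction event count must fit within the channel's transactionCapacity, and transactionCapacity within capacity. get_prop and check_le are hypothetical helpers, and conf/flume.conf is the path used when starting the agent.

```shell
#!/bin/sh
# get_prop FILE KEY: print the value of KEY from a properties-style file.
# (Hypothetical helper, not part of Flume.)
get_prop() {
    awk -v key="$2" '
        /^[ \t]*#/ { next }                      # skip comment lines
        {
            eq = index($0, "=")
            if (eq == 0) next                    # not a key = value line
            k = substr($0, 1, eq - 1)
            v = substr($0, eq + 1)
            gsub(/^[ \t]+|[ \t]+$/, "", k)       # trim whitespace around the key
            gsub(/^[ \t]+|[ \t]+$/, "", v)       # and around the value
            if (k == key) print v
        }' "$1"
}

# check_le FILE SMALL_KEY LARGE_KEY: succeed if value(SMALL_KEY) <= value(LARGE_KEY).
check_le() {
    small=$(get_prop "$1" "$2")
    large=$(get_prop "$1" "$3")
    if [ "$small" -le "$large" ]; then
        echo "ok: $2 ($small) <= $3 ($large)"
    else
        echo "warning: $2 ($small) > $3 ($large)"
        return 1
    fi
}

if [ -f conf/flume.conf ]; then
    check_le conf/flume.conf agent1.sinks.log-sink1.hdfs.txnEventMax \
        agent1.channels.ch1.transactionCapacity
    check_le conf/flume.conf agent1.channels.ch1.transactionCapacity \
        agent1.channels.ch1.capacity
fi
```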
 
3. Edit flume-env.sh:
# Note that the Flume conf directory is always included in the classpath.
#FLUME_CLASSPATH=""
FLUME_CLASSPATH=.
jars=`ls /usr/lib/flume-ng/lib`

for jar in $jars
do
   FLUME_CLASSPATH="$FLUME_CLASSPATH:/usr/lib/flume-ng/lib/$jar"
done
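
The loop above parses the output of ls, which breaks on file names containing spaces and also picks up files that are not jars. A possible alternative, sketched with a shell glob: build_classpath is a hypothetical helper name, and unlike the original loop it deliberately includes only *.jar files.

```shell
#!/bin/sh
# build_classpath DIR: print "." followed by every *.jar in DIR, colon-separated.
# (Hypothetical helper, not part of Flume.)
build_classpath() {
    dir=$1
    classpath=.
    for jar in "$dir"/*.jar; do
        # If the glob matched nothing, the literal pattern comes back; skip it.
        [ -e "$jar" ] || continue
        classpath="$classpath:$jar"
    done
    printf '%s\n' "$classpath"
}

# In flume-env.sh this could replace the ls-based loop:
FLUME_CLASSPATH=$(build_classpath /usr/lib/flume-ng/lib)
```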


4. If needed, set the path of the flume.log file in log4j.properties.

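5. Make sure the current user is the same as the Hadoop user (assuming Hadoop has no user authentication enabled) and has write permission on flume.log.
6. Start the avro agent. The argument "agent1" is the agent name defined in the configuration file above.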

flume-ng agent --conf ./conf/ -f conf/flume.conf -n agent1
 
7. Upload a file from the client side:

flume-ng avro-client --conf conf -H 127.0.0.1 -p 41414 -F /data/xx.txt
 

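Takeaways:
  • It feels like a plain transport tool. Configuration is simple, but the parameters still need tuning, otherwise it throws errors.
  • Uploaded data can be split automatically into multiple files by size, by number of lines, or by elapsed time.
  • I tested uploading a 700 MB file on a single machine: the upload speed matched hadoop fs -put, and no records were lost.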
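
As a rough illustration of the size-based split: with rollInterval = 0 and rollCount = 0 only rollSize triggers a new file, so the number of output files is roughly the upload size divided by rollSize, rounded up. The sketch below assumes the 700 MB file is exactly 700 * 1024 * 1024 bytes; estimate_rolled_files is a hypothetical helper, and the real count may differ slightly because a file is only rolled at an event boundary.

```shell
#!/bin/sh
# estimate_rolled_files TOTAL_BYTES ROLL_SIZE: ceil(TOTAL_BYTES / ROLL_SIZE)
# using integer arithmetic. (Hypothetical helper for illustration.)
estimate_rolled_files() {
    total=$1
    roll=$2
    echo $(( (total + roll - 1) / roll ))
}

# 700 MiB uploaded with hdfs.rollSize = 60554432
estimate_rolled_files $((700 * 1024 * 1024)) 60554432   # prints 13
```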

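Open questions:
  • Can the output file only be defined in flume.conf?
  • How can files be aggregated into HDFS according to rules (for example, merging output per customer)?
  • Transactional transport guarantees that the Events (log lines) within each transaction reach HDFS, but if the transfer of a whole file is interrupted midway, the part already uploaded is still left behind on HDFS.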
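
One way to detect such a partial upload after the fact is to compare line counts on both sides. This is only a sketch: it assumes the /flume/ directory from the configuration above holds nothing but this one upload and that each Event is exactly one line of /data/xx.txt. count_lines and report_transfer are hypothetical helpers.

```shell
#!/bin/sh
# count_lines: count lines on stdin and strip the padding some wc versions add.
count_lines() {
    wc -l | tr -d ' '
}

# report_transfer SENT STORED: compare the two line counts.
# (Hypothetical helper for illustration.)
report_transfer() {
    sent=$1
    stored=$2
    if [ "$sent" -eq "$stored" ]; then
        echo "complete: $stored lines"
    else
        echo "incomplete: $stored of $sent lines reached HDFS"
        return 1
    fi
}

# Run the comparison only where a hadoop client and the local file exist.
if command -v hadoop >/dev/null 2>&1 && [ -f /data/xx.txt ]; then
    sent=$(count_lines < /data/xx.txt)
    stored=$(hadoop fs -cat 'hdfs://162.105.80.1:9000/flume/*' | count_lines)
    report_transfer "$sent" "$stored"
fi
```

If the counts differ, the leftover files still have to be removed or the upload repeated by hand; the check only tells you that it happened.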

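Conclusion: Flume NG 1.1 cannot yet meet complex business requirements. Although it supports custom components such as sources and sinks, the software did not feel robust in use: even very simple examples threw errors, which does not inspire confidence. Fortunately the source code is simple enough to read. For now it seems suitable only for simple transport.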


Error log:
org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
Fix: set agent1.channels.<channel_name>.keep-alive = 30

References:
FlumeNG architecture
Flume User Guide
