需求
- 将bigdata02和bigdata03机器实时产生的日志数据汇总到bigdata04中
- 通过bigdata04将数据输出到HDFS指定目录
这里注意:HDFS目录要按天生产每天一个目录。
分析
图解:
这里需要用到3个Agent
- Agent1负责采集机器bigdata02数据
- Agent2负责采集机器bigdata03数据
- Agent3负责汇总机器1和2数据到机器3再统一输出到HDFS
- Agent1和Agent2因为要实时读取文件中新增数据,所以使用基于文件的source,Exec Source。
- Channel统一使用基于内存的Channel-Memory Channel
- 由于需要汇总数据,所以sink端加快传输使用Avro Sink
- 备注:Avro是一种序列化的手段,经过序列化的数据进行传输的时候效率非常高,Avro Sink发送的数据可以直接被Avro Source接受,无缝衔接
实战
以下定义02为A、03为B、04为C
首先在02机器上配置Flume
配置Agent,创建文件 file-to-avro-104.conf
在03机器上配置Flume
与02机器一样的操作
配置Agent,创建文件file-to-avro-104.conf
在04机器上配置文件avro-to-hdfs.conf
这里有个注意的点:
在指定Agent中sink配置的时候注意,我们的需求是需要按天在hdfs中创建目录,并把当天的数据上传到 当天的日期目录中,这也就意味着hdfssink中的path不能写死,需要使用变量,动态获取时间,查看官 方文档可知,在hdfs的目录中需要使用%Y%m%d。
这个时间其实是需要从数据里面抽取,咱们前面 说过数据的基本单位是Event,Event是一个对象,后面我们会详细分析,在这里大家先知道它里面包含 的既有我们采集到的原始的数据,还有一个header属性,这个header属性是一个key-value结构的,我 们现在抽取时间就需要到event的header中抽取,但是默认情况下event的header中是没有日期的,强行 抽取是会报错的,会提示抽取不到,返回空指针异常。
其实官方文档中也说了,可以使用hdfs.useLocalTimeStamp或者时间 拦截器,暂时最简单直接的方式就是使用hdfs.useLocalTimeStamp,这个属 性的值默认为false,需要改为true
三台机器中的Flume Agent都配置好了,在开始启动之前需要先在bigdata02和bigdata03中生成测试数 据,为了模拟真实情况,在这里我们就开发一个脚本,定时向文件中写数据。
#!/bin/bash
# 循环向文件中生成数据
while [ "1"="1" ]
do
# 获取当前时间戳
curr_time=`date +%s`
# 获取当前主机名
name=`hostname`
echo${name}_${curr_time} >> /data/log/access.log
# 暂停1秒
sleep1
done
1.首先在bigdata02上创建/data/log目录,然后创建 generateAccessLog.sh 脚本
2.接着在bigdata03上创建/data/log目录,然后创建 generateAccessLog.sh 脚本
3.接下来开始启动相关的服务进程 首先启动bigdata04上的agent服务
这里要注意下启动顺序
首先应该启动的是04机器、如果没有启动04就启动了02和03,会丢失一部分数据
- 启动04
../bin/flume-ng agent --name a1 --conf /data/soft/apache-flume-1.9.0-bin/conf/ --conf-file avro-to-hdfs.conf -Dflume.root.logger=INFO,console
- 启动03
../bin/flume-ng agent --name a1 --conf /data/soft/apache-flume-1.9.0-bin/conf/ --conf-file file-to-avro-104.conf -Dflume.root.logger=INFO,console
初始化测试数据
sh -x generateAccessLog.sh
- 启动02
../bin/flume-ng agent --name a1 --conf /data/soft/apache-flume-1.9.0-bin/conf/ --conf-file file-to-avro-104.conf -Dflume.root.logger=INFO,console
初始化测试数据
sh -x generateAccessLog.sh
验证数据结果
启动之后稍等一会就可以看到数据了,我们观察数据的变化,会发现hdfs中数据增长的不 是很快,它会每隔一段时间添加一批数据,实时性好像没那么高
注意
这是因为avrosink中有一个配置batch-size,它的默认值是100,也就是每次发送100条数据,如果数据 不够100条,则不发送。 具体这个值设置多少合适,要看你source数据源大致每秒产生多少数据,以及你希望的延迟要达到什么 程度,如果这个值设置太小的话,会造成sink频繁向外面写数据,这样也会影响性能。
实战结束
最终,依次停止bigdata02、bigdata03中的服务,最后停止bigdata04中的服务