Flume的配置文件就是类似与Kettle的ktr或者kjb,从哪里获取数据怎么处理录到哪里都是通过配置文件进行描述的,官方《Flume 1.9.0 User Guide》已经很详细了,各种sources、channels、sinks都有相当详细的配置说明和demo举例,我们这里弄几个常用的案例进行测试说明。
1.配置格式
配置通常需要【定义】和【绑定】两个部分,放在哪里就是个人习惯了,我习惯定义在上,绑定在下:
- 定义 Agent 的 Sources,Channels,Sinks 及其具体参数【参数可以从官网查询】。基本格式如下:
```xml定义agentName的sources、channels、sinks
.sources =
.channels =
.sinks =
定义sources的具体属性
.sources.. =
定义channels的具体属性
.channels.. =
.channels.. =
定义sinks的具体属性
.sinks.. =
2. 绑定 Sources 和 Sinks 的 Channels。需要注意的是一个Sources可以配置多个Channels,但一个 Sink只能配置一个Channel。【sources后的是channels,sinks后的是channel,一定要注意。】基本格式如下:
```xml
# 绑定sources的channels
<agentName>.sources.<sourceName>.channels = <channelName1> <channelName2> ...
# 绑定sinks的channel
<agentName>.sinks.<sinkName>.channel = <channelName1>
2.文件命名
配置文件的命名并没有很严格的规范,我的命名习惯是【模块名称】-【sources类型】-【channels类型】-【sinks类型】各个部分都可以简写以缩短文件名。如何通过配置文件启动一个agent:
# 单- 短指令
flume-ng agent \
-n $agent_name \
-c conf -f conf/flume-conf.properties.template
# 双-- 完整指令
flume-ng agent \
--name $agent_name \
--conf conf --conf-file /home/flume/test/test-exec-memory-logger.properties \ -Dflume.root.logger=INFO,console
3.常用案例
3.1 案例一
这里主要是使用一下exec类型的source,exec类型的source能实现的数据获取很广泛,因为command能实现的操作太多了。sinks使用最简单的logger,我们看一下官方的配置及Demo:
- 配置
新建配置文件test-exec-memory-logger.properties 其内容如下:
# 1.定义
# 定义agent的sources,channels,sinks
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# 定义sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
# 定义channel类型
a1.channels.c1.type = memory
# 定义sink属性
a1.sinks.k1.type = logger
# 2.绑定
#将sources与channels进行绑定【a1的数据源s1的通道是】
a1.sources.s1.channels = c1
#将sinks与channels进行绑定 【a1的sinks k1的通道是】
a1.sinks.k1.channel = c1
- 启动
我们配置过FLUME_HOME的环境变量,所以可在任何地方执行:
[root@tcloud ~]# flume-ng agent \
-n a1 \
-c conf \
-f /home/flume/test/test-exec-memory-logger.properties \
-Dflume.root.logger=INFO,console
- 测试
向文件中追加数据:
[root@tcloud ~]# echo "Hello" >> /tmp/log.txt
[root@tcloud ~]# echo "ARE" >> /tmp/log.txt
[root@tcloud ~]# echo "YOU" >> /tmp/log.txt
[root@tcloud ~]# echo "OK" >> /tmp/log.txt
控制台的显示:
# 这里省去一些日志信息只贴出主要的
2021-08-24 11:58:05,499 INFO sink.LoggerSink: Event:
{
headers:{
} body: 48 65 6C 6C 6F Hello }
{
headers:{
} body: 41 52 45 ARE }
{
headers:{
} body: 59 4F 55 YOU }
{
headers:{
} body: 4F 4B OK }
3.2 案例二
这里主要是使用一下spooldir类型的source,sinks使用hdfs,将指定目录下的文件上传到hdfs也是常用的场景,我们看一下官方的配置及Demo:
- 配置
新建配置文件test-exec-memory-logger.properties 其内容如下:
# 1.定义
# 定义agent的sources,channels,sinks
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# 定义sources属性
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /tmp/logs
# /tmp/logs 文件夹必须存在 否则会报错
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName
# 定义channel属性
a1.channels.c1.type = memory
# 定义sink属性
a1.sinks.k1.type = hdfs
# 这个地方要注意 不要写成/flume/events/%y-%m-%d/%H/
# 否则将会是这样的结果 /flume/events/21-08-24/15//log.txt.1629791801963
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H
a1.sinks.k1.hdfs.filePrefix = %{
fileName}
# 生成的文件类型 默认是Sequencefile 可用DataStream 则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 2.绑定
# 将sources与channels进行绑定
a1.sources.s1.channels = c1
# 将sinks与channels进行绑定
a1.sinks.k1.channel = c1
- 启动
启动前要先启动hdfs:
[root@tcloud sbin]# start-dfs.sh
[root@tcloud sbin]# hdfs version
Hadoop 3.1.3
这次使用以下双杠指令:
[root@tcloud ~]# flume-ng agent \
--name a1 \
--conf conf \
--conf-file /home/flume/test/test-spooling-memory-hdfs.properties \
-Dflume.root.logger=INFO,console
- 测试
我们拷贝案例一生成的文件 /tmp/log.txt 到监听目录下,可以从日志看到文件上传到 HDFS 的路径:
[root@tcloud ~]# cp /tmp/log.txt /tmp/logs/
# 日志信息
2021-08-24 15:56:41,727 INFO avro.ReliableSpoolingFileEventReader:
Preparing to move file /tmp/logs/log.txt to /tmp/logs/log.txt.COMPLETED
2021-08-24 15:56:41,962 INFO hdfs.HDFSDataStream:
Serializer = TEXT, UseRawLocalFileSystem = false
2021-08-24 15:56:42,265 INFO hdfs.BucketWriter:
Creating /flume/events/21-08-24/15/log.txt.1629791801963.tmp
查看上传到 HDFS 上的文件内容与本地是否一致:
[root@tcloud ~]# hdfs dfs -cat /flume/events/21-08-24/15/log.txt.1629791801963
2021-08-24 15:58:24,855 INFO sasl.SaslDataTransferClient:
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Hello
ARE
YOU
OK
3.3 案例三
Avro是一个数据序列化系统,设计用于支持大批量数据交换的应用。这里主要是使用一下avro类型的source和sink,将本服务器收集到的数据发送到另外一台服务器。特别注意 :exclamation::exclamation::exclamation: 大家一定要分清 source 和 sink,两台服务器绑定的都是 【把avro作为source的服务器的ip和端口】 这点儿一定要配置好。
- 配置日志收集Flume
新建配置 test-exec-memory-avro.properties【某些备注不再赘述】,监听文件内容变化,然后将新的文件内容通过avro sink 发送到 tcloud 服务器的 14545 端口:
# 1.定义
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = tcloud
a1.sinks.k1.port = 14545
a1.sinks.k1.batch-size = 1
# 2.绑定
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
- 配置日志聚合Flume
新建配置 test-avro-memory-logger.properties【某些备注不再赘述】, 监听 tcloud 服务器的 14545 端口,将获取到内容输出到控制台:
# 1.定义
a2.sources = s2
a2.channels = c2
a2.sinks = k2
# sources
a2.sources.s2.type = avro
a2.sources.s2.bind = tcloud
a2.sources.s2.port = 14545
# channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# sink
a2.sinks.k2.type = logger
# 2.绑定
a2.sources.s2.channels = c2
a2.sinks.k2.channel = c2
- 启动
在tcloud服务器上启动日志聚集 Flume【此服务器将avro作为sources】:
[root@tcloud ~]# flume-ng agent \
--name a2 \
--conf conf \
--conf-file /home/flume/test/test-avro-memory-logger.properties \
-Dflume.root.logger=INFO,console
在aliyun服务器上启动日志收集 Flume【此服务器将avro作为sinks】:
[root@aliyun ~]#flume-ng agent \
--name a1 \
--conf conf \
--conf-file /home/flume/test/test-exec-memory-avro.properties \
-Dflume.root.logger=INFO,console
这里建议按以上顺序启动,原因是 Avro Source 会先与端口进行绑定,这样 Avro Sink 连接时才不会报无法连接的异常。但是即使不按顺序启动也是没关系的,sink会一直重试,直至建立好连接。
# sources端 连接成功
2021-08-25 09:43:40,119 INFO node.Application: Starting Sink k2
2021-08-25 09:43:40,120 INFO node.Application: Starting Source s2
2021-08-25 09:43:40,121 INFO source.AvroSource: Starting Avro source s2: {
bindAddress: tcloud, port: 14545 }...
2021-08-25 09:43:40,490 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s2: Successfully registered new MBean.
2021-08-25 09:43:40,490 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s2 started
2021-08-25 09:43:40,500 INFO source.AvroSource: Avro source s2 started.
# sinks端 连接成功
2021-08-25 09:43:44,713 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:212)] Rpc sink k1: Building RpcClient with hostname: tcloud, port: 14545
2021-08-25 09:43:44,714 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:113)] Attempting to create Avro Rpc client.
2021-08-25 09:43:44,714 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:594)] Using default maxIOWorkers
4.测试
在aliyun也就是【avro作为sinks端】的服务器上向文件 /tmp/log.txt 中追加内容:
[root@aliyun ~]# echo "Hello" >> /tmp/log.txt
[root@aliyun ~]# echo "Are" >> /tmp/log.txt
[root@aliyun ~]# echo "You" >> /tmp/log.txt
[root@aliyun ~]# echo "Ok" >> /tmp/log.txt
在tcloud也就是【avro作为source是端】的服务器上可以看到已经从 14545 端口监听到内容,并成功输出到控制台:
2021-08-25 09:51:48,176 INFO sink.LoggerSink:
Event: {
headers:{
} body: 48 65 6C 6C 6F Hello }
2021-08-25 09:51:54,084 INFO sink.LoggerSink:
Event: {
headers:{
} body: 41 72 65 Are }
2021-08-25 09:52:03,118 INFO sink.LoggerSink:
Event: {
headers:{
} body: 59 6F 75 You }
2021-08-25 09:52:07,311 INFO sink.LoggerSink:
Event: {
headers:{
} body: 4F 6B Ok }
4.总结
Flume的部署和使用都相对简单,官网给出的配置和demo也比较详细,我们在使用的时候结合使用场景,选择需要的Soucres、Channels、Sinks类型就行,希望这些内容有用 :sunny: