Flume 日志收集系统 Spooldir-Source HDFS-sink

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 日志即log,记录发生的事件。以Nginx为例,有error_log和access_log 2个日志。access_log是访问日志,每条访问记录会产生几百字节的数据,随着访问量增加,日志文件会越来越大,必须定期清理日志。

日志即log,记录发生的事件。以Nginx为例,有error_log和access_log 2个日志。access_log是访问日志,每条访问记录会产生几百字节的数据,随着访问量增加,日志文件会越来越大,必须定期清理日志。
现在数据越来越重要,因此不能简单丢弃,要保存这些数据做更多数据分析。可以将数据保存到HDFS系统上,Flume是一个数据搬运软件,它扩展了很多功能,支持很多数据源。不编写代码利用Flume就可以搭建一个将log保存到HDFS的可靠系统。

一、Flume 组件

  • Source 采集信息源
  • Channel 消息缓存队列
  • Sink 从缓存队列中拉取消息,并处理。

消息 Record,Source封装Event(事件)成为Record对象,并保存到Channel中,Sink拉取Record并保存到目标系统中。

Sink处理完成之后,会向Channel发送确认消息,提供消息处理的可靠性。

因为Flume是一个大数据组件,在刚接触的时候犯了思维惯性错误,以为Source、Channel、Sink是部署在不同主机上的。如图一个Agent包括了三个组件,运行在一台主机上,准确的说一个JVM进程。常见的Source是agent可监听的文件夹、文件,Sink是hdfs。

二、配置文件

LogAgent.sources = mysource
LogAgent.channels = mychannel
LogAgent.sinks = mysink
LogAgent.sources.mysource.type = spooldir
LogAgent.sources.mysource.channels = mychannel
LogAgent.sources.mysource.spoolDir =/Users/wangsen/hadoop/apache-flume-1.7.0-bin/conf_copy
LogAgent.sinks.mysink.channel = mychannel
LogAgent.sinks.mysink.type = hdfs
LogAgent.sinks.mysink.hdfs.path = hdfs://namenode:9000/data/logs2/
LogAgent.sinks.mysink.hdfs.rollInterval = 30
LogAgent.sinks.mysink.hdfs.batchSize = 10000
LogAgent.sinks.mysink.hdfs.rollSize = 0
LogAgent.sinks.mysink.hdfs.rollCount = 10000
LogAgent.sinks.mysink.hdfs.fileType = DataStream
LogAgent.sinks.mysink.hdfs.useLocalTimeStamp = true
LogAgent.channels.mychannel.type = memory
LogAgent.channels.mychannel.capacity = 10000
LogAgent.channels.mychannel.transactionCapacity = 10000

运行flume
bin/flume-ng agent --conf conf --conf-file conf/logagent.properties --name LogAgent -Dflume.root.logger=DEBUG,console

三、注意事项

1. sinks.mysink.hdfs.batchSize 和channels.mychannel.transactionCapacity

process failed
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or in creasing thread count

如果sink.batchSize 大于 transactionCapacity:channel的处理能力被占满,得不到sink的确认消息,因为没有达到sink批处理数。

2.spooldir 监听目录中的文件


spooldir监听文件目录,当出现新文件时,将新文件转化成事件。默认deseriallizer的值是LINE,文件的每行封装成一个Event。因此,在sink端也是按代表一行的Record进行处理。

3.hdfs sink 配置

hdfs.fileType = DataStream ##保存文件时不用压缩
hdfs.rollCount = 10000 ##每个文件记录10000条Record,超过10000条分割文件
hdfs.rollSize = 0 ## 不以文件的大小分割
hdfs.batchSize = 10000 ## 批处理数,没达到时保存在.tmp文件中
hdfs.rollInterval = 30 ##批处理超时时间,将tmp文件写入到正式文件,并提交确认。

四、实验结果

源文件夹:

drwxr-xr-x 2 wangsen staff 64 8 23 09:50 .flumespool
-rw-r--r-- 1 wangsen staff 1661 9 26 2016 flume-conf.properties.template.COMPLETED
-rw-r--r-- 1 wangsen staff 1455 9 26 2016 flume-env.ps1.template.COMPLETED
-rw-r--r-- 1 wangsen staff 1565 9 26 2016 flume-env.sh.template.COMPLETED
-rw-r--r-- 1 wangsen staff 3107 9 26 2016 log4j.properties.COMPLETED
-rw-r--r--@ 1 wangsen staff 778 8 23 09:49 logagent.properties.COMPLETED

处理成功数据,添加后缀.COMPLETED,此后缀可以在.properties文件中设置。
HDFS:

-rw-r--r-- 3 root supergroup 8567 2018-08-23 09:50 /data/logs2/FlumeData.1534989021404

生成一个文件,没有超过10000行就保存在一个文件。文件名称可以在.properties文件中配置。

总结

本文是Flume基本实验,TailDir是一种更强大的目录源Source,支持文件级的监听。通过设置Decoder可以文件作为事件(不以Line为Event),实现文件夹的同步。通过级联方式,实现多个主机之间高可靠文件/文件夹同步。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
4月前
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析
|
4月前
|
存储 运维 监控
【Flume】flume 日志管理中的应用
【4月更文挑战第4天】【Flume】flume 日志管理中的应用
|
4月前
|
消息中间件 监控 网络协议
Flume系统
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输系统,起源于Cloudera。【2月更文挑战第8天】
64 4
|
28天前
|
存储 数据采集 数据处理
【Flume拓扑揭秘】掌握Flume的四大常用结构,构建强大的日志收集系统!
【8月更文挑战第24天】Apache Flume是一个强大的工具,专为大规模日志数据的收集、聚合及传输设计。其核心架构包括源(Source)、通道(Channel)与接收器(Sink)。Flume支持多样化的拓扑结构以适应不同需求,包括单层、扇入(Fan-in)、扇出(Fan-out)及复杂多层拓扑。单层拓扑简单直观,适用于单一数据流场景;扇入结构集中处理多源头数据;扇出结构则实现数据多目的地分发;复杂多层拓扑提供高度灵活性,适合多层次数据处理。通过灵活配置,Flume能够高效构建各种规模的数据收集系统。
28 0
|
17天前
|
存储 分布式计算 资源调度
通过日志聚合将作业日志存储在HDFS中
如何通过配置Hadoop的日志聚合功能,将作业日志存储在HDFS中以实现长期保留,并详细说明了相关配置参数和访问日志的方法。
15 0
通过日志聚合将作业日志存储在HDFS中
|
28天前
|
存储 分布式计算 大数据
【Flume的大数据之旅】探索Flume如何成为大数据分析的得力助手,从日志收集到实时处理一网打尽!
【8月更文挑战第24天】Apache Flume是一款高效可靠的数据收集系统,专为Hadoop环境设计。它能在数据产生端与分析/存储端间搭建桥梁,适用于日志收集、数据集成、实时处理及数据备份等多种场景。通过监控不同来源的日志文件并将数据标准化后传输至Hadoop等平台,Flume支持了性能监控、数据分析等多种需求。此外,它还能与Apache Storm或Flink等实时处理框架集成,实现数据的即时分析。下面展示了一个简单的Flume配置示例,说明如何将日志数据导入HDFS进行存储。总之,Flume凭借其灵活性和强大的集成能力,在大数据处理流程中占据了重要地位。
33 3
|
23天前
|
存储 分布式计算 资源调度
Hadoop生态系统概览:从HDFS到Spark
【8月更文第28天】Hadoop是一个开源软件框架,用于分布式存储和处理大规模数据集。它由多个组件构成,旨在提供高可靠性、高可扩展性和成本效益的数据处理解决方案。本文将介绍Hadoop的核心组件,包括HDFS、MapReduce、YARN,并探讨它们如何与现代大数据处理工具如Spark集成。
48 0
|
2月前
|
分布式计算 Hadoop
|
4月前
|
SQL 分布式计算 监控
Flume实时读取本地/目录文件到HDFS
Flume实时读取本地/目录文件到HDFS
80 7
|
3月前
|
存储 分布式计算 Hadoop
Hadoop生态系统详解:HDFS与MapReduce编程
Apache Hadoop是大数据处理的关键,其核心包括HDFS(分布式文件系统)和MapReduce(并行计算框架)。HDFS为大数据存储提供高容错性和高吞吐量,采用主从结构,通过数据复制保证可靠性。MapReduce将任务分解为Map和Reduce阶段,适合大规模数据集的处理。通过代码示例展示了如何使用MapReduce实现Word Count功能。HDFS和MapReduce的结合,加上YARN的资源管理,构成处理和分析大数据的强大力量。了解和掌握这些基础对于有效管理大数据至关重要。【6月更文挑战第12天】
106 0