Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比

简介: Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比

章节内容

上一节完成了如下的内容:


编写Agent Conf配置文件

收集Hive数据

汇聚到HDFS中

测试效果

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。

之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。


2C4G 编号 h121

2C4G 编号 h122

2C2G 编号 h123

文档推荐

除了官方文档以外,这里有一个写的很好的中文文档:

https://flume.liyifeng.org/

监控目录

业务需求

想要监控指定目录 收集信息并上传到HDFS中

Source

选择 spooldir,因为 spooldir 能够保证数据不丢失,且能够进行断点续传,但是延迟较高,不能实时监控。


Channel

选择 memory


Sink

选择 HDFS


需要注意

拷贝到 spool 目录下的文件 不可以再打开编辑

无法监控子目录的文件夹变动

被监控文件夹每500毫秒 扫描一次文件变动

适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步

配置文件

cd /opt/wzk/flume_test
vim flume_spooldir-hdfs.conf

我们需要写入如下内容

# Name the components on this agent
a3.sources = r3
a3.channels = c3
a3.sinks = k3
# Describe/configure the source
a3.sources.r3.type = spooldir
# 注意这里的文件夹 换成自己的!!!
a3.sources.r3.spoolDir = /opt/wzk/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true

# 忽略以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 10000
a3.channels.c3.transactionCapacity = 500
# Describe the sink
a3.sinks.k3.type = hdfs
# 注意修改成你自己的IP!!!
a3.sinks.k3.hdfs.path = hdfs://h121.wzk.icu:9000/flume/upload/%Y%m%d/%H%M

# 上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
# 是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# 积攒500个Event,flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 500
# 设置文件类型
a3.sinks.k3.hdfs.fileType = DataStream
# 60秒滚动一次
a3.sinks.k3.hdfs.rollInterval = 60
# 128M滚动一次
a3.sinks.k3.hdfs.rollSize = 134217700
# 文件滚动与event数量无关
a3.sinks.k3.hdfs.rollCount = 0
# 最小冗余数
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

启动Agent

$FLUME_HOME/bin/flume-ng agent --name a3 \
--conf-file flume-spooldir-hdfs.conf \
-Dflume.root.logger=INFO,console

测试效果

Flume

cd /opt/wzk/upload
vim 1.txt

随便向其中写入一些内容,并保存,可以看到Flume已经有反应了。

HDFS

查看HDFS,也已经有内容了

采集双写

这里业务上需要:

  • Flume将数据写入本地
  • Flume将数据写入HDFS

分析实现

  • 需要多个Agent级联实现
  • Source选择taildir
  • Channel选择memory
  • 最终的Sink分别选择HDFS,file_roll

配置文件1

配置文件包含如下内容:

  • 1个 taildir source
  • 2个 memory channel
  • 2个 avro sink

新建文件

vim flume-taildir-avro.conf

写入如下内容

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有channel
a1.sources.r1.selector.type = replicating
# source
a1.sources.r1.type = taildir
# 记录每个文件最新消费位置
a1.sources.r1.positionFile = /root/flume/taildir_position.json
a1.sources.r1.filegroups = f1
# 备注:.*log 是正则表达式;这里写成 *.log 是错误的
a1.sources.r1.filegroups.f1 = /tmp/root/.*log
# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux123
a1.sinks.k1.port = 9091
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux123
a1.sinks.k2.port = 9092
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 500
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 500
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

配置文件2

配置文件包含如下内容:

  • 1个 avro source
  • 1个 memory channel
  • 1个 hdfs sink

新建配置文件

vim flume-avro-hdfs.conf

写入如下的内容:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = linux123
a2.sources.r1.port = 9091
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 500
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://linux121:8020/flume2/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
# 是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# 500个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 500
# 设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
# 60秒生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 0
a2.sinks.k1.hdfs.rollCount = 0
a2.sinks.k1.hdfs.minBlockReplicas = 1
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

配置文件3

配置文件包含如下内容:

  • 1个 avro source
  • 1个 memory channel
  • 1个 file_roll sink

新建配置文件

vim flume-avro-file.conf

写入如下的内容

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = linux123
a3.sources.r1.port = 9092
# Describe the sink
a3.sinks.k1.type = file_roll
# 目录需要提前创建好
a3.sinks.k1.sink.directory = /root/flume/output
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 10000
a3.channels.c2.transactionCapacity = 500
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

启动Agent1

$FLUME_HOME/bin/flume-ng agent --name a3 \
--conf-file ~/conf/flume-avro-file.conf \
-Dflume.root.logger=INFO,console &

启动Agent2

$FLUME_HOME/bin/flume-ng agent --name a2 \
--conf-file ~/conf/flume-avro-hdfs.conf \
-Dflume.root.logger=INFO,console &

启动Agent3

$FLUME_HOME/bin/flume-ng agent --name a1 \
--conf-file ~/conf/flume-taildir-avro.conf \
-Dflume.root.logger=INFO,console &


Hive测试

hive -e "show databases;"


目录
相关文章
|
2月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
177 6
|
29天前
|
Prometheus 监控 Java
深入探索:自制Agent监控API接口耗时实践
在微服务架构中,监控API接口的调用耗时对于性能优化至关重要。通过监控接口耗时,我们可以识别性能瓶颈,优化服务响应速度。本文将分享如何自己动手实现一个Agent来统计API接口的调用耗时,提供一种实用的技术解决方案。
37 3
|
29天前
|
监控 数据可视化 Java
深入探索:自制Agent监控API接口耗时
在微服务架构中,监控API接口的调用耗时对于性能优化至关重要。通过监控这些指标,我们可以识别瓶颈,优化系统性能。本文将分享如何自己动手实现一个Agent来统计API接口的调用耗时,提供一种有效的监控解决方案。
38 2
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
94 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
42 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
54 0
|
26天前
|
存储 人工智能 自然语言处理
AI经营|多Agent择优生成商品标题
商品标题中关键词的好坏是商品能否被主搜检索到的关键因素,使用大模型自动优化标题成为【AI经营】中的核心能力之一,本文讲述大模型如何帮助商家优化商品素材,提升商品竞争力。
AI经营|多Agent择优生成商品标题
|
28天前
|
人工智能 算法 搜索推荐
清华校友用AI破解162个高数定理,智能体LeanAgent攻克困扰陶哲轩难题!
清华校友开发的LeanAgent智能体在数学推理领域取得重大突破,成功证明了162个未被人类证明的高等数学定理,涵盖抽象代数、代数拓扑等领域。LeanAgent采用“持续学习”框架,通过课程学习、动态数据库和渐进式训练,显著提升了数学定理证明的能力,为数学研究和教育提供了新的思路和方法。
51 3
|
29天前
|
人工智能 自然语言处理 算法
企业内训|AI/大模型/智能体的测评/评估技术-某电信运营商互联网研发中心
本课程是TsingtaoAI专为某电信运营商的互联网研发中心的AI算法工程师设计,已于近日在广州对客户团队完成交付。课程聚焦AI算法工程师在AI、大模型和智能体的测评/评估技术中的关键能力建设,深入探讨如何基于当前先进的AI、大模型与智能体技术,构建符合实际场景需求的科学测评体系。课程内容涵盖大模型及智能体的基础理论、测评集构建、评分标准、自动化与人工测评方法,以及特定垂直场景下的测评实战等方面。
91 4
|
2月前
|
Python 机器学习/深度学习 人工智能
手把手教你从零开始构建并训练你的第一个强化学习智能体:深入浅出Agent项目实战,带你体验编程与AI结合的乐趣
【10月更文挑战第1天】本文通过构建一个简单的强化学习环境,演示了如何创建和训练智能体以完成特定任务。我们使用Python、OpenAI Gym和PyTorch搭建了一个基础的智能体,使其学会在CartPole-v1环境中保持杆子不倒。文中详细介绍了环境设置、神经网络构建及训练过程。此实战案例有助于理解智能体的工作原理及基本训练方法,为更复杂应用奠定基础。首先需安装必要库: ```bash pip install gym torch ``` 接着定义环境并与之交互,实现智能体的训练。通过多个回合的试错学习,智能体逐步优化其策略。这一过程虽从基础做起,但为后续研究提供了良好起点。
166 4
手把手教你从零开始构建并训练你的第一个强化学习智能体:深入浅出Agent项目实战,带你体验编程与AI结合的乐趣