Flume 使用学习小结

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
日志服务 SLS,月写入数据量 50GB 1个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: #概述 在做埋点数据离线存储到odps中,用到了Flume。一边使用,一边学习了下Flume。其中感受到Flume确实易伸缩、易扩展。其中的组件都可以根据自己的业务特点方便的自定义使用。 Flume可进行大量日志数据采集、聚合和并转移到存储中,并提供数据在流转中的事务机制; 可适用场景:日志--->Flume--->实时计算(如MQ+Storm) 、日志--->Flume--->

概述

  在做埋点数据离线存储到odps中,用到了Flume。一边使用,一边学习了下Flume。其中感受到Flume确实易伸缩、易扩展。其中的组件都可以根据自己的业务特点方便的自定义使用。

Flume可进行大量日志数据采集、聚合和并转移到存储中,并提供数据在流转中的事务机制;
可适用场景:日志--->Flume--->实时计算(如MQ+Storm) 、日志--->Flume--->离线计算(如ODPS、HDFS、HBase)、日志--->Flume--->ElasticSearch等。

Flume架构

  Flume主要分为 Source、Channel、Sink三个组件,他们包含在一个Agent中,一个Agent相当于一个独立的application。数据从源头经过Agent的这几个组件最后到达目的地。一个Flume 服务可同时运行多个Agent,大致架构可参照下图:

_

对照这个图,作一些说明;

  • Event 一条日志数据在Flume中对应一个Event对象。不过给他添加了header属性,就是一个Map,放 一些额外信息,可以针对每条Event做特殊处理,比如Channel的选择。这些额外的键值对可以在Event从Source到Channel之间的interceptor(拦截器)中set。
  • Source 负责日志流入,比如从文件、网络、MQ等数据源流入数据。
  • Channel 负责数据聚合/暂存,以供Sink消费掉,事务机制主要在这里实现
  • Sink 负责数据转移到存储,比如从Channel拿到日志后直接存储到ODPS、ElasticSearch等。
  • 拦截器 如果配置了拦截器,则Event从Source 进入Channel前,经过拦截器链做过滤或其他处理;如识别不需要的数据等
  • 选择器 Flume默认实现有ReplicatingChannelSelector(复制,Event可同时发往多个Channel)和MultiplexingChannelSelector(复用,可根据header中某个字段值,发往不同的Channel)

下图是个简单的多Channel、Sink情况;Flume还包含一些其他的高级的特性和使用方法,有时间可以继续研究。
_2

Flume实际使用

 现在做的埋点数据导入ODPS的情况是,每天夜里1点左右把前一天的日志文件copy到Flume监控的目录,Flume处理新加入的文件。最终数据存储到ODPS。

遇到的问题:

  • 日志数据中包含空行等不正确格式的记录,导致从Channel中take日志记录后保存到ODPS失败;失败的操作被事务回滚,结果是数据流传在这个地方错误循环下去。
  • 日志数据按实际生成的日期为分区保存在OPDS表的分区中,导入日志数据的日期为实际日期的后一天。

在尝试了几种方法后,最后选择自定义了一个拦截器实现(UaLogFilteringInterceptor),能很好的达到目前的需求,他主要做如下两件事:

  • 过滤掉不需要、不规范的数据,并且把过滤掉的这些数据存储到指定的文件里,每天一个文件(如果有异常记录)。
  • 在每条Event的header中加入qt值(qt值为前一天的日期,格式为yyyyMMdd),每条Event根据该值保持到ODPS的对应表分区中。

这里顺带说下ODPS的Flume插件,他主要根据ODPS的特点自定义实现了一个Sink,在Flume的配置文件中配置使用该Sink,配置好该Sink的各个配置项,主要包含连接ODPS和使用对应表的。

说说Flume的事务

  主要用到事务实现有针对MemoryChannel的MemoryTransaction和FileChannel的FileBackedTransaction;

Event从Source PUT到Channel和从Channel Take到Sink后落地,这两个步骤都包裹在事务中;我这里说下MemoryTransaction大致实现。
MemoryTransaction 主要用到了两个双向阻塞队列(LinkedBlockingDeque)putList和takeList作为缓冲区,同时配合使用MemoryChannel中的LinkedBlockingDeque queue;队列的大小通过Flume的配置初始化好;

  • PUT事务

    1. 批量数据循环PUT到putList中
    2. Commit,把putList队列中数据offer到queue队列中,然后释放信号量,清空(clear)putList队列
    3. Rollback,清空(clear)putList队列
      这里其实没有做太多事。
  • Take事务

    1. 检查takeList队列大小是否够用,从queue队列中poll Event到takeList队列中
    2. Commit,表明被Sink正确消费掉,清空(clear)takeList队列
    3. Rollback,异常出现,则把takeList队列中的Event返还到queue队列顶部
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
SQL 分布式计算 监控
Flume学习--1、Flume概述、Flume入门、(一)
Flume学习--1、Flume概述、Flume入门、(一)
|
SQL 存储 分布式计算
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合
|
存储 Java 分布式数据库
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
|
监控 负载均衡
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(二)
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(二)
|
SQL 存储 分布式计算
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(一)
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(一)
|
JSON 监控 Unix
Flume学习--1、Flume概述、Flume入门、(二)
Flume学习--1、Flume概述、Flume入门、(二)
|
存储 消息中间件 数据采集
大数据开发笔记(六):Flume基础学习
Flume是数据采集,日志收集的框架,通过分布式形式进行采集,(高可用分布式)
323 0
大数据开发笔记(六):Flume基础学习
|
消息中间件 SQL 数据采集
|
分布式计算 监控 Java
02. Spark Streaming实时流处理学习——分布式日志收集框架Flume
2. 分布式日志收集框架Flume 2.1 业务现状分析 如上图,大量的系统和各种服务的日志数据持续生成。用户有了很好的商业创意想要充分利用这些系统日志信息。比如用户行为分析,轨迹跟踪等等。如何将日志上传到Hadoop集群上?对比方案存在什么问题,以及有什么优势? 方案1: 容错,负载均衡,高延时等问题如何消除? 方案2: Flume框架 2.
2453 0