log4j2+flume+hadoop

简介: 数据采集框架

大数据数据采集架构

我们日常的应用会打印很多日志,很可能我们需要从这些日志中提取某些有用信息,要实现这个功能可以通过如下架构实现。我的选型是log4j2+flume+hadoop。整个架构如图所示:

DBC_1_

问题一:为什么是log4j2?
1.传统的log4j对性能的消耗很大。Apache宣称,对于并发发操作log4j2的性能是log4j的18倍
2.log4j2为flume专门提供了一个flume appender 利于flume做数据采集
3.log4j提供jsonLaout,可以生成json形式的日志,这种类型的数据对于第二阶段的数据解析提供了便利。
问题二:为什么是flume
1.flume是JAVA语言开发的,我个人是专门做JAVA,如果要做自定义会很方便,而flume提供了灵活的自定义功能。
2.flume在采集数据的时候便可做一些数据清洗的东西,将不想要的东西过滤掉。
3.flume本身比较轻巧,日数据在100W以内都能稳定使用。如果超过100W可以考虑跟kafka集成。
问题三:为什么是hadoop?
公司的要求是将用户的数据收集,存储,然后进行分析,根据分析结果改善用户体验,等等。hadoop的优势是对硬件的要求不高,并且有很强的容错性,能对数据进行离线分析。这些特点恰好满足公司需求。
IP分配
IP flume hadoop
m1 192.168.1.111 agent1 NameNode
s2 192.168.1.112 collector1 DataNode1
s3 192.168.1.113 collector2 DataNode2

一.log4j2

1. 新建一个marven项目,目录结构如图所示

image

2. 配置pom文件
    <properties>
        <log4j.version>2.8.2</log4j.version>
        <slf4j.version>2.8.2</slf4j.version>
        <flume-ng.versiopn>2.8.2</flume-ng.versiopn>
        <log4j-flume-ng.version>1.7.0</log4j-flume-ng.version>
        <jackson.version>2.7.0</jackson.version>
    </properties>

    <dependencies>
        <!-- log4j -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!-- slf4j -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <!-- flume -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-flume-ng</artifactId>
            <version>${flume-ng.versiopn}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flume.flume-ng-clients</groupId>
            <artifactId>flume-ng-log4jappender</artifactId>
            <version>${log4j-flume-ng.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.7.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
    </dependencies>
3.配置log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <!--自定义flume日志级别-->
    <CustomLevels>
        <CustomLevel name="FLUME" intLevel="88" />
    </CustomLevels>
    <!--定义输出日志的地方-->
    <Appenders>
        <!--控制台输出-->
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d %-7level %logger{36} - %msg%n"/>
        </Console>
        <!--log文件输出-->
        <File name="MyFile" fileName="logs/app.log">
            <PatternLayout pattern="%d %-7level %logger{36} - %msg%n"/>
        </File>
        <!--输出到flume-->
        <Flume name="eventLogger" compress="false">
            <Agent host="192.168.1.111" port="41414"/>
            <!--输出方式为json-->
            <JSONLayout/>
        </Flume>
    </Appenders>
    <!--配置不同的日志级别输出到不同地点-->
    <Loggers>
        <!--root代表默认日志级别-->
        <Root level="error">
            <!--设定flume级别及以上的日志通过flume-appender输出-->
            <AppenderRef ref="eventLogger" level="FLUME" />
            <!--设定console级别及以上的日志通过控制台输出-->
            <AppenderRef ref="Console" level="info" />
            <!--设定error及以上的日志通过log文件输出-->
            <AppenderRef ref="MyFile" level="error" />
        </Root>
    </Loggers>
</Configuration>

3.LaoutTest.java

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


import java.util.Date;

/**
 * Created by hadoop on 2017/7/28.
 */
public class LaoutTest {
    static Logger logger = LogManager.getLogger(LaoutTest.class);
    public static void main(String[] args) throws InterruptedException {
        while (true) {
            // 每隔两秒log输出一下当前系统时间戳
            Thread.sleep(100);
            logger.info(String.valueOf(new Date().getTime()));
            logger.log(Level.getLevel("FLUME"), "another diagnostic message");
            try {
                throw new Exception("exception msg");
            }
            catch (Exception e) {
                logger.error("error:" + e.getMessage());
            }
        }
    }
}

二.flume

flume我使用的是1.7版本,下载地址https://flume.apache.org/download.html
安装在/usr下 文件名更名为flume。三台机器都这样操作。如下配置文件都是在/user/flume/conf下创建生成
1. agent1: 配置文件名avro-mem-hdfs-collector.properties
#nents on this agent
agent1.sources = r1
agent1.sinks = k1 k2 k3
agent1.channels = c1 c2 c3

#设定来源 通道 存储之间的关系
agent1.sources.r1.channels = c1 c2 c3
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c2
agent1.sinks.k3.channel = c3
agent1.sources.r1.selector = replicating

#source
agent1.sources.r1.type = avro
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 41414
agent1.sources.r1.fileHeader = false
agent1.sources.r1.interceptors =i1
agent1.sources.r1.interceptors.i1.type = timestamp

#channel c1
agent1.channels.c1.type = memory
agent1.channels.c1.keep-alive = 30 
agent1.channels.c1.capacity = 10000
agent1.channels.c1.transactionCapacity = 1000

#sink k1
agent1.sinks.k1.type = hdfs
agent1.sinks.k1.channel = c1
agent1.sinks.k1.hdfs.path = hdfs://192.168.1.111:9000/all/%Y-%m-%d/%H
agent1.sinks.k1.hdfs.filePrefix = logs
agent1.sinks.k1.hdfs.inUsePrefix = .
agent1.sinks.k1.hdfs.fileType = DataStream
agent1.sinks.k1.hdfs.rollInterval = 0
agent1.sinks.k1.hdfs.rollSize = 16777216
agent1.sinks.k1.hdfs.rollCount = 0
agent1.sinks.k1.hdfs.batchSize = 1000
agent1.sinks.k1.hdfs.writeFormat = text
agent1.sinks.k1.hdfs.fileType = DataStream
agent1.sinks.k1.callTimeout =10000

#channel c2
agent1.channels.c2.type=memory
agent1.channels.c2.keep-alive = 30
agent1.channels.c2.capacity = 10000
agent1.channels.c2.transactionCapacity = 1000

#sink for k2
agent1.sinks.k2.type = avro
agent1.sinks.k2.channel = c2
agent1.sinks.k2.hostname = 192.168.1.112
agent1.sinks.k2.port = 41414

#channel c3
agent1.channels.c3.type=memory
agent1.channels.c3.keep-alive = 30
agent1.channels.c3.capacity = 10000
agent1.channels.c3.transactionCapacity = 1000

#sink for k3
agent1.sinks.k3.type = avro
agent1.sinks.k3.channel = c2
agent1.sinks.k3.hostname = 192.168.1.113
agent1.sinks.k3.port = 41414
2. collector2 配置文件名avro-mem-hdfs.properties
#nents on this agent
collector2.sources = r1
collector2.sinks = k1
collector2.channels = c1

#source
collector2.sources.r1.channels = c1
collector2.sources.r1.type = avro
collector2.sources.r1.bind = 0.0.0.0
collector2.sources.r1.port = 41414
collector2.sources.r1.fileHeader = false
collector2.sources.r1.interceptors =i1
collector2.sources.r1.interceptors.i1.type = timestamp

# channel 
collector2.channels.c1.type = memory
collector2.channels.c1.keep-alive = 30 
collector2.channels.c1.capacity = 30000
collector2.channels.c1.transactionCapacity = 3000

# sink
collector2.sinks.k1.channel = c1
collector2.sinks.k1.type = hdfs
collector2.sinks.k1.hdfs.path = hdfs://192.168.1.111:9000/business1/%Y-%m-%d/%H
collector2.sinks.k1.hdfs.filePrefix = logs
collector2.sinks.k1.hdfs.inUsePrefix = .
collector2.sinks.k1.hdfs.fileType = DataStream
collector2.sinks.k1.hdfs.rollInterval = 0
collector2.sinks.k1.hdfs.rollSize = 16777216
collector2.sinks.k1.hdfs.rollCount = 0
collector2.sinks.k1.hdfs.batchSize = 1000
collector2.sinks.k1.hdfs.writeFormat = text
collector2.sinks.k1.hdfs.fileType = DataStream
collector2.sinks.k1.callTimeout =10000
3. collector3 配置文件avro-mem-hdfs.properties
#nents on this agent
#nents on this agent
collector3.sources = r1
collector3.sinks = k1
collector3.channels = c1

#source
collector3.sources.r1.channels = c1
collector3.sources.r1.type = avro
collector3.sources.r1.bind = 0.0.0.0
collector3.sources.r1.port = 41414
collector3.sources.r1.fileHeader = false
collector3.sources.r1.interceptors =i1
collector3.sources.r1.interceptors.i1.type = timestamp

# channel 
collector3.channels.c1.type = memory
collector3.channels.c1.keep-alive = 30 
collector3.channels.c1.capacity = 30000
collector3.channels.c1.transactionCapacity = 3000

# sink
collector3.sinks.k1.channel = c1
collector3.sinks.k1.type = hdfs
collector3.sinks.k1.hdfs.path = hdfs://192.168.1.111:9000/business2/%Y-%m-%d/%H
collector3.sinks.k1.hdfs.filePrefix = logs
collector3.sinks.k1.hdfs.inUsePrefix = .
collector3.sinks.k1.hdfs.fileType = DataStream
collector3.sinks.k1.hdfs.rollInterval = 0
collector3.sinks.k1.hdfs.rollSize = 16777216
collector3.sinks.k1.hdfs.rollCount = 0
collector3.sinks.k1.hdfs.batchSize = 1000
collector3.sinks.k1.hdfs.writeFormat = text
collector3.sinks.k1.hdfs.fileType = DataStream
collector3.sinks.k1.callTimeout =10000

4. 进入/user/flume目录启动agent与collectoer
1.启动agent1: 
bin/flume-ng agent -c ./conf/ -f conf/avro-mem-hdfs-collector.properties -Dflume.root.logger=INFO,console -n agent1
2.启动collector1: 
bin/flume-ng agent -c ./conf/ -f conf/avro-mem-hdfs.properties -Dflume.root.logger=INFO,console -n collector1
3.启动collector2:
bin/flume-ng agent -c ./conf/ -f conf/avro-mem-hdfs.properties -Dflume.root.logger=INFO,console -n collector3
相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
目录
相关文章
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析
|
存储 运维 监控
【Flume】flume 日志管理中的应用
【4月更文挑战第4天】【Flume】flume 日志管理中的应用
|
消息中间件 分布式计算 大数据
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
977 0
|
SQL 消息中间件 分布式计算
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】
1597 0
如何查看spark与hadoop、kafka、Scala、flume、hive等兼容版本【适用于任何版本】
|
消息中间件 数据采集 SQL
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
|
2月前
|
数据采集 缓存 大数据
【赵渝强老师】大数据日志采集引擎Flume
Apache Flume 是一个分布式、可靠的数据采集系统,支持从多种数据源收集日志信息,并传输至指定目的地。其核心架构由Source、Channel、Sink三组件构成,通过Event封装数据,保障高效与可靠传输。
235 1
|
消息中间件 分布式计算 Hadoop
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
178 3
|
存储 数据采集 数据处理
【Flume拓扑揭秘】掌握Flume的四大常用结构,构建强大的日志收集系统!
【8月更文挑战第24天】Apache Flume是一个强大的工具,专为大规模日志数据的收集、聚合及传输设计。其核心架构包括源(Source)、通道(Channel)与接收器(Sink)。Flume支持多样化的拓扑结构以适应不同需求,包括单层、扇入(Fan-in)、扇出(Fan-out)及复杂多层拓扑。单层拓扑简单直观,适用于单一数据流场景;扇入结构集中处理多源头数据;扇出结构则实现数据多目的地分发;复杂多层拓扑提供高度灵活性,适合多层次数据处理。通过灵活配置,Flume能够高效构建各种规模的数据收集系统。
344 0
|
12月前
|
存储 分布式计算 大数据
Flume+Hadoop:打造你的大数据处理流水线
本文介绍了如何使用Apache Flume采集日志数据并上传至Hadoop分布式文件系统(HDFS)。Flume是一个高可用、可靠的分布式系统,适用于大规模日志数据的采集和传输。文章详细描述了Flume的安装、配置及启动过程,并通过具体示例展示了如何将本地日志数据实时传输到HDFS中。同时,还提供了验证步骤,确保数据成功上传。最后,补充说明了使用文件模式作为channel以避免数据丢失的方法。
632 4
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
283 2