大数据实战项目之电商数仓(二)

简介: 大数据实战项目之电商数仓(二)

大数据实战项目之电商数仓(一):https://developer.aliyun.com/article/1535211

flume数据采集通道搭建

flume第一层采集通道

设置flume的配置文件f1.conf

#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#组名名.属性名=属性值
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
#读取/tmp/logs/app-yyyy-mm-dd.log ^代表以xxx开头$代表以什么结尾 .代表匹配任意字符
#+代表匹配任意位置
a1.sources.r1.filegroups.f1=/tmp/logs/^app.+.log$
#JSON文件的保存位置
a1.sources.r1.positionFile=/opt/module/flume/test/log_position.json

#定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor$Builder


#定义sink
a1.sinks.k1.type=logger

#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

flume第一层通道的启动和关闭脚本f1

#!/bin/bash
if(($#!=1))
then
        echo 请输入start或stop!
        exit;
fi

cmd=cmd
if [ $1 = start ]
then
        cmd="nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f1.conf -Dflume.root.logger=DEBUG,console > /home/atguigu/f1.log 2>&1 &"
elif [ $1 = stop ]
then
        cmd="ps -ef | grep f1.conf | grep -v grep | awk -F ' ' '{print \$2}' | xargs kill -9"
else
        echo 请输入start或stop!
fi


#在hadoop102和hadoop103开启采集
for i in hadoop102 hadoop103
do
        ssh $i $cmd
done
flume第二层采集通道

设置flume的配置文件f1.conf

#配置文件编写
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2

#配置source
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start
a1.sources.r1.kafka.consumer.auto.offset.reset=earliest
a1.sources.r1.kafka.consumer.group.id=CG_Start

a1.sources.r2.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event
a1.sources.r2.kafka.consumer.auto.offset.reset=earliest
a1.sources.r2.kafka.consumer.group.id=CG_Event
#配置channel
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/module/flume/c1/checkpoint
#启动备用checkpoint
a1.channels.c1.useDualCheckpoints=true
a1.channels.c1.backupCheckpointDir=/opt/module/flume/c1/backupcheckpoint
#event存储的目录
a1.channels.c1.dataDirs=/opt/module/flume/c1/datas


a1.channels.c2.type=file
a1.channels.c2.checkpointDir=/opt/module/flume/c2/checkpoint
a1.channels.c2.useDualCheckpoints=true
a1.channels.c2.backupCheckpointDir=/opt/module/flume/c2/backupcheckpoint
a1.channels.c2.dataDirs=/opt/module/flume/c2/datas


#sink
a1.sinks.k1.type = hdfs
#一旦路径中含有基于时间的转义序列,要求event的header中必须有timestamp=时间戳,如果没有需要将useLocalTimeStamp = true
a1.sinks.k1.hdfs.path = hdfs://hadoop102:9000/origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-

a1.sinks.k1.hdfs.batchSize = 1000

#文件的滚动
#60秒滚动生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
#设置每个文件到128M时滚动
a1.sinks.k1.hdfs.rollSize = 134217700
#禁用基于event数量的文件滚动策略
a1.sinks.k1.hdfs.rollCount = 0
#指定文件使用LZO压缩格式
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k1.hdfs.codeC = lzop
#a1.sinks.k1.hdfs.round = true
#a1.sinks.k1.hdfs.roundValue = 10
#a1.sinks.k1.hdfs.roundUnit = second



a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://hadoop102:9000/origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.batchSize = 1000
a1.sinks.k2.hdfs.rollInterval = 30
a1.sinks.k2.hdfs.rollSize = 134217700
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.codeC = lzop
#a1.sinks.k2.hdfs.round = true
#a1.sinks.k2.hdfs.roundValue = 10
#a1.sinks.k2.hdfs.roundUnit = second

#连接组件
a1.sources.r1.channels=c1
a1.sources.r2.channels=c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

flume写入hdfs采用lzo格式需要先向core-site.xml添加相关压缩格式的配置

<property>
    <name>io.compression.codecs</name>
    <value>
         com.hadoop.compression.lzo.LzoCodec,
         com.hadoop.compression.lzo.LzopCodec
     </value>
  </property>
  <property>
    <name>io.compression.codec.lzo.class</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
 </property>

flume第二层通道的启动和关闭脚本f2

#!/bin/bash
if(($#!=1))
then
        echo 请输入start或stop!
        exit;
fi

if [ $1 = start ]
then
        ssh hadoop104 "nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f2.conf -Dflume.root.logger=INFO,console > /home/atguigu/f2.log 2>&1 &"
elif [ $1 = stop ]
then
        ssh hadoop104 "ps -ef | grep f2.conf | grep -v grep | awk -F ' ' '{print \$2}' | xargs kill -9"
else
        echo 请输入start或stop!
fi

数据采集集群一键启动脚本

#!/bin/bash
if(($#!=1))
then
  echo 请输入start或stop!
  exits;
fi
#编写函数,这个函数的功能为返回集群中启动成功的broker的数量
function countKafkaBrokers()
{
  count=0
  for((i=102;i<=104;i++))
  do
    result=$(ssh hadoop$i "jps | grep Kafka | wc -l")
    count=$[$result+$count]
  done
  #函数可以定义返回值,如果定义,返回函数最后一行命令的状态(返回0,代表成功,非0,即为异常)
  return $count
}
#启动
if [ $1 = start ]
then
  zk start
  hd start
  kf start
  #保证kafka集群已经启动,才能启动f1,f2采集通道
  while [ 1 ]
  do
    countKafkaBrokers
    if(($?==3))
    then
      break
    fi
    sleep 2s
  done
  f1 start
  f2 start
  #查看已经启动进程
  xcall jps
elif [ $1 = stop ]
then
  f1 stop
  f2 stop
  kf stop
  #在kafka没有停止完成之前,不能停止zk集群
  while [ 1 ]
        do
                countKafkaBrokers
                if(($?==0))
                then
                        break
                fi
                sleep 2s
        done
  zk stop
  hd stop
  #查看还剩那些进程
  xcall jps
else
  echo echo 请输入start或stop!
fi

HDFS-HA配置

配置nameservice,编写hdfs-sitx.xml

vim hdfs-site.xml
• 1
<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
</property>
<property>
  <name>dfs.ha.namenodes.mycluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.mycluster.nn1</name>
  <value>hadoop101:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.mycluster.nn1</name>
  <value>hadoop101:50070</value>
</property>
<property>
  <name>dfs.namenode.http-address.mycluster.nn2</name>
  <value>hadoop102:50070</value>
</property>

<property>
  <name>dfs.namenode.rpc-address.mycluster.nn2</name>
  <value>hadoop102:8020</value>
</property>
<property>
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://hadoop101:8485;hadoop102:8485;hadoop103:8485/mycluster</value>
</property>
<property>
  <name>dfs.journalnode.edits.dir</name>
  <value>/opt/module/hadoop-2.7.2/data/data</value>
</property>
 <!-- 配置隔离机制,即同一时刻只能有一台服务器对外响应 -->
        <property>
                <name>dfs.ha.fencing.methods</name>
                <value>sshfence</value>
        </property>

<!-- 使用隔离机制时需要ssh无秘钥登录-->
        <property>
         <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/home/atguigu/.ssh/id_rsa</value>
        </property>
<property>
  <name>dfs.client.failover.proxy.provider.mycluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
      <name>dfs.namenode.secondary.http-address</name>
      <value>hadoop103:50090</value>
</property>
<!--配置故障自动转义-->
 <property>
   <name>dfs.ha.automatic-failover.enabled</name>
   <value>true</value>

(2)编写core-site.xml

<!-- 指定HDFS中NameNode的地址 -->
<property>
        <name>fs.defaultFS</name>
        <value>hdfs://mycluster</value>
</property>
 <!--配置hadoop运行时临时文件-->
  <property>
  <name>hadoop.tmp.dir</name>
  <value>/opt/module/hadoop-2.7.2/data/tmp</value>
 </property>
<!-- 指定Hadoop运行时产生文件的存储目录 -->
<!--<property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-2.7.2/data/tmp</value>
</property>-->
<!--配置zookeeper地址-->
  <property>
   <name>ha.zookeeper.quorum</name>  <value>hadoop101:2181,hadoop102:2181,hadoop103:2181</value>
 </property>

启动journalnode

xcall hadoop-daemon.sh start journalnode
• 1

在nn1上对namenode进行格式化

hadoop namenode -format
hdfs namenode -bootstrapStandby

在nn2上对namenode信息进行拷贝

stop-all.sh
hdfs zkfc -formatZK
start-dfs.sh

ResouceManager-HA配置

(1)编写yarn-site.xml

<property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
</property>
<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>cluster1</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>hadoop101</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>hadoop102</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm1</name>
  <value>hadoop101:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm2</name>
  <value>hadoop102:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.zk-address</name>
  <value>hadoop101:2181,hadoop102:2181,hadoop103:2181</value>
</property>
<!--启用自动恢复-->
    <property>
        <name>yarn.resourcemanager.recovery.enabled</name>
        <value>true</value>
</property>
<!--指定resourcemanager的状态信息存储在zookeeper集群-->
 <property>
     <name>yarn.resourcemanager.store.class</name>
   <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<!-- 日志聚集功能使能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

<!-- 日志保留时间设置7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
相关实践学习
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
8月前
|
存储 分布式计算 大数据
基于Python大数据的的电商用户行为分析系统
本系统基于Django、Scrapy与Hadoop技术,构建电商用户行为分析平台。通过爬取与处理海量用户数据,实现行为追踪、偏好分析与个性化推荐,助力企业提升营销精准度与用户体验,推动电商智能化发展。
|
8月前
|
机器学习/深度学习 人工智能 供应链
别再靠拍脑袋进货了!用大数据让电商库存“自己会算”
别再靠拍脑袋进货了!用大数据让电商库存“自己会算”
516 10
|
9月前
|
SQL 缓存 分布式计算
【跨国数仓迁移最佳实践5】MaxCompute近线查询解决方案助力物流电商等实时场景实现高效查询
本系列文章将围绕东南亚头部科技集团的真实迁移历程展开,逐步拆解 BigQuery 迁移至 MaxCompute 过程中的关键挑战与技术创新。本篇为第5篇,解析跨国数仓迁移背后的性能优化技术。 注:客户背景为东南亚头部科技集团,文中用 GoTerra 表示。
362 8
|
11月前
|
数据采集 分布式计算 DataWorks
ODPS在某公共数据项目上的实践
本项目基于公共数据定义及ODPS与DataWorks技术,构建一体化智能化数据平台,涵盖数据目录、归集、治理、共享与开放六大目标。通过十大子系统实现全流程管理,强化数据安全与流通,提升业务效率与决策能力,助力数字化改革。
371 4
|
11月前
|
SQL 分布式计算 大数据
别再迷信“上大数据就能飞”了!大数据项目成败的5个真相
别再迷信“上大数据就能飞”了!大数据项目成败的5个真相
243 6
|
11月前
|
存储 SQL 监控
数据中台架构解析:湖仓一体的实战设计
在数据量激增的数字化时代,企业面临数据分散、使用效率低等问题。数据中台作为统一管理与应用数据的核心平台,结合湖仓一体架构,打通数据壁垒,实现高效流转与分析。本文详解湖仓一体的设计与落地实践,助力企业构建统一、灵活的数据底座,驱动业务决策与创新。
|
存储 SQL 分布式计算
别让你的数据“裸奔”!大数据时代的数据隐私保护实战指南
别让你的数据“裸奔”!大数据时代的数据隐私保护实战指南
755 19
|
12月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
632 0
|
11月前
|
JSON 分布式计算 大数据
springboot项目集成大数据第三方dolphinscheduler调度器
springboot项目集成大数据第三方dolphinscheduler调度器
709 3