离线数仓(四)【数仓数据同步策略】(4)

本文涉及的产品
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 离线数仓(四)【数仓数据同步策略】

离线数仓(四)【数仓数据同步策略】(3)https://developer.aliyun.com/article/1532384

sink 配置:

这里除了设置输出的 hdfs 路径必须包含日期之外,主要就是滚动策略的配置,我们要防止小文件的问题。

编写拦截器:

package com.lyh.gmall.interceptor;
 
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
 
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
 
public class TimestampAndTableNameInterceptor implements Interceptor {
 
    @Override
    public void initialize() {
 
    }
 
    @Override
    public Event intercept(Event event) {
        // 1. 把 body 中的 timestamp 和 table 字段提取出来 放到 header
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
 
        // 2. 解析 log 中的 ts 和 table 字段
        JSONObject json = JSONObject.parseObject(log);
        String ts = json.getString("ts");
        String table = json.getString("table");
 
        // 3. 把 ts 和 table 字段放到 header 中的 tableName 和 timestamp 字段
        headers.put("tableName",table);
        headers.put("timestamp",ts + "000");
 
        return event;
    }
 
    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event: list)
            intercept(event);
        return list;
    }
 
    @Override
    public void close() {
 
    }
 
    public static class Builder implements Interceptor.Builder{
 
        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor();
        }
 
        @Override
        public void configure(Context context) {
 
        }
    }
 
}

打包放到 hadoop104 上 flume 的 lib 目录下,开始测试:

打通通道

myhadoop start
zk start
kf.sh start
mxw.sh start

启动 flume 作业:

[lyh@hadoop104 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/warehouse/kafka_to_hdfs_db.conf -Dflume.root.logger=INFO,console

模拟业务数据生成:

cd /opt/module/db_log/
java -jar gmall2020-mock-db-2021-11-14.jar

查看 hdfs:

可以看到,其中带 inc 后缀的都是我们增量同步进来的数据。

增量同步文件数 =  总文件数 - 全量同步文件数 = 27 - 15 = 12 ,没有问题

这里存在一个问题:我们之前在拦截器中设置了 event header 中的 timestamp 为 kafka 中的数据t  ts 字段的时间信息,但是这里却依然是我们机器的时间,这是因为我们 java -jar 操作数据库的时间就是我们服务器当前的时间,所以导致 Maxwelll 读取 binlog 后的数据就是当前服务器的时间。具体解决办法看下面的 Maxwell 配置。

2. 编写增量数据同步脚本
vim f3.sh
#!/bin/bash
 
case $1 in
"start")
        echo " --------启动 hadoop104 业务数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf -f /opt/module/flume-1.9.0/job/warehouse/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")
 
        echo " --------停止 hadoop104 业务数据flume-------"
        ssh hadoop104 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
3. Maxwell 配置

这里主要是解决时间戳的问题:

生产环境中是不会有这个问题的,这里我们用的是 经过修改源码的 Maxwell,所以只需要修改一下配置文件即可:

cd /opt/module/maxwell-1.29.2/
vim config.properties

添加配置:

mock_date=2020-06-14

4. 增量表首日全量同步

       增量表本来就存在一些数据,但是 Maxwell 在监听的 binlog 的时候是不知道的,所以我们还需要全量同步一次增量表中的历史数据。但是我们用哪个工具呢,我们知道,Maxwell 也可以做全量,DataX也可以。这里我们选择 Maxwell ,因为 DataX 同步到 HDFS 的文件是一个以特定字符分割的文件,而 Maxwell 同步到 HDFS 的文件是 json 格式的,所以我们肯定是希望保存到 HDFS 后的数据格式都是一致的,那我们就自然会联想到学习 Maxwell 说的 bootstrap,它是 Maxwell  的一张元数据表。

编写初始化脚本:

vim mysql_to_kafka_inc_init.sh
#!/bin/bash
 
# 该脚本的作用是初始化所有的增量表,只需执行一次
 
MAXWELL_HOME=/opt/module/maxwell-1.29.2
 
import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}
 
case $1 in
"cart_info")
  import_data cart_info
  ;;
"comment_info")
  import_data comment_info
  ;;
"coupon_use")
  import_data coupon_use
  ;;
"favor_info")
  import_data favor_info
  ;;
"order_detail")
  import_data order_detail
  ;;
"order_detail_activity")
  import_data order_detail_activity
  ;;
"order_detail_coupon")
  import_data order_detail_coupon
  ;;
"order_info")
  import_data order_info
  ;;
"order_refund_info")
  import_data order_refund_info
  ;;
"order_status_log")
  import_data order_status_log
  ;;
"payment_info")
  import_data payment_info
  ;;
"refund_payment")
  import_data refund_payment
  ;;
"user_info")
  import_data user_info
  ;;
"all")
  import_data cart_info
  import_data comment_info
  import_data coupon_use
  import_data favor_info
  import_data order_detail
  import_data order_detail_activity
  import_data order_detail_coupon
  import_data order_info
  import_data order_refund_info
  import_data order_status_log
  import_data payment_info
  import_data refund_payment
  import_data user_info
  ;;
esac

测试:

f3.sh start 
mysql_to_hdfs_full_init.sh all

这里需要牢记 Maxwell 可以既做全量又做增量为什么还需要 DataX,这是因为 DataX 对于全量同步更加专业,因为它可以进行一些流控,而且支持更多的数据源并且支持并发。所以 Maxwell 只在初始化同步历史数据的时候用一下,所以不用担心它的性能问题。

2.3、采集通道启/停脚本

这里只是为了方便学习的时候用的,生产环境千万不敢用:

#!/bin/bash
 
case $1 in
"start"){
        echo ================== 启动 集群 ==================
 
        #启动 Zookeeper集群
        zk start
 
        #启动 Hadoop集群
        myhadoop start
 
        #启动 Kafka采集集群
        kf.sh start
 
        #启动采集 Flume
        f1.sh start
 
  #启动日志消费 Flume
        f2.sh start
 
  #启动业务消费 Flume
        f3.sh start
 
  #启动 maxwell
        mxw.sh start
 
        };;
"stop"){
        echo ================== 停止 集群 ==================
 
  #停止 Maxwell
        mxw.sh stop
 
  #停止 业务消费Flume
        f3.sh stop
 
  #停止 日志消费Flume
        f2.sh stop
 
  #停止 日志采集Flume
        f1.sh stop
 
        #停止 Kafka采集集群
        kf.sh stop
 
        #停止 Hadoop集群
        myhadoop stop
 
        #停止 Zookeeper集群
        zk stop
 
};;
esac

总结

       现在是2024-2-27 19:28 。

       到这里,我们的数仓数据同步工作就都做完了,包括全量用户行为日志的同步(用户行为日志数据并没有增量同步)、增量业务数据的同步、全量业务数据的同步以及业务数据的历史数据初始化全量同步。

       接下来就是关于数仓的知识的学习了,这部分也将是最最重要的!不管是理论还是建模方法和编程实践。

       今天额外的好消息就是四级终于过了,这就剩下了很多时间去专心技术啦!

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
7天前
|
消息中间件 关系型数据库 调度
离线数据同步变迁
本文介绍了从第一代基于Hadoop体系的离线数据同步,到第二代基于DolphinScheduler和StarRocks的改进方案,再到第三代基于Python自定义的离线数据同步的演变过程。每一代方案都在不断优化,以适应日益增长的数据量和复杂的业务需求。
离线数据同步变迁
|
6月前
|
消息中间件 关系型数据库 Kafka
深入理解数仓开发(二)数据技术篇之数据同步
深入理解数仓开发(二)数据技术篇之数据同步
|
4月前
|
消息中间件 监控 关系型数据库
Serverless 应用的监控与调试问题之实时离线数仓一体化常用的解决方案有什么问题
Serverless 应用的监控与调试问题之实时离线数仓一体化常用的解决方案有什么问题
|
5月前
|
监控 数据挖掘 大数据
阿里云开源利器:DataX3.0——高效稳定的离线数据同步解决方案
对于需要集成多个数据源进行大数据分析的场景,DataX3.0同样提供了有力的支持。企业可以使用DataX将多个数据源的数据集成到一个统一的数据存储系统中,以便进行后续的数据分析和挖掘工作。这种集成能力有助于提升数据分析的效率和准确性,为企业决策提供有力支持。
|
5月前
|
存储 DataWorks Java
DataWorks产品使用合集之开发离线数仓时,需要多个工作空间的情况有哪些
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
分布式计算 DataWorks 关系型数据库
阿里云数加-分析型数据库AnalyticDB数据导入的多样化策略
通过合理利用这些数据导入方法,用户可以充分发挥AnalyticDB的实时计算能力和高并发查询性能,为业务分析和决策提供强有力的数据支持。
|
6月前
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之离线同步任务中,把表数据同步到POLARDB,显示所有数据都是脏数据,报错信息:ERROR JobContainer - 运行scheduler 模式[local]出错.是什么原因
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
存储 消息中间件 NoSQL
Flink 实时数仓(一)【实时数仓&离线数仓对比】(2)
Flink 实时数仓(一)【实时数仓&离线数仓对比】
|
6月前
|
存储 消息中间件 Kafka
Flink 实时数仓(一)【实时数仓&离线数仓对比】(1)
Flink 实时数仓(一)【实时数仓&离线数仓对比】
|
6月前
|
SQL
离线数仓(十)【ADS 层开发】(5)
离线数仓(十)【ADS 层开发】

热门文章

最新文章