离线数仓(四)【数仓数据同步策略】(3)https://developer.aliyun.com/article/1532384
sink 配置:
这里除了设置输出的 hdfs 路径必须包含日期之外,主要就是滚动策略的配置,我们要防止小文件的问题。
编写拦截器:
package com.lyh.gmall.interceptor; import com.alibaba.fastjson.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; public class TimestampAndTableNameInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { // 1. 把 body 中的 timestamp 和 table 字段提取出来 放到 header Map<String, String> headers = event.getHeaders(); String log = new String(event.getBody(), StandardCharsets.UTF_8); // 2. 解析 log 中的 ts 和 table 字段 JSONObject json = JSONObject.parseObject(log); String ts = json.getString("ts"); String table = json.getString("table"); // 3. 把 ts 和 table 字段放到 header 中的 tableName 和 timestamp 字段 headers.put("tableName",table); headers.put("timestamp",ts + "000"); return event; } @Override public List<Event> intercept(List<Event> list) { for (Event event: list) intercept(event); return list; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new TimestampAndTableNameInterceptor(); } @Override public void configure(Context context) { } } }
打包放到 hadoop104 上 flume 的 lib 目录下,开始测试:
打通通道
myhadoop start zk start kf.sh start mxw.sh start
启动 flume 作业:
[lyh@hadoop104 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/warehouse/kafka_to_hdfs_db.conf -Dflume.root.logger=INFO,console
模拟业务数据生成:
cd /opt/module/db_log/ java -jar gmall2020-mock-db-2021-11-14.jar
查看 hdfs:
可以看到,其中带 inc 后缀的都是我们增量同步进来的数据。
增量同步文件数 = 总文件数 - 全量同步文件数 = 27 - 15 = 12 ,没有问题
这里存在一个问题:我们之前在拦截器中设置了 event header 中的 timestamp 为 kafka 中的数据t ts 字段的时间信息,但是这里却依然是我们机器的时间,这是因为我们 java -jar 操作数据库的时间就是我们服务器当前的时间,所以导致 Maxwelll 读取 binlog 后的数据就是当前服务器的时间。具体解决办法看下面的 Maxwell 配置。
2. 编写增量数据同步脚本
vim f3.sh
#!/bin/bash case $1 in "start") echo " --------启动 hadoop104 业务数据flume-------" ssh hadoop104 "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf -f /opt/module/flume-1.9.0/job/warehouse/kafka_to_hdfs_db.conf >/dev/null 2>&1 &" ;; "stop") echo " --------停止 hadoop104 业务数据flume-------" ssh hadoop104 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill" ;; esac
3. Maxwell 配置
这里主要是解决时间戳的问题:
生产环境中是不会有这个问题的,这里我们用的是 经过修改源码的 Maxwell,所以只需要修改一下配置文件即可:
cd /opt/module/maxwell-1.29.2/ vim config.properties
添加配置:
mock_date=2020-06-14
4. 增量表首日全量同步
增量表本来就存在一些数据,但是 Maxwell 在监听的 binlog 的时候是不知道的,所以我们还需要全量同步一次增量表中的历史数据。但是我们用哪个工具呢,我们知道,Maxwell 也可以做全量,DataX也可以。这里我们选择 Maxwell ,因为 DataX 同步到 HDFS 的文件是一个以特定字符分割的文件,而 Maxwell 同步到 HDFS 的文件是 json 格式的,所以我们肯定是希望保存到 HDFS 后的数据格式都是一致的,那我们就自然会联想到学习 Maxwell 说的 bootstrap,它是 Maxwell 的一张元数据表。
编写初始化脚本:
vim mysql_to_kafka_inc_init.sh
#!/bin/bash # 该脚本的作用是初始化所有的增量表,只需执行一次 MAXWELL_HOME=/opt/module/maxwell-1.29.2 import_data() { $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties } case $1 in "cart_info") import_data cart_info ;; "comment_info") import_data comment_info ;; "coupon_use") import_data coupon_use ;; "favor_info") import_data favor_info ;; "order_detail") import_data order_detail ;; "order_detail_activity") import_data order_detail_activity ;; "order_detail_coupon") import_data order_detail_coupon ;; "order_info") import_data order_info ;; "order_refund_info") import_data order_refund_info ;; "order_status_log") import_data order_status_log ;; "payment_info") import_data payment_info ;; "refund_payment") import_data refund_payment ;; "user_info") import_data user_info ;; "all") import_data cart_info import_data comment_info import_data coupon_use import_data favor_info import_data order_detail import_data order_detail_activity import_data order_detail_coupon import_data order_info import_data order_refund_info import_data order_status_log import_data payment_info import_data refund_payment import_data user_info ;; esac
测试:
f3.sh start mysql_to_hdfs_full_init.sh all
这里需要牢记 Maxwell 可以既做全量又做增量为什么还需要 DataX,这是因为 DataX 对于全量同步更加专业,因为它可以进行一些流控,而且支持更多的数据源并且支持并发。所以 Maxwell 只在初始化同步历史数据的时候用一下,所以不用担心它的性能问题。
2.3、采集通道启/停脚本
这里只是为了方便学习的时候用的,生产环境千万不敢用:
#!/bin/bash case $1 in "start"){ echo ================== 启动 集群 ================== #启动 Zookeeper集群 zk start #启动 Hadoop集群 myhadoop start #启动 Kafka采集集群 kf.sh start #启动采集 Flume f1.sh start #启动日志消费 Flume f2.sh start #启动业务消费 Flume f3.sh start #启动 maxwell mxw.sh start };; "stop"){ echo ================== 停止 集群 ================== #停止 Maxwell mxw.sh stop #停止 业务消费Flume f3.sh stop #停止 日志消费Flume f2.sh stop #停止 日志采集Flume f1.sh stop #停止 Kafka采集集群 kf.sh stop #停止 Hadoop集群 myhadoop stop #停止 Zookeeper集群 zk stop };; esac
总结
现在是2024-2-27 19:28 。
到这里,我们的数仓数据同步工作就都做完了,包括全量用户行为日志的同步(用户行为日志数据并没有增量同步)、增量业务数据的同步、全量业务数据的同步以及业务数据的历史数据初始化全量同步。
接下来就是关于数仓的知识的学习了,这部分也将是最最重要的!不管是理论还是建模方法和编程实践。
今天额外的好消息就是四级终于过了,这就剩下了很多时间去专心技术啦!