开发者学堂课程【 SaaS 模式云数据仓库系列课程 —— 2021数仓必修课:Hadoop 数据如何同步至 MaxCompute 】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/55/detail/1038
Hadoop 数据如何同步至 MaxCompute
内容简介:
一、MMA 覆盖的主要场景
二、MMA 的主要功能
三、MMA 迁移服务架构
四、MMA Agent 技术架构和原理
五、环境准备
六、下载和编译工具包
七、MMA Agent 操作说明
八、进阶功能
九、进阶功能2:仅灵活的 hive 到 max compute 映射
十、单表/单分区迁移
十一、使用 Dataworks 自动迁移数据和工作流
十二、其他类型作业的迁移方案
十三、使用 MMA Agent 获得评估报告
一、MMA 覆盖的主要场景
二、MMA 的主要功能
迁移评估分析
数据迁移自动化
作业兼容性分析
对 Hadoop 平台进行诊断分析,评估数据迁移规模、作业迁移改造的数量、预估迁移后的成本,从而对迁移工作进行整体评估和决策。
对 Hive Meta 及数据进行检测扫描,自动在 MaxCompute 创建对应的 Meta,同时根据不同的网络环境,将 Hive 的数据自动转换并高吞吐地加载到MaxCompute 上,支持从 TB 级到 PB 级数据的迁移上云。
对 Hive 作业进行兼容性分析,识别出需要修改的任务并提供针对性的兼容性修改建议。对于用户自定义逻辑的分析任务,如UDF、MR/Spark作业等,我们将给出一般性的改造建议供用户参考。
对主流数据集成工具 Sqoop 进行作业的迁移转换,并自动创建 Dataworks 数据集成作业;支持主流 Pipeline 工 具,如 Oozie、Azkaban、Airflow 等自动迁移转化,并自动创建为 Dataworks 工作流及调度作业。
三、MMA 迁移服务架构
如下图,左边是客户 Hadoop 集群,右边是 Aliyun MaxCompute 即阿里云的大数据服务。 MMA 会跑在客户 Hadoop 集群上面,需要在客户的服务器上,这台机器上需要去部署 MMA 客户端工具,这个工具会帮助客户自动的去获取hivemeta 数据,还支持将 meta 信息自动转换成 MaxCompute ,批量创建表,之后会拉起数据同步的作业,调用 udf,udf 会 集成 SDK。
基于 MMA 客户端自动发现的 Meta 数据,基于 hiveMeta 数据会做一整个工作流的作业检查。
最后迁移过来以后对于 MMaxCompute 和 Dataworks 架构对接业务系统。
四、MMA Agent 技术架构和原理
MMA Agent 的工作流程主要分为四个步骤:
1、Metadata 抓取
2、MaxCompute DDL 与 Hive UDTF 生成
3、MaxCompute 表创建
4、Hive 数据
Meta Carrier 需要连接到 hivemete 里面,会自动将 meta 拉出来,在本地生成hivemeta 的结构。
Meta Processor 是基于第一个工具产出的结果,就是基于 hiveMeta 的数据转成DDL,即批量转成建表语句,包括数据类型的转换。
ODPS Console,基于Console可以将 Meta Processor 产出的 ODPS DDL 批量的通过 ODPS Console 去创建表。
基于 Data Carrier 去批量创建 HIVE UDTF SQL 作业。
五、环境准备
1、jdk 1.6+
2、Python 3+
3、Hive Client
4、能访问 Hive Server
5、网络连接 MaxCompute
场景举例:
·客户IDC->专线-> ECS+MaxCompute等,可以从ECS上访问
MaxCompute的endpoint ,但从IDC不可以
·需要增加 VBR 路由配置:参考
detail/57195.htmI?spm=a2c4g.11174283.6.579.33513a79ZnTEsX。
六、下载和编译工具包
1、下载源码︰切换到 odps-datacarrier-develop 分支,https/github.com/aliyun/aliyun-maxcompute-data-collectors?spm=a2c4g.11186623.2.8.422c4cO7MdjlpQ
2、解压下载的
aliyun-maxcompute-data-collectors-odps-datacarrier-develop.zip文件
在控制台运行 odps-data-carrier 目录下的 build.py 文件,编译生成 MMA 工具。
七、MMA Agent 操作说明
Bin 目录下有几个文件
Libs 下是所需要的 jar 包
[root@emr-header-1 mma_client_wz]# cd odps-data-carrier[root@emr-header-1 odps-data-carrier]# bin/meta-carrier -h
usage: meta-carrier -u <uri> -o <output dir [-h] [-d <database>][-t<table>] [--keyTab <path to keytab>] [--principalemetastore principal] [--system [key=value]+]
-d,--database <database> optional, specify a database
-h,--help optional, print help information
--keyTab <keyTab> optional, hive metastore's Kerberos keyTa
-o,--output-dir<output-dir> Required, output directory
--principal <principal>
Optional, hive metastore's Kerberosprincipal
--system system> system properties
-t,--table <table> 0ptional, specify a table
-u,--uri <uri>
Required, hive metastore thrift uri
[root@emr-header-1 odps-data-carrier]# bin/meta-carrier -u thrift://127.0.0.1:9083-d dma
demo -o meta
会生成一个 meta 目录
接下来会产生 partition meta 和 table meta。
接下来将 hivemeta 转换成 Max compute 的 dtl 语。
[root@emr-header-1 odps-data-carrier]# bin/meta-processor -h
usage: meta-processor -i <metadata directory -o coutput
directory>
-h,--help Print help information
-i,--input-dir <input-dir> Directory generated by meta carrier
-o,--output-dir <output-dir> output directory generated by meta
Processor.
-i 就是 input 输入目录
-o 就是 output 输出目录
Hive udtf sql 下 的 sql 是用于批量做数据迁移的时候写的 sql。
INFO mapred.FileInputFormat: Total input files to process : 1ds=20190925
ds=20190926ds=20190927ds=20190928ds=20190929
Time taken:_e.857 seconds,Fetched: 5 row(s)
optional arguments:-h, --help
show this help message and exit
--input INPUT
path to directory generated by meta processor
--odpscmd ODPSCMD path to odpscmd executable
[root@emr-header-1 odps-data-carrier]#
[rooteemr-header-1 odps-data-carrier]#
[root@emr-header-1 odps-data-carrier]# python36 bin/odps_ddl_runner.py -input output[root@emr-header-1 odps-data-carrier]# python36 bin/hive_udtf_sql_runner.py
Run hive UDTF SQL automatically.
optional arguments:
-h,--help
show this help message and exit
--input_all INPUT_ALL
path to directory generated by meta processor
--input_single_file INPUT_SINGLE_FILE,
path to a single sqtfile
--settings SETTINGS
path to extra settings to set before running a hive
--parallelism PARALLELISM
sql
max parallelism_of running hive sql
[rooteemr-header-1 odps-data-carrier]#python36 bin/hive_udtf_sql_runner.py -input_all
drwxr-xr-X 4 root root 4096 0ct 2919:23 dma demo-rw-r--r-- 1 root root 4525 0ct 2919:23 report.html[root@emr-header-1 output]#
[rooteemr-header-1 output]#
[root@emr-header-1 output]# sz report.html
1.使用 meta-carrier 采集 Hive Metadata
(1)解压工具包: odps-data-carrier.zip ,工具目录结构如下:
hive_udtf_sql_runner.py
meta-carrierl
meta-processor
network-measurement-tool(网络测量工具)
odps_ddl_runner.py(批量创建表)
proc_pool.py
_pycache_
proc_pool.cpython-36.pyc
sql-checker
extra_settings.ini
(2)获取 Hive metadata
(3)结果 hive metadata 输出的目录结构
说明:
① global.json 是一个全局的配置文件,包含了整个迁移过程中的一些配置
②每一个 database 会有一个独立的目录
③每一个表有一个以表名命名的 json 文件
④如果是分区表,还会有一个以表名为命名的 partition 的 json 文 件。
2.使用 network-measurement-tool
√测试 Hadoop 集群到 MaxCompute 各 Region 的网络连通质量
√测试网络上下行传输速率
Usage:
network-measure-tool --mode FIND |TEST
0ptions:
--—mode <mode>
-h ,--help
-p,--access-key <access-key>
0DPs access key, required inTEST mode
--project <project>
Print help information
0DPs project name,required inTEST mode
一t,--num-thread enum-thread>
Number of thread
--tunnel-endpoint <tunnel-endpoint> ODPS tunnel endpoint,optional-u,--access-id <access-id>
FIND (find available endpoints)
or TEST (test performance of asingle endpoint)
--endpoint cendpoint>
ODPS endpoint,required in TEST mode
ODPS access id,required in TEST
Mode
Example:
查找可用 endpoint:
odps-data-carrier/bin/network-measurement-tool --mode find
Output:
各个 endpoint 的连接情况(只输出可以连接的):
ENDPOINT: EXTERNAAL-BEI3ING:
http://lservice.cn.maxconpute.aliyun.com/apiAVAILABILITY:
true
ELAPSED TIME (ms): 4
ENDPOINT: EXTERONAL-HANGZHOU:
http://service.cn.maxcompute.aliyun.com/apiAVAILABILITY:
true
ELAPSED TINE (ms): 5
Example:
测试某个 endpoint 的读写性能:
odps-data-carrier/bin/network-measurement-tool --mode test l--endpoint <endpoint to test> \
-u<access id> -p <access key >\
--project=<project nane> --num-thread <number of thread>
3.使用 meta-processor 生成ODPS DDL和Hive UDTF SQL
(1)修改globle.json,自定义表、字段的生成规则
(2)生成 ODPS DDL 和 Hive UDTF SQL 了
4.使用 sql-checker 检查 Hive SQL 是否可以直接在 MaxCompute 执行
5.使用 odps_ddl_runner.py 批量创建表和分区
ODPS DDL 创建好以后,运行 odps_ddl_runner.py,将会遍历 meta-processor生成的目录,调用 odpscmd 自动创建 ODPS 表与分区
6.使用 hive_udtf_sql_runner.py 迁移数据
表和分区创建完成以后,运行 hive_udtf_sql_runner.py,将数据从 hive 上传至MaxCompute 。
hive_udtf_sql_runner.py 有两种模式,第一种将会遍历 meta-processor 生成的目录,调用 hive client 运行 hive udtf sql,从而将数据从 hive 上传至 ODPS 。第二种模式需要用户手动指定—个 hive sql 文件。
八、进阶功能
1、仅生成指定 database或table的metadata
在前面的 Demo 中,我们抓去了 hive 中所有 database 和表的
metadata ,但在很多环境下,我们倾向于一次处理一个 database 或一张表,因此meta-carrier 工具提供了抓取指定 database 或 table的 metadata 的能力:
执行:
sh odps-data-carrier/bin/meta-carrier -u
thrift://127.0.0.1:9083 -d test -t test -o meta
这里生成的 metadata 仅包含了 test.test 这张表。
九、进阶功能2:仅灵活的 hive 到 max compute 映射
在前面的 Demo 中,将 hive 的 test.test 表映射到 mc 中ODPS_DATA_CARRIER_TEST.test 这张表提供了更强大的能力,比如说修改 hive表到 mc 的表明与列名映射,设置 mc 中表的 life cycle, 增加 comment ,等等。
用户可以编辑 meta-carrier 生成的 metadata 来做到上述的事情。
十、单表/单分区迁移
在运行 hive sql 进行数据迁移的时候,提供了两种模式:
input_all 模式与 input_single_file 模式。
在 input_all 模式下,我们给一个 meta-processor 生成的目录,之后odps_hive_udtf_runner 会自动遍历该目录下的文件,并串行执行里面的hive sql,例如:
python3 odps-data-carrier/bin/hive_udtf_sql_runner.py --input_all processed/
在 input_single_file 模式下,我们给一个 hive sql 文件路径,odps_hive_udf_runner 会从该文件中读取 hive sql 并执行。例如∶
python3 odps-data-carrier/bin/hive_udtf_sql_runner.py--input_single_fileprocessed/test/hive_udtf_sql/single_partition/test_0.sql
input_single_file 模式可以帮助我们熟悉工具,并且在数据量大的场景下可以控制迁移的进度。
十一、使用 Dataworks 自动迁移数据和工作流
1、安装 MMA Agent 客户端工具:采集 Metadata& 生成 ODPS DDL
2、上传 Dataworks 项目描述文件
·根据模板(参见右图)生成 DataWorks 项目描述文档,打包为: dataworks_project.tgz 上传到 Dataworks。
·【注意】︰一期仅支持:1)打包文件手动上传;江2)支持OOIZE调度引擎的配置模板和Dataworks 工作流配置模板。
上传完成后, Dataworks 服务会根据 ODPS DDL 批量生成 MaxCompute 的table。
MaxCompute 的表创建完成后, Dataworks 服务会自动拉起 DataX 的数据同步任务,完成批量数据迁移。
3.项目描述文件(/project.xml)说明
tenantld: 用户在 dataworks 上的租户 ID;
name: 用户事先在 dataworks 上 创建好的项目空间名称;
owner :用户的阿里云账号 ID。
4.工作流描述文件 (/workflow.xml) 说明
十二、其他类型作业的迁移方案
1.UDF、MR 转移
支持相同逻辑的 UDF、MR 输入、输出参数的映射转换,但 UDF 和 MR 内部逻辑需要客户自己维护。
【注意】∶不支持在 UDF、MR 中直接访问文件系统、网络访问、外部数据源连接。
2.外表迁移
原则上全部迁到 MaxCompute 内部表。
如果必须通过外表访问外部文件,建议先将文件迁移到 OSS 或者 OTS,在MaxCompute 中创建外部表,实现对文件的访问。
【注意】: MaxCompute 外部表支持的格式包括 :ORC、PARQUET、SEQUENCEFILE、RCFILE.AVRO 和 TEXTFILE。
3.Spark 作业迁移
1.【作业无需访问 MaxCompute 表和 OSS 】用户 jar 包可直接运行,参照_《MaxCompute Spark 开发指南》第二节准备开发环境和修改配置。注意,对于spark 或 hadoop 的依赖必须设成 provided。
2.【作业需要访问 MaxCompute 表】参考_《 MaxCompute
Spark 开发指南》第三节编译 datasource 并安装到本地 maven 仓库,在 pom中添加依赖后重新打包即可。
【作业需要访问 OSS 】参考_《 MaxCompute Spark 开发指南》第四节在 pom中添加依赖后重新打包即可。
十三、使用 MMA Agent 获得评估报告
报告中将搬站风险分为两档,高风险 (HIGH RISK)与中等风险(MODERATE RISK)。·高风险意味着必须人工介入,例如出现了表名冲突, ODPS 完全不支持的类型等问题。
中等风险意味着迁移过程中可以自动处理,但是需要告知用户的潜在风险,例如Hive 数据类型到 ODPS 数据类型会带来的精度损失等问题。