Hadoop 数据如何同步至 MaxCompute | 学习笔记

本文涉及的产品
大数据开发治理平台DataWorks,Serverless资源组抵扣包300CU*H
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 快速学习 Hadoop 数据如何同步至 MaxCompute

开发者学堂课程【 SaaS  模式云数据仓库系列课程 —— 2021数仓必修课Hadoop  数据如何同步至  MaxCompute  】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/55/detail/1038


Hadoop  数据如何同步至  MaxCompute


内容简介:

一、MMA  覆盖的主要场景

二、MMA  的主要功能

三、MMA  迁移服务架构

四、MMA Agent  技术架构和原理

五、环境准备

六、下载和编译工具包

七、MMA Agent  操作说明

八、进阶功能

九、进阶功能2:仅灵活的  hive  到  max compute  映射

十、单表/单分区迁移

十一、使用  Dataworks  自动迁移数据和工作流

十二、其他类型作业的迁移方案

十三、使用  MMA Agent  获得评估报告



一、MMA  覆盖的主要场景

image.png



二、MMA   的主要功能

迁移评估分析

数据迁移自动化

作业兼容性分析

对  Hadoop  平台进行诊断分析,评估数据迁移规模、作业迁移改造的数量、预估迁移后的成本,从而对迁移工作进行整体评估和决策。

对  Hive Meta  及数据进行检测扫描,自动在  MaxCompute  创建对应的  Meta,同时根据不同的网络环境,将  Hive  的数据自动转换并高吞吐地加载到MaxCompute  上,支持从 TB  级到 PB  级数据的迁移上云。

对  Hive  作业进行兼容性分析,识别出需要修改的任务并提供针对性的兼容性修改建议。对于用户自定义逻辑的分析任务,如UDF、MR/Spark作业等,我们将给出一般性的改造建议供用户参考。

对主流数据集成工具  Sqoop  进行作业的迁移转换,并自动创建  Dataworks  数据集成作业;支持主流  Pipeline  工 具,如  Oozie、Azkaban、Airflow 等自动迁移转化,并自动创建为  Dataworks  工作流及调度作业。


三、MMA  迁移服务架构

如下图,左边是客户  Hadoop  集群,右边是  Aliyun MaxCompute  即阿里云的大数据服务。 MMA  会跑在客户  Hadoop  集群上面,需要在客户的服务器上,这台机器上需要去部署  MMA  客户端工具,这个工具会帮助客户自动的去获取hivemeta  数据,还支持将  meta  信息自动转换成  MaxCompute ,批量创建表,之后会拉起数据同步的作业,调用  udf,udf  会 集成  SDK。

基于  MMA  客户端自动发现的  Meta  数据,基于  hiveMeta  数据会做一整个工作流的作业检查。

最后迁移过来以后对于  MMaxCompute  和  Dataworks  架构对接业务系统。

image.png


四、MMA Agent  技术架构和原理

MMA Agent  的工作流程主要分为四个步骤:

1、Metadata  抓取

2、MaxCompute DDL  与  Hive UDTF 生成

3、MaxCompute  表创建

4、Hive  数据

image.png

Meta Carrier  需要连接到  hivemete  里面,会自动将  meta  拉出来,在本地生成hivemeta  的结构。

Meta Processor  是基于第一个工具产出的结果,就是基于  hiveMeta  的数据转成DDL,即批量转成建表语句,包括数据类型的转换。

ODPS Console,基于Console可以将  Meta Processor  产出的  ODPS DDL  批量的通过  ODPS Console  去创建表。

基于  Data Carrier  去批量创建  HIVE UDTF SQL 作业。


五、环境准备

1、jdk 1.6+

2、Python 3+

3、Hive Client

4、能访问  Hive Server

5、网络连接  MaxCompute

场景举例:

·客户IDC->专线-> ECS+MaxCompute等,可以从ECS上访问

MaxCompute的endpoint ,但从IDC不可以

·需要增加  VBR  路由配置:参考

detail/57195.htmI?spm=a2c4g.11174283.6.579.33513a79ZnTEsX。


六、下载和编译工具包

1、下载源码︰切换到  odps-datacarrier-develop  分支,https/github.com/aliyun/aliyun-maxcompute-data-collectors?spm=a2c4g.11186623.2.8.422c4cO7MdjlpQ

2、解压下载的

aliyun-maxcompute-data-collectors-odps-datacarrier-develop.zip文件

在控制台运行  odps-data-carrier  目录下的  build.py  文件,编译生成  MMA  工具。


七、MMA Agent  操作说明

Bin  目录下有几个文件

image.png

Libs  下是所需要的  jar  包

image.png

[root@emr-header-1 mma_client_wz]# cd odps-data-carrier[root@emr-header-1 odps-data-carrier]# bin/meta-carrier -h

usage: meta-carrier -u <uri> -o <output dir [-h] [-d <database>][-t<table>] [--keyTab <path to keytab>] [--principalemetastore principal] [--system [key=value]+]

-d,--database <database>   optional, specify a database

-h,--help  optional, print help information

--keyTab <keyTab>    optional, hive metastore's Kerberos keyTa

-o,--output-dir<output-dir> Required, output directory

--principal <principal>

Optional, hive metastore's Kerberosprincipal

--system system>   system properties

-t,--table <table>   0ptional, specify a table

-u,--uri <uri>

Required, hive metastore thrift uri

[root@emr-header-1 odps-data-carrier]# bin/meta-carrier -u thrift://127.0.0.1:9083-d dma demo -o meta

会生成一个  meta  目录

image.png

接下来会产生  partition meta  和  table meta。

接下来将  hivemeta  转换成  Max compute  的  dtl  语。

[root@emr-header-1 odps-data-carrier]# bin/meta-processor -h

usage: meta-processor -i <metadata directory -o coutput

directory>

-h,--help  Print help information

-i,--input-dir <input-dir> Directory generated by meta carrier

-o,--output-dir <output-dir> output directory generated by meta

Processor.

-i  就是  input  输入目录

-o  就是 output  输出目录

Hive udtf sql  下 的  sql  是用于批量做数据迁移的时候写的  sql。

INFO mapred.FileInputFormat: Total input files to process : 1ds=20190925

ds=20190926ds=20190927ds=20190928ds=20190929

Time taken:_e.857 seconds,Fetched: 5 row(s)

optional arguments:-h, --help

show this help message and exit

--input INPUT

path to directory generated by meta processor

--odpscmd ODPSCMD path to odpscmd executable

[root@emr-header-1 odps-data-carrier]#

[rooteemr-header-1 odps-data-carrier]#

[root@emr-header-1 odps-data-carrier]# python36 bin/odps_ddl_runner.py -input output[root@emr-header-1 odps-data-carrier]# python36 bin/hive_udtf_sql_runner.py

Run hive UDTF SQL automatically.

optional arguments:

-h,--help

show this help message and exit

--input_all INPUT_ALL

path to directory generated by meta processor

--input_single_file INPUT_SINGLE_FILE,

path to a single sqtfile

--settings SETTINGS

path to extra settings to set before running a hive

--parallelism PARALLELISM

sql

max parallelism_of running hive sql

[rooteemr-header-1 odps-data-carrier]#python36 bin/hive_udtf_sql_runner.py -input_all

drwxr-xr-X 4 root root 4096 0ct 2919:23 dma demo-rw-r--r-- 1 root root 4525 0ct 2919:23 report.html[root@emr-header-1 output]#

[rooteemr-header-1 output]#

[root@emr-header-1 output]# sz report.html

1.使用  meta-carrier  采集  Hive Metadata

(1)解压工具包:  odps-data-carrier.zip  ,工具目录结构如下:

hive_udtf_sql_runner.py

meta-carrierl

meta-processor

network-measurement-tool(网络测量工具)

odps_ddl_runner.py(批量创建表)

proc_pool.py

_pycache_

proc_pool.cpython-36.pyc

sql-checker

extra_settings.ini

(2)获取  Hive   metadata

(3)结果  hive metadata  输出的目录结构

说明:

①  global.json   是一个全局的配置文件,包含了整个迁移过程中的一些配置

②每一个  database  会有一个独立的目录

③每一个表有一个以表名命名的  json  文件

④如果是分区表,还会有一个以表名为命名的  partition  的  json  文 件。

2.使用  network-measurement-tool

√测试  Hadoop  集群到  MaxCompute  各  Region  的网络连通质量

√测试网络上下行传输速率

Usage:

network-measure-tool --mode FIND |TEST

0ptions:

--—mode <mode>

-h ,--help

-p,--access-key <access-key>

0DPs access key, required inTEST mode

--project <project>

Print help information

0DPs project name,required inTEST mode

一t,--num-thread enum-thread>

Number of thread

--tunnel-endpoint <tunnel-endpoint> ODPS tunnel endpoint,optional-u,--access-id <access-id>

FIND (find available endpoints)

or TEST (test performance of asingle endpoint)

--endpoint cendpoint>

ODPS endpoint,required in TEST mode

ODPS access id,required in TEST

Mode

Example:

查找可用   endpoint:

odps-data-carrier/bin/network-measurement-tool --mode find

Output:

各个  endpoint  的连接情况(只输出可以连接的):

ENDPOINT: EXTERNAAL-BEI3ING: http://lservice.cn.maxconpute.aliyun.com/apiAVAILABILITY: true

ELAPSED TIME (ms): 4

ENDPOINT: EXTERONAL-HANGZHOU: http://service.cn.maxcompute.aliyun.com/apiAVAILABILITY: true

ELAPSED TINE (ms): 5

Example:

测试某个  endpoint   的读写性能:

odps-data-carrier/bin/network-measurement-tool --mode test l--endpoint <endpoint to test> \

-u<access id> -p <access key >\

--project=<project nane> --num-thread <number of thread>

3.使用  meta-processor  生成ODPS DDL和Hive UDTF SQL

(1)修改globle.json,自定义表、字段的生成规则

(2)生成  ODPS DDL  和  Hive UDTF SQL  了

4.使用  sql-checker  检查  Hive SQL  是否可以直接在  MaxCompute  执行

5.使用  odps_ddl_runner.py  批量创建表和分区

ODPS DDL  创建好以后,运行  odps_ddl_runner.py,将会遍历   meta-processor生成的目录,调用  odpscmd  自动创建  ODPS  表与分区

6.使用  hive_udtf_sql_runner.py   迁移数据

表和分区创建完成以后,运行  hive_udtf_sql_runner.py,将数据从  hive  上传至MaxCompute  。

hive_udtf_sql_runner.py   有两种模式,第一种将会遍历 meta-processor  生成的目录,调用  hive client  运行  hive udtf sql,从而将数据从  hive  上传至  ODPS  。第二种模式需要用户手动指定—个  hive sql  文件。


八、进阶功能

1、仅生成指定  database或table的metadata

在前面的  Demo  中,我们抓去了  hive  中所有  database  和表的

metadata  ,但在很多环境下,我们倾向于一次处理一个  database  或一张表,因此meta-carrier  工具提供了抓取指定  database  或  table的  metadata  的能力:

执行:

sh odps-data-carrier/bin/meta-carrier -u

thrift://127.0.0.1:9083 -d test -t test -o meta

这里生成的  metadata  仅包含了   test.test  这张表。



九、进阶功能2:仅灵活的  hive  到  max compute  映射

在前面的  Demo  中,将  hive  的  test.test  表映射到  mc  中ODPS_DATA_CARRIER_TEST.test  这张表提供了更强大的能力,比如说修改  hive表到 mc  的表明与列名映射,设置  mc  中表的  life cycle, 增加  comment ,等等。

用户可以编辑  meta-carrier  生成的  metadata  来做到上述的事情。


十、单表/单分区迁移

在运行  hive sql  进行数据迁移的时候,提供了两种模式:

input_all  模式与  input_single_file  模式。

在  input_all  模式下,我们给一个  meta-processor  生成的目录,之后odps_hive_udtf_runner  会自动遍历该目录下的文件,并串行执行里面的hive sql,例如:

python3 odps-data-carrier/bin/hive_udtf_sql_runner.py --input_all processed/

在  input_single_file  模式下,我们给一个  hive sql  文件路径,odps_hive_udf_runner  会从该文件中读取  hive sql  并执行。例如∶

python3 odps-data-carrier/bin/hive_udtf_sql_runner.py--input_single_fileprocessed/test/hive_udtf_sql/single_partition/test_0.sql

input_single_file   模式可以帮助我们熟悉工具,并且在数据量大的场景下可以控制迁移的进度。



十一、使用  Dataworks  自动迁移数据和工作流

1、安装  MMA Agent  客户端工具:采集  Metadata&  生成  ODPS DDL

2、上传  Dataworks  项目描述文件

·根据模板(参见右图)生成  DataWorks  项目描述文档,打包为:   dataworks_project.tgz  上传到  Dataworks。

·【注意】︰一期仅支持:1)打包文件手动上传;江2)支持OOIZE调度引擎的配置模板和Dataworks  工作流配置模板。

上传完成后,  Dataworks  服务会根据  ODPS DDL  批量生成  MaxCompute   的table。

MaxCompute   的表创建完成后,  Dataworks  服务会自动拉起  DataX  的数据同步任务,完成批量数据迁移。

3.项目描述文件(/project.xml)说明

tenantld:  用户在  dataworks  上的租户  ID;

name:  用户事先在  dataworks  上 创建好的项目空间名称;

owner  :用户的阿里云账号  ID。

4.工作流描述文件  (/workflow.xml) 说明


十二、其他类型作业的迁移方案

1.UDF、MR   转移

支持相同逻辑的  UDF、MR  输入、输出参数的映射转换,但  UDF  和  MR  内部逻辑需要客户自己维护。

【注意】∶不支持在  UDF、MR  中直接访问文件系统、网络访问、外部数据源连接。

2.外表迁移

原则上全部迁到  MaxCompute  内部表。

如果必须通过外表访问外部文件,建议先将文件迁移到  OSS  或者  OTS,在MaxCompute   中创建外部表,实现对文件的访问。

【注意】: MaxCompute   外部表支持的格式包括  :ORC、PARQUET、SEQUENCEFILE、RCFILE.AVRO  和  TEXTFILE。

3.Spark  作业迁移

1.【作业无需访问  MaxCompute  表和  OSS  】用户  jar   包可直接运行,参照_《MaxCompute Spark  开发指南》第二节准备开发环境和修改配置。注意,对于spark  或  hadoop  的依赖必须设成  provided。

2.【作业需要访问  MaxCompute  表】参考_《  MaxCompute

Spark 开发指南》第三节编译  datasource  并安装到本地  maven  仓库,在  pom中添加依赖后重新打包即可。

【作业需要访问  OSS  】参考_《  MaxCompute Spark  开发指南》第四节在  pom中添加依赖后重新打包即可。


十三、使用  MMA Agent   获得评估报告

报告中将搬站风险分为两档,高风险 (HIGH RISK)与中等风险(MODERATE RISK)。·高风险意味着必须人工介入,例如出现了表名冲突, ODPS  完全不支持的类型等问题。

中等风险意味着迁移过程中可以自动处理,但是需要告知用户的潜在风险,例如Hive  数据类型到  ODPS  数据类型会带来的精度损失等问题。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
2月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
572 7
|
2月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
71 2
|
16天前
|
分布式计算 Shell MaxCompute
odps测试表及大量数据构建测试
odps测试表及大量数据构建测试
|
3天前
|
数据采集 存储 分布式计算
解密大数据:从零开始了解数据海洋
解密大数据:从零开始了解数据海洋
38 17
|
5天前
|
存储 分布式计算 Hadoop
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
30 7
|
30天前
|
存储 分布式计算 大数据
Flume+Hadoop:打造你的大数据处理流水线
本文介绍了如何使用Apache Flume采集日志数据并上传至Hadoop分布式文件系统(HDFS)。Flume是一个高可用、可靠的分布式系统,适用于大规模日志数据的采集和传输。文章详细描述了Flume的安装、配置及启动过程,并通过具体示例展示了如何将本地日志数据实时传输到HDFS中。同时,还提供了验证步骤,确保数据成功上传。最后,补充说明了使用文件模式作为channel以避免数据丢失的方法。
68 4
|
1月前
|
数据采集 分布式计算 Hadoop
使用Hadoop MapReduce进行大规模数据爬取
使用Hadoop MapReduce进行大规模数据爬取
|
2月前
|
机器学习/深度学习 存储 大数据
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系,保留最大方差信息,实现数据压缩、去噪及可视化。本文详解PCA原理、步骤及其Python实现,探讨其在图像压缩、特征提取等领域的应用,并指出使用时的注意事项,旨在帮助读者掌握这一强大工具。
135 4
|
2月前
|
存储 大数据 数据管理
大数据分区简化数据维护
大数据分区简化数据维护
35 4
|
2月前
|
存储 大数据 定位技术
大数据 数据索引技术
【10月更文挑战第26天】
82 3

热门文章

最新文章