DataX使用指南——ODPS to ODPS

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 1. DataX是什么 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。

1. DataX是什么

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。

DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

2. 何时用DataX

  1. DataX是离线数据同步工具,当需要迁移增量时,建议使用DTS,而不是DataX;
  2. 针对离线数据,当数据量很大或表非常多时,建议使用DataX。此时配置文件可编写脚本批量生成,详见ODPS数据迁移指南。同时可以增大DataX本身的并发,并提高运行DataX的任务机数量,来达到高并发,从而实现快速迁移;

3. DataX怎么用

3.1 DataX的配置文件

如下是DataX的配置文件示例:

{
    "job": {
           "content":[
                {
                    "reader":{
                        "name":"odpsreader",
                        "parameter":{
                            "accessId":"<accessID>",
                            "accessKey":"******************************",
                            "column":[
                                "col_1",
                                "col_2"
                            ],
                            "odpsServer":"http://service.odps.aliyun-inc.com/api",
                            "partition":[
                                "dt=20160524"
                            ],
                            "project":"src_project_name",
                            "splitMode":"record",
                            "table":"table_name_1"
                        }
                    },
                    "writer":{
                        "name":"odpswriter",
                        "parameter":{
                            "accessId":"<accessId>",
                            "accessKey":"******************************",
                            "accountType":"aliyun",
                            "column":[
                                "ci_name",
                                "geohash"
                            ],
                            "odpsServer":"http://service.odps.xxx.com/api",
                            "partition":"dt=20160524",
                            "project":"dst_project_name",
                            "table":"nb_tab_http"
                        }
                    }
                }
            ],
           "setting":{
               "speed":{
                   "channel":20
               }
           }
    }
}
  1. 整个配置文件是一个job的描述;
  2. job下面有两个配置项,content和setting,其中content用来描述该任务的源和目的端的信息,setting用来描述任务本身的信息;
  3. content又分为两部分,reader和writer,分别用来描述源端和目的端的信息;
  4. 本例中由于源和目的都是ODPS,所以类型为odpsreader和odpswriter。均包含accessId,accessKey与odpsServer的api地址。
  5. 同时预迁移表的project,table以及列名和分区信息都要一一填写清楚。
  6. setting中的speed项表示同时起20个并发去跑该任务。

3.2 运行Datax任务

运行Datax任务很简单,只要执行python脚本即可。

python /home/admin/datax3/bin/datax.py ./json/table_1.json

运行后,可在终端查看运行信息。建议真正跑任务时,可按照ODPS迁移指南中给出的批量工具的方式运行。即将所有的命令整理到一个sh文件中,最后再用nohup运行该文件。

cat /root/datax_tools/run_datax.sh
python /home/admin/datax3/bin/datax.py ./json/table_1.json > ./log/table_1.log
#实际运行
nohup /root/datax_tools/run_datax.sh > result.txt 2>&1 &

4. DataX调优

DataX调优要分成几个部分,任务机指运行Datax任务所在的机器。

  1. 网络本身的带宽等硬件因素造成的影响;
  2. DataX本身的参数;
  3. 从源端到任务机;
  4. 从任务机到目的端;

即当觉得DataX传输速度慢时,需要从上述四个方面着手开始排查。

4.1 网络本身的带宽等硬件因素造成的影响

此部分主要需要了解网络本身的情况,即从源端到目的端的带宽是多少,平时使用量和繁忙程度的情况,从而分析是否是本部分造成的速度缓慢。一下提供几个思路。

  1. 可使用从源端到目的端scp的方式观察速度;
  2. 结合监控观察任务运行时间段时,网络整体的繁忙情况,来判断是否应将任务避开网络高峰运行;
  3. 观察任务机的负载情况,尤其是网络和磁盘IO,观察其是否成为瓶颈,影响了速度;

4.2 DataX本身的参数

  1. 可通过增加如下的core参数,去除掉DataX默认对速度的限制;

        
        {
            "core":{
                "transport":{
                    "channel":{
                        "speed":{
                            "record":-1,
                            "byte":-1
                        }
                    }
                }
            },
            "job":{
                ...
            }
        }
        
  2. 针对odpsreader有如下参数可以调节,注意并不是压缩速度就会提升,根据具体情况不同,速度还有可能下降,此项为试验项,需要具体情况具体分析。

        
        ...
        “parameter”:{
            "isCompress":"true",
            ...
        }
        
  3. 针对odpswrtier有如下参数可以调节,其中isCompress选项同reader,blockSizeInMB,为按块写入,后面的值为块的大小。该项值并不是越大越好,一般可以结合tunnel做综合考量。过分大的 blockSizeInMB 可能造成速度波动以及内存OOM。

        ...
        “parameter”:{
            "isCompress":"true",
            "blockSizeInMB":128,
            ...
        }
        
  4. 针对任务本身,有如下参数可以调节,注意如果调整了tunnel的数量,可能会造成JVM虚拟机崩溃,故需修改相应的参数;

        "job": {
        "setting": {
            "speed": {
                "channel": 32
            }
        }
        
    channel增大,为防止OOM,需要修改datax工具的datax.py文件。如下所示,可根据任务机的实际配置,提升-Xms与-Xmx,来防止OOM。由此可以看出,tunnel并不是越大越好,过分大反而会影响宿主机的性能。
    
        DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
    

4.3 从源端到任务机

  1. 可使用dataX从源端传输分区信息到本机,来观察速度。并和初始任务的速度进行比较,从而判断是哪一部分的原因造成的速度缓慢;具体配置文件如下:

    "writer": {

    "name": "txtfilewriter",
    "parameter": {
        "path": "/root/log",
        "fileName": "test_src_result.csv",
        "writeMode": "truncate",
        "dateFormat": "yyyy-MM-dd"
    }

    }

    试验时,注意观察任务机本身的IO负载。
  2. 如果判断结果是源端的速度慢,可将任务机迁移至源端所在的集群,并再次运行任务,观察结果;
  3. 可尝试用odpscmd,直接从源端odps下载分区到本地,和上述结果作比较。如果odpscmd速度快,可尝试调整datax的相关参数;

    odpscmd --config=odps_config.ini.src

    tunnel download -thread 20 project_name.table_name/dt='20150101' log.txt;

  4. 如果是在专有云环境可尝试指定源端的tunnel server的ip进行测试,从而排除从域名到负载均衡部分的网络造成的影响。源端的tunnel server的ip需要咨询云端管理员。

    ...
    “parameter”:{

    "tunnelServer":"http://src_tunnel_server_ip",
    ...

    }

    注意此步骤可选择负载较低的tunnel_server。
    
  5. 观察源端tunnel server的负载情况,尤其是磁盘IO和网卡的负载,从而判断是否为tunnel sever负载过高造成了资源瓶颈。
  6. 观察源端的表结果,当有多个分区键或列过多时,都有可能造成传输的性能下降,此时可考虑换一张表进行测试,以排除表结构等问题造成的影响。

4.4 从任务机到目的端

  1. 可使用datax从任务机传输文件分区文件到目的端,来观察速度。并和初始任务的速度进行比较,从而判断是哪一部分的原因造成的速度缓慢;具体配置文件如下:

    "reader": {

    "name": "txtfilereader",
    "parameter": {
        "path": ["/home/haiwei.luo/case00/data"],
        "encoding": "UTF-8",
        "column": [
            {
                "index": 0,
                "type": "long"
            },
            {
                "index": 1,
                "type": "boolean"
            },
            {
                "index": 2,
                "type": "double"
            },
            {
                "index": 3,
                "type": "string"
            },
            {
                "index": 4,
                "type": "date",
                "format": "yyyy.MM.dd"
            }
        ],
        "fieldDelimiter": ","
    }

    },

  2. 如果判断结果是源端的速度慢,可将任务机迁移至源端所在的集群,并再次运行任务,观察结果;
  3. 可尝试用odpscmd,直接从本地上传分区到目的端,和上述结果作比较。如果odpscmd速度快,可尝试调整datax的相关参数;

    odpscmd --config=odps_config.ini.src

    tunnel upload ./log.txt mingxuantest/pt='20150101';

  4. 如果是在专有云环境可尝试指定指定端的tunnel server的ip进行测试,从而排除从域名到负载均衡部分的网络造成的影响。源端的tunnel server的ip需要咨询云端管理员。

    ...
    “parameter”:{

    "tunnelServer":"http://dst_tunnel_server_ip",
    ...

    }

    注意此步骤可选择负载较低的tunnel_server。
  5. 观察源端tunnel server的负载情况,尤其是磁盘IO和网卡的负载,从而判断是否为tunnel sever负载过高造成了资源瓶颈。

4.5 综合

  1. 通过对DataX本身参数,源端到任务机、任务机到目的端的网络、负载等情况综合考量,进行针对各个部分的优化;
  2. 同时,可在多台机器上部署DataX,将任务平均分配到多台机器上并发运行,来提高速度;
相关文章
|
5月前
|
弹性计算 DataWorks 关系型数据库
DataWorks操作报错合集之DataX在执行过程中接收到了意外的信号15,导致进程被终止,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之如何解决datax同步任务时报错ODPS-0410042:Invalid signature value
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
DataWorks NoSQL fastjson
DataWorks操作报错合集之DataX进行MongoDB全量迁移的过程中,DataX的MongoDB Reader插件在初始化阶段找不到Fastjson 2.x版本的类库,该怎么办
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
DataWorks 安全 API
DataWorks产品使用合集之是否可以不使用DataWorks进行EMR的调度和DataX数据导入
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
分布式计算 DataWorks 数据挖掘
DataWorks操作报错合集之上传数据时报错com.alibaba.datax.common.exception.DataXException: Code:[UnstructuredStorageReader-11],该如何排查
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
DataWorks Java 调度
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
73 0
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
|
5月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之DataX如何进行删除数据的操作
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
6月前
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之数据源同步时,使用脚本模式采集mysql数据到odps中,使用querySql方式采集数据,在脚本中删除了Reader中的column,但是datax还是报错OriginalConfPretreatmentUtil - 您的配置有误。如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
分布式计算 DataWorks DataX
DataWorks产品使用合集之DataX的ODPSReader和Tunnel是两种不同的读取MC(原名ODPS)数据的方式吗
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
82 3
|
6月前
|
SQL DataWorks Oracle
DataWorks产品使用合集之datax解析oracle增量log日志该如何操作
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
62 0

热门文章

最新文章