DataX工具的使用

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 一、DataX框架 1、Datax3.0设计框架 Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。 Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

一、DataX框架

1、Datax3.0设计框架

image

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

2、DataX3.0核心架构

image

核心模块介绍:

1)DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

2)DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

3)切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

4)每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

5)DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

3、DataX脚本初试

3.1 Reader

1)MySQL Reader

方法一:简单模式,直接写需要查询的字段

"reader": {
    "name": "mysqlreader",
    "parameter": {
        "username": "Username",
        "password": "Password",
        "column": ["id", "name"],        //查询表字段
        "splitPk": "id",                        //切分键
        "connection": [{
            "table": ["tableName"],                //表名
            "jdbcUrl": ["jdbc:mysql://172.16.4.37:3306/db1"]    //jdbcurl连接串
        }]
}
}

方法二:querySQL模式,可自定义编写SQL内容

"reader": {
    "name": "mysqlreader",
    "parameter": {
        "username": "Username",
        "password": "Password",
        "connection": [{
            "querySql": ["select id,k,c,pad from sbtest1 where all_query_conditions"],    //查询语句,此处可写复杂查询
            "jdbcUrl": ["jdbc:mysql://172.16.4.37:3306/db2"]    //jdbcurl连接串
        }]
    }
}

2)odpsreader

"reader": {
    "name": "odpsreader",
    "parameter": {
        "accessId": "accessId",
        "accessKey": "accessKey",
        "project": "targetProjectName",  //odps项目名
        "table": "tableName",        //odps表名
        "partition": [                 //分区信息
        "all_query_conditions"],
        "column": [                     //查询表字段
        "customer_id", "nickname"],
        "packageAuthorizedProject": "yourCurrentProjectName",
        "splitMode": "record",
        "odpsServer": "http://xxx/api" //odps的endpoint
}
}

3.2 Writer

1)MySQL writer

"writer": {
    "name": "mysqlwriter",
    "parameter": {
        "writeMode": "insert",    //数据写入方式
        "username": "Username",
        "password": "Password",
        "column": ["id", "k", "c", "pad"],    //写入字段
        "session": ["set session innodb_lock_wait_timeout=1000"],    //session操作
        "preSql": ["delete from tableName "],        //前置操作SQL
        "connection": [{
            "jdbcUrl": "jdbc:mysql://172.16.4.37:3306/db2",
            "table": ["tableName"]
        }]
    }
}

2)odpswriter

"writer": {
    "name": "odpswriter",
    "parameter": {
        "accessId": "accessId",
        "accessKey": "accessKey",
        "accountType": "aliyun",
        "column": ["id", "k", "c", "pad"],
        "odpsServer": "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api", //endpoint
        "partition": "all_insert_conditions",        //分区条件
        "project": "ProjectName",
        "table": "tableName",
        "truncate": true        //在操作前是否先进行分区删除
    }
}

3.3 Setting

"setting": {
    "errorLimit": {
        "record": ""    //允许脏数据记录数
    },
    "speed": {
        "concurrent": 2,    //最大并发数
        "throttle": true,    //是否限速
        "mbps": 1        //具体限速大小
    }
}

3.4 如何通过脚本实现每日自动化调度

1、数据脚本demo

将原开发脚本中all_query_conditions作为查询条件作为每日替换目标,将all_insert_conditions作为每日写入分区的条件作为每日替换,可查考上述给出的一些模版

2、编写具体数据传输条件的数据集成调度脚本(以myql2odps为例)

脚本查询、写入分区条件定制

#vim imp_data.sh
#根据$1识别不同的数据集成任务名,根据$2识别分区,若有多个分区可继续使用$3、$4。使用一个调度脚本满足多个数据集成任务的数据传输条件
interface_name=$1
interface_name_pt=${interface_name}$2$3$4    #为不同的数据集成脚本任务编写demo模版

#拷贝脚本demo为具体的具体的执行脚本
cp /home/admin/scripts/datax-${interface_name}-demo.json /home/admin/config/datax-${interface_name_pt}-dev.json

#查询条件、写入表分区条件判断
in_conditions=''
if [ ${interface_name} == 'mysqlqury2odps1' ]; then      //根据数据集成任务名$1判断reader条件
    check_time=$2' '$3$4
    check_unixtime=`date -d "${check_time}" +%s`
    in_conditions='pt='$2' and _timestamp='${check_unixtime}
elif [ ${interface_name} == 'mysqlqury2odps2' ]; then
    in_conditions='pt='$2' and hh='$3
fi
echo $in_conditions

out_conditions='ds='$2',hh='$3',mm='$4
if [ ! -n "$4" ]; then
    out_conditions='ds='$2',hh='$3',mm=00'
fi
echo $out_conditions

#参数替换-查询条件
sed -i "s/all_query_conditions/$in_conditions/g" /home/admin/config/datax-${interface_name_pt}-dev.json

#参数替换-分区条件
sed -i "s/all_insert_conditions/$out_conditions/g" /home/admin/config/datax-${interface_name_pt}-dev.json

###命令执行
/opt/datax/bin/datax.py /home/admin/config/datax-${interface_name_pt}-dev.json
#rm /home/admin/gaode_data/config/datax-${interface_name_pthh}-dev.json

3、任务执行

bash imp_data.sh ${interface_name} ${pt} ${hh} ${ss}

4、实现每日自动定时调度

1)若仅是用ECS做数据同步,配置contab进行调度

2)若该ECS作为DataWorks作为自定义资源组进行调度,那可将任务执行命令写在shell脚本中,每个$1配置为一个shell任务,并为$2$3$4配置自定义参数即可实现每日定时调度

文章参考:

https://github.com/alibaba/DataX/blob/master/introduction.md

目录
相关文章
|
7月前
|
SQL 分布式计算 Oracle
数据同步工具DataX的安装
数据同步工具DataX的安装
1312 0
|
7月前
|
存储 关系型数据库 MySQL
DataX: 阿里开源的又一款高效数据同步工具
DataX 是由阿里巴巴集团开源的一款大数据同步工具,旨在解决不同数据存储之间的数据迁移、同步和实时交换的问题。它支持多种数据源和数据存储系统,包括关系型数据库、NoSQL 数据库、Hadoop 等。 DataX 提供了丰富的数据读写插件,可以轻松地将数据从一个数据源抽取出来,并将其加载到另一个数据存储中。它还提供了灵活的配置选项和高度可扩展的架构,以适应各种复杂的数据同步需求。
|
数据采集 SQL 分布式计算
数据处理 、大数据、数据抽取 ETL 工具 DataX 、Kettle、Sqoop
数据处理 、大数据、数据抽取 ETL 工具 DataX 、Kettle、Sqoop
1446 0
|
6月前
|
SQL 存储 关系型数据库
DataX - 全量数据同步工具(2)
DataX - 全量数据同步工具
|
数据采集 SQL 分布式计算
常用的数据集成ETL工具有哪些?
六种常用的数据集成ETL工具
常用的数据集成ETL工具有哪些?
|
4月前
|
分布式计算 DataWorks 关系型数据库
MaxCompute 生态系统中的数据集成工具
【8月更文第31天】在大数据时代,数据集成对于构建高效的数据处理流水线至关重要。阿里云的 MaxCompute 是一个用于处理大规模数据集的服务平台,它提供了强大的计算能力和丰富的生态系统工具来帮助用户管理和处理数据。本文将详细介绍如何使用 DataWorks 这样的工具将 MaxCompute 整合到整个数据处理流程中,以便更有效地管理数据生命周期。
141 0
|
4月前
|
关系型数据库 MySQL 大数据
DataX:数据同步的超音速英雄!阿里开源工具带你飞越数据传输的银河系,告别等待和故障的恐惧!快来见证这一数据工程的奇迹!
【8月更文挑战第13天】DataX是由阿里巴巴开源的一款专为大规模数据同步设计的工具,在数据工程领域展现强大竞争力。它采用插件化架构,支持多种数据源间的高效迁移。相较于Apache Sqoop和Flume,DataX通过并发写入和流处理实现了高性能同步,并简化了配置流程。DataX还支持故障恢复,能够在同步中断后继续执行,节省时间和资源。这些特性使其成为构建高效可靠数据同步方案的理想选择。
347 2
|
7月前
|
存储 分布式计算 NoSQL
DataX深度剖析:解读数据传输工具的设计理念与架构特点
DataX深度剖析:解读数据传输工具的设计理念与架构特点
459 5
DataX深度剖析:解读数据传输工具的设计理念与架构特点
|
6月前
|
SQL 关系型数据库 MySQL
DataX - 全量数据同步工具(1)
DataX - 全量数据同步工具
|
Java DataX Docker
arm 64 环境利用 docker 编译 datax 工具
arm 64 环境利用 docker 编译 datax 工具
279 0

热门文章

最新文章