聊聊 datax 的 OceanBase 数据同步插件 ||批处理参数 rewriteBatchedStatements=true&useCursorFetch=true

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 聊聊 datax 的 OceanBase 数据同步插件分析下批处理参数 rewriteBatchedStatements=true&useCursorFetch=true 对大规模数据读写的性能影响

聊聊 datax 的 OceanBase 数据同步插件 ||批处理参数 rewriteBatchedStatements=true&useCursorFetch=true

1 背景

  • 在信创的大背景下,不少公司选用了蚂蚁的分布式数据库 OceanBase,OceanBase 是一款开源分布式 HTAP(Hybrid Transactional/Analytical Processing)数据库管理系统,具有原生分布式架构,支持金融级高可用、透明水平扩展、分布式事务、多租户和语法兼容等企业级特性。OceanBase 内核通过大规模商用场景的考验,已服务众多行业客户,现面向未来持续构建内核技术竞争力。

  • 在大数据场景下,不可避免地会遇到使用数据同步工具在 hdfs 和 OceanBase 之间同步数据的需求,常见的离线数据同步工具有 sqoop/datax/spark/seatunnel等。

  • 在使用 datax 同步OceanBase数据时,我们可以使用rdbmswriter/rdbmsreader,也可以使用oceanbasev10writer/oceanbasev10reader来同步 ob数据。

近期我们在某客户现场使用datax 的 rdbmswriter/rdbmsreader 进行大批量数据同步时却遇到了 OutOfMemoryError 问题,本文针对该问题进行分析,并给出解决方案。

2 问题现象

某客户通过 datax 使用 rdbmsreader 读取 ob 数据并以 orc 格式写入到 hdfs 时,当数据量达到400w 时(act_stock_holder),jkd 使用 2g 堆空间都会导致 OutOfMemoryError,需要显示配置 8G 的堆空间才能同步成功,经验证oracle jdk arm 版 1.8.0_381 和 dragonwell jdk arm 版 1.8.0_332 都是如此。OutOfMemoryError详细的报错堆栈信息如下:

"0-0-0-reader" prio=5 tid=34 RUNNABLE
    at java.lang.OutOfMemoryError.<init>(OutOfMemoryError.java:48)
    at com.alipay.oceanbase.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2657)
       Local Variable: byte[][]#535681
    at com.alipay.oceanbase.jdbc.MysqlIO.nextRow(MysqlIO.java:2320)
    at com.alipay.oceanbase.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:4207)
       Local Variable: java.util.ArrayList#4
    at com.alipay.oceanbase.jdbc.MysqlIO.getResultSet(MysqlIO.java:572)
       Local Variable: com.alipay.oceanbase.jdbc.Field[]#1
    at com.alipay.oceanbase.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3651)
    at com.alipay.oceanbase.jdbc.MysqlIO.readAllResults(MysqlIO.java:2800)
    at com.alipay.oceanbase.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:3220)
       Local Variable: com.alipay.oceanbase.jdbc.MysqlIO#1
       Local Variable: com.alipay.oceanbase.jdbc.Buffer#1
       Local Variable: com.alipay.oceanbase.jdbc.Buffer#2
    at com.alipay.oceanbase.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2903)
       Local Variable: java.lang.String#218
    at com.alipay.oceanbase.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2855)
       Local Variable: java.lang.String#219
    at com.alipay.oceanbase.jdbc.StatementImpl.executeQuery(StatementImpl.java:1445)
       Local Variable: com.alipay.oceanbase.jdbc.StatementImpl$CancelTask#1
    at com.alibaba.datax.plugin.rdbms.util.DBUtil.query(DBUtil.java:471)
    at com.alibaba.datax.plugin.rdbms.util.DBUtil.query(DBUtil.java:443)
       Local Variable: com.alipay.oceanbase.jdbc.StatementImpl#1
    at com.alibaba.datax.plugin.rdbms.util.DBUtil.query(DBUtil.java:422)
    at com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader$Task.startRead(CommonRdbmsReader.java:194)
       Local Variable: com.alipay.oceanbase.jdbc.JDBC4Connection#1
       Local Variable: com.alibaba.datax.common.util.Configuration#1
       Local Variable: java.lang.String#220
       Local Variable: com.alibaba.datax.common.statistics.PerfRecord#1
       Local Variable: com.alibaba.datax.plugin.reader.rdbmsreader.SubCommonRdbmsReader$Task#1
       Local Variable: com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector#2
    at com.alibaba.datax.plugin.reader.rdbmsreader.RdbmsReader$Task.startRead(RdbmsReader.java:84)
       Local Variable: com.alibaba.datax.core.transport.exchanger.BufferedRecordExchanger#2
    at com.alibaba.datax.core.taskgroup.runner.ReaderRunner.run(ReaderRunner.java:57)
       Local Variable: com.alibaba.datax.core.taskgroup.runner.ReaderRunner#1
       Local Variable: com.alibaba.datax.plugin.reader.rdbmsreader.RdbmsReader$Task#1
       Local Variable: com.alibaba.datax.common.statistics.PerfRecord#2
       Local Variable: com.alibaba.datax.common.statistics.PerfRecord#4
       Local Variable: com.alibaba.datax.common.statistics.PerfRecord#3
       Local Variable: com.alibaba.datax.common.statistics.PerfRecord#5
    at java.lang.Thread.run(Thread.java:855)

3 问题原因

该问题跟 jdk 版本和 arm 架构无关,而是因为读取 Ob 数据时,默认情况下会一次性读取所有数据并加载到内存后再做后续处理,所以数据量大时会占用大量堆空间甚至oom,目前版本的oceanbasev10reader/rdbmsreader都存在该问题。

4 解决方案

  • 可以在ob的jdbcurl中显示设置参数 useCursorFetch=true (>=5.0版驱动开始支持),此时底层读取数据时会使用服务器端游标且每次从服务端批量读取 fetch_size 条数据进行处理,从而避免掉大数据量下占用堆空间大甚至 OOM 的问题(该推荐配置与UF30微服务略有不同).
  • 推荐的OB jdbc url格式如下(读写都可以使用该格式;oceanbasev10writer/oceanbasev10reader/rdbmswriter/rdbmsreader都可以使用该格式):jdbc:oceanbase://10.20.182.144:2883/sys?rewriteBatchedStatements=true&useCursorFetch=true。
  • 推荐使用 ob 的专用插件 oceanbasev10writer/oceanbasev10reader,作为 ob 专用插件,其底层自动配置了多个参数,比如oceanbasev10writer 会自动配置 rewriteBatchedStatements=ture,比如oceanbasev10reader会自动配置ResultSet.TYPE_FORWARD_ONLY,所以理论上同步性能会更好一些,特别是考虑到后续随着版本升级迭代,还会有一些功能增强问题修复之类的更新,所以推荐优先使用专用插件;

    5 技术背景

  • datax 的 oceanbasev10reader/ oceanbasev10writer 专用插件,在底层会自动设置一些相关参数如 ResultSet.TYPE_FORWARD_ONLY/rewriteBatchedStatements,且当用户没有显示配置readBatchSize会自动设置 DEFAULT_READ_BATCH_SIZE.
  • 相关源码:
    com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ReaderTask#startRead0
    com.alibaba.datax.plugin.reader.oceanbasev10reader.ext.ReaderTask#doRead
    com.alibaba.datax.plugin.writer.oceanbasev10writer.task.SingleTableWriterTask#init
    com.alibaba.datax.plugin.writer.oceanbasev10writer.task.SingleTableWriterTask#rewriteSql
    com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.OCJConnHolder#initConnection
    com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.OBDataSourceV10#buildJdbcProperty
    

  • 相关参数:rewriteBatchedStatements/useServerPrepStmts/allowMultiQueries/defaultFetchSize/useCursorFetch
  • useCursorFetch: Should the driver use cursor-based fetching to retrieve rows? If set to "true" and 'defaultFetchSize' is set to a value higher than zero or 'setFetchSize()' with a value higher than zero is called on a statement, then the cursor-based result set will be used. Please note that 'useServerPrepStmts' is automatically set to "true" in this case because cursor functionality is available only for server-side prepared statements.
  • 参考链接:
    https://www.oceanbase.com/docs/common-oceanbase-connector-j-cn-10000000001943209
    https://dev.mysql.com/doc/connector-j/8.1/en/connector-j-reference-configuration-properties.html
    https://www.oceanbase.com/docs/common-oceanbase-connector-j-cn-10000000001944617
    https://www.oceanbase.com/docs/enterprise-oceanbase-connector-j-cn-10000000001943207
    https://cloud.tencent.com/developer/article/2101083
    

    6 不同 JDK 的性能差异

  • 使用同样的上游ob数据源和下游hdfs数据源,且同一时间同一台机器分别提交两个datax,一个使用龙井一个使用oracle(oracle jdk arm 版 1.8.0_381 和 dragonwell jdk arm 版 1.8.0_332),进行对比测试可以发现,在同步大量数据且配置堆空间为8G时,oracleJDK 和龙井 jdk在平均同步速度上差很多,前者只有 330KB/S, 后者能达到2.63MB/S,即龙井jdk arm版比oralce jdk arm更有性能优势;
  • 其底层原因可能跟两者默认的GC参数不同有关(龙井arm默认是ParNew和ConcurrentMarkSweep,而oracle arm默认是PS Scavenge和PS MarkSweep);
  • 为进一步确认性能差异的原因,可以再指定GC参数对比测试下 oracle/dragonwell 的同步性能,其中 oracle jdk 可以使用如下命令指定GC参数,dragon使用默认GC参数即可(都需要配置JAVA_HOME环境变量):
    python /opt/DataX/bin/datax.py /tmp/ob.datax  --jvm='-Xms8196m -Xmx8196m -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -Doracle.jdbc.fanEnabled=false -Duser.timezone=GMT+8'
    

7. tcpdump- 抓包分析

在分析问题的过程中,我们通过在 datax 节点使用tcpdump 抓包并导入到wireshark进行分析(tcpdump -i any -nn -s 100 "port 2883 or port 8020" -w /tmp/ob.pcap),并发现了如下现象:

  • 在不进行流控限制时(即不配置datax的"speed": { "channel": 5, "byte": 1048576, "record": 10000}),datax 读ob 和 写 hdfs 时都有短时间内打满网络的现象(即TCP ZeroWindow 和 TCP Window Full);
  • 同时datax 写 hdfs 时会出现有规律地暂停10秒左右的现象,这是因为 orc 是列存储格式,datax 需要累积一批 row data 并计算转换为 orc 的 stripe后(包括stripe底层的index data/stripe footer)才能写入 hdfs。

相关文章
|
5月前
|
DataWorks API 数据库
DataWorks操作报错合集之在使用 OceanBase (OB) 作为数据源进行数据集成时遇到报错,该如何排查
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
DataWorks NoSQL fastjson
DataWorks操作报错合集之DataX进行MongoDB全量迁移的过程中,DataX的MongoDB Reader插件在初始化阶段找不到Fastjson 2.x版本的类库,该怎么办
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
DataWorks Java 调度
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
73 0
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
|
7月前
|
NoSQL MongoDB 数据安全/隐私保护
实时计算 Flink版产品使用合集之与OceanBase进行数据同步时遇到用户名和密码失败的问题,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
消息中间件 SQL 分布式计算
DataX插件开发-KafkaWriter
DataX插件开发-KafkaWriter
215 0
|
Java 关系型数据库 MySQL
DataX教程(10)- DataX插件热插拔原理
DataX教程(10)- DataX插件热插拔原理
596 0
|
关系型数据库 MySQL Java
对比下 datax 的 OceanBase/MYSQL 不同数据同步方案的效率差异 || 聊聊参数 rewriteBatchedStatements
对比下 datax 的 OceanBase/MYSQL 不同数据同步方案的效率差异 || 聊聊参数 rewriteBatchedStatements
|
7月前
|
关系型数据库 Shell OceanBase
您的ulimit参数"max user processes"的当前值为4096,而OceanBase安装OCP 4.2.1时要求该值不能小于655350
您的ulimit参数"max user processes"的当前值为4096,而OceanBase安装OCP 4.2.1时要求该值不能小于655350
373 2
|
DataWorks 关系型数据库 MySQL
DataWorks可以通过数据同步任务(DTS)实现OceanBase和其他数据库之间的实时数据同步
DataWorks可以通过数据同步任务(DTS)实现OceanBase和其他数据库之间的实时数据同步
237 2
|
7月前
|
Apache 流计算 OceanBase
手把手教你实现 OceanBase 数据到阿里云数据库 SelectDB 内核版 Apache Doris 的便捷迁移|实用指南
本文介绍了如何将数据从 OceanBase 迁移到阿里云数据库 SelectDB 内核版 Apache Doris。提供 3 种数据同步方法 1. 使用 DataX,下载 DataX 并编写配置文件,通过 OceanBaseReader 和 DorisWriter 进行数据迁移。 2. 利用 Apache Doris 的 Catalog功 能,将 OceanBase 表映射到 Doris 并插入数据。 3. 通过Flink CDC,设置 OceanBase 环境,配置 Flink 连接器,实现实时数据同步。
手把手教你实现 OceanBase 数据到阿里云数据库 SelectDB 内核版 Apache Doris 的便捷迁移|实用指南

热门文章

最新文章