Elastic实战:canal同步mysql到es之父子表数据同步|对象型数组同步|nested数组同步

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 最近在做mysql到es的数据同步,涉及到父子表数据同步,特此记录,以供后续参考

0. 引言

最近在做mysql到es的数据同步,涉及到父子表数据同步,特此记录,以供后续参考

关于mysql同步到es的操作明细可参考我之前的博客:
Elastic实战:通过canal1.1.5实现mysql8.0数据增量/全量同步到elasticsearch7.x

1.环境

canal 1.1.5
elasticsearch7.13
mysql 8.0

2. 基础类型数组同步

相关配置实际上在官方文档中都有示例,以下也是基于这些示例来实现的

这种方式针对的是数组中的数据为基础类型,比如List,List等

2.1 sql配置说明

sql支持多表关联自由组合, 但是有一定的限制:

1、主表不能为子查询语句

2、只能使用left outer join即最左表一定要是主表

3、关联从表如果是子查询不能有多张表

4、主sql中不能有where查询条件(从表子查询中可以有where条件但是不推荐, 可能会造成数据同步的不一致, 比如修改了where条件中的字段内容)

5、关联条件只允许主外键的'='操作不能出现其他常量判断比如: on a.role_id=b.id and b.statues=1

6、关联条件必须要有一个字段出现在主查询语句中比如: on a.role_id=b.id 其中的 a.role_id 或者 b.id 必须出现在主select语句中

7、Elastic Search的mapping 属性与sql的查询值将一一对应(不支持 select *), 比如: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将 映射到mapping的_email field, 这里以别名(如果有别名)作为最终的映射字段. 这里的_id可以填写到配置文件的 _id: _id映射

2.2 配置步骤

es mappings(已剔除部分字段)

{
  "service_comment_driver" : {
    "mappings" : {
      "properties" : {
        "id" : {
          "type" : "keyword"
        },
        "avg" : {
          "type" : "double"
        },
        "comment" : {
          "type" : "text"
        },
        "createTime" : {
          "type" : "date"
        },
        "labels" : {
          "type" : "text",
          "analyzer" : "ik_smart"
        }
      }
    }
  }
}

1、sql
将子表数据通过left join关联,并且将要查询的字段通过group_concat函数拼接起来,group_concat函数的作用是将group by产生的同一个分组中的值连接起来,返回一个字符串结果,并且不同行之间用separator指定的符号隔离

select
          t.id as _id,
          t.avg as avg, 
          t.create_time as createTime,
          t.comment as comment,
          l.labels
 from
          t_service_comment_driver t
 left join 
           (select bussiness_id,group_concat(label order by id desc separator ';') as labels from t_service_comment_label 
           where type=0 group by bussiness_id) l
 on 
         t.id = l.bussiness_id

2、adapter配置文件中添加配置

 objFields:
    labels: array:;           # 数组属性, array:; 代表字段以;分隔的

整体的canal-adapter/conf/es7中的配置文件:comment.yml

dataSourceKey: duola_bussness # 这里的key与上述application.yml中配置的数据源保持一致
outerAdapterKey: esKey # 与上述application.yml中配置的outerAdapters.key一直
destination: example # 默认为example,与application.yml中配置的instance保持一致
groupId:
esMapping:
  _index: service_comment_driver
  _type: _doc
  _id: _id
  sql: "select
          t.id as _id,
          t.avg as avg, 
          t.create_time as createTime,
          t.comment as comment,
          l.labels
        from
          t_service_comment_driver t
        left join 
           (select bussiness_id,group_concat(label order by id desc separator ';') as labels from t_service_comment_label 
           where type=0 group by bussiness_id) l
        on t.id = l.bussiness_id"
  objFields:
    labels: array:;           # 数组或者对象属性, array:; 代表以;字段里面是以;分隔的
  #etlCondition: "where t.create_time>='{0}'"
  commitBatch: 3000

3、启动adapter

./bin/startup.sh

4、修改对应的数据库表中的数据,然后查看日志

cat logs/adapter/adapter.log

发现已经有更新数据了
在这里插入图片描述
5、查看es中的数据

GET service_comment_driver/_search 

发现labels中的数据已经同步更新了,并且是数组形式,修改子表数据后也会同步更新
在这里插入图片描述

2.3 常见报错

1. Unknown column '_v._id' in 'where clause'

将配置文件中的_id映射调整为_id即可,注意sql中的别名一样要为_id。

_id: _id

sql

select t.id as _id ...

3. 对象型数组同步

3.1 思路

这种方式针对的是数组中是自定义对象的数据,比如List<Object>
对比到es中的结构就是 List<Nested>

针对这一类型的同步,官方没有明确的示例说明能够支持,但是观察官方文档会发现官方提供了一个对象型字段的同步

objFields:
  <field>: object

虽然官方的描述这一类型更针对的是一对一的json型字符串,但是不妨尝试一下,看看是否能够支持json型数组

canal中object是识别的json型字符串,所以我们的思路就是将子表数据转换为json字符串,然后通过object

3.2 配置步骤

1、es mapping

{
  "service_comment_owner" : {
    "mappings" : {
      "properties" : {
        "avg" : {
          "type" : "double"
        },
        "comment" : {
          "type" : "text"
        }, 
        "createTime" : {
          "type" : "date"
        }, 
        "id" : {
          "type" : "keyword"
        }, 
        "labels" : {
          "type" : "nested",
          "properties" : {
            "id" : {
              "type" : "long"
            },
            "label" : {
              "type" : "text",
              "analyzer" : "ik_smart"
            },
            "type" : {
              "type" : "integer"
            }
          }
        }
      }
    }
  }
}

2、sql

select
    t.id as _id, 
    t.avg as avg, 
    t.create_time as createTime, 
    t.comment as comment,
    CONCAT('[',l.labels,']') as labels
from
    t_service_comment_owner t
left join 
    (select bussiness_id,group_concat(json_object('id',id,'type',type,'label',label)) as labels from t_service_comment_label where type=1 group by bussiness_id) l 
on 
    t.id=l.bussiness_id

3、adapter配置文件

 objFields:
    labels: object

4、整体配置文件

dataSourceKey: duola_bussness # 这里的key与上述application.yml中配置的数据源保持一致
outerAdapterKey: esKey # 与上述application.yml中配置的outerAdapters.key一直
destination: example # 默认为example,与application.yml中配置的instance保持一致
groupId:
esMapping:
  _index: service_comment_owner
  _type: _doc
  _id: _id
  sql: "select
    t.id as _id, 
    t.avg as avg, 
    t.create_time as createTime, 
    t.comment as comment,
    CONCAT('[',l.labels,']') as labels
from
    t_service_comment_owner t
left join 
    (select bussiness_id,group_concat(json_object('id',id,'type',type,'label',label)) as labels from t_service_comment_label where type=1 group by bussiness_id) l 
on 
    t.id=l.bussiness_id"
  #etlCondition: "where t.update_time>='{0}'"
  commitBatch: 3000
  objFields:
    labels: object           # 数组或者对象属性

5、启动adapter

./bin/startup.sh

6、修改对应的数据库表中的数据,然后查看日志,会发现日志中有数据输出

cat logs/adapter/adapter.log

7、查询索引数据,注意因为是nested结构,所以使用nested查询

GET service_comment_owner/_search
{
  "query": {
    "nested": {
      "path": "labels",
      "query": {
        "match": {
          "labels.label": "信息"
        }
      }
    }
  }
}

会发现刚刚修改的信息已经更新上去了
在这里插入图片描述

3.3 常见报错

1. RuntimeException: com.alibaba.fastjson.JSONException: not close json text, token : ,

这个错误是因为json识别缺少必要符号导致的,因为我们上述的做法是将对象型数组转换为json数组,json数组需要在有[]符号,将这两个符号添加上就可以了

CONCAT('[',l.labels,']')

4. join型数据同步

4.1 join类型应用场景

所谓join型是指es中的join数据类型,这种类型适用于以下条件的场景
1、父子表结构的数据
2、子表数据明显多于父表数据

join类型不能像关系型数据库中的表连接那样去用,无论是has_child或者has_parent查询都会对索引的查询性能有严重的负面影响,并且会触发global ordinals。所以join类型不能遇到父子表结构就使用,先考虑上述两种方式,当子表数据远超父表数据时再考虑。

4.2 配置步骤

(因暂无应用需求,以下配置说明根据官方文档给出,后续持续更新)
1、es mappings

{
  "mappings":{
    "_doc":{
      "properties":{
        "id": {
          "type": "long"
        },
        "name": {
          "type": "text"
        },
        "email": {
          "type": "text"
        },
        "order_id": {
          "type": "long"
        },
        "order_serial": {
          "type": "text"
        },
        "order_time": {
          "type": "date"
        },
        "customer_order":{
          "type":"join",
          "relations":{
            "customer":"order"
          }
        }
      }
    }
  }
}

2、adapter/es7/customer.yml

esMapping:
  _index: customer
  _type: _doc
  _id: id
  relations:
    customer_order:
      name: customer
  sql: "select t.id, t.name, t.email from customer t"

3、adapter/es7/order.yml配置文件

esMapping:
  _index: customer
  _type: _doc
  _id: _id
  relations:
    customer_order:
      name: order
      parent: customer_id
  sql: "select concat('oid_', t.id) as _id,
        t.customer_id,
        t.id as order_id,
        t.serial_code as order_serial,
        t.c_time as order_time
        from biz_order t"
  skips:
    - customer_id

4、启动服务

./bin/startup.sh
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
558 4
|
4月前
|
运维 DataWorks 数据管理
数据管理DMS使用问题之正在使用“同步表”功能,如何设置数据同步的过期时间
阿里云数据管理DMS提供了全面的数据管理、数据库运维、数据安全、数据迁移与同步等功能,助力企业高效、安全地进行数据库管理和运维工作。以下是DMS产品使用合集的详细介绍。
数据管理DMS使用问题之正在使用“同步表”功能,如何设置数据同步的过期时间
|
3月前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
704 0
|
4月前
|
监控 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在进行数据同步时,重新创建了一个新的任务,但发现无法删除旧任务同步的历史数据,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
1月前
|
监控 关系型数据库 MySQL
深入了解MySQL主从复制:构建高效稳定的数据同步架构
深入了解MySQL主从复制:构建高效稳定的数据同步架构
107 1
|
3月前
|
关系型数据库 MySQL 数据库
【MySQL】手把手教你MySQL数据同步
【MySQL】手把手教你MySQL数据同步
|
20天前
|
消息中间件 NoSQL 关系型数据库
一文彻底搞定Redis与MySQL的数据同步
【10月更文挑战第21天】本文介绍了 Redis 与 MySQL 数据同步的原因及实现方式。同步的主要目的是为了优化性能和保持数据一致性。实现方式包括基于数据库触发器、应用层双写和使用消息队列。每种方式都有其优缺点,需根据具体场景选择合适的方法。此外,文章还强调了数据同步时需要注意的数据一致性、性能优化和异常处理等问题。
205 0
|
3月前
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
311 1
|
3月前
|
SQL canal 关系型数据库
(二十四)全解MySQL之主从篇:死磕主从复制中数据同步原理与优化
兜兜转转,经过《全解MySQL专栏》前面二十多篇的内容讲解后,基本对MySQL单机模式下的各方面进阶知识做了详细阐述,同时在前面的《分库分表概念篇》、《分库分表隐患篇》两章中也首次提到了数据库的一些高可用方案,但前两章大多属于方法论,并未涵盖真正的实操过程。接下来的内容,会以目前这章作为分割点,开启MySQL高可用方案的落地实践分享的新章程!
1456 1