Elastic实战:通过pipeline实现mysql同步数据到es的数据预处理

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 首先canal是支持自定义客户端的,需要引入如下依赖,这种方式适合数据转换规则比较复杂,具有强定制性的场景,但是考虑到我这里还要做logstash的数据同步,因此需要一个比较通用的方式来实现数据转换处理,因此我用到了es的pipeline来做预处理

0. 引言

最近在将公司的一部分mysql数据同步到es中,采用了logstash-input-jdbc实现全量同步canal实现增量同步,但是还有一个问题就是es中的数据结构需要重新设计,也就导致部分mysql字段需要经过转换,然后同步到es中

首先canal是支持自定义客户端的,需要引入如下依赖,这种方式适合数据转换规则比较复杂,具有强定制性的场景,但是考虑到我这里还要做logstash的数据同步,因此需要一个比较通用的方式来实现数据转换处理,因此我用到了es的pipeline来做预处理

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

1. pipeline实现数据预处理

首先pipeline的作用就是在数据进入索引之前进行预处理,而且其也支持类java的painless语法,可以满足我们当前的业务需求。

下面我以用户表的处理来举例示范。为方便演示和脱敏,已经剔除掉部分数据

1.1 mysql中user结构

mysql8.0

id: Long
code: varchar
real_name: varchar
role_id: varchar ,多个id用逗号隔开
dept_id: varchar ,多个id用逗号隔开
post_id: varchar ,多个id用逗号隔开
create_time: datetime

1.2 es中的user结构

以下演示基于es7.13.0

PUT user
{
  "mappings": {
    "properties": {
      "code": {
        "type": "keyword"
      }, 
      "realName": {
        "type": "text",
        "analyzer": "ik_smart"
      },   
      "roleId": {
        "type": "long"
      },
      "deptId": {
        "type": "keyword"
      },
      "postId": {
        "type": "long"
      },  
      "userSource": {
        "type": "integer"
      }
    }
  }
}

1.3 目标

我们需要进行的处理包括:
1、将role_id、dept_id、post_id由字符串转换为数组
2、因为还涉及到要从另外一张微信用户表数据同步到es中,为了区分是来自微信还是pc,通过nickName字段来判定,因为nickName是微信用户表独有的字段。当它存在时说明用户来自于微信表,将userSource标注为1,否则标注为0

1.4 书写pipeline

可以看到直接通过split函数实现字符串转数组,通过自定义脚本来标注userSource的值

更多关于pipeline的使用,可以参考官方文档:ingest pipeline

关于painless语法的使用,也可参考官方文档:painless guide

如果对于pipeline或者自定义脚本的书写有疑惑的,可以留言讨论

PUT _ingest/pipeline/user_mysql_pipeline
{
  "description": "用户数据mysql导入转换为es结构",
  "processors": [
    {
      "split": {
        "field": "roleId",
        "separator": ","
      }
    },
    {
      "split": {
        "field": "deptId",
        "separator": ","
      }
    },
    {
      "split": {
        "field": "postId",
        "separator": ","
      }
    },
    {
      "script": {
        "lang": "painless", 
        "source": """ 
          if(ctx.containsKey('nickName')){
            ctx.name = ctx.nickName;
            ctx.remove('nickName');
            ctx.userSource = 1;
          }
        """
      }
    }
  ]
}

1.5 调用pipeline

1、使用pipeline需要在es中添加ignest角色,修改es配置文件

node.roles: [ignest]

2、在user的settings中指定pipeline

PUT user
{
  "mappings": {
    "properties": {
      "code": {
        "type": "keyword"
      },
      "userType": {
        "type": "long"
      },
      "account": {
        "type": "text",
        "analyzer": "ik_smart"
      },
      "realName": {
        "type": "text",
        "analyzer": "ik_smart"
      },
      "email": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "phone": {
        "type": "keyword"
      },
      "sex": {
        "type": "integer"
      },
      "roleIds": {
        "type": "long"
      },
      "deptIds": {
        "type": "keyword"
      },
      "postIds": {
        "type": "long"
      },
      "parentDeptIds": {
        "type": "keyword"
      },
      "thirdPlatformUserId": {
        "type": "keyword"
      },
      "tenantUserId": {
        "type": "long"
      },
      "userSource": {
        "type": "integer"
      },
      "tenantId": {
        "type": "keyword"
      },
      "createUser": {
        "type": "long"
      },
      "createDept": {
        "type": "keyword"
      },
      "createTime": {
        "type": "date"
      }
    }
  },
  "settings": {
    "default_pipeline": "user_mysql_pipeline",
    "number_of_replicas": 0,  // 因为我测试用的单节点,所以将副本分片设置为0
    "number_of_shards": 1
  }
}

或者还可以在插入数据的时候指定pipeline,这里因为是自动同步,所以这种方式不适用

PUT user/_doc/1?pipeline=user_mysql_pipeline
{
   ...
}

3、将上述语句在kibana或者其他es客户端中执行后,再启动canal,logstash同步数据,es就会对数据进行预处理了

4、测试,可以看到数据转换成功

GET user/_search?size=100

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
1天前
|
SQL 前端开发 关系型数据库
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
25 9
|
13天前
|
SQL Java 关系型数据库
java连接mysql查询数据(基础版,无框架)
【10月更文挑战第12天】该示例展示了如何使用Java通过JDBC连接MySQL数据库并查询数据。首先在项目中引入`mysql-connector-java`依赖,然后通过`JdbcUtil`类中的`main`方法实现数据库连接、执行SQL查询及结果处理,最后关闭相关资源。
|
9天前
|
SQL 关系型数据库 MySQL
定时任务频繁插入数据导致锁表问题 -> 查询mysql进程
定时任务频繁插入数据导致锁表问题 -> 查询mysql进程
23 1
|
10天前
|
SQL 关系型数据库 MySQL
mysql数据误删后的数据回滚
【11月更文挑战第1天】本文介绍了四种恢复误删数据的方法:1. 使用事务回滚,通过 `pymysql` 库在 Python 中实现;2. 使用备份恢复,通过 `mysqldump` 命令备份和恢复数据;3. 使用二进制日志恢复,通过 `mysqlbinlog` 工具恢复特定位置的事件;4. 使用延迟复制从副本恢复,通过停止和重启从库复制来恢复数据。每种方法都有详细的步骤和示例代码。
|
26天前
|
存储 SQL 关系型数据库
Mysql学习笔记(二):数据库命令行代码总结
这篇文章是关于MySQL数据库命令行操作的总结,包括登录、退出、查看时间与版本、数据库和数据表的基本操作(如创建、删除、查看)、数据的增删改查等。它还涉及了如何通过SQL语句进行条件查询、模糊查询、范围查询和限制查询,以及如何进行表结构的修改。这些内容对于初学者来说非常实用,是学习MySQL数据库管理的基础。
103 6
|
23天前
|
存储 关系型数据库 MySQL
Mysql(4)—数据库索引
数据库索引是用于提高数据检索效率的数据结构,类似于书籍中的索引。它允许用户快速找到数据,而无需扫描整个表。MySQL中的索引可以显著提升查询速度,使数据库操作更加高效。索引的发展经历了从无索引、简单索引到B-树、哈希索引、位图索引、全文索引等多个阶段。
56 3
Mysql(4)—数据库索引
|
26天前
|
SQL Ubuntu 关系型数据库
Mysql学习笔记(一):数据库详细介绍以及Navicat简单使用
本文为MySQL学习笔记,介绍了数据库的基本概念,包括行、列、主键等,并解释了C/S和B/S架构以及SQL语言的分类。接着,指导如何在Windows和Ubuntu系统上安装MySQL,并提供了启动、停止和重启服务的命令。文章还涵盖了Navicat的使用,包括安装、登录和新建表格等步骤。最后,介绍了MySQL中的数据类型和字段约束,如主键、外键、非空和唯一等。
62 3
Mysql学习笔记(一):数据库详细介绍以及Navicat简单使用
|
9天前
|
关系型数据库 MySQL Linux
在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。
本文介绍了在 CentOS 7 中通过编译源码方式安装 MySQL 数据库的详细步骤,包括准备工作、下载源码、编译安装、配置 MySQL 服务、登录设置等。同时,文章还对比了编译源码安装与使用 RPM 包安装的优缺点,帮助读者根据需求选择最合适的方法。通过具体案例,展示了编译源码安装的灵活性和定制性。
45 2
|
12天前
|
存储 关系型数据库 MySQL
MySQL vs. PostgreSQL:选择适合你的开源数据库
在众多开源数据库中,MySQL和PostgreSQL无疑是最受欢迎的两个。它们都有着强大的功能、广泛的社区支持和丰富的生态系统。然而,它们在设计理念、性能特点、功能特性等方面存在着显著的差异。本文将从这三个方面对MySQL和PostgreSQL进行比较,以帮助您选择更适合您需求的开源数据库。
52 4
|
17天前
|
存储 关系型数据库 MySQL
如何在MySQL中创建数据库?
【10月更文挑战第16天】如何在MySQL中创建数据库?