0. 引言
最近在将公司的一部分mysql数据同步到es中,采用了logstash-input-jdbc实现全量同步,canal实现增量同步,但是还有一个问题就是es中的数据结构需要重新设计,也就导致部分mysql字段需要经过转换,然后同步到es中
首先canal是支持自定义客户端的,需要引入如下依赖,这种方式适合数据转换规则比较复杂,具有强定制性的场景,但是考虑到我这里还要做logstash的数据同步,因此需要一个比较通用的方式来实现数据转换处理,因此我用到了es的pipeline来做预处理
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
1. pipeline实现数据预处理
首先pipeline的作用就是在数据进入索引之前进行预处理,而且其也支持类java的painless语法,可以满足我们当前的业务需求。
下面我以用户表的处理来举例示范。为方便演示和脱敏,已经剔除掉部分数据
1.1 mysql中user结构
mysql8.0
id: Long
code: varchar
real_name: varchar
role_id: varchar ,多个id用逗号隔开
dept_id: varchar ,多个id用逗号隔开
post_id: varchar ,多个id用逗号隔开
create_time: datetime
1.2 es中的user结构
以下演示基于es7.13.0
PUT user
{
"mappings": {
"properties": {
"code": {
"type": "keyword"
},
"realName": {
"type": "text",
"analyzer": "ik_smart"
},
"roleId": {
"type": "long"
},
"deptId": {
"type": "keyword"
},
"postId": {
"type": "long"
},
"userSource": {
"type": "integer"
}
}
}
}
1.3 目标
我们需要进行的处理包括:
1、将role_id、dept_id、post_id由字符串转换为数组
2、因为还涉及到要从另外一张微信用户表数据同步到es中,为了区分是来自微信还是pc,通过nickName字段来判定,因为nickName是微信用户表独有的字段。当它存在时说明用户来自于微信表,将userSource标注为1,否则标注为0
1.4 书写pipeline
可以看到直接通过split函数实现字符串转数组,通过自定义脚本来标注userSource的值
更多关于pipeline的使用,可以参考官方文档:ingest pipeline
关于painless语法的使用,也可参考官方文档:painless guide
如果对于pipeline或者自定义脚本的书写有疑惑的,可以留言讨论
PUT _ingest/pipeline/user_mysql_pipeline
{
"description": "用户数据mysql导入转换为es结构",
"processors": [
{
"split": {
"field": "roleId",
"separator": ","
}
},
{
"split": {
"field": "deptId",
"separator": ","
}
},
{
"split": {
"field": "postId",
"separator": ","
}
},
{
"script": {
"lang": "painless",
"source": """
if(ctx.containsKey('nickName')){
ctx.name = ctx.nickName;
ctx.remove('nickName');
ctx.userSource = 1;
}
"""
}
}
]
}
1.5 调用pipeline
1、使用pipeline需要在es中添加ignest角色,修改es配置文件
node.roles: [ignest]
2、在user的settings中指定pipeline
PUT user
{
"mappings": {
"properties": {
"code": {
"type": "keyword"
},
"userType": {
"type": "long"
},
"account": {
"type": "text",
"analyzer": "ik_smart"
},
"realName": {
"type": "text",
"analyzer": "ik_smart"
},
"email": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"phone": {
"type": "keyword"
},
"sex": {
"type": "integer"
},
"roleIds": {
"type": "long"
},
"deptIds": {
"type": "keyword"
},
"postIds": {
"type": "long"
},
"parentDeptIds": {
"type": "keyword"
},
"thirdPlatformUserId": {
"type": "keyword"
},
"tenantUserId": {
"type": "long"
},
"userSource": {
"type": "integer"
},
"tenantId": {
"type": "keyword"
},
"createUser": {
"type": "long"
},
"createDept": {
"type": "keyword"
},
"createTime": {
"type": "date"
}
}
},
"settings": {
"default_pipeline": "user_mysql_pipeline",
"number_of_replicas": 0, // 因为我测试用的单节点,所以将副本分片设置为0
"number_of_shards": 1
}
}
或者还可以在插入数据的时候指定pipeline,这里因为是自动同步,所以这种方式不适用
PUT user/_doc/1?pipeline=user_mysql_pipeline
{
...
}
3、将上述语句在kibana或者其他es客户端中执行后,再启动canal,logstash同步数据,es就会对数据进行预处理了
4、测试,可以看到数据转换成功
GET user/_search?size=100