Kettle实现ES到ES循环增量抽取

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: Kettle实现ES到ES循环增量抽取

主页:写程序的小王叔叔的博客欢迎来访👀

支持:点赞收藏关注



本博客内容,实践前,请先逐一浏览,然后再逐一学习

1、效果

image.png

2、实现

2.1 创建数据库

Kettle安装使用

2.2 创建作业

2.2.1 初始化变量:设置变量,通过变量实现作业的循环更新初始值

parent_job.setVariable("isContinue","1");parent_job.setVariable("lastUpdateTime","");true;

2.2.2 创建核心转换

【见2.3】

2.2.3 写日志记录

image.png

isContinue=${isContinue}-------------------lastUpdateTime=${lastUpdateTime}===============

2.2.4 设置循环

image.png

通过【2.2】中设置,可以将基本循环抽取动作的作业可以实现循环。

2.3 创建转换:关键处!!!

思路:

1.通过MySQL中kettle业务抽取的时间备用表,进行设置最后一次修改更新时间。

2.设置基本循环单次抽取的条数,和基本抽取的json格式

3.设置抽取的数据源

4.解析抽取后的es中内置的hits-source的相关结构

5.成功解析之后,将抽取到的数据进行入库,同时变量获取最新的更新时间保存到MySQL中,便于下次更新使用

2.3.1 选择数据源

image.png

selectround(unix_timestamp(timetable_dev.modify_time)*1000) asmodifyTime, '1'asisContinuefromes_kettle.timetable_devWHEREindex_name='sta_resource_operation'

2.3.2 更新常量

image.png

{"from":0,"size":10,"query":{"bool":{"filter":[{"bool":{"must":[{"range":{"last_update_time":{"from":startTime,"to":null,"include_lower":true,"include_upper":true,"boost":1}}}],"adjust_pure_negative":true,"boost":1}}],"adjust_pure_negative":true,"boost":1}},"sort":[{"last_update_time":{"order":"asc"}}]}

2.3.4 参数替换

image.png

2.3.5 设置数据源基本请求信息

image.png

2.3.6 配置解析hits结构

image.png

2.3.7 配置解析的结构

image.png

2.3.8 筛选结构

image.png

2.3.9 最后如ES库

image.png

2.3.10 根据时间设置循环

image.png

执行SQL脚本:

update es_kettle.timetable_devset modify_time = FROM_UNIXTIME('?','%Y-%m-%d %H:%i:%S')where index_name ='sta_resource_operation'

以上就是ES通过作业,转换进行抽取到新的ES结果

3、注意事项

3.1)设置对应字段

3.2)组件之间的关联性

4、最后完成效果

image.png

image.png

转载声明:本文为博主原创文章,未经博主允许不得转载

⚠️注意 ~

💯本期内容就结束了,如果内容有误,麻烦大家评论区指出!

如有疑问❓可以在评论区💬或私信💬,尽我最大能力🏃‍♀️帮大家解决👨‍🏫!

如果我的文章有帮助到您,欢迎点赞+关注✔️鼓励博主🏃,您的鼓励是我分享的动力🏃🏃🏃~

相关文章
|
6月前
|
缓存 索引
kibana上执行ES DSL语言查询数据并查看表结构与数据、删除索引、查看文件大小
kibana上执行ES DSL语言查询数据并查看表结构与数据、删除索引、查看文件大小
310 0
|
8月前
|
SQL Java 数据库
Sqoop【付诸实践 02】Sqoop1最新版 全库导入 + 数据过滤 + 字段类型支持 说明及举例代码(query参数及字段类型强制转换)
【2月更文挑战第10天】Sqoop【付诸实践 02】Sqoop1最新版 全库导入 + 数据过滤 + 字段类型支持 说明及举例代码(query参数及字段类型强制转换)
386 0
|
8月前
|
存储
ES批量写入数据
ES批量写入数据
298 1
|
SQL 存储 监控
通过Logstash实现mysql数据定时增量同步到ES
通过Logstash实现mysql数据定时增量同步到ES
1427 0
通过Logstash实现mysql数据定时增量同步到ES
|
8月前
|
算法 Apache 数据库
Sqoop的增量数据加载策略与示例
Sqoop的增量数据加载策略与示例
|
8月前
|
JSON 数据格式
es批量插入文件中的数据
es批量插入文件中的数据
|
JSON 移动开发 NoSQL
【ES系列九】——批量同步数据至ES
通过es官网提供的bulk方法进行实现
|
缓存 自然语言处理 数据挖掘
白话Elasticsearch50-深入聚合数据分析之基于doc values正排索引的聚合内部原理
白话Elasticsearch50-深入聚合数据分析之基于doc values正排索引的聚合内部原理
121 0
|
SQL Oracle 关系型数据库
基于变量方式实现kettle快速循环迁移表数据(八)
基于变量方式实现kettle快速循环迁移表数据(八)
394 0
基于变量方式实现kettle快速循环迁移表数据(八)
|
canal 数据采集 关系型数据库
Elastic实战:通过pipeline实现mysql同步数据到es的数据预处理
首先canal是支持自定义客户端的,需要引入如下依赖,这种方式适合数据转换规则比较复杂,具有强定制性的场景,但是考虑到我这里还要做logstash的数据同步,因此需要一个比较通用的方式来实现数据转换处理,因此我用到了es的pipeline来做预处理
279 0
Elastic实战:通过pipeline实现mysql同步数据到es的数据预处理