[玩转ES] ES批量/全量导入数据

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: [玩转ES] ES批量/全量导入数据

方法 / 步骤

一:Logstash实现

1.1 安装插件

# 从Logstash的bin目录下安装输入输出ES和MySQL插件
./logstash-plugin install logstash-output-elasticsearch
./logstash-plugin install logstash-input-jdbc

在这里插入图片描述

将mysql-connector-java-8.0.11.jar copy到logstash/bin/mysql目录下

1.2 配置

1.2.1 pipelines配置

  • logstash主目录下\config\pipelines.yml 放开下面的注释
 - pipeline.id: test
   pipeline.workers: 1
   pipeline.batch.size: 1
   config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
# - pipeline.id: another_test
   queue.type: persisted
   path.config: "/tmp/logstash/*.config"

1.2.2 JDBC 同步配置

1.2.2.1 全量更新

文件名称: init-mysql2es.conf

input {
    stdin {}
    jdbc{
      type => "classify"
      jdbc_default_timezone => "Asia/Shanghai"
      # mysql 数据库链接,school-edu为数据库名
      jdbc_connection_string => "jdbc:mysql://192.168.11.10:3306/ldd_saas?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
      # 用户名和密码
      jdbc_user => "root"
      jdbc_password => "useradmin"
      # 驱动
      jdbc_driver_library => "D:/foundation-server/dev-pkg/elk/logstash-7.0.0/bin/mysql-connector-java-8.0.19.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      # 是否开启分页
      jdbc_paging_enabled => true
      #指定每页显示50000条
      jdbc_page_size => "50000"
      #是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
      use_column_value => false
      #执行的sql 文件路径+名称
      #statement_filepath => "D:/foundation-server/dev-pkg/elk/logstash-7.0.0/bin/mysql/jdbc.sql"
      #执行的sql语句
      statement => "SELECT `member_id` AS id, `club_id`, `name`, `sex`, `mobile`, `avatar`, `birthday`, `face_img`, `credit_type`, `credit_no`, `wechat_no`, `nick_name`, `stature`, `weight` FROM member_basics"
      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
      #schedule => "* * * * *"
    }
}
output {
    elasticsearch  {
      # ES的IP地址及端口
        hosts => ["localhost:9200"]
      # 索引名称
        index => "member_basics"
        #user => "elastic"
        #password => "changeme"
        #document_type => "_doc"
      # 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号
        document_id => "%{id}"
    }
    stdout {
     # JSON格式输出
        codec => json_lines
    }
}
1.2.2.2 增量更新

文件名称: sync-mysql2es.conf

#logstash输入配置
input {
  #jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期
  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => <my username>
    jdbc_password => <my password>
    # 是否开启分页
    jdbc_paging_enabled => true
     # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
    use_column_value => true
    # 用来控制增量更新的字段,一般是自增id或者创建、更新时间,注意这里要采用sql语句中select采用的字段别名
    tracking_column => "unix_ts_in_secs"
    # tracking_column 对应字段的类型
    tracking_column_type => "numeric"
    # 设置定时任务间隔  含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务,这里设置为每5分钟同步一次
    schedule => "*/5 * * * * *"
     # 同步数据的查询sql语句
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}
#logstash输入数据的字段匹配和数据过滤
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}
#logstash输出配置
output {
  # 采用stdout可以将同步数据输出到控制台,主要是调试阶段使用
  stdout { codec =>  "rubydebug"}

  # 指定输出到ES的具体索引
  elasticsearch {
      index => "rdbms_sync_idx"
      document_id => "%{[@metadata][_id]}"
  }
}

1.2.3 运行脚本

# 全量构建 run-init2es.bat
logstash -f init-mysql2es.conf
#增量构建 文件名称: run-sync2es.bat
logstash -f sync-mysql2es.conf
  • 导入成功

在这里插入图片描述

  • kibana 查看已经导入成功

在这里插入图片描述

参考资料 & 致谢

[1] mysql (全量)数据导入到 elasticsearch
[2] 通过Logstash实现mysql数据定时增量同步到ES

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
6月前
|
缓存 监控
遇到Hologres慢查询列表的导出功能出现问题,无法下载查询结果的情况
【2月更文挑战第20天】遇到Hologres慢查询列表的导出功能出现问题,无法下载查询结果的情况
89 1
|
canal 消息中间件 关系型数据库
详解 canal 同步 MySQL 增量数据到 ES
canal 是阿里知名的开源项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。 这篇文章,我们手把手向同学们展示**使用 canal 将 MySQL 增量数据同步到 ES**
详解 canal 同步 MySQL 增量数据到 ES
|
5月前
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之数据源同步时,使用脚本模式采集mysql数据到odps中,使用querySql方式采集数据,在脚本中删除了Reader中的column,但是datax还是报错OriginalConfPretreatmentUtil - 您的配置有误。如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之离线同步任务中,把表数据同步到POLARDB,显示所有数据都是脏数据,报错信息:ERROR JobContainer - 运行scheduler 模式[local]出错.是什么原因
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
自然语言处理 索引
03_ES数据查询操作
03_ES数据查询操作
86 0
|
6月前
|
canal 关系型数据库 MySQL
四种常用的 MySQL 数据同步 ES 的方法
【2月更文挑战第16天】
2949 2
四种常用的 MySQL 数据同步 ES 的方法
|
6月前
|
canal SQL 关系型数据库
MySQL数据直接实时同步到ES
MySQL数据直接实时同步到ES
135 0
|
6月前
|
存储
ES批量写入数据
ES批量写入数据
226 1
|
6月前
|
算法 Apache 数据库
Sqoop的增量数据加载策略与示例
Sqoop的增量数据加载策略与示例
|
6月前
|
JSON 数据格式
es批量插入文件中的数据
es批量插入文件中的数据