方法 / 步骤
一:Logstash实现
1.1 安装插件
# 从Logstash的bin目录下安装输入输出ES和MySQL插件
./logstash-plugin install logstash-output-elasticsearch
./logstash-plugin install logstash-input-jdbc
将mysql-connector-java-8.0.11.jar copy到logstash/bin/mysql目录下
1.2 配置
1.2.1 pipelines配置
- logstash主目录下\config\pipelines.yml 放开下面的注释
- pipeline.id: test
pipeline.workers: 1
pipeline.batch.size: 1
config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
# - pipeline.id: another_test
queue.type: persisted
path.config: "/tmp/logstash/*.config"
1.2.2 JDBC 同步配置
1.2.2.1 全量更新
文件名称: init-mysql2es.conf
input {
stdin {}
jdbc{
type => "classify"
jdbc_default_timezone => "Asia/Shanghai"
# mysql 数据库链接,school-edu为数据库名
jdbc_connection_string => "jdbc:mysql://192.168.11.10:3306/ldd_saas?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "useradmin"
# 驱动
jdbc_driver_library => "D:/foundation-server/dev-pkg/elk/logstash-7.0.0/bin/mysql-connector-java-8.0.19.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 是否开启分页
jdbc_paging_enabled => true
#指定每页显示50000条
jdbc_page_size => "50000"
#是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
use_column_value => false
#执行的sql 文件路径+名称
#statement_filepath => "D:/foundation-server/dev-pkg/elk/logstash-7.0.0/bin/mysql/jdbc.sql"
#执行的sql语句
statement => "SELECT `member_id` AS id, `club_id`, `name`, `sex`, `mobile`, `avatar`, `birthday`, `face_img`, `credit_type`, `credit_no`, `wechat_no`, `nick_name`, `stature`, `weight` FROM member_basics"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
#schedule => "* * * * *"
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["localhost:9200"]
# 索引名称
index => "member_basics"
#user => "elastic"
#password => "changeme"
#document_type => "_doc"
# 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
1.2.2.2 增量更新
文件名称: sync-mysql2es.conf
#logstash输入配置
input {
#jdbc输入配置,用来指定mysql中需要同步的数据查询SQL及同步周期
jdbc {
jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
jdbc_user => <my username>
jdbc_password => <my password>
# 是否开启分页
jdbc_paging_enabled => true
# 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
use_column_value => true
# 用来控制增量更新的字段,一般是自增id或者创建、更新时间,注意这里要采用sql语句中select采用的字段别名
tracking_column => "unix_ts_in_secs"
# tracking_column 对应字段的类型
tracking_column_type => "numeric"
# 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务,这里设置为每5分钟同步一次
schedule => "*/5 * * * * *"
# 同步数据的查询sql语句
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
}
}
#logstash输入数据的字段匹配和数据过滤
filter {
mutate {
copy => { "id" => "[@metadata][_id]"}
remove_field => ["id", "@version", "unix_ts_in_secs"]
}
}
#logstash输出配置
output {
# 采用stdout可以将同步数据输出到控制台,主要是调试阶段使用
stdout { codec => "rubydebug"}
# 指定输出到ES的具体索引
elasticsearch {
index => "rdbms_sync_idx"
document_id => "%{[@metadata][_id]}"
}
}
1.2.3 运行脚本
# 全量构建 run-init2es.bat
logstash -f init-mysql2es.conf
#增量构建 文件名称: run-sync2es.bat
logstash -f sync-mysql2es.conf
- 导入成功
- kibana 查看已经导入成功
参考资料 & 致谢
[1] mysql (全量)数据导入到 elasticsearch
[2] 通过Logstash实现mysql数据定时增量同步到ES