elasticsearch-jdbc实现MySQL同步到ElasticSearch深入详解

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 本文是elasticsearch-jdbc实现MySQL同步到ElasticSearch深入详解。

1、如何实现mysql与elasticsearch的数据同步?

逐条转换为json显然不合适,需要借助第三方工具或者自己实现。核心功能点:同步增、删、改、查同步。

2、mysql与elasticsearch同步的方法有哪些?优缺点对比?

目前该领域比较牛的插件有:

1)elasticsearch-jdbc,严格意义上它已经不是第三方插件。已经成为独立的第三方工具。https://github.com/jprante/elasticsearch-jdbc
2)elasticsearch-river-mysql插件 https://github.com/scharron/elasticsearch-river-mysql
3)go-mysql-elasticsearch(国内作者siddontang) https://github.com/siddontang/go-mysql-elasticsearch

1-3同步工具/插件对比:

go-mysql-elasticsearch仍处理开发不稳定阶段。
为什么选择elasticsearch-jdbc而不是elasticsearch-river-mysql插件的原因?
(参考:http://stackoverflow.com/questions/23658534/using-elasticsearch-river-mysql-to-stream-data-from-mysql-database-to-elasticsea

1)通用性角度:elasticsearch-jdbc更通用,
2)版本更新角度:elasticsearch-jdbc GitHub活跃度很高,最新的版本2.3.3.02016年5月28日兼容Elasticsearch2.3.3版本。
而elasticsearch-river-mysql 2012年12月13日后便不再更新。
综上,选择elasticsearch-jdbc作为mysql同步Elasticsearch的工具理所当然。

elasticsearch-jdbc的缺点与不足(他山之石):

1)go-mysql-elasticsearch作者siddontang在博客提到的:
elasticsearch-river-jdbc的功能是很强大,但并没有很好的支持增量数据更新的问题,它需要对应的表只增不减,而这个几乎在项目中是不可能办到的。
http://www.jianshu.com/p/05cff717563c

2)博主leotse90在博文中提到elasticsearch-jdbc的缺点:那就是删除操作不能同步(物理删除)!
http://leotse90.com/2015/11/11/ElasticSearch与MySQL数据同步以及修改表结构/

我截止2016年6月16日没有测试到,不妄加评论。

image.png

3、elasticsearch-jdbc如何使用?要不要安装?

3.1 和早期版本不同点

elasticsearch-jdbcV2.3.2.0版本不需要安装。以下笔者使用的elasticsearch也是2.3.2测试。
操作系统:CentOS release 6.6 (Final)
看到这里,你可能会问早期的版本有什么不同呢?很大不同。从我搜集资料来看,不同点如下:

1)早期1.x版本,作为插件,需要安装。
2)配置也会有不同。

3.2 elasticsearch-jdbc使用(同步方法一)

前提:
1)elasticsearch 2.3.2 安装成功,测试ok。
2)mysql安装成功,能实现增、删、改、查。
可供测试的数据库为test,表为cc,具体信息如下:

mysql> select * from cc;
+----+------------+
| id | name |
+----+------------+
| 1 | laoyang |
| 2 | dluzhang |
| 3 | dlulaoyang |
+----+------------+
3 rows in set (0.00 sec)

第一步:下载工具。
地址:http://xbib.org/repository/org/xbib/elasticsearch/importer/elasticsearch-jdbc/2.3.2.0/elasticsearch-jdbc-2.3.2.0-dist.zip

第二步:导入Centos。路径自己定,笔者放到根目录下,解压。unzip elasticsearch-jdbc-2.3.2.0-dist.zip

第三步:设置环境变量。

[root@5b9dbaaa148a /]# vi /etc/profile 
export JDBC_IMPORTER_HOME=/elasticsearch-jdbc-2.3.2.0

使环境变量生效:

[root@5b9dbaaa148a /]# source /etc/profile 

第四步:配置使用。详细参考:

https://github.com/jprante/elasticsearch-jdbc

1)根目录下新建文件夹odbc_es 如下:

[root@5b9dbaaa148a /]# ll /odbc_es/ 
drwxr-xr-x 2 root root 4096 Jun 16 03:11 logs 
-rwxrwxrwx 1 root root 542 Jun 16 04:03 mysql_import_es.sh 

2)新建脚本mysql_import_es.sh,内容如下;

[root@5b9dbaaa148a odbc_es]# cat mysql_import_es.sh
’#!/bin/sh
bin=$JDBC_IMPORTER_HOME/bin
lib=$JDBC_IMPORTER_HOME/lib
echo '{
"type" : "jdbc",
"jdbc": {
"elasticsearch.autodiscover":true,
"elasticsearch.cluster":"my-application", #簇名,详见:/usr/local/elasticsearch/config/elasticsearch.yml
"url":"jdbc:mysql://10.8.5.101:3306/test", #mysql数据库地址
"user":"root", #mysql用户名
"password":"123456", #mysql密码
"sql":"select * from cc",
"elasticsearch" : {
  "host" : "10.8.5.101",
  "port" : 9300
},
"index" : "myindex", #新的index
"type" : "mytype" #新的type
}
}'| java \
  -cp "${lib}/*" \
  -Dlog4j.configurationFile=${bin}/log4j2.xml \
  org.xbib.tools.Runner \
  org.xbib.tools.JDBCImporter

3)为 mysql_import_es.sh 添加可执行权限。

[root@5b9dbaaa148a odbc_es]# chmod a+x mysql_import_es.sh 

4)执行脚本mysql_import_es.sh

[root@5b9dbaaa148a odbc_es]# ./mysql_import_es.sh

第五步:测试数据同步是否成功。
使用elasticsearch检索查询:

[root@5b9dbaaa148a odbc_es]# curl -XGET 'http://10.8.5.101:9200/myindex/mytype/_search?pretty'
{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
  "total" : 8,
  "successful" : 8,
  "failed" : 0
  },
  "hits" : {
  "total" : 3,
  "max_score" : 1.0,
  "hits" : [ {
  "_index" : "myindex",
  "_type" : "mytype",
  "_id" : "AVVXKgeEun6ksbtikOWH",
  "_score" : 1.0,
  "_source" : {
  "id" : 1,
  "name" : "laoyang"
  }
  }, {
  "_index" : "myindex",
  "_type" : "mytype",
  "_id" : "AVVXKgeEun6ksbtikOWI",
  "_score" : 1.0,
  "_source" : {
  "id" : 2,
  "name" : "dluzhang"
  }
  }, {
  "_index" : "myindex",
  "_type" : "mytype",
  "_id" : "AVVXKgeEun6ksbtikOWJ",
  "_score" : 1.0,
  "_source" : {
  "id" : 3,
  "name" : "dlulaoyang"
  }
  } ]
  }
}

出现以上包含mysql数据字段的信息则为同步成功。

4、 elasticsearch-jdbc 同步方法二

[root@5b9dbaaa148a odbc_es]# cat mysql_import_es_simple.sh
#!/bin/sh
bin=$JDBC_IMPORTER_HOME/bin
lib=$JDBC_IMPORTER_HOME/lib
  java \
  -cp "${lib}/*" \
  -Dlog4j.configurationFile=${bin}/log4j2.xml \
  org.xbib.tools.Runner \
  org.xbib.tools.JDBCImporter statefile.json

[root@5b9dbaaa148a odbc_es]# cat statefile.json
{
"type" : "jdbc",
"jdbc": {
"elasticsearch.autodiscover":true,
"elasticsearch.cluster":"my-application",
"url":"jdbc:mysql://10.8.5.101:3306/test",
"user":"root",
"password":"123456",
"sql":"select * from cc",
"elasticsearch" : {
  "host" : "10.8.5.101",
  "port" : 9300
},
"index" : "myindex_2",
"type" : "mytype_2"
}
}

脚本和json文件分开,脚本执行前先加载json文件。
执行方式:直接运行脚本 ./mysql_import_es_simple.sh 即可。

5、Mysql与elasticsearch等价查询

目标:实现从表cc中查询id=3的name信息。
1)MySQL中sql语句查询:

mysql> select * from cc where id=3;
+----+------------+
| id | name |
+----+------------+
| 3 | dlulaoyang |
+----+------------+
1 row in set (0.00 sec)

2)elasticsearch检索:

[root@5b9dbaaa148a odbc_es]# curl http://10.8.5.101:9200/myindex/mytype/_search?pretty -d '
{
"filter" : { "term" : { "id" : "3" } }
}'
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
  "total" : 8,
  "successful" : 8,
  "failed" : 0
  },
  "hits" : {
  "total" : 1,
  "max_score" : 1.0,
  "hits" : [ {
  "_index" : "myindex",
  "_type" : "mytype",
  "_id" : "AVVXKgeEun6ksbtikOWJ",
  "_score" : 1.0,
  "_source" : {
  "id" : 3,
  "name" : "dlulaoyang"
  }
  } ]
  }
}

常见错误:
错误日志位置:/odbc_es/logs
日志内容:

[root@5b9dbaaa148a logs]# tail -f jdbc.log
04:03:39,570org.xbib.elasticsearch.helper.client.BaseTransportClient after auto-discovery connected to [{5b9dbaaa148a}{aksn2ErNRlWjUECnp_8JmA}{10.8.5.101}{10.8.5.101:9300}{master=true}]

Bug1、02:46:23,894importer.jdbc error while processing request: cluster state is RED and not YELLOW, from here on, everything will fail!

原因:
you created an index with replicas but you had only one node in the cluster. One way to solve this problem is by allocating them on a second node. Another way is by turning replicas off.
你创建了带副本 replicas 的索引,但是在你的簇中只有一个节点。

解决方案:
方案一:允许分配‘它们’到第二个节点。
方案二:关闭副本replicas(非常可行)。如下:

curl -XPUT 'localhost:9200/_settings' -d '
{
  "index" : {
  "number_of_replicas" : 0
  }
}

Bug2、13:00:37,137importer.jdbc error while processing request: no cluster nodes available, check settings {autodiscover=false, client.transport.ignore_cluster_name=false, client.transport.nodes_sampler_interval=5s, client.transport.ping_timeout=5s, cluster.name=elasticsearch,
org.elasticsearch.client.transport.NoNodeAvailableException: no cluster nodes available, check

解决方案:
见上脚本中新增:

“elasticsearch.cluster”:”my-application”, #簇名,和/usr/local/elasticsearch/config/elasticsearch.yml 簇名保持一致。

参考:
http://stackoverflow.com/questions/11944915/getting-an-elasticsearch-cluster-to-green-cluster-setup-on-os-x


作者:铭毅天下
转载请标明出处,原文地址:http://blog.csdn.net/laoyang360/article/details/51694519

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
17天前
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
|
7天前
|
存储 自然语言处理 关系型数据库
ElasticSearch基础3——聚合、补全、集群。黑马旅游检索高亮+自定义分词器+自动补全+前后端消息同步
聚合、补全、RabbitMQ消息同步、集群、脑裂问题、集群分布式存储、黑马旅游实现过滤和搜索补全功能
ElasticSearch基础3——聚合、补全、集群。黑马旅游检索高亮+自定义分词器+自动补全+前后端消息同步
|
1月前
|
SQL druid Java
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(下)
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)
47 3
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(下)
|
1月前
|
SQL Java 关系型数据库
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(上)
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)
57 3
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(上)
|
23天前
|
关系型数据库 MySQL Linux
mysql 主从同步 实现增量备份
【8月更文挑战第28天】mysql 主从同步 实现增量备份
34 3
|
1月前
|
SQL 缓存 关系型数据库
MySQL主从同步如何操作?
随着业务增长,单台MySQL服务器难以应对高并发访问和潜在的故障风险。主从同步(Master-Slave)通过读写分离提升数据库处理能力,具备多项优势:读写分离减轻主数据库压力、支持一主多从增强扩展性与高可用性、以及数据备份确保容灾恢复。MySQL利用binlog实现主从数据同步,记录所有写操作,不包含查询。binlog有三种格式:Statement(基于SQL语句)、Row(基于行更改)、Mixed(结合前两者优点)。主从复制涉及三个关键线程:主库的binlog dump thread和从库的I/O thread与SQL thread。
MySQL主从同步如何操作?
|
1月前
|
存储 关系型数据库 MySQL
MySQL主从同步如何保证数据一致性?
MySQL主从同步如何保证数据一致性?
57 0
MySQL主从同步如何保证数据一致性?
|
1月前
|
前端开发 关系型数据库 MySQL
com.mysql.jdbc.Driver 和 com.mysql.cj.jdbc.Driver 的区别
这篇文章讨论了`com.mysql.jdbc.Driver`和`com.mysql.cj.jdbc.Driver`两个MySQL驱动类的区别,指出`com.mysql.jdbc.Driver`适用于MySQL 5的`mysql-connector-java`版本,而`com.mysql.cj.jdbc.Driver`适用于MySQL 6及以上版本的`mysql-connector-java`。文章还提到了在实际使用中如何根据MySQL版本选择合适的驱动类。
com.mysql.jdbc.Driver 和 com.mysql.cj.jdbc.Driver 的区别
|
22天前
|
SQL 存储 关系型数据库
实时计算 Flink版产品使用问题之同步MySQL多张表的过程中,内存释放依赖于什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
22天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用问题之使用CTAS同步MySQL到Hologres时出现的时区差异,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。