PostgreSQL内核扩展之 - ElasticSearch同步插件

本文涉及的产品
云原生多模数据库 Lindorm,多引擎 多规格 0-4节点
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
云数据库 Tair(兼容Redis),内存型 2GB
简介: 背景介绍 Elasticsearch 是开源搜索平台的新成员,实时数据分析的神器,发展迅猛,基于 Lucene、RESTful、分布式、面向云计算设计、实时搜索、全文搜索、稳定、高可靠、可扩展、安装+使用方便。 PostgreSQL 是起源自伯克利大学的开源数据库,历史悠久,内核扩展性极强,用户

背景介绍

Elasticsearch 是开源搜索平台的新成员,实时数据分析的神器,发展迅猛,基于 Lucene、RESTful、分布式、面向云计算设计、实时搜索、全文搜索、稳定、高可靠、可扩展、安装+使用方便。

PostgreSQL 是起源自伯克利大学的开源数据库,历史悠久,内核扩展性极强,用户横跨各个行业。
关于PostgreSQL的内核扩展指南请参考
https://yq.aliyun.com/articles/55981

传统数据库与搜索引擎ES如何同步

例如用户需要将数据库中某些数据同步到ES建立索引,传统的方法需要应用来负责数据的同步。
这种方法会增加一定的开发成本,时效也不是非常的实时。
2
1

PostgreSQL与ES结合有什么好处

PostgreSQL的扩展插件pg-es-fdw,使用PostgreSQL的foreign data wrap,允许直接在数据库中读写ES,方便用户实时的在ES建立索引。
这种方法不需要额外的程序支持,时效也能得到保障。
3

case

安装PostgreSQL 9.5

略,需要包含 --with-python

安装 ES on CentOS 7

# yum install -y java-1.7.0-openjdk

# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

# vi /etc/yum.repos.d/es.repo
[elasticsearch-2.x]
name=Elasticsearch repository for 2.x packages
baseurl=https://packages.elastic.co/elasticsearch/2.x/centos
gpgcheck=1
gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1

# yum install -y elasticsearch

# /bin/systemctl daemon-reload
# /bin/systemctl enable elasticsearch.service
# /bin/systemctl start elasticsearch.service

# python --version
Python 2.7.5

# curl -X GET 'http://localhost:9200'
{
  "name" : "Red Wolf",
  "cluster_name" : "elasticsearch",
  "version" : {
    "number" : "2.3.3",
    "build_hash" : "218bdf10790eef486ff2c41a3df5cfa32dadcfde",
    "build_timestamp" : "2016-05-17T15:40:04Z",
    "build_snapshot" : false,
    "lucene_version" : "5.5.0"
  },
  "tagline" : "You Know, for Search"
}

python client

# easy_install pip
# pip install elasticsearch

PostgreSQL 插件 multicorn

# wget http://api.pgxn.org/dist/multicorn/1.3.2/multicorn-1.3.2.zip
# unzip multicorn-1.3.2.zip
# cd multicorn-1.3.2
# export PATH=/home/digoal/pgsql9.5/bin:$PATH
# make && make install
# su - digoal
$ psql
postgres=# create extension multicorn ;
CREATE EXTENSION

PostgreSQL 插件 pg-es-fdw (foreign server基于multicorn)

# git clone https://github.com/Mikulas/pg-es-fdw /tmp/pg-es-fdw
# cd /tmp/pg-es-fdw
# export PATH=/home/digoal/pgsql9.5/bin:$PATH
# python setup.py install
# su - digoal
$ psql

使用例子

基于multicorn创建es foreign server

CREATE SERVER multicorn_es FOREIGN DATA WRAPPER multicorn
OPTIONS (
  wrapper 'dite.ElasticsearchFDW'
);

创建测试表

CREATE TABLE articles (
    id serial PRIMARY KEY,
    title text NOT NULL,
    content text NOT NULL,
    created_at timestamp
);

创建外部表

CREATE FOREIGN TABLE articles_es (
    id bigint,
    title text,
    content text
) SERVER multicorn_es OPTIONS (host '127.0.0.1', port '9200', node 'test', index 'articles');

创建触发器

对实体表,创建触发器函数,在用户对实体表插入,删除,更新时,通过触发器函数自动将数据同步到对应ES的外部表。
同步过程调用FDW的接口,对ES进行索引的建立,更新,删除。

CREATE OR REPLACE FUNCTION index_article() RETURNS trigger AS $def$
    BEGIN
        INSERT INTO articles_es (id, title, content) VALUES
            (NEW.id, NEW.title, NEW.content);
        RETURN NEW;
    END;
$def$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION reindex_article() RETURNS trigger AS $def$
    BEGIN
        UPDATE articles_es SET
            title = NEW.title,
            content = NEW.content
        WHERE id = NEW.id;
        RETURN NEW;
    END;
$def$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION delete_article() RETURNS trigger AS $def$
    BEGIN
        DELETE FROM articles_es a WHERE a.id = OLD.id;
        RETURN OLD;
    END;
$def$ LANGUAGE plpgsql;

CREATE TRIGGER es_insert_article
    AFTER INSERT ON articles
    FOR EACH ROW EXECUTE PROCEDURE index_article();

CREATE TRIGGER es_update_article
    AFTER UPDATE OF title, content ON articles
    FOR EACH ROW
    WHEN (OLD.* IS DISTINCT FROM NEW.*)
    EXECUTE PROCEDURE reindex_article();

CREATE TRIGGER es_delete_article
    BEFORE DELETE ON articles
    FOR EACH ROW EXECUTE PROCEDURE delete_article();

测试

curl 'localhost:9200/test/articles/_search?q=*:*&pretty'
psql -c 'SELECT * FROM articles'

写入实体表,自动同步到ES
psql -c "INSERT INTO articles (title, content, created_at) VALUES ('foo', 'spike', Now());"
psql -c 'SELECT * FROM articles'

查询ES,检查数据是否已同步
curl 'localhost:9200/test/articles/_search?q=*:*&pretty'

更新实体表,数据自动同步到ES
psql -c "UPDATE articles SET content='yeay it updates\!' WHERE title='foo'"

查询ES数据是否更新
curl 'localhost:9200/test/articles/_search?q=*:*&pretty'

参考

https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-repositories.html
http://www.vpsee.com/2014/05/install-and-play-with-elasticsearch/
https://github.com/Mikulas/pg-es-fdw
https://wiki.postgresql.org/wiki/Fdw
http://multicorn.org/
http://pgxn.org/dist/multicorn/
http://multicorn.readthedocs.io/en/latest/index.html

小结

  1. PostgreSQL提供的FDW接口,允许用户在数据库中直接操纵外部的数据源,所以支持ES只是一个例子,还可以支持更多的数据源。
    这是已经支持的,几乎涵盖了所有的数据源。

https://wiki.postgresql.org/wiki/Fdw

  1. multicorn在FDW接口的上层再抽象了一层,支持使用python写FDW接口,方便快速试错,如果对性能要求不是那么高,直接用multicore就可以了。
  2. 开发人员如何编写FDW? 可以参考一下如下:
    http://multicorn.readthedocs.io/en/latest/index.html

https://yq.aliyun.com/articles/55981
https://www.postgresql.org/docs/9.6/static/fdwhandler.html

附录

###
### Author: Mikulas Dite
### Time-stamp: <2015-06-09 21:54:14 dwa>

from multicorn import ForeignDataWrapper
from multicorn.utils import log_to_postgres as log2pg

from functools import partial

import httplib
import json
import logging

class ElasticsearchFDW (ForeignDataWrapper):

    def __init__(self, options, columns):
        super(ElasticsearchFDW, self).__init__(options, columns)

        self.host = options.get('host', 'localhost')
        self.port = int(options.get('port', '9200'))
        self.node = options.get('node', '')
        self.index = options.get('index', '')

        self.columns = columns

    def get_rel_size(self, quals, columns):
        """Helps the planner by returning costs.
        Returns a tuple of the form (nb_row, avg width)
        """

        conn = httplib.HTTPConnection(self.host, self.port)
        conn.request("GET", "/%s/%s/_count" % (self.node, self.index))
        resp = conn.getresponse()
        if not 200 == resp.status:
            return (0, 0)

        raw = resp.read()
        data = json.loads(raw)
        # log2pg('MARK RESPONSE: >>%d<<' % data['count'], logging.DEBUG)
        return (data['count'], len(columns) * 100)

    def execute(self, quals, columns):
        conn = httplib.HTTPConnection(self.host, self.port)
        conn.request("GET", "/%s/%s/_search&size=10000" % (self.node, self.index))
        resp = conn.getresponse()
        if not 200 == resp.status:
            yield {0, 0}

        raw = resp.read()
        data = json.loads(raw)
        for hit in data['hits']['hits']:
            row = {}
            for col in columns:
                if col == 'id':
                    row[col] = hit['_id']
                elif col in hit['_source']:
                    row[col] = hit['_source'][col]
            yield row

    @property
    def rowid_column(self):
        """Returns a column name which will act as a rowid column,
        for delete/update operations. This can be either an existing column
        name, or a made-up one.
        This column name should be subsequently present in every
        returned resultset.
        """
        return 'id';

    def es_index(self, id, values):
        content = json.dumps(values)

        conn = httplib.HTTPConnection(self.host, self.port)
        conn.request("PUT", "/%s/%s/%s" % (self.node, self.index, id), content)
        resp = conn.getresponse()
        if not 200 == resp.status:
            return

        raw = resp.read()
        data = json.loads(raw)

        return data

    def insert(self, new_values):
        log2pg('MARK Insert Request - new values:  %s' % new_values, logging.DEBUG)

        if not 'id' in new_values:
             log2pg('INSERT requires "id" column.  Missing in: %s' % new_values, logging.ERROR)

        id = new_values['id']
        new_values.pop('id', None)
        return self.es_index(id, new_values)

    def update(self, id, new_values):
        new_values.pop('id', None)
        return self.es_index(id, new_values)

    def delete(self, id):
        conn = httplib.HTTPConnection(self.host, self.port)
        conn.request("DELETE", "/%s/%s/%s" % (self.node, self.index, id))
        resp = conn.getresponse()
        if not 200 == resp.status:
            log2pg('Failed to delete: %s' % resp.read(), logging.ERROR)
            return

        raw = resp.read()
        return json.loads(raw)

## Local Variables: ***
## mode:python ***
## coding: utf-8 ***
## End: ***
相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
24天前
|
缓存 监控 安全
Elasticsearch扩展和优化
【11月更文挑战第4天】
35 6
|
3月前
|
数据可视化 Java Windows
Elasticsearch入门-环境安装ES和Kibana以及ES-Head可视化插件和浏览器插件es-client
本文介绍了如何在Windows环境下安装Elasticsearch(ES)、Elasticsearch Head可视化插件和Kibana,以及如何配置ES的跨域问题,确保Kibana能够连接到ES集群,并提供了安装过程中可能遇到的问题及其解决方案。
Elasticsearch入门-环境安装ES和Kibana以及ES-Head可视化插件和浏览器插件es-client
|
22天前
|
存储 监控 安全
扩展 Elasticsearch
扩展 Elasticsearch
29 2
|
2月前
|
存储 JSON Java
elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。
这篇文章是关于Elasticsearch的学习指南,包括了解Elasticsearch、版本对应、安装运行Elasticsearch和Kibana、安装head插件和elasticsearch-ik分词器的步骤。
185 0
elasticsearch学习一:了解 ES,版本之间的对应。安装elasticsearch,kibana,head插件、elasticsearch-ik分词器。
|
3月前
|
存储 自然语言处理 关系型数据库
ElasticSearch基础3——聚合、补全、集群。黑马旅游检索高亮+自定义分词器+自动补全+前后端消息同步
聚合、补全、RabbitMQ消息同步、集群、脑裂问题、集群分布式存储、黑马旅游实现过滤和搜索补全功能
ElasticSearch基础3——聚合、补全、集群。黑马旅游检索高亮+自定义分词器+自动补全+前后端消息同步
|
4月前
|
监控 物联网 关系型数据库
使用PostgreSQL触发器解决物联网设备状态同步问题
在物联网监控系统中,确保设备状态(如在线与离线)的实时性和准确性至关重要。当设备状态因外部因素改变时,需迅速反映到系统内部。因设备状态数据分布在不同表中,直接通过应用同步可能引入复杂性和错误。采用PostgreSQL触发器自动同步状态变化是一种高效方法。首先定义触发函数,在设备状态改变时更新管理模块表;然后创建触发器,在状态字段更新后执行此函数。此外,还需进行充分测试、监控性能并实施优化,以及在触发函数中加入错误处理和日志记录功能。这种方法不仅提高自动化程度,增强数据一致性与实时性,还需注意其对性能的影响并采取优化措施。
|
6月前
|
JSON DataWorks 关系型数据库
DataWorks操作报错合集之同步Elasticsearch数据报错:Cat response did not contain a JSON Array,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
Java DataX 开发工具
datax-elasticsearch 同步踩坑记录
为了用datax同步es数据到其他地方,踩了不少坑.记录一下.
213 0
|
7月前
|
Kubernetes 关系型数据库 MySQL
实时计算 Flink版产品使用合集之在Kubernetes(k8s)中同步MySQL变更到Elasticsearch该怎么操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
SQL 监控 API
实时计算 Flink版产品使用合集之可以用来同步数据到 Elasticsearch(ES)吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用合集之可以用来同步数据到 Elasticsearch(ES)吗