BottledWater-PG:PostgreSQL集成Kafka的实时数据交换平台

本文涉及的产品
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: 一 安装BottledWater-PG的安装前文已经表述,本文不赘述直接进入集成应用阶段。二 启动KafKa#启动zookeeper[root@bogon kafka_2.

一 安装

BottledWater-PG的安装前文已经表述,本文不赘述直接进入集成应用阶段。

二 启动KafKa

#启动zookeeper
[root@bogon kafka_2.11-0.10.2.0]# bin/zookeeper-server-start.sh config/zookeeper.properties
#启动kafka服务端
[root@bogon kafka_2.11-0.10.2.0]# bin/kafka-server-start.sh config/server.properties

三 配置PostgreSQL

3.1 配置读取权限

Bottled Water会连接到postgresql获取相关数据,连接的账户需要有replication权限,pg中数据库的变化存储在WAL中,至少需要replication权限才能读取WAL。
编辑$PGDATA目录中postgresql.conf和pg_hba.conf文件。

vi $PGDATA/postgresql.conf
#编辑内容如下:
listen_addresses = '*'
port = 5432 
wal_level = logical         
max_wal_senders = 8
wal_keep_segments = 4
max_replication_slots = 4
vi $PGDATA/pg_hba.conf
#编辑内容如下:

# IPv4 local connections:
host    all             all             0.0.0.0/0               md5
# replication privilege.
local   replication     freerep                                trust
host    replication     freerep        127.0.0.1/32            trust
host    replication     freerep        ::1/128                 trust

编辑完保存,重启数据库服务:

pg_ctl restart
psql
postgres=# CREATE ROLE freerep  WITH REPLICATION PASSWORD 'password' LOGIN;
CREATE ROLE

配置完毕!

3.2 Bottled Water使用演示

3.2.1 创建测试库表

创建一个测试数据库,建立测试表

postgres=# create database mcsas;
postgres=# \c mcsas;
mcsas=# create extension bottledwater;
mcsas=# create extension postgis;
#赋予public下的表给freerep角色,要创建如下语句,否则建立的表freerep没有读取权限
mcsas=# alter default privileges in schema public grant all on tables to freerep;
mcsas=# create table gps(gid serial primary key,name text,geom text);
mcsas=# create index gps_geom_idx on gps using gist(ST_GeomFromText(geom,4326));

在另一个终端启动bottledwater可执行程序:

source /home/postgres/.bashrc
cd /opt/bottledwater-pg-master/kafka
[root@localhost kafka]# ./bottledwater -d postgres://freerep:password@127.0.0.1/mcsas -b 192.168.43.27:9092 -f json

启动结果如下:

[root@bogon kafka]# ./bottledwater -d postgres://freerep:password@127.0.0.1/mcsas -b 192.168.43.27:9092 -f json
[INFO] Writing messages to Kafka in JSON format
[INFO] Created replication slot "bottledwater", capturing consistent snapshot "0000DA72-1".
INFO:  bottledwater_export: Table public.spatial_ref_sys is keyed by index spatial_ref_sys_pkey
INFO:  bottledwater_export: Table public.mark is keyed by index mark_pkey
[INFO] Registering metadata for table spatial_ref_sys (relid 24263)
[INFO] Opening Kafka topic "spatial_ref_sys" for table "spatial_ref_sys"
[INFO] Storing key schema for table 24263
[INFO] Storing row schema for table 24263
[INFO] Snapshot complete, streaming changes from 0/AB016F30.

代表启动成功了。

3.2.2 监听数据改变消息

插入数据

mcsas=# insert into gps(name,geom) values ('china','Point(118 32)');
INSERT 0 1
mcsas=# insert into gps(name,geom) values ('england','Point(118 12)');
INSERT 0 1

启动监听topic

bin/kafka-console-consumer.sh --bootstrap-server 192.168.43.27:9092 --topic gps --from-beginning
{"gid": {"int": 1}, "name": {"string": "china"}, "geom": {"string": "Point(118 32)"}}
{"gid": {"int": 2}, "name": {"string": "england"}, "geom": {"string": "Point(118 12)"}}

每当插入或者更新,收听的消息会源源不断的输出出来,这样,pg与kafka集成就完毕了。

相关文章
|
27天前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
1月前
|
Java API 数据中心
百炼平台Java 集成API上传文档到数据中心并添加索引
本文主要演示阿里云百炼产品,如何通过API实现数据中心文档的上传和索引的添加。
|
1月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
55 8
|
2月前
|
前端开发 JavaScript API
【Django+Vue3 线上教育平台项目实战】构建课程详情页与集成视频播放功能
随着数字化教育的兴起,构建一个高效、用户友好的线上教育平台至关重要。本文将探讨如何使用Django与Vue.js 3结合,实现一个包含课程列表和课程详情页(含视频播放功能)的线上教育平台部分。本文主要介绍了如何设计数据库模型、处理数据查询、构建动态前端界面,并集成视频播放功能,为用户带来流畅的学习体验。
【Django+Vue3 线上教育平台项目实战】构建课程详情页与集成视频播放功能
|
21天前
|
图形学 iOS开发 Android开发
从Unity开发到移动平台制胜攻略:全面解析iOS与Android应用发布流程,助你轻松掌握跨平台发布技巧,打造爆款手游不是梦——性能优化、广告集成与内购设置全包含
【8月更文挑战第31天】本书详细介绍了如何在Unity中设置项目以适应移动设备,涵盖性能优化、集成广告及内购功能等关键步骤。通过具体示例和代码片段,指导读者完成iOS和Android应用的打包与发布,确保应用顺利上线并获得成功。无论是性能调整还是平台特定的操作,本书均提供了全面的解决方案。
84 0
|
21天前
|
SQL 关系型数据库 MySQL
SQL Server、MySQL、PostgreSQL:主流数据库SQL语法异同比较——深入探讨数据类型、分页查询、表创建与数据插入、函数和索引等关键语法差异,为跨数据库开发提供实用指导
【8月更文挑战第31天】SQL Server、MySQL和PostgreSQL是当今最流行的关系型数据库管理系统,均使用SQL作为查询语言,但在语法和功能实现上存在差异。本文将比较它们在数据类型、分页查询、创建和插入数据以及函数和索引等方面的异同,帮助开发者更好地理解和使用这些数据库。尽管它们共用SQL语言,但每个系统都有独特的语法规则,了解这些差异有助于提升开发效率和项目成功率。
92 0
|
1月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
|
1月前
|
人工智能 Java API
JeecgBoot 低代码平台快速集成 Spring AI
Spring 通过 Spring AI 项目正式启用了 AI(人工智能)生成提示功能。本文将带你了解如何在 Jeecg Boot 应用中集成生成式 AI,以及 Spring AI 如何与模型互动,包含 RAG 功能。
93 3
|
23天前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。