基于BottledWater-PG+nodejs实时地图应用实践

简介: 前文作者讲述了BottledWater-PG安装部署,并在pg中实现了数据改变,向kafka发送消息的案例,详细参考《BottledWater-PG:PostgreSQL集成Kafka的实时数据交换平台》。

前文作者讲述了BottledWater-PG安装部署,并在pg中实现了数据改变,向kafka发送消息的案例,详细参考《BottledWater-PG:PostgreSQL集成Kafka的实时数据交换平台》。此前作者写过一篇pg的异步消息实现的实时地图应用案例《postgres+socket.io+nodejs实时地图应用实践》,本文将改用BottledWater-PG实现一遍。

一 服务器端

var fs = require('fs');
var http = require('http');
var socket = require('socket.io');
var Kafka = require('node-rdkafka');
    
var server = http.createServer(function(req, res) {
    
    res.writeHead(200, { 'Content-type': 'text/html'});
    res.end(fs.readFileSync(__dirname + '/index.html'));
    }).listen(8081, function() {
        console.log('Listening at: http://localhost:8081');
});
//注册socket.io
var socketio=socket.listen(server);
socketio.on('connection', function (socketclient) {
    console.log('已连接socket:');
    //socketclient.broadcast.emit('GPSCoor', data.payload);//广播给别人
    //socketclient.emit('GPSCoor', data.payload);//广播给自己
    
});

var consumer = new Kafka.KafkaConsumer({
  //'debug': 'all',
  'metadata.broker.list': '192.168.43.27:9092',
  'group.id': 'node-rdkafka-consumer-flow-example',
  'enable.auto.commit': false
});

var topicName = 'gps';

//logging debug messages, if debug is enabled 
consumer.on('event.log', function(log) {
  console.log(log);
});

//打印错误
consumer.on('error', function(err) {
  console.error('Error from consumer');
  console.error(err);
});

consumer.on('ready', function(arg) {
  console.log('consumer ready.' + JSON.stringify(arg));
  consumer.subscribe([topicName]);
  //准备消费消息
  consumer.consume();
});

consumer.on('data', function(m) {
    console.log(m);
    let _data;
    if(m.value==null)//delete操作发送来的消息
    {
        _data=JSON.parse(m.key);
        _data.tg_op='delete';
    }
    else{
        _data=m.value.toString();
        _data=JSON.parse(_data);
    }   
    console.log(_data);
    socketio.emit('GPSCoor', _data);//广播给所有的客户端
});

consumer.on('disconnected', function(arg) {
  console.log('consumer disconnected. ' + JSON.stringify(arg));
});

//启动
consumer.connect();

二 客户端

<html>
<head>
    <meta charset='utf-8'>
    <title>实时地图应用</title>
    <link rel="stylesheet" href="http://openlayers.org/en/v3.18.2/css/ol.css" type="text/css">
    <script src="http://openlayers.org/en/v3.18.2/build/ol.js"></script>
    <script src="/socket.io/socket.io.js"></script>
    <script>
            var wktform=new ol.format.WKT();//wkt解析
            var gpsSource=new ol.source.Vector();
            function init(){
                var gpsLayer=new ol.layer.Vector({
                    source:gpsSource,
                    style:new ol.style.Style({
                        image: new ol.style.Icon(({
                            anchor: [0.5, 1],
                            src: 'http://openlayers.org/en/v3.18.2/examples/data/icon.png'
                        }))
                    })
                });
                var map = new ol.Map({
                    layers : [
                        new ol.layer.Tile({
                            title : '街道图',
                            visible : true,
                            source : new ol.source.XYZ({
                                url : 'http://www.google.cn/maps/vt?pb=!1m5!1m4!1i{z}!2i{x}!3i{y}!4i256!2m3!1e0!2sm!3i342009817!3m9!2szh-CN!3sCN!5e18!12m1!1e47!12m3!1e37!2m1!1ssmartmaps!4e0&token=32965'
                            })
                        }),
                        gpsLayer
                    ],
                    target : 'map',
                    controls : ol.control.defaults({
                        attributionOptions : 
                        ({
                            collapsible : false
                        })
                    }),
                    view : new ol.View({
                        center : [0, 0],
                        zoom : 2
                    })
                });
                var iosocket = io.connect();
                //接受服务端消息
                iosocket.on('GPSCoor', function(data) {
                    console.log(data);
                    var id=data.id.int;
                    var feature;
                    if(data.tg_op=='delete'){
                        feature=gpsSource.getFeatureById(id);
                        if(feature)
                            gpsSource.removeFeature(feature);//删除点
                    }
                    else{
                        var geom=data.geom.string;
                        geom=wktform.readGeometry(geom);
                        geom.transform('EPSG:4326','EPSG:3857');
                        feature=gpsSource.getFeatureById(id);
                        if(feature)
                            feature.setGeometry(geom);//修改已有点
                        else{
                            feature=new ol.Feature({
                                geometry:geom
                            });
                            feature.setId(id);
                            gpsSource.addFeature(feature);//地图新增点
                        }
                    }
                });
            }
            
           
    </script>
</head>
<body onload="init()">
    <div id="map"></div>
</body>
</html>

三 测试成果

3.1 新增

mcsas=# insert into gps(name,geom) values ('opy','Point(118 31.5)');
INSERT 0 1
mcsas=# insert into gps(name,geom) values ('ty','Point(117 30.5)');
INSERT 0 1
img_fa5e408b18ab9db86a5d879c20f7dcc9.png
新增成果.png

3.2 修改

mcsas=# update gps set geom='Point(115 40)' where name='opy';
UPDATE 1
img_ab074c2cf274e1356c1df543d1e3acd1.png
修改成果.png

3.3 删除

mcsas=# delete from gps where name='opy';
DELETE 1

img_394f23ea6c9875f096f2a9012467a6bb.png
删除结果.png

四 总结

BottledWater-PG主要作用是将pg库中的表的增删改的消息都发往了kafka,应用程序并没有直接连接数据库,而是直接去消费kafka的消息。在表发生insert,update,delete能获取消息,但是truncate table并未向kafka生成消息,不知是否是我哪里遗漏。
  作者之前曾使用pg自带的notify与listen实现异步消息发送,该方法借助了表的触发器实现。应用程序是直连数据库且数据增删改都会走触发器。
  匆忙中,作者并未对比两者之间孰优孰劣,但一个直连库,一个间接消费,在不同需求中可选择一个比较符合要求的方案而加以应用。

相关文章
|
17天前
|
JavaScript API 数据库
深入理解Node.js事件循环及其在后端开发中的应用
【9月更文挑战第3天】本文将深入浅出地介绍Node.js的事件循环机制,探讨其非阻塞I/O模型和如何在后端开发中利用这一特性来处理高并发请求。通过实际的代码示例,我们将看到如何有效地使用异步操作来优化应用性能。文章旨在为读者揭示Node.js在后端开发中的核心优势和应用场景,帮助开发者更好地理解和运用事件循环来构建高性能的后端服务。
|
21天前
|
JavaScript 前端开发 API
深入浅出:使用Node.js搭建RESTful API的实践之旅
【8月更文挑战第31天】本文将带你踏上一次Node.js的探险之旅,通过实际动手构建一个RESTful API,我们将探索Node.js的强大功能和灵活性。无论你是初学者还是有一定经验的开发者,这篇文章都将为你提供宝贵的实践经验和深刻的技术洞见。
|
26天前
|
机器学习/深度学习 人工智能 自然语言处理
深度学习中的图像识别技术深入理解Node.js事件循环及其在后端开发中的应用
【8月更文挑战第27天】本文将介绍深度学习中的图像识别技术,包括其原理、应用领域及未来发展。我们将探讨如何通过神经网络实现图像识别,并分析其在医疗、交通等领域的应用。最后,我们将展望图像识别技术的发展前景。
|
25天前
|
运维 Cloud Native JavaScript
云端新纪元:云原生技术深度解析深入理解Node.js事件循环及其在异步编程中的应用
【8月更文挑战第27天】随着云计算技术的飞速发展,云原生已成为推动现代软件开发和运维的关键力量。本文将深入探讨云原生的基本概念、核心价值及其在实际业务中的应用,帮助读者理解云原生如何重塑IT架构,提升企业的创新能力和市场竞争力。通过具体案例分析,我们将揭示云原生技术背后的哲学思想,以及它如何影响企业决策和操作模式。
|
1月前
|
存储 缓存 JavaScript
深入Node.js身份验证:策略与实践
【8月更文挑战第20天】
44 4
|
30天前
|
JavaScript 安全 前端开发
Node.js身份验证全攻略:策略与实践,打造坚不可摧的Web应用安全防线!
【8月更文挑战第22天】Node.js作为强大的服务器端JavaScript平台,对于构建高效网络应用至关重要。本文探讨其身份验证策略,涵盖从基于token至复杂的OAuth 2.0及JWT。Passport.js作为认证中间件,支持本地账号验证及第三方服务如Google、Facebook登录。同时介绍JWT轻量级验证机制,确保数据安全传输。开发者可根据应用需求选择合适方案,注重安全性以保护用户数据。
31 1
|
1月前
|
存储 缓存 JavaScript
构建高效后端服务:Node.js与Express框架的实战应用
【8月更文挑战第2天】在数字化时代的浪潮中,后端服务的构建成为了软件开发的核心。本文将深入探讨如何利用Node.js和Express框架搭建一个高效、可扩展的后端服务。我们将通过实际代码示例,展示从零开始创建一个RESTful API的全过程,包括路由设置、中间件使用以及数据库连接等关键步骤。此外,文章还将触及性能优化和安全性考量,旨在为读者提供一套完整的后端开发解决方案。让我们一同走进Node.js和Express的世界,探索它们如何助力现代Web应用的开发。
|
21天前
|
JavaScript 开发者
深入理解Node.js事件循环及其在后端开发中的应用
【8月更文挑战第31天】 本文将带你走进Node.js的事件循环机制,通过浅显易懂的语言和实例代码,揭示其背后的工作原理。我们将一起探索如何高效利用事件循环进行异步编程,提升后端应用的性能和响应速度。无论你是Node.js新手还是有一定经验的开发者,这篇文章都能给你带来新的启发和思考。
|
1月前
|
数据采集 资源调度 JavaScript
Node.js 适合做高并发、I/O密集型项目、轻量级实时应用、前端构建工具、命令行工具以及网络爬虫和数据处理等项目
【8月更文挑战第4天】Node.js 适合做高并发、I/O密集型项目、轻量级实时应用、前端构建工具、命令行工具以及网络爬虫和数据处理等项目
37 5
|
24天前
|
JavaScript 前端开发 API
深入理解Node.js事件循环及其在后端开发中的应用
【8月更文挑战第29天】本文将深入浅出地介绍Node.js事件循环机制,并结合代码示例探讨其如何影响后端开发实践。我们将从事件循环的基本概念出发,逐步解析其工作原理和性能优化策略,旨在帮助开发者更好地理解和运用Node.js进行高效的后端开发。