PostgreSQL 物联网黑科技 - 阅后即焚

本文涉及的产品
云数据库 RDS SQL Server,基础系列 2核4GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
简介: 在物联网应用场景中,有大量的传感器,会产生非常大量的消息以极高的并发进入数据库。这些数据如果直接进入面向OLAP场景设计的数据仓库,数据实时入库会成为瓶颈,并且OLAP系统很难接受非常高并发的请求。面对这样的应用场景,这些既要又要还要怎么满足呢?.1. 既要实时入库,.2. 又要实时分析,.3. .

在物联网应用场景中,有大量的传感器,会产生非常大量的消息以极高的并发进入数据库。
这些数据如果直接进入面向OLAP场景设计的数据仓库,数据实时入库会成为瓶颈,并且OLAP系统很难接受非常高并发的请求。
面对这样的应用场景,这些既要又要还要怎么满足呢?
.1. 既要实时入库,
.2. 又要实时分析,
.3. 还要历史留档,应对随时变化的分析需求。

实时入库比较容易满足,我前些天写过一篇 "PostgreSQL 如何潇洒的处理每天上百TB的数据增量"
https://yq.aliyun.com/articles/8528

实时分析也比较好满足,我前些天写过一篇 "PostgreSQL "物联网"应用 - 1 实时流式数据处理案例(万亿每天)"
https://yq.aliyun.com/articles/166

历史留档,应对随时变化的分析需求。这一点的需求其实也非常简单,其实就是在满足了前面两点后,把数据LOAD到OLAP系统。
但是不要小看这个非常简单的操作,做到实时性,一致性是非常关键的。
一般的做法存在的gap问题(一致性问题)
GAP问题可解,例如通过快照或者单线程来解,太low了。
以前写过关于解GAP问题的一系列文章:
.1. http://blog.163.com/digoal@126/blog/static/163877040201331252945440/
.2. http://blog.163.com/digoal@126/blog/static/16387704020133151402415/
.3. http://blog.163.com/digoal@126/blog/static/16387704020133155179877/
.4. http://blog.163.com/digoal@126/blog/static/16387704020133156636579/
.5. http://blog.163.com/digoal@126/blog/static/16387704020133218305242/
.6. http://blog.163.com/digoal@126/blog/static/16387704020133224161563/
.7. http://blog.163.com/digoal@126/blog/static/16387704020133271134563/
.8. http://blog.163.com/digoal@126/blog/static/16387704020134311144755/
GAP问题出现的原因,用一张图来表示:
_
简单来说,就是读取数据的事务快照把一些未提交,但是序列或时间靠前的记录屏蔽了。下次再读取时就会产生GAP,实时性越高,产生GAP的概率越高。有GAP,OLTP和OLAP系统的数据就会不一致。
传统的解决这个问题的办法:
.1. 延迟同步,例如同步一个小时前的数据,来减少GAP。
.2. 串行插入,数据串行插入,不存在GAP。
.3. 在记录中添加一个XID字段,记录数据插入的事务号;读取数据时通过事务快照,记录未提交的事务XID;下次再次读取数据时,根据快照中表示未结束事务的XID,以及行上的XID找到这些GAP记录。
不用多说,前面几种方法,都有一定的弊端。
要解决实时性问题,又要高逼格。
PostgreSQL的阅后即焚完美的解决了以上问题,可以完美的实现并发性,一致性,实时性。
_
并发指并发的插入和并发的读取;
一致性指数据进去N条,出去一定是N条;
实时性,指数据可以实时|流式的取走,不需要设间隔;

阅后即焚的语法很简单,例子:

postgres=# create table tbl(id serial, crt_time timestamp, info jsonb default '
{
  "k1": "v1", 
  "k2": "v2", 
  "k3": "v3", 
  "k4": {
         "subk1": "subv1", 
         "subk2": "subv2", 
         "subk3": {
                   "ssubk1": "ssubv1"
                }
      }
}
');
postgres=# insert into tbl (crt_time) select clock_timestamp() from generate_series(1,1000);
INSERT 0 1000
postgres=# select * from tbl limit 1;
 id |          crt_time          |                                                      info                                                       
----+----------------------------+-----------------------------------------------------------------------------------------------------------------
  1 | 2016-04-13 15:02:06.603235 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
(1 row)

postgres=# select * from tbl limit 5;
 id |          crt_time          |                                                      info                                                       
----+----------------------------+-----------------------------------------------------------------------------------------------------------------
  1 | 2016-04-13 15:02:06.603235 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
  2 | 2016-04-13 15:02:06.60337  | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
  3 | 2016-04-13 15:02:06.603375 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
  4 | 2016-04-13 15:02:06.603378 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
  5 | 2016-04-13 15:02:06.603379 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
(5 rows)

阅后即焚:

postgres=# delete from tbl where id<=5 returning *;
 id |          crt_time          |                                                      info                                                       
----+----------------------------+-----------------------------------------------------------------------------------------------------------------
  1 | 2016-04-13 15:02:06.603235 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
  2 | 2016-04-13 15:02:06.60337  | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
  3 | 2016-04-13 15:02:06.603375 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
  4 | 2016-04-13 15:02:06.603378 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
  5 | 2016-04-13 15:02:06.603379 | {"k1": "v1", "k2": "v2", "k3": "v3", "k4": {"subk1": "subv1", "subk2": "subv2", "subk3": {"ssubk1": "ssubv1"}}}
(5 rows)
DELETE 5

postgres=# select count(*) from tbl where id<=5;
 count 
-------
     0
(1 row)

下面进行并发测试,
验证一致性,实时性,并发性:

postgres=# create table tbl(id serial, crt_time timestamp, info jsonb default '
{
  "k1": "v1", 
  "k2": "v2", 
  "k3": "v3", 
  "k4": {
         "subk1": "subv1", 
         "subk2": "subv2", 
         "subk3": {
                   "ssubk1": "ssubv1"
                }
      }
}
');

create index idx_tbl_1 on tbl(crt_time);

create table tbl1(like tbl including all);

create or replace function r_d(lmt int) returns setof tbl as 
$$

declare
  curs1 cursor for select * from tbl order by crt_time limit lmt for update SKIP LOCKED;
begin
  for res in curs1 loop
      delete from tbl where current of curs1;
      return next res;
  end loop;
  return;
end;

$$
 language plpgsql;

并发插入2小时:

vi ins.sql
insert into tbl (crt_time) select clock_timestamp() from generate_series(1,5000);

pgbench -M prepared -n -r -P 5 -f ./ins.sql -c 64 -j 64 -T 7200 &

并发阅后即焚2小时:

vi r_d.sql
insert into tbl1 select * from r_d(100000);

pgbench -M prepared -n -r -P 5 -f ./r_d.sql -c 64 -j 64 -T 7200 &

验证插入和阅后即焚的记录数一致。

性能指标(64张表并发测试写入和阅后即焚的性能指标):
插入: 230万行/s
阅后即焚: 384万行/s

这种技术在其他应用场景的使用:
.1. 延迟确认,在短信确认的应用中非常常见,如订阅一个运营商的业务,一般会收到二次确认的短信。
服务端会向数据库插入一条记录,然后等待用户反馈,反馈后更新之前插入的那条记录的状态。

insert into tbl values () returning id;
commit;

     then wait user's response

update tbl set ... where id=xxx;
commimt;

.2. 相关的用法(oracle也支持这种用法) :

insert into tbl values () returning *;
delete from tbl where ... returning *;
update tbl set xxx=xxx where xxx returning *;

skip locked;  -- oracle 11G以后也支持这种用法

扩展阅读:
"PostgreSQL 如何潇洒的处理每天上百TB的数据增量"
https://yq.aliyun.com/articles/8528
"PostgreSQL "物联网"应用 - 1 实时流式数据处理案例(万亿每天)"
https://yq.aliyun.com/articles/166

PostgreSQL的其他特性也非常的适合物联网:
JSON支持, GIS支持, 窗口查询, 树形查询, 轻数据分析, 范围类型, 范围索引 等等。

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
相关文章
|
关系型数据库 物联网 PostgreSQL
沉浸式学习PostgreSQL|PolarDB 11: 物联网(IoT)、监控系统、应用日志、用户行为记录等场景 - 时序数据高吞吐存取分析
物联网场景, 通常有大量的传感器(例如水质监控、气象监测、新能源汽车上的大量传感器)不断探测最新数据并上报到数据库. 监控系统, 通常也会有采集程序不断的读取被监控指标(例如CPU、网络数据包转发、磁盘的IOPS和BW占用情况、内存的使用率等等), 同时将监控数据上报到数据库. 应用日志、用户行为日志, 也就有同样的特征, 不断产生并上报到数据库. 以上数据具有时序特征, 对数据库的关键能力要求如下: 数据高速写入 高速按时间区间读取和分析, 目的是发现异常, 分析规律. 尽量节省存储空间
802 1
|
关系型数据库 数据库 索引
AnalyticDB for PostgreSQL 黑科技解析 - 列存储 Meta Scan 性能加速
本文介绍阿里云 AnalyticDB for PostgreSQL(原HybridDB for PostgreSQL) 产品,即 MPP 数据仓库服务,其列存储 meta scan机制,及其对 分析场景的性能提升。
2649 0
|
5月前
|
监控 物联网 关系型数据库
使用PostgreSQL触发器解决物联网设备状态同步问题
在物联网监控系统中,确保设备状态(如在线与离线)的实时性和准确性至关重要。当设备状态因外部因素改变时,需迅速反映到系统内部。因设备状态数据分布在不同表中,直接通过应用同步可能引入复杂性和错误。采用PostgreSQL触发器自动同步状态变化是一种高效方法。首先定义触发函数,在设备状态改变时更新管理模块表;然后创建触发器,在状态字段更新后执行此函数。此外,还需进行充分测试、监控性能并实施优化,以及在触发函数中加入错误处理和日志记录功能。这种方法不仅提高自动化程度,增强数据一致性与实时性,还需注意其对性能的影响并采取优化措施。
|
8月前
|
人工智能 监控 物联网
基于物联网、大数据、云计算、人工智能等技术的智慧工地源码(Java+Spring Cloud +UniApp +MySql)
基于物联网、大数据、云计算、人工智能等技术的智慧工地源码(Java+Spring Cloud +UniApp +MySql)
820 1
|
关系型数据库 PostgreSQL 索引
AnalyticDB for PostgreSQL 黑科技解析 - 列存储 Meta Scan 性能加速
原文作者:张晓博,2011年毕业于武汉理工大学,曾任职于人大金仓、北大方正信产研究院、百分点等公司,从事数据库内核开发工作,现在任职于阿里云数据库事业部,云化数据仓库服务 AnalyticDB for PostgreSQL 数据库内核开发技术专家。
4256 0
|
关系型数据库 物联网 数据库
德哥PG系列课程直播(第12讲):PostgreSQL 物联网最佳实践
知识点 知识点:时序数据特性,SCHEMAless设计思路,递归调用,规则,流式计算,滑窗聚合 学习资料 1、时序数据合并场景加速分析和实现 - 复合索引,窗口分组查询加速,变态递归加速背景: 在很多场景中,都会有数据合并的需求。
9306 0
|
大数据 物联网
黑科技揭秘:网红“天空物联网飞艇”服务范围为何能突破30公里
今年2018杭州云栖大会的网红飞艇,搭载LoRaWAN网关,与地面网关一起组成上天入地的天空物联网服务,最主要的核心是Link WAN物联网络管理平台,提供了网关管理与设备快数接入云端,使得飞艇挂载网关十分容易。
2489 0
|
Oracle 关系型数据库 物联网
《物联网框架ServerSuperIO教程》-19.设备驱动和OPC Client支持mysql、oracle、sqlite、sqlserver的持久化。v3.6.4版本发布
19.设备驱动和OPC Client支持mysql、oracle、sqlite、sqlserver的持久化 19.1     概述      ServerSuperIO支持设备驱动和OPC Client采集的数据信息按标签集合写入mysql、oracle、sqlserver和sqlite数据库。
1134 0

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版