怎样做多数据源的混合计算

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
简介: 早期应用通常只会连接一个数据库,计算也都由数据库完成,基本不存在多数据源混合计算的问题。而现代应用的数据源变得很丰富,同一个应用也可能访问多种数据源,各种 SQL 和 NoSQL 数据库、文本 /XLS、WebService/Restful、Kafka、Hadoop、…。多数据源上的混合计算就是个摆在桌面需要解决的问题了。直接在应用中硬编码实现是很繁琐的,Java 这些常用的应用开发语言很不擅长做这类事,和 SQL 比,简洁性差得很远。

早期应用通常只会连接一个数据库,计算也都由数据库完成,基本不存在多数据源混合计算的问题。而现代应用的数据源变得很丰富,同一个应用也可能访问多种数据源,各种 SQL 和 NoSQL 数据库、文本 /XLS、WebService/Restful、Kafka、Hadoop、…。多数据源上的混合计算就是个摆在桌面需要解决的问题了。

直接在应用中硬编码实现是很繁琐的,Java 这些常用的应用开发语言很不擅长做这类事,和 SQL 比,简洁性差得很远。
把多源数据导入一个数据库再计算也不合适,且不说导入需要时间导致丧失数据实时性,有些数据想要无损地导入关系数据库(比如 Mongodb 支持和多层文档数据)是件非常困难且成本高昂的任务。毕竟这些五花八门的数据源之所以存在都是有其道理的,不可能轻松被关系数据库取代。否则人没必要发明 Mongodb,直接用 MySQL 算了。
逻辑数仓呢?听起来就很沉重。使用前先需要定义元数据来映射这些多样性的数据源,很繁琐。而且大部分逻辑数仓还是 SQL 型的,依然很难无损地映射这些多样性的数据。
那一堆计算框架呢?特别是流计算框架。倒是能接入不少数据源,但计算本身的功能却提供得很少。要么用 SQL,就会有逻辑数仓一样的映射困难问题;要想自由接入各种数据源,就自己用 Java 写计算代码。

面对多数据源上的混合计算问题,esProc SPL 才是个好方法。
esProc SPL 是纯 Java 开发的开源计算引擎,在这里 https://github.com/SPLWare/esProc

esProc SPL 怎么解决这个问题呢?主要是两个方面。

  1. 抽象多样性数据源的访问接口,可以将五花八门的数据映射成少数几个数据对象。
  2. 基于 1 中的数据对象,自行实现足够丰富且不依赖于数据源的计算能力。
    有了这两方面能力,碰到新的数据源只要封装一下接口,不断补充就可以了。

esProc SPL 提供了两个基本的数据对象:序表和游标,分别对应内存数据表和流式数据表。
包括关系数据库在内,几乎所有的数据源都会提供返回这两种数据对象的接口:小数据一次性读出,使用内存数据表(序表);大数据要逐步返回,使用流式数据表(游标)。有了这两种数据对象,就可以覆盖几乎所有的数据源了。
这样,不需要事先定义元数据做映射,直接使用数据源本身提供的方法来访问数据,然后封装成这两种数据对象之一即可。这样可以保留数据源的特点,充分利用其存储和计算能力。当然更不需要先把数据做“某种”入库动作,实时访问就可以。这两种数据对象就是多样性数据源访问接口共有的能力,而逻辑数仓采用的映射数据表方法并没有正确抽象出多样性数据源的公共特征,适用面要窄很多。
需要特别指出的是,SPL 的序表和游标都支持多层结构化数据以及文本数据,这就能接收和处理 json 数据(或其二进制变种)。

看一些例子:
关系数据库,A2 返回序表,A3 返回游标

A

1 =connect("MyCompany")
2 =A1.query("select from employees order by hire_date asc limit 100")
3 =A1.cursor("select
from salaries")
4 >A1.close()
本地文件,A1/A3 返回序表,A2 返回游标

A

1 =T("Orders.csv")
2 =file("sales.txt").cursor@t()
3 =file("Orders.xls").xlsimport@t()
Restful,A1 返回 json 格式的文本

A

1 =httpfile("http://127.0.0.1:6868/restful/emp_orders").read()
2 =json(A1)
Elastic Search

A

1 >apikey="Authorization:ApiKey a2x6aEF……KZ29rT2hoQQ=="
2
'{

"counter" : 1,

"tags" : ["red"]

,"beginTime":"2022-01-03"

,"endTime":"2022-02-15"

}

3 =es_rest("https://localhost:9200/index1/_doc/1", "PUT",A2;"Content-Type: application/x-ndjson",apikey)
Mongodb,A2 返回序表,A3 返回游标

A

1 =mongo_open("mongodb://127.0.0.1:27017/mymongo")
2 =mongo_shell(A1,"{'find':'orders',filter:{OrderID: {$gte: 50}},batchSize:100}")
3 =mongo_shell@dc(A1,"{'find':'orders',filter:{OrderID: { $gte: 50}},batchSize:10}")
4 =mongo_close(A1)
Kafka,A2 返回含有 json 数据的序表,A3 返回游标

A

1 =kafka_open("/kafka/my.properties", "topic1")
2 =kafka_poll(A1)
3 =kafka_poll@c(A1)
4 =kafka_close(A1)
HBase,A2/A3 返回序表,A4 返回游标

A

1 =hbase_open("hdfs://192.168.0.8", "192.168.0.8")
2 =hbase_get(A1,"Orders","row1","datas:Amount":number:amt,"datas:OrderDate": date:od)
3 =hbase_scan(A1,"Orders")
4 =hbase_scan@c(A1,"Orders")
5 =hbase_close(A1)
esProc SPL 封装过的数据源已经很多,还在不断增加中:

esProc SPL 针对序表提供了完善的计算能力,包括过滤、分组、排序、连接等,丰富程度还远远超过 SQL,大部分运算只要一句就可以完成:

Filter:T.select(Amount>1000 && Amount<=3000 && like(Client,"s"))
Sort:T.sort(Client,-Amount)
Distinct:T.id(Client)
Group:T.groups(year(OrderDate);sum(Amount))
Join:join(T1:O,SellerId; T2:E,EId)
TopN:T.top(-3;Amount)
TopN in group:T.groups(Client;top(3,Amount))
游标上也有类似的计算,语法几乎完全一样,这里就不详细举例了。感兴趣的小伙伴可以参考 esProc SPL 官网上的材料。

在这些基础上,混合计算就非常容易实现了:

两个关系数据库

A

1 =oracle.query("select EId,Name from employees")
2 =mysql.query("select SellerId, sum(Amount) subtotal from Orders group by SellerId")
3 =join(A1:O,SellerId; A2:E,EId)
4 =A3.new(O.Name,E.subtotal)
关系数据库与 json

A

1 =json(file("/data/EO.json").read())
2 =A1.conj(Orders)
3 =A2.select(Amount>1000 &&Amount<=3000 && like@c(Client,"s"))
4 =db.query@x("select ID,Name,Area from Client")
5 =join(A3:o,Client;A4:c,ID)
Mongodb 和关系数据库

A

1 =mongo_open("mongodb://127.0.0.1:27017/mongo")
2 =mongo_shell(A1,"test1.find()")
3 =A2.new(Orders.OrderID,Orders.Client,Name,Gender,Dept).fetch()
4 =mongo_close(A1)
5 =db.query@x("select ID,Name,Area from Client")
6 =join(A3:o, Orders.Client;A4:c,ID)
Restful 和本地文本文件

A

1 =httpfile("http://127.0.0.1:6868/api/getData").read()
2 =json(A1)
3 =T("/data/Client.csv")
4 =join(A2:o,Client;A3:c,ClientID)
有了混合计算能力,还可以顺便解决 T+0 计算。

单体的 TP 数据库天然就支持 T+0 计算。数据积累太多时,会影响 TP 数据库的性能。这时候通常会把一部分历史数据移到专业的 AP 数据库中,也就是冷热分离,TP 数据库只保存近期产生的热数据,AP 数据库保存历史冷数据,TP 数据库压力减轻后即可流畅运行。

但这样,想做实时全量统计时就需要跨库计算了,而这一直是个麻烦事,特别是面对异构数据库(TP 库和 AP 库常常不是同一种)就更困难。有了 esProc SPL 这种混合数据源计算能力,这个问题就可以轻松解决了。

A    B

1 =[[connect@l("oracle"),"ORACLE"],[connect@l("hive"),"HIVE"]]
2 =SQL="select month(orderdate) ordermonth,sellerid,sum(amount) samount,count(amount) camount from sales group by month(orderdate),sellerid"
3 fork A1 =SQL.sqltranslate(A3(2))
4
=A3(1).query(B3)
5 =A3.conj().groups(ordermonth,sellerid;sum(samount):totalamount,sum(camount):totalcount)
TP 数据库 Oracle 和 AP 数据库 Hive 之间的混合运算,SPL 还能把 SQL 转换成不同数据库的方言语法。

那么,esProc SPL 写出来的代码如何集成到应用程序中呢?
很简单,esProc 提供了标准的 JDBC 驱动,被 Java 程序引入后,就可以使用 SPL 语句了,和调用数据库 SQL 一样。

Class.forName("com.esproc.jdbc.InternalDriver");
Connection conn =DriverManager.getConnection("jdbc:esproc:local://");
Statement statement = conn.createStatement();
ResultSet result = statement.executeQuery("=json(file(\"Orders.csv\")).select(Amount>1000 && like(Client,\"s\")
较复杂的 SPL 脚本可以存成文件,然后就像调用存储过程一样:

Class.forName("com.esproc.jdbc.InternalDriver");
Connection conn =DriverManager.getConnection("jdbc:esproc:local://");
CallableStatement statement = conn.prepareCall("call queryOrders()");
statement.execute();
这相当于提供了一个没有存储且不用 SQL 的逻辑数据库。

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
3月前
|
机器学习/深度学习 SQL 数据挖掘
ADB优化器背后的秘密:如何用成本估算和规则引擎编织高效的查询网络?
【8月更文挑战第27天】AnalyticDB (ADB) 是一款专为大规模数据集设计的高性能分析型数据库。本文深入探讨ADB的优化器如何通过成本估算、规则引擎及机器学习等策略生成高效执行计划。成本估算是选择最优路径的关键;规则引擎通过谓词下推等手段优化查询;机器学习则使优化器能基于历史数据预测执行效率。结合示例代码与执行计划分析,展现了ADB在提升查询性能方面的强大功能。未来,ADB将继续进化以满足日益增长的大数据分析需求。
37 0
|
3月前
|
存储 数据处理 Apache
超越传统数据库:揭秘Flink状态机制,让你的数据处理效率飞升!
【8月更文挑战第26天】Apache Flink 在流处理领域以其高效实时的数据处理能力脱颖而出,其核心特色之一便是状态管理机制。不同于传统数据库依靠持久化存储及 ACID 事务确保数据一致性和可靠性,Flink 利用内存中的状态管理和分布式数据流模型实现了低延迟处理。Flink 的状态分为键控状态与非键控状态,前者依据数据键值进行状态维护,适用于键值对数据处理;后者与算子实例关联,用于所有输入数据共享的状态场景。通过 checkpointing 机制,Flink 在保障状态一致性的同时,提供了更适合流处理场景的轻量级解决方案。
55 0
|
4月前
|
Java Spring
通用研发提效问题之配置的若干场景下若干方案的变化该如何解决
通用研发提效问题之配置的若干场景下若干方案的变化该如何解决
|
4月前
|
存储 JSON 数据格式
通用研发提效问题之存储模型选择存储方式,如何解决
通用研发提效问题之存储模型选择存储方式,如何解决
|
4月前
|
固态存储 测试技术 数据库
最新测试揭秘:TDengine 线性扩展能力能否满足你的性能需求?
**摘要:** TDengine 的线性扩展能力通过CPU和磁盘测试得以验证。在CPU测试中,使用TDengine V3.3.0.0和taosBenchmark,随着CPU核数从4增至12,写入性能线性提升,每个CPU增加对应约50W条/秒的提升,保持CPU在瓶颈状态。磁盘IO测试中,使用低速机械盘,增加磁盘数量导致写入性能成比例增长,充分利用新增磁盘IO。测试结果表明,无论CPU还是磁盘扩展,TDengine都能有效利用资源,展示出出色的线性扩展能力。
63 0
|
6月前
|
存储 安全 算法
十种接口安全方案!!!
日常开发中,如何保证接口数据的安全性呢?接口数据安全的保证过程,主要体现在这几个方面:一个就是数据传输过程中的安全,还有就是数据到达服务端,如何识别数据,最后一点就是数据存储的安全性。介绍下保证接口数据安全的10个方案。数据加签:用Hash算法(如MD5,或者SHA-256)把原始请求参数生成报文摘要,然后用私钥对这个摘要进行加密,就得到这个报文对应的数字签名sign(这个过程就是加签通常来说呢,请求方会把数字签名和报文原文一并发送给接收方。验签:接收方拿到原始报文和数字签名(sign)后,用。
318 1
|
存储 算法 NoSQL
想要实现在时序场景下“远超”通用数据库,需要做到哪几点?
近年来,随着物联网技术和市场的快速发展、企业业务的加速扩张,时序数据的处理难题也越来越受到行业和企业的重视,时序场景下通用型数据库步履维艰,各种时序数据库产品应运而起。但是,做一个优质的时序数据库真的很容易吗?本篇文章将从数据库开发者的角度,解剖时序场景下的数据处理需求、分析时序数据库设计思路,给到读者一些硬核技术思考。
172 0
|
存储 移动开发 缓存
离线优先:下一个渐进增强技术
原生应用速度比较快。如果你要创造下一个愤怒的小鸟,这当然很重要。然而,罕有应用需要达到游戏级别的响应度。也就是说,多一点关注度,利用HTML5技术创建一个快速反应的游戏是有可能的。但这能否在一系列的设备中良好运行就是另外一个事情了。 客户部不知道哪个更好:“这应用如此酷!我们的对头就有了——我们也需要一个”。其实这需要一点说服力就可以解决这个问题的。 手机应用能够离线运行。但是webapp也可以——就是这技术比较前沿而且我们很少用到的缘故
573 0
|
存储 数据库 OLAP
混合事务分析处理“HTAP”的技术要点分析
HTAP是近些年来比较火的一个概念,本文将聊聊HTAP的前世今生及技术特点。
【自然框架】之“元数据”的威力
定义      元数据最本质、最抽象的定义为:data about data (关于数据的数据)。它是一种广泛存在的现象,在许多领域有其具体的定义和应用。       我的理解就是对数据进行说明、描述。
851 0
下一篇
无影云桌面