用 PolarDB - X + Flink 搭建实时数据大屏|学习笔记(二)

本文涉及的产品
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: 快速学习用 PolarDB - X + Flink 搭建实时数据大屏

开发者学堂课程【用 PolarDB-X+Flink 搭建实时数据大屏用 PolarDB - X + Flink 搭建实时数据大屏】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/988/detail/14931


可以看到 mySQL 也有 accounts 这张表,来看一下里面数据的情况使用 select*from accounts order by id 命令,里面也是有100条数据,使用select sum(balance) fromaccounts命令来看一下他们拥有的总的钱数,也是10万,可能大家没有注意到这里面神奇的地方通常我们所知道的Binlog工具都不是基于show的,都是一条一条基于行景同步的,也就是说例如现在做一个转账测试,A账户减了10块,B账户加了10块,那么就会对应两个操作,一个将一条记录做减十,一个将另外一条记录做驾驶,在Binlog同步的过程当中就会将它当成两个事情可能会看到一个中间态,可能先看到减10+10还没有同步过来,如果在这个情况下做查看总体余额可能不是刚刚好是10万,在PolarDB - X里面的Binlog是可以保证事物的原子性的,mySQL在做Binlog同步时也是基于事务来同步的,这样就会把事务一个一个完整的同步过来,所以不管在同步的任何时间点去做总余额的查询,始终可以保证余额是10万的,这是Binlog的一个特点,已经做了一段时间的转账测试,下面就将转账测试点掉,点掉之后再来看PolarDB - X,首先总的余额还是10万,看一下里面数据的情况,里面数据已经不变化了。

image.png

最后几条是这样一个数据,99账号对应的余额是997,来看一下mySQL这边是怎样的情况,这边99账号对应的余额也是997,为了对比两边的数据是否完全一致,提前写好了一个命令,这个命令就是将里面所有的数据做一个md 5计算,首先比较的一点是两边数据是否完全一致,首先在PolarDB - X这边执行C口,执行C口比较简单就是将所有数据的ID账户余额做一个拼接,先排序然后在拼接再计算md 5,md5字符串然后计算出md 5的值,只有两边的数据是完全一致的理论上算出来的md 5的值应该是一致的,在PolarDB - X中算出来的md 5值是这样的。

E881开头的以7a75结尾,再来看一下mySQ里面的情况,可以看到也是E881开头的以7a75结尾,表明同步链路是正常工作的,源端这边没有变更之后稍微过一点时间目标mySQL这边就把所有的增量全部的同步了,大家来看一下

image.png

在原端还有另外一个事情,accounts这张表其实是PolarDB - X里面分布式的表,里边会有一个dbpartition by hash是一个分表,在mySQL这边就是正常的一张表

image.png

可以看到在mySQL这边也是通过准备同步,把ddl同步过来,也就意味着全局Binlog里会将分布式的一些特性给处理好,保证下游可以把它当做mySQL的语法,以上便是第一个demo,也就是用PolarDB - X为主,mySQL为备用mySQL原生的Change master指令搭建PolarDB - X到mySQL主备同步的链路

5.PolarDB - X+Flink+Vue

接下来来看第二个例子,也就是用PolarDB - X+Flink+Vue来做一个双十一的交易大屏,稍微再看一下整个数据的链路,因为涉及到主件比较多。

image.png

第一个是在业务端,会有一个模拟线上交易的一段代码来写PolarDB - X数据,PolarDB - X作为交易的主库,之后会通过Binlog增量的日志同步到flink,flink计算之后把结果取到db,db用的是PolarDB - X会把它再写到PolarDB - X里面,之后有一个专门的交易大屏,应用分前端和后端,前端是用Vue,后端使用spring boot,Spring boot中写了一个小的应用来读数据库里面知识,现在就按照数据读的顺序,将这个系统一步一步搭建起来。

首先将刚才的例子改掉,mySQL就不管了,这里会有一个transfer test库,清掉它,这样有了一个干净的环境,接下来在PolarDB - X里面创建业务的一些库表,这次模拟的是一个电商的服务器,电商系统的表结构来自于flink CDC官方的一个教程,flink CDC在getting started快速上手的教程里面,第一篇讲的就是用flink cdc搭建一个mySQL和postgres的streaming ETL,最后通过flink cdc到ECS的一个有计算的链路,此节课就借鉴了这样的数据流的过程,其中原端将mySQL+opg PolarDB - X,目标端也将它换成了PolarDB - X,中间原模原样的使用flink cdc,在办公教程里面会让我们创建若干张表,首先用户的库叫my DB,第一张是商品的表,叫product,里面会有ID,名字还有一些商品的描述,会向里面插入一些初始商品的条目,- MySOL

CREATE DATABASE mydb ;

USE mydb ;

CREATE TABLE profucts (

id INTEGER NOT NULL AUTO _ INCREMENT PRINARY KEY , name VARCHAR (255) NOT NULL ,

description VARCHAR (512) ALTER TABLE products AUTO _ INCREMENT =101;

INSERT INTO products

VALUES

( default ,“ scooter ","Snal12-wheel scooter "),

( default ," car battery ","12V car battery "),

( default ,"12-pack drill bits ","12-pack of drill bits with sizes ranging from #4E( default ," harner ","12oz carpenter ' s hamner '),

( default ," hamner ","140z carpenter ' s hamner '),

( default ," hanner ”,“16oz carpenter ' s haner "),

( default ," rocks "," box of a5sorted rocks "),

( default ," jacket ”,“ water resistent black wind breaker '),

( default ,“ spare tire ","24 inch spare tire ");

第二个是订单表orders,会有订单的ID订单时间购买者的名字价格商品的ID还有商品目前的状态订单的状态等等,

CREATE TABLE orders (

order _ id INTEGER NOT NULL AUTO _ INCRENENT PRIMARY KEY , order _ date DATETIHE NOT NULL ,

custorer _ nane VARCHAR (255) NOT NULL , price DECIMAL (10,5) NOT NULL ,

produet _ id INTEGER NOT NULL ,

order _ status B00LEAN NOT NULL 一 Whether order has been placed AUTO _ INCREMENT =10001;

INSERT INTO orders

VALUES ( default ,"2020-07-3010:08:22," Jark ",50.50,102, false ),

2020-87-3010:11:09,‘ Sally ",15.00,105, false ),

'2020-87-3012:08:30",' Edward ',25.25,186, fatse );

( default ,

( default ,

第三个表示快递的情况,主要是有三张表一张是商品,一张是订单,一张是物流的信息这三张表,今天的演示为了简便起见只关注订单这一张表

PG

CREATE TABLE shiprents (

shipment _ id stRIAL NOT NULL PRIMARY KEY , order _ id SERIAL NOT NULL ,

origin VARCHAR (255) NOT NULL ,

destination VARCHAR (255) NOT NULL , is _ arrived B00LEAN NOT NULL

ALTER SEQUENCE public . shipnents _ shipment _ id _ seq RESTART WITH 1001; ALTER TABLE public . shipments REPLICA IDENTITY FULL ;

INSERT INTO shipments

VALUES ( default ,10001,"Be11ing", Shanghai ", false ),

( default ,10002," Hangzhou ',' Shanghai ', false ),

( default ,10003," Shanghai ',' Hangzhou ', false );

接下来开始去构建这些表,首先在PolarDB - X里面创建业务的db叫做mydb,mydb创建好以后首先在PolarDB - X里面创建商品这张表,商品表创建以后,向里面插入几条初始的商品

NINSERT INTO products

VALUES

( default ," scoater "," Small 2-vheelsco0ter"),

( default ," car battery ","12V car battery "),

( default ,"12-packdril1 bits ","12-pack of drill bits with sizes ranging ( default ," hammer ","120z carpenter ' s hamer '),

( default ,“ hammer "“140z carpenter ' s hammer "'),( default ," hammer ","160z carpenter ' s hammer ''),( default ," rocks "," box of assorted rocks "),

( default ," jacket "," water resistent black w1nd breaker "),( default ," spare tire ",“24 inch spare tire ";

f rom #40 to #3"》,

看一下

image.png

这张表已经存在了,看一下这张表里面的数据

image.png

里面有九条商品的记录,第二步,创建订单表,创建以后,里面会有几条初始的数据,也需要创建一下

INSERT INTO orders

VALUES

S ( default ,‘2020-07-3010:08:22',‘ Jark ',50.50,102, false ),( default ,2020-07-3010:11:09'," Sally ',15,00,105, false ),( default ,‘2020-07-3012:00:30',‘ Edward ',25.25,106, false );

第三个是物流的表,同样需要创建。

创建好以后可以看到现在业务库里面已经有三张表,商品,订单以及物流,有了初始信息以后,,接下来开始启动flink,Flink中有几个包需要下载,一个是他本身,第二个是它里面的一些connect,连接mySQL的connect以及连接ddc的connect,这些比较大所以已经提前下好放在这里了,接下来需要启动flink,flink本身有dashboard的web界面,为了方便将dashboard端口转化到本地,稍后可以通过浏览器进行观看状态,将端口转化之后

image.png

可以看到这就是flink本身dashboard的页面,目就可以了前本身flink是没有启动的,现在将flink启动起来,启动比较简单,调用bin目录下有个start cluster

image.png

可以看到有一个任务叫stots,这个启动好以后就启动flink cdc客户端,C口client,下面进行配置,将PolarDB - X里面的三张表在flink里面分别建立一个对应关系,因为今天只关心订单这张表,所以在flink中只建一张orders,其他两张表不进行建立。

首先改一下flink参数,在flink中也创建一下订单表

image.png

大家可以看一下建表的语句,创建一张表叫做orders,里面有多少个列,分别叫什么以及列的类型,与前面的PolarDB - X是对应起来的,在这里会进行一个数据源的配置,第一个是connector,用的是mySQL cdc,这里大家要注意,这边使用的是mySQLcdc,也就是说flink cdc会直接将PolarDB - X当成mySQL来对待,第二个是一些连接的信息,例如IP是什么?用户名是什么?端口是什么?密码是什么?然后让你连接库,表,是mydb库,orders表,建好以后看一下flink是否与PolarDB - X能正常的通信,执行select*from orders语句,将PolarDB - X 里面order这张表的数据拉到flink中,看一下能不能正常的获取。

image.png

可以看到flink已经可以正常拿到PolarDB - X里面的三条orders数据,说明链路已经打通了,现在flink  cdc已经完全把PolarDB - X当成mySQL来对待,也正确的拿到了里面的数据,接下来开始处理有计算的部分,有了交易表,交易大屏里面我们最关心的有两个值,第一个值叫做当前总的交易额是多少,第二个值是当前总订单的数量有多少,可以简单想象一下这两个数据是容易从orders里面获取的,第一个是创的新的orders里面有多少记录,就是订单数量,第二个可以sum一下price,把所有订单里面的price加起来,就是总的交易额,所以这是一个非常简单的场景,在真实的阿里巴巴双十一交易大屏里面会有非常多的丰富的数据进行展示,后台获取数据的源比较多,计算也非常的复杂,这里仅仅是为了演示链路,所以把这个场景做的简单一些,仅从orders这张表去获取最初的原始的数据,并且进行简单的调用计算,一个是count*,一个是sum,最后将结果存到PolarDB - X里面,现在在回到PolarDB - X里面,将储存结果的db和表先建立,Flink计算结果的库叫做gmv,建好之后存储两个结果,一个是交易总额以及订单总数量,这张表也叫做gmv,首先在PolarDB - X里面建这张表。

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍如何基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
5月前
|
Cloud Native 关系型数据库 MySQL
免费体验!高效实现自建 MySQL 数据库平滑迁移至 PolarDB-X
PolarDB-X 是阿里云推出的云原生分布式数据库,支持PB级存储扩展、高并发访问与数据强一致,助力企业实现MySQL平滑迁移。现已开放免费体验,点击即享高效、稳定的数据库升级方案。
免费体验!高效实现自建 MySQL 数据库平滑迁移至 PolarDB-X
|
4月前
|
存储 分布式计算 运维
云栖实录|驰骋在数据洪流上:Flink+Hologres驱动零跑科技实时计算的应用与实践
零跑科技基于Flink构建一体化实时计算平台,应对智能网联汽车海量数据挑战。从车机信号实时分析到故障诊断,实现分钟级向秒级跃迁,提升性能3-5倍,降低存储成本。通过Flink+Hologres+MaxCompute技术栈,打造高效、稳定、可扩展的实时数仓,支撑100万台量产车背后的数据驱动决策,并迈向流批一体与AI融合的未来架构。
366 3
云栖实录|驰骋在数据洪流上:Flink+Hologres驱动零跑科技实时计算的应用与实践
|
7月前
|
SQL DataWorks 关系型数据库
DataWorks+Hologres:打造企业级实时数仓与高效OLAP分析平台
本方案基于阿里云DataWorks与实时数仓Hologres,实现数据库RDS数据实时同步至Hologres,并通过Hologres高性能OLAP分析能力,完成一站式实时数据分析。DataWorks提供全链路数据集成与治理,Hologres支持实时写入与极速查询,二者深度融合构建离在线一体化数仓,助力企业加速数字化升级。
|
人工智能 大数据 Apache
Flink Forward Asia 2024 即将盛大开幕!
Flink Forward Asia 2024是由Apache官方授权的技术大会,聚焦流式湖仓、流批一体、AI大模型等热点方向,旨在分享Flink社区最新动态及实践经验,是Flink开发者和使用者不容错过的盛会。大会不仅探讨了Flink在实时大数据分析中的应用,还深入讨论了Data+AI领域的新成果,如基于Flink和Elasticsearch的企业级高级RAG架构设计,展示了Flink在多模态数据处理、实时数据向量化等方面的强大能力。
|
存储 关系型数据库 分布式数据库
PolarDB-X HTAP新特性 ~ 列存索引
随着数据爆炸式的增长,传统的OLTP和OLAP解决方案基于简单的读写分离或ETL模型,将在线库的数据以T+1的方式抽取到数据仓库中进行计算,这种方案存在存储成本高、实时性差、链路和维护成本高等缺陷。 为应对数据爆炸式增长的挑战,PolarDB分布式版本基于对象存储设计了一套列存索引(Clustered Columnar Index,CCI)功能,支持将行存数据实时同步到列存存储上
76457 148
|
存储 SQL 消息中间件
Hologres+Flink企业级实时数仓核心能力介绍
通过Hologres+Flink构建易用、统一的企业级实时数仓。
|
监控 Java API
死磕xxl-job(一)
死磕xxl-job(一)
|
存储 关系型数据库 MySQL
深度评测:PolarDB-X 开源分布式数据库的优势与实践
本文对阿里云开源分布式数据库 PolarDB-X 进行了详细评测。PolarDB-X 以其高性能、强可用性和出色的扩展能力在云原生数据库市场中脱颖而出。文章首先介绍了 PolarDB-X 的核心产品优势,包括金融级高可靠性、海量数据处理能力和高效的混合负载处理能力。随后,分析了其分布式架构设计,包括计算节点、存储节点、元数据服务和日志节点的功能分工。评测还涵盖了在 Windows 平台通过 WSL 环境部署 PolarDB-X 的过程,强调了环境准备和工具安装的关键步骤。使用体验方面,PolarDB-X 在处理分布式事务和实时分析时表现稳定,但在网络问题和性能瓶颈上仍需优化。最后,提出了改进建
|
存储 关系型数据库 大数据
PolarDB 大数据处理能力及其应用场景
【8月更文第27天】随着数据量的爆炸性增长,传统的数据库系统面临着存储和处理大规模数据集的挑战。阿里云的 PolarDB 是一种兼容 MySQL、PostgreSQL 和高度可扩展的关系型数据库服务,它通过其独特的架构设计,能够有效地支持海量数据的存储和查询需求。
558 0