【阿里云MVP月度分享】如何基于MYSQL做实时计算?

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
简介: 有时候我们会有这样的场景,在某个接口中,数据已经很规范地存入到一张的MYSQL表中,现在想对这样的数据做一些实时或准实时处理,比如数据多模式存储、异步准实时业务流程、业务实时监控等。

有时候我们会有这样的场景,在某个接口中,数据已经很规范地存入到一张的MYSQL表中,现在想对这样的数据做一些实时或准实时处理,比如数据多模式存储、异步准实时业务流程、业务实时监控等。接口中处理流程如下:
111

最原始的方法,是改动业务代码,将这些额外的处理流程作为同步流程,在更新MYSQL数据之后同步执行。如下图:
222

但是这样的处理流程可能会越来越多,如果一直作为同步流程,整个接口会变得越来越庞大、并且耗时越来越长、出问题的风险越来越高。

所以我会考虑异步处理流程。如果可以改动一下代码,将数据额外写一份儿到队列里,再用flink、storm之类的去消费不就好了么。如下图:
333

但实际上,或许由于架构设计的不规范、或许由于业务场景的繁多,导致在代码中加一遍数据埋点,就如同重构一般的工作量。所以我们需要另一种方式,能实时感知到MYSQL中数据的变化。

MYSQL的binlog可以帮我们记录数据的变化,我们还需要一个工具来收集binlog,并转为我们能读懂的数据。阿里有一款叫canal的开源软件正是做这个用的,可以通过修改源码,增加监控、告警、投递队列功能来实现。但现在,阿里云的日志服务为我们集成了这一功能,我们可以用更短的时间、更少的资源来获得更稳定、更放心的服务。如下图:
4444

日志服务收集binlog的功能还在内测中,不久之后将与大家见面。


比如有这样一个场景,我的MYSQL里有一张订单推送记录表,现在有一个需求,需要将这个表中的数据,按照一定格式再写入一份儿到表格存储TableStore中。
传统的实现方式,是在程序有写入到MYSQL的地方,再加一段代码,写入MYSQL成功后再写入到表格存储中。而现在,为完成这个需求,我选用的技术方案是:

日志服务SLS+流计算StreamCompute+表格存储TableStore


首先使用日志服务,配置对mysql中订单推送记录表所在实例的binlog的收集。

日志服务收集binlog的原理,与canal一致。具体配置这里暂不作叙述。

收集到的日志中,包含:数据库名、表名、事件(row_insert、row_update、row_delete)、全局事务ID、各个字段修改前的值、各个字段修改后的值。

根据场景,我们需要捕获到每个row_insert和row_update操作中,各个字段修改后的值,然后写入到表格存储中。所以我们在流计算中,先配置好日志服务的源表、和表格存储的目标表,中间的逻辑这样写:

INSERT INTO ots_result_order_push (pk, id, order_id, master_id, pull_status
    , offer_time, hire_time, push_time, limit_offer)
SELECT concat(REVERSE(order_id),'|',master_id) as pk,id, order_id, master_id, pull_status, offer_time
    , hire_time, push_time, limit_offer
FROM sls_stream_distribute_canal
WHERE _db_ = 'distribute'
    AND (_event_ = 'row_insert'
        OR _event_ = 'row_update')
    AND _table_ = 'order_push';

即可完成此需求。


再比如有这样一个场景,我的MYSQL里有一张用户信息表,现在想要实时统计每日注册用户数,并通过大屏展示出来。
为完成这个需求,我选用的技术方案是:

日志服务SLS+流计算StreamCompute+表格存储TableStore+数据可视化DataV

首先使用日志服务,配置对mysql中user_info表所在实例的binlog的收集。
根据场景,我们需要捕获到每个row_insert操作的时间,并将时间截取到日期。统计每天有多少条往用户信息表中插入的操作记录。所以我们在流计算中,先配置好日志服务的源表、和表格存储的目标表,中间逻辑这样写:

insert into ots_result_user(pk,val)
select concat('RegUser4General|t|',t.dt) as pk,t.val
from (select from_unixtime(cast(create_time as bigint),'yyyyMMdd') as dt,count(_table_) as val 
from sls_stream_user_service_canal
where _event_ = 'row_insert'
and _table_ = 'user_info'
group by from_unixtime(cast(create_time as bigint),'yyyyMMdd')) t
;

即可完成需求。


异步获取MYSQL数据变化,触发异步流程,避免了多个同步流程可能造成的执行时间过长、或者由于网络原因卡住等等导致的风险。同时,异步流程也可以并行,总体上加快了业务流程的速度,使“一份儿数据、多种处理”变得更加方便快捷。

当然,对于上边作为例子的两个场景来说,文中给出的方案并不是唯一的解决办法,还可以使用函数计算代替流计算实现同样的效果

整套流程全部采用阿里云的服务化产品进行,使得本来全部独立开发需要几天的工作量,可以在几分钟之内搞定,方便快捷,且整套流程都有完善的监控、告警机制,安全放心。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
4月前
|
存储 关系型数据库 MySQL
MySQL——数据库备份上传到阿里云OSS存储
MySQL——数据库备份上传到阿里云OSS存储
186 0
|
2月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
174 0
|
4月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何创建mysql临时表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL DataWorks 关系型数据库
阿里云 DataWorks 正式支持 SelectDB & Apache Doris 数据源,实现 MySQL 整库实时同步
阿里云数据库 SelectDB 版是阿里云与飞轮科技联合基于 Apache Doris 内核打造的现代化数据仓库,支持大规模实时数据上的极速查询分析。通过实时、统一、弹性、开放的核心能力,能够为企业提供高性价比、简单易用、安全稳定、低成本的实时大数据分析支持。SelectDB 具备世界领先的实时分析能力,能够实现秒级的数据实时导入与同步,在宽表、复杂多表关联、高并发点查等不同场景下,提供超越一众国际知名的同类产品的优秀性能,多次登顶 ClickBench 全球数据库分析性能排行榜。
|
4月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用问题之使用CTAS同步MySQL到Hologres时出现的时区差异,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL 关系型数据库 MySQL
实时数仓 Hologres操作报错合集之Flink CTAS Source(Mysql) 表字段从可空改为非空的原因是什么
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
4月前
|
关系型数据库 MySQL 网络安全
阿里云安装Mysql
阿里云安装Mysql
279 1
|
4月前
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
401 1
|
4月前
|
存储 运维 Cloud Native
"Flink+Paimon:阿里云大数据云原生运维数仓的创新实践,引领实时数据处理新纪元"
【8月更文挑战第2天】Flink+Paimon在阿里云大数据云原生运维数仓的实践
284 3
|
4月前
|
SQL 存储 关系型数据库
实时计算 Flink版产品使用问题之同步MySQL多张表的过程中,内存释放依赖于什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。