曹操出行基于Hologres+Flink的实时数仓建设

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
简介: 曹操出行实时计算负责人-林震对于曹操出行基于Hologres+Flink的实时数仓建设进行演讲

作者:林震|曹操出行实时计算负责人

曹操出行业务背景介绍

曹操出行创立于2015年5月21日,是吉利控股集团布局“新能源汽车共享生态”的战略性投资业务,以“科技重塑绿色共享出行”为使命,将全球领先的互联网、车联网、自动驾驶技术以及新能源科技,创新应用于共享出行领域,以“用心服务国民出行”为品牌主张,致力于打造服务口碑最好的出行品牌。

作为一家互联网出行平台,曹操主要提供了网约车、顺风车和专车等多种出行服务。其中,打车是我们的核心业务之一。整体业务过程大致如下: 首先,用户在我们的平台上下单,然后曹操平台会给司机进行订单的派发,司机接到订单后,会进行履约服务。结束一次订单服务后,乘客会在平台上进行支付。

image.png

在整个流程中,涉及到的数据将会在我们的业务系统中流转,主要包括有营销、订单、派单、风控、支付、履约这些系统。这些系统产生的数据将存储在RDS中,并进一步流入实时数仓中以进行分析和处理。最终数据会进入到不同的使用场景中,比如实时的标签,实时大屏、多维BI分析,还有实时业务监控以及实时算法决策。

曹操出行业务痛点分析

image.png

上图是一个传统lambda架构,在这个架构中主要会分做实时数据流和离线数据流。在实时链路中,业务数据是存放在RDS中,并通过Binlog以Canal同步的方式进入Kafka,同时应用的日志数据也会通过实时采集的方式进入到Kafka。数据准备工作完成后,在Kafka中构建实时数据仓库。整个实时数仓基于数仓分层理念进行构建的,主要包括原始数据层(ODS)、数据明细层(DWD)、数据汇总层(DWS)和应用数据层(ADS)。这些层次通过Flink Streaming SQL进行串联,实现数据的流转和处理。

在离线链路中,数据主要是通过DataX定时同步任务将RDS中的数据同步到HDFS。同时应用的日志会通过定时任务同步到HDFS,整个离线数仓以Spark Sql定时调度任务去逐层执行。数据在离线数仓中会以不同的数据域去组织满足不同粒度的数据计算,最终数据会通过Flink Sink以及离线同步工具写到不同的数据应用组件中。同时,为了保证某些应用场景中数据的一致性,可能需要对离线和实时两条链路的数据进行合并处理和加工。

image.png

基于曹操出行数据生产成本和研发诉求,针对传统lambda架构中可以看到一些问题:

  • 为了适配不同应用场景,我们在架构中使用了非常丰富的数据组件。
  • 研发成本非常高,不仅在实时链路中进行研发和处理,而且还需额外研发一套离线的数据链路。
  • 运维效率较低,由于整个实时数仓是构建在Kafka上的,因此在数据探查以及进行数据订正就会变得非常困难。
  • 资源成本较大,主要体现在几方面:组件多,需要专门安排人员进行运维和管理;一些需要精准一致性的场景需求,需在两个数据链路中做数据的同步和合并计算;在某些计算场景中,需要Flink维护大状态进行处理,也造成额外性能问题和资源的浪费。

另外从公司开发者使用的角度,我们对实时数仓提出了以下几点诉求:

  • 统一的组件来满足不同数据应用场景。
  • 复杂的实时数据链路中保障高效的数据订正。
  • 能克服在Flink中一些大状态下的技术难点。

Hologres+Flink构建企业级实时数仓

Hologres能力分析

曹操出行作为Hologres的深度用户,在前期调研与测试阶段,我们对对Hologres的相关能力做了比较详细的分析,主要有以下优势:

1、业务场景能力丰富:

  • 具备OLAP分析能力
  • 具备高并发点查能力
  • 具备半结构化日志分析能力
  • 具备基于PostGIS的扩展能力,支持空间地理信息信息数据的分析与使用,对于曹操出行的业务属性来说非常重要。

2、一站式实时开发能力

  • 契合数仓分层结构理念(可以像离线数仓一样去构建分层体系,数据实时流动、实时存储)
  • Flink Streaming生态高度融合(Flink CDC组件集成,Flink Catalog集成)
  • 统一的Ad-hoc能力,能以外表加载离线数仓中数据进行湖仓加速和联邦分析

3、解决的痛点问题

  • 全链路低时延
  • 多流join场景很好的提供数据打宽的能力,支持主键模型和行级,局部字段更新的能力
  • Count distinct大状态精确去重场景的支持

Hologres支持高并发更新

image.png

Hologres的存储架构基于分布式存储系统,并在其上构建了存储引擎。在底层,Hologres使用了分布式存储系统来管理数据的存储和分布。在此之上,存储引擎包括一些关键组件,如Block Cache、Shard ,每个Shard中包含了多个Tablet和Write-Ahead Log(WAL)。

市面上主流的数据湖产品通常采用LSMLog-Structured Merge)架构。

主流数据主键模型更新模式有Copy On Write 和Merge On Read。这两种场景都有各自的问题

  • Copy On Write具有写放大的问题,数据的延迟会比较高。
  • Merge On Read(读时合并)模式在读取数据时需要进行大量的数据合并操作,因此读取性能可能较差。

在Hologres中,行存使用Merge On Read方式,列存主要基于Merge On Write。

下面主要介绍下在基于MergeOnWrite这种模式时,一条数据在进入Hologres中,它首先到达WAL Manager(Write-Ahead Log 管理器),同时也会进入到Memtable(内存表)。在Memtable中,主要存储三类数据:数据文件、删除标志文件(例如基于RoaringBitmap的文件)和索引文件。当Memtable数据积累到一定阶段后,会生成不可变的Memtable,并通过异步线程定期将其刷新(flush)到Data File(数据文件)中。通过这种架构,Hologres能够兼顾行存和列存的优势,并通过适当的数据合并策略来提高性能和存储效率。

Hologres Binlog支持

image.png

在Hologres中 Binlog也是一种物理表,其跟原表的主要区别是内置的几种自身结构,包含自身递增序列,数据修改类型以及数据修改时间,Binlog本质上也是分shard进行存储,所以也为一种分布式表,并且在WAL之前生成,因此在数据上可以与原表保证强一致性。

其次Hologres Binlog修改类型也还原了Flink中四种RowKind类型。在数据更新过程中会产生两条更新记录(update_before,update_after),并且保证了更新记录是一个连续的存储。右边展示中,写入一个数据一个pk1,然后再写入一个pk2数据,pk2的数据再做一次更新,那么在Binlog中它会产生4条数据结果。

Hologres数据模型介绍

image.png

Hologres主要分做行存引擎以及列存引擎,同时也支持行列共存场景。

  • 聚合场景中主要是用到列存的引擎,适合OLAP场景,复杂查询,统计以及关联等场景。同时也提供了非常丰富的索引,包括有:聚簇索引,位图索引,字典,以及基于时间序列的范围索引。
  • KV场景中主要是用到行存的引擎,主要支持高并发组件查询。包括在Flink中做维表反查也是非常适合。
  • 订阅场景中主要是用到行存的引擎,主要在表属性中进行声明,比如说Binlog是否开启,Binlog的TTL。在订阅方的话,Hologres支持CDC以及非CDC的模式。
  • 日志场景中针对聚合场景,主要是支持JsonB数据类型。JsonB数据的写入过程中,hologres能够将其自动地平铺成列式的存储结构,同时它可以自动对数据内容做解析,对数据类型做泛化处理,数据格式的对齐,非常适合这种非稀疏场景,因此给聚合场景提供了分析的灵活性。

曹操出行实时数仓实践

实时数仓架构设计

image.png

基于前面Hologres的能力介绍,接下来是对于曹操内部实时数仓的架构设计,最左边是RDS数据库,最右边是应用系统,最下边为元数据管理,中间是实时数仓的部分。数据通过Binlog进入到Kafka的ODS层之后,首先通过Flink写入到Hologres里的DIM层,然后再通过Flink做ODS的多流汇聚,写入到Hologres的DWD层。在DWD中可以做宽表打宽的实现。再下一层,通过Hologres Binlog的订阅的方式,进入到Flink进行处理加工再写入到Hologres的DWS层。完成实时数仓分层建设后,再统一通过OneService的统一查询服务对外提供服务。

dwd宽表构建实践

image.png

接下来介绍一下Hologres DWD宽表层的一个构建实践。基于之前提到的Hologres列更新能力,能够很好实现宽表Join能力。在整个生产过程中,还需重点关注维表的应用场景,其应用场景包含几种情况:一种是维表是不变的,或者缓慢的变化,另一种是维表频繁变化的。为了保障数据最终的一致,通常的设计是像离线的方式去构建一个维表拉链的数据,通过用过Start Time和End Time的方式去存储维度状态有效的一个周期。

其次需要关注维表延迟问题。在实际生产过程中,维表链路与主表的链路通常是异步的,可能会出现维表延迟导致主表关联数据为空或关联到过时的维度状态。为处理这种情况,需要在Hologres中实施维度缺失记录的过滤,并采取补偿机制进行维度补偿处理。同时,还需要定时调度进行维度字段和维表对比检查,以增量方式修正不一致的维度状态。

聚合计算场景优化

image.png

接下来是我们对聚合场景的优化,针对许多预聚合计算场景,我们将其统一收敛到Rollup计算模型中,主要解决以下问题:

  • 在Flink聚合场景中经常会出现状态兼容性的问题
  • 整个数据的复用性非常差,研发人员收到新的需求,例如新的指标或者新增维度粒度时,为了不影响生产数据的稳定性,新增需求需要构建新任务,导致任务管理混乱。

因此曹操出行主要进行了两点优化:

  • 构建MapSumAgg算子,MapSum主要通过对SumAgg算子做了重新设计,使之能够支持Map内部结构的求和逻辑
  • 对Grouping Sets进行动态配置化,这样Grouping Sets动态增加维度粒度,使整个任务在不重启的情况下也能自动去做自适应

结合这两点,把已有的指标放入map结构中进行封装,这样在不改变原有的算子状态,也可以得到很好的处理。在下游中可以针对不同维度组合,指标集合做好选择,然后由同步工具做实时的数据路由,为下游提供服务。

image.png

对于第二个聚合场景的优化,是对精确去重场景的拆分。在前面例子中,我们把Count Distinct的精确去重做了剥离,主要解决两个问题:

  • 维度爆炸的问题。在Flink的回撤机制下使用精确去重时,需要存储全量状态。然而,在Cube场景中,这种状态的爆炸式增长对于Flink的可扩展性是一个挑战。
  • 查询灵活性的问题

解决思路是通过Hologres去构建细粒度的RoaringBitmap存储方案。

具体流程如下:首先,在Hologres中构建自身序列的UID维表,然后在主表中通过反差逻辑将UID的自身序列反查出来。接下来,在Flink中进行Group by操作,并进行聚合计算,得出RoaringBitmap的结果。最后,将结果写入Hologres的DWS层,形成轻度汇总表用于UV计算。这种方案既能解决应用端在灵活维度查询时的高效性需求,又能解决Flink中维度爆炸的问题。

链路中吞吐能力调优

image.png

整个流链路中吞吐能力的调优主要涉及两个部分:

  • 数据写入侧:在将数据写入Hologres之前,针对字段状态频繁变更的场景进行了优化。引入了一个Union层,在Union层和ODS层中,数据根据主键进行分区。在Union层中,通过一个小窗口进行预聚合计算,以减少对Hologres的写入压力,从而提高整体数据吞吐量。然而,这种方式的缺点是无法捕获中间状态的数据。
  • 数据读取侧。在使用Binlog更新数据时,会产生连续的变更前后数据。在这种场景下,可以采用lag开窗的方式来获取一次变更中连续的上下游数据。通过比较这两个数据之间的差异,可以过滤掉冗余的变更数据,从而减轻整个处理下游数据的压力。这种方式可以提高读取数据的效率和吞吐量,减少不必要的数据处理。

元数据血缘的改造

image.png

元数据血缘的改造主要解决了以下问题:

  • Schema的演进提供了一个更便利的管控
  • 解决实时链路发布流程中的依赖链问题。
  • 对任务元数据信息进行有效的管理

曹操出行主要进行以下措施来实现上述目标:

  • Flink Catalog集成,在元数据中去整合Hologres的Catalog,也支持Kafka Topic表中自定义Catalog,支持多版本schema和任务数据的多版本,提供更灵活的数据处理能力。
  • Kafka Source和Kafka Sink的改造。结合整合整个上线发布的流程,对于数据的版本信息,是通过Kafka Sink对Header进行记录,Kafka Source对header的版本信息进行过滤,从而把数据版本引入到整个上下游的链路,提供上下游数据灵活的迭代。这种做法的好处是,在整个链路中可以感知到下游数据的使用情况,帮助用户快速定位是否还有任务依赖于某个版本的数据,右边的图片主要是展示一个开发流程中元数据的集成。

链路保障体系

image.png

在日常开发过程中,对于任务健康以及任务出现异常后的判断和检测,都是通过异常检测诊断工具去做支持。主要体现四个方面:

  • 对于基础信息采集,通过采集工具,把Flink内置Metric、Yarn的Metric以及Kafka信息进行采集,提供基础数据,包括作业信息,Kafka一些Topic信息,作业最新指标情况。
  • 对于异常的判断,通过内存以及Topic增长情况,包括CPU使用情况,以及任务有无出现反压,任务有无倾斜做出异常的判断。
  • 对于异常原因的诊断-内部原因,内部原因主要会看CheckPoint的失败情况,Kafka LAG具体是什么算子造成的反压,Restart的次数,attempt的次数。
  • 对于异常原因的诊断-外部原因,外部原因主要是看Job Manager以及Task Manager所在节点自身的情况,包括CPU使用率、IO利用率、内存情况等,然后做出综合判断,帮助用户去快速定位具体问题的原因。

image.png

在链路保障体系中,全链路的感知能力是非常重要的。曹操出行主要通过流量监控和延迟监控来实现全链路的感知能力:

  • 流量监控层面:通过Kafka Cueernt Offset以及Hologres内置的Offset信息做定时的采集,从而推算出Kafka以及Hologres表的生产速率。
  • Latency监控层面:主要采集Kafka Offset以及Flink Source的Offset情况,结合Kafka Massage Timestamp去推算出每个任务自身延迟情况,再结合整个数据血缘进行一个串联,可以得出端到任务自身整体的延迟时间。

通过任务上下游生产速率比,以及任务自身延迟情况,在整个生产链路中可以快速定位出具体异常和问题发生的节点,以便及时处理和优化,提高系统的性能和稳定性。

数据订正能力建设

image.png

在传统的Streaming链路中,数据订正方案一直是个复杂工程,主要涉及以下两个方面的挑战:

  • 如何知晓订正的数据为正确数据?验证其具有一定困难。
  • 在整个验证过程中,如何保证对下游的透明?如果丢状态去做重启的订正,肯定会对下游造成很大的影响。

因此我们主要思路是基于Hologres去做实现。首先对于原始任务进行代码修正后,并维持原有状态去做重启。第二步将对Hologres表做Schema的拷贝,然后新建一个订正的临时表。第三步会将任务进行拷贝,并将Sink调整到订正临时表,去做无状态从头消费的重启。这样可以把订正的结果数据订正进Hologres订正表中。等待消费结束后停止订正任务,然后通过修正脚本去对比原表以及订正表中关键信息,去做数据的订正。由于数据的订正,它处于数据终态,对于下游来说,不会造成大起大落。并且在整个链路中,因为正确数据可以通过整个数据链路做回撤的传导,因此整个下游就可以自动完成数据的订正。

曹操出行业务成果分析

image.png

架构清晰简单:

  • 对比原有Lamada架构,Hologres+Flink整体架构更加清晰,使用数据组件大大减少
  • 整体技术复杂难度降低,原先为了解决数据一致性问题,数据需要在不同的异构存储和异构链路中来回传输和计算,整个技术复杂度较高

开发效率提高:

  • 整个开发模式变得简单易用,大大缩短人力周期
  • 数据实时模型分层非常清晰,整体下游复用性以及使用门槛大幅度降低

运维体验提升:

  • 由于数据存储在Hologres之上,因此数据探查更加便捷,数据订正难易程度大幅度减少。

成本减少:

  • 组件维护成本减少。
  • 数据的离线存储和实时存储,从双份存储降低到一份存储,以及降低了数据在异构存储之间的同步与计算成本
  • 解决在Flink中各类计算场景中大状态的资源成本,减少了计算开销并提升了处理性能。

未来展望

image.png

未来展望主要分为以下几个层面:

  • 当前Flink集群还是一个自建的集群,对于这些集群我们业务最关心的是使用过程中,其业务的稳定性和可靠性。特别是在高峰场景,资源不足时,怎么去做快速的缩扩容。在高峰期过去后怎么去做到无缝缩容,降低业务风险,包括减少业务的数据中断时间。
  • 在任务级别的动态感知和智能调控上。很多时候研发根据自己的经验去设置Flink的资源参数,往往有很多资源其实是多设或者是额外设置的。通过动态感知能力的引入,能够有效提升整体的资源使用情况,包括未来也可能会引入智能算法,包括自适应的机制去达到节约成本的目的。
  • Flink CDC来统一ODS入仓的方案。我们在离线使用DataX的入仓方案,后来实时使用了Flink CDC的入仓方案,其实本质上数据可以提供一个统一的解决思路,来解决数据的一致性和灵活性的诉求。包括在CDC方案中,也会有一些定制上的需求。比如说在CDC过程中数据加解密以及RDS数据归档一系列诉求。使用Flink CDC的过程中也会分阶段的做一些调整,包括一些高频迭代的诉求也会在后续的规划中更优先的解决。
  • 关于曹操出行的数据服务规划。目前有很多数据服务场景,包括了在线应用场景,以及分析型的服务场景,业务会比较关注数据服务的高可用以及服务的可扩展性,那怎么样通过同一份数据来做到不同服务的扩展。这部分我们后续会考虑基于Hologres主从隔离的能力和结合数据存储计算隔离的一些特点优势,构建一主多从的架构,来支持和满足这些数据应用服务。
相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
21天前
|
SQL 运维 网络安全
【实践】基于Hologres+Flink搭建GitHub实时数据查询
本文介绍了如何利用Flink和Hologres构建GitHub公开事件数据的实时数仓,并对接BI工具实现数据实时分析。流程包括创建VPC、Hologres、OSS、Flink实例,配置Hologres内部表,通过Flink实时写入数据至Hologres,查询实时数据,以及清理资源等步骤。
|
4月前
|
SQL NoSQL 关系型数据库
实时数仓Hologres发展问题之实时数仓的类数据库化与HTAP数据库的差异如何解决
实时数仓Hologres发展问题之实时数仓的类数据库化与HTAP数据库的差异如何解决
56 2
|
23天前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
298 4
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
20天前
|
运维 数据挖掘 网络安全
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。
|
2月前
|
存储 数据采集 大数据
Flink实时湖仓,为汽车行业数字化加速!
本文由阿里云计算平台产品专家李鲁兵(云觉)分享,聚焦汽车行业大数据应用。内容涵盖市场趋势、典型大数据架构、产品市场地位及能力解读,以及典型客户案例。文章详细介绍了新能源汽车市场的快速增长、大数据架构分析、实时湖仓方案的优势,以及Flink和Paimon在车联网中的应用案例。
189 8
Flink实时湖仓,为汽车行业数字化加速!
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
62 1
|
3月前
|
存储 数据采集 OLAP
饿了么基于Flink+Paimon+StarRocks的实时湖仓探索
饿了么的实时数仓经历了多个阶段的演进。初期通过实时ETL、报表应用、联动及监控构建基础架构,随后形成了涵盖数据采集、加工和服务的整体数据架构。1.0版本通过日志和Binlog采集数据,但在研发效率和数据一致性方面存在问题。2.0版本通过Dataphin构建流批一体化系统,提升了数据一致性和研发效率,但仍面临新业务适应性等问题。最终,饿了么选择Paimon和StarRocks作为实时湖仓方案,显著降低了存储成本并提高了系统稳定性。未来,将进一步优化带宽瓶颈、小文件问题及权限控制,实现更多场景的应用。
391 7
饿了么基于Flink+Paimon+StarRocks的实时湖仓探索
|
2月前
|
SQL 分布式计算 数据挖掘
加速数据分析:阿里云Hologres在实时数仓中的应用实践
【10月更文挑战第9天】随着大数据技术的发展,企业对于数据处理和分析的需求日益增长。特别是在面对海量数据时,如何快速、准确地进行数据查询和分析成为了关键问题。阿里云Hologres作为一个高性能的实时交互式分析服务,为解决这些问题提供了强大的支持。本文将深入探讨Hologres的特点及其在实时数仓中的应用,并通过具体的代码示例来展示其实际应用。
221 0
|
4月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用问题之使用CTAS同步MySQL到Hologres时出现的时区差异,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
存储 分布式计算 数据挖掘
实时数仓 Hologres 问题之适用于业务场景的实时数仓如何搭建
实时数仓 Hologres 问题之适用于业务场景的实时数仓如何搭建

热门文章

最新文章

相关产品

  • 实时数仓 Hologres