12:flink数据倾斜怎么定位?怎么处理?
flink数据倾斜定位?
步骤1:定位反压
定位反压有2种方式:Flink Web UI 自带的反压监控(直接方式)、Flink Task Metrics(间接方式)。通过监控反压的信息,可以获取到数据处理瓶颈的 Subtask。
步骤2:确定数据倾斜
Flink Web UI 自带Subtask 接收和发送的数据量。当 Subtasks 之间处理的数据量有较大的差距,则该 Subtask 出现数据倾斜。如下图所示,红框内的 Subtask 出现数据热点。
flink数据倾斜方案解决汇总?
keyby之前发生发生的数据倾斜
keyBy前存在数据倾斜,上游算子的某些实例可能处理的数据比较多,某些实例可能处理的数据较少,产生情况可能时因为数据源的数据不均匀
解决方案:
把数据进行打散,重新均匀分配。(需要让 Flink 任务强制进行 shuffle。使用 shuffle、rebalance 或 rescale算子即可将数据均匀分配,从而解决数据倾斜的问题。)
通过调整并发度,解决数据源消费不均匀或者数据源反压的情况
keyBy后聚合操作存在数据倾斜(通过Flink LocalKeyBy思想来解决)
在 keyBy 上游算子数据发送之前,首先在上游算子的本地对数据进行聚合后再发送到下游,使下游接收到的数据量大大减少,从而使得 keyBy 之后的聚合操作不再是任务的瓶颈。类似 MapReduce 中 Combiner 的思想,但是这要求聚合操作必须是多条数据或者一批数据才能聚合,单条数据没有办法通过聚合来减少数据量。从 Flink LocalKeyBy 实现原理来讲,必然会存在一个积攒批次的过程,在上游算子中必须攒够一定的数据量,对这些数据聚合后再发送到下游。
注意:Flink 是实时流处理,如果 keyby 之后的聚合操作存在数据倾斜,且没有开窗口
的情况下,简单的认为使用两阶段聚合,是不能解决问题的。因为这个时候 Flink 是来一条
处理一条,且向下游发送一条结果,对于原来 keyby 的维度(第二阶段聚合)来讲,数据
量并没有减少,且结果重复计算(非 FlinkSQL,未使用回撤流)
keyBy后窗口聚合操作存在数据倾斜(两阶段聚合)
因为使用了窗口,变成了有界数据的处理,窗口默认是触发时才会输出一条结果发往下游,所以可以使用两阶段聚合的方式:
实现思路:
第一阶段聚合:key 拼接随机数前缀或后缀,进行 keyby、开窗、聚合
注意:聚合完不再是 WindowedStream,要获取 WindowEnd 作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合到一起)
第二阶段聚合:去掉随机数前缀或后缀,按照原来的 key 及 windowEnd 作 keyby、聚合
sql方式解决数据倾斜
开启LocalGloBal。
LocaGloBal是把数据攒在一起进行聚合,然后使用Accumulator进行累加后
合并(merge)开启LocalGlobal需要实现UDAF函数,进行merge累加。
这种方式与Aggregate方法非常相似。都是来筛选数据倾斜,减少下游数据。
13:flink去重方案?
1:mapState/ValueState+状态后端
使用RocksDBStateBackend,因为数据是存储在磁盘上,元数据保存在内存中。适合非常大的状态。在算子中,使用MapState数据结构,对key进行保存。
数据来了查看MapState是否存在,存在 + 1,不存在设置为1。
缺点:如果使用机械硬盘的话,flink数据量过大,磁盘会成为性能瓶颈。随之导致整个IO急剧下降。可能会出现背压情况!
优点:精确去重
2:基于HyperLogLog
HyperLogLog是去重计数的利器,能够以很小的精确度误差作为trade-off大幅减少内存空间占用,在不要求100%准确的计数场景极为常用
优点:高效,占用空间少
缺点:近似去重
3:布隆过滤器+状态后端/布隆过滤器+redis
类似Set集合,用于判断当前元素是否存在当前集合中。
布隆过滤器,当前的key是否存在容器中,不存在直接返回
缺点:不能百分之百的保证精确。
优点:插入和查询效率是非常的高
4: RoaringBitmap去重(推荐)
BitMap - 优点:精确去重,占用空间小(在数据相对均匀的情况下)。缺点:只能用于数字类型(int或者long)。
RoaringBitmap:BitMap固然好用,但是对去重的字段只能用int或者long类型;但是如果去重字段不是int或者long怎么办呢?那我们就构建一个字段与BitIndex的映射关系表,通过MapFunction拿到字段对应的BitIndex之后,就可以直接进行去重逻辑了。
5:hashset+hbase rowkey(不可行)
优点:能够对大的数据量高效去重
缺点:hbase不支持数据无法保证exactly-once。
6:flink+starrocks/hudi(推荐)
通过starrocks和hudi的主键直接去重
优点:高效快速去重
缺点:超大规模数据性能待验证
14:flink如何回溯历史数据?
1:时态表或回撤流
2:hudi、iceberg、delta lake
3: cdc到kafk或pulsar
15:flink ttl的几种策略?
根据程序的运行时间,我们的状态是不断的积累,占用的空间越来越多,当达到内存瓶颈时,容易出现OOM。
因此引入了TTL特性,对作业的状态(state)进行清理。
自flink1.8后,一共有三种ttl清理策略。
1、全量快照清理策略(cleanupFullSnapshot):
是针对checkpoint/savepoint全局快照的。
当快照过期,并不会删除。等待重启checkpoint/savepoint时,才会删除过期的
全局快照状态。过期时间是在代码中设置。
2、清理增量策略(cleanupIncrementally)
是针对状态后端的。
存储后端会为状态条目维护一个惰性全局迭代器。每次触发时,就会向前迭代删
除已遍历的数据。过期的数据是根据代码来设置。
.cleanupIncrementally(5, false) 第一个参数条目数量,第二个参数是是否删除
3、RocksDB过滤器清理策略(cleanupInRocksdbCompactFilter):
Flink会异步对RocksDB的状态进行压缩合并更新,减少存储空间。
对Flink条目进行清理达到1000条,会检查当前的条目是否处于属于过期状态。
如果是过期状态会进行删除。
.cleanupInRocksdbCompactFilter(1000)
16:flink如何保证端到端的exactly-once ?
不能百分之百保证exactly-once,只能尽可能的保证。需从每个阶段保证。
source端保证:使用可以记录数据位置并重设读取位置的组件(如kafka,文件)
flink内部保证:使用checkpint+state 将状态值保存在状态后端里,并且checkpoint需要设置为精确一次性语义
sink端保证:从故障恢复时,数据不会重复写入外部系统(幂等写入、事务写入)
幂等写入:幂等操作是指,同一个操作,可以执行很多次,但是不会对结果造成影响,与执行一次的结果保持一致
事务写入:在CheckPoint开始构建一个事务,当CheckPoint彻底完成时,提交事务。
事务写入又可以分为两种---WAL预写日志和2pc两阶段提交。DataStream API 提供了GenericWriteAheadSink模板类和TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。
flink两阶段提交流程不在此处讲解
17:任务链(Operator Chains)和 SlotSharing(子任务共享)有什么区别?
SlotSharing(子任务共享)是让同一个Job中不同Task的SubTask运行在同一个Slot中,它的目的是为了更好的均衡资源,避免不同的Slot出现“一半火山一半冰山”的情况。如果没有重分区的算子(即只有one-to-one的数据传递模式),它是不会有不同slot或不同taskmanager数据交互的,并且同一个线程中的SubTask进行数据传递,不需要经过IO,不需要经过序列化,直接发送数据对象到下一个SubTask,性能得到提升。但是,如果有重分区的算子(即有redistributing的数据传递模式),它还是会出现不同slot或不同taskmanager数据交互的,这样数据会经过IO和序列化。
而任务链(Operator Chains)是将并行度相同且关系为one-to-one的前后两个subtask,融合形成一个task,是更细粒度的“融合”,它一方面可以减少task的数量,提高taskManager的资源利用率,另一方面,由于是one-to-one的数据传递模式,并且task只能存在于一个slot中,数据是不会有IO和序列化的。
18: flink两阶段提交?
flink两阶段提交流程?
1. jobMaster 会周期性的发送执行checkpoint命令(start checkpoint);
2.当source端收到执行指令后会产生一条barrier消息插入到input消息队列中,当处理到barrier时会执行本地checkpoint, 并且会将barrier发送到下一个节点,当checkpoint完成之后会发送一条ack信息给jobMaster ;
3. 当DAG图中所有节点都完成checkpoint之后,jobMaster会收到来自所有节点的ack信息,那么就表示一次完整的checkpoint的完成;
4. JobMaster会给所有节点发送一条callback信息,表示通知checkpoint完成消息,这个过程是异步的,并非必须的,方便做一些其他的事情,例如kafka offset提交到kafka。
对比Flink整个checkpoint机制调用流程可以发现与2PC非常相似,JobMaster相当于协调者,所有的处理节点相当于执行者,start-checkpoint消息相当于pre-commit消息,每个处理节点的checkpoint相当于pre-commit过程,checkpoint ack消息相当于执行者反馈信息,最后callback消息相当于commit消息,完成具体的提交动作。那么我们应该怎么去使用这种机制来实现2PC呢?
Flink提供了CheckpointedFunction与CheckpointListener这样两个接口,CheckpointedFunction中有snapshotState方法,每次checkpoint触发执行方法,通常会将缓存数据放入状态中,可以理解为是一个hook,这个方法里面可以实现预提交,CheckpointListener中有notifyCheckpointComplete方法,checkpoint完成之后的通知方法,这里可以做一些额外的操作,例如FlinkKafakConsumerBase 使用这个来完成kafka offset的提交,在这个方法里面可以实现提交操作。
在2PC中提到如果对应流程2预提交失败,那么本次checkpoint就被取消不会执行,不会影响数据一致性,那么如果流程4提交失败了,在flink中可以怎么处理的呢?我们可以在预提交阶段(snapshotState)将事务的信息保存在state状态中,如果流程4失败,那么就可以从状态中恢复事务信息,并且在CheckpointedFunction的initializeState方法中完成事务的提交,该方法是初始化方法只会执行一次,从而保证数据一致性。
flink自定义两阶段提交?
Flink将两阶段提交协议中的通用逻辑抽象为了一个类—TwoPhaseCommitSinkFunction。
我们在实现端到端exactly-once的应用程序时,只需实现这个类的4个方法即可:
beginTransaction:开始事务时,会在目标文件系统上的临时目录中创建一个临时文件,之后将处理数据写入该文件。
preCommit:在预提交时,我们会刷新文件,关闭它并不再写入数据。我们还将为下一个Checkpoint的写操作启动一个新事务。
commit:在提交事务时,我们自动将预提交的文件移动到实际的目标目录。
abort:中止时,将临时文件删除。
如果出现任何故障,Flink将应用程序的状态恢复到最近一次成功的Checkpoint。如果故障发生在预提交成功之后,但还没来得及通知JobManager之前,在这种情况下,Flink会将operator恢复到已经预提交但尚未提交的状态。