Flink 闭包清除源码分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 0x1 摘要本文主要讲解Flink里为什么需要做闭包清除?Flink是怎么实现闭包清除的?0x2 Flink 为什么要做闭包清除大家都知道Flink中算子都是通过序列化分发到各节点上,所以要确保算子对象是可以被序列化的,很多时候大家比较喜欢直接用匿名内部类实现算子,而匿名内部类就会带来闭包问题,当匿名内部类引用的外部对象没有实现序列化接口时,就会导致内部类无法被序列化,因此Flink框架底层必须做好清除工作。

0x1 摘要

本文主要讲解Flink里为什么需要做闭包清除?Flink是怎么实现闭包清除的?

0x2 Flink 为什么要做闭包清除

大家都知道Flink中算子都是通过序列化分发到各节点上,所以要确保算子对象是可以被序列化的,很多时候大家比较喜欢直接用匿名内部类实现算子,而匿名内部类就会带来闭包问题,当匿名内部类引用的外部对象没有实现序列化接口时,就会导致内部类无法被序列化,因此Flink框架底层必须做好清除工作。

0x3 Flink 闭包清除实现

先来看一个Map算子代码:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {
    @Override
    public void run(SourceContext<String> ctx) throws Exception {

    }

    @Override
    public void cancel() {

    }
});
source.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return null;
    }
});

跟进源码查看map方法:

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
            Utils.getCallLocationName(), true);
    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}

重点关注clean(mapper)代码,继续跟进源码,最终会走到StreamExecutionEnvironment类的以下方法:

@Internal
public <F> F clean(F f) {
    if (getConfig().isClosureCleanerEnabled()) {
        ClosureCleaner.clean(f, true);
    }
    ClosureCleaner.ensureSerializable(f);
    return f;
}

到这里已经可以看出来闭包清除工具类ClosureCleaner,下面我们详细剖析一下此类。
先看clean方法:

public static void clean(Object func, boolean checkSerializable) {
    if (func == null) {
        return;
    }

    final Class<?> cls = func.getClass();

    // First find the field name of the "this$0" field, this can
    // be "this$x" depending on the nesting
    boolean closureAccessed = false;

    for (Field f: cls.getDeclaredFields()) {
        if (f.getName().startsWith("this$")) {
            // found a closure referencing field - now try to clean
            closureAccessed |= cleanThis0(func, cls, f.getName());
        }
    }

    if (checkSerializable) {
        try {
            InstantiationUtil.serializeObject(func);
        }
        catch (Exception e) {
            String functionType = getSuperClassOrInterfaceName(func.getClass());

            String msg = functionType == null ?
                    (func + " is not serializable.") :
                    ("The implementation of the " + functionType + " is not serializable.");

            if (closureAccessed) {
                msg += " The implementation accesses fields of its enclosing class, which is " +
                        "a common reason for non-serializability. " +
                        "A common solution is to make the function a proper (non-inner) class, or " +
                        "a static inner class.";
            } else {
                msg += " The object probably contains or references non serializable fields.";
            }

            throw new InvalidProgramException(msg, e);
        }
    }
}

方法参数:

  • func:要清除的对应
  • checkSerializable:清除完成后是否需要调用序列方法进行验证

第一步:查找闭包引用的成员变量,通过反射判断成员变量名是否包含this$来判定,代码片断:

for (Field f: cls.getDeclaredFields()) {
    if (f.getName().startsWith("this$")) {
        // found a closure referencing field - now try to clean
        closureAccessed |= cleanThis0(func, cls, f.getName());
    }
}

找到闭包引用的成员变量后,调用内部私有方法cleanThis0方法处理,看方法源码:

private static boolean cleanThis0(Object func, Class<?> cls, String this0Name) {
    This0AccessFinder this0Finder = new This0AccessFinder(this0Name);
    getClassReader(cls).accept(this0Finder, 0);

    final boolean accessesClosure = this0Finder.isThis0Accessed();

    if (LOG.isDebugEnabled()) {
        LOG.debug(this0Name + " is accessed: " + accessesClosure);
    }

    if (!accessesClosure) {
        Field this0;
        try {
            this0 = func.getClass().getDeclaredField(this0Name);
        } catch (NoSuchFieldException e) {
            // has no this$0, just return
            throw new RuntimeException("Could not set " + this0Name + ": " + e);
        }

        try {
            this0.setAccessible(true);
            this0.set(func, null);
        }
        catch (Exception e) {
            // should not happen, since we use setAccessible
            throw new RuntimeException("Could not set " + this0Name + " to null. " + e.getMessage(), e);
        }
    }

    return accessesClosure;
}

核心代码this0.set(func, null);将闭包引用置空处理,此方法还用到了ASM包,具体逻辑没完成整明白。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
Java 流计算
Flink中异步AsyncIO的实现 (源码分析)
Flink中异步AsyncIO的实现 (源码分析)
|
SQL API 流计算
Flink SQL代码补全提示(源码分析)
Flink SQL代码补全提示(源码分析)
74 0
|
SQL 存储 缓存
Flink进行Paimon写入源码分析
本文主要解析了Flink写入Paimon的核心流程。
|
存储 消息中间件 缓存
Flink进行Hudi写入源码分析
本文主要解析了Flink将DataStream写入到Hudi表的核心流程
|
SQL API Apache
Flink SQL代码补全提示(源码分析)
使用过Navicat的童鞋都知道,当我们写SQL的时候,工具会根据我们输入的内容弹出提示,这样可以很方便我们去写SQL
447 0
Flink SQL代码补全提示(源码分析)
|
SQL Java API
Flink 1.13.0 sql-client 新特性及源码分析
在 Flink 1.13.0 版本中增加了很多新特征,具体可以参考前面一篇文章,其中很重要的一点是对 sql-client 功能做了加强,支持了初始化脚本和执行 SQL 文件,SQL 客户端是直接运行和部署 SQL 流和批处理作业的便捷方法,而无需从命令行或作为 CI 的一部分来编写任何代码,这个版本大大改进了 SQL 客户端的功能。现在,SQL 客户端和SQL 脚本都支持 Java 应用程序可用的几乎所有操作(通过编程方式从 TableEnvironment 启动查询时)。这意味着 SQL 用户在其 SQL 部署中需要粘贴的代码变的更少.由于篇幅的原因这篇文章只会介绍 SQL CLIENT
Flink 1.13.0 sql-client 新特性及源码分析
|
Apache 调度 流计算
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析3
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析3
245 0
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析3
|
分布式计算 数据处理 API
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析1
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析1
251 0
【Flink】(05)Apache Flink 漫谈系列 —— SocketWindowWordCount 程序执行过程源码分析1
|
SQL 缓存 JSON
Java SPI 机制在 Flink 中的应用(源码分析)
我们在使用 Flink SQL 的时候是否有过这样的疑问? Flink 提供了各种各样的 connector 我们只需要在 DML 里面定义即可运行,那它是怎么找到要执行的代码呢? 它是怎么知道代码对应关系的呢? 其实 Flink 是通过 Java 的 SPI(并不是Flink发明创造的) 机制来实现的,下面就来深入源码分析一下其实现过程. 什么是 SPI ?
Java SPI 机制在 Flink 中的应用(源码分析)
|
存储 流计算
Flink源码分析:WindowOperator底层实现
上一篇文章介绍了 Flink窗口机制的执行流程,其实WindowOperator才是真正负责window中元素存储和计算流程的核心类。这篇文章主要就是分析一下WindowOperator的执行逻辑。 apply方法 接着上一篇从apply方法入手,先来看一下apply的代码逻辑。