Java SPI 机制在 Flink 中的应用(源码分析)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 我们在使用 Flink SQL 的时候是否有过这样的疑问? Flink 提供了各种各样的 connector 我们只需要在 DML 里面定义即可运行,那它是怎么找到要执行的代码呢? 它是怎么知道代码对应关系的呢? 其实 Flink 是通过 Java 的 SPI(并不是Flink发明创造的) 机制来实现的,下面就来深入源码分析一下其实现过程.什么是 SPI ?

我们在使用 Flink SQL 的时候是否有过这样的疑问? Flink 提供了各种各样的 connector 我们只需要在 DML 里面定义即可运行,那它是怎么找到要执行的代码呢? 它是怎么知道代码对应关系的呢? 其实 Flink 是通过 Java 的 SPI(并不是Flink发明创造的) 机制来实现的,下面就来深入源码分析一下其实现过程.


什么是 SPI ?


SPI 全称(Service Provide Interface),在 JAVA 中是一个比较重要的概念,在框架设计中被广泛使用。在框架设计中,要遵循的原则是对扩展开放,对修改关闭,保证框架实现对于使用者来说是黑盒。因为框架不可能做好所有的事情,只能把共性的部分抽离出来进行流程化,然后留下一些扩展点让使用者去实现,这样不同的扩展就不用修改源代码或者对框架进行定制。也就是我们经常说的面向接口编程。


在 JDK6 里面引进的一个新的特性 ServiceLoader,从官方的文档来说,它主要是用来装载一系列的 service provider。而且ServiceLoader 可以通过 service provider 的配置文件来装载指定的 service provider。当服务的提供者,提供了服务接口的一种实现之后,我们只需要在 jar 包的 META-INF/services/ 目录里同时创建一个以服务接口命名的文件。该文件里就是实现该服务接口的具体实现类。而当外部程序装配这个模块的时候,就能通过该 jar 包 META-INF/services/ 里的配置文件找到具体的实现类名,并装载实例化,完成模块的注入。综上所述,SPI 机制实际上就是 "基于接口的编程+策略模式+配置文件" 组合实现的一种动态加载机制,在 JDK 中提供了工具类:java.util.ServiceLoader 来实现服务查找。


实现 SPI  机制,要遵循下面的一些规范:


服务提供者提供了接口的具体实现后,需要在资源文件夹中创建 META-INF/services 文件夹,并且新建一个以全类名为名字的文本文件,文件内容为实现类的全名(如下面图中的红框);


接口实现类必须在工程的 classpath 下,也就是 maven 中需要加入依赖或者 jar 包引用到工程里.


其实 Java 里使用 SPI 还是比较多的,比如我们常用的 JDBC 连接 Mysql 就是用的 SPI 机制来实现的连接逻辑,下面来看一个简单的 Demo.


接口服务提供


public interface Person {
    void eat();
}


实现类

public class Flink implements Person {
    @Override
    public void eat() {
        System.out.println(this.getClass().getSimpleName() + " 执行 eat 方法");
    }
}
public class JasonLee implements Person {
    @Override
    public void eat() {
        System.out.println(this.getClass().getSimpleName() + " 执行 eat 方法");
    }
}
public class Spark implements Person {
    @Override
    public void eat() {
        System.out.println(this.getClass().getSimpleName() + " 执行 eat 方法");
    }
}


META-INF/services 配置文件



image-20210506170559361

在 source 下面的  META-INF/services 文件夹下面创建 spi.Person 文件,文件的内容是上面实现类的全名(包名.类名)


测试类


public static void main(String[] args) {
    ServiceLoader<Person> load = ServiceLoader.load(Person.class);
    Iterator<Person> iterator = load.iterator();
    while (iterator.hasNext()) {
        Person next = iterator.next();
        next.eat();
    }
}


执行打印的结果是:


Flink 执行 eat 方法

JasonLee 执行 eat 方法

Spark 执行 eat 方法

可以看到 3 个实现类都被执行了.


然后来分析一下 ServiceLoader#load 方法的源码:


public static <S> ServiceLoader<S> load(Class<S> service) {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    return ServiceLoader.load(service, cl);
}


首先会获取当前线程的类加载器,然后调用另一个重载的 load 方法.


public static <S> ServiceLoader<S> load(Class<S> service,
                                        ClassLoader loader)
{
    return new ServiceLoader<>(service, loader);
}


这个 load 方法会调用 ServiceLoader 的构造方法进行变量初始化.


private ServiceLoader(Class<S> svc, ClassLoader cl) {
    service = Objects.requireNonNull(svc, "Service interface cannot be null");
    loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
    acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
    reload();
}


构造方法里面主要是完成了对 service,loader,acc 变量的赋值工作,然后调用 reload 方法


public void reload() {
    providers.clear();
    lookupIterator = new LazyIterator(service, loader);
}


providers 其实是一个 LinkedHashMap 用来做缓存用,存储的是用读取到的 services 文件夹下面实现类的实例,所以上来先清空缓存中的数据.然后创建了 LazyIterator 实例,核心的逻辑在 hasNextService 和 nextService 这两个方法中.


private boolean hasNextService() {
    if (nextName != null) {
        return true;
    }
    if (configs == null) {
        try {
            // 获取实现类的全名称
            String fullName = PREFIX + service.getName();
            if (loader == null)
                configs = ClassLoader.getSystemResources(fullName);
            else
                configs = loader.getResources(fullName);
        } catch (IOException x) {
            fail(service, "Error locating configuration files", x);
        }
    }
    while ((pending == null) || !pending.hasNext()) {
        if (!configs.hasMoreElements()) {
            return false;
        }
        // 解析实现类
        pending = parse(service, configs.nextElement());
    }
    nextName = pending.next();
    return true;
}
private S nextService() {
            if (!hasNextService())
                throw new NoSuchElementException();
            String cn = nextName;
            nextName = null;
            Class<?> c = null;
            try {
                // 通过反射去创建对象
                c = Class.forName(cn, false, loader);
            } catch (ClassNotFoundException x) {
                fail(service,
                     "Provider " + cn + " not found");
            }
            if (!service.isAssignableFrom(c)) {
                fail(service,
                     "Provider " + cn  + " not a subtype");
            }
            try {
                // 对象的实例化
                S p = service.cast(c.newInstance());
                providers.put(cn, p);
                return p;
            } catch (Throwable x) {
                fail(service,
                     "Provider " + cn + " could not be instantiated",
                     x);
            }
            throw new Error();          // This cannot happen
        }


我们必须要在 source 下面创建 META-INF/services 文件夹吗 不放在这个位置难道就加载不到吗? 答案是肯定的,如果不创建确实加载不到,因为源码里面的 PREFIX = "META-INF/services/" 这个变量是写死的,所以我们必须创建这个文件夹.这里会遍历文件里面所有的实现类然后通过反射机制去创建对象.


你可能还会发现一个问题 load 方法一开始就获取了 Thread.currentThread().getContextClassLoader() 上下文的类加载器,然后一直往后面传递,最后在 forName 里面用到了,那如果不把 loader 传进来行不行? 答案是确实不行,因为 ServiceLoader 是一个基础类,它是在 java.util 这个包下面的,所以它是由 BootstrapClassLoader 来加载的.而我们自定义的实现类是由 AppClassLoader 去加载的,BootstrapClassLoader 这个类加载器是加载不到我们定义的类的,所以这里 getContextClassLoader 其实是打破了双亲委派模型的.


Flink 中 SPI 实现


在 Flink 源码中大量使用了 Java 的 SPI 机制,比如在 Flink-connector ,Flink-formats ,Flink-metrics 等模块都可以看到 SPI 的身影.比如 Json Format



image-20210506182553881

那么 Flink 是如何保证正确的 TableFactory 实现类被加载的呢?直接来看 TableFactoryService#findSingleInternal 方法的源码


private static <T extends TableFactory> T findSingleInternal(
        Class<T> factoryClass,
        Map<String, String> properties,
        Optional<ClassLoader> classLoader) {
    // 加载所有的实现类
    List<TableFactory> tableFactories = discoverFactories(classLoader);
    // 过滤出满足条件的
    List<T> filtered = filter(tableFactories, factoryClass, properties);
    if (filtered.size() > 1) {
        throw new AmbiguousTableFactoryException(
                filtered, factoryClass, tableFactories, properties);
    } else {
        return filtered.get(0);
    }
}


其中 discoverFactories 方法用来发现并加载 Table 的服务提供类,filter 方法则是用来筛选出满足条件的 TableFactory 的实现类。前者最终调用了 ServiceLoader 的相关方法,如下:


private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
    try {
        List<TableFactory> result = new LinkedList<>();
        ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
        ServiceLoader.load(TableFactory.class, cl).iterator().forEachRemaining(result::add);
        return result;
    } catch (ServiceConfigurationError e) {
        LOG.error("Could not load service provider for table factories.", e);
        throw new TableException("Could not load service provider for table factories.", e);
    }
}


可以看到 Java 的 SPI 机制就是在这里用的,查找并加载 TableFactory 所有的实现类,然后保存在 List 里面.然后在过滤中满足条件的一个.


总结:


SPI 机制的优缺点都非常明显,优点是实现解耦,使得接口的定义和具体业务实现分离,易于动态扩展,帮忙我们灵活的插件化开发.


缺点也很明显,不能按需加载,虽然 ServiceLoader 做了延迟加载,但是会把接口的实现类全部加载并实例化一遍,可能会造成浪费,获取某个实现类的方式比较单一,只能通过 iterator 形式获取,不能根据参数的形式获取.

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2天前
|
Java 数据库
JAVA并发编程-一文看懂全部锁机制
曾几何时,面试官问:java都有哪些锁?小白,一脸无辜:用过的有synchronized,其他不清楚。面试官:回去等通知! 今天我们庖丁解牛说说,各种锁有什么区别、什么场景可以用,通俗直白的分析,让小白再也不怕面试官八股文拷打。
|
4天前
|
Java 程序员 开发者
Java中的异常处理机制深度解析
本文旨在深入探讨Java中异常处理的核心概念与实际应用,通过剖析异常的本质、分类、捕获及处理方法,揭示其在程序设计中的关键作用。不同于常规摘要,本文将直接切入主题,以简明扼要的方式概述异常处理的重要性及其在Java编程中的应用策略,引导读者快速把握异常处理的精髓。
|
3天前
|
安全 Java 开发者
Java并发编程中的锁机制解析
本文深入探讨了Java中用于管理多线程同步的关键工具——锁机制。通过分析synchronized关键字和ReentrantLock类等核心概念,揭示了它们在构建线程安全应用中的重要性。同时,文章还讨论了锁机制的高级特性,如公平性、类锁和对象锁的区别,以及锁的优化技术如锁粗化和锁消除。此外,指出了在高并发环境下锁竞争可能导致的问题,并提出了减少锁持有时间和使用无锁编程等策略来优化性能的建议。最后,强调了理解和正确使用Java锁机制对于开发高效、可靠并发应用程序的重要性。
13 3
|
6天前
|
Java 开发者
Java中的异常处理机制
本文将深入探讨Java中异常处理的基本概念和机制,包括try-catch-finally块、throws关键字以及自定义异常类的使用方法。我们将通过实例演示如何在Java程序中有效地捕获和处理异常,确保程序的健壮性和稳定性。无论您是Java编程的初学者还是有一定经验的开发者,本文都能为您提供有价值的参考。
|
4天前
|
Java 编译器 开发者
Java中的异常处理机制:从基础到高级应用
在Java编程中,异常处理是一个至关重要的部分,它帮助开发者处理运行时错误,确保程序的鲁棒性和可靠性。本文将深入探讨Java异常处理的基础和高级应用,包括异常的分类、捕获和处理方法,以及如何使用自定义异常和异常链。通过实际案例,我们将展示如何有效利用Java的异常处理机制来提升代码质量。
|
1天前
|
监控 算法 Java
Java中的内存管理:理解Garbage Collection机制
本文将深入探讨Java编程语言中的内存管理,特别是垃圾回收(Garbage Collection, GC)机制。我们将从基础概念开始,逐步解析垃圾回收的工作原理、不同类型的垃圾回收器以及它们在实际项目中的应用。通过实际案例,读者将能更好地理解Java应用的性能调优技巧及最佳实践。
10 0
|
2天前
|
Kubernetes Cloud Native Java
探索未来编程新纪元:Quarkus带你秒建高性能Kubernetes原生Java应用,云原生时代的技术狂欢!
Quarkus 是专为 Kubernetes 设计的全栈云原生 Java 框架,凭借其轻量级、快速启动及高效执行特性,在 Java 社区脱颖而出。通过编译时优化与原生镜像支持,Quarkus 提升了应用性能,同时保持了 Java 的熟悉度与灵活性。本文将指导你从创建项目、编写 REST 控制器到构建与部署 Kubernetes 原生镜像的全过程,让你快速上手 Quarkus,体验高效开发与部署的乐趣。
9 0
|
2天前
|
Java 开发者
Java中的异常处理机制:理解与应用
在Java编程中,异常处理是确保程序稳定性和可靠性的关键。本文将深入探讨Java的异常处理机制,包括异常的分类、捕获和处理方法,以及如何有效地使用这些工具来提高代码质量。
|
2天前
|
Java 程序员 开发者
Java中的异常处理机制:从基础到高级应用
在Java编程中,异常处理是确保程序稳定性和可靠性的关键。本文将深入探讨Java异常处理的基本概念、不同类型的异常、常用的异常处理技术以及一些最佳实践。通过阅读本文,您将能够更好地理解和运用Java异常处理机制,提升您的编程技能。
8 0
|
2月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
705 7
阿里云实时计算Flink在多行业的应用和实践