【精通函数式编程】(十一) CompletableFuture、反应式编程源码解析与实战

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: Future异步编程和CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。

前言📫 作者简介:小明java问道之路,专注于研究计算机底层,就职于金融公司后端高级工程师,擅长交易领域的高安全/可用/并发/性能的设计和架构📫

🏆 Java领域优质创作者、阿里云专家博主、华为云专家🏆

🔥 如果此文还不错的话,还请👍关注、点赞、收藏三连支持👍一下博主哦

本文导读

Java代码为了更好的发展和性能,开发了 异步编程的模式,Future异步编程和CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。

一、同步与异步

1、为什么要有异步

在Java发展的这20年,他只做了一件事不被淘汰,为了不被淘汰不断的更新jdk的版本,以便使用计算机硬件、操作系统以及新的编程概念。

Java一开始提供了 synchronized 锁、Runable,后面java5有引入了 java.util.concurrent 包,java7中的 forkjoin 框架 java.util.concurrent.RecursiveTask,到java8中Stream流、lambda表达式的支持,这一切都是为了支持高并发。

即便如此,多线程虽然极大的提升了性能,如果合理的使用线程池的话,好处,第一可以降低资源消耗,重复利用已创建的线程;第二:提高响应速度,任务可以不需要等到线程创建就能立即执行;第三:提高线程的可管理性。统一分配、调优和监控。但是线程池也不是没有缺点,使用k个线程的线程池就只能并发的执行k个任务,其他任务还是回休眠或者阻塞

这时候如果有线程不和其他任务相关联,又可以不用阻塞,就好了。Java8考虑到了,充分发挥了计算机硬件的处理能力,异步API 应运而生。

2、什么是同步?什么是异步?

同步就是 a 程序强依赖 b 程序,我必须等到你的回复或者执行完毕,才能做出下一步响应,类似于编程中程序被解释器(JVM)顺序执行一样(加载 > 验证 > 准备 > 解析 > 初始化);

异步则相反,a 程序不强依赖 b 程序,响应的时间也无所谓,无论你返回还是不返回,a 程序都能继续运行,也就是说我不存在等待对方的概念,a 程序就是 异步非阻塞的。

下面举一个例子就说明什么是同步、什么是异步

网络异常,图片无法展示
|

异步编程涉及两种风格,Future风格API 和反应式风格API ,Future<Integer> fun(int a){},fun( a , x-> {}),这两个模式的实战会在后面小结讲解。

二、Future异步编程

1、Future 接口

Java5中就引入了Future 接口,他的涉及初衷就是异步计算,例如我们结算一个商户下的所有订单,这个时候并不需要for循环去累加,Future 接口使用的时候只需要封装 Callable中,再提交给ExecutorService。

2、Future 接口的使用

Future 接口的使用看下Java8之前是如何使用异步的

public static void main(String[] args) throws Exception {
    List<OrderInfo> orderInfos = Arrays.asList(new OrderInfo("123", BigDecimal.ONE),
            new OrderInfo("456", BigDecimal.TEN), new OrderInfo("789", BigDecimal.TEN));
    // 创建 ExecutorService 通过它可以向线程池提交任务
    ExecutorService executorService = Executors.newCachedThreadPool();
    // 异步操作的同时,可以进行其他操作
    Future<BigDecimal> decimalFuture = executorService.submit(new Callable<BigDecimal>() {
        @Override
        public BigDecimal call() throws Exception {
            return reduceAmt(orderInfos);
        }
    });
    // Java8 写法
    Future<BigDecimal> decimalFuture = executorService.submit(() -> reduceAmt(orderInfos));
    System.out.println(decimalFuture.get());
}
private static BigDecimal reduceAmt(List<OrderInfo> orderInfos) {
    return orderInfos.stream()
            .map(OrderInfo::getOrderAmt)
            .reduce(BigDecimal.ZERO, BigDecimal::add);
}

异步编程可以在 ExecutorService 中,以并发的方式调用另一个线程执行操作,后续调用get() 方法获取操作结果,如果操作完成会立刻返回,如果操作没有完成则回阻塞线程,直到操作完成返回。

网络异常,图片无法展示
|

3、Future 接口的缺陷(局限性)

Future接口 还提供了方法来检测异步计算是否已经结束(isDone() 方法),等待异步操作结束。

但是当长时间计算任务完成时,该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并。

就会发生很多性能问题:1、将两个异步计算合并为一个(这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。)2、此时就要等待 Future 集合中的所有任务都完成。3、当Future的完成事件发生时会收到通知,使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果,每一步都需要等待。

三、CompletableFuture 接口详解

1、CompletableFuture 的创建

CompletableFuture.runAsync()也可以用来创建CompletableFuture实例。与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值,runAsync适合创建不需要返回值的计算任务

通过该supplyAsync 函数创建的CompletableFuture实例会异步执行当前传入的计算任务,在调用端,则可以通过get或join获取最终计算结果。

同事直接new(构造函数创建)也是可以的,下面通过一个实战小例子看下,高并发高性能的添加购物车结构如何设计

import java.util.concurrent.CompletableFuture;
import org.springframework.core.task.AsyncTaskExecutor;
public class CompletableFutureImpl {
    @Autowired
    @Qualifier("asyncExecutor")
    private AsyncTaskExecutor asyncTaskExecutor;
    public DefaultResponseVO addGoodsCart(HttpServletRequest request, @Valid AddCartReqVO reqVo) {
        // 添加购物车
        GetCartItemEntity respVO = mallCartProcess.addGoodsCart(buildAddGoodsCartReqVO(reqBo));
        /**
         * 异步刷新到购物车
         */
        CompletableFuture voidCompletableFuture = CompletableFuture.runAsync(() -> syncAddCacheCheck(respVO), asyncTaskExecutor);
        // 通过supplyAsync 函数创建的
        CompletableFuture<Object> uCompletableFuture = CompletableFuture.supplyAsync(() -> syncAddCacheCheck(respVO));
        // 构造函数创建 
        CompletableFuture completableFuture = new CompletableFuture().runAsync(() -> syncAddCacheCheck(respVO));
        return new DefaultResponseVO(code, msg, respVO);
    }
    /**
     * 添加购物车缓存(异步刷新到购物车)
     */
    public void syncAddCacheCheck(GetCartItemEntity cartItemEntity) {
        try {
            // 添加购物车缓存(异步刷新到购物车)
            mallCartProcess.getCartInfo(cartItemEntity);
        } catch (Exception e) {
            logger.error("syncAddCache error", e);
        }
    }
}

2、CompletableFuture.supplyAsync 源码分析

本小节讲解CompletableFuture的底层实现

网络异常,图片无法展示
|

上面为java 8中 supplyAsync 函数的实现源码。可以看到,当 supplyAsync 入参只有 supplier 时,会默认使用asyncPool作为线程池(一般情况下为ForkJoinPool的commonPool),并调用内部方法asyncSupplyStage执行具体的逻辑。在 asyncSupplyStage 方法中,程序会创建一个空的CompletableFuture 返回给调用方。同时该 CompletableFuture 和传入的 supplier 会被包装在一个AsyncSupply 实例对象中,然后一起提交到线程池中进行处理。

值得注意的是,当supplyAsync返回时,调用方只会拿到一个空的CompletableFuture实例。看到这里,我们可以猜测,当计算最终完成时,计算结果会被set到对应的CompletableFuture的result字段中。调用方通过join或者get就能取到该CompletableFuture的result字段的值。所以,虽然实际创建CompletableFuture的线程和进行任务计算的线程不同,但是最终会通过result来进行结果的传递。这种方式与传统的Future中结果传递方式类似(计算线程set值,使用线程get值)。

网络异常,图片无法展示
|

上面为java 8中 AsyncSupply 的实现源码,AsyncSupply的源码很简单。首先,它实现了Runnable接口,所以被提交到线程池中后,工作线程会执行其run()方法。通过对AsyncSupply中run方法的分析,也基本证实我们之前的猜测。即计算任务由工作线程调用run方法执行,并设置到CompletableFuture的结果中。其他线程中的使用方,则可以调用该CompletableFuture的join或者get方法获取其结果。

因此,我们只需要搞清楚其run()中的实现即可。在 run() 中,程序首先检查了传入的CompletableFuture 和 Supplier 是否为空,如果均不为空,再检查 CompletableFuture 的 d.result是否为空,如果不为空,则说明 CompletableFuture 的值已经被其他线程主动设置过了(这也是CompletableFuture与Future最大的不同之处),因此这里就不会再被重新设置一次。如果 d.result 为空,则调用Supplier(源码中的 f 变量)的get()方法,执行具体的计算,然后通过 completeValue 方法将结果设置到CompletableFuture中。最后,调用CompletableFuture的postComplete()方法,执行连接到当前CompletableFuture上的后置任务。

3、CompletableFuture.runAsync 源码分析

网络异常,图片无法展示
|

通过上面的源码可以看出,runAsync也会生成一个空的CompletableFuture,并包装在AsyncRun中提交到线程池中执行。这与supplyAsync是完全一致的。由于Runnable没有返回值,这里返回的CompletableFuture的结果值是Void类型的。

网络异常,图片无法展示
|

AsyncRun的run中,计算的执行是通过调用传入的Runnable(源码中的 f 变量)的run方法进行的。由于没有返回值,所以这里在设置CompletableFuture的值时,使用其completeNull()方法,设置一个特殊的空值标记。 设计方面和supplyAsync一致

4、CompletableFuture API 实战

thenApply、thenAccept、thenRun

thenApply 提交的任务类型需遵从Function签名,也就是有入参和返回值,其中入参为前置任务的结果。thenAccept 提交的任务类型需遵从Consumer签名,也就是有入参但是没有返回值,其中入参为前置任务的结果。thenRun 提交的任务类型需遵从Runnable签名,即没有入参也没有返回值。

thenCombine、thenCompose

thenCombine最大的不同是连接任务可以是一个独立的CompletableFuture。嵌套获取层级也越来越深。因此,需要一种方式,能将这种嵌套模式展开,使其没有那么多层级。thenCompose的主要目的就是解决这个问题(这里也可以将thenCompose的作用类比于stream接口中的flatMap,因为他们都可以将类型嵌套展开)。

whenComplete、handle

whenComplete主要用于注入任务完成时的回调通知逻辑(获得的结果是前置任务的结果,whenComplete中的逻辑不会影响计算结果)。handle与handle接收的处理函数有返回值,而且返回值会影响最终获取的计算结果(产生了新的结果)

四、反应式编程

1、什么是反应式(resctive)编程

反应式编程是最近几年才提出的概念,主要有四个特征:响应式,反应式编程的响应速度应该很快,而且是稳定可预测的。韧性,系统出现失败时,任然可以继续响应服务,任何一个组件都能以异步的方式想其他组件分发任务。弹性,影响代码响应的因素的代码(系统)的负载能力,反应式编程可以增加分配的资源,受流量影响后有自动适配的能力,服务更大的负载。消息驱动,各个组件之间松耦合,组件隔离,跨组件通信使用异步消息传递。

反应式(resctive)编程在应用层的主要特征是任务以异步的方式执行,非阻塞的处理事件流,充分利用多核CPU的特点,大多反应式框架(RxJava等)都可以独立开辟线程池,用于执行阻塞式操作,主线程池中运行都是无阻塞的。

2、反应式流(Flow API)

2.1、发布订阅模式

Future CompletableFuture 的思维模式是计算的执行是独立且并发的。使用 get()方法可以在执行结束后获取 Future 对象的执行结果。因此,Future 是一个一次性对象,它只能从头到尾执行代码一次。

与此相反,反应式编程的思维模式是类 Future 的对象随着时间的推移可以产生很多的结果。举个例子是 Web 服务器的监听组件对象。该组件监听来自网络的 HTTP请求,并根据请求的内容返回相应的数据。

网络异常,图片无法展示
|

2.2、Flow API 源码解析

Java9 使用 java.util.concurrent.Flow 提供的接口对反应式编程进行建模,实现了名为“发布-订阅”的模型(也叫协议,简写为 pub-sub )

反应式编程有三个主要的概念,分别是:订阅者可以订阅的发布者;名为订阅的连接;消息(也叫事件),它们通过连接传输。

反应式流(Flow API)规范可以总结为4个接口:Publisher(发布者)、Subscriber(订阅者)、Subscription(订阅)和Processor(处理者)

网络异常,图片无法展示
|

Publisher负责生成数据,并将数据发送给 Subscription(每个Subscriber对应一个Subscription)。Publisher接口声明了一个方法 subscribe(),Subscriber可以通过该方法向 Publisher发起订阅。

网络异常,图片无法展示
|

一旦Subscriber订阅成功,就可以接收来自Publisher的事件。这些事件是通过Subscriber接口上的方法发送的

网络异常,图片无法展示
|

Subscriber的第一个事件是通过对 onSubscribe()方法的调用接收的。Publisher调用 onSubscribe() 方法时,会将Subscription对象传递给 Subscriber。通过Subscription,Subscriber可以管理其订阅情况

网络异常,图片无法展示
|

Subscriber 可以通过调用 request() 方法来请求 Publisher 发送数据,或者通过调用 cancel()方法表明它不再对数据感兴趣并且取消订阅。当调用 request() 时,Subscriber 可以传入一个long类型的数值以表明它愿意接受多少数据。这也是回压能够发挥作用的地方,以避免Publisher 发送多于 Subscriber能够处理的数据量。在 Publisher 发送完所请求数量的数据项之后,Subscriber 可以再次调用 request()方法来请求更多的数据。

Subscriber 请求数据之后,数据就会开始流经反应式流。Publisher 发布的每个数据项都会通过调用 Subscriber 的 onNext()方法递交给 Subscriber。如果有任何错误,就会调用 onError()方法。如果 Publisher 没有更多的数据,也不会继续产生更多的数据,那么将会调用 Subscriber 的onComplete() 方法来告知 Subscriber 它已经结束

反应式流规范的接口本身并不支持以函数式的方式组成这样的流。Reactor 项目是反应式流规范的一个实现,提供了一组用于组装反应式流的函数式API。有我们自己实现。

2.3、Flow API 实战

FlowImpl :创建Publisher并向其订阅TempSubscriber

public class FlowImpl {
    public static void main(String[] args) {
        // 创建 Publisher 并向其订阅 Subscriber
        getOrderAmt("20220727123").subscribe(new SubscriberImpl());
    }
    // Publisher是个函数式接口
    private static Flow.Publisher<OrderInfo> getOrderAmt(String orderId) {
        return subscriber -> subscriber.onSubscribe(new SubscriptionImpl(subscriber, orderId));
    }
}

Subscription接口:实现向 Subscriber 发送 OrderInfo Steam

public class SubscriptionImpl implements Flow.Subscription {
    private final Flow.Subscriber<? super OrderInfo> subscriber;
    private final String orderId;
    public SubscriptionImpl(Flow.Subscriber<? super OrderInfo> subscriber, String orderId) {
        this.subscriber = subscriber;
        this.orderId = orderId;
    }
    @Override
    public void request(long n) {
        // 另起一个线程向 subscriber 发送下一个元素
        Executors.newSingleThreadExecutor().submit(() -> {
            for (long i = 0L; i < n; i++) // subscriber 每处理一个请求执行一次循环
                try {
                    // 将当前 订单号 发送给 Subscriber
                    subscriber.onNext(OrderInfo.reduceAmt(orderId));
                } catch (Exception e) {
                    // 查询报错将这个报错信息传给 Subscriber
                    subscriber.onError(e);
                    e.printStackTrace();
                    break;
                }
        });
    }
    @Override
    public void cancel() {
        // 如果 Subscription 被取消了,向 subscriber 发送一个完成信号
        subscriber.onComplete();
    }
}

Subscriber接口:实现打印输出收到的 订单 数据

public class SubscriberImpl implements Flow.Subscriber<OrderInfo> {
    private Flow.Subscription subscription;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
    @Override
    public void onNext(OrderInfo orderInfo) {
        System.out.println(orderInfo);
        subscription.request(1);
    }
    @Override
    public void onError(Throwable throwable) {
        System.out.println(throwable.getMessage());
    }
    @Override
    public void onComplete() {
        System.out.println("Done!");
    }
}

总结

Java代码为了更好的发展和性能,开发了 异步编程的模式,Future异步编程和CompletableFuture 接口都可以实现异步编程,我们通过源码深入理解其原理和设计的思想,Java9中提供了反应式编程(Flow API)我们分析其源码并提供一个响应式查询实战。

相关文章
|
23天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
64 2
|
1月前
|
自然语言处理 编译器 Linux
|
24天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
1月前
|
安全 程序员 API
|
1月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
1月前
|
设计模式 安全 Java
Java编程中的单例模式深入解析
【10月更文挑战第31天】在编程世界中,设计模式就像是建筑中的蓝图,它们定义了解决常见问题的最佳实践。本文将通过浅显易懂的语言带你深入了解Java中广泛应用的单例模式,并展示如何实现它。
|
1月前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
47 3
|
1月前
|
JavaScript API 开发工具
<大厂实战场景> ~ Flutter&鸿蒙next 解析后端返回的 HTML 数据详解
本文介绍了如何在 Flutter 中解析后端返回的 HTML 数据。首先解释了 HTML 解析的概念,然后详细介绍了使用 `http` 和 `html` 库的步骤,包括添加依赖、获取 HTML 数据、解析 HTML 内容和在 Flutter UI 中显示解析结果。通过具体的代码示例,展示了如何从 URL 获取 HTML 并提取特定信息,如链接列表。希望本文能帮助你在 Flutter 应用中更好地处理 HTML 数据。
110 1
|
1月前
|
前端开发 中间件 PHP
PHP框架深度解析:Laravel的魔力与实战应用####
【10月更文挑战第31天】 本文作为一篇技术深度好文,旨在揭开PHP领域璀璨明星——Laravel框架的神秘面纱。不同于常规摘要的概括性介绍,本文将直接以一段引人入胜的技术剖析开场,随后通过具体代码示例和实战案例,逐步引导读者领略Laravel在简化开发流程、提升代码质量及促进团队协作方面的卓越能力。无论你是PHP初学者渴望深入了解现代开发范式,还是经验丰富的开发者寻求优化项目架构的灵感,本文都将为你提供宝贵的见解与实践指导。 ####
|
1月前
|
前端开发 JavaScript
JavaScript新纪元:ES6+特性深度解析与实战应用
【10月更文挑战第29天】本文深入解析ES6+的核心特性,包括箭头函数、模板字符串、解构赋值、Promise、模块化和类等,结合实战应用,展示如何利用这些新特性编写更加高效和优雅的代码。
43 0

推荐镜像

更多