CompletableFuture异步编排,你还不会?

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 Tair(兼容Redis),内存型 2GB
简介: 本文介绍了同步与异步编程的概念,探讨了在复杂业务场景中使用异步编排的重要性。通过对比 `Future` 与 `CompletableFuture`,详细讲解了 `CompletableFuture` 的多种方法,如 `runAsync`、`supplyAsync`、`whenComplete`、`exceptionally` 等,并展示了如何通过 `CompletableFuture` 实现异步任务的组合与异常处理。最后,通过实战案例演示了如何利用线程池与 `CompletableFuture` 优化商品详情页的查询效率,显著减少响应时间。

一、写在开头

我们先给出同步和异步的概念

  • 同步:同步编程指的是程序在执行某个任务时,会阻塞当前线程,直到任务完成。在这种模式下,后续的代码会等待任务完成之后再继续执行。
  • 异步:异步编程指的是程序在执行某个任务时,不会阻塞当前线程,而是将任务交给其他线程去执行,当前线程可以继续执行其他操作。任务完成后,会通过回调、通知等方式处理结果。

二、使用异步编排的原因

问题举例

查询商品详情页的逻辑非常复杂,此时数据的获取涉及到多个RPC远程调用,那么必然需要花费更多的时间。假如商品详情页的查询。

  • 由图,用户需要3.5s后才能看到商品详情页的内容。很显然是不能接受的。如果有多个线程同时完成这4步操作,也许只需要1.5s即可完成响应
  • 在这里需要注意的是,在将这四个任务异步化之后,需要注意的是这四个任务之间的联系,比如获取用户的基本信息这个任务,我们需要获取商品的分类信息执行完返回的结果传入才能执行,这时候,我们就必须保证这两个任务之间的排序问题,怎么解决呢?请看下面。

Future

Java在JDK1.5之后引入了JUC包,里面包含了一个接口:Future,这算是Java中实现异步编程的开山鼻祖, 然而,Future 的局限性在于它的功能相对简单,无法很好地处理复杂的异步任务链。Future 的局限性在这里给出,有兴趣的朋友可以具体去了解,这里不重点说Future。

  • 阻塞获取结果:future.get() 方法是阻塞的,无法在不阻塞的情况下处理结果。
  • 缺乏组合功能:无法轻松地组合多个异步任务,例如任务链、并行任务等。
  • 异常处理复杂:异常处理不够简洁,需要手动捕获并处理异常。

CompletableFuture

随着 Java 版本的演进,**CompletableFuture**** **在 JDK 8 中引入,提供了更强大和灵活的异步编程支持。它不仅可以用来表示异步计算的结果,还提供了许多方便的方法来处理异步任务的执行和结果处理。以下是 CompletableFuture 的一些主要用途:

  • 异步计算CompletableFuture 可以表示一个异步计算的结果,允许在计算完成后继续执行其他操作。例如,CompletableFuture.supplyAsync 可以异步执行一个任务并返回结果。
  • 非阻塞操作:与传统的 Future 需要使用 get 方法阻塞等待结果不同,CompletableFuture 提供了非阻塞的方法,例如 thenApplythenAcceptthenRun,允许在计算完成后执行回调函数。
  • 组合多个异步任务CompletableFuture 提供了方法来组合多个异步任务,例如 thenCombinethenComposeallOf 等,允许对多个异步任务进行组合操作。
  • 处理异常CompletableFuture 提供了方法来处理异步计算中的异常,例如 exceptionallyhandle
  • 构建复杂的异步流:通过链式调用,CompletableFuture 允许构建复杂的异步流,简化异步编程模型,提高代码的可读性和可维护性。

三、CompletableFuture学习

下面我们在具体学习一下CompletableFuture的使用

1. runAsync  与  supplyAsync

  • runAsync:runAsync 方法用于启动一个没有返回值的异步任务。该方法通常用于那些不需要返回结果的任务,例如记录日志、发送通知等。
  • supplyAsync  :supplyAsync 方法用于启动一个有返回值的异步任务。该方法通常用于需要返回结果的任务,例如计算结果、获取数据等。

java

代码解读

复制代码

public class Demo01 {
    public static void main(String[] args) throws Exception {
        supplyAsync();
        System.out.println("main来了");
        SleepUtils.sleep(3);
    }
    //发起一个异步请求
    public static void runAsync() {
        CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"你好runAsync");
                SleepUtils.sleep(10);
            }
        });
    }
    //发起一个异步请求
    public static void supplyAsync() throws Exception {
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                System.out.println(Thread.currentThread().getName() + "你好supplyAsync");
                SleepUtils.sleep(2);
                return "java0518";
            }
        });
        //这是一个阻塞方法
        System.out.println(Thread.currentThread().getName()+"--->:"+supplyFuture.get());
    }

}

CompletableFutureget() 方法是一个阻塞方法,因为它会等待异步任务的完成并返回结果。如果异步任务尚未完成,调用 get() 的线程会被阻塞,直到任务完成或抛出异常。

2.whenComplete 与 exceptionally

whenComplete

  • 类似于 Vue 中发起异步请求之后的 then 方法。
  • 无论任务是否有异常都会执行。
  • 回调方法接收两个参数:任务的结果(如果任务没有返回值则为 Void)和异常(如果没有异常则为 null)。

exceptionally

  • 类似于 Vue 中发起异步请求之后的 catch 方法。
  • 只有当任务发生异常时才会执行。
  • 可以用于处理异步任务中的异常,并返回一个替代结果。

java

代码解读

复制代码

public class Demo02 {
    public static void main(String[] args) throws Exception {
        runAsync();
        System.out.println("main来了");
        SleepUtils.sleep(3);
    }
    //发起一个异步请求
    public static void runAsync() {
        CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"你好runAsync");
                //int a =10/0;
            }
        }).whenComplete(new BiConsumer<Void, Throwable>() {
            @Override
            public void accept(Void acceptVal, Throwable throwable) {
                System.out.println("执行异步之后whenComplete接受值"+acceptVal);
                System.out.println("执行异步之后whenComplete异常值"+throwable);
            }
        }).exceptionally(new Function<Throwable, Void>() {
            @Override
            public Void apply(Throwable throwable) {
                System.out.println("执行异步之后exceptionally异常值"+throwable);
                return null;
            }
        });
    }

}

3.whenComplete 和 whenCompleteAsync

whenComplete(同步)

  • 用途:用于处理任务完成后的回调,无论任务是否成功或失败。
  • 执行线程:在发起异步任务的线程中执行。

whenCompleteAsync(异步)

  • 用途:用于处理任务完成后的回调,无论任务是否成功或失败。
  • 执行线程:在异步线程池中执行,通常不会阻塞发起异步任务的线程。

java

代码解读

复制代码

public class Demo03 {
    public static void main(String[] args) throws Exception {
        //runAsync1();
        runAsync2();
        System.out.println("main来了");
        SleepUtils.sleep(3);
    }
    //发起一个异步请求
    public static void runAsync1() {
        CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"你好runAsync");
            }
        }).whenComplete(new BiConsumer<Void, Throwable>() {
            @Override
            public void accept(Void acceptVal, Throwable throwable) {
                System.out.println(Thread.currentThread().getName()+"异步之后接受值"+acceptVal);
            }
        });
    }

    //发起一个异步请求
    public static void runAsync2() {
        CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"你好runAsync");
            }
        }).whenCompleteAsync(new BiConsumer<Void, Throwable>() {
            @Override
            public void accept(Void acceptVal, Throwable throwable) {
                System.out.println(Thread.currentThread().getName()+"执行异步之后async接受值"+acceptVal);
            }
        });
    }
}

4.thenAccept 和 thenApply

  • thenAccept:用于处理异步任务的结果,但不返回新的值。适合用于打印日志、执行操作等场景。
  • thenApply:用于处理异步任务的结果,并返回新的值。适合用于转换数据、链式处理等场景。

java

代码解读

复制代码

public class Demo04 {
    public static void main(String[] args) throws Exception {
        supplyAsync();
        System.out.println("main来了");
        SleepUtils.sleep(8);
    }
    //发起一个异步请求
    public static void supplyAsync() throws Exception {
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                System.out.println(Thread.currentThread().getName() + "你好supplyAsync");
                SleepUtils.sleep(2);
                return "java0518";
            }
        });

        supplyFuture.thenAccept(new Consumer<String>() {
            @Override
            public void accept(String acceptVal) {
                SleepUtils.sleep(2);
                System.out.println(Thread.currentThread().getName()+"第一个accept接受到的值"+acceptVal);
            }
        });

        supplyFuture.thenAccept(new Consumer<String>() {
            @Override
            public void accept(String acceptVal) {
                SleepUtils.sleep(2);
                System.out.println(Thread.currentThread().getName()+"第二个accept接受到的值"+acceptVal);
            }
        });
    }
}

java

代码解读

复制代码

public class Demo05 {
    public static void main(String[] args) throws Exception {
        supplyAsync();
        System.out.println("main来了");
        SleepUtils.sleep(8);
    }
    //发起一个异步请求
    public static void supplyAsync() throws Exception {
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                System.out.println(Thread.currentThread().getName() + "你好supplyAsync");
                SleepUtils.sleep(2);
                return "java0518";
            }
        });

        CompletableFuture<String> thenApply1 = supplyFuture.thenApply(new Function<String, String>() {
            @Override
            public String apply(String acceptVal) {
                SleepUtils.sleep(2);
                System.out.println(Thread.currentThread().getName() + "第一个thenApply接受到的值" + acceptVal);
                return "apply1" + acceptVal;
            }
        });

        CompletableFuture<String> thenApply2 = supplyFuture.thenApply(new Function<String, String>() {
            @Override
            public String apply(String acceptVal) {
                SleepUtils.sleep(2);
                System.out.println(Thread.currentThread().getName() + "第二个thenApply接受到的值" + acceptVal);
                return "apply2" + acceptVal;
            }
        });

        System.out.println(thenApply1.get());
        System.out.println(thenApply2.get());
    }
}

5.thenApply 和  thenApplyAsync

thenApply

  • 使用调用它的 CompletableFuture 的同一个线程或完成该 CompletableFuture 的线程来执行处理函数。
  • 如果前面的阶段已经完成,处理函数可能会在调用线程中被同步执行。
  • 适用于对执行时间要求不严格且处理时间较短的任务。

thenApplyAsync

  • 使用 ForkJoinPool.commonPool() 或者自定义的线程池来异步执行处理函数。
  • 总是异步地执行处理函数,不管前面的阶段是否已经完成。
  • 适用于需要异步执行、处理时间较长或需要非阻塞执行的任务。
  • 执行上下文thenApply 在同一个线程或调用线程中同步执行,而 thenApplyAsync 总是异步地执行,使用公共线程池或自定义的线程池。
  • 适用场景thenApply 适用于对执行时间要求不严格的短任务,thenApplyAsync 适用于需要非阻塞异步执行的长任务或需要使用特定线程池的任务。

java

代码解读

复制代码

public class Demo06 {
    public static void main(String[] args) throws Exception {
        supplyAsync();
        System.out.println("main来了");
        SleepUtils.sleep(8);
    }

    //发起一个异步请求
    public static void supplyAsync() throws Exception {
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                System.out.println(Thread.currentThread().getName() + "你好supplyAsync");
                SleepUtils.sleep(2);
                return "java0518";
            }
        });

        CompletableFuture<String> thenApply1 = supplyFuture.thenApplyAsync(new Function<String, String>() {
            @Override
            public String apply(String acceptVal) {
                SleepUtils.sleep(2);
                System.out.println(Thread.currentThread().getName() + "第一个thenApply接受到的值" + acceptVal);
                return "apply1" + acceptVal;
            }
        });

        CompletableFuture<String> thenApply2 = supplyFuture.thenApplyAsync(new Function<String, String>() {
            @Override
            public String apply(String acceptVal) {
                SleepUtils.sleep(2);
                System.out.println(Thread.currentThread().getName() + "第二个thenApply接受到的值" + acceptVal);
                return "apply2" + acceptVal;
            }
        });

        System.out.println(thenApply1.get());
        System.out.println(thenApply2.get());
    }
}

6.总结运用

  • 对于不需要返回结果的任务,使用 runAsync
  • 对于需要返回结果的任务,使用 supplyAsync
  • 在任务完成后,无论是否成功,都要执行某些操作时,使用 whenCompletewhenCompleteAsync
  • 仅在任务发生异常时处理异常并提供替代结果,使用 exceptionally
  • 处理任务结果但不返回新值时,使用 thenAccept
  • 处理任务结果并返回新值时,使用 thenApplythenApplyAsync。根据是否需要异步执行选择同步或异步版本。

带Async代表异步,异步就是启动另外线程去执行。有xxpply的就代表有返回值

java

代码解读

复制代码

public class Demo07 {
    public static void main(String[] args) throws Exception {
        supplyAsync();
        System.out.println("main来了");
        SleepUtils.sleep(8);
    }

    //发起一个异步请求
    public static void supplyAsync() throws Exception {
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName() + "你好supplyAsync");
            SleepUtils.sleep(2);
            return "java0518";
        });

        CompletableFuture<String> thenApply1 = supplyFuture.thenApplyAsync(acceptVal->{
            SleepUtils.sleep(2);
            System.out.println(Thread.currentThread().getName() + "第一个thenApply接受到的值" + acceptVal);
            return "apply1" + acceptVal;
        });

        CompletableFuture<String> thenApply2 = supplyFuture.thenApplyAsync(acceptVal -> {
            SleepUtils.sleep(2);
            System.out.println(Thread.currentThread().getName() + "第二个thenApply接受到的值" + acceptVal);
            return "apply2" + acceptVal;
        });

        System.out.println(thenApply1.get());
        System.out.println(thenApply2.get());
    }
}

四、异步编排实战案例

我们对开头的例子运用线程池+CompletableFuture进行模拟编程 在这里假设

  • 获取商品分类信息依赖于获取商品统计信息的查询结果
  • 获取商品基本信息也依赖于获取商品统计信息的查询结果

java

代码解读

复制代码

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AlbumDetailsDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建一个线程池
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);

        // 开始时间
        long startTime = System.currentTimeMillis();

        // 1. 获取商品的基本信息
        CompletableFuture<Void> basicInfoFuture = CompletableFuture.runAsync(() -> {
            sleep(1.5);//模拟远程调用feign
        }, executor);

        // 2. 获取商品统计信息
        CompletableFuture<String> statsInfoFuture = CompletableFuture.supplyAsync(() -> {
            sleep(0.5);//模拟远程调用feign
            return "商品统计信息";
        }, executor);

        // 3. 获取商品分类信息  依赖于2的查询结果
        CompletableFuture<Void> categoryInfoFuture = statsInfoFuture.thenAcceptAsync(statsInfoFutureInfo -> {
            System.out.println("拿到商品统计信息"+statsInfoFutureInfo);
            sleep(1.0);//模拟远程调用feign
        }, executor);

        // 4. 获取商品基本信息   依赖于2的查询结果
        CompletableFuture<Void> userInfoFuture = statsInfoFuture.thenAcceptAsync(statsInfoFutureInfo -> {
            System.out.println("拿到商品统计信息"+statsInfoFutureInfo);
            sleep(0.5);//模拟远程调用feign
        }, executor);

        // 等待所有任务完成
       CompletableFuture.allOf(basicInfoFuture, statsInfoFuture, categoryInfoFuture, userInfoFuture).join();

        // 结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("总耗时: " + (endTime - startTime) / 1000.0 + "秒");

        // 关闭线程池
        executor.shutdown();
    }

    // 模拟任务执行时间
    private static void sleep(double seconds) {
        try {
            TimeUnit.MILLISECONDS.sleep((long) (seconds * 1000));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

我们可以看到只需1.578秒就可以完成业务,比3.5秒的同步运行进行了大大优化。

五、异步编排使用场景

场景分析:

  • 如果我们使用默认方式采用的是串行,整体访问需要更多的时间
  • 如果我们单纯的使用创建多个线程的方式,无法保证线程执行顺序
  • 假如请求线程太多,会占用大量的内存,利用线程复用,可以节约空间/性能开销

实际使用场景

1. 并行调用多个微服务

在微服务架构中,一个请求可能需要调用多个微服务来获取数据并进行处理。例如,在电商平台中,获取商品详情页信息时需要调用库存服务、价格服务、评论服务等。

2. 异步批量处理任务

在后台管理系统中,可能需要批量处理大量数据,例如批量导入用户数据,进行数据清洗和校验,并将结果写入数据库。

3.异步处理用户请求并返回聚合结果

在一个内容聚合平台中,需要从多个数据源获取数据并进行合并返回给用户。例如获取新闻、社交媒体帖子和博客文章。

4. 异步执行复杂的工作流

在一些复杂的业务流程中,需要依次执行多个步骤,每个步骤可能是异步的。例如在订单处理流程中,需要检查库存、扣减库存、创建订单、处理支付等。


转载来源:https://juejin.cn/post/7394992701803462696

相关文章
|
8月前
|
Java
异步技巧之CompletableFuture
异步技巧之CompletableFuture
78 2
|
3月前
|
Java API
异步任务编排神器CompletableFuture
【10月更文挑战第10天】CompletableFuture是JDK8并发包中引入的强大工具,用于处理复杂的异步任务编排。它提供了丰富的API,支持任务的串行、并行、组合及异常处理,适用于需要高效管理和协调多个异步操作的场景。例如,网页加载时需从多个服务异步获取数据,CompletableFuture可以有效提升性能和响应速度。使用时应注意异常处理和合理选择线程池,以确保程序稳定性和效率。
异步任务编排神器CompletableFuture
|
5月前
|
Java 数据库
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
这篇文章通过一个电商商品详情页的实战案例,展示了如何使用`CompletableFuture`进行异步编排,以解决在不同数据库表中查询商品信息的问题,并提供了详细的代码实现和遇到问题(如图片未显示)的解决方案。
异步&线程池 CompletableFuture 异步编排 实战应用 【终结篇】
|
5月前
|
Java
异步&线程池 CompletableFuture 异步编排 【下篇】
这篇文章深入探讨了Java中的`CompletableFuture`类,解释了如何创建异步操作、使用计算完成时的回调方法、异常处理、串行化方法、任务组合以及多任务组合的使用方式,并通过代码示例展示了各种场景下的应用。
异步&线程池 CompletableFuture 异步编排 【下篇】
|
8月前
|
Java
JAVA线程&线程池&异步编排
JAVA线程&线程池&异步编排
63 0
|
8月前
|
Java
CompletableFuture 异步编排、案例及应用小案例2
CompletableFuture 异步编排、案例及应用小案例
89 0
|
8月前
|
Java
CompletableFuture 异步编排、案例及应用小案例1
CompletableFuture 异步编排、案例及应用小案例
181 0
|
存储 SpringCloudAlibaba Java
Java新特性:异步编排CompletableFuture
CompletableFuture由Java 8提供,是实现异步化的工具类,上手难度较低,且功能强大,支持通过函数式编程的方式对各类操作进行组合编排。 CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步[回调](https://so.csdn.net/so/search?q=回调&spm=1001.2101.3001.7020)、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
402 1
Java新特性:异步编排CompletableFuture
|
Java 数据库 数据安全/隐私保护
【CompletableFuture事件驱动异步回调】
【CompletableFuture事件驱动异步回调】
|
设计模式 JavaScript 前端开发
CompletableFuture 异步编排
CompletableFuture 异步编排