实战分析Java的异步编程,并通过CompletableFuture进行高效调优

简介: 【6月更文挑战第7天】实战分析Java的异步编程,并通过CompletableFuture进行高效调优

一、写在开头

在我们一开始讲多线程的时候,提到过异步同步的概念,这里面我们再回顾一下:

  • 同步:调用方在调用某个方法后,等待被调用方返回结果;调用方在取得被调用方的返回值后,再继续运行。调用方顺序执行,同步等待被调用方的返回值,这就是阻塞式调用;
  • 异步:调用方在调用某个方法后,直接返回,不需要等待被调用方返回结果;被调用方开启一个线程处理任务,调用方可以同时去处理其他工作。调用方和被调用方是异步的,这就是非阻塞式调用。

适应场景
同步:如果数据存在线程间的共享,或竞态条件,需要同步。如多个线程同时对同一个变量进行读和写的操作,必须等前一个请求完成,后一个请求去调用前一个请求的结果,这时候就只能采用同步方式。
异步:当应用程序在对象上调用了一个需要花费很长时间来执行的方法,并且不希望让程序等待方法的返回时,就可以使用异步,提高效率、加快程序的响应。

而我们今天探讨的话题就是Java中的异步编程。

二、Future

为了提升Java程序的响应速度,在JDK1.5时引入了JUC包,里面包含了一个接口文件:Future,这是Java中实现异步编程的开端,我们可以将Future理解为一种异步思想或者一种设计模式;当我们执行某一耗时的任务时,可以将这个耗时任务交给一个子线程去异步执行,同时我们可以干点其他事情,不用傻傻等待耗时任务执行完成。等我们的事情干完后,我们再通过 Future 类获取到耗时任务的执行结果。

它的底层也是几个很容易理解的接口方法:

// V 代表了Future执行的任务返回值的类型
public interface Future<V> {
   
   
    // 取消任务执行
    // 成功取消返回 true,否则返回 false
    boolean cancel(boolean mayInterruptIfRunning);
    // 判断任务是否被取消
    boolean isCancelled();
    // 判断任务是否已经执行完成
    boolean isDone();
    // 获取任务执行结果
    V get() throws InterruptedException, ExecutionException;
    // 指定时间内没有返回计算结果就抛出 TimeOutException 异常
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutExceptio
    }

这些接口大致提供的服务是:我有一个任务分配给了Future,然后我可以继续去干其他的事情,然后我可以在这个过程中去看任务是否完成,也可以取消任务,一段时间后我也可以去获取到任务执行后的结果,也可以设置任务多久执行完,没执行完抛异常等。

对于Future的使用,我想大家应该并不陌生的,我们在学习线程池的时候就有涉及,看下面这个测试案例:

//这里使用Executors只是方便测试,正常使用时推荐使用ThreadPoolExecutor!
ExecutorService executorService = Executors.newFixedThreadPool(3);
Future<String> submit = executorService.submit(() -> {
   
   
    try {
   
   
        Thread.sleep(5000L);
    } catch (InterruptedException e) {
   
   
        e.printStackTrace();
    }
    return "javabuild";
});
String s = submit.get();
System.out.println(s);
executorService.shutdown();

这里我们通过executorService.submit()方法去提交一个任务,线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()方法来获取返回值。

三、Future实战

经过了上面的学习了解,我们来根据案例场景进行实战使用Future,毕竟现在很多大厂除了问面试八股文之外,更多的会涉及到场景题!

场景模拟

假如你是一个12306的开发人员,为了在节假日满足大量用户的出行需要,请高效的完成:用户搜索一个目的地,推荐出所有的交通方案+酒店+耗时,并根据价格从低到高排序

拿到这种场景题的时候,我们往往需要分步处理:

  1. 根据目的地,搜索出所有的飞机、火车、客车路线,每个路线间隔30分钟;
  2. 计算出每种路线的耗时;
  3. 根据交通方案中最后一个到站点进行可用酒店匹配;
  4. 根据不同交通方案+对应的酒店价格进行最终出行总价格计算;
  5. 将所有组合的出行方案反馈给用户。

好了,分析完我们大概需要做的步骤,我们就来通过代码实现一下吧

第一步: 我们先来创建一个固定10个线程的线程池,用来处理以上每一步的任务。

//这里使用Executors只是演示,正常使用时推荐使用ThreadPoolExecutor!
ExecutorService executor = Executors.newFixedThreadPool(10);

第二步: 部分代码实例,方法就不贴了,太多太长了,大家需要对Future的用法理解即可

// 1. 根据传入的目的地查询所有出行方案,包括交通组合,价格,到站地点,出发时间,到站时间等
        Future<List<TripMethods>> tripMethods = executor.submit(() -> searchMethods(searchCondition));

        List<TripMethods> methods;
        try {
   
   
            methods = tripMethods.get();
        } catch (InterruptedException | ExecutionException e) {
   
   
            // 处理异常
        }

        // 2. 对每个出行方案的最终到站点查询酒店
        List<Future<List<Hotel>>> futureHotelsList = new ArrayList<>();
        for (TripMethods method : methods) {
   
   
            Future<List<Hotel>> futureHotels = executor.submit(() -> searchHotels(method));
            futureHotelsList.add(futureHotels);
        }
        // 出行方案=交通方案+酒店+耗时+价格
        List<Future<List<TravelPackage>>> futureTravelPackagesList = new ArrayList<>();
        for (Future<List<Hotel>> futureHotels : futureHotelsList) {
   
   
            List<Hotel> hotels;
            try {
   
   
                hotels = futureHotels.get();
            } catch (InterruptedException | ExecutionException e) {
   
   
                // 处理异常
            }

            // 3. 对每个交通方案的价格和其对应的酒店价格进行求和
            for (Hotel hotel : hotels) {
   
   
                Future<List<TravelPackage>> futureTravelPackages = executor.submit(() -> calculatePrices(hotel));
                futureTravelPackagesList.add(futureTravelPackages);
            }
        }

        List<TravelPackage> travelPackages = new ArrayList<>();
        for (Future<List<TravelPackage>> futureTravelPackages : futureTravelPackagesList) {
   
   
            try {
   
   
                travelPackages.addAll(futureTravelPackages.get());
            } catch (InterruptedException | ExecutionException e) {
   
   
                // 处理异常
            }
        }

        // 4. 将所有出行方案按照价格排序
        travelPackages.sort(Comparator.comparing(TravelPackage::getPrice));

        // 5. 返回结果
        return travelPackages;

我们在这里将每一步分任务,都作为一个future对象,处理完返回。但是这样会带来诸多问题,比如:我们调用future的get方法是阻塞操作,大大影响效率,并且在复杂的链路关系中,这种拆分式的写法,很难理清楚关联关系,先后关系等;

四、CompletableFuture 调优

在这种背景下,Java 8 时引入CompletableFuture 类,它的诞生是为了解决Future 的这些缺陷。CompletableFuture 除了提供了更为好用和强大的 Future 特性之外,还提供了函数式编程、异步任务编排组合(可以将多个异步任务串联起来,组成一个完整的链式调用)等能力。

//CompletableFuture实现了Future的接口方法,CompletionStage 接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
   
   
}

在CompletableFuture类中通过CompletionStage提供了大量的接口方法,他们让CompletableFuture拥有了出色的函数式编程能力,方法太多,我们无法一一讲解,只能通过对上面测试源码进行调优时,去使用,使用到的解释一下哈。
image.png

【CompletableFuture优化代码】

CompletableFuture.supplyAsync(() -> searchMethods())  // 1. 根据传入的目的地查询所有出行方案,包括交通组合,价格,到站地点,出发时间,到站时间等
                .thenCompose(methods -> {
   
     // 2. 对每个出行方案的最终到站点查询酒店
                    List<CompletableFuture<List<TravelPackage>>> travelPackageFutures = methods.stream()
                            .map(method -> CompletableFuture.supplyAsync(() -> searchHotels(method))  // 查询酒店
                                    .thenCompose(hotels -> {
   
     // 3. 对每个交通方案的价格和其对应的酒店价格进行求和
                                        List<CompletableFuture<TravelPackage>> packageFutures = hotels.stream()
                                                .map(hotel -> CompletableFuture.supplyAsync(() -> new TravelPackage(method, hotel)))
                                                .collect(Collectors.toList());

                                        return CompletableFuture.allOf(packageFutures.toArray(new CompletableFuture[0]))
                                                .thenApply(v -> packageFutures.stream()
                                                        .map(CompletableFuture::join)
                                                        .collect(Collectors.toList()));
                                    }))
                            .collect(Collectors.toList());

                    return CompletableFuture.allOf(travelPackageFutures.toArray(new CompletableFuture[0]))
                            .thenApply(v -> travelPackageFutures.stream()
                                    .flatMap(future -> future.join().stream())
                                    .collect(Collectors.toList()));
                })
                .thenApply(travelPackages -> {
   
     // 4. 将所有出行方案按照价格排序
                    return travelPackages.stream()
                            .sorted(Comparator.comparing(TravelPackage::getPrice))
                            .collect(Collectors.toList());
                })
                .exceptionally(e -> {
   
     // 处理所有的异常
                    // 处理异常
                    return null;
                });

在这里我们将整个实现都以一种函数链式调用的方式完成了,看似冗长,实则各个关系的先后非常明确,对于复杂的业务逻辑实现更加容易进行问题的排查与理解。

【解析】

1)在这段代码的开头,我们通过CompletableFuture 自带的静态工厂方法supplyAsync() 进行对象的创建,平时还可以用以new关键字或者runAsync()方法创建实例;

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 使用自定义线程池(推荐)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
// 使用自定义线程池(推荐)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

2)thenCompose():用 thenCompose() 按顺序链接两个 CompletableFuture 对象,实现异步的任务链。它的作用是将前一个任务的返回结果作为下一个任务的输入参数,从而形成一个依赖关系。注意:这个方法是非阻塞的,即查询酒店的操作会立即开始,而不需要等待查询交通方案的操作完成。

3)thenApply():thenApply() 方法接受一个 Function 实例,用它来处理结果;

4)allOf() :方法会等到所有的 CompletableFuture 都运行完成之后再返回;

5) 调用 join() 可以让程序等待都运行完了之后再继续执行。

6)exceptionally():这个方法用于处理CompletableFuture的异常情况,如果CompletableFuture的计算过程中抛出异常,那么这个方法会被调用。

五、总结

好了,今天就讲这么多,其实在Java中通过条用CompletableFuture实现异步编排的工作还是稍微有点难度的,大量的API支持,需要我们在一次次的实战中去熟悉,并灵活使用。推荐大家去看看京东的asyncTool这个框架,里面就大量使用了CompletableFuture。

目录
相关文章
|
2月前
|
缓存 算法 Java
Java 实现的局域网管控软件的性能调优
局域网管控软件在企业网络管理中至关重要,但随着网络规模扩大和功能需求增加,其性能可能受影响。文章分析了数据处理效率低下、网络通信延迟和资源占用过高等性能瓶颈,并提出了使用缓存、优化算法、NIO库及合理管理线程池等调优措施,最终通过性能测试验证了优化效果,显著提升了软件性能。
38 1
|
2月前
|
存储 Java 开发者
Java Map实战:用HashMap和TreeMap轻松解决复杂数据结构问题!
【10月更文挑战第17天】本文深入探讨了Java中HashMap和TreeMap两种Map类型的特性和应用场景。HashMap基于哈希表实现,支持高效的数据操作且允许键值为null;TreeMap基于红黑树实现,支持自然排序或自定义排序,确保元素有序。文章通过具体示例展示了两者的实战应用,帮助开发者根据实际需求选择合适的数据结构,提高开发效率。
64 2
|
13天前
|
监控 Java 编译器
Java虚拟机调优指南####
本文深入探讨了Java虚拟机(JVM)调优的精髓,从内存管理、垃圾回收到性能监控等多个维度出发,为开发者提供了一系列实用的调优策略。通过优化配置与参数调整,旨在帮助读者提升Java应用的运行效率和稳定性,确保其在高并发、大数据量场景下依然能够保持高效运作。 ####
20 1
|
19天前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
40 6
|
28天前
|
缓存 算法 Java
本文聚焦于Java内存管理与调优,介绍Java内存模型、内存泄漏检测与预防、高效字符串拼接、数据结构优化及垃圾回收机制
在现代软件开发中,性能优化至关重要。本文聚焦于Java内存管理与调优,介绍Java内存模型、内存泄漏检测与预防、高效字符串拼接、数据结构优化及垃圾回收机制。通过调整垃圾回收器参数、优化堆大小与布局、使用对象池和缓存技术,开发者可显著提升应用性能和稳定性。
45 6
|
24天前
|
监控 Java 编译器
Java虚拟机调优实战指南####
本文深入探讨了Java虚拟机(JVM)的调优策略,旨在帮助开发者和系统管理员通过具体、实用的技巧提升Java应用的性能与稳定性。不同于传统摘要的概括性描述,本文摘要将直接列出五大核心调优要点,为读者提供快速预览: 1. **初始堆内存设置**:合理配置-Xms和-Xmx参数,避免频繁的内存分配与回收。 2. **垃圾收集器选择**:根据应用特性选择合适的GC策略,如G1 GC、ZGC等。 3. **线程优化**:调整线程栈大小及并发线程数,平衡资源利用率与响应速度。 4. **JIT编译器优化**:利用-XX:CompileThreshold等参数优化即时编译性能。 5. **监控与诊断工
|
1月前
|
监控 前端开发 Java
Java SpringBoot –性能分析与调优
Java SpringBoot –性能分析与调优
|
1月前
|
存储 Java 关系型数据库
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接创建、分配、复用和释放等操作,并通过电商应用实例展示了如何选择合适的连接池库(如HikariCP)和配置参数,实现高效、稳定的数据库连接管理。
58 2
|
1月前
|
Java 关系型数据库 数据库
面向对象设计原则在Java中的实现与案例分析
【10月更文挑战第25天】本文通过Java语言的具体实现和案例分析,详细介绍了面向对象设计的五大核心原则:单一职责原则、开闭原则、里氏替换原则、接口隔离原则和依赖倒置原则。这些原则帮助开发者构建更加灵活、可维护和可扩展的系统,不仅适用于Java,也适用于其他面向对象编程语言。
26 2
|
2月前
|
开发框架 Java 程序员
揭开Java反射的神秘面纱:从原理到实战应用!
本文介绍了Java反射的基本概念、原理及应用场景。反射允许程序在运行时动态获取类的信息并操作其属性和方法,广泛应用于开发框架、动态代理和自定义注解等领域。通过反射,可以实现更灵活的代码设计,但也需注意其性能开销。
52 1