Java并行流陷阱:为什么指定线程池可能是个坏主意

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。

Java并行流陷阱:为什么指定线程池可能是个坏主意

Java Stream 并不支持指定线程池,实际编码中,有些开发者可能会使用一些“技巧”来指定线程池。实际上,所谓的技巧不仅降低了可读性,而且很容易出现bug。本文将分析并行流式编程的设计思想、”技巧“会带来的问题,并提出相关的解决方案。

1. Stream 为什么不支持指定线程池

简单总结就是官方考虑过相关方案,认为没必要,parallel 提供了简单直接的使用方式,官方的初衷就是流式编程需要轻松实现并行支持,而指定线程池会使API更复杂化,不便使用,还会有线程安全性问题。需要指出的是,官方考虑过相关方案。

并行流默认使用公共线程池,基本思想为分治。公共池类型为 ForkJoinPool, 公共线程池并发度为CUP核数 - 1,适用于处理CPU密集型任务。任务为递归型任务,任务可以划分为子任务,空闲线程可以”窃取“待执行任务,充分利用线程池。一般情况下,使用公用池时,任务队列中会存在比较多的小任务。使用公用池的好处是可以避免创建过多无用的线程,特别是对于CPU密集型任务,新增线程反而会增加上下文开销。

流式编程可能是函数式编程最被大众接受的一种编程方式。理论上,流的处理过程中,所有的方法都应该是纯函数,遵循引用透明原则,内部可以对具体执行流程进行优化,其不为 IO 密集型任务是理所应当的。

不妨看看官方对于 ForkJoinTask 的描述:

A ForkJoinTask is a lightweight form of Future. The efficiency of ForkJoinTasks stems from a set of restrictions (that are only partially statically enforceable) reflecting their main use as computational tasks calculating pure functions or operating on purely isolated objects. The primary coordination mechanisms are fork, that arranges asynchronous execution, and join, that doesn't proceed until the task's result has been computed. Computations should ideally avoid synchronized methods or blocks, and should minimize other blocking synchronization apart from joining other tasks or using synchronizers such as Phasers that are advertised to cooperate with fork/ join scheduling. Subdividable tasks should also not perform blocking I/ O, and should ideally access variables that are completely independent of those accessed by other running tasks. These guidelines are loosely enforced by not permitting checked exceptions such as IOExceptions to be thrown.

简单总结就是 Future类型,通过fork、join 编排异步任务执行,应该避免阻塞方法。

若想指定线程池或者实现细粒度的代码执行控制,官方推荐使用 CompletableFuture,JUC 等相关类库。

2. 使用”技巧“执行线程池

以下stackoverflow上有关于指定线程池的解答

java

代码解读

复制代码

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

通过这个”技巧“可以指定 ForkJoinPool。其底层逻辑可以总结为:当 ForkJoin 任务执行时,其可以获得线程池上下文,任务(子任务)会在线程池中执行。

但是,这个 trick 是不可靠的。你需要注意 JDK 的版本,openjdk8u222 之前的版本实现有bug,使用的依然是公共池,官方bug修改相关信息可以看这里Parallel stream execution within a custom ForkJoinPool should obey the parallelism

以下是使用CompletableFuture 的 trick 实现,基本思路是一样的:

java

代码解读

复制代码

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);

需要注意的是:由于forkJoinTask的存在,虽然看似提交了一条任务,实际上提交了很多条。

3. 慎用并行流

首先,无论如何不要在并行流里执行阻塞任务,除非你对于其内部实现非常了解,否则,你会遇到各种各样所谓的坑。

当然,官方也没有完全禁止,其表述如下:

It is possible to define and use ForkJoinTasks that may block, but doing so requires three further considerations: (1) Completion of few if any other tasks should be dependent on a task that blocks on external synchronization or I/ O. Event-style async tasks that are never joined (for example, those subclassing CountedCompleter) often fall into this category. (2) To minimize resource impact, tasks should be small; ideally performing only the (possibly) blocking action. (3) Unless the ForkJoinPool. ManagedBlocker API is used, or the number of possibly blocked tasks is known to be less than the pool's ForkJoinPool. getParallelism level, the pool cannot guarantee that enough threads will be available to ensure progress or good performance.

如果你能够对以上描述有深入理解,比如 ManagedBlocker 的工作原理,那么可能没人可以拦住你使用阻塞任务。但是,由于公共池是公用的,每一次任务的成功执行不一定能保证整体执行多条任务时能够执行成功(这也是推荐使用自定义线程池的原因之一)。

其次,应该理解并行流的基本执行流程。不是所有stream 都可以 parallel 的。笔者遇到的生产代码没有遇到非要使用parallel 才能解决问题的,而且99%遇到的parallel 使用都是有问题。

Effective Java 第48条(谨慎使用并行流)指出了一些规则:

  1. stream.iterate 、数据源为 迭代器、中间操作使用了limit,使用并行流不能提高性能。
  2. 使用方便切分的数据结构,如ArrayList、HashMap、数组、range 等。你需要理解 Spliterator 的底层实现。
  3. 使用规约 reduce 方法的效率最高,短路操作(如 anyMatch、allMatch)次之,collect 操作最低。

再次,你需要能写出无需状态、无副作用的纯函数。

最后,需要进行性能测试。实际上,使用并行流不一定提高性能。我们可以在最初阶段估算并行度,比如并行排序,一方面只有可以并行的运算才可以提高性能;另一方面,任务划分可能会划分过多的子任务,结果收集难以并行运算,还有线程上下文切换、数据同步等开销。性能测试不是一次性的,应该尽可能模拟实际生产场景。

总之,轻松使用并行流可能应了这句话:“理想很丰满,现实很骨感”。我们已经有了容易理解的API(CompletableFuture、线程池、JUC等工具类),没必要舍近求远使用复杂、易错、性能不一定好的实现。

4. Parallel collector

Stream类中可拓展性最好的方法是 collect, 你可以传入不同的Collector 实现,比如 使用 toConcurrentMap 返回并发支持Map、Guava 中使用 toImmutableList, toImmutableSet 返回不可变集合、使用 Comparators. greatest(k, comparator) 高效计算 topK问题等等。

对于阻塞任务,开源类库 Parallel Collector 提供了收集阻塞任务的能力,示例代码如下:

java

代码解读

复制代码

list.stream()
  .collect(parallel(i -> blockingOp(i), toList()))
  	// 加入超时机制,提高系统韧性
    .orTimeout(1000, MILLISECONDS)
    .thenAcceptAsync(System.out::println, executor)
    .thenRun(() -> System.out.println("Finished!"));

总结其特点如下:

  1. 其返回类型为 CompletableFuture,无需等待即可返回。实现了 CompletableFuture 和 Stream 流的转换。
  2. 其可以指定执行器和并发度,方便并发控制。
  3. 和标准库无缝衔接,没有所谓的 trick,做法仅仅是实现 Collector 接口,没有对Stream底层实现有依赖。
  4. 成熟,目前支持 JDK21+(虚拟线程) 和 JDK8+(线程池)。截至本文发表前 JDK8+支持版本为
    com.pivovarit:parallel-collectors:2.6.1


转载来源:https://juejin.cn/post/7419869992617050152

相关文章
|
4天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
4月前
|
监控 Java 调度
Java面试题:描述Java线程池的概念、用途及常见的线程池类型。介绍一下Java中的线程池有哪些优缺点
Java面试题:描述Java线程池的概念、用途及常见的线程池类型。介绍一下Java中的线程池有哪些优缺点
70 1
|
4月前
|
设计模式 安全 Java
Java面试题:请解释Java中的线程池以及为什么要使用线程池?请解释Java中的内存模型以及如何避免内存泄漏?请解释Java中的并发工具包以及如何实现一个简单的线程安全队列?
Java面试题:请解释Java中的线程池以及为什么要使用线程池?请解释Java中的内存模型以及如何避免内存泄漏?请解释Java中的并发工具包以及如何实现一个简单的线程安全队列?
43 1
|
4月前
|
存储 安全 Java
Java面试题:如何在Java应用中实现有效的内存优化?在多线程环境下,如何确保数据的线程安全?如何设计并实现一个基于ExecutorService的任务处理流程?
Java面试题:如何在Java应用中实现有效的内存优化?在多线程环境下,如何确保数据的线程安全?如何设计并实现一个基于ExecutorService的任务处理流程?
45 0
|
4月前
|
安全 Java 测试技术
Java中的并行流详解
Java中的并行流详解
|
消息中间件 Java Kafka
Java并行流指北
Java并行流,方便了 并发操作,但是不注意可能会导致问题。如 最大线程数,怎么控制并发数,类加载器,线程上下文变化,ForkJoinPool 的 execute、submit、invoke 方法的区别 等。
136 1
|
6月前
|
Java 开发者
Java中多线程并发控制的实现与优化
【4月更文挑战第17天】 在现代软件开发中,多线程编程已成为提升应用性能和响应能力的关键手段。特别是在Java语言中,由于其平台无关性和强大的运行时环境,多线程技术的应用尤为广泛。本文将深入探讨Java多线程的并发控制机制,包括基本的同步方法、死锁问题以及高级并发工具如java.util.concurrent包的使用。通过分析多线程环境下的竞态条件、资源争夺和线程协调问题,我们提出了一系列实现和优化策略,旨在帮助开发者构建更加健壮、高效的多线程应用。
61 0
|
6月前
|
Java API 索引
Java 编程问题:十、并发-线程池、可调用对象和同步器4
Java 编程问题:十、并发-线程池、可调用对象和同步器
36 0
java8中的并行流,封装ForkJoin
并行流就是执行任务的时候分配给多个线程队列执行
|
并行计算 算法 Java
Java 8 - 正确高效的使用并行流
Java 8 - 正确高效的使用并行流
135 0