并行流ParallelStream中隐藏的陷阱

简介: 这篇文章介绍一下日常开发中并行流ParallelStream中隐藏的陷阱,这个问题其实离我们很近,特别是喜欢使用JDK1.8+的流式编程的伙伴,应该会深有感触。标题中所谓的"陷阱",其实并不是ParallelStream自身的陷阱,而一般是开发者错误使用ParallelStream给自己埋下的陷阱。

微信截图_20220513121508.png


前提



这篇文章介绍一下日常开发中并行流ParallelStream中隐藏的陷阱,这个问题其实离我们很近,特别是喜欢使用JDK1.8+的流式编程的伙伴,应该会深有感触。标题中所谓的"陷阱",其实并不是ParallelStream自身的陷阱,而一般是开发者错误使用ParallelStream给自己埋下的陷阱。


一个故意而为的例子



下面举一个故意而为的例子,实际上应该不会有类似的业务代码:


public class ParallelStreamMain {
    public static void main(String[] args) throws Exception {
        List<List<Integer>> array = new ArrayList<>();
        List<Integer> item1 = new ArrayList<>();
        List<Integer> item2 = new ArrayList<>();
        List<Integer> target = new ArrayList<>(100);
        array.add(item1);
        array.add(item2);
        array.parallelStream().forEach(x -> {
            for (int i = 0; i < 100000; i++) {
                target.add(i);
            }
        });
        System.out.println(target.size());
    }
}
复制代码


某一次执行结果为:163913。如果不停地执行这个main方法,最终都会得到一个非200000的结果,这里的问题就在于使用了并行流parallelStream()方法。ParallelStream底层使用了Fork/Join框架实现,也就是应用了线程池ForkJoinPool把并行流中的节点抽象为ForkJoinTask进行计算,背后用到的"任务窃取"等原理这里就不进行展开,只需要明确:


  • ForkJoinPool一般使用Runtime.getRuntime().availableProcessors()(此值一般认为是物理机器的逻辑核心数量)作为并行度(parallelism),简单认为是可并发执行的任务数,并不是工作线程数。
  • 多核机器中,使用ParallelStream在流的节点中的所有操作都相当于在一个多线程环境中进行操作,里面的所有操作都会产生不可预期的结果,例如可能会数组越界、添加元素丢失、部分下标index的引用为NULL等等。


一个仿真例子



写这篇文章不是有意为之,其实很早之前笔者曾经遇到一个比较隐蔽的生产故障,其中有一段访问量比较低的代码大致如下:


@Data
private static class OrderDTO {
    private String orderId;
    private OrderStatus orderStatus;
    private BigDecimal amount;
    private Long customerId;
}
@Data
private static class Order {
    private Long id;
    private String orderId;
    private Integer orderStatus;
    private BigDecimal amount;
    private Long customerId;
    private OffsetDateTime createTime;
    private OffsetDateTime editTime;
}
public void groupByOrderStatus(Long customerId) {
    List<Order> orders = orderDao.selectByCustomerId(customerId);
    List<OrderDTO> orderDTOList = new ArrayList<>();
    orders.parallelStream().forEach(order -> {
        OrderDTO dto = new OrderDTO();
        ......
        orderDTOList.add(dto);
    });
    Map<String, List<OrderDTO>> collect 
            = orderDTOList.stream().collect(Collectors.groupingBy(item -> item.getOrderStatus().getCode()));
    ......
}
复制代码


该方法的功能是通过客户ID查询订单列表,然后把订单列表转化为OrderDTO列表,然后再按照订单状态字段进行分组。通过生产日志和测试回归发现,上面的代码段中groupByOrderStatus()方法会偶发空指针异常。


初次出现问题的时候,由于开发者通过Lambda表达式把多处代码压缩为1行,所以从异常栈比较难排查具体发生问题的代码,后面把Lambda表达式以句点起点拆分为多行上线后观察一段时间,最终定位到发生空指针异常的代码段为Collectors.groupingBy(item -> item.getOrderStatus().getCode()),也就是OrderDTO实例中的orderStatus为空对象。这里显然,groupByOrderStatus()方法其实是被封闭在线程栈中调用,本不应该有多个线程去并发修改其中的内容,这里只剩下一个疑点:使用了parallelStream()。后来直接把parallelStream()修改为stream()重新上线,该空指针问题不再复现。


Lambda/Stream其实并不是天然线程安全的,线程安全的前提是它们本身被线程封闭调用,并且不引入多线程环境,像使用了并行流,本质就是引入了多线程环境。所以,在开发功能的时候,需要仔细思考一下:


  1. 是否真的有必要使用Lambda和流式编程?
  2. 是否真的有必要用到并行流?如果使用了并行流,是否需要考虑引入额外的同步机制,例如锁?
  3. 如果引入了额外的同步机制,是否考虑是强行使用并行流,违反了并行流设计的初衷?
  4. 其实并发并不能提高性能,只能提高吞吐量,应该着重去发现和优化性能瓶颈,而不是拼命地把上游改造成并发调用。


笔者有代码洁癖,当时还发现了上面的代码存在映射操作,正确来说应该使用map()函数,而不是forEach()去遍历元素重新装进去另一个列表,方法中的逻辑体现了原开发者其实对Lambda一知半解。


小结



回到最初那个问题,其实使用并行流也可以保证执行结果和预期一致,不过一定需要引入额外的同步机制,例如这里使用监视器进行同步:


public class ParallelStreamMain {
    public static void main(String[] args) throws Exception {
        List<List<Integer>> array = new ArrayList<>();
        List<Integer> item1 = new ArrayList<>();
        List<Integer> item2 = new ArrayList<>();
        List<Integer> target = new ArrayList<>(100);
        array.add(item1);
        array.add(item2);
        final Object monitor = new Object();
        array.parallelStream().forEach(x -> {
            synchronized (monitor) {
                for (int i = 0; i < 100000; i++) {
                    target.add(i);
                }
            }
        });
        System.out.println(target.size());
    }
}
复制代码


上面的方法无论执行多少次,最终都只会输出:200000。这里在并行流中添加同步代码块的逻辑看起来确实比较滑稽,仅仅是为了说明如果在多线程环境下,对一个容器进行元素增加或者修改,只有添加额外的同步机制,才能保证最终的结果是符合预期的。ParallelStream是一个十分优秀的设计,但是需要考量其适用的场景,避免踏进自己为自己埋下的并发陷阱。

相关文章
|
7月前
|
存储 前端开发 Java
【C++ 多线程 】C++并发编程:精细控制数据打印顺序的策略
【C++ 多线程 】C++并发编程:精细控制数据打印顺序的策略
208 1
|
29天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
数据采集 Java
自定义 ForkJoinPool 提升并行流 ParallelStream 执行速度
简介 在 java8 中 添加了流Stream,可以让你以一种声明的方式处理数据。使用起来非常简单优雅。ParallelStream 则是一个并行执行的流,采用 ForkJoinPool 并行执行任务,提高执行速度。
7535 1
|
2月前
|
负载均衡 安全 Java
并行流的人生
【10月更文挑战第8天】
23 1
|
6月前
Stream优化(使用并行流和数据使用基本类型)
Stream优化(使用并行流和数据使用基本类型)
|
5月前
|
SQL 安全
线程操纵术并行策略问题之调整并行流的并行度问题如何解决
线程操纵术并行策略问题之调整并行流的并行度问题如何解决
|
6月前
|
Java
Java并行流问题之parallelStream的使用方式
Java并行流问题之parallelStream的使用方式
115 1
|
7月前
|
JavaScript 前端开发 Java
流的概念,怎么处理
流的概念,怎么处理
|
Linux
44 # 流的原理
44 # 流的原理
64 0
|
存储
Stream流中各阶段方法说明及组合示例
Stream流中各阶段方法说明及组合示例
121 1