Java Stream 流式编程怎么写?

简介: Stream它并不是一个容器,它只是对容器的功能进行了增强,添加了很多便利的操作,例如查找、过滤、分组、排序等一系列的操作。并且有串行、并行两种执行模式,并行模式充分的利用了多核处理器的优势,使用fork/join框架进行了任务拆分,同时提高了执行速度。简而言之,Stream就是提供了一种高效且易于使用的处理数据的方式。

前言

「Java8系列」神秘的Lambda

「Java8系列」神奇的函数式接口

继上两篇之后,本文已经java8系列的第三篇了。本篇文章比较长,但我希望大家都能认真读完。读不完可以先收藏,在找时间读。没看过前两篇的可以点上边的链接看看,前两篇文章算是对是用Stream铺垫的一点基础吧,不过不看也可以学会使用Stream,但看了会有助于更好的理解和使用。在没有深入了解之前,我以为Stream也是数据的载体,但后来发现并不是。那么它到底是什么?听我慢慢道来。

什么是Stream?

Stream它并不是一个容器,它只是对容器的功能进行了增强,添加了很多便利的操作,例如查找、过滤、分组、排序等一系列的操作。并且有串行、并行两种执行模式,并行模式充分的利用了多核处理器的优势,使用fork/join框架进行了任务拆分,同时提高了执行速度。简而言之,Stream就是提供了一种高效且易于使用的处理数据的方式。

  • 特点:
  1. Stream自己不会存储元素。
  2. Stream的操作不会改变源对象。相反,他们会返回一个持有结果的新Stream。
  3. Stream 操作是延迟执行的。它会等到需要结果的时候才执行。也就是执行终端操作的时候。
  • 图解:

在这里插入图片描述
一个Stream的操作就如上图,在一个管道内,分为三个步骤,第一步是创建Stream,从集合、数组中获取一个流,第二步是中间操作链,对数据进行处理。第三步是终端操作,用来执行中间操作链,返回结果。

怎么创建Stream?

  • 由集合创建:

Java8 中的 Collection 接口被扩展,提供了两个获取流的方法,这两个方法是default方法,也就是说所有实现Collection接口的接口都不需要实现就可以直接使用:

  1. default Stream stream() : 返回一个顺序流。
  2. default Stream parallelStream() : 返回一个并行流。

    例如:
        List<Integer> integerList = new ArrayList<>();
        integerList.add(1);
        integerList.add(2);
        Stream<Integer> stream = integerList.stream();
        Stream<Integer> stream1 = integerList.parallelStream();
  • 由数组创建:

Java8 中的 Arrays 的静态方法 stream() 可以获取数组流:

  1. static Stream stream(T[] array): 返回一个流
  2. 重载形式,能够处理对应基本类型的数组:

public static IntStream stream(int[] array)

public static LongStream stream(long[] array)

public static DoubleStream stream(double[] array)

例如:
    int[] intArray = {1,2,3};
    IntStream stream = Arrays.stream(intArray);
  • 由值创建:

可以使用静态方法 Stream.of(), 通过显示值 创建一个流。它可以接收任意数量的参数。

  1. public static Stream of(T... values) : 返回一个流。

    例如:
    Stream<Integer> integerStream = Stream.of(1, 2, 3, 4, 5, 6, 7, 8);
  • 由函数创建:创建无限流

可以使用静态方法 Stream.iterate() 和 Stream.generate()创建无限流。

  1. 迭代
    public static Stream iterate(final T seed, final UnaryOperator f)
  2. 生成
    public static Stream generate(Supplier s)

     例如:
     Stream.generate(Math::random).limit(5).forEach(System.out::print); 
     List<Integer> collect = Stream.iterate(0,i -> i + 1).limit(5).collect(Collectors.toList());

    注意:使用无限流一定要配合limit截断,不然会无限制创建下去。

Stream的中间操作

如果Stream只有中间操作是不会执行的,当执行终端操作的时候才会执行中间操作,这种方式称为延迟加载或惰性求值。多个中间操作组成一个中间操作链,只有当执行终端操作的时候才会执行一遍中间操作链,具体是因为什么我们在后面再说明。下面看下Stream有哪些中间操作。

  • Stream<T> distinct():

去重,通过流所生成元素的 hashCode() 和 equals() 去除重复元素。

  • Stream<T> filter(Predicate<? super T> predicate):

Predicate函数在上一篇当中我们已经讲过,它是断言型接口,所以filter方法中是接收一个和Predicate函数对应Lambda表达式,返回一个布尔值,从流中过滤某些元素。

  • Stream<T> sorted(Comparator<? super T> comparator):

指定比较规则进行排序。

  • Stream<T> limit(long maxSize):

截断流,使其元素不超过给定数量。如果元素的个数小于maxSize,那就获取所有元素。

  • Stream<T> skip(long n):

跳过元素,返回一个扔掉了前 n 个元素的流。若流中元素不足 n 个,则返回一个空流。与 limit(n) 互补。

  • Stream<R> map(Function<? super T, ? extends R> mapper):

接收一个Function函数作为参数,该函数会被应用到每个元素上,并将其映射成一个新的元素。也就是转换操作,map还有三个应用于具体类型方法,分别是:mapToInt,mapToLong和mapToDouble。这三个方法也比较好理解,比如mapToInt就是把原始Stream转换成一个新的Stream,这个新生成的Stream中的元素都是int类型。这三个方法可以免除自动装箱/拆箱的额外消耗。

  • Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper):

接收一个Function函数作为参数,将流中的每个值都转换成另一个流,然后把所有流连接成一个流。flatMap也有三个应用于具体类型的方法,分别是:flatMapToInt、flatMapToLong、flatMapToDouble,其作用于map的三个衍生方法相同。

Stream的终端操作

终端操作执行中间操作链,并返回结果。终端操作我们就不一一介绍了,只介绍一下常用的操作。详细可看java.util.stream.Stream接口中的方法。

  • void forEach(Consumer<? super T> action):

内部迭代(需要用户去做迭代,称为外部迭代。相反,Stream API使用内部迭代帮你把迭代做了)

 users.stream().forEach(user -> System.out.println(user.getName()));
  • <R, A> R collect(Collector<? super T, A, R> collector):

收集、将流转换为其他形式,比如转换成List、Set、Map。collect方法是用Collector作为参数,Collector接口中方法的实现决定了如何对流执行收集操作(如收集到 List、Set、Map)。但是 Collectors 实用类提供了很多静态方法,可以方便地创建常见收集器实例。例举一些常用的:

 List<User> users = Lists.newArrayList();
 users.add(new User(15, "A", ImmutableList.of("1元", "5元")));
 users.add(new User(25, "B", ImmutableList.of("10元", "50元")));
 users.add(new User(21, "C", ImmutableList.of("100元")));
 //收集名称到List
 List<String> nameList = users.stream().map(User::getName).collect(Collectors.toList());
 //收集名称到List
 Set<String> nameSet = users.stream().map(User::getName).collect(Collectors.toSet());
 //收集到map,名字作为key,user对象作为value
 Map<String, User> userMap = users.stream()
                .collect(Collectors.toMap(User::getName, Function.identity(), (k1, k2) -> k2));
  • 其他终端操作:

    1. boolean allMatch(Predicate<? super T> predicate); 检查是否匹配所有元素。
    2. boolean anyMatch(Predicate<? super T> predicate); 检查是否至少匹配一个元素。
    3. boolean noneMatch(Predicate<? super T> predicate); 检查是否没有匹配所有元素。
    4. Optional<T> findFirst(); 返回当前流中的第一个元素。
    5. Optional<T> findAny(); 返回当前流中的任意元素。
    6. long count(); 返回流中元素总数。
    7. Optional<T> max(Comparator<? super T> comparator); 返回流中最大值。
    8. Optional<T> min(Comparator<? super T> comparator); 返回流中最小值。
    9. T reduce(T identity, BinaryOperator<T> accumulator); 可以将流中元素反复结合起来,得到一个值。 返回 T。这是一个归约操作。

Fork/Join框架

上面我们提到过,说Stream的并行模式使用了Fork/Join框架,这里简单说下Fork/Join框架是什么?Fork/Join框架是java7中加入的一个并行任务框架,可以将任务拆分为多个小任务,每个小任务执行完的结果在合并成为一个结果。在任务的执行过程中使用工作窃取(work-stealing)算法,减少线程之间的竞争。

  • Fork/Join图解
  • 工作窃取图解

Stream是怎么实现的

先看下整体类图:蓝色箭头代表继承,绿色箭头代表实现,红色箭头代表内部类。

实际上Stream只有两种操作,中间操作、终端操作,中间操作只是一种标记,只有终端操作才会实际触发执行。所以Stream流水线式的操作大致应该是用某种方式记录中间操作,只有调用终端操作才会将所有的中间操作叠加在一起在一次迭代中全部执行。这里只做简单的介绍,想详细了解的可以参考下面的参考资料中的链接。

  • 操作怎么记录?

Stream的操作记录是通过ReferencePipeline记录的,ReferencePipeline有三个内部类Head、StatelessOp、StatefulOp,Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的ReferencePipeline来代表Stage,Head用于表示第一个Stage,即调用诸如Collection.stream()方法产生的Stage,很显然这个Stage里不包含任何操作,StatelessOp和StatefulOp分别表示无状态和有状态的Stage,对应于无状态和有状态的中间操作。

  • 操作怎么叠加?

操作是记录完了,但是前面的Stage并不知道后面Stage到底执行了哪种操作,以及回调函数是哪种形式。这就需要有某种协议来协调相邻Stage之间的调用关系。
这种协议由Sink接口完成,Sink接口包含的方法如下表所示:

  1. void begin(long size),开始遍历元素之前调用该方法,通知Sink做好准备。
  2. void end(),所有元素遍历完成之后调用,通知Sink没有更多的元素了。
  3. boolean cancellationRequested(),是否可以结束操作,可以让短路操作尽早结束。
  4. void accept(T t),遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了。

每个Stage都会将自己的操作封装到一个Sink里,前一个Stage只需调用后一个Stage的accept()方法即可,并不需要知道其内部是如何处理的。有了Sink对操作的包装,Stage之间的调用问题就解决了,执行时只需要从流水线的head开始对数据源依次调用每个Stage对应的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。

  • 操作怎么执行?

    Sink完美封装了Stream每一步操作,并给出了[处理->转发]的模式来叠加操作。这一连串的齿轮已经咬合,就差最后一步拨动齿轮启动执行。是什么启动这一连串的操作呢?也许你已经想到了启动的原始动力就是结束操作(Terminal Operation),一旦调用某个结束操作,就会触发整个流水线的执行。

参考资料

https://ifeve.com/stream

https://www.ibm.com/developerworks/cn/java/j-lo-java8streamapi/

https://segmentfault.com/a/1190000016781127

https://github.com/CarpenterLee/JavaLambdaInternals/blob/master/6-Stream%20Pipelines.md

大家看后辛苦点个赞点个关注哦!后续还会后更多的博客。如有错误,烦请指正。
目录
相关文章
|
2天前
|
存储 Java
JAVA并发编程AQS原理剖析
很多小朋友面试时候,面试官考察并发编程部分,都会被问:说一下AQS原理。面对并发编程基础和面试经验,专栏采用通俗简洁无废话无八股文方式,已陆续梳理分享了《一文看懂全部锁机制》、《JUC包之CAS原理》、《volatile核心原理》、《synchronized全能王的原理》,希望可以帮到大家巩固相关核心技术原理。今天我们聊聊AQS....
|
2天前
|
Java 程序员 数据库连接
Java编程中的异常处理:从基础到进阶
【9月更文挑战第18天】在Java的世界里,异常处理是每个程序员必须面对的挑战。本文将带你从异常的基本概念出发,通过实际的代码示例,深入探讨如何有效地管理和处理异常。我们将一起学习如何使用try-catch块来捕捉异常,理解finally块的重要性,以及如何自定义异常类来满足特定需求。无论你是初学者还是有经验的开发者,这篇文章都将为你提供新的见解和技巧,让你的Java代码更加健壮和可靠。
|
2天前
|
Java 数据库连接 UED
掌握Java编程中的异常处理
【9月更文挑战第18天】在Java的世界中,异常是那些不请自来的客人,它们可能在任何时候突然造访。本文将带你走进Java的异常处理机制,学习如何优雅地应对这些突如其来的“访客”。从基本的try-catch语句到更复杂的自定义异常,我们将一步步深入,确保你能够在面对异常时,不仅能够从容应对,还能从中学到宝贵的经验。让我们一起探索如何在Java代码中实现健壮的异常处理策略,保证程序的稳定运行。
|
2天前
|
Java 数据库
JAVA并发编程-一文看懂全部锁机制
曾几何时,面试官问:java都有哪些锁?小白,一脸无辜:用过的有synchronized,其他不清楚。面试官:回去等通知! 今天我们庖丁解牛说说,各种锁有什么区别、什么场景可以用,通俗直白的分析,让小白再也不怕面试官八股文拷打。
|
2天前
|
安全 Java 调度
Java 并发编程中的线程安全和性能优化
本文将深入探讨Java并发编程中的关键概念,包括线程安全、同步机制以及性能优化。我们将从基础入手,逐步解析高级技术,并通过实例展示如何在实际开发中应用这些知识。阅读完本文后,读者将对如何在多线程环境中编写高效且安全的Java代码有一个全面的了解。
|
1天前
|
Java
JAVA并发编程ReentrantLock核心原理剖析
本文介绍了Java并发编程中ReentrantLock的重要性和优势,详细解析了其原理及源码实现。ReentrantLock作为一种可重入锁,弥补了synchronized的不足,如支持公平锁与非公平锁、响应中断等。文章通过源码分析,展示了ReentrantLock如何基于AQS实现公平锁和非公平锁,并解释了两者的具体实现过程。
|
2天前
|
Kubernetes Cloud Native Java
探索未来编程新纪元:Quarkus带你秒建高性能Kubernetes原生Java应用,云原生时代的技术狂欢!
Quarkus 是专为 Kubernetes 设计的全栈云原生 Java 框架,凭借其轻量级、快速启动及高效执行特性,在 Java 社区脱颖而出。通过编译时优化与原生镜像支持,Quarkus 提升了应用性能,同时保持了 Java 的熟悉度与灵活性。本文将指导你从创建项目、编写 REST 控制器到构建与部署 Kubernetes 原生镜像的全过程,让你快速上手 Quarkus,体验高效开发与部署的乐趣。
9 0
|
6天前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)
|
17天前
|
监控 Java 调度
【Java学习】多线程&JUC万字超详解
本文详细介绍了多线程的概念和三种实现方式,还有一些常见的成员方法,CPU的调动方式,多线程的生命周期,还有线程安全问题,锁和死锁的概念,以及等待唤醒机制,阻塞队列,多线程的六种状态,线程池等
79 6
【Java学习】多线程&JUC万字超详解
|
3天前
|
Java
深入理解Java中的多线程编程
本文将探讨Java多线程编程的核心概念和技术,包括线程的创建与管理、同步机制以及并发工具类的应用。我们将通过实例分析,帮助读者更好地理解和应用Java多线程编程,提高程序的性能和响应能力。
15 4