使用流进行函数式数据处理(一)

简介: 使用流进行函数式数据处理(一)

使用流进行函数式数据处理(一)

1. 流

流解决了两个问题:1.集合操作不够优雅,能不能像SQL一样简介高效。2.多线程操作有较高的难度。

1.1 什么是流

流是javaAPI新成员,它允许你以声明式的方式处理集合数据(通过查询语句来表达,而不是临时编写一个实现),另外流可以简单透明的进行并行处理。

实例(考虑一下用遍历实现对比一下)(下面的api讲解在下一节流的使用,这里只简要了解)

List<String> lowCaloricDishesName =menu.stream()
  .filter(d -> d.getCalories() < 400)      // 选出卡路里小于400的
  .sorted(comparing(Dish::getCalories))    // 按照卡路里排序
  .map(Dish::getName)                      // 取出菜名组成新的流
  .collect(toList());                      // 触发流的执行,转换为一个List放回

为了利用多核架构并行执行这段代码,你只需要把 stream()换成 parallelStream()。

代码是声明式的(说明要干啥就行,而不是想着怎么处理你的想法)、可以把几个基础操作链接起来。

声明式、可复合、可并行。

1.2 流简介

java集合支持一个新的stream方法,它返回一个流。

简洁定义:从支持数据处理操作的源生成的元素序列。

几个关键词:

  1. 元素序列:集合,流都是元素序列,可以访问特定元素类型的一组有序值。集合的主要目的是以特定的事件/空间复杂度存储和访问元素。流的目的在于表达计算。集合将的是数据,流讲的是计算。
  2. 源:流会使用一个提供数据的源,比如集合、数组或I/O资源。
  3. 数据处理操作:流的数据处理功能支持类似于数据库的操作。
  4. 流水线:很多流操作本身会返回一个流,多个操作可以连接起来。
  5. 内部迭代:集合使用迭代器是外部的显示迭代,流操作是在后台执行的。
1.3 流与集合

集合是一个内存中的数据结构,它包含数据结构中目前所有的值

流则是在概念上固定的数据结构(你不能添加或删除元素),其元素是按需计算的。

理解:以视频数据为例,光盘这种结构是把所有的数据都存储在光盘中的,而目前的流媒体视频播放器则只是缓存马上要播放的几帧数据按需计算获取。

1.3.1 流只能遍历一次
Stream<String> s = title.stream();
s.forEach(System.out::println);
s.forEach(System.out::println);

上面代码第三行的循环会报java.lang.IllegalStateException:流已被操作或关闭

和迭代器类似,流只能遍历一次。遍历完之后,我们就说这个流已经被消费掉了。

1.3.2 外部迭代和内部迭代

使用Collection接口需要用户去做迭代(比如for-each),这称为外部迭代,相反Stream库使用内部迭代。

Streams库的内部迭代可以自动选择一种适合你硬件的数据表示和并行实现。

1.4 流操作

流的操作可以分为两类:

  • filter、map和limit操作数据返回流,可以形成一条流水线,称为中间操作。
  • collection触发流水线并关闭它,称为终端操作。

除非流水线上触发一个终端操作,否则中间操作不会执行任何处理。

终端操作会从流的流水线生成结果,其结果是任何不是流的值。

流的操作一共分为三件事

  • 一个数据源。生成流。
  • 一个中间操作链。处理流。
  • 一个终端操作。执行流返回结果。
1.5 小结
  • 流是“从支持数据处理操作的源生成的一系列元素”。
  • 流利用内部迭代:迭代通过filter、map、sorted等操作被抽象掉了。
  • 流操作有两类:中间操作和终端操作。
  • filter和map等中间操作会返回一个流,并可以链接在一起。可以用他们来设置一条流水线,但并不会生成任何结果。
  • forEach和count等终端操作会返回一个非流的值,并处理流水线以返回结果。
  • 流中的元素是按需计算的。

2. 流的使用

测试实例的基本数据

@Data
@Builder
public class Apple {
    private String name;
    private Integer weight;
    private String color;
}
public class Test {
    public static void main(String[] args) {
        List<Apple> appleList = new ArrayList<>();
        appleList.add(Apple.builder().name("苹果1").color("RED").weight(100).build());
        appleList.add(Apple.builder().name("苹果2").color("RED").weight(150).build());
        appleList.add(Apple.builder().name("苹果2").color("RED").weight(150).build());
        appleList.add(Apple.builder().name("苹果3").color("RED").weight(200).build());
        appleList.add(Apple.builder().name("苹果4").color("RED").weight(250).build());
    }
}
2.1 筛选

用谓词(一个返回boolean的函数)筛选,筛选出各不相同的元素。

2.1.1 用谓词做筛选(filter)

Stream接口支持filter方法。该操作会接受一个谓词(一个返回Boolean的函数)作为参数,并返回一个包含所有符合谓词元素的流。

List<Apple> appleList1 = appleList.stream().filter(s -> s.getWeight() > 150).collect(Collectors.toList());
// [Apple(name=苹果3, weight=200, color=RED), Apple(name=苹果4, weight=250, color=RED)]
2.1.2 筛选各异的元素(distinct)去重

流还支持一个叫做distinct的方法,它返回一个元素各异(根据流所生成元素的hashCode和equals方法实现)的流。

List<Apple> appleList2 = appleList.stream().distinct().collect(Collectors.toList());
// [Apple(name=苹果1, weight=100, color=RED), Apple(name=苹果2, weight=150, color=RED), Apple(name=苹果3, weight=200, color=RED), Apple(name=苹果4, weight=250, color=RED)]
2.2 切片

使用流Stream的一些操作,你可以高效地选择或者丢弃流中的元素。

2.2.1 使用谓词对流进行切片(takeWhile、dropWhile)
  • takeWhile:从头开始选择符合条件的,碰到第一个不符合条件的中断,返回前面的流。
List<Apple> appleList3 = appleList.stream().takeWhile(s -> s.getWeight() < 150).collect(Collectors.toList());
// [Apple(name=苹果1, weight=100, color=RED)]
  • dropWhile:从头开始删除符合条件的,碰到第一个不符合条件的中断,返回后面的流。
List<Apple> appleList4 = appleList.stream().dropWhile(s -> s.getWeight() < 150).collect(Collectors.toList());
// [Apple(name=苹果2, weight=150, color=RED), Apple(name=苹果2, weight=150, color=RED), Apple(name=苹果3, weight=200, color=RED), Apple(name=苹果4, weight=250, color=RED)]
2.2.2 截断流(limit)

流支持limit(n)方法,该方法会返回另一个不超过给定长度的流。

List<Apple> appleList5 = appleList.stream().limit(2).collect(Collectors.toList());
// [Apple(name=苹果1, weight=100, color=RED), Apple(name=苹果2, weight=150, color=RED)]
2.2.3 跳过元素(skip)

流还支持skip(n)方法,返回一个人掉了前几个元素的流,如果流中元素不足n个,返回一个空流。

limit(n)和skip(n)是互补的。

List<Apple> appleList6 = appleList.stream().skip(4).collect(Collectors.toList());
// [Apple(name=苹果4, weight=250, color=RED)]
2.3 映射
2.3.1 对流中的每一个元素应用函数(map)

流支持map方法,它会接受一个函数作为参数,这个函数会被应用到每个元素上,并将其映射成一个新的元素。

使用映射一次,而不是使用转换是因为,它是”创建一个新版本“而不是去”修改“。

List<String> stringList = appleList.stream().map(Apple::getName).collect(Collectors.toList());
// [苹果1, 苹果2, 苹果2, 苹果3, 苹果4]
2.3.2 流的扁平化(flatMap)

现在思考一个问题,如何将上面的列表转换为各不相同的字符列表呢?如['苹','果','1','2']

List<String> stringList2 = appleList.stream()
  .map(s->s.getName().split("")) //返回一个字符数组列表
  .flatMap(Arrays::stream) // Arrays::stream将每个字符数组转换为流,flatMap合并流为一个
  .distinct() // 去重
  .collect(Collectors.toList());
// [苹, 果, 1, 2, 3, 4]

flatMap将各个生成流扁平化为单个流,各个数组并不是分别映射成一个流,而是映射成流的内容。

2.4 查找与匹配

一种常见的数据处理套路是看看数据集中的某些元素是否匹配一个给定的属性。

Stream API通过allMatch、anyMatch、noneMatch、findFirst和findAny方法提供了这样的工具。

2.4.1 检查谓词是否至少匹配一个元素(anyMatch)

流中是否有一个元素能够匹配给定的谓词。anyMatch方法返回一个boolean,因此是一个终端操作。

boolean anyMatch = appleList.stream().anyMatch(s -> s.getWeight() > 500);
// false
2.4.2 检查谓词是否匹配所有元素(allMatch)

流中的元素是否都能匹配给定的谓词。

boolean allMatch = appleList.stream().allMatch(s -> s.getWeight() > 0);
// true
2.4.3 没有任何元素与给定的谓词匹配(noneMatch)

anyMatch、allMatch、noneMatch都用到了所谓的断路,即得到结果即终端操作。

boolean noneMatch = appleList.stream().noneMatch(s -> s.getWeight() > 500);
// true
2.4.4 查找元素(findAny)

findAny方法返回当前流中的任意一个元素

Optional<Apple> apple = appleList.stream().filter(s->s.getWeight()>150).findAny();
// Optional[Apple(name=苹果3, weight=200, color=RED)]

Optional简介

Optional类是一个容器类,代表一个值存在或不存在。java8设计引入了Optional,这样就不用返回null了。Optional的常用方法。

  • isPresent():将在Optional包含值得时候返回true,否则返回false。
  • ifPresent(Consumer block):会在值存在的时候执行给定的代码块。
  • T get():会在值存在的时候返回值,否则抛出一个NoSuchElement异常。
  • T orElse(T other):会在值存在的时候返回值,否则返回一个默认值。
2.4.5 查找第一个元素(findFirst)

按照流的顺序返回第一个符合条件的元素。

Optional<Apple> first = appleList.stream().filter(s -> s.getWeight() > 100).findFirst();
// Optional[Apple(name=苹果2, weight=150, color=RED)]
为什么会同时有findFirst和findAny呢?

答案是:并行

找到第一个元素在并行上限制更多。如果你不关心返回哪个元素,请使用findAny。

2.5 归约

上面查询都是返回一个值,下面是如何把一个流中的元素组合起来,使用reduce操作来表示更复杂的查询。

2.5.1 元素求和
Integer reduce = appleList.stream().map(Apple::getWeight).reduce(0, (a, b) -> a + b);
// 850,下面为最简写法
Integer reduce = appleList.stream().map(Apple::getWeight).reduce(0, Integer::sum);

reduce数字求和的原理:首先第一个参数0作为初始值与流中的第一个元素求和,得到的结果与下一个元素求和,以此类推。下面例子理解一下初始值。

Integer reduce1 = appleList.stream().map(Apple::getWeight).filter(s->s<50).reduce(1, (a, b) -> a * b);
// 1

reduce还有一个重载的变体,它不接受初始值,但是会返回一个Optional对象

2.5.2 最大值和最小值
Optional<Integer> reduce2 = appleList.stream().map(Apple::getWeight).reduce(Integer::max);
// 250
Optional<Integer> reduce3 = appleList.stream().map(Apple::getWeight).reduce(Integer::min);
// 100

相对于逐行代码求和,使用reduce的好处在于,这里的迭代过程被抽象掉了,让内部实现得以选择并行执行reduce操作。(更新共享变量求和显然不合适,可以将输入分块,分块求和,最后合并。分支合并框架)

2.6 无状态和有状态

诸如map、filter等操作会从输入流中获取每一个元素,并输出一个元素,这种操作是一般都是无状态的(不需要内部状态来积累结果)。无状态有界

诸如reduce、sum、max等操作都需要内部状态来积累结果,但是其状态都是一个int或者一个double,不管流中有多少元素,内部状态都是有界的。有状态有界

相反、诸如sort、distinct等从流中排序和删除重复项都需要知道先前的历史,例如排序要求所有的元素都放到缓冲区后才能给输出流加入一个项目,这一操作要求的存储是无限的,如果是无限流就会出问题,我们把这些操作叫做有状态操作。有状态无界

2.7 Stream流相关操作汇总
操作 类型 返回类型 使用的类型/函数式接口 函数描述符
filter 中间 Stream Predicate T->boolean
distinct 中间(有状态-无界) Stream
takeWhile 中间 Stream Predicate T->boolean
dropWhile 中间 Stream Predicate T->boolean
skip 中间(有状态-无界) Stream long
limit 中间(有状态-无界) Stream long
map 中间 Stream Function<T,R> T->R
flatMap 中间 Stream Function<T,Stream> T->Stream
sorted 中间(有状态-无界) Stream Comparator (T,T)->int
anyMatch 终端 boolean Predicate T->boolean
noneMatch 终端 boolean Predicate T->boolean
allMatch 终端 boolean Predicate T->boolean
findAny 终端 Optional
findFirst 终端 Optional
forEach 终端 void Consumer T->void
collect 终端 R Collector<T,A,R>
reduce 终端(有状态-有界) Optional BinaryOperator (T,T)->T
count 终端 long
2.8 数值流

如果我们使用

Integer reduce = appleList.stream().map(Apple::getWeight).reduce(0, Integer::sum);

会存在一个暗含的拆箱成本,每个Integer都必须拆箱成一个基本类型再进行求和。为此StreamAPI还提供了基本类型流特化,且提供了专门处理数据的方法

2.8.1 基本类型流特化

Java8引入了三个基本类型数据特化流:IntStream、DoubleStream和LongStream,将流中的元素特化为int,double和long,从而避免拆箱开销,并且提供了sum、max等数值规约方法。

1. 映射到数值流:

将流转化为特化版本的常用方法mapToInt、mapToDouble和mapToLong。

int sum = appleList.stream().mapToInt(Apple::getWeight).sum();
// 850

2. 转换回对象流:

特化流执行如map等操作时接受的Lambda必须是接受一个int并返回int,在一些场景可能需要转化为非特化流。

数值流转化为非特化流调用boxed方法。

appleList.stream().mapToInt(Apple::getWeight).boxed().map(s->s+"aaa").forEach(System.out::print);
// 100aaa150aaa150aaa200aaa250aaa

3. 默认值OptionalInt:

求和有一个默认值是0这是正常的,但是求最大值时默认值为0就有问题了,为了区别是真的最大值是0还是容器为空,引入了OptionalInt、OptionalDouble、OptionalLong,同Optional在不存在最大值时可以自定义一个默认值,例如:

OptionalInt max = appleList.stream().mapToInt(Apple::getWeight).max();
int maxInt = max.orElse(0);
// 250
2.9 构建流
2.9.1 由值构建流

使用静态方法Stream.of,显示创建一个流。

Stream.of("A ","B ","C ","D ").map(s->s.toLowerCase(Locale.ROOT)).forEach(System.out::print);
// a b c d 
Stream.empty();
// 返回一个空流
2.9.2 由可空对象构建流

如果你创建流的元素可能为空,就需要很多判断以返回一个空流,java9提供了一个新方法可以由一个可空对象创建流。Stream.ofNullable,例如

Stream<Apple> appleStream = Stream.ofNullable(null);
// 不会报错,而会返回一个空流

搭配flatMap处理更为方便

Stream<String> values = Stream.of("config", "home", "user").flatMap(key -> Stream.ofNullable(System.getProperty(key)));
2.9.3 由数组创建流

可以由静态方法Arrays.stream从数组创建一个流

int[] numbers = {2, 3, 5, 7, 11, 13}; 
int sum = Arrays.stream(numbers).sum();
2.9.4 由文件生成流

Java中用于处理文件等I/O操作的NIO API已更新,以便利用StreamAPI。

java.nio.file.Files中很多静态方法都会返回一个流。

例如,一个很有用的方法Files.lines,它会返回一个由指定文件中的各行构成的字符串流。

try(Stream<String> stringStream = Files.lines(Paths.get("src/main/resources/hello.txt"),Charset.defaultCharset())) {
  stringStream.forEach(System.out::println);
} catch (IOException e) {
  e.printStackTrace();
}
2.9.5 由函数生成流:创建无限流

StreamAPI提供了两个静态方法来从函数生成流:Stream.iterate和Stream.generate。这两个操作可以创建所谓的无限流,因此应该使用limit(n)对流加以限制。

  1. 迭代
Stream.iterate(0,n->n+2)
  .map(s->s+" ")
  .limit(10)
  .forEach(System.out::print);
// 0 2 4 6 8 10 12 14 16 18 

iterate方法接受一个初始值,还有一个依次应用在每个产生的新值上的Lambda。

如果生成一个流符合某个条件终止,不可以用filter,filter是对每一个元素进行判断,不符合也会继续判断不会中断流,可以使用takeWhile中断流

不过,java9对lambda进行了加强,它可以支持谓词操作了

Stream.iterate(0, n -> n < 18, n -> n + 2)
  .map(s -> s + " ")
  .limit(10)
  .forEach(System.out::print);

iterate第二个参数增加了一个谓词,可以进行条件判断

  1. 生成

与iterate类似,generate方法也可以按需生成一个无限流。它接受一个Supplier类型的Lambda提供新的值。

例如

Stream.generate(Math::random)
  .limit(5)
  .forEach(System.out::println);
// 0.3264606676208366
// 0.45144188087176196
// 0.5374271831360782
// 0.5986257087017012
// 0.39600100086294787

自定义实现

Stream.generate(new Supplier<Integer>() {
  int a = 10;
  @Override
  public Integer get() {
    return a++;
  }
}).limit(5).map(s->s+" ").forEach(System.out::print);
// 10 11 12 13 14
Stream.generate(()->10).limit(5).map(s->s+" ").forEach(System.out::print);
// 10 10 10 10 10 
2.10 小结
  • 你可以使用 filter、distinct、takeWhile (Java 9)、dropWhile (Java 9)、skip 和limit 对流做筛选和切片。
  • 如果你明确地知道数据源是排序的,那么用 takeWhile 和 dropWhile 方法通常比filter 高效得多。
  • 你可以使用 map 和 flatMap 提取或转换流中的元素。
  • 你可以使用 findFirst 和 findAny 方法查找流中的元素。你可以用 allMatch、noneMatch 和 anyMatch 方法让流匹配给定的谓词。
  • 这些方法都利用了短路:找到结果就立即停止计算;没有必要处理整个流。
  • 你可以利用 reduce 方法将流中所有的元素迭代合并成一个结果,例如求和或查找最大元素。
  • filter 和 map 等操作是无状态的,它们并不存储任何状态。reduce 等操作要存储状态才能计算出一个值。sorted 和 distinct 等操作也要存储状态,因为它们需要把流中的所有元素缓存起来才能返回一个新的流。这种操作称为有状态操作。
  • 流有三种基本的原始类型特化:IntStream、DoubleStream 和 LongStream。它们的操作也有相应的特化。
  • 流不仅可以从集合创建,也可从值、数组、文件以及 iterate 与 generate 等特定方法创建。
  • 无限流所包含的元素数量是无限的(想象一下所有可能的字符串构成的流)。这种情况是有可能的,因为流中的元素大多数都是即时产生的。使用 limit 方法,你可以由一个无限流创建一个有限流。
目录
相关文章
|
消息中间件 JSON Java
高性能异步编程与实时流那千丝万缕的联系
在海量并发的场景下,使用异步+NIO的编程方式,最后就 会演变成一个实时流系统,看完你也会觉得很惊讶,万物都是想通的
|
3月前
|
安全 Java API
Stream流式编程,让代码变优雅
Stream流式编程,让代码变优雅
|
4月前
|
传感器 PyTorch 数据处理
流式数据处理:DataLoader 在实时数据流中的作用
【8月更文第29天】在许多现代应用中,数据不再是以静态文件的形式存在,而是以持续生成的流形式出现。例如,传感器数据、网络日志、社交媒体更新等都是典型的实时数据流。对于这些动态变化的数据,传统的批处理方式可能无法满足低延迟和高吞吐量的要求。因此,开发能够处理实时数据流的系统变得尤为重要。
144 1
|
4月前
|
存储 数据处理 API
数据处理
【8月更文挑战第21天】
40 1
|
7月前
|
存储 Kubernetes 负载均衡
k8s 数据流向 与 核心概念详细介绍
k8s 数据流向 与 核心概念详细介绍
|
7月前
|
SQL 数据处理
数据处理语言思考
数据处理语言思考
37 1
|
存储 Java
Stream流式编程
Stream流式编程
139 0
|
JSON 分布式计算 监控
Spark结构化流应用编程模式
Spark结构化流应用编程模式
|
NoSQL Shell Linux
如何使用 Flupy 构建数据处理管道
如何使用 Flupy 构建数据处理管道
164 0
|
数据可视化 Java 程序员
类型流(TypeFlow)——世俗化的函数式编程和改进的过程式设计
类型流(TypeFlow)——世俗化的函数式编程和改进的过程式设计
类型流(TypeFlow)——世俗化的函数式编程和改进的过程式设计