解构反应式编程——Java 8, RxJava, Reactor之比较

简介: 如果你熟悉Java 8,同时又了解反应式编程(Reactive Programming)框架,例如RxJava和Reactor等,你可能会问: “如果我可以用Java 8 的Stream, CompletableFuture, 以及Optional完成同样的事情,为什么还要用RxJava .

如果你熟悉Java 8,同时又了解反应式编程(Reactive Programming)框架,例如RxJava和Reactor等,你可能会问:

“如果我可以用Java 8 的Stream, CompletableFuture, 以及Optional完成同样的事情,为什么还要用RxJava 或者 Reactor呢?”

问题在于,大多数时候你在处理的是简单的任务,这个时候你确实不需要那些反应式编程的库。但是,当系统越来越复杂,或者你处理的本身就是个复杂的任务,你恐怕就得写一些让自己头皮发麻的代码。随着时间的推移,这些代码会变得越来越复杂和难以维护。

RxJava和Reactor提供了很多非常趁手的功能,能够支持你在未来更轻松地维护你的代码,实现新需求。但是这个优势到底有多大,具体体现在哪些方面?没有标准无法比较,让我们定义8个比较的维度,来帮助我们理解Java 8的API以及反应式编程的库之间的差别。

  1. Composable(可组装)
  2. Lazy(延迟执行)
  3. Reusable(可重用)
  4. Asynchronous(异步)
  5. Cacheable(可缓存)
  6. Push or Pull(推还是拉)
  7. Backpressure(反压)
  8. Operator fusion(操作融合)

针对上面这些维度,我们比较以下的这些类:

  1. CompletableFuture
  2. Stream
  3. Optional
  4. Observable (RxJava 1)
  5. Observable (RxJava 2)
  6. Flowable (RxJava 2)
  7. Flux (Reactor Core)

准备好了吗?我们开始!

Composable(可组装)

上面所有的这7个类都是可组装的,支持函数式的编程方式,这是我们喜欢它们的原因。

  • CompletableFuture - 提供很多的.then*()方法,这些方法允许我们构建一个流水线,在不同的执行阶段之间传递一个单一的值(或者没有值),以及传递异常对象。
  • Stream - 提供很多的可以链式编程方式连接起来的操作,不同的操作阶段之间可以传递N个值。
  • Optional - 提供一些中间操作,如: .map(), .flatMap(), .filter().
  • Observable, Flowable, Flux - 跟Stream相同

Lazy(延迟执行)

  • CompletableFuture - 非延迟执行,它本质上只是一个异步结果的持有者。这些对象创建出来是为了代表对应的工作,CompletableFuture创建的时候,对应的工作已经开始执行了。它不知道任何的关于工作的具体内容,只是关心结果。所以,没有办法能走到上游去从上到下执行整个流水线。当结果被塞到CompletableFuture对象的时候,下一个阶段开始执行。
  • Stream - 所有的中间操作都是延迟执行的。所有的终端操作,会触发整个计算。
  • Optional - 非延迟执行,所有的操作会马上发生。
  • Observable, Flowable, Flux - 延迟执行,没有订阅者的话,什么都不会做,只有当有订阅者的时候才会执行。

Reusable(可重用)

  • CompletableFuture - 可以重用,它只是在一个值外面做了一层包装。但需要注意一点,这个包装是可更改的。.obtrude*()方法会更改它的内容,如果你确定没有人会调用到这类方法,那么重用它还是安全的。
  • Stream - 不能重用。Java Doc已经说了:

A stream should be operated on (invoking an intermediate or terminal stream operation) only once. A stream implementation may throw IllegalStateException if it detects that the stream is being reused. However, since some stream operations may return their receiver rather than a new stream object, it may not be possible to detect reuse in all cases.
翻译过来就是:Stream只能被操作(调用中间操作或者终端操作)一次。如果一个stream的实现检测到流被重复使用了,它可以抛出一个IllegalStateException。但是因为某些流操作会返回他们的receiver,而不是一个新的stream对象,并不是在所有的情况下都能够检测出重用。

  • Optional - 完全可重用,因为它是不可变对象,而且所有工作都是立即执行的。
  • Observable, Flowable, Flux - 就是设计来可重用的。所有的执行会从初始点开始,走过所有阶段,前提是有订阅者。

Asynchronous(异步)

  • CompletableFuture - 嗯...这个类存在的目的就是异步的把多个操作链接起来。CompletableFuture代表一个工作,后面跟一个Executor关联起来。如果你不明确指定一个executor,那么系统会使用公共的ForkJoinPool线程池来执行。这个线程池可以用ForkJoinPool.commonPool()获取到。默认的设置下它会创建系统硬件支持的线程数一样多的线程(通常就是跟CPU的核心数,如果你的CPU支持超线程,那么可能再翻一倍)。不过你也可以设置ForkJoinPool线程池的线程数,用以下JVM option:
    -Djava.util.concurrent.ForkJoinPool.common.parallelism=?

或者每次调用的时候提供一个定制的Executor。

  • Stream - 不支持创建异步过程,但是可以支持并行的计算——通过stream.parallel()等方式创建并行流。
  • Optional - 不支持,它只是一个容器。
  • Observable, Flowable, Flux - 目标就是为了构建异步的系统,但是默认情况下还是同步的。subscribeOn和observeOn允许你来控制消息的订阅以及消息的接收(指定当你的observer的 onNext / onError / onCompleted 被调用的时候做什么事情)。

subscribeOn让你决定用哪个Scheduler来执行Observable.create。即便你自己没有调用create,系统内部也会做类似的事情。示例:

Observable
  .fromCallable(() -> {
    log.info("Reading on thread: " + currentThread().getName());
    return readFile("input.txt");
  })
  .map(text -> {
    log.info("Map on thread: " + currentThread().getName());
    return text.length();
  })
  .subscribeOn(Schedulers.io()) // <-- setting scheduler
  .subscribe(value -> {
     log.info("Result on thread: " + currentThread().getName());
  });

输出:

Reading file on thread: RxIoScheduler-2
Map on thread: RxIoScheduler-2
Result on thread: RxIoScheduler-2

相反的,observeOn()决定在observeOn()之后,用哪个Scheduler来运行下游的执行阶段。示例:

Observable
  .fromCallable(() -> {
    log.info("Reading on thread: " + currentThread().getName());
    return readFile("input.txt");
  })
  .observeOn(Schedulers.computation()) // <-- setting scheduler
  .map(text -> {
    log.info("Map on thread: " + currentThread().getName());
    return text.length();
  })
  .subscribeOn(Schedulers.io()) // <-- setting scheduler
  .subscribe(value -> {
     log.info("Result on thread: " + currentThread().getName());
  });

输出:

Reading file on thread: RxIoScheduler-2
Map on thread: RxComputationScheduler-1
Result on thread: RxComputationScheduler-1

Cacheable(可缓存)

可缓存和可重用之间的区别是什么?举个例子,我们有一个流水线A,并且使用这个流水线两次,创建两个新的流水线 B = A + 以及 C = A + 。

       - 如果B和C都能成功完成,那么这个A是可重用的。
       - 如果B和C都能成功完成,并且A的每一个阶段只被调用了一次,那么这个A是可缓存的。

可以看出,一个类如果是可缓存的,必然得是可重用的。

  • CompletableFuture - 跟可重用的答案一样。
  • Stream - 不能缓存中间操作的结果,除非调用了终端操作。
  • Optional - ‘可缓存’,实际上,所有工作立即执行,并且做完后就保存了一个不变值,自然‘可缓存’。
  • Observable, Flowable, Flux - 默认情况下是不可缓存的,但是你可以把一个这些类转变成缓存,只要调用.cache()就可以。示例:
Observable<Integer> work = Observable.fromCallable(() -> {
  System.out.println("Doing some work");
  return 10;
});
work.subscribe(System.out::println);
work.map(i -> i * 2).subscribe(System.out::println);

输出:

Doing some work
10
Doing some work
20

如果用.cache():

Observable<Integer> work = Observable.fromCallable(() -> {
  System.out.println("Doing some work");
  return 10;
}).cache(); // <- apply caching
work.subscribe(System.out::println);
work.map(i -> i * 2).subscribe(System.out::println);

输出:

Doing some work
10
20

Push or Pull(推模式还是拉模式)

  • Stream 和 Optional - 是拉模式的。你调用不同的方法(.get(), .collect() 等)从流水线拉取结果。拉模式经常与阻塞、同步是相关联的,而这也合理。你调用一个方法,然后线程等待数据。线程会阻塞直到数据到达。
  • CompletableFuture, Observable, Flowable, Flux - 是推模式的。你订阅一个流水线,然后当有东西可以处理的时候你会得到通知。推模式通常意味着非阻塞、异步。当流水线在某个线程上执行的时候,你可以做任何事情。你已经定义了一段待执行的代码,作为下一个阶段的任务,当通知到达的时候,这个代码就会被执行。

Backpressure(反压)

要做到支持反压,流水线必须是推模式的。

Backpressure(反压) 描述的是在流水线中会发生的一种场景:某些异步的阶段处理速度跟不上,需要告诉上游生产者放慢速度。直接失败是不可接受的,因为会丢失太多数据。

backpressure.jpg

  • Stream & Optional - 不支持反压,因为他们是拉模式。
  • CompletableFuture - 不需要面对这个问题,因为它只产生0个或者1个结果。
  • Observable(RxJava 1), Flowable, Flux - 提供一组方案解决这个问题。常用的策略是:

      - Buffering(缓冲) - 把所有的onNext的值保存到缓冲区,直到下游消费它们。
      - Drop Recent - 如果下游处理跟不上的话,丢弃最近的onNext值。
      - Use Latest - 如果下游处理跟不上的话,只提供最近的onNext值,之前的值会被覆盖。
      - None - onNext事件直接被触发,不带任何缓冲或丢弃处理。
      - Exception - 如果下游处理跟不上的话,触发一个异常。

  • Observable(RxJava 2) - 不解决这个问题。很多RxJava 1的使用者用Observable来处理不适用反压的事件,或者是使用Observable的时候不用任何策略处理反压,这会导致不可预知的异常。所以,RxJava 2明确地区分两种情况,提供支持反压的Flowable和不支持反压的Observable。

Operator fusion(操作融合)

操作融合背后的想法是,在生命周期的不同点上,改变执行阶段的链条,从而消除库的架构因素所造成的额外开销。所有这些优化都是在内部处理掉的,对外部用户来说是透明的。

只有RxJava 2 和 Reactor 支持这个特性,但支持的方式不同。总的来说,有两种类型的优化:

  • Macro-fusion - 用一个操作替换2个或更多的相继的操作

macro-fusion_.png

  • Micro-fusion - 在一个输出队列中结束的操作,和在一个前驱队列中开始的操作,能够共用同一个队列的实例。比如说,与其调用request(1)然后处理onOnext(),我们可以:

micro-fusion-1_1.png

订阅者可以向父observable轮询值。

micro-fusion-2.png

更多的详细信息可以参考Operator-fusion (part 1)Operator-fusion (part 2)

总结

上面的内容可以总结为一个表:

2018-04-12_20-38-07.png

总的来说,Stream, CompletableFuture, 和 Optional创建出来是为了解决特定的问题。它们解决这些问题很好用。如果它们满足你的要求,你继续用它们就好了。

但是,不同的问题有不同的复杂性。某些问题需要新的技术。RxJava 和 Reactor是一组通用的工具,帮助你用一种声明式的方式解决你面对的问题,而不是用一些并非为这种问题而提供的工具,来创建一种“hack”的解决方案。


本文主要内容翻译自:http://alexsderkach.io/comparing-java-8-rxjava-reactor/

目录
相关文章
|
1月前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
16天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
20天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
54 12
|
16天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
99 2
|
2月前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
2月前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####
|
1月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
1月前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
52 3
|
1月前
|
开发框架 安全 Java
Java 反射机制:动态编程的强大利器
Java反射机制允许程序在运行时检查类、接口、字段和方法的信息,并能操作对象。它提供了一种动态编程的方式,使得代码更加灵活,能够适应未知的或变化的需求,是开发框架和库的重要工具。
55 4
|
2月前
|
安全 Java 开发者
Java中的多线程编程:从基础到实践
本文深入探讨了Java多线程编程的核心概念和实践技巧,旨在帮助读者理解多线程的工作原理,掌握线程的创建、管理和同步机制。通过具体示例和最佳实践,本文展示了如何在Java应用中有效地利用多线程技术,提高程序性能和响应速度。
72 1