Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow(上)

简介: Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow(上)

要说最近圈内大事件,那就非 chatGPT 莫属了!人工智能领域最新的大突破了吧?很可能引发下一场的技术革命,因为大家都懂的原因现在还不能在中国大陆使用,不过国内的度厂正在积极跟进了,预计3月份能面世,且期待一下吧~

上节主要讲述了 Flow 的组成、Flow 常用操作符以及冷流的具体使用。这节自然就要介绍热流了。先来温习下:

冷流(Cold Flow):在数据被消费者订阅后,即调用 collect 方法之后,生产者才开始执行发送数据流的代码,通常是调用 emit 方法。即不消费,不生产,多次消费才会多次生产。消费者和生产者是一对一的关系。

上次说的例子不太直观,所以这次换了个更直观的对比例子,先来看第一个:

//code 1
val coldFlow = flow {
    println("coldFlow begin emitting")
    emit(40)
    println("coldFlow 40 is emitted")
    emit(50)
    println("coldFlow 50 is emitted")
}
binding.btn2.setOnClickListener {
    lifecycleScope.launch {
        coldFlow.collect {
            println("coldFlow = $it")
        }
    }
}

只有当点击按钮时,才会如图打印出信息,即冷流只有调用了 collect 方法收集流后,emit 才会开始执行。

image.png

热流(Hot Flow)就不一样了,无论有无消费者,生产者都会生产数据。它不像冷流,Flow 必须在调用末端操作符之后才会去执行;而是可以自己控制是否发送或者生产数据流。并且热流可以有多个订阅者;而冷流只有一个。再来看看热流的例子:

//code 2
val hotFlow = MutableStateFlow(0)
lifecycleScope.launch {
    println("hotFlow begin emitting")
    hotFlow.emit(40)
    println("hotFlow 40 is emitted")
    hotFlow.emit(50)
    println("hotFlow 50 is emitted")
}
binding.btn2.setOnClickListener {
    lifecycleScope.launch {
        hotFlow.collect {
            println("hotFlow collects $it")
        }
    }
}

MutableStateFlow 就是热流中的一种,当没有点击按钮时,便会输出下图中的前三行信息。

image.png

当点击两下按钮后,就会依次输出如图第 4,5 行的信息,至于为什么只会接收到 50,这跟 MutableStateFlow 的特性有关,后面再说。

通过这两个例子就可清楚地知道冷热流之间的区别。热流有两种对象,分别是 StateFlow 和 SharedFlow。


1. SharedFlow


先来看看 SharedFlow,它是一个 subscriber 订阅者的角色,当一个 SharedFlow 调用了 collect 方法后,它就不会正常地结束完成;但可以 cancel 掉 collect 所在的协程,这样就可以取消掉订阅了。SharedFlow 在每次 emit 时都会去 check 一下所在协程是否已经取消。绝大多数的终端操作符,例如 Flow.toList() 都不会使得 SharedFlow 结束完成,但 Flow.take() 之类的截断操作符是例外,它们是可以强制完成一个 SharedFlow 的。

SharedFlow 的简单使用样例:

//code 3
class EventBus {
    private val _events = MutableSharedFlow<Event>() // private mutable shared flow
    val events = _events.asSharedFlow() // publicly exposed as read-only shared flow
    suspend fun produceEvent(event: Event) {
        _events.emit(event) // suspends until all subscribers receive it
    }
}

与 LiveData 相似的使用方式。但 SharedFlow 的功能更为强大,它有 replay cache 和 buffer 机制。


1.1 Replay cache


可以理解为是一个粘性事件的缓存。每个新的订阅者会首先收到 replay cache 中之前发出并接收到的事件,再才会收到新的发射出的值。可以在 MutableSharedFlow 的构造函数中设置 cache 的大小,不能为负数,默认为 0.

//code 4
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)

replay 重播之前最新的 n 个事件,见字知义。下面是例子:

//code 5
private fun testSharedFlow() {
    val sharedFlow = MutableSharedFlow<Int>(replay = 2)
    lifecycleScope.launch {
        launch {
            sharedFlow.collect {
                println("++++ sharedFlow1 collected $it")
            }
        }
        launch {
            (1..3).forEach{
                sharedFlow.emit(it)
            }
        }
        delay(200)
        launch {
            sharedFlow.collect {
                println("++++ sharedFlow2 collected $it")
            }
        }
    }
}

结果为:

com.example.myapplication I/System.out: ++++ sharedFlow1 collected 1
com.example.myapplication I/System.out: ++++ sharedFlow1 collected 2
com.example.myapplication I/System.out: ++++ sharedFlow1 collected 3
com.example.myapplication I/System.out: ++++ sharedFlow2 collected 2
com.example.myapplication I/System.out: ++++ sharedFlow2 collected 3

emit 发射数据前后分别设置了一个订阅者,后面还延时了 200ms 才进行订阅。第一个订阅者 1、2、3都收到了;而第二个订阅者却只收到了 2 和 3. 这是因为在第二个订阅者开始订阅时,数据已经都发射完了,而 SharedFlow 的重播 replay 为 2,就可将最近发射的两个数据再依次发送一遍,这就可以收到 2 和 3 了。


1.2 extraBufferCapacity


SharedFlow 构造函数的第二个参数 extraBufferCapacity 的作用是,在 replay cache 之外还能额外设置的缓存。常用于当生产者生产数据的速度 > 消费者消费数据的速度时的情况,可以有效提升吞吐量。

所以,若 replay = m,extraBufferCapacity = n,那么这个 SharedFlow 总共的 BufferSize = m + n.  replay 会存储最近发射的数据,如果满了就会往 extraBuffer 中存。接下来看一个例子:

//code 6
private fun coroutineStudy() {
    val sharedFlow = MutableSharedFlow<Int>(replay = 1, extraBufferCapacity = 1)
    lifecycleScope.launch {
        launch {
            sharedFlow.collect {
                println("++++ sharedFlow1 collected $it")
                delay(6000)
            }
        }
        launch {
            (1..4).forEach{
                sharedFlow.emit(it)
                println("+++emit $it")
                delay(1000)
            }
        }
        delay(4000)
        launch {
            sharedFlow.collect {
                println("++++ sharedFlow2 collected $it")
                delay(20000)
            }
        }
    }
}

运行结果为:

17:32:09.283 28184-28184 System.out com.wen.testdemo I  +++emit 1
17:32:09.284 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow1 collected 1
17:32:10.285 28184-28184 System.out com.wen.testdemo I  +++emit 2
17:32:11.289 28184-28184 System.out com.wen.testdemo I  +++emit 3
17:32:13.286 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow2 collected 3
17:32:15.292 28184-28184 System.out com.wen.testdemo I  +++emit 4
17:32:15.293 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow1 collected 2
17:32:21.301 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow1 collected 3
17:32:27.311 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow1 collected 4
17:32:33.292 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow2 collected 4

打印结果可能会有点懵,对照着时序图更容易理解(此图来自于参考文献3,感谢 fundroid 大佬的输出~):

image.png

1)Emitter 发送 1,因为 Subscriber1 在 Emitter 发送数据前就已开始订阅,所以 Subscriber1 可马上接收;此时 replay 存储 1;

2)Emitter 发送 2,Subscriber1 还在处理中处于挂起态,此时 replay 存储 2;

3)Emitter 发送 3,此时还没有任何消费者能消费,则 replay 存储 3,将 2 放入 extra 中;

4)Emitter 想要发送 4,但发现 SharedFlow 的 Buffer 已满,则按照默认的策略进行挂起等待(默认策略就是 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND);

5)Subscriber2 开始订阅,接收到 replay 中的 3,此时 Subscriber1 还是挂起态,Buffer 中数据没变化,即 replay 存储 3,extra 存储 2;

6)Subscriber1 处理完 1 后,依次处理 Buffer 中 的下一个数据,即消费 extra 中的 2,这时 Buffer 终于有空间了,Emitter 结束挂起,发送 4,replay 存储 4,将 3 放入 extra 中;

7)Subscriber1 消费完 2 后接着再消费 extra 中的 3,此时 Buffer 中就只有 4 了。后面的就不用多说了

比较绕,需要多看几次思考一下。需要注意的是,代码运行结果中下面两行输出到底谁先谁后的问题:

17:32:15.292 28184-28184 System.out com.wen.testdemo I  +++emit 4
17:32:15.293 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow1 collected 2


目录
相关文章
|
8天前
|
数据处理 开发者 Kotlin
利用Kotlin Flow简化数据流管理
随着移动端应用的复杂化,数据流管理成为一大挑战。Kotlin Flow作为一种基于协程的响应式编程框架,可简化数据流处理并支持背压机制,有效避免应用崩溃。本文通过解答四个常见问题,详细介绍Kotlin Flow的基本概念、创建方法及复杂数据流处理技巧,帮助开发者轻松上手,提升应用性能。
36 16
|
2天前
|
存储 API 数据库
Kotlin协程与Flow的魅力——打造高效数据管道的不二法门!
在现代Android开发中,Kotlin协程与Flow框架助力高效管理异步操作和数据流。协程采用轻量级线程管理,使异步代码保持同步风格,适合I/O密集型任务。Flow则用于处理数据流,支持按需生成数据和自动处理背压。结合两者,可构建复杂数据管道,简化操作流程,提高代码可读性和可维护性。本文通过示例代码详细介绍其应用方法。
11 2
|
9天前
|
数据处理 Kotlin
掌握这项Kotlin技能,让你的数据流管理不再头疼!Flow的秘密你解锁了吗?
【9月更文挑战第12天】随着移动应用发展,数据流管理日益复杂。Kotlin Flow作为一种基于协程的异步数据流处理框架应运而生,它可解耦数据的生产和消费过程,简化数据流管理,并支持背压机制以防应用崩溃。本文通过四个问题解析Kotlin Flow的基础概念、创建方式、复杂数据流处理及背压实现方法,助您轻松掌握这一高效工具,在实际开发中更从容地应对各种数据流挑战,提升应用性能。
28 8
|
10天前
|
数据处理 API 数据库
揭秘Kotlin Flow:迈向响应式编程的黄金钥匙
【9月更文挑战第11天】在现代软件开发中,异步编程与数据处理对于构建高性能应用至关重要。Kotlin Flow作为协程库的一部分,提供了简洁高效的API来处理数据流。本文将通过实例引导你从零开始学习Kotlin Flow,掌握构建响应式应用的方法。Flow是一种冷流,仅在订阅时才开始执行,支持map、filter等操作符,简化数据处理。
25 7
|
8天前
|
存储 数据处理 Kotlin
Kotlin Flow背后的神秘力量:背压、缓冲与合并策略的终极揭秘!
【9月更文挑战第13天】Kotlin Flow 是 Kotlin 协程库中处理异步数据流的强大工具,本文通过对比传统方法,深入探讨 Flow 的背压、缓冲及合并策略。背压通过 `buffer` 函数控制生产者和消费者的速率,避免过载;缓冲则允许数据暂存,使消费者按需消费;合并策略如 `merge`、`combine` 和 `zip` 则帮助处理多数据源的整合。通过这些功能,Flow 能更高效地应对复杂数据处理场景。
22 2
|
8天前
|
移动开发 定位技术 Android开发
「揭秘高效App的秘密武器」:Kotlin Flow携手ViewModel,打造极致响应式UI体验,你不可不知的技术革新!
【9月更文挑战第12天】随着移动开发领域对响应式编程的需求增加,管理应用程序状态变得至关重要。Jetpack Compose 和 Kotlin Flow 的组合提供了一种优雅的方式处理 UI 状态变化,简化了状态管理。本文探讨如何利用 Kotlin Flow 增强 ViewModel 功能,构建简洁强大的响应式 UI。
21 3
|
9天前
|
数据库 Kotlin
Kotlin中的冷流和热流以及如何让Flow停下来
本文介绍了Kotlin中`Flow`的概念及其类型,包括冷流(Cold Flow)、热流`SharedFlow`及具有最新值的`StateFlow`。文中详细描述了每种类型的特性与使用场景,并提供了停止`Flow`的方法,如取消协程、使用操作符过滤及异常处理。通过示例代码展示了如何运用这些概念。
17 2
|
2天前
|
API 数据处理 数据库
掌握 Kotlin Flow 的艺术:让无限数据流处理变得优雅且高效 —— 实战教程揭秘如何在数据洪流中保持代码的健壮与灵活
Kotlin Flow 是一个强大的协程 API,专为处理异步数据流设计。它适合处理网络请求数据、监听数据库变化等场景。本文通过示例代码展示如何使用 Kotlin Flow 管理无限流,如实时数据流。首先定义了一个生成无限整数的流 `infiniteNumbers()`,然后结合多种操作符(如 `buffer`、`onEach`、`scan`、`filter`、`takeWhile` 和 `collectLatest`),实现对无限流的优雅处理,例如计算随机数的平均值并在超过阈值时停止接收新数据。这展示了 Flow 在资源管理和逻辑清晰性方面的优势。
10 0
|
1月前
|
前端开发 编译器 测试技术
Kotlin Multiplatform 跨平台开发的优化策略与实践
本文深入讲解Kotlin Multiplatform(KMP)的优化策略与实践。KMP是由JetBrains推出的开源技术,允许跨平台共享代码同时保持原生优势。文章覆盖KMP核心概念、性能优化技巧(如代码结构优化、利用`expect`/`actual`关键字、Kotlin/Native性能特性等),以及在移动、桌面和Web应用的实际案例分析。此外,还介绍了如何利用KMP生态系统工具进行快速开发,并展望了KMP的未来发展。
42 0
|
8天前
|
Android开发 开发者 Kotlin
告别AsyncTask:一招教你用Kotlin协程重构Android应用,流畅度飙升的秘密武器
【9月更文挑战第13天】随着Android应用复杂度的增加,有效管理异步任务成为关键。Kotlin协程提供了一种优雅的并发操作处理方式,使异步编程更简单直观。本文通过具体示例介绍如何使用Kotlin协程优化Android应用性能,包括网络数据加载和UI更新。首先需在`build.gradle`中添加coroutines依赖。接着,通过定义挂起函数执行网络请求,并在`ViewModel`中使用`viewModelScope`启动协程,结合`Dispatchers.Main`更新UI,避免内存泄漏。使用协程不仅简化代码,还提升了程序健壮性。
21 1