Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow(下)

简介: Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow(下)

打印出的时间戳几乎是一样的,若严格按照 log 打印的时间戳顺序,应该是 Emitter 先发送的 4,Subscriber1 再才接收到的 2,但根据反复实践的结果来看,实际上是 Subscriber1 先接收缓冲区中的 2,等缓冲区有剩余空间后,Emitter 才结束挂起继续发送 4. 把上面的例子简化一下,再改改数据:

//code 7
private fun coroutineStudy() {
    val sharedFlow = MutableSharedFlow<Int>(replay = 1, extraBufferCapacity = 1)
    lifecycleScope.launch {
        launch {
            sharedFlow.collect {
                println("++++ sharedFlow1 collected $it")
                delay(10000)
            }
        }
        launch {
            (1..4).forEach{
                sharedFlow.emit(it)
                println("+++emit $it")
                delay(1000)
            }
        }
    }
}

打印结果如下所示,因为把 sharedFlow delay 的时长设置为 10s,所以很明显地看到 Emitter 在发送 1、2、3 时时间间隔均是 1s,发送 4 时足足过了 8s,这段时间就是 Emitter 被挂起了,一直等到 sharedFlow1 接收到 2 之后,4 才被 Emitter 发送,而 sharedFlow1 的每次接收都是间隔 10s,所以是先接收的 2,再结束挂起发送的 4.

00:25:52.481 29483-29483/com.example.myapplication I/System.out: +++emit 1
00:25:52.482 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 1
00:25:53.483 29483-29483/com.example.myapplication I/System.out: +++emit 2
00:25:54.486 29483-29483/com.example.myapplication I/System.out: +++emit 3
00:26:02.487 29483-29483/com.example.myapplication I/System.out: +++emit 4
00:26:02.488 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 2
00:26:12.497 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 3
00:26:22.516 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 4

通过源码也可看出这个结论,从 collect 方法进入,最终可以找到实际上是调用了 SharedFlowImpl 中的 collect 方法:

//code 8
    override suspend fun collect(collector: FlowCollector<T>) {
        val slot = allocateSlot()
        try {
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            val collectorJob = currentCoroutineContext()[Job]
            while (true) {
                var newValue: Any?
                while (true) {
                    newValue = tryTakeValue(slot) //首先尝试直接获取值
                    if (newValue !== NO_VALUE) break
                    awaitValue(slot) //没获取到则只能挂起等待新值到来
                }
                collectorJob?.ensureActive()
                collector.emit(newValue as T)
            }
        } finally {
            freeSlot(slot)
        }
    }

在内层 while 循环中,首先是通过 tryTakeValue 方法直接取值,如果没取到则通过 awaitValue 方法挂起等待新值,awaitValue 是个挂起函数。取到新值之后,才会跳出内层 while 循环,并执行 collector.emit(newValue as T),而这一段代码,实际上就是调用的 code 7 中的 sharedFlow.emit(it) 代码。

此处源代码还可以看出,SharedFlow 每次在 emit 之前,确实都会查看所在协程是否还在运行;且它确实是不会停止的,哪怕没有接收到新值,也会一直处于挂起等待的状态,想要结束则得使用截断类型的操作符。


1.3 onBufferOverflow


SharedFlow 构造函数的第三个参数就是设置超过 Buffer 之后的策略,默认是将生产者挂起暂时不再发送数据,即 BufferOverflow.SUSPEND。

还有另外两个数据丢弃策略:

1)BufferOverflow.DROP_LATEST 丢弃最新数据;

image.png

Emitter 在发送 4 时,因为 Buffer 已满,所以只能按照策略将最新的数据 4 丢弃。而在发送 3 时,由于 1 已经被消费过,所以可以从 Buffer 中移除,从而腾出存储空间缓存 3。

2)BufferOverflow.DROP_OLDEST 丢弃最老数据:

image.png

这个策略就比较简单,Buffer 中只会存储最新的数据。不管较老的数据是否被消费,当 Buffer 已满而又有新的数据到达时,老数据都会从 Buffer 中移除,腾出空间让给新数据。

注意点:当 replay、extra 都为 0,即没有 Buffer 的时候,那么 onBufferOverflow 只能是 BufferOverflow.SUSPEND。丢弃策略启动的前提是 SharedFlow 至少有 Buffer 且 Buffer 已满。


1.4 emit 与 tryEmit


由前一节可知,当 SharedFlow 的 Buffer 已满且 onBufferOverflow 为 BufferOverflow.SUSPEND 的时候,emit 会被挂起(emit 是个挂起函数),但这会影响到 Emitter 的速度。如果不想在发送数据的时候被挂起,除了设置 onBufferOverflow 丢弃策略外,还可以使用 tryEmit 方法。

//code 9
    override fun tryEmit(value: T): Boolean {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val emitted = synchronized(this) {
            if (tryEmitLocked(value)) {
                resumes = findSlotsToResumeLocked(resumes)
                true
            } else {
                false
            }
        }
        for (cont in resumes) cont?.resume(Unit)
        return emitted
    }
    @Suppress("UNCHECKED_CAST")
    private fun tryEmitLocked(value: T): Boolean {
        // Fast path without collectors -> no buffering
        // 1.没有订阅者时,直接返回 true,因为没有人接收,发了也没用,也不用缓存
        if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
        // With collectors we'll have to buffer
        // 2.有订阅者,就得考虑缓存发送的值了
        // cannot emit now if buffer is full & blocked by slow collectors
        // 3.如果缓存空间已满,且订阅者还在挂起处理上次的数据,则不能 emit
        if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
            when (onBufferOverflow) {
                BufferOverflow.SUSPEND -> return false // will suspend
                BufferOverflow.DROP_LATEST -> return true // just drop incoming
                BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
            }
        }
        // 4.代码能走到这里,说明缓存还有空间或丢弃策略为DROP_OLDEST
        enqueueLocked(value)
        bufferSize++ // value was added to buffer
        // drop oldest from the buffer if it became more than bufferCapacity
        if (bufferSize > bufferCapacity) dropOldestLocked()
        // keep replaySize not larger that needed
        if (replaySize > replay) { // increment replayIndex by one
            updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
        }
        return true
    }

由代码可见 tryEmit 不是一个挂起函数,它有返回值,如果返回 true 则说明发送数据成功了;如果返回 false,则说明这时发送数据需要被挂起等待。其中最主要的就是 tryEmitLocked 方法。

tryEmitLocked 方法主要逻辑已在注释中说明,需要额外说明的是,bufferCapacity 就是 replay + extraBufferCapacity 的大小;replayIndex 指的是最近开始订阅的订阅者在 replay cache 缓存数组中需要重播的最小 index。所以当使用默认构造的 SharedFlow 时,replayextraBufferCapacity 都为 0,如果这时再使用 tryEmit 方法进行发送,则会使得 if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) 判断为 true,默认的丢弃策略又是 BufferOverflow.SUSPEND,就会导致这里会直接返回 false,永远都不会发送出值。所以,在使用默认构造的 SharedFlow 时,不能使用 tryEmit 发送值,否则无法发送。 一般使用 emit 即可。

在 SharedFlow 具体实现中,emit 方法就是先尝试使用 tryEmit 来发送值,如果不能马上发送再使用挂起函数 emitSuspend 方法:

//code 10    class SharedFlowImpl
    override suspend fun emit(value: T) {
        if (tryEmit(value)) return // fast-path
        emitSuspend(value)
    }


2. StateFlow


看完 SharedFlow 再来看 StateFlow 的话就比较简单了。因为 StateFlow 就是 SharedFlow 的一种特殊子类,特点有三:

1)它的 replay cache 容量为 1;即可缓存最近的一次粘性事件;

2)初始化时必须给它设置一个初始值;

3)每次发送数据都会与上次缓存的数据作比较,如果不一样才会发送,自动过滤掉没有发生变化的数据。

它还可直接访问它自己的 value 参数获取当前结果值,总体来说,在使用上与 LiveData 相似,下面是它俩的异同点对比。


2.1 与 LiveData 比较的相同点


  1. 均提供了 可读可写 和 仅可读 两个版本:MutableStateFlow、StateFlow 与 MutableLiveData、LiveData;
  2. 允许被多个观察者观察,即生产者对消费者可以为一对多的关系;
  3. 都只会把最新的值给到观察者,即使没有观察者,也会更新自己的值;
  4. 都会产生粘性事件问题;
  5. 都可能产生丢失值的问题;

粘性事件问题:因为 StateFlow 初始化时必须给定初始值,且 replay 为 1,所以每个观察者进行观察时,都会收到最近一次的回播数据。如果想避免粘性事件问题,换用 SharedFlow 即可,replay 使用默认值 0 。

值丢失问题:出现在消费者处理数据比生产者生产数据慢的情况,消费者来不及处理数据,就会把之前生产者发送的旧数据丢弃掉,看个例子:

//code 11
    private fun stateFlowDemo1() {
        val stateFlow = MutableStateFlow(0)
        CoroutineScope(Dispatchers.Default).launch {
            var count = 1
            while (true) {
                val tmp = count++
                delay(1000)
                println("+++++ tmp = $tmp")
                stateFlow.value = tmp
            }
        }
        CoroutineScope(Dispatchers.Default).launch {
            stateFlow.collect{
                println("++++ count = $it")
                delay(5000)  //模拟耗时操作
            }
        }
    }

image.png

可以从打印结果看出,StateFlow 会丢弃掉生产者之前发送的值,其实 MutableStateFlow 的丢弃策略就是设置的 BufferOverflow.DROP_OLDEST。


2.2 与 LiveData 比较的不同点


  1. StateFlow 必须在构建的时候传入初始值,LiveData 不需要;
  2. StateFlow 默认是防抖的,LiveData 默认不防抖;
  3. 对于 Android 来说 StateFlow 默认没有和生命周期绑定,直接使用会有问题;

StateFlow 默认防抖:即如果发送的值与上次相同,则生产者并不会真正发送。在源码中也有说明,具体在 StateFlow.kt -> class StateFlowImpl -> private fun updateState -> if (oldState == newState) return true感兴趣的可以自行查阅,我看的版本是 1.5.0.

与 LiveData 相比,没有和 Activity 的生命周期绑定恐怕是使用 StateFlow 最不方便的地方了。当 View 进入 STOPPED 状态时,LiveData.observe() 会自动取消注册使用方,这样就不会再接收到数据了,也符合常理。因为用户此时已经离开页面,再接收数据已没有意义,如果继续处理后续逻辑可能还会出 bug。

而如果使用的是 StateFlow 或其他数据流,在 View 进入 STOPPED 状态时,收集数据的操作并不会自动停止。如需实现相同的行为,则需要从 Lifecycle.repeatOnLifecycle 块收集数据流。如下是来自官方文档的例子:

//code 12
class LatestNewsActivity : AppCompatActivity() {
    private val latestNewsViewModel = // getViewModel()
    override fun onCreate(savedInstanceState: Bundle?) {
        ...
        // Start a coroutine in the lifecycle scope
        lifecycleScope.launch {
            // repeatOnLifecycle launches the block in a new coroutine every time the
            // lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED.
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                // Trigger the flow and start listening for values.
                // Note that this happens when lifecycle is STARTED and stops
                // collecting when the lifecycle is STOPPED
                latestNewsViewModel.uiState.collect { uiState ->
                    // New value received
                    when (uiState) {
                        is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
                        is LatestNewsUiState.Error -> showError(uiState.exception)
                    }
                }
            }
        }
    }
}
//注意:repeatOnLifecycle API 仅在 androidx.lifecycle:lifecycle-runtime-ktx:2.4.0 库及更高版本中提供。

英文部分注释说的比较明确了,repeatOnLifecycle(Lifecycle.State.STARTED) 的作用就是每次进入 STARTED 可见状态时都会重新观察并收集数据;而在 STOPPED 状态时就会 cancel 掉 StateFlow 收集流所在的协程从而停止收集。


总结


最后总结一下 Flow 第二小节的内容吧:

1)热流有无消费者都可发送数据,生产者和消费者的关系可以是一对多;

2)SharedFlow 可构建热流,可设置 replay 重播数据量及 extraBufferCapacity 缓冲区大小,以及 onBufferOverflow 缓冲区满的策略;

3)emittryEmit 发送方法的异同,前者是挂起函数,注意在使用默认构造的 SharedFlow 时不要使用 tryEmit

4)StateFlow 是 SharedFlow 的一个子类,replay = 1,必须给定初始值,自带防抖;

5)使用 StateFlow 或 SharedFlow 收集值时,记得在 repeatOnLifecycle(Lifecycle.State.STARTED) 方法中,防止出现崩溃等问题。

更多内容,欢迎关注公众号:修之竹

赞人玫瑰,手留余香!欢迎点赞、转发~ 转发请注明出处~


参考文献


  1. Reactive Streams on Kotlin: SharedFlow and StateFlow; Ricardo Costeira; https://www.raywenderlich.com/22030171-reactive-streams-on-kotlin-sharedflow-and-stateflow
  2. Kotlin中 Flow、SharedFlow与StateFlow区别;五问;https://juejin.cn/post/7142038525997744141
  3. 一看就懂!图解 Kotlin SharedFlow 缓存系统;fundroid;https://juejin.cn/post/7156408785886511111
  4. Kotlin:深入理解StateFlow与SharedFlow,StateFlow和LiveData使用差异区分,SharedFlow实现源码解析;  pumpkin的玄学;  https://blog.csdn.net/weixin_44235109/article/details/121594988?spm=1001.2014.3001.5502
  5. StateFlow 和 SharedFlow 官方文档  https://developer.android.google.cn/kotlin/flow/stateflow-and-sharedflow?hl=zh-cn
目录
相关文章
|
1月前
|
缓存 数据处理 Android开发
Android经典实战之Kotlin常用的 Flow 操作符
本文介绍 Kotlin 中 `Flow` 的多种实用操作符,包括转换、过滤、聚合等,通过简洁易懂的例子展示了每个操作符的功能,如 `map`、`filter` 和 `fold` 等,帮助开发者更好地理解和运用 `Flow` 来处理异步数据流。
74 4
|
8天前
|
数据处理 开发者 Kotlin
利用Kotlin Flow简化数据流管理
随着移动端应用的复杂化,数据流管理成为一大挑战。Kotlin Flow作为一种基于协程的响应式编程框架,可简化数据流处理并支持背压机制,有效避免应用崩溃。本文通过解答四个常见问题,详细介绍Kotlin Flow的基本概念、创建方法及复杂数据流处理技巧,帮助开发者轻松上手,提升应用性能。
36 16
|
2天前
|
存储 API 数据库
Kotlin协程与Flow的魅力——打造高效数据管道的不二法门!
在现代Android开发中,Kotlin协程与Flow框架助力高效管理异步操作和数据流。协程采用轻量级线程管理,使异步代码保持同步风格,适合I/O密集型任务。Flow则用于处理数据流,支持按需生成数据和自动处理背压。结合两者,可构建复杂数据管道,简化操作流程,提高代码可读性和可维护性。本文通过示例代码详细介绍其应用方法。
11 2
|
9天前
|
数据处理 Kotlin
掌握这项Kotlin技能,让你的数据流管理不再头疼!Flow的秘密你解锁了吗?
【9月更文挑战第12天】随着移动应用发展,数据流管理日益复杂。Kotlin Flow作为一种基于协程的异步数据流处理框架应运而生,它可解耦数据的生产和消费过程,简化数据流管理,并支持背压机制以防应用崩溃。本文通过四个问题解析Kotlin Flow的基础概念、创建方式、复杂数据流处理及背压实现方法,助您轻松掌握这一高效工具,在实际开发中更从容地应对各种数据流挑战,提升应用性能。
28 8
|
10天前
|
数据处理 API 数据库
揭秘Kotlin Flow:迈向响应式编程的黄金钥匙
【9月更文挑战第11天】在现代软件开发中,异步编程与数据处理对于构建高性能应用至关重要。Kotlin Flow作为协程库的一部分,提供了简洁高效的API来处理数据流。本文将通过实例引导你从零开始学习Kotlin Flow,掌握构建响应式应用的方法。Flow是一种冷流,仅在订阅时才开始执行,支持map、filter等操作符,简化数据处理。
25 7
|
8天前
|
存储 数据处理 Kotlin
Kotlin Flow背后的神秘力量:背压、缓冲与合并策略的终极揭秘!
【9月更文挑战第13天】Kotlin Flow 是 Kotlin 协程库中处理异步数据流的强大工具,本文通过对比传统方法,深入探讨 Flow 的背压、缓冲及合并策略。背压通过 `buffer` 函数控制生产者和消费者的速率,避免过载;缓冲则允许数据暂存,使消费者按需消费;合并策略如 `merge`、`combine` 和 `zip` 则帮助处理多数据源的整合。通过这些功能,Flow 能更高效地应对复杂数据处理场景。
22 2
|
8天前
|
移动开发 定位技术 Android开发
「揭秘高效App的秘密武器」:Kotlin Flow携手ViewModel,打造极致响应式UI体验,你不可不知的技术革新!
【9月更文挑战第12天】随着移动开发领域对响应式编程的需求增加,管理应用程序状态变得至关重要。Jetpack Compose 和 Kotlin Flow 的组合提供了一种优雅的方式处理 UI 状态变化,简化了状态管理。本文探讨如何利用 Kotlin Flow 增强 ViewModel 功能,构建简洁强大的响应式 UI。
21 3
|
9天前
|
数据库 Kotlin
Kotlin中的冷流和热流以及如何让Flow停下来
本文介绍了Kotlin中`Flow`的概念及其类型,包括冷流(Cold Flow)、热流`SharedFlow`及具有最新值的`StateFlow`。文中详细描述了每种类型的特性与使用场景,并提供了停止`Flow`的方法,如取消协程、使用操作符过滤及异常处理。通过示例代码展示了如何运用这些概念。
17 2
|
2天前
|
API 数据处理 数据库
掌握 Kotlin Flow 的艺术:让无限数据流处理变得优雅且高效 —— 实战教程揭秘如何在数据洪流中保持代码的健壮与灵活
Kotlin Flow 是一个强大的协程 API,专为处理异步数据流设计。它适合处理网络请求数据、监听数据库变化等场景。本文通过示例代码展示如何使用 Kotlin Flow 管理无限流,如实时数据流。首先定义了一个生成无限整数的流 `infiniteNumbers()`,然后结合多种操作符(如 `buffer`、`onEach`、`scan`、`filter`、`takeWhile` 和 `collectLatest`),实现对无限流的优雅处理,例如计算随机数的平均值并在超过阈值时停止接收新数据。这展示了 Flow 在资源管理和逻辑清晰性方面的优势。
10 0
|
1月前
|
缓存 API Android开发
Android经典实战之Kotlin Flow中的3个数据相关的操作符:debounce、buffer和conflate
本文介绍了Kotlin中`Flow`的`debounce`、`buffer`及`conflate`三个操作符。`debounce`过滤快速连续数据,仅保留指定时间内的最后一个;`buffer`引入缓存减轻背压;`conflate`仅保留最新数据。通过示例展示了如何在搜索输入和数据流处理中应用这些操作符以提高程序效率和用户体验。
35 6