Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow(上)

简介: Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow(上)

要说最近圈内大事件,那就非 chatGPT 莫属了!人工智能领域最新的大突破了吧?很可能引发下一场的技术革命,因为大家都懂的原因现在还不能在中国大陆使用,不过国内的度厂正在积极跟进了,预计3月份能面世,且期待一下吧~

上节主要讲述了 Flow 的组成、Flow 常用操作符以及冷流的具体使用。这节自然就要介绍热流了。先来温习下:

冷流(Cold Flow):在数据被消费者订阅后,即调用 collect 方法之后,生产者才开始执行发送数据流的代码,通常是调用 emit 方法。即不消费,不生产,多次消费才会多次生产。消费者和生产者是一对一的关系。

上次说的例子不太直观,所以这次换了个更直观的对比例子,先来看第一个:

//code 1
val coldFlow = flow {
    println("coldFlow begin emitting")
    emit(40)
    println("coldFlow 40 is emitted")
    emit(50)
    println("coldFlow 50 is emitted")
}
binding.btn2.setOnClickListener {
    lifecycleScope.launch {
        coldFlow.collect {
            println("coldFlow = $it")
        }
    }
}

只有当点击按钮时,才会如图打印出信息,即冷流只有调用了 collect 方法收集流后,emit 才会开始执行。

image.png

热流(Hot Flow)就不一样了,无论有无消费者,生产者都会生产数据。它不像冷流,Flow 必须在调用末端操作符之后才会去执行;而是可以自己控制是否发送或者生产数据流。并且热流可以有多个订阅者;而冷流只有一个。再来看看热流的例子:

//code 2
val hotFlow = MutableStateFlow(0)
lifecycleScope.launch {
    println("hotFlow begin emitting")
    hotFlow.emit(40)
    println("hotFlow 40 is emitted")
    hotFlow.emit(50)
    println("hotFlow 50 is emitted")
}
binding.btn2.setOnClickListener {
    lifecycleScope.launch {
        hotFlow.collect {
            println("hotFlow collects $it")
        }
    }
}

MutableStateFlow 就是热流中的一种,当没有点击按钮时,便会输出下图中的前三行信息。

image.png

当点击两下按钮后,就会依次输出如图第 4,5 行的信息,至于为什么只会接收到 50,这跟 MutableStateFlow 的特性有关,后面再说。

通过这两个例子就可清楚地知道冷热流之间的区别。热流有两种对象,分别是 StateFlow 和 SharedFlow。


1. SharedFlow


先来看看 SharedFlow,它是一个 subscriber 订阅者的角色,当一个 SharedFlow 调用了 collect 方法后,它就不会正常地结束完成;但可以 cancel 掉 collect 所在的协程,这样就可以取消掉订阅了。SharedFlow 在每次 emit 时都会去 check 一下所在协程是否已经取消。绝大多数的终端操作符,例如 Flow.toList() 都不会使得 SharedFlow 结束完成,但 Flow.take() 之类的截断操作符是例外,它们是可以强制完成一个 SharedFlow 的。

SharedFlow 的简单使用样例:

//code 3
class EventBus {
    private val _events = MutableSharedFlow<Event>() // private mutable shared flow
    val events = _events.asSharedFlow() // publicly exposed as read-only shared flow
    suspend fun produceEvent(event: Event) {
        _events.emit(event) // suspends until all subscribers receive it
    }
}

与 LiveData 相似的使用方式。但 SharedFlow 的功能更为强大,它有 replay cache 和 buffer 机制。


1.1 Replay cache


可以理解为是一个粘性事件的缓存。每个新的订阅者会首先收到 replay cache 中之前发出并接收到的事件,再才会收到新的发射出的值。可以在 MutableSharedFlow 的构造函数中设置 cache 的大小,不能为负数,默认为 0.

//code 4
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)

replay 重播之前最新的 n 个事件,见字知义。下面是例子:

//code 5
private fun testSharedFlow() {
    val sharedFlow = MutableSharedFlow<Int>(replay = 2)
    lifecycleScope.launch {
        launch {
            sharedFlow.collect {
                println("++++ sharedFlow1 collected $it")
            }
        }
        launch {
            (1..3).forEach{
                sharedFlow.emit(it)
            }
        }
        delay(200)
        launch {
            sharedFlow.collect {
                println("++++ sharedFlow2 collected $it")
            }
        }
    }
}

结果为:

com.example.myapplication I/System.out: ++++ sharedFlow1 collected 1
com.example.myapplication I/System.out: ++++ sharedFlow1 collected 2
com.example.myapplication I/System.out: ++++ sharedFlow1 collected 3
com.example.myapplication I/System.out: ++++ sharedFlow2 collected 2
com.example.myapplication I/System.out: ++++ sharedFlow2 collected 3

emit 发射数据前后分别设置了一个订阅者,后面还延时了 200ms 才进行订阅。第一个订阅者 1、2、3都收到了;而第二个订阅者却只收到了 2 和 3. 这是因为在第二个订阅者开始订阅时,数据已经都发射完了,而 SharedFlow 的重播 replay 为 2,就可将最近发射的两个数据再依次发送一遍,这就可以收到 2 和 3 了。


1.2 extraBufferCapacity


SharedFlow 构造函数的第二个参数 extraBufferCapacity 的作用是,在 replay cache 之外还能额外设置的缓存。常用于当生产者生产数据的速度 > 消费者消费数据的速度时的情况,可以有效提升吞吐量。

所以,若 replay = m,extraBufferCapacity = n,那么这个 SharedFlow 总共的 BufferSize = m + n.  replay 会存储最近发射的数据,如果满了就会往 extraBuffer 中存。接下来看一个例子:

//code 6
private fun coroutineStudy() {
    val sharedFlow = MutableSharedFlow<Int>(replay = 1, extraBufferCapacity = 1)
    lifecycleScope.launch {
        launch {
            sharedFlow.collect {
                println("++++ sharedFlow1 collected $it")
                delay(6000)
            }
        }
        launch {
            (1..4).forEach{
                sharedFlow.emit(it)
                println("+++emit $it")
                delay(1000)
            }
        }
        delay(4000)
        launch {
            sharedFlow.collect {
                println("++++ sharedFlow2 collected $it")
                delay(20000)
            }
        }
    }
}

运行结果为:

17:32:09.283 28184-28184 System.out com.wen.testdemo I  +++emit 1
17:32:09.284 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow1 collected 1
17:32:10.285 28184-28184 System.out com.wen.testdemo I  +++emit 2
17:32:11.289 28184-28184 System.out com.wen.testdemo I  +++emit 3
17:32:13.286 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow2 collected 3
17:32:15.292 28184-28184 System.out com.wen.testdemo I  +++emit 4
17:32:15.293 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow1 collected 2
17:32:21.301 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow1 collected 3
17:32:27.311 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow1 collected 4
17:32:33.292 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow2 collected 4

打印结果可能会有点懵,对照着时序图更容易理解(此图来自于参考文献3,感谢 fundroid 大佬的输出~):

image.png

1)Emitter 发送 1,因为 Subscriber1 在 Emitter 发送数据前就已开始订阅,所以 Subscriber1 可马上接收;此时 replay 存储 1;

2)Emitter 发送 2,Subscriber1 还在处理中处于挂起态,此时 replay 存储 2;

3)Emitter 发送 3,此时还没有任何消费者能消费,则 replay 存储 3,将 2 放入 extra 中;

4)Emitter 想要发送 4,但发现 SharedFlow 的 Buffer 已满,则按照默认的策略进行挂起等待(默认策略就是 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND);

5)Subscriber2 开始订阅,接收到 replay 中的 3,此时 Subscriber1 还是挂起态,Buffer 中数据没变化,即 replay 存储 3,extra 存储 2;

6)Subscriber1 处理完 1 后,依次处理 Buffer 中 的下一个数据,即消费 extra 中的 2,这时 Buffer 终于有空间了,Emitter 结束挂起,发送 4,replay 存储 4,将 3 放入 extra 中;

7)Subscriber1 消费完 2 后接着再消费 extra 中的 3,此时 Buffer 中就只有 4 了。后面的就不用多说了

比较绕,需要多看几次思考一下。需要注意的是,代码运行结果中下面两行输出到底谁先谁后的问题:

17:32:15.292 28184-28184 System.out com.wen.testdemo I  +++emit 4
17:32:15.293 28184-28184 System.out com.wen.testdemo I  ++++ sharedFlow1 collected 2


目录
相关文章
|
2月前
|
Java Kotlin
Kotlin学习教程(七)
《Kotlin学习教程(七)》主要介绍了Lambda表达式,这是一种匿名函数,广泛用于简化代码。文章通过与Java 8 Lambda表达式的对比,展示了Kotlin中Lambda的基本语法、参数声明、函数体定义及如何作为参数传递。示例包括按钮事件处理和字符串比较,突出了Lambda表达式的简洁性和实用性。
45 4
|
3月前
|
Java Kotlin 索引
Kotlin学习教程(三)
Kotlin学习教程(三)
20 4
|
3月前
|
Java Kotlin
Kotlin学习教程(二)
Kotlin学习教程(二)
44 4
|
3月前
|
安全 Java 编译器
Kotlin学习教程(一)
Kotlin学习教程(一)
48 4
|
3月前
|
存储 Java API
Kotlin学习教程(六)
《Kotlin学习教程(六)》介绍了Kotlin中的注解、反射、扩展函数及属性等内容。注解用于添加元数据,反射支持运行时自省,扩展则允许为现有类添加新功能,无需修改原类。本文还详细解释了静态扩展的使用方法,展示了如何通过companion object定义静态部分,并对其进行扩展。
25 2
|
3月前
|
存储 设计模式 JSON
Kotlin学习教程(五)
《Kotlin学习教程(五)》介绍了Kotlin中的泛型、嵌套类、内部类、匿名内部类、枚举、密封类、异常处理、对象、单例、对象表达式、伴生对象、委托等高级特性。具体内容包括泛型的定义和类型擦除、嵌套类和内部类的区别、匿名内部类的创建、枚举类的使用、密封类的声明和用途、异常处理机制、对象和单例的实现、对象表达式的应用、伴生对象的作用以及类委托和属性委托的使用方法。通过这些内容,读者可以深入理解Kotlin的高级特性和设计模式。
26 1
|
3月前
|
Java 开发者 Kotlin
Kotlin学习笔记- 类与构造器
本篇笔记详细介绍了Kotlin中的类与构造器,包括类的基本概念、主构造器与次构造器的区别、构造器中参数的使用规则、类的继承以及构造器在继承中的应用等。通过具体示例,解释了如何在类中定义属性、实现构造逻辑,并探讨了Kotlin类的继承机制和Any类的作用。此外,还简要介绍了包的概念及其在组织代码中的作用。适合初学者深入理解Kotlin面向对象编程的核心概念。
40 3
|
3月前
|
Java 编译器 Kotlin
Kotlin学习笔记 - 数据类型
《Kotlin学习笔记 - 数据类型》是Kotlin编程语言学习系列的一部分,专注于Kotlin中的数据类型,包括布尔型、数字型(整型和浮点型)、字符型及字符串型,详述了各类型的定义、使用方法及相互间的转换规则。适合初学者快速掌握Kotlin基础语法。
33 3
|
3月前
|
安全 IDE Java
Kotlin 学习笔记- 空类型和智能类型转换
Kotlin 学习笔记聚焦于空类型和智能类型转换,深入解析非空与可空类型、安全调用操作符、Elvis 运算符、非空断言运算符及智能类型转换等内容,助你高效掌握 Kotlin 语言特性,避免 NullPointException 异常,提升代码质量。
33 2
|
3月前
|
Java 开发者 Kotlin
Kotlin学习笔记- 类与构造器
Kotlin学习笔记- 类与构造器
35 3