Kotlin 学习笔记(七)—— Flow 数据流学习实践指北(三)冷流转热流以及代码实例(上)

简介: Kotlin 学习笔记(七)—— Flow 数据流学习实践指北(三)冷流转热流以及代码实例(上)

前一节(Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow)介绍完了两种热流的构造方法以及它们的特点,那有没有方法可以将冷流转化为热流呢?当然是有的。那为什么需要将冷流转化为热流呢?

假如有这么一个场景:一开始有一个冷流 coldFlow 和它对应的消费者,后来下游又有几个新来的消费者需要使用这个 coldFlow,并且还需要之前已发送过的数据。而冷流的生产者与消费者是一对一的关系,且没有 replay 缓存机制,为新的消费者再创建一个冷流开销较大,这种情况下将冷流转为热流就显得事半功倍了。


1. shareIn 操作符


Flow 中的 shareIn 操作符就可以将冷流转为热流,它的方法声明是:

// code 1
public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

首先看返回值,最终确实会转化为一个热流 SharedFlow 实例。方法参数先来看最简单的 replay 参数,就是设置回播到每个新增消费者的数据个数,默认为 0。所以默认情况下,新增的消费者只能收到从它开始收集的时间点之后,生产者发送的数据。

再来看第一个 scope 参数,用于设置一个 CoroutineScope 作用域,注意其生命周期的长度需要比任何消费者都要长,保证被转化成的热流能在所有消费者收集数据进行消费时,都能处于活跃状态。新被转化的热流其实就是一个共享数据流,可以被所有的消费者共享使用。

第二个参数 started 复杂一些,它是用于设置被转化为共享数据流的启动方式,官方提供有 3 种方式,下面一个个说:

SharingStarted.Eagerly

勤快式启动方式。不等第一个消费者出现就会立即启动,需要注意的是,这种方式只会保留启动时数据流发送的前 replay 个数据,再之前的数据会立即丢弃。即不对数据流缓存区以外的数据负责,所以 replay 缓存区大小设置很重要。

SharingStarted.Lazily

懒汉式启动方式。需要等第一个消费者出现才会启动,第一个消费者可以接收到数据流所有发送的数据;但其他后面的消费者只能接收到最近的 replay 个数据。这种方式启动的数据流会一直保持活跃状态,甚至所有的的消费者都退出观察不再接收了,数据流仍然会缓存最近的 replay 个数据。

SharingStarted.WhileSubscribed()

灵活式启动方式。默认情况下就是有消费者来它就立即启动,没消费者接收了它就立即停止。所以在第一个消费者出现数据流就启动,当最后一个消费者退出它就立即停止,但它仍会永久缓存最近的 replay 个数据。此外,这种启动方式还可以根据需求自定义设置参数:

// code 2
public fun WhileSubscribed(
    stopTimeoutMillis: Long = 0,
    replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted =
    StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)

stopTimeoutMillis:设置最后一个消费者退出后,多长时间后再关闭数据流。默认是 0,即立即关闭。replayExpirationMillis:设置关闭流之后等待多长时间后,再重置清空缓存区 replay cache 的数据。默认是 Long.MAX_VALUE,即永远保存。

自定义 SharingStarted

其实还可以自定义启动方式,自己实现 SharingStarted 接口即可。如果看了前三种启动方式的源码,不难会发现,其实启动方式都是使用固定的几个 SharingCommand 实现的。SharingCommand 有三种:

// code 3
public enum class SharingCommand {
    /**
     * 开始启动,并开始收集上游数据流.
     * 多次发送这个命令并没有什么用(支持防抖),如果先发送 STOP 再发送 START 则是重启一个上游数据流。
     */
    START,
    /**
     * 停止数据流, 取消上游数据流的收集所在协程。
     */
    STOP,
    /**
     * 停止数据流, 取消上游数据流的收集所在协程。并且将 replayCache 缓冲区的值重置为初始状态。
     * 如果是 shareIn 操作符,则会调用 [MutableSharedFlow.resetReplayCache] 方法;
     * 如果是 stateIn 操作符,则会将缓冲数据重置为最初设置的初始值.
     */
    STOP_AND_RESET_REPLAY_CACHE
}

感兴趣的同学可以看看 SharingStarted.WhileSubscribed() 的具体实现类 StartedWhileSubscribed 里面的源码。如果需要自定义启动方式,照着葫芦画瓢即可。

既然有 shareIn,那自然就少不了 stateIn 了。


2. stateIn 操作符


方法声明:

// code 4
public fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T>

首先可以看出返回值是一个热流 StateFlow 实例,那么自然而然就需要一个参数给它设置一个初始值,即第三个参数 initialValue。 前两个参数与 shareIn 一样,这里就不再赘述。


3. shareIn 与 stateIn 使用指北


3.1 SharingStarted.WhileSubscribed() 实际使用


从上面的介绍可知,这种启动方式可以在没有消费者时自动取消上游数据流,从而避免资源的浪费。但在实际使用中,建议使用 SharingStarted.WhileSubscribed(5000),即在最后一个消费者停止后再保持数据流 5 秒钟的活跃状态。避免在某些特定情况下(如配置改变——最常见就是横竖屏切换、暗夜模式切换)重启上游的数据流。


3.2 shareIn、stateIn 适用于属性声明而非方法返回值


shareInstateIn 都会创建一个新的数据流,具体说就是 shareIn 会构建一个 ReadonlySharedFlow 实例;stateIn 则会构建一个 ReadonlyStateFlow 实例。而新创建的数据流会一直保存在内存中,直到传入数据流的作用域被取消或者没有任何引用时才会被 GC 回收。

所以下面代码中,前一部分代码是禁止使用的,正确的使用应该是如后一部分的代码,即在属性中使用。

// code 5
//错误示例:每次调用方法都会构建新的数据流
fun getUser(): Flow<User> =
    userLocalDataSource.getUser()
            .shareIn(externalScope, WhileSubscribed())    
//正确示例:在属性中使用 shareIn 或 stateIn 
 val user: Flow<User> = 
     userLocalDataSource.getUser().shareIn(externalScope, WhileSubscribed())


3.3 MutableSharedFlow 的 subscriptionCount 参数


这个参数表示的是 MutableSharedFlow 中活跃的消费者数目,即订阅者的个数。可用于监听消费者的数目变更,下面就是一个例子:

// code 6
sharedFlow.subscriptionCount
    .map { count -> count > 0 } // count > 0 说明有消费者,返回 true;= 0 说明没有消费者了,返回 false
    .distinctUntilChanged() // only react to true<->false changes
    .onEach { isActive -> // configure an action
        if (isActive) { // do something } else { // do something }
    }
    .launchIn(scope) // launch it

这个例子可以在有消费者收集数据流时,做一些自己的操作;当所有消费者都停止收集时,再处理另外的一些操作,比如资源回收等。

distinctUntilChanged 操作符比较面生,它就是过滤掉前面接收到的重复值,从而使得后面只会接收到发生了变化的新值,和 StateFlow 特性一样。

onEach 操作符也比较常见,可以在流上新增一些处理操作,再发给下游。


3.4 与操作符的搭配使用


如果在实际使用中,需要得知上游数据流的一些状态,比如开始、完成等,则需要在上游数据流转为热流之前添加一些操作符起到监听的作用。

onStart 操作符监听启动,onCompletion 操作符监听完成

// code 7
private fun shareInOnStartDemo() {
    val testFlow = flow {
        println("++++emit before")
        emit(4)
        delay(1000)
        emit(5)
        delay(1000)
        emit(6)
    }.onStart {
        emit(-1)
        println("++++ onStart")
    }.onCompletion {
        emit(-100)
        println("++++ onCompletion")
    }.shareIn(
        lifecycleScope,
        SharingStarted.WhileSubscribed(),
        8
    )
    lifecycleScope.launch {
        testFlow.collect {
            println("++++ collector receive $it")
        }
    }
}

image.png

从打印的 log 可以看到,确实可以监听状态。当然也可以在相同的位置添加 catch 操作符用于监听异常的发生,感兴趣的同学可以试试看。

目录
相关文章
|
8天前
|
数据处理 开发者 Kotlin
利用Kotlin Flow简化数据流管理
随着移动端应用的复杂化,数据流管理成为一大挑战。Kotlin Flow作为一种基于协程的响应式编程框架,可简化数据流处理并支持背压机制,有效避免应用崩溃。本文通过解答四个常见问题,详细介绍Kotlin Flow的基本概念、创建方法及复杂数据流处理技巧,帮助开发者轻松上手,提升应用性能。
36 16
|
2天前
|
存储 API 数据库
Kotlin协程与Flow的魅力——打造高效数据管道的不二法门!
在现代Android开发中,Kotlin协程与Flow框架助力高效管理异步操作和数据流。协程采用轻量级线程管理,使异步代码保持同步风格,适合I/O密集型任务。Flow则用于处理数据流,支持按需生成数据和自动处理背压。结合两者,可构建复杂数据管道,简化操作流程,提高代码可读性和可维护性。本文通过示例代码详细介绍其应用方法。
11 2
|
9天前
|
数据处理 Kotlin
掌握这项Kotlin技能,让你的数据流管理不再头疼!Flow的秘密你解锁了吗?
【9月更文挑战第12天】随着移动应用发展,数据流管理日益复杂。Kotlin Flow作为一种基于协程的异步数据流处理框架应运而生,它可解耦数据的生产和消费过程,简化数据流管理,并支持背压机制以防应用崩溃。本文通过四个问题解析Kotlin Flow的基础概念、创建方式、复杂数据流处理及背压实现方法,助您轻松掌握这一高效工具,在实际开发中更从容地应对各种数据流挑战,提升应用性能。
28 8
|
10天前
|
数据处理 API 数据库
揭秘Kotlin Flow:迈向响应式编程的黄金钥匙
【9月更文挑战第11天】在现代软件开发中,异步编程与数据处理对于构建高性能应用至关重要。Kotlin Flow作为协程库的一部分,提供了简洁高效的API来处理数据流。本文将通过实例引导你从零开始学习Kotlin Flow,掌握构建响应式应用的方法。Flow是一种冷流,仅在订阅时才开始执行,支持map、filter等操作符,简化数据处理。
25 7
|
8天前
|
存储 数据处理 Kotlin
Kotlin Flow背后的神秘力量:背压、缓冲与合并策略的终极揭秘!
【9月更文挑战第13天】Kotlin Flow 是 Kotlin 协程库中处理异步数据流的强大工具,本文通过对比传统方法,深入探讨 Flow 的背压、缓冲及合并策略。背压通过 `buffer` 函数控制生产者和消费者的速率,避免过载;缓冲则允许数据暂存,使消费者按需消费;合并策略如 `merge`、`combine` 和 `zip` 则帮助处理多数据源的整合。通过这些功能,Flow 能更高效地应对复杂数据处理场景。
22 2
|
8天前
|
移动开发 定位技术 Android开发
「揭秘高效App的秘密武器」:Kotlin Flow携手ViewModel,打造极致响应式UI体验,你不可不知的技术革新!
【9月更文挑战第12天】随着移动开发领域对响应式编程的需求增加,管理应用程序状态变得至关重要。Jetpack Compose 和 Kotlin Flow 的组合提供了一种优雅的方式处理 UI 状态变化,简化了状态管理。本文探讨如何利用 Kotlin Flow 增强 ViewModel 功能,构建简洁强大的响应式 UI。
21 3
|
9天前
|
数据库 Kotlin
Kotlin中的冷流和热流以及如何让Flow停下来
本文介绍了Kotlin中`Flow`的概念及其类型,包括冷流(Cold Flow)、热流`SharedFlow`及具有最新值的`StateFlow`。文中详细描述了每种类型的特性与使用场景,并提供了停止`Flow`的方法,如取消协程、使用操作符过滤及异常处理。通过示例代码展示了如何运用这些概念。
17 2
|
2天前
|
API 数据处理 数据库
掌握 Kotlin Flow 的艺术:让无限数据流处理变得优雅且高效 —— 实战教程揭秘如何在数据洪流中保持代码的健壮与灵活
Kotlin Flow 是一个强大的协程 API,专为处理异步数据流设计。它适合处理网络请求数据、监听数据库变化等场景。本文通过示例代码展示如何使用 Kotlin Flow 管理无限流,如实时数据流。首先定义了一个生成无限整数的流 `infiniteNumbers()`,然后结合多种操作符(如 `buffer`、`onEach`、`scan`、`filter`、`takeWhile` 和 `collectLatest`),实现对无限流的优雅处理,例如计算随机数的平均值并在超过阈值时停止接收新数据。这展示了 Flow 在资源管理和逻辑清晰性方面的优势。
10 0
|
8天前
|
Android开发 开发者 Kotlin
告别AsyncTask:一招教你用Kotlin协程重构Android应用,流畅度飙升的秘密武器
【9月更文挑战第13天】随着Android应用复杂度的增加,有效管理异步任务成为关键。Kotlin协程提供了一种优雅的并发操作处理方式,使异步编程更简单直观。本文通过具体示例介绍如何使用Kotlin协程优化Android应用性能,包括网络数据加载和UI更新。首先需在`build.gradle`中添加coroutines依赖。接着,通过定义挂起函数执行网络请求,并在`ViewModel`中使用`viewModelScope`启动协程,结合`Dispatchers.Main`更新UI,避免内存泄漏。使用协程不仅简化代码,还提升了程序健壮性。
21 1
|
1月前
|
调度 Android开发 开发者
【颠覆传统!】Kotlin协程魔法:解锁Android应用极速体验,带你领略多线程优化的无限魅力!
【8月更文挑战第12天】多线程对现代Android应用至关重要,能显著提升性能与体验。本文探讨Kotlin中的高效多线程实践。首先,理解主线程(UI线程)的角色,避免阻塞它。Kotlin协程作为轻量级线程,简化异步编程。示例展示了如何使用`kotlinx.coroutines`库创建协程,执行后台任务而不影响UI。此外,通过协程与Retrofit结合,实现了网络数据的异步加载,并安全地更新UI。协程不仅提高代码可读性,还能确保程序高效运行,不阻塞主线程,是构建高性能Android应用的关键。
37 4