3.3 zip 中间操作符
zip
顾名思义,就是可以将两个 Flow 汇合成一个 Flow,举个栗子就知道了:
//code 11 lateinit var testFlow1: Flow<String> lateinit var testFlow2: Flow<String> private fun setupTwoFlow() { testFlow1 = flowOf("Red", "Blue", "Green") testFlow2 = flowOf("fish", "sky", "tree", "ball") CoroutineScope(Dispatchers.IO).launch { testFlow1.zip(testFlow2) { firstWord, secondWord -> "$firstWord $secondWord" }.collect { println("+++ $it +++") } } } // 输出结果: //com.example.myapplication I/System.out: +++ Red fish +++ //com.example.myapplication I/System.out: +++ Blue sky +++ //com.example.myapplication I/System.out: +++ Green tree +++ //zip 方法声明: public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)
从 zip
方法的声明中可知,zip
方法的第二个参数就是针对两个 Flow 进行各种处理的挂起函数,也可如例子中写成尾调函数的样子,返回值是处理之后的 Flow。而且当两个 Flow 长度不一样时,最后的结果会默认剔除掉先前较长的 Flow 中的元素。所以 testFlow2
中的 “ball” 就被自动剔除掉了。
4. Flow 异常处理
正如 RxJava 框架中的 subscribe
方法可以通过传入 Observer
对象在其 onNext
、onComplete
、onError
返回之前处理的结果,Flow 也有诸如 catch
、onCompletion
等操作符去处理执行的结果。例如下面的代码:
//code 12 private fun handleExceptionDemo() { val testFlow = (1..5).asFlow() CoroutineScope(Dispatchers.Default).launch { testFlow.map { check(it != 3) { //it == 3 时,会走到这里 println("+++ catch value = $it") } println("+++ not catch value = $it") it * it }.onCompletion { println("+++ onCompletion value = $it") }.catch { exception -> println("+++ catch exception = $exception") }.collect{ println("+++ collect value = $it") } } } //输出结果: //com.example.myapplication I/System.out: +++ not catch value = 1 //com.example.myapplication I/System.out: +++ collect value = 1 //com.example.myapplication I/System.out: +++ not catch value = 2 //com.example.myapplication I/System.out: +++ collect value = 4 //com.example.myapplication I/System.out: +++ catch value = 3 //com.example.myapplication I/System.out: +++ onCompletion value = java.lang.IllegalStateException: kotlin.Unit //com.example.myapplication I/System.out: +++ catch exception = java.lang.IllegalStateException: kotlin.Unit
顺着代码咱先来看看一些常用的 Flow 中间操作符。
1)map
:用来将 Flow 中的数据一个个拿出来做各自的处理,然后交给下一个操作符;本例中就是将 Flow 中的数据进行平方处理;
2)check()
:类似于一个检查站,满足括号内条件的数据可以通过,不满足则交给它的尾调函数处理,并且抛出异常;
3)onCompletion
:Flow 最后的兜底器。无论 Flow 最后是执行完成、被取消、抛出异常,都会走到 onCompletion
操作符中,类似于在 Flow 的 collect
函数外加了个 try
,finally
。官方给了个小栗子,还是很清楚的:
//code 13 try { myFlow.collect { value -> println(value) } } finally { println("Done") } //上述代码可以替换为下面的代码: myFlow .onEach { println(it) } .onCompletion { println("Done") } .collect()
所以,在 code 12 中的 onCompletion
操作符可以接住从 check
那儿抛出的异常;
4)catch
:不用多说,专门用于捕捉异常的,避免程序崩溃。这里如果把 catch
去掉,程序就会崩溃。如果把 catch
和 onCompletion
操作符位置调换,则 onCompletion
里面就接收不到异常信息了,如图所示。
5. Flow 数据请求实例
说了这么多,举个在实际中经常用到的数据请求的例子吧。先来看一个最简单的例子:
5.1 单接口请求
现在一般都是在 ViewModel 里持有 LiveData 数据,并且进行数据的请求,所以先来看下 ViewModel 中的代码实现:
//code 14 class SingleNetworkCallViewModel: ViewModel() { private val users = MutableLiveData<Resource<List<ApiUser>>>() private val apiHelperImpl = ApiHelperImpl(RetrofitBuilder.apiService) fun fetchUsers() { viewModelScope.launch { users.postValue(Resource.loading(null)) apiHelperImpl.getUsers() .catch { e -> users.postValue(Resource.error(e.toString(), null)) } .collect { users.postValue(Resource.success(it)) } } } fun getUsersData(): LiveData<Resource<List<ApiUser>>> { return users } }
从代码可看出,fetchUsers
方法就是数据请求方法,里面的核心方法是 ApiHelperImpl
类对象的 getUsers
方法,在之前初始化 apiHelperImpl
对象时传入了一个 RetrofitBuilder.apiService
值,所以底层还是用到了 Retrofit 框架进行的网络请求。Retrofit 相关的代码如下:
//code 15 object RetrofitBuilder { private const val BASE_URL = "https://xxxxxxx/" private fun getRetrofit(): Retrofit { return Retrofit.Builder() .baseUrl(BASE_URL) .addConverterFactory(GsonConverterFactory.create()) .build() } val apiService: ApiService = getRetrofit().create(ApiService::class.java) } //ApiService 中的代码也是一般常见的代码: interface ApiService { @GET("users") suspend fun getUsers(): List<ApiUser> }
再回过来看看 ViewModel 的代码,从 apiHelperImpl.getUsers
方法后面的 catch
和 collect
操作符也可看出,getUsers
方法返回的就是一个 Flow 对象,其使用的构造方法就是前文中说到的 flow{}
方法:
//code 16 class ApiHelperImpl(private val apiService: ApiService) : ApiHelper { override fun getUsers(): Flow<List<ApiUser>> { return flow { emit(apiService.getUsers()) } } }
ApiHelper
其实就是一个接口,规定了 ApiHelperImpl
中数据请求的方法名及返回值,返回值是一个 Flow,里面是我们最终需要的数据列表:
//code 17 interface ApiHelper { fun getUsers(): Flow<List<ApiUser>> }
Flow 调用 emit
发出去的就是 Retrofit 进行数据请求后返回的 List<ApiUser>
数据。
如何在 Activity 中使用就是之前使用 LiveData 的常规操作了:
//code 18 private fun setupObserver() { viewModel.getUsersData().observe(this, Observer { when (it.status) { Status.SUCCESS -> { progressBar.visibility = View.GONE it.data?.let { users -> renderList(users) } recyclerView.visibility = View.VISIBLE } Status.LOADING -> { progressBar.visibility = View.VISIBLE recyclerView.visibility = View.GONE } Status.ERROR -> { //Handle Error progressBar.visibility = View.GONE Toast.makeText(this, it.message, Toast.LENGTH_SHORT).show() } } }) }
5.2 双接口并行请求
上述例子是最简单的单个数据接口请求的场景,如果是两个或是多个数据接口需要并行请求,该如何处理呢?这就需要用到之前说的 Flow 中的 zip
操作符了。接着上面的例子,再添加一个数据请求方法 getMoreUsers
,那么两个接口并行的例子为:
//code 18 fun fetchUsers() { viewModelScope.launch { users.postValue(Resource.loading(null)) apiHelper.getUsers() .zip(apiHelper.getMoreUsers()) { usersFromApi, moreUsersFromApi -> val allUsersFromApi = mutableListOf<ApiUser>() allUsersFromApi.addAll(usersFromApi) allUsersFromApi.addAll(moreUsersFromApi) return@zip allUsersFromApi } .flowOn(Dispatchers.Default) .catch { e -> users.postValue(Resource.error(e.toString(), null)) } .collect { users.postValue(Resource.success(it)) } } }
两个数据接口请求的快慢肯定不一样,但不用担心,zip
操作符会等待两个接口的数据都返回之后才进行拼接并交给后面的操作符处理,所以这里还需要调用 flowOn
操作符将线程切换到后台线程中去挂起等待。但后面的 collect
操作符执行的代码是在主线程中,感兴趣的同学可以打印线程信息看看,这就需要了解一下 flowOn
操作符的用法了。
flowOn
方法可以切换 Flow 处理数据的所在线程,类似于 RxJava 中的 subscribeOn
方法。例如 flowOn(Dispatchers.Default)
就是将 Flow 的操作都放到后台线程中执行。
当 flowOn
操作符之前没有设置任何的协程上下文,那么 flowOn
操作符可以为它之前的操作符设置执行所在的线程,并不会影响它之后下游的执行所在线程。下面是一个简单例子:
//code 19 private fun flowOnDemo() { val testFlow = (1..2).asFlow() MainScope().launch { testFlow .filter { println("1+++ $it ${Thread.currentThread().name}") it != 3 }.flowOn(Dispatchers.IO) .map { println("2+++ $it ${Thread.currentThread().name}") it*it }.flowOn(Dispatchers.Main) .filter { println("3+++ $it ${Thread.currentThread().name}") it!=25 }.flowOn(Dispatchers.IO) .collect{ println("4+++ $it ${Thread.currentThread().name}") } } } //输出结果: //com.example.myapplication I/System.out: 1+++ 1 DefaultDispatcher-worker-1 //com.example.myapplication I/System.out: 1+++ 2 DefaultDispatcher-worker-1 //com.example.myapplication I/System.out: 2+++ 1 main //com.example.myapplication I/System.out: 2+++ 2 main //com.example.myapplication I/System.out: 3+++ 1 DefaultDispatcher-worker-1 //com.example.myapplication I/System.out: 3+++ 4 DefaultDispatcher-worker-1 //com.example.myapplication I/System.out: 4+++ 1 main //com.example.myapplication I/System.out: 4+++ 4 main
发现了么?flowOn
操作符只对最近的上游操作符线程负责,它下游的线程会自动切换到之前所在的线程。如果连续有两个或多个 flowOn
操作符切换线程,则会切换到首个 flowOn
操作符切换的线程中去:
//code 20 testFlow .filter { println("1+++ $it ${Thread.currentThread().name}") it != 3 //最终会在 Main 主线程中执行 }.flowOn(Dispatchers.Main).flowOn(Dispatchers.IO).flowOn(Dispatchers.Default) .collect{ println("4+++ $it ${Thread.currentThread().name}") }
在 filter
后面连续有两个 flowOn
操作符,但最终会在 Main 线程中执行 filter
操作符中的逻辑。
整体上看,Flow 在数据请求时所扮演的角色是数据接收与处理后发送给 UI 层的作用,这跟 RxJava 的职责是相同的,而且两者都有丰富的操作符来处理各种不同的情况。不同的是 Flow 是将接收到的数据放到 Flow 载体中,而 RxJava 一般将数据放到 Observable
对象中;Flow 处理数据更加方便和自然,去除了 RxJava 中繁多且功能臃肿的操作符。
总结
最后总结一下 Flow 第一小节的内容吧:
1)Flow 数据流可异步按顺序返回多个数据;
2)Flow 整体是由 构建器、中间操作符、末端操作符 组成;
3)冷流只有在调用末端操作符时,流的构造器和中间操作符才会开始执行;冷流的使用方和提供方是一对一的;
4)简单介绍了 collect
、reduce
末端操作符以及 zip
、map
等中间操作符的使用;
5)Flow 异常处理所用到的 catch
、check
、onCompletion
等操作符的用法;
6)Flow 在数据请求上的实例
所用实例来源:github.com/MindorksOpe…
更多内容,欢迎关注公众号:修之竹
赞人玫瑰,手留余香!欢迎点赞、转发~ 转发请注明出处~
参考文献
- Android 上的 Kotlin 数据流;官方文档 https://developer.android.com/kotlin/flow
- Flow Kotlin 官方文档; https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/
- 【Kotlin Flow】 一眼看全——Flow操作符大全; 搬砖小子出现了 https://juejin.cn/post/6989536876096913439
- What is Flow in Kotlin and how to use it in Android Project?; Himanshu Singh; https://blog.mindorks.com/what-is-flow-in-kotlin-and-how-to-use-it-in-android-project
- Understanding Terminal Operators in Kotlin Flow; Amit Shekhar; https://blog.mindorks.com/terminal-operators-in-kotlin-flow
- Creating Flow Using Flow Builder in Kotlin; Amit Shekhar; https://blog.mindorks.com/creating-flow-using-flow-builder-in-kotlin
- Exception Handling in Kotlin Flow; Amit Shekhar; https://blog.mindorks.com/exception-handling-in-kotlin-flow