【Kotlin 协程】Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )
2023-09-14 09:07:26 时间
一、调用 Flow#launchIn 函数指定流收集协程
1、指定流收集协程
响应式编程 , 是 基于事件驱动 的 , 在 Flow 流中会产生源源不断的事件 , 就是 发射元素操作 ;
拿到 Flow 流后 , 开始 收集元素 , 按照顺序逐个处理产生的事件 ( 元素 ) ;
调用 Flow#launchIn 函数 , 传入 协程作用域 作为参数 , 可以 指定 收集 Flow 流元素 的 协程 ;
在上一篇博客 【Kotlin 协程】Flow 异步流 ⑤ 中 , 调用 Flow#flowOn 函数 , 可以 指定 Flow 流发射元素 的 协程 ;
Flow#launchIn 函数返回值是 Job 对象 , 是 协程任务对象 , 可调用 Job#cancel 函数取消该协程任务 ;
2、Flow#launchIn 函数原型
Flow#launchIn 函数原型 :
/**
* 终端流操作符,在[作用域]中[启动][启动]给定流的[收集][收集]。
* 它是“范围”(scope)的简称。启动{flow.collect()} '。
*
* 此操作符通常与[onEach], [onCompletion]和[catch]操作符一起使用,以处理所有发出的值
* 处理上游流或处理过程中可能发生的异常,例如:
*
* ```
* flow
* .onEach { value -> updateUi(value) }
* .onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
* .catch { cause -> LOG.error("Exception: $cause") }
* .launchIn(uiScope)
* ```
*
* 注意,[launchIn]的结果值没有被使用,提供的作用域负责取消。
*/
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
collect() // tail-call
}
3、代码示例
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 携程中调用挂起函数返回一个 Flow 异步流
runBlocking {
println("流收集时的协程上下文 : ${Thread.currentThread().name}")
flowEvent()
.onEach {
// 逐个处理产生的事件
println("接收到事件 : $it, 当前线程 : ${Thread.currentThread().name}")
}
.launchIn(CoroutineScope(Dispatchers.IO)) // 在指定的协程作用域中处理收集元素操作,
// 该 launchIn 函数返回一个 Job 对象
.join() // 该协程不是 runBlocking 主协程 的子协程, 需要调用 join 等待协程执行完毕
}
}
/**
* 使用 flow 构建器 Flow 异步流
* 产生事件的 事件源
*/
suspend fun flowEvent() = (0..3)
.asFlow() // 将区间转为 Flow 流
.onEach {
delay(500)
println("发射事件 : $it, 当前线程 : ${Thread.currentThread().name}")
} // 发射元素 ( 产生事件 ) 时挂起 500ms
.flowOn(Dispatchers.Default) // 设置发射元素的协程
}
执行结果 :
2022-12-23 16:06:58.720 2950-2950/kim.hsl.coroutine I/System.out: 流收集时的协程上下文 : main
2022-12-23 16:06:59.345 2950-3080/kim.hsl.coroutine I/System.out: 发射事件 : 0, 当前线程 : DefaultDispatcher-worker-3
2022-12-23 16:06:59.347 2950-3078/kim.hsl.coroutine I/System.out: 接收到事件 : 0, 当前线程 : DefaultDispatcher-worker-1
2022-12-23 16:06:59.885 2950-3078/kim.hsl.coroutine I/System.out: 发射事件 : 1, 当前线程 : DefaultDispatcher-worker-1
2022-12-23 16:06:59.887 2950-3079/kim.hsl.coroutine I/System.out: 接收到事件 : 1, 当前线程 : DefaultDispatcher-worker-2
2022-12-23 16:07:00.394 2950-3080/kim.hsl.coroutine I/System.out: 发射事件 : 2, 当前线程 : DefaultDispatcher-worker-3
2022-12-23 16:07:00.396 2950-3080/kim.hsl.coroutine I/System.out: 接收到事件 : 2, 当前线程 : DefaultDispatcher-worker-3
2022-12-23 16:07:00.938 2950-3078/kim.hsl.coroutine I/System.out: 发射事件 : 3, 当前线程 : DefaultDispatcher-worker-1
2022-12-23 16:07:00.940 2950-3079/kim.hsl.coroutine I/System.out: 接收到事件 : 3, 当前线程 : DefaultDispatcher-worker-2
二、通过取消流收集所在的协程取消流
Flow 流的 收集元素 操作 , 是在协程中执行 , 将 协程 取消 , 即可将 Flow 流收集操作 取消 , 也就是 将 Flow 流取消 ;
代码示例 : 使用 withTimeoutOrNull(2000)
创建一个协程 , 该协程在 2000ms 后自动超时取消 , 同时在其中进行 流收集 的操作也一并取消 ;
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 携程中调用挂起函数返回一个 Flow 异步流
runBlocking {
// 该协程作用域 2 秒后超时取消
withTimeoutOrNull(2000){
flowEvent().collect {
println("收集到元素 : $it")
}
}
println("协程作用域取消")
}
}
/**
* 使用 flow 构建器 Flow 异步流
* 产生事件的 事件源
*/
suspend fun flowEvent() = (0..10)
.asFlow() // 将区间转为 Flow 流
.onEach {
delay(500)
println("发射事件 : $it, 当前线程 : ${Thread.currentThread().name}")
} // 发射元素 ( 产生事件 ) 时挂起 500ms
.flowOn(Dispatchers.Default) // 设置发射元素的协程
}
执行结果 :
2022-12-23 16:37:02.915 9585-9647/kim.hsl.coroutine I/System.out: 发射事件 : 0, 当前线程 : DefaultDispatcher-worker-1
2022-12-23 16:37:02.917 9585-9585/kim.hsl.coroutine I/System.out: 收集到元素 : 0
2022-12-23 16:37:03.429 9585-9647/kim.hsl.coroutine I/System.out: 发射事件 : 1, 当前线程 : DefaultDispatcher-worker-1
2022-12-23 16:37:03.431 9585-9585/kim.hsl.coroutine I/System.out: 收集到元素 : 1
2022-12-23 16:37:03.932 9585-9647/kim.hsl.coroutine I/System.out: 发射事件 : 2, 当前线程 : DefaultDispatcher-worker-1
2022-12-23 16:37:03.933 9585-9585/kim.hsl.coroutine I/System.out: 收集到元素 : 2
2022-12-23 16:37:04.327 9585-9585/kim.hsl.coroutine I/System.out: 协程作用域取消
相关文章
- kotlin 使用viewStub
- JVM 上数据处理语言的竞争:Kotlin, Scala 和 SPL
- 那些年,Kotlin 都截胡了哪些 Java 新特性
- Kotlin 学习笔记(五)—— Flow 数据流学习实践指北(一)
- 开心档-软件开发入门之Kotlin 基本数据类型
- 【Kotlin】Kotlin Sealed 密封类 ( 密封类声明 | 密封类子类定义 | 密封类特点 | 代码示例 )
- 【错误记录】Kotlin 1.5.0 编译报错 ( 1.5.0 中 Float 不能直接转 Byte 类型 )
- 【Kotlin 协程】协程简介 ( 协程概念 | 协程作用 | 创建 Android 工程并进行协程相关配置开发 | 异步任务与协程对比 )
- 【Kotlin 协程】协程取消 ① ( 协程作用域取消 | 协程作用域子协程取消 | 通过抛出异常取消协程 | Job#cancel 函数 | 自定义异常取消协程 )
- 【Kotlin 协程】Flow 异步流 ⑤ ( 流的上下文 | 上下文保存 | 查看流发射和收集的协程 | 不能在不同协程中执行流的发射和收集操作 | 修改流发射的协程上下文 | flowOn函数 )
- 【Kotlin】Kotlin 函数总结 ( 具名函数 | 匿名函数 | Lambda 表达式 | 闭包 | 内联函数 | 函数引用 )
- 【Kotlin】集合操作总结 ( List 集合 | MutableList 集合 | List 集合遍历 | Set 集合 | MutableSet 集合 | Map 集合 | 可变 Map集合 )
- 【Kotlin】DSL 领域特定语言 ( apply 标准库函数分析 | 普通匿名函数 | 扩展匿名函数 | 泛型扩展匿名函数 )
- 【Kotlin】:: 双冒号操作符详解 ( 获取类的引用 | 获取对象类型的引用 | 获取函数的引用 | 获取属性的引用 | Java 中的 Class 与 Kotlin 中的 KClass )
- 【错误记录】Android Studio 编译报错 ( To use data binding annotations in Kotlin, apply the ‘kotlin-kapt‘ plu )
- #新闻拍一拍# Oracle 调研如何避免让 Java 开发者投奔 Rust 和 Kotlin