zl程序教程

您现在的位置是:首页 >  后端

当前栏目

【Kotlin 协程】Flow 异步流 ⑦ ( 调用 FlowCollector#emit 发射元素时自动执行 Flow 流的取消检测 | 启用检测 Flow 流的取消cancellable函数 )

Kotlin自动执行异步 函数 调用 检测 元素
2023-06-13 09:18:06 时间

文章目录

一、调用 FlowCollector#emit 发射元素时自动执行 Flow 流的取消检测


在 Flow 流构建器 中 , 每次 调用 FlowCollector#emit 发射元素时 ,

都会执行一个 ensureActive 检测 , 检测当前的流是否取消 ,

因此 , 在 flow 流构建器 中 , 循环执行的 FlowCollector#emit 发射操作 , 是可以取消的 ;

在 Flow#collect 代码块中 , 执行 Job#cancel 函数 , 即可 取消该流收集操作所在的协程 , 进而取消了流 ;

/**
 * 用一个可选的cancel [cause]取消这个作用域,包括它的作业和它的所有子任务。
 * 原因可用于指定错误消息或提供有关的其他详细信息
 * 用于调试目的的取消原因。
 * 如果作用域中没有作业,则抛出[IllegalStateException]。
 */
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
    val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
    job.cancel(cause)
}

代码示例 : 在收集元素时 , 收集几个元素后 , 执行 Flow#cancel 函数 , 取消流收集所在协程 ;

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 携程中调用挂起函数返回一个 Flow 异步流
        runBlocking {
            flowEvent().collect{
                println("收集元素 $it")

                if (it == 2) {
                    // 收集到元素 2 时, 取消流
                    // 在流中 emit 发射 3 时, 就会自动爆出异常, 停止后续操作
                    cancel()
                }
            }
        }
    }

    /**
     * 使用 flow 构建器 Flow 异步流
     */
    suspend fun flowEvent() = flow<Int> {

        for(i in 0..5) {
            delay(1000)
            emit(i)
            println("发射元素 $i")
        }
    }
}

执行结果 :

2022-12-23 18:16:41.610 29409-29409/kim.hsl.coroutine I/System.out: 收集元素 0
2022-12-23 18:16:41.611 29409-29409/kim.hsl.coroutine I/System.out: 发射元素 0
2022-12-23 18:16:42.614 29409-29409/kim.hsl.coroutine I/System.out: 收集元素 1
2022-12-23 18:16:42.614 29409-29409/kim.hsl.coroutine I/System.out: 发射元素 1
2022-12-23 18:16:43.655 29409-29409/kim.hsl.coroutine I/System.out: 收集元素 2
2022-12-23 18:16:43.658 29409-29409/kim.hsl.coroutine I/System.out: 发射元素 2
2022-12-23 18:16:43.661 29409-29409/kim.hsl.coroutine D/AndroidRuntime: Shutting down VM
    
    
    --------- beginning of crash
2022-12-23 18:16:43.665 29409-29409/kim.hsl.coroutine E/AndroidRuntime: FATAL EXCEPTION: main
    Process: kim.hsl.coroutine, PID: 29409
    java.lang.RuntimeException: Unable to start activity ComponentInfo{kim.hsl.coroutine/kim.hsl.coroutine.MainActivity}: kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@daf39f2
        at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2951)
        at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:3086)
        at android.app.servertransaction.LaunchActivityItem.execute(LaunchActivityItem.java:78)
        at android.app.servertransaction.TransactionExecutor.executeCallbacks(TransactionExecutor.java:108)
        at android.app.servertransaction.TransactionExecutor.execute(TransactionExecutor.java:68)
        at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1816)
        at android.os.Handler.dispatchMessage(Handler.java:106)
        at android.os.Looper.loop(Looper.java:193)
        at android.app.ActivityThread.main(ActivityThread.java:6718)
        at java.lang.reflect.Method.invoke(Native Method)
        at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:493)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:858)
     Caused by: kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@daf39f2
2022-12-23 18:16:43.695 29409-29409/kim.hsl.coroutine I/Process: Sending signal. PID: 29409 SIG: 9

二、调用 Flow#cancellable() 函数启用检测 Flow 流的取消


在 Flow 流中 , 除 FlowCollector#emit 发射元素 之外 ,

还有很多其它的 流操作 , 这些操作不会 自动执行 ensureActive 检测 ,

因此这里需要我们 手动 进行 流取消检测 ;

调用 Flow#cancellable() 函数 , 可以手动设置流取消检测 ;

1、流取消失败代码示例

代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 携程中调用挂起函数返回一个 Flow 异步流
        runBlocking {

            (0..5).asFlow().collect {
                println("收集到元素 $it")

                // 收集到元素 2 时, 协程退出
                if (it == 2) {
                    cancel()
                }
            }

        }
    }
}

执行结果 :

2022-12-23 18:24:01.821 30105-30105/kim.hsl.coroutine I/System.out: 收集到元素 0
2022-12-23 18:24:01.822 30105-30105/kim.hsl.coroutine I/System.out: 收集到元素 1
2022-12-23 18:24:01.822 30105-30105/kim.hsl.coroutine I/System.out: 收集到元素 2
2022-12-23 18:24:01.827 30105-30105/kim.hsl.coroutine I/System.out: 收集到元素 3
2022-12-23 18:24:01.827 30105-30105/kim.hsl.coroutine I/System.out: 收集到元素 4
2022-12-23 18:24:01.827 30105-30105/kim.hsl.coroutine I/System.out: 收集到元素 5
2022-12-23 18:24:01.829 30105-30105/kim.hsl.coroutine D/AndroidRuntime: Shutting down VM
    
    
    --------- beginning of crash
2022-12-23 18:24:01.832 30105-30105/kim.hsl.coroutine E/AndroidRuntime: FATAL EXCEPTION: main
    Process: kim.hsl.coroutine, PID: 30105
    java.lang.RuntimeException: Unable to start activity ComponentInfo{kim.hsl.coroutine/kim.hsl.coroutine.MainActivity}: kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@daf39f2
        at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2951)
        at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:3086)
        at android.app.servertransaction.LaunchActivityItem.execute(LaunchActivityItem.java:78)
        at android.app.servertransaction.TransactionExecutor.executeCallbacks(TransactionExecutor.java:108)
        at android.app.servertransaction.TransactionExecutor.execute(TransactionExecutor.java:68)
        at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1816)
        at android.os.Handler.dispatchMessage(Handler.java:106)
        at android.os.Looper.loop(Looper.java:193)
        at android.app.ActivityThread.main(ActivityThread.java:6718)
        at java.lang.reflect.Method.invoke(Native Method)
        at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:493)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:858)
     Caused by: kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@daf39f2
2022-12-23 18:24:01.856 30105-30105/? I/Process: Sending signal. PID: 30105 SIG: 9

2、启用检测 Flow 流的取消代码示例

代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 携程中调用挂起函数返回一个 Flow 异步流
        runBlocking {

            // 执行 Flow#cancellable 启用手动执行流取消检测
            (0..5).asFlow().cancellable().collect {
                println("收集到元素 $it")

                // 收集到元素 2 时, 协程退出
                if (it == 2) {
                    cancel()
                }
            }
        }
    }
}

执行结果 :

2022-12-23 18:28:40.139 31502-31502/kim.hsl.coroutine I/System.out: 收集到元素 0
2022-12-23 18:28:40.140 31502-31502/kim.hsl.coroutine I/System.out: 收集到元素 1
2022-12-23 18:28:40.140 31502-31502/kim.hsl.coroutine I/System.out: 收集到元素 2
2022-12-23 18:28:40.145 31502-31502/kim.hsl.coroutine D/AndroidRuntime: Shutting down VM
2022-12-23 18:28:40.149 31502-31502/kim.hsl.coroutine E/AndroidRuntime: FATAL EXCEPTION: main
    Process: kim.hsl.coroutine, PID: 31502
    java.lang.RuntimeException: Unable to start activity ComponentInfo{kim.hsl.coroutine/kim.hsl.coroutine.MainActivity}: kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@daf39f2
        at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2951)
        at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:3086)
        at android.app.servertransaction.LaunchActivityItem.execute(LaunchActivityItem.java:78)
        at android.app.servertransaction.TransactionExecutor.executeCallbacks(TransactionExecutor.java:108)
        at android.app.servertransaction.TransactionExecutor.execute(TransactionExecutor.java:68)
        at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1816)
        at android.os.Handler.dispatchMessage(Handler.java:106)
        at android.os.Looper.loop(Looper.java:193)
        at android.app.ActivityThread.main(ActivityThread.java:6718)
        at java.lang.reflect.Method.invoke(Native Method)
        at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:493)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:858)
     Caused by: kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@daf39f2
2022-12-23 18:28:40.169 31502-31502/? I/Process: Sending signal. PID: 31502 SIG: 9