zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Kotlin 朱涛-19 协程 Channel 管道 热数据流

Kotlin 19 管道 协程 channel 数据流
2023-09-14 09:00:05 时间

本文地址


目录

19 | Channel:为什么说Channel是热的

前面我们学习的挂起函数、async,它们一次都只能返回一个结果。但在某些业务场景下,我们往往需要协程返回多个结果,比如微信等软件的 IM 通道接收的消息,或者是手机 GPS 定位返回的经纬度坐标需要实时更新。那么,在这些场景下,我们之前学习的协程知识就无法直接解决了。

Channel 就是管道

Channel 就是一个管道。管道的一端是发送方,另一端是接收方。而管道本身,则可以用来传输数据。

Channel 是一种协程资源,如果在用完后不主动关闭,会造成资源浪费。

顶层函数 Channel()

当我们调用 Channel() 的时候,感觉像是在调用一个构造函数,但实际上它却只是一个普通的顶层函数

public fun <E> Channel(
    capacity: Int = RENDEZVOUS, // 管道的容量,默认是 0
    onBufferOverflow = BufferOverflow.SUSPEND, // 容量满时的应对策略
    onUndeliveredElement: ((E) -> Unit)? = null // 异常处理回调
): Channel<E> {}

第一个参数,capacity,代表了管道的容量

  • RENDEZVOUS: Int = 0 // 会合(交替),默认值
  • CONFLATED: Int = -1 // 合并(替换),新的数据会替代旧的数据
  • UNLIMITED: Int = Int.MAX_VALUE // 无限容量
  • BUFFERED: Int = 64 // 缓存容量,默认为 64,由 VM 参数 kotlinx.coroutines.channels.defaultBuffer 决定

第二个参数,onBufferOverflow,代表了当管道容量满时,如果发送方还要继续发送,Channel 的应对策略

  • SUSPEND:默认值。挂起 send() 方法,即以非阻塞的方式,将发送方的执行流程挂起,等管道中有了空闲位置以后再恢复
  • DROP_OLDEST:丢弃最旧的那条数据,然后发送新的数据
  • DROP_LATEST:丢弃最新的那条数据,注意,是丢弃当前正准备发送的那条数据,而管道中的内容将维持不变

第三个参数,onUndeliveredElement,它其实相当于一个异常处理回调。当管道中的某些数据没有被成功接收的时候,这个回调就会被调用。

Channel 接口设计

Channel 本身只是一个接口,而且,Channel 本身并没有什么方法和属性,Channel 的所有能力,都是来自于 SendChannelReceiveChannel 这两个接口。

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {}

public interface SendChannel<in E> {
    public val isClosedForSend: Boolean                  // 判断 Channel 是否已关闭
    public suspend fun send(element: E)                  // 挂起函数的发送
    public fun trySend(element: E): ChannelResult<Unit>  // 非挂起函数的发送
    public val onSend: SelectClause2<E, SendChannel<E>>  // select 相关,后面讲
    public fun close(cause: Throwable? = null): Boolean  // 关闭
    public fun invokeOnClose(h: (t: Throwable?) -> Unit)
}

public interface ReceiveChannel<out E> {
    public val isClosedForReceive: Boolean       // 判断 Channel 是否已关闭
    public val isEmpty: Boolean
    public fun cancel(cause: CancellationException? = null) // 取消
    public suspend fun receive(): E                         // 应杜绝使用
    public suspend fun receiveCatching(): ChannelResult<E>  // 防止异常发生
    public fun tryReceive(): ChannelResult<E>               // 非挂起函数的接收
    public val onReceive: SelectClause1<E>                        // select 相关,后面讲
    public val onReceiveCatching: SelectClause1<ChannelResult<E>> // select 相关,后面讲
    public operator fun iterator(): ChannelIterator<E>
}

借助它的这个特点,我们可以实现 对读取开放,对写入封闭 的设计。

测试案例

fun main() = runBlocking {
    val channel = Channel<Int>() // 创建一个管道,泛型代表管道传递的数据类型
    launch {
        (1..3).forEach {
            channel.send(it) // 在一个协程中,发送数据到管道里
            log("Send: $it")
        }
        channel.close() // 管道需要主动关闭,否则主线程不会结束
    }
    launch {
        for (i in channel) { // 在另一个协程中,取出管道中的数据
            log("Receive: $i")
        }
    }
    log("end")
}

fun log(text: Any) = println(Thread.currentThread().name + " " + text)
main @coroutine#1 end
main @coroutine#3 Receive: 1
main @coroutine#2 Send: 1
main @coroutine#2 Send: 2
main @coroutine#3 Receive: 2
main @coroutine#3 Receive: 3
main @coroutine#2 Send: 3
  • 在上面的案例中,如果没有调用 channel.close(),程序将永远不会停下来。
  • 如果上面不是调用 close(),而是调用 cancel(),则管道中的消息会被清空,同时管道也会标记为 cancel,此时等价于 close(),所以程序也会停下来。

管道的容量

交替:RENDEZVOUS

在创建 Channel 的时候,不设置 capacity,或设置 capacity = RENDEZVOUS

val channel = Channel<Int>(capacity = Channel.Factory.RENDEZVOUS)

main @coroutine#1 end
main @coroutine#3 Receive: 1
main @coroutine#2 Send: 1
main @coroutine#2 Send: 2
main @coroutine#3 Receive: 2
main @coroutine#3 Receive: 3
main @coroutine#2 Send: 3

发送和接收的两个协程,会轮流执行

注意,上述代码打印的结果并不是严格交替的,因为,send、received 会被轮流挂起。

替换:CONFLATED

在创建 Channel 的时候,设置 capacity = CONFLATED

val channel = Channel<Int>(capacity = Channel.Factory.CONFLATED)

main @coroutine#1 end
main @coroutine#2 Send: 1
main @coroutine#2 Send: 2
main @coroutine#2 Send: 3
main @coroutine#3 Receive: 3

发送方也会一直发送数据,但是,对于接收方来说,它永远只能接收到最后一条数据

UNLIMITED 或 BUFFERED

在创建 Channel 的时候,设置 capacity = UNLIMITED

val channel = Channel<Int>(capacity = Channel.Factory.UNLIMITED)

main @coroutine#1 end
main @coroutine#2 Send: 1
main @coroutine#2 Send: 2
main @coroutine#2 Send: 3
main @coroutine#3 Receive: 1
main @coroutine#3 Receive: 2
main @coroutine#3 Receive: 3

由于 Channel 的容量是无限大的,所以发送方可以一直往管道当中塞入数据,等数据都塞完以后,接收方才开始接收。

设置 capacity = BUFFERED 时,在容量没达到阈值时,和 UNLIMITED 的效果类似

溢出处理策略

默认值是 SUSPEND,挂起 send() 方法,即以非阻塞的方式,将发送方的执行流程挂起,等管道中有了空闲位置以后再恢复

丢弃最旧:DROP_OLDEST

同时使用 DROP_OLDESTcapacity = 1,可以实现 CONFLATED 的效果。

val channel = Channel<Int>(capacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

main @coroutine#1 end
main @coroutine#2 Send: 1
main @coroutine#2 Send: 2
main @coroutine#2 Send: 3
main @coroutine#3 Receive: 3

丢弃最新:DROP_LATEST

val channel = Channel<Int>(capacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)

main @coroutine#1 end
main @coroutine#2 Send: 1
main @coroutine#2 Send: 2
main @coroutine#2 Send: 3
main @coroutine#3 Receive: 1

当 Channel 容量满了以后,之后再继续发送的内容,就会直接被丢弃

异常处理回调

当发送出去的 Channel 数据无法被接收方处理的时候,就可以通过 onUndeliveredElement 这个回调,来进行监听。

fun main() = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED) { log("onUndeliveredElement = $it") }
    launch {
        (1..3).forEach { channel.send(it) } // 放入三个数据
    }
    val i = channel.receive() // 取出一个
    log("receive = $i")
    channel.cancel() // 取消(注意不是关闭 close)
    log("end")
}
main @coroutine#1 receive = 1
main @coroutine#1 onUndeliveredElement = 2
main @coroutine#1 onUndeliveredElement = 3
main @coroutine#1 end

Channel 使用时遇到的问题

produce 会自动关闭

使用高阶函数 produce{} 创建 Channel 时,produce{} 会自动帮我们调用 close() 方法。

fun main() = runBlocking {
    val channel: ReceiveChannel<Int> = produce(capacity = Channel.Factory.UNLIMITED) {
        (1..2).forEach {
            send(it)
            log("Send: $it")
        }
    }
    (1..2).forEach { _ ->  // 注意 ①
        val i = channel.receive()
        log("receive = $i")
    }
    log("end")
}
main @coroutine#2 Send: 1
main @coroutine#2 Send: 2
main @coroutine#1 receive = 1
main @coroutine#1 receive = 2
main @coroutine#1 end

Channel 关闭引发的异常

如果将上面注释的地方改为 (1..3),则最后一次调用 receive() 的时候,由于此时 Channel 已经被关闭了,所以代码会抛出异常:

main @coroutine#2 Send: 1
main @coroutine#2 Send: 2
main @coroutine#1 receive = 1
main @coroutine#1 receive = 2
Exception in thread "main" ClosedReceiveChannelException: Channel was closed

如果上面的 Channel 没有被关闭,则最后一次调用 receive() 会被挂起,而不会抛出异常。

杜绝使用 receive

最好不要用 receive(),即使配合 isClosedForReceive,直接调用 receive() 也是一件非常危险的事情!

fun main() = runBlocking {
    val channel: ReceiveChannel<Int> = produce {
        (1..2).forEach { // 注意 ①
            send(it)
            log("Send: $it")
        }
    }
    while (!channel.isClosedForReceive) {
        val i = channel.receive()
        log("Receive: $i")
    }
    log("end")
}

上述代码注释的地方,发送【奇数】条数据时是正常的,发送【偶数】条数据时就会提示管道关闭了。

main @coroutine#2 Send: 1
main @coroutine#1 Receive: 1
main @coroutine#1 Receive: 2
main @coroutine#2 Send: 2
Exception in thread "main" ClosedReceiveChannelException: Channel was closed

可以使用 receiveCatching

在某些特殊场景下,如果我们必须要自己来调用 receive(),那么可以使用 receiveCatching(),它可以防止异常发生。

while (!channel.isClosedForReceive) {
    val result: ChannelResult<Int> = channel.receiveCatching()
    log("Receive: ${result.getOrNull()}")
}
main @coroutine#2 Send: 1
main @coroutine#1 Receive: 1
main @coroutine#1 Receive: 2
main @coroutine#2 Send: 2
main @coroutine#1 Receive: null
main @coroutine#1 end

可以使用 consumeEach

可以使用高阶函数 consumeEach {} 读取 Channel 当中的数据。

fun main() = runBlocking {
    val channel: ReceiveChannel<Int> = produce {
        (1..2).forEach {
            send(it)
            log("Send: $it")
        }
    }
    channel.consumeEach { log("Receive $it") }
    log("end")
}
main @coroutine#2 Send: 1
main @coroutine#1 Receive 1
main @coroutine#1 Receive 2
main @coroutine#2 Send: 2
main @coroutine#1 end

为什么说 Channel 是【热】的

Channel 是用来传递 数据流 的,这里的数据流,指的是多个数据组合形成的流

前面挂起函数、async 返回的数据,就像是水滴一样,而 Channel 则像是自来水管当中的水流一样。

在业界一直有一种说法:Channel 是 的。因此,在 Kotlin 中,我们也经常把 Channel 称为 热数据流

没有接收方,发送方也会工作

fun main() = runBlocking {
    produce(capacity = 3) {  // 只发送不接受
        (1..3).forEach {
            send(it)
            log("Send $it")
        }
    }
    log("end")
}
main @coroutine#1 end
main @coroutine#2 Send 1
main @coroutine#2 Send 2
main @coroutine#2 Send 3

上面的代码中,并没有消费 Channel 中的数据,但是依然发送了 2 个数据,这种 不管有没有接收方,发送方都会工作 的模式,就是我们将其认定为 的原因。

这就有点像是一个热心的饭店服务员,不管你有没有提要求,服务员都会给你端茶送水,把茶水摆在你的饭桌上。当你想要喝水的时候,就可以直接从饭桌上拿了(当你想要数据的时候,就可以直接从管道里取出来了)。

容量不足时,发送方会被挂起

将上面的 capacity 由 3 改为 2/1/0

produce(capacity = 2) // 注意:以下结论的前提是,使用的溢出处理策略是 BufferOverflow.SUSPEND

main @coroutine#1 end
main @coroutine#2 Send 1
main @coroutine#2 Send 2
// 注意,上面代码 main 线程不会结束

把 capacity 改成 2/1/0 以后,可以看到 Channel 的发送方仍然是会工作的,只是说,在它调用 send() 方法的时候,可能由于管道容量已经满了,所以它会被挂起(因为默认使用的溢出处理策略是是 BufferOverflow.SUSPEND)。

可以想象成:虽然你的饭桌已经没有空间了,但服务员还是端来了茶水站在了你旁边,只是没有把茶水放在你桌上,等饭桌有了空间,或者你想喝水了,你就能马上喝到。

坏处

  • 可能会导致数据的丢失
  • 不管有没有接收者都一直工作,浪费资源
  • 如果未及时 close 的话,可能会导致内存泄露
  • 接收数据时【可能】会接收到之前的旧数据

小结

  • Channel 是一个管道,当我们想要用协程传递多个数据组成的流的话,就没办法通过挂起函数、async 来实现了。这时候,Channel 是一个不错的选择。
  • 我们可以通过 Channel() 这个顶层函数来创建 Channel 管道。在创建 Channel 的时候,有三个参数
    • capacity 代表了容量
    • onBufferOverflow 代表容量满了以后的应对策略
    • onUndeliveredElement 则是一个异常回调
  • Channel 的 send() 方法用于发送管道数据,receive() 方法用于接收管道数据。直接使用 receive() 会导致各种问题,建议使用 for 循环、consumeEach {}
  • Channel 是 的。这是因为不管有没有接收方,发送方都会工作

2017-11-08