17

掌握Kotlin Coroutine之 Channels

 3 years ago
source link: http://blog.chengyunfeng.com/?p=1091
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

掌握Kotlin Coroutine之 Channels

作者: rain 分类: 移动 发布时间: 2019-03-16 17:36 6 0条评论

前面介绍的 produce 函数创建的是一个处理一些列数据流的 Coroutine,这个数据流在 Coroutine 中就是通过 Channel 接口实现的,本节来介绍下相关的内容。

当前 Channel 还处于开发阶段,和以后正式发布版本相比,当前的 API 可能会有一些变化。

Channel 基础

Channel 和 Java 的 BlockingQueue 概念比较类似。区别在于除了阻塞操作之外,Channel 还提供了非阻塞的操作 sendreceive

为了表达通顺,下面用 通道 指代 Channel。

Channel 有两个接口,一个是 SendChannel 接口定义了往通道发送数据的接口;另外一个是 ReceiveChannel 接口定义了从通道接收数据的接口。

SendChannel

发送数据的接口SendChannel比较简单,有四个函数和三个属性:
– val isClosedForSend: Boolean: 判断通道是否已经关闭了,如果已经关闭了再发送数据会抛出异常
– isFull: Boolean:判断通道是否已经满了。
– onSend 属性用在 select 表达式的地方,以后会介绍 select 表达式。
– close() 函数用来关闭通道
– invokeOnClose() 函数,接收通道关闭的通知
– fun offer(element: E): Boolean 函数在通道中同步发送一个数据。如果通道已经满了或者关闭了无法添加成功,则返回 false 或者抛出异常。
suspend fun send(element: E) 函数是一个 suspend 函数,如果通道已经满了,则该函数会暂停执行;如果该通道已经关闭了会抛出异常。

ReceiveChannel

ReceiveChannelSendChannel 类似提供了一些接收数据的函数以及相关状态的属性:
– val isClosedForReceive: Boolean:如果通道已经被关闭了并且所有已发送的数据都接收了则返回 true。
– val isEmpty: Boolean:如果通道没有任何数据则返回 true
– onReceive、onReceiveOrNull:用于 select 表达式。
– fun cancel() 函数关闭通道并且清空里面所缓冲的数据
– operator fun iterator() 用来遍历通道里面的数据,主要用在循环表达式
– fun poll(): E?:从通道获取并删除一个数据,如果是空的则返回 null。如果出错了则抛出错误异常信息。
suspend fun receive(): E 非阻塞的方式接受数据。
suspend fun receiveOrNull(): E? 同上,只不过当通道被关闭后调用该函数不会抛出异常而是返回 null。

在接收数据端通常需要处理数据,比如过滤、转换等。所以在 ReceiveChannel 上还定义了很多扩展函数实现了各种操作,比如 distinctfilterfilterIndexedgroupBy 等, 关于这些扩展函数请查看 ReceiveChannel 的 API 文档。

Channel 接口同时实现了 SendChannel 和 ReceiveChannel:

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

所以我们可以使用 Channel 实例来发送和接收数据。创建 Channel 实例需要使用 Channel(capacity) 这个工厂方法,根据 capacity 值的不同,底层会返回不同的实现。

  • 如果 capacity0 — 会创建一个 RendezvousChannel 对象,该通道没有任何换缓冲。只有当 send 和 receive 函数匹配调用的时候才能把数据发送给对方。所以调用 send 发送数据后,如果没有人调用 receive 则继续调用 send 会被阻塞;在消费端也是一样。
  • 如果 capacityChannel.CONFLATED — 会创建一个 ConflatedChannel 对象,该通道只会缓冲一个数据,而后续所有发送到该通道的数据都被合并了,只保留最新的一个之前旧的数据都被丢弃了。所以消费者只会收到最新的一个数据。在这个通道上调用 send 从来不会被阻塞,同样 offer 函数也总是返回 true
  • 如果 capacityChannel.UNLIMITED — 会创建一个 LinkedListChannel 对象,这个通道用一个链表实现并且没有数量限制(取决于可用的系统资源)。所以在这个通道上调用 send 从来不会被阻塞,同样 offer 函数也总是返回 true
  • 如果 capacity 是大于 0 的值,但是又小于 UNLIMITED — 这样会创建一个基于数组的通道,使用 capacity 初始化数组。当通道满了继续发送数据会被阻塞,如果通道是空的获取数据则会阻塞。

Channel(capacity) 函数 capacity 参数的默认值为 0.

下面是使用 Channel 的示例:

Kotlin
class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        val channel = Channel<Int>()
        launch(BG) {
            for (x in 1..2) {
                // 使用 delay 来模拟耗时操作
                delay(1000)
                Log.d(TAG, "Send index $x")
                channel.send(x * x)
                Log.d(TAG, "Send value ${x * x}")
        launch {
            repeat(2) {
                Log.d(TAG, "Receive index $it")
                val value = channel.receive()
                Log.d(TAG, "Received value $value")
                textView.text = "Received value: $value"
            Log.d(TAG, "channel received done!")
        fab.setOnClickListener { view ->
            // 点击 FAB 按钮可以取消
            channel.close()

上面示例中使用的是 Channel 函数的默认 capacity 参数,由于发送数据的一方延迟1秒才发送,而获取数据的一方立刻获取,所以获取数据的一方调用 receive() 函数的时候会停止执行知道发送数据的一方往通道发送数据后恢复。注意看执行的log中的时间:

    03-16 19:40:50.325 9234-9234 D Receive index 0
    03-16 19:40:51.245 9234-9314 D Send index 1
    03-16 19:40:51.247 9234-9314 D Send value 1
    03-16 19:40:51.248 9234-9234 D Received value 1
    03-16 19:40:51.251 9234-9234 D Receive index 1
    03-16 19:40:52.248 9234-9314 D Send index 2
    03-16 19:40:52.248 9234-9314 D Send value 4
    03-16 19:40:52.248 9234-9234 D Received value 4
    03-16 19:40:52.249 9234-9234 D channel received done!

关闭通道和遍历通道数据

当数据发送完毕后,可以调用 close() 函数来关闭通道,这样接收一方就知道合适数据接收完成。 而 ReceiveChannel 实现了 iterator 接口,所以可以在循环表达式中遍历通道,在循环执行的时候, 当 iterator 遇到了结束通道的信号就表示遍历完成:

        val channel = Channel<Int>()
        launch(BG) {
            for (x in 1..2) {
                // 使用 delay 来模拟耗时操作
                delay(1000)
                channel.send(x * x)
                Log.d(TAG, "Send value ${x * x}")
            // 这里发送完数据后,调用 close 函数
            channel.close()
        launch {
            for (v in channel) { // 在循环中遍历数据
                Log.d(TAG, "Receive value $v")
                textView.text = "Received value: $v"
            Log.d(TAG, "channel received done!")

注意上面的代码和前面一节中的区别,发送方发送完数据后调用 channel.close()。如果没有这个 close 调用,那么接收方的 for 循环是不会跳出循环的。

produce()

由于生产者-消费者模式在编码中非常常见,所以标准库中封装了一个 produce() 函数,比如下面用 produce 重写上面的示例:

val channel = produce<Int>(BG) {
    for (x in 1..2) {
        // 使用 delay 来模拟耗时操作
        delay(1000)
        send(x * x)
        Log.d(TAG, "Send value ${x * x}")
    close()
launch {
    for (v in channel) {
        Log.d(TAG, "Receive value $v")
        textView.text = "Received value: $v"
    Log.d(TAG, "channel received done!")

上面的 produce 函数的 Lambda 参数为生产数据的代码块,在 produce 函数内会使用 Channel() 工厂方法来创建一个 channel,然后在代码块里面就可以通过 send() 函数来发送数据,也可以通过 close() 函数来关闭通道。

为啥在 Lambda 代码块里面可以直接调用这两个函数呢? 看看 produce 的函数定义就知道了:

public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>

block 参数类型为 ProducerScope<E>.() -> Unit ,所以 Lambda 代码块是一个 ProducerScope 接口的扩展函数,而 ProducerScope 继承了 SendChannel 接口,所以就可以直接使用 SendChannel 接口里面的函数了。

produce 的返回值是一个 ReceiveChannel 对象。可以看到标准库提供的 produce 函数用起来会更加方便。

Pipelines(管道)

管道是 Coroutine 生产数据流的一种模式,数据流可能是无限的:

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // 从1开始的无限制的整数流

其他的 Coroutine 可以消费这个数据流,然后处理里面的数据并生成另外一个数据流。比如下面把上面的数字转换为其平方:

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)

下面的代码把整个管道打通:

    launch {
        val numbers = produceNumbers()
        val squares = square(numbers)
        for (i in 1..5) { // 处理五个数据
            val value = squares.receive()
            textView.text = "Received value: $value"
            Log.d(TAG, "Received value: $value")
        Log.d(TAG, "Done!!")
        coroutineContext.cancelChildren()
        // 调用这个可以取消所有的子 coroutines

注意上面两个函数都定义为 CoroutineScope 的扩展函数,launch 里面的代码块都继承自同一个scope,这样可以使用前面介绍的 structured concurrency 特性来实现并行计算,而不用引入全局Scope。

Ticker channels

通过 ticker() 函数可以创建 Ticker channel,这是一个特殊的 RendezvousChannelcapacity0),可以指定一个初始的延迟时间 initialDelayMillis 和每次触发的间隔时间 delayMillis,这样在initialDelayMillis时间过后会产生一个 Unit 事件, 然后开始等待 delayMillis 时间后会继续产生 Unit 事件。其结果就类似于在间隔的时间内不停的产生 Unit 事件。

Ticker channel 主要用来创建复杂的基于时间的 pipelines 和其他基于时间窗口的操作符函数以及基于时间的事件处理机制。

简单介绍了通道的几个接口和一些关键的函数,ReceiveChannel 接口上定义的大量用来处理数据的扩展函数以及 通道的高级用法就不详细介绍了,随着大家对基础概念的理解,其他内容查看相关 API 文档应该就不难理解了。

ReceiveChannel API 文档:https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html
Channel API 文档:https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK