7

掌握Kotlin Coroutine之 选择表达式

 3 years ago
source link: http://blog.chengyunfeng.com/?p=1093
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

理解了前面所介绍的内容,在应用开发中基本就可以使用Coroutine了。从这一节开始会涉及到几个比较复杂并且不常用的概念。

首先来看看Select expression(选择表达式)。选择表达式可以同时等待多个suspending函数选择第一个执行完的结果。 通俗点说呢,就是有多个异步任务在同时执行,然后等待这些任务的执行,如果谁最先执行完并返回结果,则就使用这个最先完成的任务返回的数据,丢弃其他后执行完的数据。Coroutine 对这种场景提供了不同的支持。

注意选择表达式目前还处于实验性阶段,以后其API可能会发生变化。

从通道中选择

这种场景是有多个生成数据的通道(channel),然后选择最先收到数据的那个通道。下面是一个示例,有两个函数fizzbuzz分别返回一个每隔一段时间产生一个字符串的通道channel对象:

fun CoroutineScope.fizz() = produce<String> {
    while (true) {
        // 每隔1秒生成一个"Fizz"字符串
        delay(1000)
        send("Fizz")
fun CoroutineScope.buzz() = produce<String> {
    while (true) {
        //每隔3秒生成一个"Buzz"字符串
        delay(3000)
        send("Buzz!")

下面是使用select表达式来选择最先产生数据的那个通道产生的数据,selectFizzBuzz 函数返回select表达式返回的值:

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>): String {
    return select {
        // <String> 表明这个选择表达式返回一个字符串
        fizz.onReceive { value ->
            // 这是第一个选择语句
            return@onReceive value
        buzz.onReceive { value ->
            // 这是第二个选择语句
            return@onReceive value

select表达式还支持超时操作,如果多个通道在规定的时间内都没法产生数据,则可以通过调用onTimeout函数来处理。

下面通过示例代码看起来更加直观:

class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        launch {
            val fizz = fizz()
            val buzz = buzz()
            repeat(7) {
                val v = selectFizzBuzz(fizz, buzz)
                Log.e(TAG, "In repeat index $it -> value $v")
            coroutineContext.cancelChildren()
    fun CoroutineScope.fizz() = produce<String> {
        Log.d(TAG, "fizz started")
        while (true) {
            // 每隔1秒生成一个"Fizz"字符串
            delay(1000)
            send("Fizz")
    fun CoroutineScope.buzz() = produce<String> {
        Log.d(TAG, "buzz started")
        while (true) {
            //每隔3秒生成一个"Buzz"字符串
            delay(3000)
            send("Buzz!")
    suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>): String {
        val value = select<String> {
            // <String> 表明这个选择表达式返回一个字符串
            fizz.onReceive { value ->
                // 这是第一个选择语句
                return@onReceive value
            buzz.onReceive { value ->
                // 这是第二个选择语句
                return@onReceive value
            onTimeout(500) {
                "Default"
        // 这个 value 变量是多余的,这里为了看起来更清晰所以保留
        return value

上面调用selectFizzBuzz这个函数7次,第一次执行的时候, select表达式等待了 500毫秒还没有收到两个通道生成的数据,所以就触发了超时,所以第一个打印的结果为 In repeat index 0 -> value Default; 后面第二次执行的时候,继续等待收到了 fizz 生成的数据 Fizz;以此类推。

上面的示例代码运行结果如下:

03-16 07:57:35.990 9689-9689 D fizz started
03-16 07:57:35.991 9689-9689 D buzz started
03-16 07:57:36.491 9689-9689 E In repeat index 0 -> value Default
03-16 07:57:36.994 9689-9689 E In repeat index 1 -> value Fizz
03-16 07:57:37.496 9689-9689 E In repeat index 2 -> value Default
03-16 07:57:37.998 9689-9689 E In repeat index 3 -> value Fizz
03-16 07:57:38.500 9689-9689 E In repeat index 4 -> value Default
03-16 07:57:38.993 9689-9689 E In repeat index 5 -> value Buzz!
03-16 07:57:38.999 9689-9689 E In repeat index 6 -> value Fizz

通道关闭异常

如果通道被关闭了,则select 里面的onReceive语句会抛出如下异常:

kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed

使用onReceiveOrNull函数可以在通道关闭的时候收到 null 值,而不是抛出异常。这样应用可以针对 null 值做一些特殊处理。

class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        val fizz = fizz()
        val buzz = buzz()
        launch {
            repeat(8) {
                val v = selectFizzBuzz(fizz, buzz)
                Log.e(TAG, "In repeat index $it -> value $v")
            coroutineContext.cancelChildren()
        fab.setOnClickListener {
            fizz.cancel()
    fun CoroutineScope.fizz() = produce<String> {
        Log.d(TAG, "fizz started")
        repeat(4) {
            send("Fizz $it")
    fun CoroutineScope.buzz() = produce<String> {
        Log.d(TAG, "buzz started")
        repeat(4) {
            send("Buzz $it")
    suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>): String {
        val value = select<String> {
            // <String> 表明这个选择表达式返回一个字符串
            fizz.onReceiveOrNull { value ->
                // 这是第一个选择语句
                if (value == null) {
                    return@onReceiveOrNull "fizz is null"
                } else {
                    return@onReceiveOrNull value
            buzz.onReceiveOrNull { value ->
                // 这是第二个选择语句
                if (value == null) {
                    return@onReceiveOrNull "buzz is null"
                } else {
                    return@onReceiveOrNull value
        // 这个 value 变量是多余的,这里为了看起来更清晰所以保留
        return value

上面的示例代码和前面相比做了一些改动,首先发送数据的通道不再设置延迟时间,取消了timeout语句,并且发送数据的次数做了限制,还处理了当通道关闭后返回值为null的情况。
上面代码的log如下:

03-16 08:52:12.572 16494-16494/? D fizz started
03-16 08:52:12.574 16494-16494/? D buzz started
03-16 08:52:12.576 16494-16494/? E In repeat index 0 -> value Fizz 0
03-16 08:52:12.577 16494-16494/? E In repeat index 1 -> value Buzz 0
03-16 08:52:12.578 16494-16494/? E In repeat index 2 -> value Fizz 1
03-16 08:52:12.578 16494-16494/? E In repeat index 3 -> value Fizz 2
03-16 08:52:12.578 16494-16494/? E In repeat index 4 -> value Buzz 1
03-16 08:52:12.578 16494-16494/? E In repeat index 5 -> value Fizz 3
03-16 08:52:12.579 16494-16494/? E In repeat index 6 -> value fizz is null
03-16 08:52:12.579 16494-16494/? E In repeat index 7 -> value fizz is null

上面这个示例演示了几个select的特性:
– 优先选择第一个,当几个选择语句同时返回数据的时候,select优先选第一个语句所返回的数据,在上面的示例中,两个通道不停的产生数据,而fizz出现在select语句中第一个位置,所以优先选择fizz的结果
– 由于两个通道用的都是不带缓冲的类型,所以在调用fizzsend函数的时候会被暂停执行,这样buzz也会有执行的机会。
– 另外当通道被关闭后会立刻返回null,所以最后两个是fizz为null的情况

SendChannel#onSend

SendChannel接口有个 onSend 变量定义,这个onSend可以用到选择表达式中,其作用就是当向通道发送数据的时候,就触发选择表达式选择这个onSend语句。 这个解释起来有点绕,下面通过代码来演示:

fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
    for (num in 1..6) { //生成从 1 到 6 这6个数字
        delay(100) // 延迟 100 毫秒
        select<Unit> {
            onSend(num) {} // 发送数字到主要的 SendChannel
            side.onSend(num) {} // 如果主要通道没有处理完毕,则把新的数据发送给这个次要通道
@ExperimentalCoroutinesApi
class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        val side = Channel<Int>() // 创建一个次要通道
        launch {
            launch {
                // 在子Coroutine 中启动一个处理数据非常快的次要通道
                side.consumeEach { println("Side channel has $it") }
            produceNumbers(side).consumeEach {
                println("Consuming $it")
                // 这里延迟 250毫秒,模拟这个通道处理数据慢的情况
                delay(250)
            println("Done consuming")
            coroutineContext.cancelChildren()

上面的代码定义了一个produceNumbers扩展函数,该函数返回值类型为ReceiveChannel<Int>(假设这个为消费数据的主通道),其功能就是返回从1到6这六个数字。然后下面通过produceNumbers(side).consumeEach() 函数来接收这六个数字。正常情况下,我们需要在 produce()代码块里面调用SendChannelsend()函数来向这个通道发送数据。 但是上面的produceNumbers扩展函数使用选择表达式来发送数据。里面的选择表达式的意思是:分别向主通道次要通道(side)发送数据,发送的顺序是选择表达式中定义的通道顺序,如果第一个主通道上一个数据还没处理完成,则就向下一个通道(side)继续发送,当数据发送出去后,调用onSend语句,由于上面示例中,主通道每隔 250毫秒处理一个数据,所以只写的Log如下:

03-16 09:27:52.413 10570-10570 I/ Consuming 1
03-16 09:27:52.664 10570-10570 I/ Consuming 2
03-16 09:27:52.915 10570-10570 I/ Consuming 3
03-16 09:27:53.166 10570-10570 I/ Consuming 4
03-16 09:27:53.417 10570-10570 I/ Consuming 5
03-16 09:27:53.667 10570-10570 I/ Consuming 6
03-16 09:27:53.918 10570-10570 I/ Done consuming

如果上面的解释还看的不是很明白,则可以再详细看看下面改进够的代码,更容易理解:

fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
    val pro = this // 这里 this 为 Lambda 表达式所在的 ProducerScope 对象,同时继承了 SendChannel
    pro.send(100) // 先发送一个数字 100 出去
    for (num in 1..6) { //生成从 1 到 6 这6个数字
        delay(100) // 延迟 100 毫秒
        select<Unit> {
            // 发送数字到主要的 SendChannel, 这里加上 pro 便于理解
            pro.onSend(num) {
                // 当数据发送出去的时候,会触发 onSend 语句,onSend 有两个参数,第一个为所发送出去的数字,
                // 这里为 num,第二个参数为这个Lambda 代码块, 这个代码块有个参数为发送数据的那个通道,
                // 可以通过变量 it 引用,比如下面每当有数据发送出去的时候,就通过 it 来继续往通道
                // 再发送一个数字 9, 然后再打印一个log。
                it.send(9)
                println("pro channel 9 , num $num")
            // 如果主要通道没有处理完毕,则把新的数据发送给这个次要通道
            side.onSend(num) {
                // 这里的处理方式同上,当select处理新的数字的时候,如果发现 pro 通道上次的数据还没有
                // 处理完毕(也就是 pro 通道还在忙着),则把数据发送给这个 side 通道
                it.send(10)
                println("side channel 10 , num $num")

注意:上面的代码依然需要仔细的看才能理解 onSend 在选择表达式中的用法。

上面改进后的代码打印的log如下:

03-16 09:52:24.809 12618-12618 I: Consuming 100
03-16 09:52:24.856 12618-12618 I: Side channel has 1
03-16 09:52:24.856 12618-12618 I: Side channel has 10
03-16 09:52:24.856 12618-12618 I: side channel 10 , num 1
03-16 09:52:24.957 12618-12618 I: Side channel has 2
03-16 09:52:24.958 12618-12618 I: Side channel has 10
03-16 09:52:24.958 12618-12618 I: side channel 10 , num 2
03-16 09:52:25.058 12618-12618 I: Side channel has 3
03-16 09:52:25.058 12618-12618 I: Side channel has 10
03-16 09:52:25.059 12618-12618 I: side channel 10 , num 3
03-16 09:52:25.160 12618-12618 I: Consuming 4
03-16 09:52:25.411 12618-12618 I: Consuming 9
03-16 09:52:25.411 12618-12618 I: pro channel 9 , num 4
03-16 09:52:25.513 12618-12618 I: Side channel has 5
03-16 09:52:25.513 12618-12618 I: Side channel has 10
03-16 09:52:25.514 12618-12618 I: side channel 10 , num 5
03-16 09:52:25.615 12618-12618 I: Side channel has 6
03-16 09:52:25.615 12618-12618 I: Side channel has 10
03-16 09:52:25.615 12618-12618 I: side channel 10 , num 6
03-16 09:52:25.662 12618-12618 I: Done consuming

看log的时候请注意,pro 通道在处理数据的时候延迟了 250毫秒,而1到6六个数字每次延迟100毫秒。

选择异步数据(deferred value)

Deferred接口有个onAwait属性也可以用在选择表达式中,用来选择async异步操作的返回值。比如下面这个的示例:

// 一个异步操作,返回一个字符串,该函数的返回值类型为 Deferred<String>
fun CoroutineScope.asyncString(time: Int): Deferred<String> = async {
    delay(time.toLong())// 延迟一段时间,模拟异步耗时操作
    return@async "Waited for $time ms" // 异步代码返回的字符串
// 创建一个 List,里面包含了12个 asyncString 函数返回的 Deferred<String>
fun CoroutineScope.asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) {
        asyncString(random.nextInt(1000))
class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        launch {
            //创建 Deferred<String> list 对象
            val list = asyncStringsList()
            // 在选择表达式中选择 list 中的多个 Deferred 的结果
            val result = select<String> {
                list.withIndex().forEach { (index, deferred) ->
                    deferred.onAwait { answer ->
                        "Deferred $index produced answer '$answer'"
            // list 中 12个 Deferred 中第一个结果返回后,
            // 选择表达式执行完成并返回第一个结果
            println(result)
            // list 中第一个 Deferred 返回后(说明执行完毕了),
            // 统计下剩下的处于活跃状态的数目(应该是11个)
            val countActive = list.count { it.isActive }
            println("$countActive coroutines are still active")

上面执行的log如下:

03-16 11:26:15.415  I: Deferred 6 produced answer 'Waited for 43 ms'
03-16 11:26:15.415  I: 11 coroutines are still active

注意上面的选择表达式的实现,由于选择表达式是一个 Kotlin DSL,所以可以用任意的方式来提供这个选择语句。上面使用便利的操作来提供 onAwait 语句。和下面的 for 循环类似。

// 在选择表达式中选择 list 中的多个 Deferred 的结果
val result = select<String> {
      for (deferred in list) {
     deferred.onAwait { "Deferred answer '$it'" }

处理产生deferred类型数据的通道

下面这个示例比较有意思,switchMapDeferreds函数的参数为ReceiveChannel<Deferred<String>>类型,所以 input 里面接收到的数据类型为Deferred<String>,然后当收到 input 通道发来的 deferred 数据后,开始等待该 deferred 执行完毕并同时等待 input 通道的下一个数据,如果 deferred 还没有执行完毕则 input 下一个数据(next)就已经收到了,那么就不继续等待当前这个 deferred 对象的返回值了,而是继续等待 next 上的返回值,依次循环。

fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>): ReceiveChannel<String> = produce<String> {
    var current = input.receive() // 从第一个收到的 deferred 开始
    while (isActive) { // 如果 Coroutine 处于活跃状态则一直循环读取数据
        // 这个选择表达式返回的为 Deferred<String> 对象或者 null
        val next = select<Deferred<String>?> {
            //第一个选择语句,等待input通道中的 deferred 数据,如果通道关闭了,则返回null
            input.onReceiveOrNull { update ->
                update // 返回这个值
            //下面是第二个选择语句,current 为 Deferred<String> 对象,当 onAwait 返回
            // 的时候, value 为一个string对象,然后把value发送到 produce 的通道中去
            current.onAwait { value ->
                send(value) // 把value发送到 produce 的通道中去
                // 然后把 input 通道的下一个 deferred 数据作为选择表达式的结果返回
                input.receiveOrNull()
        // 如果 next 为null,说明 input 通道关闭了
        if (next == null) {
            println("Channel was closed")
            break // 跳出while 循环
        } else {
            // 如果 next 不为null,则更新 current 为 nexut 的值
            current = next

请注意看上面的注释可以帮助理解该代码的运行行为。这里同时在选择表达式上使用了 ReceiveChannel#onReceiveOrNullDeferred#onAwait 两个不同的 select 语句。

可以使用下面的函数来生成向 input 通道内发送的 deferred 对象:

fun CoroutineScope.asyncString(str: String, time: Long) = async {
    delay(time)
    return@async str

下面是整个测试代码:

fun CoroutineScope.asyncString(str: String, time: Long) = async {
    delay(time)
    return@async str
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>): ReceiveChannel<String> = produce<String> {
    var current = input.receive() // 从第一个收到的 deferred 开始
    while (isActive) { // 如果 Coroutine 处于活跃状态则一直循环读取数据
        // 这个选择表达式返回的为 Deferred<String> 对象或者 null
        val next = select<Deferred<String>?> {
            //第一个选择语句,等待input通道中的 deferred 数据,如果通道关闭了,则返回null
            input.onReceiveOrNull { update ->
                update // 返回这个值
            //下面是第二个选择语句,current 为 Deferred<String> 对象,当 onAwait 返回
            // 的时候, value 为一个string对象,然后把value发送到 produce 的通道中去
            current.onAwait { value ->
                send(value) // 把value发送到 produce 的通道中去
                // 然后把 input 通道的下一个 deferred 数据作为选择表达式的结果返回
                input.receiveOrNull()
        // 如果 next 为null,说明 input 通道关闭了
        if (next == null) {
            println("Channel was closed")
            break // 跳出while 循环
        } else {
            // 如果 next 不为null,则更新 current 为 nexut 的值
            current = next
class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        launch {
            // 用来测试的通道
            val chan = Channel<Deferred<String>>()
            launch { // 启动一个用来打印结果的子 coroutine
                // switchMapDeferreds 的返回值类型为 ReceiveChannel<String>,
                for (s in switchMapDeferreds(chan))
                    println(s) // 用for循环打印里面所收到的每个数据
            // 上面的子 coroutine 等待往 chan 里面发送数据或者 chan 被关闭了
            // 先发送第一个数据,这个 deferred 对象延迟 100毫秒返回 BEGIN 字符串
            chan.send(asyncString("BEGIN", 100))
            delay(200) // 然后等待超过 100毫秒的时间,让 "BEGIN" 字符串能够发送出来
            // 向通道中发送第二个数据,这个 deferred 对象延迟 500毫秒返回 Slow 字符串
            chan.send(asyncString("Slow", 500))
            delay(100) // 延迟 100毫秒继续发送下一个 deferred 对象
            // 当第三个 deferred 对象发送出去的时候, select 表达式中 先收到了第三个 deferred 数据,
            // 所以上面的 Slow deferred 数据就没有来得及发送出来,就被丢弃了
            chan.send(asyncString("Replace", 100))
            // 等待 500毫秒再发送下一个数据,这样 第三个 deferred 对象的 Replace 数据就可以发送出来了
            delay(500)
            // 然后发送最后一个 deferred 对象 END
            chan.send(asyncString("END", 500))
            // 等待超过 500毫秒的时间,好让 END 这个数据能够发送出来
            delay(1000)
            chan.close() // 关闭通道
            delay(500) // 等一点时间好让子Coroutine处理关闭事件

注意看上面代码中的注释,帮助理解 调用switchMapDeferreds函数的代码逻辑。特别是向 chan 通道发送数据的等待时间间隔。下面是log:

03-16 12:43:19.425 I BEGIN
03-16 12:43:19.729 I Replace
03-16 12:43:20.630 I END
03-16 12:43:21.132 I Channel was closed

可以看到选择表达式Select expression可以有不同的用法,可以在多个suspending functions上等待并返回最先执行完毕的数据。 需要注意的是,选择表达式还处于实现性阶段,以后这些API可能会有稍微的变化。请及时关注官方文档。

选择表达式API文档:https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK