7

再谈协程之第三者Flow基础档案

 2 years ago
source link: https://xuyisheng.top/flow_basic/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

该来的还是来了,LiveData提供了响应式编程的基础,搭建了一套数据观察者的使用框架,但是,它相当于RxJava这类的异步框架来说,有点略显单薄了,这也是经常被人诟病的问题,因此,Flow这个小三就顺应而生了。

Flow作为一套异步数据流框架,几乎可以约等于RxJava,但借助Kotlin语法糖和协程,以及Kotlin的DSL语法,可以让Flow的写法变得异常简洁,让你直面人性最善良的地方,一切的黑暗和丑陋,都被编译器消化了。而且,Flow作为LiveData的进化版本,可以很好的和JetPack结合起来,作为全家桶的一员,为统一架构添砖加瓦。

要理解FLow,首先需要了解Flow的各种操作符和基础功能,如果不理解这些,那么很难将Flow灵活运用,所以,本节主要来梳理Flow的基础。

Flow前言

首先,我们来看一个新的概念——冷流和热流,如果你看网上的Flow相关的文章,十有八九都会提到这个很冷门的名词。

Flow是早上冷的,到Channel才热起来。

一个异步数据流,通常包含三部分:

所谓冷流,即下游无消费行为时,上游不会产生数据,只有下游开始消费,上游才从开始产生数据。

而所谓热流,即无论下游是否有消费行为,上游都会自己产生数据。

Flow操作符

Flow和RxJava一样,用各种操作符撑起了异步数据流框架的半边天。Flow默认为冷流,即下游有消费时,才执行生产操作。

所以,操作符也被分为两类——中间操作符和末端操作符,中间操作符不会产生消费行为,返回依然为Flow,而末端操作符,会产生消费行为,即触发流的生产。

Flow的创建

仅仅创建Flow,是不会执行Flow中的任何代码的,但我们首先,还是要看下如何创建Flow。

通过flow{}构造器,可以快速创建Flow,在flow中,可以使用emit来生产数据(或者emitAll生产批量数据),示例如下。

flow {
    for (i in 0..3) {
        emit(i.toString())
    }
}
  • flowOf

与listOf类似,Flow可以通过flowOf来产生有限的已知数据。

flowOf(1, 2, 3)
  • asFlow

asFlow用于将List转换为Flow。

listOf(1,2,3).asFlow()
  • emptyFlow

如题,创建一个空流。

末端操作符

末端操作符在调用之后,创建Flow的代码才会执行,这点和Sequence非常类似。

  • collect

collect是最常用的末端操作符,示例如下。

末端操作符都是suspend函数,所以需要运行在协程作用域中。

MainScope().launch {
    val time = measureTimeMillis {
        flow {
            for (i in 0..3) {
                Log.d("xys", "emit value---$i")
                emit(i.toString())
            }
        }.collect {
            Log.d("xys", "Result---$it")
        }
    }
    Log.d("xys", "Time---$time")
}
  • collectIndexed

带下标的collect,下标是Flow中的emit顺序。

MainScope().launch {
    val time = measureTimeMillis {
        flow {
            for (i in 0..3) {
                Log.d("xys", "emit value---$i")
                emit(i.toString())
            }
        }.collectIndexed { index, value ->
            Log.d("xys", "Result in $index --- $value")
        }
    }
    Log.d("xys", "Time---$time")
}
  • collectLatest

collectLatest用于在collect中取消未来得及处理的数据,只保留当前最新的生产数据。

flowOf(1, 2, 3).collectLatest {
    delay(1)
    Log.d("xys", "Result---$it")
}
  • toCollection、toSet、toList

这些操作符用于将Flow转换为Collection、Set和List。

  • launchIn

在指定的协程作用域中直接执行Flow。

flow {
    for (i in 0..3) {
        Log.d("xys", "emit value---$i")
        emit(i.toString())
    }
}.launchIn(MainScope())
  • last、lastOrNull、first、firstOrNull

返回Flow的最后一个值(第一个值),区别是last为空的话,last会抛出异常,而lastOrNull可空。

flow {
    for (i in 0..3) {
        emit(i.toString())
    }
}.last()

状态操作符

状态操作符不做任何修改,只是在合适的节点返回状态。

  • onStart:在上游生产数据前调用
  • onCompletion:在流完成或者取消时调用
  • onEach:在上游每次emit前调用
  • onEmpty:流中未产生任何数据时调用
  • catch:对上游中的异常进行捕获
  • retry、retryWhen:在发生异常时进行重试,retryWhen中可以拿到异常和当前重试的次数
MainScope().launch {
    Log.d("xys", "Coroutine in ${Thread.currentThread().name}")
    val time = measureTimeMillis {
        flow {
            for (i in 0..3) {
                emit(i.toString())
            }
            throw Exception("Test")
        }.retryWhen { _, retryCount ->
            retryCount <= 3
        }.onStart {
            Log.d("xys", "Start Flow in ${Thread.currentThread().name}")
        }.onEach {
            Log.d("xys", "emit value---$it")
        }.onCompletion {
            Log.d("xys", "Flow Complete")
        }.catch { error ->
            Log.d("xys", "Flow Error $error")
        }.collect {
            Log.d("xys", "Result---$it")
        }
    }
    Log.d("xys", "Time---$time")
}

另外,onCompletion也可以监听异常,代码如下所示。

.onCompletion { exception ->
    Log.d("xys", "Result---$exception")
}

Transform操作符

与RxJava一样,在数据流中,我们可以利用操作符对数据进行各种变换,以满足操作流的不同需求。

  • map、mapLatest、mapNotNull

map操作符将Flow的输入通过block转换为新的输出。

flow {
    for (i in 0..3) {
        emit(i)
    }
}.map {
    it * it
}
  • transform、transformLatest

transform操作符与map操作符有点一样,但又不完全一样,map是一对一的变换,而transform则可以完全控制流的数据,进行过滤、 重组等等操作都可以。

flow {
    for (i in 0..3) {
        emit(i)
    }
}.transform { value ->
    if (value == 1) {
        emit("!!!$value!!!")
    }
}.collect {
    Log.d("xys", "Result---$it")
}
  • transformWhile

transformWhile的返回值是一个bool类型,用来控制流的截断,如果返回true,则流继续执行,如果false,则流截断。

flow {
    for (i in 0..3) {
        emit(i)
    }
}.transformWhile { value ->
    emit(value)
    value == 1
}.collect {
    Log.d("xys", "Result---$it")
}

过滤操作符

如题,过滤操作符用于过滤流中的数据。

  • filter、filterInstance、filterNot、filterNotNull

过滤操作符可以按条件、类型或者对过滤取反、取非空等条件进行操作。

flow {
    for (i in 0..3) {
        emit(i)
    }
}.filter { value ->
    value == 1
}.collect {
    Log.d("xys", "Result---$it")
}
  • drop、dropWhile、take、takeWhile

这类操作符可以丢弃前n个数据,或者是只拿前n个数据。带while后缀的,则表示按条件进行判断。

  • debounce

debounce操作符用于防抖,指定时间内的值只接收最新的一个。

  • sample

sample操作符与debounce操作符有点像,但是却限制了一个周期性时间,sample操作符获取的是一个周期内的最新的数据,可以理解为debounce操作符增加了周期的限制。

  • distinctUntilChangedBy

去重操作符,可以按照指定类型的参数进行去重。

组合操作符

组合操作符用于将多个Flow的数据进行组合。

  • combine、combineTransform

combine操作符可以连接两个不同的Flow。

val flow1 = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(20) }
flow1.combine(flow2) { i, s -> i.toString() + s }.collect {
    Log.d("xys", "Flow combine: $it")
}
D/xys: Flow combine: 1a
D/xys: Flow combine: 2a
D/xys: Flow combine: 2b
D/xys: Flow combine: 2c

可以发现,当两个Flow数量不同时,始终由Flow1开始,用其最新的元素,与Flow2的最新的元素进行组合,形成新的元素。

  • merge

merge操作符用于将多个流合并。

val flow1 = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(20) }
listOf(flow1, flow2).merge().collect {
    Log.d("xys", "Flow merge: $it")
}
D/xys: Flow merge: 1
D/xys: Flow merge: 2
D/xys: Flow merge: a
D/xys: Flow merge: b
D/xys: Flow merge: c

merge的输出结果是按照时间顺序,将多个流依次发射出来。

zip操作符会分别从两个流中取值,当一个流中的数据取完,zip过程就完成了。

val flow1 = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(20) }
flow1.zip(flow2) { i, s -> i.toString() + s }.collect {
    Log.d("xys", "Flow zip: $it")
}
D/xys: Flow zip: 1a
D/xys: Flow zip: 2b

在Flow中,可以简单的使用flowOn来指定线程的切换,flowOn会对上游,以及flowOn之前的所有操作符生效。

flow {
    for (i in 0..3) {
        Log.d("xys", "Emit Flow in ${Thread.currentThread().name}")
        emit(i)
    }
}.map {
    Log.d("xys", "Map Flow in ${Thread.currentThread().name}")
    it * it
}.flowOn(Dispatchers.IO).collect {
    Log.d("xys", "Collect Flow in ${Thread.currentThread().name}")
    Log.d("xys", "Result---$it")
}

这种情况下,flow和map的操作都将在子线程中执行。

而如果是这样:

flow {
    for (i in 0..3) {
        Log.d("xys", "Emit Flow in ${Thread.currentThread().name}")
        emit(i)
    }
}.flowOn(Dispatchers.IO).map {
    Log.d("xys", "Map Flow in ${Thread.currentThread().name}")
    it * it
}.collect {
    Log.d("xys", "Collect Flow in ${Thread.currentThread().name}")
    Log.d("xys", "Result---$it")
}

这样map就会执行在主线程了。

同时,你也可以多次调用flowOn来不断的切换线程,让前面的操作符执行在不同的线程中。

取消Flow

Flow也是可以被取消的,最常用的方式就是通过withTimeoutOrNull来取消,代码如下所示。

MainScope().launch {
    withTimeoutOrNull(2500) {
        flow {
            for (i in 1..5) {
                delay(1000)
                emit(i)
            }
        }.collect {
            Log.d("xys", "Flow: $it")
        }
    }
}

这样当输出1、2之后,Flow就被取消了。

Flow的取消,实际上就是依赖于协程的取消。

Flow的同步非阻塞模型

首先,我们要理解下,什么叫同步非阻塞,默认场景下,Flow在没有切换线程的时候,运行在协程作用域指定的线程,这就是同步,那么非阻塞又是什么呢?我们知道emit和collect都是suspend函数,所谓suspend函数,就是会挂起,将CPU资源让出去,这就是非阻塞,因为suspend了就可以让一让,让给谁呢?让给其它需要执行的函数,执行完毕后,再把资源还给我。

所以,我们来看下面这个例子。

flow {
    for (i in 0..3) {
        emit(i)
    }
}.onStart {
    Log.d("xys", "Start Flow in ${Thread.currentThread().name}")
}.onEach {
    Log.d("xys", "emit value---$it")
}.collect {
    Log.d("xys", "Result---$it")
}
D/xys: Start Flow in main
D/xys: emit value---0
D/xys: Result---0
D/xys: emit value---1
D/xys: Result---1
D/xys: emit value---2
D/xys: Result---2
D/xys: emit value---3
D/xys: Result---3

可以发现,emit一个,collect拿一个,这就是同步非阻塞,互相谦让,这样谁都可以执行,看上去flow中的代码和collect中的代码,就是同步执行的。

异步非阻塞模型

假如我们给Flow增加一个线程切换,让Flow执行在子线程,同样是上面的代码,我们再来看下执行情况。

flow {
    for (i in 0..3) {
        emit(i)
    }
}.onStart {
    Log.d("xys", "Start Flow in ${Thread.currentThread().name}")
}.onEach {
    Log.d("xys", "emit value---$it")
}.flowOn(Dispatchers.IO).collect {
    Log.d("xys", "Collect Flow in ${Thread.currentThread().name}")
    Log.d("xys", "Result---$it")
}
D/xys: Start Flow in DefaultDispatcher-worker-1
D/xys: emit value---0
D/xys: emit value---1
D/xys: emit value---2
D/xys: emit value---3
D/xys: Collect Flow in main
D/xys: Result---0
D/xys: Collect Flow in main
D/xys: Result---1
D/xys: Collect Flow in main
D/xys: Result---2
D/xys: Collect Flow in main
D/xys: Result---3

这个时候,Flow就变成了异步非阻塞模型,异步呢,就更好理解了,因为在不同线程,而此时的非阻塞,就没什么意义了,由于flow代码先执行,而这里的代码由于没有delay,所以是同步执行的,执行的同时,collect在主线程进行监听。

除了使用flowOn来切换线程,使用channelFlow也可以实现异步非阻塞模型。

向大家推荐下我的网站 https://xuyisheng.top/ 专注 Android-Kotlin-Flutter 欢迎大家访问


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK