12

掌握Kotlin Coroutine之 数据共享

 3 years ago
source link: http://blog.chengyunfeng.com/?p=1094
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

掌握Kotlin Coroutine之 数据共享

作者: rain 分类: 移动 发布时间: 2019-03-16 17:46 6 0条评论

Coroutine 既然是异步操作,并且可以通过多线程的dispatcher来并发执行。所以同样会遇到多线程并发异步操作的各种问题。其中最重要的一个问题就是如何控制对shared mutable state(可变的共享数据)的访问。多线程中的一些解决方法在Coroutine 中也能使用,但是 Coroutine 也有自己特有的解决方式。

下面先来看一些在其他多线程中所采用的解决数据共享问题的方法,这些方法同样也可以在 Coroutine 中使用。

我们使用下面的示例当做需要解决的问题,通过这个示例来演示各种解决方式。

suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
    val n = 100  // 子 coroutines 的数目
    val k = 1000 // 每个子 coroutine 重复执行 action 函数的次数
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch {
                repeat(k) { action() }
        jobs.forEach { it.join() }
    // measureTimeMillis 函数可以计算里面代码块所执行的时间,
    // 然后打印计算的时间
    println("Completed ${n * k} actions in $time ms")

下面使用一个非常简单的任务 —- 递增一个共享可变的变量。注意下面的代码使用的是GlobalScope来执行的,使用的是默认的Dispatchers.Default来运行:

GlobalScope.massiveRun {
    counter++
println("Counter = $counter")

下面是在Pixel手机上多次执行的结果:

03-16 13:40:46.101 I Completed 100000 actions in 417 ms
03-16 13:40:46.101 I Counter = 97752
03-16 13:47:44.759 I Completed 100000 actions in 330 ms
03-16 13:47:44.759 I Counter = 95835
03-16 13:47:50.163 I Completed 100000 actions in 75 ms
03-16 13:47:50.163 I Counter = 90270
03-16 13:47:55.725 I Completed 100000 actions in 236 ms
03-16 13:47:55.725 I Counter = 94529

请注意:如果你的测试手机或者电脑CPU只有2核或者1核的话,打印的结果可能是”Counter = 100000″,这样因为Coroutine 都在同一个线程内运行导致的。

volatile 变量无法解决上面的问题

通常使用volatile修饰符来解决一个变量多线程访问的问题。但是对于上面的示例是无效的,比如:

class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
    @Volatile // 在 Kotlin 中`volatile` 是通过注解实现的
    var counter = 0
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        launch {
            GlobalScope.massiveRun {
                counter++
            println("Counter = $counter")

上面的代码运行的稍微慢一点,但是打印的结果依然不是Counter = 100000。原因在于 volatile 变量只能保证原子操作,而上面的 counter++ 是递增操作,是先递增再赋值 不是原子操作。

使用线程安全的数据类型

线程安全(Thread-safe)的数据类型是另外一种常见的解决方式,比如 AtomicInteger 类,里面的一些函数都是线程安全的,对里面的状态做了保护,下面是使用这种方式的示例:

class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
    var counter = AtomicInteger()
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        launch {
            GlobalScope.massiveRun {
                counter.incrementAndGet()
            println("Counter = ${counter.get()}")

运行结果如下:

03-16 16:58:49.346 I Completed 100000 actions in 421 ms
03-16 16:58:49.346 I Counter = 100000

上面这种使用线程安全的数据结构的方式,可以解决大部分的简单计算问题以及集合操作问题。但是对于复杂的状态管理和复杂的运算方法如果找不到对应的封装好的线程安全的数据结构可以使用的话,则是需要另寻他法的。

Thread confinement fine-grained(细粒度线程限制)

线程限制(Thread confinement)是一种把访问可变共享数据限制到同一个线程去访问的方法。通常在 UI 框架中都是这样的,比如 安卓系统只有一个 UI线程,所有针对 UI 的操作都需要在该线程完成。 使用一个单线程执行环境就可以实现了:

class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
    val counterContext = newSingleThreadContext("CounterContext")
    var counter = 0
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        launch {
            GlobalScope.massiveRun {
                withContext(counterContext) { //在单线程中执行对共享变量的访问
                    counter++
            println("Counter = ${counter}")

下面是运行结果:

03-16 16:05:28.945 I Completed 100000 actions in 3386 ms
03-16 16:05:28.945 I Counter = 100000
03-16 16:06:04.649 I Completed 100000 actions in 3526 ms
03-16 16:06:04.649 I Counter = 100000

可以看到上面运行结果是正确的,但是运行的非常慢。之前执行完毕只需要 400毫秒,上面的单线程执行结果需要 3秒多。每次在计算 counter 值的时候都需要从另外一个线程切换到计算线程,所以比较耗时。

Thread confinement coarse-grained(粗粒度线程限制)

上面细粒度的线程限制导致每次执行到递增操作的时候,都需要切换一下线程,导致执行比较慢。而通过粗粒度的线程控制,可以把更大范围的计算逻辑放到同一个线程中去执行,避免线程的频繁切换。如下示例中把massiveRun函数在同一个线程中去执行:

CoroutineScope(counterContext).massiveRun {
    counter++
println("Counter = $counter")

这样的话,运行的就快多了,并且结果依然是正确的:

03-16 16:09:28.145 I Completed 100000 actions in 281 ms
03-16 16:09:28.145 I Counter = 100000

Mutual exclusion(互斥锁)

互斥锁(Mutual exclusion)的解决方法是通过一个锁来保护关键的代码不会同时被多线程执行。在 Java 中通常使用 synchronized 关键字或者ReentrantLock对象来实现。在 Coroutine 中使用 Mutex,使用lockunlock函数来保护关键的代码。Mutex.lock() 是一个suspending function 所以不会阻塞线程的执行。

由于使用mutex代码通常会先获取锁,执行完代码后再释放锁,所以代码通常是这么写的:mutex.lock(); try { ... } finally { mutex.unlock() },而为了方便使用锁,Kotlin 提供了一个 withLock 扩展函数,这样使用锁就很简单了:

class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
    val mutex = Mutex()
    var counter = 0
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        launch {
            GlobalScope.massiveRun {
                mutex.withLock { // 使用扩展函数来简化 mutex 的使用
                    counter++
            println("Counter = ${counter}")

上面示例中也是在细粒度上使用锁。所以同样会涉及到频繁获取释放锁的操作导致比较耗时。 当你必须要周期性的修改共享数据并且找不到其他更好方式的时候,可以采用上面这种解决方案。

Actors

Actor 模型是 Carl Hewitt 于1973年发表的《A Universal Modular Actor Formalism for Artificial Intelligence 》论文中提出的一种用于并发计算的模型。虽然该模型出现很早,但是由于早期硬件发展达不到该模型的执行条件,所以 Actor 在最近10来年才有所发展。特别是在2000年以后,CPU速度达到了顶点,CPU速度无法成指数级增长了。随着业务的复杂,人们对计算能力的要求越来越高,当CPU时钟速度无法快速增长后,人们提出了多核CPU的概念,多个CPU来分工协作,从而提高一个任务的执行效率。这样多线程并发的计算场景就出现了。

而 Actor 模型就是解决多线程并发计算的。早期在Smalltalk语言中实现,后来在 Java 中也通过akka框架实现。

Actor 模型和面向对象编程(OOP)概念类似,在 OOP 中everything is an object;而在 Actor 模型中everything is an actor

一个 Actor 是一个独立的计算单元,根据其受到的消息来做出不同的响应,当一个 Actor 接收到一个消息的时候可以做出如下不同的响应:
– 创建更多的 Actor
– 把消息发送给其他 Actor
– 指定如何处理下一条消息

上面的操作并没有先后顺序,并且可以并发的执行。所以 Actor 模型有两个重要的概念: Actor 和消息队列(mailbox)。

每个 Actor 都是独立的计算单元,和其他的 Actor 没有任何依赖关系。每个 Actor 也有一个标识符,这样其他 Actor 可以向它发送消息。

Actor 有个消息队列(在 Actor 模型中有个专业的叫法 — mailbox),发送给一个 Actor 的消息都处于 mailbox 中,Actor 每次从 mailbox 中拿出一个消息来处理。

注意:一个 Actor 处理消息是按顺序一个一个处理的,如果想要并发效果,就需要把多个消息分别发送给不同的 Actor。

Actor 有自己的状态,相互不影响,Actor 收到消息后可以更新自己的状态,然后继续处理下一条消息。由于 Actor 是独立的,相互不影响的,所以具有很好的容错性,如果一个 Actor 出错了,不会影响其他的 Actor, 这个出错的 Actor 可以有父 Actor来处理它,可以让他恢复初始状态继续工作,也可以把他隔离开来,不再给他发送消息。

可以举个产品开发测试的例子来说明 Actor 模型。在产品开发中会涉及到下面一些人员:产品经理PM、开发人员RD、测试人员Tester,每个人员都是一个 Actor,而产品每个需求、每个bug都是一个要处理的消息。每个消息可以发送给不同的 Actor 来处理,比如有个需求不明确的问题被开发人员发现了,开发人员把这个问题发送给了测试人员,测试人员收到后发现这个消息他处理不了,所以找到产品经理把这个消息又转发过去了(转发消息给 Actor),然后产品经理处理这个消息。当产品经理收到这个消息后,发现这个问题比较复杂,自己处理不过来,则可以指定更多的产品经理(创建更多的 Actor)来处理这个消息。

下面通过示例看看如何使用 Coroutine 提供的 actor 函数来使用 Actor 模型。

首先需要定义一些不同的消息类型。Actor 针对不同的消息由不同的处理方式。在 Kotlin 中消息类型可用 sealed classes 来定义。下面定义了一个 sealed class CounterMsg,里面有两个不同的类 IncCounterGetCounterIncCounter 代表递增的消息, GetCounter 代表获取其值的消息。在 GetCounter 中使用 CompletableDeferred 来获取结果。

// counterActor 的消息类型
sealed class CounterMsg
// 实现递增操作的消息类型
object IncCounter : CounterMsg()
// 请求当前值的消息类型
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

然后使用actor函数来创建一个counterActor函数来处理不同的消息:

// 该函数创建一个 counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor 的状态
    for (msg in channel) { // 遍历 actor mailbox 中的消息
        when (msg) {
            is IncCounter -> counter++ // 如果是 IncConter 消息,则执行递增操作
            is GetCounter -> msg.response.complete(counter) // 如果是取值的消息,则返回当前的值

下面是具体调用的代码:

class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        launch {
            val counter = counterActor() // 创建一个 actor
            GlobalScope.massiveRun {
                counter.send(IncCounter) // 给 Actor 发送消息
            // 发送获取最终结果的消息
            val response = CompletableDeferred<Int>()
            counter.send(GetCounter(response))
            println("Counter = ${response.await()}")
            counter.close() // 关闭 actor

上面这个示例可以解决我们最前面提出的这个问题,但是其实并没有完全应用 Actor 模型的能力,上面的示例中只有一个 Actor 来处理不同的消息。只不过,不同的消息在不同的线程中被处理。在 Kotlin 中一个 Actor 就是一个按顺序执行所收到消息的 coroutine ,所以把共享的数据放到一个 Actor 上作为一种并发共享数据的解决方案。

在 Kotlin 中使用 Actor 比使用锁更高效,actor 独立封装了其状态可以在不同的线程中执行,所以 actor 不需要来回切换执行的线程。

上面只是简单介绍了 Actor 模型的概念,以及在 Coroutine 中 actor 的基础用法,请搜索 Actor 模型 来了解更详细的介绍。

本文介绍了在 Coroutine 中共享数据的一些方法。这是本系列的最后一篇文章,到此关于 Kotlin 语言 Coroutine 的介绍就结束了,为了进一步熟练掌握这些概念,后面呢计划改进安卓的 Todo 示例项目,用 Coroutine 来实现。具体啥时候能改进完成目前还不清楚,到时候有空弄完了再告诉大家。

Mutex API 文档:https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html

withLock 扩展函数文档:https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/with-lock.html

Actor API 文档:https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html

CompletableDeferred API 文档:https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-completable-deferred/index.html

Carl Hewitt 论文地址:http://worrydream.com/refs/Hewitt-ActorModel.pdf

Akka 官网:https://akka.io/


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK