3

Kotlin coroutine 原理

 3 years ago
source link: https://segmentfault.com/a/1190000040640509
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Coroutine

lifecycleScope.launch {
            Log.d("testCoroutineScope","testCoroutineScope start $this")
            delay(2000)
            Log.d("testCoroutineScope","testCoroutineScope middle1")
            delay(2000)
            Log.d("testCoroutineScope","testCoroutineScope middle2")
            delay(2000)
            Log.d("testCoroutineScope","testCoroutineScope end")
        }

上边的代码展示了启动协程的方法,通常在协程体中会调用到suspend函数。我们都了解kotlin中协程的支持除了应用到kotlin的一些语法特性,同时针对协程还进行了编译器的修改,使得我们在使用协程时更加直观方便。但是这也带来了另一个问题,我们更难理解协程的具体工作细节。下面我们从最让人费解的协程体开始入手。

一、探索协程体到底是什么?

这里认为通过launch、async启动的block块就是协程体。

协程体在经过编译器编译后会生成一个新的对象,具体对象的实现是什么样的呢?看看下面反编译后的代码与原代码的比较:

//原来的kotlin代码
private fun testCoroutineScope() {
        val block: suspend CoroutineScope.() -> Unit = {
            Log.d("testCoroutineScope", "testCoroutineScope start")
            delay(2000)
            Log.d("testCoroutineScope", "testCoroutineScope end")
        }

        lifecycleScope.launch(block = block)
    }

为了方便观察,我把协程体单独定义一个block变量。kotlin的协程体只是简单的调用delay挂起方法并在方法调用前后添加了log。

//反编译后得到的代码
private final void testCoroutineScope() {
      Function2 block = (Function2)(new Function2((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               Log.d("testCoroutineScope", "testCoroutineScope start");
               this.label = 1;
               if (DelayKt.delay(2000L, this) == var2) {
                  return var2;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            Log.d("testCoroutineScope", "testCoroutineScope end");
            return Unit.INSTANCE;
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      });
      BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, block, 3, (Object)null);
   }

经过反编译后的协程体是一个实现Function2接口类型的对象,Function2支持两个形参的invoke方法。除了invoke方法还有create和invokeSuspend两个方法,所以协程体更准确的描述应该是实现了Function2接口的对象。反编译的代码可以帮我们了解大体的思路,但是一些细节还是有问题的。我们在网上搜索关于协程体对象的介绍,介绍中说协程体是SuspendLambda类的子类。那么它真的是SuspendLambda的子类吗?我们可以修改代码简单验证下:

private fun testCoroutineScope() {
        val block: suspend CoroutineScope.() -> Unit = {
            Log.d("testCoroutineScope", "testCoroutineScope start")
            delay(2000)
            Log.d("testCoroutineScope", "testCoroutineScope end")
        }
        //查看block对象的方法
        block.javaClass.let { clazz ->
            clazz.methods.forEach { method ->
                Log.d("testCoroutineScope", "method is ${method}")
            }
        }

        lifecycleScope.launch(block = block)
    }

我们通过打印block对象的方法基本可以验证问题,下面部分是log输出的内容:

D: method is public kotlin.coroutines.Continuation kotlin.coroutines.jvm.internal.BaseContinuationImpl.create(kotlin.coroutines.Continuation)
D: method is public final kotlin.coroutines.Continuation com.dragon.testcoroutine.CoroutineActivity$testCoroutineScope$block$1.create(java.lang.Object,kotlin.coroutines.Continuation)
D: method is public boolean java.lang.Object.equals(java.lang.Object)
D: method is public int kotlin.coroutines.jvm.internal.SuspendLambda.getArity()
D: method is public kotlin.coroutines.jvm.internal.CoroutineStackFrame kotlin.coroutines.jvm.internal.BaseContinuationImpl.getCallerFrame()
D: method is public final java.lang.Class java.lang.Object.getClass()
D: method is public final kotlin.coroutines.Continuation kotlin.coroutines.jvm.internal.BaseContinuationImpl.getCompletion()
D: method is public kotlin.coroutines.CoroutineContext kotlin.coroutines.jvm.internal.ContinuationImpl.getContext()
D: method is public java.lang.StackTraceElement kotlin.coroutines.jvm.internal.BaseContinuationImpl.getStackTraceElement()
D: method is public int java.lang.Object.hashCode()
D: method is public final kotlin.coroutines.Continuation kotlin.coroutines.jvm.internal.ContinuationImpl.intercepted()
D: method is public java.lang.Object com.dragon.testcoroutine.CoroutineActivity$testCoroutineScope$block$1.invoke(java.lang.Object,java.lang.Object)
D: method is public final java.lang.Object com.dragon.testcoroutine.CoroutineActivity$testCoroutineScope$block$1.invoke(kotlinx.coroutines.CoroutineScope,kotlin.coroutines.Continuation)
D: method is public final java.lang.Object com.dragon.testcoroutine.CoroutineActivity$testCoroutineScope$block$1.invokeSuspend(java.lang.Object)
D: method is public final native void java.lang.Object.notify()
D: method is public final native void java.lang.Object.notifyAll()
D: method is public final void kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(java.lang.Object)
D: method is public java.lang.String kotlin.coroutines.jvm.internal.SuspendLambda.toString()
D: method is public final void java.lang.Object.wait() throws java.lang.InterruptedException
D: method is public final void java.lang.Object.wait(long) throws java.lang.InterruptedException
D: method is public final native void java.lang.Object.wait(long,int) throws java.lang.InterruptedException

我们从log中可以找到SuspendLambda 的getArity()方法,所以block对象肯定是继承了SuspendLambda。在log中我们也可以找到反编译代码中发现的create方法、invoke方法和invokeSuspend方法。

二、协程体是如何启动的

这里以launch方法的启动过程为准来跟踪启动体的启动过程。先看下启动方法的实现:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

我们这里只关心协程体block的启动,所以关于协程的某些细节暂时这里不深究。从代码中我们看到协程启动的方法调用coroutine.start(),我们按照执行标准的协程StandaloneCoroutine来跟踪查看它的代码。

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        start(block, receiver, this)
    }

这个start方法中调用了一个start方法,看着有奇怪。其实方法内部调用的是CoroutineStart类型的invoke方法

public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit // will start lazily
        }

我们按照默认的启动类型走应该调用DEFAULT -> block.startCoroutineCancellable(receiver, completion)。block是协程体对象,这个方法就是启动协程体的。completion就是前面创建的StandaloneCoroutine对象,receiver也是StandaloneCoroutine。startCoroutineCancellable方法做了什么呢?

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

从代码的语意上看就是创建协程体并使用拦截器进行启动。但是协程体对象不是在调用launch方法的时候传进来了吗?为什么这里还要创建呢?其实协程体是按照状态机方式来工作的,协程体中每个可挂起函数的调用都对应状态机的一个状态,这个状态被记录在协程体对象中。为了避免一个协程体被多次启动后相互干扰状态,所以这里会每次都创建一个新的协程体对象再启动。

createCoroutineUnintercepted扩展方法的具体实现如下:

public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

协程体继承关系:协程体-》SuspendLambda-》ContinuationImpl-》BaseContinuationImpl-》Continuation

因为协程体继承了BaseContinuationImpl,所以这里调用create(receiver, probeCompletion)方法创建协程。我们在前面反编译协程体的代码中也看到了create方法的实现:

         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

当然反编译的代码不太准确,实际就是创建协程体对象并返回。

createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)

创建完新的协程体后调用协程体的intercepted()方法,这里涉及到协程中线程切换的内容,这里先不深究它,简单可以认为他把协程体包装成在某个线程池中运行的协程。resumeCancellableWith()方法便是真正的启动了协程体。

三、协程体如何挂起,如何恢复

前面已经介绍过协程体就是SuspendLambda对象,它的内部按照状态机的方式运行。协程体的每次唤醒都只能运行当前状态的代码并更改到下个状态。

private final void testCoroutineScope() {
      Function2 block = (Function2)(new Function2((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               Log.d("testCoroutineScope", "testCoroutineScope start");
               this.label = 1;
               if (DelayKt.delay(2000L, this) == var2) {
                  return var2;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            Log.d("testCoroutineScope", "testCoroutineScope end");
            return Unit.INSTANCE;
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      });
      BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, block, 3, (Object)null);
   }

反编译的代码中可以看到label变量,这个变量在维护着状态机的状态。invokeSuspend方法调用一次,label的状态变被推进一步。说到这里大家就能够理解了挂起和唤醒的实现原理了,首先invokeSuspend被调用一次只执行了一个状态的代码,后面状态的代码都没有执行,这时进入挂起状态。当异步耗时操作执行结束后再次调用invokeSuspend方法,既是协程体被唤醒。

支持唤醒挂起的对象都继承了BaseContinuationImpl,在BaseContinuationImpl中我们可以找到调用invokeSuspend方法的地方。

// This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

从代码我们了解到invokeSuspend的返回值为COROUTINE_SUSPENDED时,resumeWith才算真正结束并通过调用completion.resumeWith(outcome)来唤醒父协程的执行。invokeSuspend返回值不是COROUTINE_SUSPENDED时,当前的协程体没有执行完并等待下次通过本协程体的resumeWith唤醒再次执行。

四、suspend 函数是什么

launch、async方法启动的协程体就是一个suspend函数,他的定义如下:

val block: suspend CoroutineScope.() -> Unit = {
            Log.d("testCoroutineScope", "testCoroutineScope start")
            delay(2000)
            Log.d("testCoroutineScope", "testCoroutineScope end")
        }

它经过编译后就是SuspendLambda对象。

那么普通的suspend是怎么样的呢?

首先我们先看下这段测试代码:

private fun testCoroutineScope2() {
        val block2: suspend CoroutineScope.() -> Unit = {
            testSuspendMethod()
        }
        lifecycleScope.launch(block = block2)
    }

    private suspend fun testSuspendMethod() {
        Log.d("testSuspendMethod", "testSuspendMethod start")
        delay(3000)
        Log.d("testSuspendMethod", "testSuspendMethod end")
    }

我们在协程中调用了挂起函数,并在挂起函数中调用了delay方法。这段代码反编译后的结果入下:

private final void testCoroutineScope2() {
      Function2 block2 = (Function2)(new Function2((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               CoroutineActivity var10000 = CoroutineActivity.this;
               this.label = 1;
               if (var10000.testSuspendMethod(this) == var2) {
                  return var2;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            return Unit.INSTANCE;
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      });
      BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, block2, 3, (Object)null);
   }

协程体的内容我们之前已经了解过了,但是调用挂起函数的地方有些不同。在调用挂起函数testSuspendMethod的时候多了一个this参数,实际声明testSuspendMethod的时候没有这个参数。我们再看下testSuspendMethod的反编译结果:

private final Object testSuspendMethod(Continuation var1) {
      Object $continuation;
      label20: {
         if (var1 instanceof <undefinedtype>) {
            $continuation = (<undefinedtype>)var1;
            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
               ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
               break label20;
            }
         }

         $continuation = new ContinuationImpl(var1) {
            // $FF: synthetic field
            Object result;
            int label;

            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return CoroutineActivity.this.testSuspendMethod(this);
            }
         };
      }

      Object $result = ((<undefinedtype>)$continuation).result;
      Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch(((<undefinedtype>)$continuation).label) {
      case 0:
         ResultKt.throwOnFailure($result);
         Log.d("testSuspendMethod", "testSuspendMethod start");
         ((<undefinedtype>)$continuation).label = 1;
         if (DelayKt.delay(3000L, (Continuation)$continuation) == var4) {
            return var4;
         }
         break;
      case 1:
         ResultKt.throwOnFailure($result);
         break;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }

      Log.d("testSuspendMethod", "testSuspendMethod end");
      return Unit.INSTANCE;
   }

testSuspendMethod多出的参数是continuation类型的,在调用的地方会把SuspendLambda对象传递过来。testSuspendMethod也通过状态机的方式来管理挂起和唤醒,但是与协程体有所不同。协程体是SuspendLambda对象,协程体的状态由SuspendLambda对象保存。testSuspendMethod方法的状态通过ContinuationImpl对象来保存。

五、suspendCoroutineCancelable到底是什么

我们对下面这段代码进行反编译

//原始代码
private fun testCoroutineScope3() {
        val block: suspend CoroutineScope.() -> Unit = {
            Log.d("testCoroutineScope3", "testCoroutineScope3 start")

            val value = suspendCancellableCoroutine<Int> {
                Log.d("testCoroutineScope3", "suspendCancellableCoroutine start")
                Handler(Looper.getMainLooper()).postDelayed({ it.resume(202) }, 2000)
                Log.d("testCoroutineScope3", "suspendCancellableCoroutine end")
            }

            Log.d("testCoroutineScope3", "testCoroutineScope3 end value:$value")
        }
        lifecycleScope.launch(block = block)
    }
//编译后的代码,这里只把suspendCancellableCoroutine调用的地方截取出来
switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               Log.d("testCoroutineScope3", "testCoroutineScope3 start");
               $i$f$suspendCancellableCoroutine = false;
               this.L$0 = this;
               this.label = 1;
               int var6 = false;
               CancellableContinuationImpl cancellable$iv = new CancellableContinuationImpl(IntrinsicsKt.intercepted(this), 1);
               cancellable$iv.initCancellability();
               CancellableContinuation it = (CancellableContinuation)cancellable$iv;
               int var9 = false;
               Log.d("testCoroutineScope3", "suspendCancellableCoroutine start");
               (new Handler(Looper.getMainLooper())).postDelayed((Runnable)(new CoroutineActivity$testCoroutineScope3$block$1$value$1$1(it)), 2000L);
               Log.d("testCoroutineScope3", "suspendCancellableCoroutine end");
               var10000 = cancellable$iv.getResult();
               if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
                  DebugProbesKt.probeCoroutineSuspended(this);
               }

               if (var10000 == var10) {
                  return var10;
               }
               break;

从代码可以看出suspendCancellableCoroutine的本质就是创建了一个CancellableContinuationImpl对象,这个对象维护的状态机结束后会唤醒当前的状态机。CancellableContinuationImpl的运行没有结束时,getResult返回COROUTINE_SUSPENDED并使当前状态机挂起。

简单理解suspendCancellableCoroutine就是为用户提供了自定义的协程挂起方式,用户通过他可以方便的把线程的异步调用转换成协程的挂起调用。有类似功能的还有suspendCoroutine方法。

  • 协程实现的核心机制就是状态机,协程中所有的挂起点都被分解成一个单个状态,状态机的每次执行都只能执行当前状态对应的代码并更改状态机到下一个状态,然后状态机等待再次启动,也就是所谓的协程挂起。当异步任务执行完成后会再次启动状态机,这时状态机便执行下一个状态的代码,按照这样的规律以此类推。
  • 协程可以嵌套调用,那么就会生成状态机嵌套的结构。状态 机创建时可以指定状态机执行完成后要执行的父状态机,这样保证了树状结构组织的状态机能够有序执行。
  • 协程中定义的状态机对象都实现了Continuation接口,在协程的创建时,父协程也是通过Continuation接口把自己传递给子协程的。在调用Continuation接口的resumeWith方法便启动协程执行,也就是启动了状态机。BaseContinuationImpl就是状态机的模板类,控制着状态机的执行流程,在状态机结束时还会调用到父协程的状态机对象。

我的公众号已经开通,公众号会同步发布。

请关注我的公众号


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK