【Kotlin】协程的挂起恢复源码解析

【Kotlin】协程的挂起恢复源码解析

本文介绍了CPS转换,后从反编译的源码层面介绍Kotlin协程挂起和恢复的原理

首先要了解的就是CPS转换。

CPS转换

在Kotlin协程中,挂起函数的执行是通过 Continuation Passing Style (CPS)转换 来实现的。CPS转换是一种将函数式编程中的函数调用转换为可传递的 Continuation 对象的过程。这里的转换是Kotlin编译器实现的,在跨平台属性上,也保证了流程的一致性。

CPS转换调用的过程如下:

  1. 当一个函数被调用时,它的参数和返回值会被封装在一个Continuation对象中。
  2. 函数的执行过程中,遇到挂起操作时,会将当前的Continuation对象传递给挂起函数。
  3. 挂起函数执行完毕后,会将结果封装在一个新的Continuation对象中,并将其传递给原始的Continuation对象。
  4. 原始的Continuation对象会继续执行,直到所有的挂起操作都完成。

假设我们有一个简单的suspend函数,它模拟了一个异步操作:

suspend fun fetchData(): String {
    delay(1000) // 模拟耗时操作
    return "Data fetched"
}

在CPS转换后,这个函数可能会被转换为类似以下的形式:

fun fetchData(continuation: Continuation<String>) {
    delay(1000, object : Continuation<Unit> {
        override val context: CoroutineContext = continuation.context

        override fun resumeWith(result: Result<Unit>) {
            continuation.resume("Data fetched")
        }
    })
}

在这个转换后的函数中,fetchData不再直接返回结果,而是通过continuation.resume方法将结果传递给调用者。简单来说,CPS其实就是函数通过回调传递结果的一种方式。

Kotlin协程通过将异步流程拆解为一系列 挂起点 ,对含有 suspend 关键字的函数进行了 CPS转换 ,即Continuation Passing Style转换,使其能够 接收Continuation对象 作为参数,并在异步操作完成后通过调用 Continuation 的恢复方法来继续执行协程。

在编译后的字节码中,协程的状态会被转换为状态机的形式,每个挂起点对应状态机的一个状态。当协程挂起时,它的执行状态会被保存在Continuation对象中,包括局部变量上下文和执行位置。

Continuation

Continuation (续体)是一个保存协程状态的对象,它记录了协程挂起的位置以及局部变量上下文,使得协程可以在任何时候从上次挂起的地方继续执行。Continuation是一个接口,它定义了 resumeWith 方法,用于恢复协程的执行。

interface Continuation<in T> {
   val context: CoroutineContext
   fun resumeWith(result: Result<T>)//result 为返回的结果
}
  1. 续体是一个较为抽象的概念,简单来说它包装了协程在挂起之后应该继续执行的代码;在编译的过程中,一个完整的协程被分割切块成一个又一个续体。
  2. 在suspend函数或者 await 函数的挂起结束以后,它会调用 continuation 参数的 resumeWith 函数,来恢复执行suspend函数或者await 函数后面的代码。

CPS转换 使得协程能够在不阻塞线程的情况下执行异步操作。当协程挂起时,线程可以被释放去执行其他任务,从而提高了系统的并发性能。此外,CPS转换使得协程的挂起和恢复操作对开发者来说是透明的,开发者可以像编写同步代码一样编写异步代码。

发生 CPS 变换的函数,返回值类型变成了 Any?,这是因为这个函数在发生变换后,除了要返回它本身的返回值,还要返回一个标记CoroutineSingletons.COROUTINE_SUSPENDED,为了适配各种可能性,CPS 转换后的函数返回值类型就只能是 Any?了。

协程的启动

下面跟随启动,挂起,恢复的流程,从源码层面看看协程的核心原理。

测试代码入口:

object CoroutineExample {
    private val TAG: String = "CoroutineExample"

    fun main(){
        // 启动协程,分析入口
        GlobalScope.launch(Dispatchers.Main) {
            request()
        }
    }
    private suspend fun request(): String {
        delay(2000)
        Log.e(TAG, "request complete")
        return "result from request"
    }
}

从 CoroutineScope.launch 开始:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy){
        LazyStandaloneCoroutine(newContext, block) 
    }else{
        StandaloneCoroutine(newContext, active = true)
    }
    coroutine.start(start, coroutine, block)
    return coroutine
}
  • 参数一context:协程上下文,并不是我们平时理解的Android中的上下文,它是一种key-value数据结构。可以传入Main用于主线程调度。
  • 参数二start:启动模式,此处我们没有传值则为默认值(DEFAULT),共有三种启动模式。
    • DEFAULT:默认模式,创建即启动协程,可随时取消;
    • ATOMIC:自动模式,创建即启动协程,启动前不可取消;
    • LAZY:延迟启动模式,只有当调用start方法时才能启动。
  • 参数三block:协程真正执行的代码块,即上面例子中launch{}闭包内的代码块。

SuspendLambda

CoroutineScope.launch中第三个参数类型为suspend CoroutineScope.() -> Unit函数,这是怎么来的呢?我们编写代码的时候并没有这个东西,其实它由编译器生成的,我们的 block代码块 经过编译器编译后会生成一个 继承Continuation类SuspendLambda 。一起看下反编译的java代码,为了关注主要逻辑方便理解,去掉了一些无关代码大概代码如下:

public final void main() {
      BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getMain(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch (this.label) {
               case 0:
                  ResultKt.throwOnFailure($result);
                  CoroutineExample var10000 = CoroutineExample.this;
                  this.label = 1;
                  if (var10000.request(this) == var2) {
                     return var2;
                  }
                  break;
               case 1:
                  ResultKt.throwOnFailure($result);
                  break;
               default:
                  throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            return Unit.INSTANCE;
         }
        
        ···
 }

从上面反编译的java代码中好像并不能很好的看出来协程中的block代码块具体编译长什么样子,但可以确定他是编译成了 Continuation类 ,因为我们可以看到实现的 invokeSuspend 方法实际是来自BaseContinuationImpl,而BaseContinuationImpl的父类就是Continuation。这个继承关系我们后面再说。既然从反编译的java代码中看的不明显,我们直接看上面例子的字节码文件,其中可以很明显的看到这样一段代码:

final class com/imile/pda/CoroutineExample$main$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2

这下恍然大悟,launch函数的第三个参数,即协程中的 block代码块 是一个编译后 继承了SuspendLambda并且实现了Function2的实例

SuspendLambda 本质上是一个 Continuation ,前面我们已经说过 Continuation 是一个有着恢复操作的接口,其 resume 方法可以恢复协程的执行。

SuspendLambda继承机构如下:

- Continuation: 续体,恢复协程的执行
    - BaseContinuationImpl: 实现 resumeWith(Result) 方法,控制状态机的执行,定义了 invokeSuspend 抽象方法
        - ContinuationImpl: 增加 intercepted 拦截器,实现线程调度等
            - SuspendLambda: 封装协程体代码块
                - 协程体代码块生成的子类: 实现 invokeSuspend 方法,其内实现状态机流转逻辑

每一层封装都对应添加了不同的功能,我们先忽略掉这些功能细节,着眼于我们的主线,继续跟进 launch 函数执行过程,由于第二个参数是默认值(DEFAULT),所以创建的是 StandaloneCoroutine , 最后启动协程:

 // 启动协程
coroutine.start(start, coroutine, block)

// 启动协程
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    start(block, receiver, this)
}

上面 coroutine.start 的调用涉及到运算符重载,实际上会调到 CoroutineStart.invoke() 方法:

public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
    when (this) {
        DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        ATOMIC -> block.startCoroutine(receiver, completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        LAZY -> Unit // will start lazily
    }

这里启动方式为默认的 DEFAULT ,所以接着往下看:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) = runSafely(completion) {
    createCoroutineUnintercepted(receiver, completion)
        .intercepted()
        .resumeCancellableWith(Result.success(Unit), onCancellation)
}

整理下调用链如下:

coroutine.start(start, coroutine, block)
-> CoroutineStart.start(block, receiver, this)
-> CoroutineStart.invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>)
->  block.startCoroutineCancellable(receiver, completion)
-> 
createCoroutineUnintercepted(receiver,completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)

最后走到 createCoroutineUnintercepted(receiver,completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation) ,这里创建了一个协程,并链式调用 intercepted、resumeCancellable 方法,利用协程上下文中的 ContinuationInterceptor 对协程的执行进行拦截,intercepted 实际上调用的是 ContinuationImpl 的 intercepted 方法:

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
  ...
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?:(context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
  ...
}

context[ContinuationInterceptor]?.interceptContinuation调用的是 CoroutineDispatcher 的 interceptContinuation 方法:

public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
    DispatchedContinuation(this, continuation)

内部创建了一个 DispatchedContinuation 可分发的协程实例,我们继续进到看resumeCancellableWith 方法:

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
  ...
  
  public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
  // 判断是否是DispatchedContinuation 根据我们前面的代码追踪 这里是DispatchedContinuation
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
    // 判断是否需要线程调度 
   // 由于我们之前使用的是 `GlobalScope.launch(Main)` Android主线程调度器所以这里为true     
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
}
  
  ...
}

最终走到 dispatcher.dispatch(context, this) 而这里的 dispatcher 就是通过工厂方法创建的 HandlerDispatcher ,dispatch() 函数第二个参数this是一个runnable这里为 DispatchedTask

HandlerDispatcher

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
  ...
  
   //  最终执行这里的 dispatch方法 而handler则是android中的 MainHandler
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        if (!handler.post(block)) {
            cancelOnRejection(context, block)
        }
    }
  
  ...
}

这里借用 Android 的主线程消息队列来在主线程中执行 block Runnable而这个 Runnable 即为 DispatchedTask:

internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
  ...
 public final override fun run() {
            ...
            withContinuationContext(continuation, delegate.countOrElement) {
                 ...
                if (job != null && !job.isActive) {
                    val cause = job.getCancellationException()
                    cancelCompletedResult(state, cause)
                    // 异常情况下
                    continuation.resumeWithStackTrace(cause)
                } else {
                    if (exception != null) {
                       // 异常情况下
                       continuation.resumeWithException(exception)
                    } else {
                      // step1:正常情况下走到这一步
                       continuation.resume(getSuccessfulResult(state))
                    }
                }
            }
           ...
   }
}

//step2:这是Continuation的扩展函数,内部调用了resumeWith()
@InlineOnly public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))


//step3:最终会调用到BaseContinuationImpl的resumeWith()方法中
internal abstract class BaseContinuationImpl(...) {
    // 实现 Continuation 的 resumeWith,并且是 final 的,不可被重写
    public final override fun resumeWith(result: Result<Any?>) {
        ...
        val outcome = invokeSuspend(param)
        ...
    }
    // 由编译生成的协程相关类来实现,例如 CoroutineExample$main$1
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}

最终调用到 continuation.resumeWith() 而 resumeWith() 中会调用 invokeSuspend,即之前编译器生成的 SuspendLambda 中的 invokeSuspend 方法:

 @Nullable
     public final Object invokeSuspend(@NotNull Object $result) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch (this.label) {
               case 0:
                  ResultKt.throwOnFailure($result);
                  CoroutineExample var10000 = CoroutineExample.this;
                  this.label = 1;
                  if (var10000.request(this) == var2) {
                     return var2;
                  }
                  break;
               case 1:
                  ResultKt.throwOnFailure($result);
                  break;
      }
 }

这段代码是一个状态机机制,每一个挂起点都是一种状态,协程恢复只是跳转到下一个状态,挂起点将执行过程分割成多个片段,利用状态机的机制保证各个片段按顺序执行。

可以看到 协程非阻塞的异步底层实现其实就是一种Callback回调 (这一点我们在介绍Continuation时有提到过),只不过有多个挂起点时就会有多个Callback回调,这里协程把多个Callback回调封装成了一个状态机。

以上就是协程的启动过程,下面我们再来看下协程中的重点挂起和恢复。

协程的挂起与恢复

协程的挂起和恢复有两个关键方法 : invokeSuspend() 和 resumeWith(Result)。我们以上一节中的例子,反编译后逆向剖析协程的挂起和恢复,先整体看下是怎样的一个过程。

suspend fun reqeust(): String {
    delay(2000)
    return "result from request"
}

反编译后的代码如下(为了方便理解,代码有删减和修改):

//1.函数返回值由String变成Object,入参也增加了Continuation参数
public final Object reqeust(@NotNull Continuation completion) {
   //2.通过completion创建一个ContinuationImpl,并且复写了invokeSuspend()
   Object continuation;
   if (completion instanceof <undefinedtype>){
     continuation =  <undefinedtype>completion
   }else{
     continuation = new ContinuationImpl(completion) {
       Object result;
       int label; //初始值为0
        
       @Nullable
       public final Object invokeSuspend(@NotNull Object $result) {
          this.result = $result;
          this.label |= Integer.MIN_VALUE;
          return request(this);//又调用了request()方法
       }
    };
  }

   Object $result = (continuation).result;
   Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
   //状态机  
   //3.方法被恢复的时候又会走到这里,第一次进入case 0分支,label的值从0变为1,第二次进入就会走case 1分支
   switch(continuation.label) {
       case 0:
          ResultKt.throwOnFailure($result);
          continuation.label = 1;
          //4.delay()方法被suspend修饰,传入一个continuation回调,返回一个object结果。这个结果要么是`COROUTINE_SUSPENDED`,否则就是真实结果。
          Object delay = DelayKt.delay(2000L, continuation)
          if (delay == var4) {
            //如果是 COROUTINE_SUSPENDED 则直接return,就不会往下执行了,request()被暂停了。
            // 如果不是COROUTINE_SUSPENDED,则说明不需要挂起,就会break跳出switch语句。正常返回继续往下执行后续外部代码。
             return var4;
          }
          break;
       case 1:
          ResultKt.throwOnFailure($result);
          break;
       default:
          throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
       }
   return "result from request";
}

ResultKt.throwOnFailure($result) 是 Kotlin 协程中的一个重要方法,主要用于处理协程的异常情况。它的主要作用是: (1)检查异常:它会检查传入的 $result 对象,如果这个对象是一个异常(即协程执行过程中抛出的异常),它会立即抛出这个异常。 (2)确保正常执行:如果 $result 不是异常,则继续正常执行后续代码。

挂起过程

函数返回值由 String 变成 Object,编译器自动增加了Continuation参数,相当于帮我们添加Callback。

根据 completion 创建了一个 ContinuationImpl(如果已经创建就直接用,避免重复创建),复写了 invokeSuspend() 方法,在这个方法里面它又调用了 request() 方法,这里又调用了一次自己(是不是很神奇),并且把 continuation 传递进去。

在 switch 语句中,label 的默认初始值为 0,第一次会进入 case 0 分支,delay() 是一个挂起函数,传入上面的 continuation 参数,会有一个 Object 类型的返回值。这个结果要么是COROUTINE_SUSPENDED,否则就是真实结果。

DelayKt.delay(2000, continuation)的返回结果如果是 COROUTINE_SUSPENDED , 则直接 return ,那么方法执行就被结束了,方法就被挂起了。

函数即便被 suspend 修饰了,但是也未必会挂起。需要里面的代码编译后有返回值为 COROUTINE_SUSPENDED 这样的标记位才可以。

协程的挂起实际是方法的挂起,本质是return。

恢复过程

因为 delay() 是 IO操作,在2000ms后就会通过传递给它的 continuation 回调回来。

回调到 ContinuationImpl 类的 resumeWith() 方法,会再次调用 invokeSuspend() 方法,进而再次调用 request() 方法。

即反编译代码中的这一段:

Object continuation;
   if (completion instanceof <undefinedtype>){
     continuation =  <undefinedtype>completion
   }else{
     continuation = new ContinuationImpl(completion) {
       Object result;
       int label; //初始值为0
        
       @Nullable
       public final Object invokeSuspend(@NotNull Object $result) {
          this.result = $result;
          this.label |= Integer.MIN_VALUE;
          return request(this);//又调用了request()方法
       }
    };
  }

程序会再次进入switch语句,由于第一次在 case 0 时把 label = 1 赋值为1,所以这次会进入 case 1 分支,检查无异常之后,再次 break ,并且返回了结果result from request。

并且 request() 的返回值作为 invokeSuspend() 的返回值返回。重新被执行的时候就代表着方法被恢复了。

看到大家一定会疑问, 步骤2中 invokeSuspend() 是如何被再次调用呢? 我们都知道 ContinuationImpl 的父类是 BaseContinuationImpl,实际上ContinuationImpl中调用的resumeWith()是来自父类。

BaseContinuationImpl

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    //这个实现是最终的,用于展开 resumeWith 递归。
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            with(current) {
                val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        // 1.调用 invokeSuspend()方法执行,执行协程的真正运算逻辑,拿到返回值
                        val outcome = invokeSuspend(param)
                        // 2.如果返回的还是COROUTINE_SUSPENDED则提前结束
                        if (outcome == COROUTINE_SUSPENDED) return 
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                if (completion is BaseContinuationImpl) {
                    //3.如果 completion 是 BaseContinuationImpl,内部还有suspend方法,则会进入循环递归,继续执行和恢复
                    current = completion
                    param = outcome
                } else {
                    //4.否则是最顶层的completion,则会调用resumeWith恢复上一层并且return
                    // 这里实际调用的是其父类 AbstractCoroutine 的 resumeWith 方法
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}

实际上任何一个挂起函数它在恢复的时候都会调到 BaseContinuationImpl 的 resumeWith() 方法里面。

一但 invokeSuspend() 方法被执行,那么 request() 又会再次被调用, invokeSuspend() 就会拿到 request() 的返回值,在 ContinuationImpl 里面根据 val outcome = invokeSuspend() 的返回值来判断我们的 request() 方法恢复了之后的操作。

如果 outcome 是 COROUTINE_SUSPENDED 常量(可能挂起函数中又返回了一个挂起函数),说明你即使被恢复了,执行了一下, if (outcome == COROUTINE_SUSPENDED) return但是立马又被挂起了,所以又 return 了。

如果本次恢复 outcome 是一个正常的结果,就会走到 completion.resumeWith(outcome),当前被挂起的方法已经被执行完了,实际调用的是其父类 AbstractCoroutine 的 resumeWith 方法,那么协程就恢复了。

我们知道 request() 肯定是会被协程调用的(从上面反编译代码知道会传递一个Continuation completion参数),request() 方法恢复完了就会让协程completion.resumeWith()去恢复,所以说协程的恢复是方法的恢复,本质其实是 callback(resumeWith) 回调。

一张图总结一下:

协程的核心是挂起——恢复,挂起——恢复的本质是return & callback回调

协程挂起

我们说过协程启动后会调用到上面这个 resumeWith() 方法,接着调用其 invokeSuspend() 方法:

当 invokeSuspend() 返回 COROUTINE_SUSPENDED 后,就直接 return 终止执行了,此时协程被挂起。 当 invokeSuspend() 返回非 COROUTINE_SUSPENDED 后,说明协程体执行完毕了,对于 launch 启动的协程体,传入的 completion 是 AbstractCoroutine 子类对象,最终会调用其 AbstractCoroutine.resumeWith() 方法做一些状态改变之类的收尾逻辑。至此协程便执行完毕了。

协程恢复

这里我们接着看上面第一条:协程执行到挂起函数被挂起后,当这个挂起函数执行完毕后是怎么恢复协程的,以下面挂起函数为例:

private suspend fun login() = withContext(Dispatchers.IO) {
    Thread.sleep(2000)
    return@withContext true
}

通过反编译可以看到上面挂起函数中的函数体也被编译成了 SuspendLambda 的子类,创建其实例时也需要传入 Continuation 续体参数(调用该挂起函数的协程所在续体)。贴下 withContext 的源码:

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // 创建 new context
        val oldContext = uCont.context
        val newContext = oldContext + context
        // 检查新上下文是否作废
        newContext.ensureActive()
        // 新上下文与旧上下文相同
        if (newContext === oldContext) {
            val coroutine = ScopeCoroutine(newContext, uCont)
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
        // 新调度程序与旧调度程序相同
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            val coroutine = UndispatchedCoroutine(newContext, uCont)
            // 上下文有变化,所以这个线程需要更新
            withCoroutineContext(newContext, null) {
                return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
            }
        }
        // 使用新的调度程序
        val coroutine = DispatchedCoroutine(newContext, uCont)
        block.startCoroutineCancellable(coroutine, coroutine)
        coroutine.getResult()
    }
}

首先调用了 suspendCoroutineUninterceptedOrReturn 方法,看注释知道可以通过它来获取到当前的续体对象 uCont, 接着有几条分支调用,但最终都是会通过续体对象来创建挂起函数体对应的 SuspendLambda 对象,并执行其 invokeSuspend() 方法,在其执行完毕后调用 uCont.resume() 来恢复协程,具体逻辑大家感兴趣可以自己跟代码,与前面大同小异。

至于其他的顶层挂起函数如 await(), suspendCoroutine(), suspendCancellableCoroutine() 等,其内部也是通过 suspendCoroutineUninterceptedOrReturn() 来获取到当前的续体对象,以便在挂起函数体执行完毕后,能通过这个续体对象恢复协程执行。

Desktop平台举例分析挂起恢复(Kotlin 2.1.0)

Kotlin测试代码如下:

class MySimpleTest {

    suspend fun stephenTest(): String {
        delay(500L)
        return "result From stephenTest"
    }
}

fun callFromOutside() {
    CoroutineScope(Dispatchers.IO).launch {
        val result = MySimpleTest().stephenTest()
        println(result)
    }
}

在callFromOutside函数中,我们创建了一个协程作用域,并在其中启动了一个协程。该协程将调用stephenTest函数。而stephenTest函数是一个挂起函数,它会暂停该协程的执行,直到delay函数返回。

将这个片段反编译成java代码,删掉导包和元数据注解信息等,分析过程见注释流程号:

public final class MySimpleTest {
   public static final int $stable;

   @Nullable
   // (8)stephenTest函数本来是无参的,现在有一个Continuation类型的参数
   // 这个就是外部调用代码块封装成的实例,stephenTest方法执行完毕,需要继续往下执行的代码都在这个对象里面
   public final Object stephenTest(@NotNull Continuation $completion) {
      Continuation $continuation;
      // (9)label20: 是一个Java中的标签(label),主要用于控制流程跳转。在这里它被用来实现协程的挂起和恢复机制
      label20: {
         if ($completion instanceof <undefinedtype>) {
            $continuation = (<undefinedtype>)$completion;
            // (10)用于检查当前协程是否处于挂起状态。Integer.MIN_VALUE 是一个特殊的标志位,用于标记协程是否被挂起。
            // 它的值是10000000 00000000 00000000 00000000
            // label首次传进来是1,即00000000 00000000 00000000 00000001,和Integer.MIN_VALUE按位与的结果为0,表示需要挂起,会走到11步,基于外部传入的 completion 对象创建一个新的ContinuationImpl对象
            //=======================分割线====================
            // (16)这里的label在15步被赋值成了10000000 00000000 00000000 00000001,按位与的结果是Integer.MIN_VALUE,即条件检查结果为真(即 != 0)
            // 10000000 00000000 00000000 00000001减去Integer.MIN_VALUE,结果是1,即00000000 00000000 00000000 00000001
            // 并将label20标签跳出循环,继续往下执行stephenTest的switch状态判断
            if (($continuation.label & Integer.MIN_VALUE) != 0) {
               $continuation.label -= Integer.MIN_VALUE;
               break label20;
            }
         }

         // (11)开始创建关于stephenTest代码块的ContinuationImpl对象,用于传递给下一个suspend函数
         $continuation = new ContinuationImpl($completion) {
            // $FF: synthetic field
            Object result;
            // 初始值为0
            int label;

            @Nullable
            // (13)delay执行完,调用resumeWith,触发这个invokeSuspend方法
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               //(14)将label = 1和Integer.MIN_VALUE按位或,
               // 运算的结果是 10000000 00000000 00000000 00000001(即 -2147483647)
               this.label |= Integer.MIN_VALUE;
               //(15)重入调用stephenTest函数,这次是传入 $continuation 自己作为参数。
               return MySimpleTest.this.stephenTest((Continuation)this);
            }
         };
      }


      Object $result = $continuation.result;
      Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      // (17) 本轮调用中,label值为1,检查无异常后,就会返回这个字符串
      // "result From stephenTest"
      switch ($continuation.label) {
         case 0:
            ResultKt.throwOnFailure($result);
            $continuation.label = 1;
            // (12)调用delay函数,之后就和外部调用的(3)-(7)步流程一样.
            // 传入ContinuationImpl对象,delay函数内部会判断是否需要挂起,如果需要挂起,就return掉本轮stephenTest方法的调用
            // 进入了delay内部执行,等500ms过后,调用外部传进来的ContinuationImpl对象的 resumeWith 函数回调
            // 而resumeWith方法,必然会调用到这个ContinuationImpl 对象自己的invokeSuspend方法,就跳转到第13步了
            if (DelayKt.delay(500L, $continuation) == var4) {
               return var4;
            }
            break;
         case 1:
            ResultKt.throwOnFailure($result);
            break;
         default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }

      return "result From stephenTest";
   }
}

// CoroutineTestKt.java
public final class CoroutineTestKt {

  
   public static final void callFromOutside() {
      // (1)分析入口,从最外部的调用开始
      BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope((CoroutineContext)Dispatchers.getIO()), (CoroutineContext)null, (CoroutineStart)null, new Function2((Continuation)null) {

        // (2)函数代码块里的任务,被封装在了继承自Continuation的一个匿名内部类对象中
        // launch开始后,进入就会调用其invoke方法,并首次执行invokeSuspend方法,这时候label为0
         int label;
        // (18) 17步返回后,标志着 stephenTest 方法中 $continuation实例的invokeSuspend方法调用完毕
        // 将调用completion的invokeSuspend方法
         // (19)这时候外部的这个label值也已经为1了,就是继续往下执行了
         public final Object invokeSuspend(Object $result) {
            Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            Object var10000;
            // (3)通过label来判断当前是到了哪一个状态
            switch (this.label) {
               case 0:
                  // (4)首先检查异常
                  ResultKt.throwOnFailure($result);
                  // (5)创建一个MySimpleTest对象,并调用其stephenTest方法
                  var10000 = new MySimpleTest();
                  // (6)将这个匿名内部类自己传进去,作为参数
                  Continuation var10001 = (Continuation)this;
                  // 将label状态设置为1,等下次再次调用invokeSuspend就会走switch的1的分支
                  this.label = 1;
                  var10000 = (MySimpleTest)var10000.stephenTest(var10001);
                   // (7) 如果 stephenTest 这个方法的返回值是COROUTINE_SUSPENDED,则表示该函数已暂停,我们也返回COROUTINE_SUSPENDED给调用者
                  // 通知这个函数是挂起函数,暂时不往下执行了
                  if (var10000 == var3) {
                     return var3;
                  }
                  // 转到MySimpleTest这个类分析 ->(8)
                  break;
               case 1:
                  ResultKt.throwOnFailure($result);
                  var10000 = (MySimpleTest)$result;
                  break;
               default:
                  throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            // (20)挂起和恢复流程执行完毕,打印结果
            String result = (String)var10000;
            System.out.println(result);
            return Unit.INSTANCE;
         }

         public final Continuation create(Object value, Continuation $completion) {
            return (Continuation)(new <anonymous constructor>($completion));
         }

         public final Object invoke(CoroutineScope p1, Continuation p2) {
            return ((<undefinedtype>)this.create(p1, p2)).invokeSuspend(Unit.INSTANCE);
         }

         // $FF: synthetic method
         // $FF: bridge method
         public Object invoke(Object p1, Object p2) {
            return this.invoke((CoroutineScope)p1, (Continuation)p2);
         }
      }, 3, (Object)null);
   }
}

以上的分析流程就是协程的挂起恢复过程。

自动的线程切换

引例

在Android上使用协程,从本地读取一个字符串,或其他耗时逻辑,可能会写下这样的代码:

// MainViewModel.kt
suspend fun getLocalString() = withContext(Dispatchers.IO) {
        // 模拟IO操作
        Thread.sleep(2000) 
        "result from local"
    } 

// MainActivity.kt
class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        // 开启协程
        lifecycleScope.launch {
            val result = getLocalString()
            tv_result.text = result
        }
    }
}

getLocalString() 方法,我们设置了上下文为IO,很明显直觉上会在 IO 线程中执行。

MainActivity 中,我们通过 lifecycleScope.launch 开启了一个协程,协程中调用 getLocalString() 方法,最初为主线程环境,是怎么从主线程切换到IO线程来运行这个方法的呢?

ContinuationInterceptor

Kotlin协程实现自动线程切换的核心在于其调度器(Dispatcher)机制,而调度器 Dispatcher 就是 ContinuationInterceptor 的实现。

ContinuationInterceptor 接口(简化):

public interface ContinuationInterceptor : CoroutineContext.Element {
    // 拦截 Continuation 的恢复
    fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
}

AbstractCoroutine (例如 StandaloneCoroutine) 的 resumeWith 方法:

当协程需要恢复时,通常会调用 ContinuationresumeWith 方法。在协程的底层实现中,例如 AbstractCoroutine (所有 Job 的子类,如 StandaloneCoroutine 继承的基类),其 resumeWith 方法会检查 CoroutineContext 中是否存在 ContinuationInterceptor

// AbstractCoroutine.kt (简化)
override fun resumeWith(result: Result<T>) {
    val context = this.context
    val dispatcher = context[ContinuationInterceptor] as? ContinuationInterceptor

    if (dispatcher == null) {
        // 没有调度器,直接在当前线程执行
        dispatchResume(result)
    } else {
        // 有调度器,通过调度器来分发恢复操作
        dispatcher.dispatch(this, result) // 最终会调用 dispatchResume
    }
}

Dispatcherdispatch 方法

Dispatcherdispatch 方法是实际执行线程切换的地方。不同的调度器有不同的实现。

Dispatchers.Default (例如 DefaultScheduler.kt):

Dispatchers.Default 通常使用一个共享的线程池来执行协程。

// DefaultScheduler.kt (简化)
internal object DefaultScheduler : CoroutineDispatcher(), Executor {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // 将 block (即恢复协程的Runnable) 提交到默认的线程池
        DefaultExecutor.enqueue(block) // DefaultExecutor 是一个线程池
    }
    // ... 其他方法
}

dispatch 被调用时,它会将协程的恢复逻辑封装在一个 Runnable 中,然后提交给调度器底层的 线程池 。这个 Runnable 最终会在线程池中的某个线程上执行,从而实现了线程的切换。

  • Dispatchers.IO 通常会使用一个独立的、容量更大的线程池,用于处理 IO 密集型任务。其 dispatch 逻辑与 Default 类似,只是提交给不同的线程池。
  • Dispatchers.Main 在 Android 上通常会与主线程的 Looper 绑定。
// AndroidMainDispatcherFactory.kt (简化)
internal class AndroidMainDispatcherFactory : MainDispatcherFactory {
    override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
        // ...
        return HandlerContext(Looper.getMainLooper(), "Main") // 包装了 Looper
    }
}

// HandlerDispatcher.kt (简化)
class HandlerContext(...) : MainCoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // 将 block 提交到 Looper 关联的 Handler
        handler.post(block)
    }
    // ...
}

上面的代码可以看出, Dispatchers.Main 会将协程的恢复操作通过 Android Handlerpost 方法发送到主线程的消息队列,从而确保协程在主线程上恢复执行。

suspendCoroutine / suspendCancellableCoroutine

除了编译器自动生成的挂起点,我们也可以手动创建挂起点,这通常通过 suspendCoroutinesuspendCancellableCoroutine 函数实现。

suspend fun manualSuspendExample(): String = suspendCancellableCoroutine { continuation ->
    // 可以在这里执行一些异步操作
    Thread {
        Thread.sleep(1000)
        continuation.resume("Resumed from another thread") // 在另一个线程调用 resume
    }.start()
}

这里 continuation.resume(...) 的调用是关键。当这个 resume 被调用时,它会触发之前提到的 ContinuationInterceptor 机制,如果 CoroutineContext 中有调度器,就会通过调度器进行线程切换。

总结挂起和线程切换流程

  1. 协程挂起: 当协程遇到 suspend 函数(如 delay,或自定义的挂起函数),如果该函数需要等待某个异步操作完成,它会返回 COROUTINE_SUSPENDED,并将当前的执行上下文(Continuation)保存起来。
  2. 异步操作完成: 当异步操作完成时(例如网络请求返回数据,或 delay 时间到),会调用之前保存的 Continuation 对象的 resumeWith 方法。
  3. 调度器介入: resumeWith 方法会检查协程的 CoroutineContext 中是否存在 ContinuationInterceptor (即 Dispatcher)。
  4. 分发恢复: 如果存在调度器,resumeWith 会调用调度器的 dispatch 方法。
  5. 线程切换: 调度器的 dispatch 方法会将协程的恢复逻辑(一个 Runnable)提交到其管理的线程(如线程池中的线程,或 Android 主线程)。
  6. 协程恢复: Runnable 在目标线程上执行,调用实际的恢复逻辑,协程从挂起的地方继续执行。

通过这种 CPS转换 + 回调resume + 调度器拦截 的机制,Kotlin 协程得以在不阻塞线程的情况下,根据需要自动在不同的线程之间切换执行,从而实现高效的并发编程。