kotlin5協程2-深入協程


參考

https://www.bennyhuo.com/2019/05/07/coroutine-suspend/

深入協程掛起

從下邊的函數說起;

public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        /*
         * For non-atomic cancellation we setup parent-child relationship immediately
         * in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
         * properly supports cancellation.
         */
        cancellable.initCancellability()
        block(cancellable)
        cancellable.getResult()
    }

 

上邊函數是delay,join等suspend函數內部都會調用的。

suspendCoroutineUninterceptedOrReturn 這個方法調用的源碼是看不到的,因為它根本沒有源碼:P 它的邏輯就是幫大家拿到 Continuation 實例,而此Continuation 實例就是編譯器自動生成的包裝了我們的lambda的SuspendLambda對象,真的就只有這樣。

不過這樣說起來還是很抽象,因為有一處非常的可疑:suspendCoroutineUninterceptedOrReturn 的返回值類型是 T,而傳入的 lambda 的返回值類型是 Any?, 也就是我們看到的 cancellable.getResult() 的類型是 Any?,而這個結果值就是掛起的關鍵所在。

 

簡單來說就是,對於 suspend 函數,不是一定要掛起的,可以在需要的時候掛起,也就是要等待的協程還沒有執行完的時候,等待協程執行完再繼續執行;而如果在開始 join 或者 await 或者其他 suspend 函數,如果目標協程已經完成,那么就沒必要等了,直接拿着結果走人即可。那么這個神奇的邏輯就在於 cancellable.getResult() 究竟返回什么了,且看:

internal fun getResult(): Any? {
    ...
    if (trySuspend()) return COROUTINE_SUSPENDED // ① 觸發掛起邏輯
    ...
    if (state is CompletedExceptionally)  // ② 異常立即拋出
        throw recoverStackTrace(state.cause, this) 
    return getSuccessfulResult(state) // ③ 正常結果立即返回
}

 

這段代碼 ① 處就是掛起邏輯了,表示這時候目標協程還沒有執行完,需要等待結果,②③是協程已經執行完可以直接拿到異常和正常結果的兩種情況。②③好理解,關鍵是 ①,它要掛起,這返回的是個什么東西?

public val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED

internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }

 

這是 1.3 的實現,1.3 以前的實現更有趣,就是一個白板 Any。其實是什么不重要,關鍵是這個東西是一個單例,任何時候協程見到它就知道自己該掛起了。

 

delay

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}

 

cont.context.delay.scheduleResumeAfterDelay 這個操作,你可以類比 JavaScript 的 setTimeout,Android 的 handler.postDelay,本質上就是設置了一個延時回調,時間一到就調用 cont 的 resume 系列方法讓協程繼續執行。

深入理解協程的狀態轉移

其實kotlin的協程就是一個狀態機回調。

SuspendFunctions.kt文件:

suspend fun returnSuspended() = suspendCoroutineUninterceptedOrReturn<String>{
    continuation ->
    thread {
        Thread.sleep(1000)
        continuation.resume("Return suspended.")
    }
    COROUTINE_SUSPENDED
}

suspend fun returnImmediately() = suspendCoroutineUninterceptedOrReturn<String>{
    log(1)
    "Return immediately."
}

 

kotlin調用

suspend fun main() {
    log(1)
    log(returnSuspended())
    log(2)
    delay(1000)
    log(3)
    log(returnImmediately())
    log(4)
}

 

結果:

08:09:37:090 [main] 1

08:09:38:096 [Thread-0] Return suspended.

08:09:38:096 [Thread-0] 2

08:09:39:141 [kotlinx.coroutines.DefaultExecutor] 3

08:09:39:141 [kotlinx.coroutines.DefaultExecutor] Return immediately.

08:09:39:141 [kotlinx.coroutines.DefaultExecutor] 4

 

其實kotlin就是把協程體中每次遇到掛起函數時,作為一個分界點(也就是下邊的label),那么每次在執行resumeWith時都會根據當前label的值來重新繼續執行之前掛起位置后的代碼。

其實看java代碼更好理解,這里把kotlin轉換成java,並做了一些修改,便於理解:

public class ContinuationImpl implements Continuation<Object> {

    private int label = 0;
    private final Continuation<Unit> completion;

    public ContinuationImpl(Continuation<Unit> completion) {
        this.completion = completion;
    }

    @Override
    public CoroutineContext getContext() {
        return EmptyCoroutineContext.INSTANCE;
    }

    @Override
    public void resumeWith(@NotNull Object o) {
        try {
            Object result = o;
            switch (label) {
                case 0: {
                    LogKt.log(1);
                    result = SuspendFunctionsKt.returnSuspended( this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 1: {
                    LogKt.log(result);
                    LogKt.log(2);
                    result = DelayKt.delay(1000, this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 2: {
                    LogKt.log(3);
                    result = SuspendFunctionsKt.returnImmediately( this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 3:{
                    LogKt.log(result);
                    LogKt.log(4);
                }
            }
            completion.resumeWith(Unit.INSTANCE);
        } catch (Exception e) {
            completion.resumeWith(e);
        }
    }

    private boolean isSuspended(Object result) {
        return result == IntrinsicsKt.getCOROUTINE_SUSPENDED();
    }
}

 

我們定義了一個 Java 類 ContinuationImpl,它就是一個 Continuation 的實現。

實際上如果你願意,你還真得可以在 Kotlin 的標准庫當中找到一個名叫 ContinuationImpl 的類,只不過,它的 resumeWith 最終調用到了 invokeSuspend,而這個 invokeSuspend 實際上就是我們的協程體,通常也就是一個 Lambda 表達式 —— 我們通過 launch啟動協程,傳入的那個 Lambda 表達式,實際上會被編譯成一個 SuspendLambda 的子類,而它又是 ContinuationImpl 的子類。

有了這個類我們還需要准備一個 completion 用來接收結果,這個類仿照標准庫的 RunSuspend 類實現,如果你有閱讀前面的文章,那么你應該知道 suspend main 的實現就是基於這個類:

public class RunSuspend implements Continuation<Unit> {

    private Object result;

    @Override
    public CoroutineContext getContext() {
        return EmptyCoroutineContext.INSTANCE;
    }

    @Override
    public void resumeWith(@NotNull Object result) {
        synchronized (this){
            this.result = result;
            notifyAll(); // 協程已經結束,通知下面的 wait() 方法停止阻塞
        }
    }

    public void await() throws Throwable {
        synchronized (this){
            while (true){
                Object result = this.result;
                if(result == null) wait(); // 調用了 Object.wait(),阻塞當前線程,在 notify 或者 notifyAll 調用時返回
                else if(result instanceof Throwable){
                    throw (Throwable) result;
                } else return;
            }
        }
    }
}

 

java的調用:

public static void main(String... args) throws Throwable {
    RunSuspend runSuspend = new RunSuspend();
    ContinuationImpl table = new ContinuationImpl(runSuspend);
    table.resumeWith(Unit.INSTANCE);
    runSuspend.await();
}

 

我們看到,作為 completion 傳入的 RunSuspend 實例的 resumeWith 實際上是在 ContinuationImpl 的 resumeWtih 的最后才會被調用,因此它的 await() 一旦進入阻塞態,直到 ContinuationImpl 的整體狀態流轉完畢才會停止阻塞,此時進程也就運行完畢正常退出了。

我們看到,這段普通的 Java 代碼與前面的 Kotlin 協程調用完全一樣。那么我這段 Java 代碼的編寫根據是什么呢?就是 Kotlin 協程編譯之后產生的字節碼。當然,字節碼是比較抽象的,我這樣寫出來就是為了讓大家更容易的理解協程是如何執行的,看到這里,相信大家對於協程的本質有了進一步的認識:

l 協程的掛起函數本質上就是一個回調,回調類型就是 Continuation

l 協程體的執行就是一個狀態機,每一次遇到掛起函數,都是一次狀態轉移,就像我們前面例子中的 label 不斷的自增來實現狀態流轉一樣

如果能夠把這兩點認識清楚,那么相信你在學習協程其他概念的時候就都將不再是問題了。如果想要進行線程調度,就按照我們講到的調度器的做法,在 resumeWith 處執行線程切換就好了,其實非常容易理解的。官方的協程框架本質上就是在做這么幾件事兒,如果你去看源碼,可能一時雲里霧里,主要是因為框架除了實現核心邏輯外還需要考慮跨平台實現,還需要優化性能,但不管怎么樣,這源碼橫豎看起來就是五個字:狀態機回調。

協程的創建與啟動

https://www.jianshu.com/p/2979732fb6fb

從最常用的launch來說明協程的創建和啟動。

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

 

接着會調用到AbstractCoroutine.start

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    initParentJob()
    start(block, receiver, this)
}

 

然后再調用start方法,此時發現沒法跟下去了,因為沒有要找的方法了,解決方法就是用debug模式斷點跟蹤,最后發現原來進到CoroutineStart.invoke了

@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
    when (this) {
        DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        ATOMIC -> block.startCoroutine(receiver, completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        LAZY -> Unit // will start lazily
    }

 

原來因為CoroutineStart自定義了“()”操作符,

另外注意completion是上邊的StandaloneCoroutine。

接着由於是CoroutineStart.DEFAULT,所以調用到Cancellable.kt中的startCoroutineCancellable

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
    }

 

這其中的三個方法是協程的創建、啟動 以及 線程調度的關鍵。

createCoroutineUnintercepted

先說createCoroutineUnintercepted,這個方法和上邊掛起函數的suspendCoroutineUninterceptedOrReturn 類似應該都是委托給編譯器來處理一部分邏輯,所以是看不到里邊是怎么實現的,不過可以通過注釋來知道它是干嘛的:

/**
 * Creates unintercepted coroutine without receiver and with result type [T].
 * This function creates a new, fresh instance of suspendable computation every time it is invoked.
 *
 * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
 * The [completion] continuation is invoked when coroutine completes with result or exception.
 ...
 */
 public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> { ... }

 

這個方法會在每次調用時創建一個可掛起的計算邏輯對象(其實可以認為時SuspendLambda的實例),為了啟動這個協程,會去調用返回對象(SuspendLambda的實例)的resume(Unit),而completion(可以認為是StandaloneCoroutine)會在協程執行完或出異常時執行(調用其resume)。

resumeCancellableWith

接着intercepted()方法就是進行線程調度的,這個放到下邊說,假設這里沒有線程調度,那么直接執行最后的方法resumeCancellableWith:

@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result)
    else -> resumeWith(result)
}

 

因為沒有線程調度所以執行else,

那么就會調用到Continuation.resumeWith,以SuspendLambda為例:

public final override fun resumeWith(result: Result<Any?>) {
    // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
    var current = this
    var param = result
    while (true) {
        // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
        // can precisely track what part of suspended callstack was already resumed
        probeCoroutineResumed(current)
        with(current) {
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    val outcome = invokeSuspend(param)
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            if (completion is BaseContinuationImpl) {
                // unrolling recursion via loop
                current = completion
                param = outcome
            } else {
                // top-level completion reached -- invoke and return
                completion.resumeWith(outcome)
                return
            }
        }
    }
}

protected abstract fun invokeSuspend(result: Result<Any?>): Any?

 

可以看到會調用到invokeSuspend,而這個方法就是編譯器把我們的lambda生成的代碼,生成的代碼就是狀態機,在最后用一個例子說明。

協程的線程調度

接着上邊的startCoroutineCancellable

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
    }

 

intercepted()

如果有線程調度時,

@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

 

createCoroutineUnintercepted方法返回的是SuspendLambda的實例,SuspendLambda實現了ContinuationImpl,

接着看ContinuationImpl.intercepted():

public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }

 

標紅的就是判斷context中是否有調度器,如果有就執行其interceptContinuation(this)方法並返回其包裝結果,注意把SuspendLambda傳遞進去了;沒有就返回SuspendLambda。

接着就會調用到CoroutineDispatcher.interceptContinuation()

public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
    DispatchedContinuation(this, continuation)

 

這里邊創建了一個DispatchedContinuation對象,並把調度器自身和SuspendLambda傳遞進去包裝了一下。

看這個類的名字就知道他也是個Continuation,那么后邊肯定會調用它的resumeWith來進行線程調度。

resumeCancellableWith

接着調用最后的方法resumeCancellableWith

@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result)
    else -> resumeWith(result)
}

 

此時的this就是DispatchedContinuation了,所以會執行第一個判斷。

DispatchedContinuation.resumeCancellableWith:

@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(result: Result<T>) {
    val state = result.toState()
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatch(context, this)
    } else {
// 如果不需要調度,直接在當前線程執行協程運算
        executeUnconfined(state, MODE_CANCELLABLE) {
            if (!resumeCancelled()) {
                resumeUndispatchedWith(result)
            }
        }
    }
}

 

此方法會先判斷是否需要進行調度,如果需要,就把this傳遞進去進行調度,dispatch傳遞的this是Runnable,而DispatchedContinuation是實現了Runnable的。

接下來就不再跟了,猜想一下就知道,肯定是調度器空閑時會把傳遞進去的拿出來執行,也就會調用回SuspendLambda.resume。

協程的掛起和恢復例子

fun main() {
    runBlocking {
        log("1")
        delay(100)
        log("2")
        delay(100)
        log("3")
    }
    log("4")
}

 

生成的java代碼:

public static final void main() {
   BuildersKt.runBlocking$default((CoroutineContext)null, (Function2)(new Function2((Continuation)null) {
      private CoroutineScope p$;
      Object L$0;
      int label;

      @Nullable
      public final Object invokeSuspend(@NotNull Object $result) {
         label17: {
            Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            CoroutineScope $this$runBlocking;
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               $this$runBlocking = this.p$;
               HahaKt.log("1");
               this.L$0 = $this$runBlocking;
               this.label = 1;
               if (DelayKt.delay(100L, this) == var3) {
                  return var3;
               }
               break;
            case 1:
               $this$runBlocking = (CoroutineScope)this.L$0;
               ResultKt.throwOnFailure($result);
               break;
            case 2:
               $this$runBlocking = (CoroutineScope)this.L$0;
               ResultKt.throwOnFailure($result);
               break label17;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            HahaKt.log("2");
            this.L$0 = $this$runBlocking;
            this.label = 2;
            if (DelayKt.delay(100L, this) == var3) {
               return var3;
            }
         }

         HahaKt.log("3");
         return Unit.INSTANCE;
      }

      @NotNull
      public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
         Intrinsics.checkNotNullParameter(completion, "completion");
         Function2 var3 = new <anonymous constructor>(completion);
         var3.p$ = (CoroutineScope)value;
         return var3;
      }

      public final Object invoke(Object var1, Object var2) {
         return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
      }
   }), 1, (Object)null);
   log("4");
}

 

由上邊協程創建與啟動可知,我們在協程中的代碼會被編譯器生成一個SuspendLambda對象,最后會調用到SuspendLambda.resumeWith,而resumeWith會調用到invokeSuspend。其中invokeSuspend就是我們的協程中的代碼,只不過按照遇到的掛起函數把代碼分成了多段。

上邊有行代碼是掛起的判斷依據:

Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();

if (DelayKt.delay(100L, this) == var3) {
  return var3;
}

 

delay方法的返回值如果是var3 ,那么就表示delay還沒有完成,就直接return了。

但它return后就不執行后邊的代碼了,那怎么辦?

其實在調用delay時把當前的對象傳進去了,而接收的參數類型是Continuation,那么就可以推斷,在delay執行完成后會調用這個Continuation.resumeWith來重新進入invokeSuspend,為了能跳到下個case代碼段,label在上次delay前就加一了,所以再次進入就直接往后執行了,接着如果在遇到掛起函數,那么再進行類似的判斷即可。

總結

通過一步步的分析,慢慢發現協程其實有三層包裝。

l 常用的launch和async返回的Job、Deferred,里面封裝了協程狀態,提供了取消協程接口,而它們的實例都是繼承自AbstractCoroutine,它是協程的第一層包裝。

l 第二層包裝是編譯器生成的SuspendLambda的子類,封裝了協程的真正運算邏輯,繼承自BaseContinuationImpl,其中completion屬性就是協程的第一層包裝。

l 第三層包裝是前面分析協程的線程調度時提到的DispatchedContinuation,封裝了線程調度邏輯,包含了協程的第二層包裝。

三層包裝都實現了Continuation接口,通過代理模式將協程的各層包裝組合在一起,每層負責不同的功能。

wps2

suspendCoroutineUninterceptedOrReturn

https://www.jianshu.com/p/2857993af646

在異步編程中,回調是非常常見的寫法,那么如何將回調 轉換為 協程中的掛起函數呢?

可以通過兩個掛起函數suspendCoroutine{}或suspendCancellableCoroutine{}。

下面看如何將 OkHttp 的網絡請求轉換為掛起函數:

suspend fun <T> Call<T>.await(): T = suspendCoroutine { cont ->
    enqueue(object : Callback<T> {
        override fun onResponse(call: Call<T>, response: Response<T>) {
            if (response.isSuccessful) {
                cont.resume(response.body()!!)
            } else {
                cont.resumeWithException(ErrorResponse(response))
            }
        }
        override fun onFailure(call: Call<T>, t: Throwable) {
            cont.resumeWithException(t)
        }
    })
}

 

上面的await()的擴展函數調用時,首先會掛起當前協程,然后執行enqueue將網絡請求放入隊列中,當請求成功時,手動調用cont.resume(response.body()!!)來恢復之前的協程。

再來看下suspendCoroutine{}和suspendCancellableCoroutine{}的定義:

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
    suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        safe.getOrThrow()
    }

public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        // 和 suspendCoroutine 的區別就在這里,如果協程已經被取消或者已完成,就會拋出 CancellationException 異常
        cancellable.initCancellability()
        block(cancellable)
        cancellable.getResult()
    }

 

suspendCoroutineUninterceptedOrReturn 這個方法調用的源碼是看不到的,因為它根本沒有源碼,它的邏輯就是幫大家拿到 Continuation 實例,而此Continuation 實例就是編譯器自動生成的包裝了我們的lambda的SuspendLambda對象。會把此Continuation 傳遞到block中(也就是我們的lambda中),我們在執行完成后手動調用Continuation的resume相關函數來喚醒之前掛起的協程。

在上邊的協程調度中分析可知c.intercepted()調用后會去找當前上下文中是否有調度器,如果有就返回一個包裝了調度器的DispatchedContinuation對象,這樣就能保證在我們手動調用resume來在原線程/線程池中恢復 掛起的協程。

協程中還有兩個常見的掛起函數使用到了suspendCoroutineUninterceptedOrReturn()函數,分別是delay()和yield()。

delay 的實現

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}

/** Returns [Delay] implementation of the given context */
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay

internal actual val DefaultDelay: Delay = DefaultExecutor

 

yield 的實現

yield()的作用是掛起當前協程,然后將協程分發到 Dispatcher 的隊列,這樣可以讓該協程所在線程或線程池可以運行其他協程邏輯,然后在 Dispatcher 空閑的時候繼續執行原來協程。簡單的來說就是讓出自己的執行權,給其他協程使用,當其他協程執行完成或也讓出執行權時,一開始的協程可以恢復繼續運行。

yield()需要依賴協程的線程調度器,如果沒有調度器那么就不會讓協程掛起。

public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
    val context = uCont.context
    // 檢測協程是否已經取消或者完成,如果是的話拋出 CancellationException
    context.checkCompletion()
    // 如果協程沒有線程調度器,或者像 Dispatchers.Unconfined 一樣沒有進行調度,則直接返回
    val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
    if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
    // dispatchYield(Unit) 最終會調用到 dispatcher.dispatch(context, block) 將協程分發到調度器隊列中,這樣線程可以執行其他協程
    cont.dispatchYield(Unit)
    COROUTINE_SUSPENDED
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM