協程及Kotlin協程


一、協程是什么?

協程是程序自己控制掛起和恢復的程序。

協程可以實現多任務協作執行。

二、協程作用?

  • 協程可以讓異步代碼同步化。
  • 協程可以降低異步程序的設計復雜度。

三、協程分類

  • 按調用棧分類:

    • 有棧協程:每個協程都會分配一個單獨調用棧,類似於線程的調用棧。
    • 無棧協程:協程不會分配一個單獨調用棧,掛起點的狀態通過閉包和對象保存。
  • 按調用關系分類:

    • 對稱協程:協程調用權可以轉移給任意協程,協程之間是對等關系。比如:協程A調度協程B,協程B調度協程C,協程C調度協程A。
    • 非對稱協程:調度權只能轉移給調度自己的協程,協程存在父子關系。

 

四、Kotlin中協程

Kotlin實現的協程是無棧協程,掛起狀態通過Continuation保存,CoroutineContext包含一個協程調度器。

1. 掛起函數及suspend修飾符

通過suspend修飾的函數就是掛起函數。

掛起函數只能通過其它掛起函數或者協程調用。

掛起函數調用時包含了協程的掛起語義。 

Continuation源碼:

  1 /**
  2  * Interface representing a continuation after a suspension point that returns a value of type `T`.
  3  */
  4 @SinceKotlin("1.3")
  5 public interface Continuation<in T> {
  6     /**
  7      * The context of the coroutine that corresponds to this continuation.
  8      */
  9     public val context: CoroutineContext
 10 
 11     /**
 12      * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
 13      * return value of the last suspension point.
 14      */
 15     public fun resumeWith(result: Result<T>)
 16 }
 17 
 18 /**
 19  * Classes and interfaces marked with this annotation are restricted when used as receivers for extension
 20  * `suspend` functions. These `suspend` extensions can only invoke other member or extension `suspend` functions on this particular
 21  * receiver and are restricted from calling arbitrary suspension functions.
 22  */
 23 @SinceKotlin("1.3")
 24 @Target(AnnotationTarget.CLASS)
 25 @Retention(AnnotationRetention.BINARY)
 26 public annotation class RestrictsSuspension
 27 
 28 /**
 29  * Resumes the execution of the corresponding coroutine passing [value] as the return value of the last suspension point.
 30  */
 31 @SinceKotlin("1.3")
 32 @InlineOnly
 33 public inline fun <T> Continuation<T>.resume(value: T): Unit =
 34     resumeWith(Result.success(value))
 35 
 36 /**
 37  * Resumes the execution of the corresponding coroutine so that the [exception] is re-thrown right after the
 38  * last suspension point.
 39  */
 40 @SinceKotlin("1.3")
 41 @InlineOnly
 42 public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
 43     resumeWith(Result.failure(exception))
 44 
 45 
 46 /**
 47  * Creates a [Continuation] instance with the given [context] and implementation of [resumeWith] method.
 48  */
 49 @SinceKotlin("1.3")
 50 @InlineOnly
 51 public inline fun <T> Continuation(
 52     context: CoroutineContext,
 53     crossinline resumeWith: (Result<T>) -> Unit
 54 ): Continuation<T> =
 55     object : Continuation<T> {
 56         override val context: CoroutineContext
 57             get() = context
 58 
 59         override fun resumeWith(result: Result<T>) =
 60             resumeWith(result)
 61     }
 62 
 63 /**
 64  * Creates a coroutine without a receiver and with result type [T].
 65  * This function creates a new, fresh instance of suspendable computation every time it is invoked.
 66  *
 67  * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
 68  * The [completion] continuation is invoked when the coroutine completes with a result or an exception.
 69  * Subsequent invocation of any resume function on the resulting continuation will produce an [IllegalStateException].
 70  */
 71 @SinceKotlin("1.3")
 72 @Suppress("UNCHECKED_CAST")
 73 public fun <T> (suspend () -> T).createCoroutine(
 74     completion: Continuation<T>
 75 ): Continuation<Unit> =
 76     SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
 77 
 78 /**
 79  * Creates a coroutine with receiver type [R] and result type [T].
 80  * This function creates a new, fresh instance of suspendable computation every time it is invoked.
 81  *
 82  * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
 83  * The [completion] continuation is invoked when the coroutine completes with a result or an exception.
 84  * Subsequent invocation of any resume function on the resulting continuation will produce an [IllegalStateException].
 85  */
 86 @SinceKotlin("1.3")
 87 @Suppress("UNCHECKED_CAST")
 88 public fun <R, T> (suspend R.() -> T).createCoroutine(
 89     receiver: R,
 90     completion: Continuation<T>
 91 ): Continuation<Unit> =
 92     SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)
 93 
 94 /**
 95  * Starts a coroutine without a receiver and with result type [T].
 96  * This function creates and starts a new, fresh instance of suspendable computation every time it is invoked.
 97  * The [completion] continuation is invoked when the coroutine completes with a result or an exception.
 98  */
 99 @SinceKotlin("1.3")
100 @Suppress("UNCHECKED_CAST")
101 public fun <T> (suspend () -> T).startCoroutine(
102     completion: Continuation<T>
103 ) {
104     createCoroutineUnintercepted(completion).intercepted().resume(Unit)
105 }
106 
107 /**
108  * Starts a coroutine with receiver type [R] and result type [T].
109  * This function creates and starts a new, fresh instance of suspendable computation every time it is invoked.
110  * The [completion] continuation is invoked when the coroutine completes with a result or an exception.
111  */
112 @SinceKotlin("1.3")
113 @Suppress("UNCHECKED_CAST")
114 public fun <R, T> (suspend R.() -> T).startCoroutine(
115     receiver: R,
116     completion: Continuation<T>
117 ) {
118     createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)
119 }
120 
121 /**
122  * Obtains the current continuation instance inside suspend functions and suspends
123  * the currently running coroutine.
124  *
125  * In this function both [Continuation.resume] and [Continuation.resumeWithException] can be used either synchronously in
126  * the same stack-frame where the suspension function is run or asynchronously later in the same thread or
127  * from a different thread of execution. Subsequent invocation of any resume function will produce an [IllegalStateException].
128  */
129 @SinceKotlin("1.3")
130 @InlineOnly
131 public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
132     suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
133         val safe = SafeContinuation(c.intercepted())
134         block(safe)
135         safe.getOrThrow()
136     }
137 
138 /**
139  * Returns the context of the current coroutine.
140  */
141 @SinceKotlin("1.3")
142 @Suppress("WRONG_MODIFIER_TARGET")
143 @InlineOnly
144 public suspend inline val coroutineContext: CoroutineContext
145     get() {
146         throw NotImplementedError("Implemented as intrinsic")
147     }

 掛起函數格式:

suspend fun test(param: Int): String {
   return "test"
}

掛起函數類型:

suspend (param: Int) -> String

在編譯后,解碼掛起函數:

suspend fun test(param: Int, continuation: Continuation<String>): String {
    return "test"
}

在函數參數中多了一個Continuation參數,Continuation就是用於保存協程掛起點狀態對象。這也是為什么掛起函數只能由其它掛起函數或者協程調用,為什么Kotlin編譯器在編譯時給掛起函數多加了一個參數。

2. 掛起與未掛起

  • 真正的掛起:就是必須異步調用resume,包括:1. 切換到其它線程的resume;2. 單線程事件循環異步執行;
  • 未掛起:就是未異步調用resume,直接將結果傳遞出去。

PS:在resume時返回值分兩類,1. 函數真正的返回值,比如上面test函數,返回值類型為String類型;2. 返回掛起函數的掛起標識;只有返回掛起標識時才是真正的掛起。

3. 將回調轉寫成掛起函數:

  • 使用suspendCoroutine獲取掛起函數的Continuation;
  • 回調成功分支使用Continuation.resume(value)函數;
  • 回調失敗分支使用Continuation.resumeWithException(error)函數;

 4. CoroutineContext 協程上下文包含一個協程調度器。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM