一、協程是什么?
協程是程序自己控制掛起和恢復的程序。
協程可以實現多任務協作執行。
二、協程作用?
- 協程可以讓異步代碼同步化。
- 協程可以降低異步程序的設計復雜度。
三、協程分類
-
按調用棧分類:
- 有棧協程:每個協程都會分配一個單獨調用棧,類似於線程的調用棧。
- 無棧協程:協程不會分配一個單獨調用棧,掛起點的狀態通過閉包和對象保存。
-
按調用關系分類:
- 對稱協程:協程調用權可以轉移給任意協程,協程之間是對等關系。比如:協程A調度協程B,協程B調度協程C,協程C調度協程A。
- 非對稱協程:調度權只能轉移給調度自己的協程,協程存在父子關系。
四、Kotlin中協程
Kotlin實現的協程是無棧協程,掛起狀態通過Continuation保存,CoroutineContext包含一個協程調度器。
1. 掛起函數及suspend修飾符
通過suspend修飾的函數就是掛起函數。
掛起函數只能通過其它掛起函數或者協程調用。
掛起函數調用時包含了協程的掛起語義。
Continuation源碼:
1 /** 2 * Interface representing a continuation after a suspension point that returns a value of type `T`. 3 */ 4 @SinceKotlin("1.3") 5 public interface Continuation<in T> { 6 /** 7 * The context of the coroutine that corresponds to this continuation. 8 */ 9 public val context: CoroutineContext 10 11 /** 12 * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the 13 * return value of the last suspension point. 14 */ 15 public fun resumeWith(result: Result<T>) 16 } 17 18 /** 19 * Classes and interfaces marked with this annotation are restricted when used as receivers for extension 20 * `suspend` functions. These `suspend` extensions can only invoke other member or extension `suspend` functions on this particular 21 * receiver and are restricted from calling arbitrary suspension functions. 22 */ 23 @SinceKotlin("1.3") 24 @Target(AnnotationTarget.CLASS) 25 @Retention(AnnotationRetention.BINARY) 26 public annotation class RestrictsSuspension 27 28 /** 29 * Resumes the execution of the corresponding coroutine passing [value] as the return value of the last suspension point. 30 */ 31 @SinceKotlin("1.3") 32 @InlineOnly 33 public inline fun <T> Continuation<T>.resume(value: T): Unit = 34 resumeWith(Result.success(value)) 35 36 /** 37 * Resumes the execution of the corresponding coroutine so that the [exception] is re-thrown right after the 38 * last suspension point. 39 */ 40 @SinceKotlin("1.3") 41 @InlineOnly 42 public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit = 43 resumeWith(Result.failure(exception)) 44 45 46 /** 47 * Creates a [Continuation] instance with the given [context] and implementation of [resumeWith] method. 48 */ 49 @SinceKotlin("1.3") 50 @InlineOnly 51 public inline fun <T> Continuation( 52 context: CoroutineContext, 53 crossinline resumeWith: (Result<T>) -> Unit 54 ): Continuation<T> = 55 object : Continuation<T> { 56 override val context: CoroutineContext 57 get() = context 58 59 override fun resumeWith(result: Result<T>) = 60 resumeWith(result) 61 } 62 63 /** 64 * Creates a coroutine without a receiver and with result type [T]. 65 * This function creates a new, fresh instance of suspendable computation every time it is invoked. 66 * 67 * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance. 68 * The [completion] continuation is invoked when the coroutine completes with a result or an exception. 69 * Subsequent invocation of any resume function on the resulting continuation will produce an [IllegalStateException]. 70 */ 71 @SinceKotlin("1.3") 72 @Suppress("UNCHECKED_CAST") 73 public fun <T> (suspend () -> T).createCoroutine( 74 completion: Continuation<T> 75 ): Continuation<Unit> = 76 SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED) 77 78 /** 79 * Creates a coroutine with receiver type [R] and result type [T]. 80 * This function creates a new, fresh instance of suspendable computation every time it is invoked. 81 * 82 * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance. 83 * The [completion] continuation is invoked when the coroutine completes with a result or an exception. 84 * Subsequent invocation of any resume function on the resulting continuation will produce an [IllegalStateException]. 85 */ 86 @SinceKotlin("1.3") 87 @Suppress("UNCHECKED_CAST") 88 public fun <R, T> (suspend R.() -> T).createCoroutine( 89 receiver: R, 90 completion: Continuation<T> 91 ): Continuation<Unit> = 92 SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED) 93 94 /** 95 * Starts a coroutine without a receiver and with result type [T]. 96 * This function creates and starts a new, fresh instance of suspendable computation every time it is invoked. 97 * The [completion] continuation is invoked when the coroutine completes with a result or an exception. 98 */ 99 @SinceKotlin("1.3") 100 @Suppress("UNCHECKED_CAST") 101 public fun <T> (suspend () -> T).startCoroutine( 102 completion: Continuation<T> 103 ) { 104 createCoroutineUnintercepted(completion).intercepted().resume(Unit) 105 } 106 107 /** 108 * Starts a coroutine with receiver type [R] and result type [T]. 109 * This function creates and starts a new, fresh instance of suspendable computation every time it is invoked. 110 * The [completion] continuation is invoked when the coroutine completes with a result or an exception. 111 */ 112 @SinceKotlin("1.3") 113 @Suppress("UNCHECKED_CAST") 114 public fun <R, T> (suspend R.() -> T).startCoroutine( 115 receiver: R, 116 completion: Continuation<T> 117 ) { 118 createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit) 119 } 120 121 /** 122 * Obtains the current continuation instance inside suspend functions and suspends 123 * the currently running coroutine. 124 * 125 * In this function both [Continuation.resume] and [Continuation.resumeWithException] can be used either synchronously in 126 * the same stack-frame where the suspension function is run or asynchronously later in the same thread or 127 * from a different thread of execution. Subsequent invocation of any resume function will produce an [IllegalStateException]. 128 */ 129 @SinceKotlin("1.3") 130 @InlineOnly 131 public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T = 132 suspendCoroutineUninterceptedOrReturn { c: Continuation<T> -> 133 val safe = SafeContinuation(c.intercepted()) 134 block(safe) 135 safe.getOrThrow() 136 } 137 138 /** 139 * Returns the context of the current coroutine. 140 */ 141 @SinceKotlin("1.3") 142 @Suppress("WRONG_MODIFIER_TARGET") 143 @InlineOnly 144 public suspend inline val coroutineContext: CoroutineContext 145 get() { 146 throw NotImplementedError("Implemented as intrinsic") 147 }
掛起函數格式:
suspend fun test(param: Int): String { return "test" }
掛起函數類型:
suspend (param: Int) -> String
在編譯后,解碼掛起函數:
suspend fun test(param: Int, continuation: Continuation<String>): String { return "test" }
在函數參數中多了一個Continuation參數,Continuation就是用於保存協程掛起點狀態對象。這也是為什么掛起函數只能由其它掛起函數或者協程調用,為什么Kotlin編譯器在編譯時給掛起函數多加了一個參數。
2. 掛起與未掛起
- 真正的掛起:就是必須異步調用resume,包括:1. 切換到其它線程的resume;2. 單線程事件循環異步執行;
- 未掛起:就是未異步調用resume,直接將結果傳遞出去。
PS:在resume時返回值分兩類,1. 函數真正的返回值,比如上面test函數,返回值類型為String類型;2. 返回掛起函數的掛起標識;只有返回掛起標識時才是真正的掛起。
3. 將回調轉寫成掛起函數:
- 使用suspendCoroutine獲取掛起函數的Continuation;
- 回調成功分支使用Continuation.resume(value)函數;
- 回調失敗分支使用Continuation.resumeWithException(error)函數;
4. CoroutineContext 協程上下文包含一個協程調度器。