探究高級的Kotlin Coroutines知識


要說程序如何從簡單走向復雜, 線程的引入必然功不可沒, 當我們期望利用線程來提升程序效能的過程中, 處理線程的方式也發生了從原始時代向科技時代發生了一步一步的進化, 正如我們的Elisha大神所著文章The Evolution of Android Network Access中所講到的, Future可能會是Kotlin Coroutines的時代.

什么是Coroutines

Coroutines是Kotlin 1.1推出的實驗性的一個擴展, 它被定義為一個輕量級的高效的線程框架, 並且在1.3版本正式發布, 去掉Experiment標簽.

如何啟動一個Coroutines

最基礎的創建一個Coroutines的方法就是使用launch或者async, 二者的區別是前者返回的是一個Job, 不帶結果 而后者可以將結果以Deferred<T>格式返回.

如:

val job = launch {
    delay(100)
}

而通常在Coroutines內執行的函數都會有一個suspend聲明, 而有suspend聲明的函數也只能在Coroutines Scope中調用.

suspend的意思是這個函數可以被suspend(掛起), 讓Coroutines來調度它, 這也是為何Kotlin的delay函數可以不阻塞的進行延遲, 因為它就是一個suspend函數.

Coroutines與線程的關系

Coroutines可以簡單理解為一個有隊列的任務鏈, 每一個Coroutines都有自己的Context, 而Context又可以決定其運行的線程.

所以可以看到, 並不是起一個Coroutines就是起了一個線程, 而只是啟動了一個在某個Scope下運行的協程(Coroutines)罷了. 這里的Scope (CoroutineScope) 內部包含了一個 Context (CoroutineContext).

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}

如果只是通過launch來啟動一個協程, 那它將會運行在Parent Scope所定義的線程中, 但是如果使用GlobalScope.launch來啟動一個協程, 它將會使用線程池中的線程來創建一個協程, 線程池的大小跟CPU的核數相關.

當然launch也支持自己傳入一個CoroutinesContext來控制它運行的線程, 它叫做CoroutineDispatcher, 是Context的子類.

上面講了默認的launch會啟在父Scope(Context)的線程中, 而launch(Dispatchers.Default)則等於GlobalScope.launch, 還可以通過launch(newSingleThreadContext("MyOwnThread"))來啟動自己的線程, 另外有一個不推薦在general code中出現的launch(Dispatchers.Unconfined), 它將會運行在第一個進入Suspend狀態的線程中.

可以舉一個簡單的例子:

val job = launch {
    log("hehe")
    delay(1000)
    log("haha")
}

這個協程是可以完全在main函數里執行完的, 即輸出結果為:

hehe
haha

因為launch會跑在main的scope中. 如果替換成:

val job = GlobalScope.launch {
    log("hehe")
    delay(1000)
    log("haha")
}

則只會輸出hehe, 因為主線程已經結束.

這里我們可以通過job.join()來等待子協程執行結束, 這一點跟大家熟知的線程的join是一樣.

如何切換Context

如果把Context對應到我們平時認為的線程, 那么這個問題可以類比成 如何切換線程.

答案是使用withContext, 舉一個簡單的栗子.

launch(UI) {
    updateUI()
    val result = withContext(IO) {
        
    }
    setView(result)
}

它類似於async(IO){ }.await().

如何共享資源

線程與線程之間會涉及到同步與資源競爭的關系, 協程亦是如此.

通常情況下在線程中我們解決問題的方式是加鎖, 而不正確的使用可能會導致性能下降甚至死鎖(dead lock. 或者在高級語言中使用已經實現線程安全的數據類型, 來進行誇線程操作。

而我們的Coroutines自然也考慮到了這一點, 它認為我們不應該以共享資源來進行通信, 而是以通信來進行資源共享.

Do not communicate by sharing memory; instead, share memory by communicating.

所以它提出了一個叫做Channel的東西來在不同的Coroutines之間進行通信.

譬如我們期望將一堆數據交給兩個並行的協程進行處理, 那么我們可以把數據放進Channel, 其他的協程從這個Channel進行數據讀取.

launch {
    for (o in data) { channel.send(o) }
    channel.close()
}

launch(One) {
    for (o in channel) {
        xxx
    }
}

launch(Two) {
    for (o in channel) {
        xxx
    }
}

一定要記得關閉channel, 否則從channel讀取數據的協程都將會無限掛起等待數據傳過來.

由於Channel本身實現了iterator, 所以直接通過in就可以挨個取出內部的數據.

ReceiveChannel與SendChannel

上一個環節提到的協程之間是通過Channel來進行通信, 而Channel本身卻是實現了接收管道與發送管道兩個接口.

我們可以通過producer函數來進行生成數據, 提供給別的協程, 因為它的返回值是一個ReceiveChannel.

val channel = produce<XXX>() {
    for (o in data) send(o)
}

而且produce自己會做channel close的處理, 省去我們發送完畢還要掉close的煩惱.

如果我們多個協程需要發送請求並集中處理, 或者可以叫數據整合, 那么我們可能需要用到actor這個函數, 它的返回值是一個SendChannel.

val channel = actor<XXX>() {
                consumeEach {
                   xxx     
                }
            }

launch(One) {
    channel.send(xxx)
}
launch(Two) {
    channel.send(xxx)
}

由於actor返回的SendChannel有點像是一個郵箱, 它會不斷的接收數據, 所以必須手動關閉才會停止.

多個Channel之間數據如何進行選擇

Coroutines推出一個仍在Experiment階段的關鍵字select來在多個suspend function中進行選擇第一個到達available的, 其實有點像RxJava的concat+first.

比如我有兩個接收Channel, 但是每一個Channel接收到數據的頻率不得而知, 我想分別從中得到數據, 這里就需要使用select.

select<Unit> {
    channel1.onReceive {}
    channel2.onReceive {}
}

如果在配合外圍的循環, 就可以做到不斷的去接收兩個Channel的數據.

再比如有兩個發送Channel都可以處理我的需求, 我也不知道這個時候誰是空閑的, 那也可以通過select來解決.

select<Unit> {
    channel1.onSend(xxx) {}
    channel2.onSend(xxx) {}
}

有時候兩個Channel是嵌套使用的.

比如一個咖啡店, 他們會不斷的收到Oder, 只有兩個打咖啡的服務員, 咖啡機也只有兩個口, 如果我們對這個咖啡店進行抽象. 將Oder存在於一個Channel里, 服務員接收Order並不斷的把咖啡遞出來, 這也是一個Channel, 咖啡機會不斷接收到服務員需要打咖啡的操作, 也這是一個Channel.

而在這個過程中, 兩個服務員會有一個選擇, 咖啡機的兩個出口也會有一個選擇的過程.

如果抽象成我們的Coroutines代碼, 或許會是這個樣子:

val orderChannel = producer {
    for (o in orders) send(o)
}

val waiter1 = producer {
    for (o in orderChannel) { 
        pullCoffee(o)
    }
}

// waiter2 is the same as 1

val coffeePort1 = actor {
    consumeEach { 
        //pass coffee through channel inside order
        it.channel.send(Coffee)
        it.channel.close()
    }
}

// coffeePort2 is the same as 2

pullCoffee {
    select<Coffee> {
        coffeePort1.onSend(Request(channel)) {
            //get coffee from coffeePort
            channel.recevie()
        }
        coffeePort2.onSend ....
    }
}

while(someCondition) {
    select<Coffee> {
        waiter1.onReceiveOrNull {
            //上菜了
        }
        waiter2.onReceiveOrNull {
            //上菜了
        }
    }
}

補充說明

協程作為未來non blocking編程的方向, 需要大家花時間去理解, 花時間去嘗試, 在此特別推薦這個咖啡小程序幫助大家學習.

https://medium.com/@jagsaund/kotlin-coroutines-channels-csp-android-db441400965f

以及官方的Overview

https://kotlinlang.org/docs/reference/coroutines-overview.html

還有個CheatSheet可以參考

https://blog.kotlin-academy.com/kotlin-coroutines-cheat-sheet-8cf1e284dc35


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM