Kotlin 協程二 —— 通道 Channel


一、 Channel 基本使用

1.1 Channel 的概念

Channel 翻譯過來為通道或者管道,實際上就是個隊列, 是一個面向多協程之間數據傳輸的 BlockQueue,用於協程間通信。Channel 允許我們在不同的協程間傳遞數據。形象點說就是不同的協程可以往同一個管道里面寫入數據或者讀取數據。它是一個和 BlockingQueue 非常相似的概念。區別在於:BlockingQueue 使用 puttake 往隊列里面寫入和讀取數據,這兩個方法是阻塞的。而 Channel 使用 sendreceive 兩個方法往管道里面寫入和讀取數據。這兩個方法是非阻塞的掛起函數,鑒於此,Channel 的 sendreceive 方法也只能在協程中使用。

1.2 Channel 的簡單使用

val channel = Channel<Int>()
launch {
    // 這里可能是消耗大量 CPU 運算的異步邏輯,我們將僅僅做 5 次整數的平方並發送
    for (x in 1..5) channel.send(x * x)
}
// 這里我們打印了 5 次被接收的整數:
repeat(5) { println(channel.receive()) }
println("Done!")

輸出結果:

1
4
9
16
25
Done!

1.3 Channel 的迭代

如果要取出 Channel 中所有的數據,可以使用迭代。

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) {
            channel.send(x * x)
        }
    }

    val iterator = channel.iterator()
    while (iterator.hasNext()) {
        val next = iterator.next()
        println(next)
    }
    println("Done!")
}

可以簡化成:

val channel = Channel<Int>()
    launch {
        // 這里可能是消耗大量 CPU 運算的異步邏輯,我們將僅僅做 5 次整數的平方並發送
        for (x in 1..5) channel.send(x * x)
    }
    for (y in channel) {
        println(y)
    }
    println("Done!")

此時輸出結果:

1
4
9
16
25

最后一行 Done! 沒有打印出來,並且程序沒有結束。此時,我們發現,這種方式,實際上是我們一直在等待讀取 Channel 中的數據,只要有數據到了,就會被讀取到。

1.4 close 關閉 Channel

我們可以使用 close() 方法關閉 Channel,來表明沒有更多的元素將會進入通道。

val channel = Channel<Int>()
    launch {
        // 這里可能是消耗大量 CPU 運算的異步邏輯,我們將僅僅做 5 次整數的平方並發送
        for (x in 1..5) channel.send(x * x)
        channle.close()  // 結束發送
    }
    for (y in channel) {
        println(y)
    }
    println("Done!")

從概念上來講,調用 close 方法就像向通道發送了一個特殊的關閉指令,這個迭代停止,說明關閉指令已經被接收了。所以這里能夠保證所有先前發送出去的原色都能在通道關閉前被接收到。
對於一個 Channel,如果我們調用了它的 close,它會立即停止接受新元素,也就是說這時候它的 isClosedForSend 會立即返回 true,而由於 Channel 緩沖區的存在,這時候可能還有一些元素沒有被處理完,所以要等所有的元素都被讀取之后 isClosedForReceive 才會返回 true。
輸出結果:

1
4
9
16
25
Done!

1.5 Channel 是熱流

Flow 是冷流,只有調用末端流操作的時候,上游才會發射數據,與 Flow 不同,Channel 是熱流,不管有沒有訂閱者,上游都會發射數據。

二、Channel 的類型

2.1 SendChannel 和 ReceiveChannel

Channel 是一個接口,它繼承了 SendChannelReceiveChannel 兩個接口

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

SendChannel
SendChannel 提供了發射數據的功能,有如下重點接口:

  • send 是一個掛起函數,將指定的元素發送到此通道,在該通道的緩沖區已滿或不存在時掛起調用者。如果通道已經關閉,調用發送時會拋出異常。
  • trySend 如果不違反其容量限制,則立即將指定元素添加到此通道,並返回成功結果。否則,返回失敗或關閉的結果。
  • close 關閉通道。
  • isClosedForSend 判斷通道是否已經關閉,如果關閉,調用 send 會引發異常。

ReceiveChannel
ReceiveChannel 提供了接收數據的功能,有如下重點接口:

  • receive 如果此通道不為空,則從中檢索並刪除元素;如果通道為空,則掛起調用者;如果通道為接收而關閉,則引發ClosedReceiveChannel異常。
  • tryReceive 如果此通道不為空,則從中檢索並刪除元素,返回成功結果;如果通道為空,則返回失敗結果;如果通道關閉,則返回關閉結果。
  • receiveCatching 如果此通道不為空,則從中檢索並刪除元素,返回成功結果;如果通道為空,則返回失敗結果;如果通道關閉,則返回關閉的原因。
  • isEmpty 判斷通道是否為空
  • isClosedForReceive 判斷通道是否已經關閉,如果關閉,調用 receive 會引發異常。
  • cancel(cause: CancellationException? = null) 以可選原因取消接收此頻道的剩余元素。此函數用於關閉通道並從中刪除所有緩沖發送的元素。
  • iterator() 返回通道的迭代器

2.2 創建不同類型的 Channel

Kotlin 協程庫中定義了多個 Channel 類型,所有channel類型的receive方法都是同樣的行為: 如果channel不為空, 接收一個元素, 否則掛起。
它們的主要區別在於:

  • 內部可以存儲元素的數量
  • send 是否可以被掛起

Channel 的不同類型:

  • Rendezvous channel: 0尺寸buffer (默認類型).
  • Unlimited channel: 無限元素, send不被掛起.
  • Buffered channel: 指定大小, 滿了之后send掛起.
  • Conflated channel: 新元素會覆蓋舊元素, receiver只會得到最新元素, send永不掛起.

創建 Channel:

val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)

三、協程間通過 Channel 實現通信

3.1 多個協程訪問同一個 Channel

在協程外部定義 Channel, 就可以多個協程可以訪問同一個channel,達到協程間通信的目的。

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x)
    }
    launch {
        delay(10)
        for (y in channel) {
            println(" 1 --> $y")
        }
    }
    launch {
        delay(20)
        for (y in channel) {
            println(" 2 --> $y")
        }
    }
    launch {
        delay(30)
        for (x in 90..100) channel.send(x)
        channel.close()
    }
}

3.2 produce 和 actor

在協程外部定義 Channel,多個協程同時訪問 Channel, 就可以實現生產者消費者模式。
produce
使用 produce 可以更便捷地構造生產者

fun main() = runBlocking<Unit> {
    val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce {
        var i = 0
        while(true){
            delay(1000)
            send(i)
            i++
        }
        delay(3000)
        receiveChannel.cancel()
    }
}

我們可以通過 produce 這個方法啟動一個生產者協程,並返回一個 ReceiveChannel,其他協程就可以拿着這個 Channel 來接收數據了。

actor
actor 可以用來構建一個消費者協程

fun main() = runBlocking<Unit> {
    val sendChannel: SendChannel<Int> = actor<Int> {
        for (ele in channel)
            ele
        }
    }
    
    delay(2000)
    sendChannel.close()
}

注意:不要在循環中使用 receive ,思考為什么?

produce 和 actor 與 launch 一樣都被稱作“協程啟動器”。通過這兩個協程的啟動器啟動的協程也自然的與返回的 Channel 綁定到了一起,因此 Channel 的關閉也會在協程結束時自動完成,以 produce 為例,它構造出了一個 ProducerCoroutine 的對象

3.3 扇入和扇出

多個協程可能會從同一個channel中接收值,這種情況稱為Fan-out。
多個協程可能會向同一個channel發射值,這種情況稱為Fan-in。

3.4 BroadcastChannel

3.4.1 BroadcastChannel 基本使用

3.1 中例子提到一對多的情形,從數據處理本身來講,有多個接收端的時候,同一個元素只會被一個接收端讀到。而 BroadcastChannel 則不然,多個接收端不存在互斥現象。

public interface BroadcastChannel<E> : SendChannel<E> {

    public fun openSubscription(): ReceiveChannel<E>

    public fun cancel(cause: CancellationException? = null)

    @Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility only")
    public fun cancel(cause: Throwable? = null): Boolean
}

public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
when (capacity) {
    0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
    UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel")
    CONFLATED -> ConflatedBroadcastChannel()
    BUFFERED -> ArrayBroadcastChannel(CHANNEL_DEFAULT_CAPACITY)
    else -> ArrayBroadcastChannel(capacity)
}

創建 BroadcastChannel
創建 BroadcastChannel 需要指定緩沖區大小

val broadcastChannel = broadcastChannel<Int>(5)

訂閱 broadcastChannel
訂閱 broadcastChannel,那么只需要調用

val receiveChannel = broadcastChannel.openSubscription()

這樣我們就得到了一個 ReceiveChannel,獲取訂閱的消息,只需要調用它的 receive。

3.4.2 使用拓展函數轉換

使用 Channel 的拓展函數,也可以將一個 Channel 轉換成 BroadcastChannel, 需要指定緩沖區大小。

val channel = Channel<Int>()
val broadcast = channel.broadcast(3)

這樣發射給原 channel 的數據會被讀取后發射給轉換后的 broadcastChannel。如果還有其他協程也在讀這個原始的 Channel,那么會與 BroadcastChannel 產生互斥關系。

3.4.3 過時的 API

BroadcastChannel 源碼中的說明:

Note: This API is obsolete since 1.5.0. It will be deprecated with warning in 1.6.0 and with error in 1.7.0. It is replaced with SharedFlow.

BroadcastChannel 對於廣播式的任務來說有點太復雜了。使用通道進行狀態管理時會出現一些邏輯上的不一致。例如,可以關閉或取消通道。但由於無法取消狀態,因此在狀態管理中無法正常使用!
所以官方決定啟用 BroadcastChannel。BroadcastChannel 被標記為過時了,在 kotlin 1.6.0 版本中使用將顯示警告,在 1.7.0 版本中將顯示錯誤。請使用 SharedFlow 和 StateFlow 替代它。
關於 SharedFlow 和 StateFlow 將在下文中講到。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM