Java的協程Quasar


協程是對函數和線程進一步優化的產物, 是一種函數的編排方式, 將傳統意義上的函數拆成更小粒度的過程. 簡單說, 就是比函數粒度還要小的可手動控制的過程. 

協程可以通過yield 來調用其它協程,接下來的每次協程被調用時,從協程上次yield返回的位置接着執行,通過yield方式轉移執行權的協程之間不是調用者與被調用者的關系,而是彼此對稱、平等的。

協程vs函數

函數可以調用其他函數,調用者等待被調用者結束后繼續執行,因此函數的生命期遵循后進先出,即最后一個被調用的函數最先結束返回。協程的生命期完全由對它們的使用需要來決定。
函數的起始處是惟一的入口點,每當函數被調用時,執行都從被調用函數的起始處開始。協程可以有多個入口點,協程的起始處是第一個入口點,每個yield返回出口點都是再次被調用執行時的入口點。
函數只在結束時一次性的返回全部結果值。協程可以在yield時不調用其他協程,而是每次返回一部分的結果值,這種協程常稱為生成器或迭代器。
現代的指令集架構通常提供對調用棧的指令支持,便於實現可遞歸調用的函數。在以Scheme為代表的提供續體的語言環境下,可用此控制狀態抽象表示來實現協程。

函數可以看作是特定狀況的協程,任何函數都可轉寫為不調用yield的協程

協程vs線程

協程占用內存小, 在Quasar庫實現中, 一個空閑的Fiber只占400個字節左右的內存, 而一個Java線程需要至少1MB, 是協程的千倍. Fibers provide functionality similar to threads, and a similar API, but they’re not managed by the OS. They are lightweight in terms of RAM (an idle fiber occupies ~400 bytes of RAM) and put a far lesser burden on the CPU when task-switching. You can have millions of fibers in an application

另外的好處是對比使用多線程來解決IO阻塞任務,使用協程不用加鎖,訪問共享的數據不用進行同步操作。使用協程之所以不需要加鎖不是因為所有的協程只在一個線程中運行,而是因為協程的非搶占式的特點。也就是說,協程在沒主動交出CPU之前都是不會被突然切換到其它協程上。而線程是搶占式的,使用多線程你不能確定線程什么時候被操作系統調度,什么時候被切換,因此需要用鎖到實現一種“原子操作”的語義。

協程vs異步回調

常見的做法是使用非阻塞的IO(比如是異步IO,又或者是在syscall上自己實現的一套異步IO,如asio)並且將處理操作寫在回調函數中。這樣的做法一般沒什么問題,但當回調函數變多,一段連貫的業務代碼就會被拆分到多個回調函數之中,增加維護的成本。因此使用協程可以用同步的寫法寫出效果相當於是異步的代碼。

在Java中通過Quasar庫實現協程

對應JDK8的最高版本為0.7.9, 需要在maven中引入依賴

<dependency>
	<groupId>co.paralleluniverse</groupId>
	<artifactId>quasar-core</artifactId>
	<version>0.7.9</version>
	<classifier>jdk8</classifier>
</dependency>

通過一個channel, 將生成的數據推送給處理者, 這個流程是可以多級串聯的, 達到生成和處理交叉進行的效果.

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.Channels;

import java.util.concurrent.ExecutionException;

public class FiberExample {
    private static void printer(Channel<Integer> in) throws SuspendExecution,  InterruptedException {
        Integer v;
        while ((v = in.receive()) != null) {
            System.out.println("<< " + v);
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
        //定義兩個Channel
        Channel<Integer> naturals = Channels.newChannel(1024, Channels.OverflowPolicy.BLOCK, true, true);
        Channel<Integer> squares = Channels.newChannel(1024, Channels.OverflowPolicy.BLOCK, true, true);

        //運行兩個Fiber實現.
        new Fiber<>(() -> {
            for (int i = 0; i < 1000; i++) {
                System.out.println(">> " + i);
                naturals.send(i);
            }
            naturals.close();
        }).start();

        new Fiber<>(() -> {
            while (!naturals.isClosed()) {
                Integer v = naturals.receive();
                System.out.println("< " + v);
                squares.send(v * v);
            }
            System.out.println("Stopped receiving messages");
            squares.close();
        }).start();

        System.out.println("Reached printer");
        printer(squares);
    }
}

 在Fiber的處理方法中做Thread相關的操作會引起fiber失效.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM