Java之協程(quasar)


  一、前面我們簡單的說了一下,Python中的協程原理。這里補充Java的協程實現過程。有需要可以查看python之協程

  二、Java協程,其實做Java這么久我也沒有怎么聽過Java協程的東西,但是一直有有聽到微線程/協程的概念,這不在學習Python的時候接觸到了協程一詞。然后返回來去了解Java的協程問題,但是看了很多資料,發現官網以及很多地方都沒有涉及到協程的東西,沒有辦法,只能通過強大的社區來學習協程的相關東西。

  三、這里主要關注的是:quasar。

  1)協程的目的:當我們在使用多線程的時候,如果存在長時間的I/O操作。這個時候線程一直處於阻塞狀態,如果線程很多的時候,會存在很多線程處於空閑狀態,造成了資源應用不徹底。相對的協程不一樣了,在單線程中多個任務來回自行如果出現長時間的I/O操作,讓其讓出目前的協程調度,執行下一個任務。當然可能所有任務,全部卡在同一個點上,但是這只是針對於單線程而言,當所有數據正常返回時,會同時處理當前的I/O操作。

  2)多線程測試(這里使用100萬個線程,來測試內存占用) 

        for (int i = 0; i < 1000000; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(100000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }    

  結果:

  直接卡死了,內存溢出

  

  可想而知,如果存在100萬個線程,開銷是有多大。

  3)協程測試

  a、阿里雲搜索到的依賴包

     <dependency>
            <groupId>co.paralleluniverse</groupId>
            <artifactId>quasar-core</artifactId>
            <version>0.7.9</version>
            <classifier>jdk8</classifier>
        </dependency>

  b、測試內存占用量

  public static void main(String[] args) throws Exception {
        //使用阻塞隊列來獲取結果。
        LinkedBlockingQueue<Fiber<Integer>> fiberQueue = new LinkedBlockingQueue<>();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        for (int i = 0; i < 1000000; i++) {
            int finalI = i;
            //這里的Fiber有點像Callable,可以返回數據
            Fiber<Integer> fiber = new Fiber<>((SuspendableCallable<Integer>) () -> {
                //這里用於測試內存占用量
                Fiber.sleep(100000);
                System.out.println("in-" + finalI + "-" + LocalDateTime.now().format(formatter));
                return finalI;
            });
            //開始執行
            fiber.start();
            //加入隊列
            fiberQueue.add(fiber);
        }
        while (true) {
            //阻塞
            Fiber<Integer> fiber = fiberQueue.take();
            System.out.println("out-" + fiber.get() + "-" + LocalDateTime.now().format(formatter));
        }
    }

  結果:

  堆:

  

   估計:1個G左右。

  內存:

  

  估計:1個G左右,也就是每一個fiber占用1Kb左右。

   c、正常測試

  修改一下參數:

    public static void main(String[] args) throws Exception {
        //使用阻塞隊列來獲取結果。
        LinkedBlockingQueue<Fiber<Integer>> fiberQueue = new LinkedBlockingQueue<>();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            //這里的Fiber有點像Callable,可以返回數據
            Fiber<Integer> fiber = new Fiber<>((SuspendableCallable<Integer>) () -> {
                //這里用於測試內存占用量
                Fiber.sleep(1000);
                System.out.println("in-" + finalI + "-" + LocalDateTime.now().format(formatter));
                return finalI;
            });
            //開始執行
            fiber.start();
            //加入隊列
            fiberQueue.add(fiber);
        }
        while (true) {
            //阻塞
            Fiber<Integer> fiber = fiberQueue.take();
            System.out.println("out-" + fiber.get() + "-" + LocalDateTime.now().format(formatter));
        }
    }

  結果:

  

  4)可以看出並發的狀態還是很不錯的,當然這只是多個任務執行而已。

  四、通過上面的測試,可以看出,quasar中Fiber,很像Callable的用法、而且在內存占用上面減少了很多,當然,堆的數量確實不少,但是可以接受。

  還是要說明:協程的方式更多用來做I/O密集型的操作。計算密集型的還是使用線程更加合理。

  五、原理,我估么着看了一下源碼,復雜度很高,這里不做深究。

  原理參考:次時代Java編程(一):Java里的協程

  1、Quasar里的Fiber其實是一個continuation,他可以被Quasar定義的scheduler調度,一個continuation記錄着運行實例的狀態,而且會被隨時中斷,並且也會隨后在他被中斷的地方恢復。Quasar其實是通過修改bytecode來達到這個目的,所以運行Quasar程序的時候,你需要先通過java-agent在運行時修改你的代碼,當然也可以在編譯期間這么干。golang的內置了自己的調度器,Quasar則默認使用ForkJoinPool這個JDK7以后才有的,具有work-stealing功能的線程池來當調度器。work-stealing非常重要,因為你不清楚哪個Fiber會先執行完,而work-stealing可以動態的從其他的等等隊列偷一個context過來,這樣可以最大化使用CPU資源。

  2、那這里你會問了,Quasar怎么知道修改哪些字節碼呢,其實也很簡單,Quasar會通過java-agent在運行時掃描哪些方法是可以中斷的,同時會在方法被調用前和調度后的方法內插入一些continuation邏輯,如果你在方法上定義了@Suspendable注解,那Quasar會對調用該注解的方法做類似下面的事情。

  3、這里假設你在方法f上定義了@Suspendable,同時去調用了有同樣注解的方法g,那么所有調用f的方法會插入一些字節碼,這些字節碼的邏輯就是記錄當前Fiber棧上的狀態,以便在未來可以動態的恢復。(Fiber類似線程也有自己的棧)。在suspendable方法鏈內Fiber的父類會調用Fiber.park,這樣會拋出SuspendExecution異常,從而來停止線程的運行,好讓Quasar的調度器執行調度。這里的SuspendExecution會被Fiber自己捕獲,業務層面上不應該捕獲到。如果Fiber被喚醒了(調度器層面會去調用Fiber.unpark),那么f會在被中斷的地方重新被調用(這里Fiber會知道自己在哪里被中斷),同時會把g的調用結果(g會return結果)插入到f的恢復點,這樣看上去就好像g的return是f的local variables了,從而避免了callback嵌套。

  4、上面啰嗦了一大堆,其實簡單點講就是,想辦法讓運行中的線程棧停下來,好讓Quasar的調度器介入。JVM線程中斷的條件只有兩個,一個是拋異常,另外一個就是return。這里Quasar就是通過拋異常的方式來達到的,所以你會看到我上面的代碼會拋出SuspendExecution。但是如果你真捕獲到這個異常,那就說明有問題了,所以一般會這么寫。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM