Java不支持協程?那是你不知道Quasar!


原創:微信公眾號 碼農參上,歡迎分享,轉載請保留出處。

在編程語言的這個圈子里,各種語言之間的對比似乎就一直就沒有停過,像什么古早時期的"PHP是世界上最好的語言"就不提了,最近我在摸魚的時候,看到不少文章都在說"Golang性能吊打Java"。作為一個寫了好幾年java的javaer,這我怎么能忍?於是在網上看了一些對比golang和java的文章,其中戳中java痛點、也是golang被吹上天的一條,就是對多線程並發的支持了。先看一段描述:

Go從語言層面原生支持並發,並且使用簡單,Go語言中的並發基於輕量級線程Goroutine,創建成本很低,單個Go應用也可以充分利用CPU多核,編寫高並發服務端軟件簡單,執行性能好,很多情況下完全不需要考慮鎖機制以及由此帶來的各種問題。

看到這,我的心瞬間涼了大半截,真的是字字扎心。雖然說java里的JUC包已經幫我們封裝好了很多並發工具,但實際高並發的環境中我們還要考慮到各種鎖的使用,以及服務器性能瓶頸、限流熔斷等非常多方面的問題。

再說回go,前面提到的這個goroutine究竟是什么東西?其實,輕量級線程goroutine也可以被稱為協程,得益於go中的調度器以及GMP模型,go程序會智能地將goroutine中的任務合理地分配給每個 CPU。

好了,其實上面說的這一大段我也不懂,都是向寫go的哥們兒請教來的,總之就是go的並發性能非常優秀就是了。不過這都不是我們要說的重點,今天我們要討論的是如何在Java中使用協程。

協程是什么?

我們知道,線程在阻塞狀態和可運行狀態的切換,以及線程間的上下文切換都會造成性能的損耗。為了解決這些問題,引入協程coroutine這一概念,就像在一個進程中允許存在多個線程,在一個線程中,也可以存在多個協程。

那么,使用協程究竟有什么好處呢?

首先,執行效率高。線程的切換由操作系統內核執行,消耗資源較多。而協程由程序控制,在用戶態執行,不需要從用戶態切換到內核態,我們也可以理解為,協程是一種進程自身來調度任務的調度模式,因此協程間的切換開銷遠小於線程切換。

其次,節省資源。因為協程在本質上是通過分時復用了一個單線程,因此能夠節省一定的資源。

類似於線程的五種狀態切換,協程間也存在狀態的切換,下面這張圖展示了協程調度器內部任務的流轉。

綜合上面這些角度來看,和原生支持協程的go比起來,java在多線程並發上還真的是不堪一擊。但是,雖然在Java官方的jdk中不能直接使用協程,但是,有其他的開源框架借助動態修改字節碼的方式實現了協程,就比如我們接下來要學習的Quasar。

Quasar使用

Quasar是一個開源的Java協程框架,通過利用Java instrument技術對字節碼進行修改,使方法掛起前后可以保存和恢復jvm棧幀,方法內部已執行到的字節碼位置也通過增加狀態機的方式記錄,在下次恢復執行可直接跳轉至最新位置。

Quasar項目最后更新時間為2018年,版本停留在0.8.0,但是我在直接使用這個版本時報了一個錯誤:

這個錯誤的大意就是這個class文件是使用的高版本jdk編譯的,所以你在低版本的jdk上當然無法運行了。這里major版本號54對應的是jdk10,而我使用的是jdk8,無奈降級試了一下低版本,果然0.7.10可以使用:

<dependency>
    <groupId>co.paralleluniverse</groupId>
    <artifactId>quasar-core</artifactId>
    <version>0.7.10</version>
</dependency>

在我們做好准備工作后,下面就寫幾個例子來感受一下協程的魅力吧。

1、運行時間

下面我們模擬一個簡單的場景,假設我們有一個任務,平均執行時間為1秒,分別測試一下使用線程和協程並發執行10000次需要消耗多少時間。

先通過線程進行調用,直接使用Executors線程池:

public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch=new CountDownLatch(10000);
    long start = System.currentTimeMillis();
    ExecutorService executor= Executors.newCachedThreadPool();
    for (int i = 0; i < 10000; i++) {
        executor.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();
        });
    }
    countDownLatch.await();
    long end = System.currentTimeMillis();
    System.out.println("Thread use:"+(end-start)+" ms");
}

查看運行時間:

好了,下面我們再用Quasar中的協程跑一下和上面相同的流程。這里我們要使用的是Quasar中的Fiber,它可以被翻譯為協程纖程,創建Fiber的類型主要可分為下面兩類:

public Fiber(String name, FiberScheduler scheduler, int stackSize, SuspendableRunnable target);
public Fiber(String name, FiberScheduler scheduler, int stackSize, SuspendableCallable<V> target);

Fiber中可以運行無返回值的SuspendableRunnable或有返回值的SuspendableCallable,看這個名字也知道區別就是java中的RunnableCallable的區別了。其余參數都可以省略,name為協程的名稱,scheduler是調度器,默認使用FiberForkJoinSchedulerstackSize指定用於保存fiber調用棧信息的stack大小。

在下面的代碼中,使用了Fiber.sleep()方法進行協程的休眠,和Thread.sleep()非常類似。

public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch=new CountDownLatch(10000);
    long start = System.currentTimeMillis();

    for (int i = 0; i < 10000; i++) {
        new Fiber<>(new SuspendableRunnable(){
            @Override
            public Integer run() throws SuspendExecution, InterruptedException {
                Fiber.sleep(1000);
                countDownLatch.countDown();
            }
        }).start();
    }

    countDownLatch.await();
    long end = System.currentTimeMillis();
    System.out.println("Fiber use:"+(end-start)+" ms");
}

直接運行,報了一個警告:

QUASAR WARNING: Quasar Java Agent isn't running. If you're using another instrumentation method you can ignore this message; otherwise, please refer to the Getting Started section in the Quasar documentation.

還記得我們前面說過的Quasar生效的原理是基於Java instrument技術嗎,所以這里需要給它添加一個代理Agent。找到本地maven倉庫中已經下好的jar包,在VM options中添加參數:

-javaagent:E:\Apache\maven-repository\co\paralleluniverse\quasar-core\0.7.10\quasar-core-0.7.10.jar

這次運行時就沒有提示警告了,查看一下運行時間:

運行時間只有使用線程池時的一半多一點,確實能大大縮短程序的效率。

2、內存占用

在測試完運行時間后,我們再來測試一下運行內存占用的對比。通過下面代碼嘗試在本地啟動100萬個線程:

public static void main(String[] args) {
    for (int i = 0; i < 1000000; i++) {
        new Thread(() -> {
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

本來以為會報OutOfMemoryError,但是沒想到的是我的電腦直接直接卡死了…而且不是一次,試了幾次都是以卡死只能重啟電腦而結束。好吧,我選擇放棄,那么下面再試試啟動100萬個Fiber協程。

public static void main(String[] args) throws Exception {
    CountDownLatch countDownLatch=new CountDownLatch(10000);
    for (int i = 0; i < 1000000; i++) {
        int finalI = i;
        new Fiber<>((SuspendableCallable<Integer>)()->{
            Fiber.sleep(100000);
            countDownLatch.countDown();
            return finalI;
        }).start();
    }
    countDownLatch.await();
    System.out.println("end");
}

程序能夠正常執行結束,看樣子使用的內存真的比線程少很多。上面我故意使每個協程結束的時間拖得很長,這樣我們就可以在運行過程中使用Java VisualVM查看內存的占用情況了:

可以看到在使用Fiber的情況下只使用了1G多一點的內存,平均到100萬個協程上也就是說每個Fiber只占用了1Kb左右的內存空間,和Thread線程比起來真的是非常的輕量級。

從上面這張圖中我們也可以看到,運行了非常多的ForkJoinPool,它們又起到了什么作用呢?我們在前面說過,協程是由程序控制在用戶態進行切換,而Quasar中的調度器就使用了一個或多個ForkJoinPool來完成對Fiber的調度。

3、原理與應用

這里簡單介紹一下Quasar的原理,在編譯時框架會對代碼進行掃描,如果方法帶有@Suspendable注解,或拋出了SuspendExecution,或在配置文件META-INF/suspendables中指定該方法,那么Quasar就會修改生成的字節碼,在park掛起方法的前后,插入一些字節碼。

這些字節碼會記錄此時協程的執行狀態,例如相關的局部變量與操作數棧,然后通過拋出異常的方式將cpu的控制權從當前協程交回到控制器,此時控制器可以再調度另外一個協程運行,並通過之前插入的那些字節碼恢復當前協程的執行狀態,使程序能繼續正常執行。

回頭看一下前面例子中的SuspendableRunnableSuspendableCallable,它們的run方法上都拋出了SuspendExecution,其實這並不是一個真正的異常,僅作為識別掛起方法的聲明,在實際運行中不會拋出。當我們創建了一個Fiber,並在其中調用了其他方法時,如果想要Quasar的調度器能夠介入,那么必須在使用時層層拋出這個異常或添加注解。

看一下簡單的代碼書寫的示例:

public void request(){
    new Fiber<>(new SuspendableRunnable() {
        @Override
        public void run() throws SuspendExecution, InterruptedException {
            String content = sendRequest();
            System.out.println(content);
        }
    }).start();
}

private String sendRequest() throws SuspendExecution {
    return realSendRequest();
}

private String realSendRequest() throws SuspendExecution{
    HttpResponse response = HttpRequest.get("http://127.0.0.1:6879/name").execute();
    String content = response.body();
    return content;
}

需要注意的是,如果在方法內部已經通過try/catch的方式捕獲了Exception,也應該再次手動拋出這個SuspendExecution異常。

總結

本文介紹了Quasar框架的簡單使用,其具體的實現原理比較復雜,暫時就不在這里進行討論,后面打算單獨拎出來進行分析。另外,目前已經有不少其他的框架中已經集成了Quasar,例如同樣是Parallel Universe下的Comsat項目,能夠提供了HTTP和DB訪問等功能。

雖然現在想要在Java中使用協程還只能使用這樣的第三方的框架,但是也不必灰心,在OpenJDK 16中已經加入了一個名為Project Loom的項目, 在OpenJDK Wiki上可以看到對它的介紹,它將使用Fiber輕量級用戶模式線程,從jvm層面對多線程技術進行徹底的改變,使用新的編程模型,使輕量級線程的並發也能夠適用於高吞吐量的業務場景。

Quasar和Loom的相關的文檔放在下面,有興趣的小伙伴們可以自己看一下。

Quasar git:https://github.com/puniverse/quasar

Quasar api:http://docs.paralleluniverse.co/quasar/javadoc/

OpenJdk Wiki:https://wiki.openjdk.java.net/display/loom/Main

那么,這次的分享就到這里,我是Hydra,下期見。

作者簡介,碼農參上,一個熱愛分享的公眾號,有趣、深入、直接,與你聊聊技術。個人微信DrHydra9,歡迎添加好友,進一步交流。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM