java 協程


協程是比線程更輕量級的程序處理單元,也可以說是運行在線程上的線程,由自己控制

 1.適用於被阻塞的,且需要大量並發的場景。

2.不適用於,大量計算的多線程,遇到此種情況,更好實用線程去解決。

 

雖然Java的線程的API封裝的很好,使用起來非常的方便,但是使用起來也得小心。首先線程需要耗費資源,所以單個的機器上創建上萬個線程很困難,其次線程之間的切換也需要耗費CPU,在線程非常多的情況下導致很多CPU資源耗費在線程切換上,通過提高線程數來提高系統的性能有時候適得其反。你可以看到現在一些優秀的框架如Netty都不會創建很多的線程,默認2倍的CPU core的線程數就已經應付的很好了,比如node.js可以使用單一的進程/線程應付高並發。

纖程使用的資源更少,它主要保存棧信息,所以一個系統中可以創建上萬的纖程Fiber,而實際的纖程調度器只需要幾個Java線程即可。

我們看一個性能的比較,直觀的感受一下Quasar帶來的吞吐率的提高。

下面這個例子中方法m1調用m2,m2調用m3,但是m2會暫停1秒鍾,用來模擬實際產品中的阻塞,m3執行了一個簡單的計算。
通過線程和纖程兩種方式我們看看系統的吞吐率(throughput)和延遲(latency)。

package com.zhou.quasar.quasar;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.Suspendable;
import co.paralleluniverse.strands.Strand;
import co.paralleluniverse.strands.SuspendableRunnable;

public class Helloworld {
    @Suspendable
    static void m1() throws InterruptedException, SuspendExecution {
        String m = "m1";
        // System.out.println("m1 begin");
        m = m2();
        // System.out.println("m1 end");
        // System.out.println(m);
    }

    static String m2() throws SuspendExecution, InterruptedException {
        String m = m3();
        Strand.sleep(1000);
        return m;
    }

    @Suspendable
    static String m3() {
        List l = Stream.of(1, 2, 3).filter(i -> i % 2 == 0).collect(Collectors.toList());
        return l.toString();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int count = 10000;
        testFiber(count);
        testThreadpool(count);
    }

    static void testThreadpool(int count) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(count);
        ExecutorService es = Executors.newFixedThreadPool(200);
        LongAdder latency = new LongAdder();
        LongAdder counter = new LongAdder();
        long t = System.currentTimeMillis();
        WrappyInteger sum=new WrappyInteger(0);
        for (int i = 0; i < count; i++) {
            es.submit(() -> {
//                Long long1=sum;
                long start = System.currentTimeMillis();
                try {
                    m1();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (SuspendExecution suspendExecution) {
                    suspendExecution.printStackTrace();
                }
                start = System.currentTimeMillis() - start;
                latency.add(start);
                counter.add(1);
                sum.i++;
                latch.countDown();
            });
        }
        latch.await();
        t = System.currentTimeMillis() - t;
        long l = latency.longValue() / count;
        System.out.println("thread pool took: " + t + ", latency: " + l + " ms------"+counter.longValue()+"sum ---"+sum.i);
        es.shutdownNow();
    }

    static void testFiber(int count) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(count);
        LongAdder latency = new LongAdder();
        LongAdder counter = new LongAdder();
        WrappyInteger sum=new WrappyInteger(0);
        long t = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            new Fiber<Void>("Caller", new SuspendableRunnable() {
                @Override
                public void run() throws SuspendExecution, InterruptedException {
                    long start = System.currentTimeMillis();
                    m1();
                    start = System.currentTimeMillis() - start;
                    counter.add(1);
                    latency.add(start);
                    sum.i++;
                    latch.countDown();
                }
            }).start();
        }
        latch.await();
        t = System.currentTimeMillis() - t;
        long l = latency.longValue() / count;
        System.out.println("fiber took: " + t + ", latency: " + l + " ms,------"+counter+"--sum"+sum.i);
    }
    
    static class WrappyInteger{
        public int i;

        public WrappyInteger(int i) {
            this.i = i;
        }
        
    }
}

運行這個程序(需要某種instrument, agent--啟動時java代理,或者AOT--編譯時已完成代理時工作,或者其它,在下面會介紹),輸出結果為:

QUASAR WARNING: Quasar Java Agent isn't running. If you're using another instrumentation method you can ignore this message; otherwise, please refer to the Getting Started section in the Quasar documentation.
fiber took: 1382, latency: 1015 ms,------10000--sum10000
thread pool took: 50184, latency: 1000 ms------10000sum ---9999

1、Quasar Java Agent
Quasar java agent可以在運行時動態修改字節碼,將下面一行加搭配java命令行中即可,注意把path-to-quasar-jar.jar替換成你實際的quasar java的地址。

-javaagent:path-to-quasar-jar.jar
2、 AOT(Ahead-of-Time)
另外一種是在編譯時的時候完成instrumentation
 
<plugin>
    <groupId>com.vlkan</groupId>
    <artifactId>quasar-maven-plugin</artifactId>
    <version>0.7.3</version>
    <configuration>
        <check>true</check>
        <debug>true</debug>
        <verbose>true</verbose>
    </configuration>
    <executions>
        <execution>
            <phase>compile</phase>
            <goals>
                <goal>instrument</goal>
            </goals>
        </execution>
    </executions>
</plugin>

3、在Web容器中
如果你使用web容器使用基於Quasar的庫comsat等,比如Tomcat,則比較棘手。因為你不太像將Quasar java agent直接加到tomcat的啟動腳本中,這樣會instrument所有的應用,導致很多的警告。

Comsat提供了Tomcat和Jetty的解決方案。

Tomcat
對於tomcat,你可以把comsat-tomcat-loader-0.7.0-jdk8.jar或者comsat-tomcat-loader-0.7.0.jar加入到tomcat的common/lib或者lib中,然后在你的web應用META-INF/context.xml中加入:

1
< Loader loaderClass = "co.paralleluniverse.comsat.tomcat.QuasarWebAppClassLoader" />

Jetty
如果使用Jetty,則把comsat-jetty-loader-0.7.0-jdk8.jar或者comsat-jetty-loader-0.7.0.jar加入到Jetty的lib中,然后在你的context.xml中加入<Set name="classLoader">:

1
2
3
4
5
6
7
8
9
10
11
< Configure id = "ctx" class = "org.eclipse.jetty.webapp.WebAppContext" >
     < Set name = "war" >./build/wars/dep.war</ Set >
     <!--use custom classloader in order to instrument classes by quasar-->
     < Set name = "classLoader" >
         < New class = "co.paralleluniverse.comsat.jetty.QuasarWebAppClassLoader" >
             < Arg >
                 < Ref id = "ctx" />
             </ Arg >
         </ New >
     </ Set >
</ Configure >

總之,通過實現一個定制的ClassLoader實現instrumentation。

官方提供文檔地址: http://docs.paralleluniverse.co/quasar/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM