協程是比線程更輕量級的程序處理單元,也可以說是運行在線程上的線程,由自己控制
1.適用於被阻塞的,且需要大量並發的場景。
2.不適用於,大量計算的多線程,遇到此種情況,更好實用線程去解決。
雖然Java的線程的API封裝的很好,使用起來非常的方便,但是使用起來也得小心。首先線程需要耗費資源,所以單個的機器上創建上萬個線程很困難,其次線程之間的切換也需要耗費CPU,在線程非常多的情況下導致很多CPU資源耗費在線程切換上,通過提高線程數來提高系統的性能有時候適得其反。你可以看到現在一些優秀的框架如Netty都不會創建很多的線程,默認2倍的CPU core的線程數就已經應付的很好了,比如node.js可以使用單一的進程/線程應付高並發。
纖程使用的資源更少,它主要保存棧信息,所以一個系統中可以創建上萬的纖程Fiber,而實際的纖程調度器只需要幾個Java線程即可。
我們看一個性能的比較,直觀的感受一下Quasar帶來的吞吐率的提高。
下面這個例子中方法m1
調用m2
,m2
調用m3
,但是m2
會暫停1秒鍾,用來模擬實際產品中的阻塞,m3
執行了一個簡單的計算。
通過線程和纖程兩種方式我們看看系統的吞吐率(throughput)和延遲(latency)。
package com.zhou.quasar.quasar; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; import java.util.stream.Stream; import co.paralleluniverse.fibers.Fiber; import co.paralleluniverse.fibers.SuspendExecution; import co.paralleluniverse.fibers.Suspendable; import co.paralleluniverse.strands.Strand; import co.paralleluniverse.strands.SuspendableRunnable; public class Helloworld { @Suspendable static void m1() throws InterruptedException, SuspendExecution { String m = "m1"; // System.out.println("m1 begin"); m = m2(); // System.out.println("m1 end"); // System.out.println(m); } static String m2() throws SuspendExecution, InterruptedException { String m = m3(); Strand.sleep(1000); return m; } @Suspendable static String m3() { List l = Stream.of(1, 2, 3).filter(i -> i % 2 == 0).collect(Collectors.toList()); return l.toString(); } public static void main(String[] args) throws ExecutionException, InterruptedException { int count = 10000; testFiber(count); testThreadpool(count); } static void testThreadpool(int count) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(count); ExecutorService es = Executors.newFixedThreadPool(200); LongAdder latency = new LongAdder(); LongAdder counter = new LongAdder(); long t = System.currentTimeMillis(); WrappyInteger sum=new WrappyInteger(0); for (int i = 0; i < count; i++) { es.submit(() -> { // Long long1=sum; long start = System.currentTimeMillis(); try { m1(); } catch (InterruptedException e) { e.printStackTrace(); } catch (SuspendExecution suspendExecution) { suspendExecution.printStackTrace(); } start = System.currentTimeMillis() - start; latency.add(start); counter.add(1); sum.i++; latch.countDown(); }); } latch.await(); t = System.currentTimeMillis() - t; long l = latency.longValue() / count; System.out.println("thread pool took: " + t + ", latency: " + l + " ms------"+counter.longValue()+"sum ---"+sum.i); es.shutdownNow(); } static void testFiber(int count) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(count); LongAdder latency = new LongAdder(); LongAdder counter = new LongAdder(); WrappyInteger sum=new WrappyInteger(0); long t = System.currentTimeMillis(); for (int i = 0; i < count; i++) { new Fiber<Void>("Caller", new SuspendableRunnable() { @Override public void run() throws SuspendExecution, InterruptedException { long start = System.currentTimeMillis(); m1(); start = System.currentTimeMillis() - start; counter.add(1); latency.add(start); sum.i++; latch.countDown(); } }).start(); } latch.await(); t = System.currentTimeMillis() - t; long l = latency.longValue() / count; System.out.println("fiber took: " + t + ", latency: " + l + " ms,------"+counter+"--sum"+sum.i); } static class WrappyInteger{ public int i; public WrappyInteger(int i) { this.i = i; } } }
運行這個程序(需要某種instrument, agent--啟動時java代理,或者AOT--編譯時已完成代理時工作,或者其它,在下面會介紹),輸出結果為:
QUASAR WARNING: Quasar Java Agent isn't running. If you're using another instrumentation method you can ignore this message; otherwise, please refer to the Getting Started section in the Quasar documentation. fiber took: 1382, latency: 1015 ms,------10000--sum10000 thread pool took: 50184, latency: 1000 ms------10000sum ---9999
1、Quasar Java Agent
Quasar java agent可以在運行時動態修改字節碼,將下面一行加搭配java命令行中即可,注意把path-to-quasar-jar.jar替換成你實際的quasar java的地址。
另外一種是在編譯時的時候完成instrumentation
<plugin> <groupId>com.vlkan</groupId> <artifactId>quasar-maven-plugin</artifactId> <version>0.7.3</version> <configuration> <check>true</check> <debug>true</debug> <verbose>true</verbose> </configuration> <executions> <execution> <phase>compile</phase> <goals> <goal>instrument</goal> </goals> </execution> </executions> </plugin>
3、在Web容器中
如果你使用web容器使用基於Quasar的庫comsat等,比如Tomcat,則比較棘手。因為你不太像將Quasar java agent直接加到tomcat的啟動腳本中,這樣會instrument所有的應用,導致很多的警告。
Comsat提供了Tomcat和Jetty的解決方案。
Tomcat
對於tomcat,你可以把comsat-tomcat-loader-0.7.0-jdk8.jar
或者comsat-tomcat-loader-0.7.0.jar
加入到tomcat的common/lib
或者lib
中,然后在你的web應用META-INF/context.xml
中加入:
1
|
<
Loader
loaderClass
=
"co.paralleluniverse.comsat.tomcat.QuasarWebAppClassLoader"
/>
|
Jetty
如果使用Jetty,則把comsat-jetty-loader-0.7.0-jdk8.jar
或者comsat-jetty-loader-0.7.0.jar
加入到Jetty的lib中,然后在你的context.xml中加入<Set name="classLoader">
:
1
2
3
4
5
6
7
8
9
10
11
|
<
Configure
id
=
"ctx"
class
=
"org.eclipse.jetty.webapp.WebAppContext"
>
<
Set
name
=
"war"
>./build/wars/dep.war</
Set
>
<!--use custom classloader in order to instrument classes by quasar-->
<
Set
name
=
"classLoader"
>
<
New
class
=
"co.paralleluniverse.comsat.jetty.QuasarWebAppClassLoader"
>
<
Arg
>
<
Ref
id
=
"ctx"
/>
</
Arg
>
</
New
>
</
Set
>
</
Configure
>
|
總之,通過實現一個定制的ClassLoader實現instrumentation。