前一篇文章Java中的纖程庫 – Quasar中我做了簡單的介紹,現在進一步介紹這個纖程庫。
Quasar還沒有得到廣泛的應用,搜尋整個github也就pinterest/quasar-thrift這么一個像樣的使用Quasar的庫,並且官方的文檔也很簡陋,很多地方並沒有詳細的介紹,和Maven的集成也不是很好。這些都限制了Quasar的進一步發展。
但是,作為目前最好用的Java coroutine的實現,它在某些情況下的性能還是表現相當出色的,希望這個項目能夠得到更大的支持和快速發展。
因為Quasar文檔的缺乏,所以使用起來需要不斷的摸索和在論壇上搜索答案,本文將一些記錄了我在Quasar使用過程中的一些探索。
1 Thread vs Quasar
雖然Java的線程的API封裝的很好,使用起來非常的方便,但是使用起來也得小心。首先線程需要耗費資源,所以單個的機器上創建上萬個線程很困難,其次線程之間的切換也需要耗費CPU,在線程非常多的情況下導致很多CPU資源耗費在線程切換上,通過提高線程數來提高系統的性能有時候適得其反。你可以看到現在一些優秀的框架如Netty都不會創建很多的線程,默認2倍的CPU core的線程數就已經應付的很好了,比如node.js可以使用單一的進程/線程應付高並發。
纖程使用的資源更少,它主要保存棧信息,所以一個系統中可以創建上萬的纖程Fiber,而實際的纖程調度器只需要幾個Java線程即可。
我們看一個性能的比較,直觀的感受一下Quasar帶來的吞吐率的提高。
下面這個例子中方法m1
調用m2
,m2
調用m3
,但是m2
會暫停1秒鍾,用來模擬實際產品中的阻塞,m3
執行了一個簡單的計算。
通過線程和纖程兩種方式我們看看系統的吞吐率(throughput)和延遲(latency)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
public
class
Helloworld {
@Suspendable
static
void
m1()
throws
InterruptedException, SuspendExecution {
String m =
"m1"
;
//System.out.println("m1 begin");
m = m2();
//System.out.println("m1 end");
//System.out.println(m);
}
static
String m2()
throws
SuspendExecution, InterruptedException {
String m = m3();
Strand.sleep(
1000
);
return
m;
}
//or define in META-INF/suspendables
@Suspendable
static
String m3() {
List l = Stream.of(
1
,
2
,
3
).filter(i -> i%
2
==
0
).collect(Collectors.toList());
return
l.toString();
}
static
public
void
main(String[] args)
throws
ExecutionException, InterruptedException {
int
count =
10000
;
testThreadpool(count);
testFiber(count);
}
static
void
testThreadpool(
int
count)
throws
InterruptedException {
final
CountDownLatch latch =
new
CountDownLatch(count);
ExecutorService es = Executors.newFixedThreadPool(
200
);
LongAdder latency =
new
LongAdder();
long
t = System.currentTimeMillis();
for
(
int
i =
0
; i< count; i++) {
es.submit(() -> {
long
start = System.currentTimeMillis();
try
{
m1();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
catch
(SuspendExecution suspendExecution) {
suspendExecution.printStackTrace();
}
start = System.currentTimeMillis() - start;
latency.add(start);
latch.countDown();
});
}
latch.await();
t = System.currentTimeMillis() - t;
long
l = latency.longValue() / count;
System.out.println(
"thread pool took: "
+ t +
", latency: "
+ l +
" ms"
);
es.shutdownNow();
}
static
void
testFiber(
int
count)
throws
InterruptedException {
final
CountDownLatch latch =
new
CountDownLatch(count);
LongAdder latency =
new
LongAdder();
long
t = System.currentTimeMillis();
for
(
int
i =
0
; i< count; i++) {
new
Fiber<Void>(
"Caller"
,
new
SuspendableRunnable() {
@Override
public
void
run()
throws
SuspendExecution, InterruptedException {
long
start = System.currentTimeMillis();
m1();
start = System.currentTimeMillis() - start;
latency.add(start);
latch.countDown();
}
}).start();
}
latch.await();
t = System.currentTimeMillis() - t;
long
l = latency.longValue() / count;
System.out.println(
"fiber took: "
+ t +
", latency: "
+ l +
" ms"
);
}
}
|
運行這個程序(需要某種instrument, agent或者AOT或者其它,在下面會介紹),輸出結果為:
1
2
|
thread pool took: 50341, latency: 1005 ms
fiber took: 1158, latency: 1000 ms
|
如果使用線程,執行完1萬個操作需要50秒,平均延遲為1秒左右(我們故意讓延遲至少1秒),線程池數量為200。(其實總時間50秒可以計算出來)
但是如果使用纖程,執行完1萬個操作僅需要1.158秒,平均延遲時間為1秒,線程數量為CPU core數(缺省使用ForkJoinPool)。
可以看到,通過使用纖程,盡受限於系統的業務邏輯,我們沒有辦法提升業務的處理時間, 但是我們確可以極大的提高系統的吞吐率,如上面的簡單的例子將10000個操作的處理時間從50秒提高到1秒,非凡的成就。
如果我們將方法m2
中的Strand.sleep(1000);
注釋掉,這樣這個例子中就沒有什么阻塞了,我們看看在這種純計算的情況下兩者的表現:
1
2
|
thread pool took: 114, latency: 0 ms
fiber took: 180, latency: 0 ms
|
可以看到,纖程非但沒有提升性能,反而會帶來性能的下降。對於這種純計算沒有阻塞的case,Quasar並不適合。
正如官方所說:
Fibers are not meant to replace threads in all circumstances. A fiber should be used when its body (the code it executes) blocks very often waiting on other fibers (e.g. waiting for messages sent by other fibers on a channel, or waiting for the value of a dataflow-variable). For long-running computations that rarely block, traditional threads are preferable. Fortunately, as we shall see, fibers and threads interoperate very well.
2 Suspendable方法
Fiber中的run方法,如SuspendableRunnable
和 SuspendableCallable
聲明了SuspendExecution
異常。這並不是一個真的異常,而是fiber內部工作的機制。任何運行在fiber中的可能阻塞的方法,如果聲明了這個異常,就被叫做 suspendable 方法。 如果你的方法調用了一個suspendable
方法,那么你的方法也是suspendable
方法,所以也需要聲明拋出SuspendExecution
異常。
有時候不能在某個方法上聲明拋出SuspendExecution
異常,比如你實現某個接口,你不能更改接口的方法聲明,你不得不使用其它的方法來指定suspendable
方法。方法之一就是使用@Suspendable
注解,在你需要指定的suspendable
方法上加上這個注解就可以告訴Quasar這個方法是suspendable
方法。
另一個情況就是對於第三的庫,你不可能更改它們的代碼,如果想指定這些庫的某些方法是suspendable
方法,比如java.net.URL.openStream()Ljava/io/InputStream;
, 就需要另外一種解決辦法,也就是在META-INF/suspendables
和META-INF/suspendable-supers
定義。
文件中每個方法占一行,具體(concrete)的suspendable
方法應該寫在META-INF/suspendables
中,non-suspendable
方法,但是有suspendable override
的類、接口寫在META-INF/suspendable-supers
中(可以是具體類單不能是final, 接口和抽象類也可以)。
每一行應該是方法的簽名的全稱“full.class.name.methodName” 以及*
通配符。
使用`SuspendablesScanner`可以自動增加你的方法到這些文件中,待會介紹它。
java.lang
包下的方法不能標記為suspendable
,其它的JDK方法則可以顯示地在文件META-INF/suspendables
和META-INF/suspendable-supers
中標記為suspendable
,並且設置環境變量co.paralleluniverse.fibers.allowJdkInstrumentation
為true,但是很少這樣使用。
還有一些特殊的情況也會被認為是suspendable
的。
反射調用總是被看作是suspendable
的。
Java 8 lambda也總是被看作suspendable
的。
構造函數/類初始化器不能被標記為suspendable
。
缺省情況下synchronized
和blocking thread 調用不能運行在Fiber中。這是因為它們會阻塞Fiber使用的線程,導致系統處理變慢,但是如果你非要在Fiber中使用它們,可以可以將allowMonitors
和allowBlocking
傳給instrumentation Ant task,或者將b
、m
傳給Quasar Java agent。
3 Maven配置
Quasar依賴字節碼的instrumentation, instrumentation用來修改字節碼。 Quasar可以在運行時或者編譯時修改字節碼,下面介紹這幾種實現。
1、Quasar Java Agent
Quasar java agent可以在運行時動態修改字節碼,將下面一行加搭配java命令行中即可,注意把path-to-quasar-jar.jar替換成你實際的quasar java的地址。
1
|
-javaagent:path-to-quasar-jar.jar
|
如果你使用maven的exec task,你可以使用maven-dependency-plugin
為依賴設置properties,然后在插件exec-maven-plugin中引用quasar庫即可。
詳細配置可以參考Specifying the Java Agent with Maven:。
Quasar對gradle的支持比較好,你可以方便的使用gradle配置。
這是首選的一種方式,因為在某些情況下,比如你使用第三方的庫,如comsat,它們只能使用這種方式配置。
2、AOT(Ahead-of-Time)
另外一種是在編譯時的時候完成instrumentation。
它是通過一個Ant Task來完成的,所以對於Maven管理的項目來說,配置起來有些麻煩。
這個Ant Task是co.paralleluniverse.fibers.instrument.InstrumentationTask
,包含在quasar-core.jar
中。它接受一組(fileset)classes進行instrument,但並不是傳給它的所有classes都需要classes進行instrument,只有suspendable
方法才有可能被instrument。它還會進行優化,有些suspendable
方法可能不需要instrument。
在Maven中配置起來有些復雜,如下面所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
<
plugin
>
<
groupId
>org.apache.maven.plugins</
groupId
>
<
artifactId
>maven-antrun-plugin</
artifactId
>
<
executions
>
<
execution
>
<
id
>instrument-classes</
id
>
<
phase
>compile</
phase
>
<
configuration
>
<
tasks
>
<
property
name
=
"ant_classpath"
refid
=
"maven.dependency.classpath"
/>
<
taskdef
name
=
"instrumentationTask"
classname
=
"co.paralleluniverse.fibers.instrument.InstrumentationTask"
classpath
=
"${co.paralleluniverse:quasar-core:jar:jdk8}"
/>
<
instrumentationTask
allowMonitors
=
"true"
allowBlocking
=
"true"
check
=
"true"
verbose
=
"true"
debug
=
"true"
>
<
fileset
dir
=
"${project.build.directory}/classes/"
includes
=
"**/*"
/>
</
instrumentationTask
>
</
tasks
>
</
configuration
>
<
goals
>
<
goal
>run</
goal
>
</
goals
>
</
execution
>
</
executions
>
</
plugin
>
<
plugin
>
<
artifactId
>maven-dependency-plugin</
artifactId
>
<
version
>2.5.1</
version
>
<
executions
>
<
execution
>
<
id
>getClasspathFilenames</
id
>
<
goals
>
<
goal
>properties</
goal
>
</
goals
>
</
execution
>
</
executions
>
</
plugin
>
|
Quasar官方並沒有提供一個maven插件,好心的社區倒是提供了一個quasar-maven-plugin。所以你可以不用上面的寫法,而是用下面簡單的寫法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
<
plugin
>
<
groupId
>com.vlkan</
groupId
>
<
artifactId
>quasar-maven-plugin</
artifactId
>
<
version
>0.7.3</
version
>
<
configuration
>
<
check
>true</
check
>
<
debug
>true</
debug
>
<
verbose
>true</
verbose
>
</
configuration
>
<
executions
>
<
execution
>
<
phase
>compile</
phase
>
<
goals
>
<
goal
>instrument</
goal
>
</
goals
>
</
execution
>
</
executions
>
</
plugin
>
|
3、在Web容器中
如果你使用web容器使用基於Quasar的庫comsat等,比如Tomcat,則比較棘手。因為你不太像將Quasar java agent直接加到tomcat的啟動腳本中,這樣會instrument所有的應用,導致很多的警告。
Comsat提供了Tomcat和Jetty的解決方案。
Tomcat
對於tomcat,你可以把comsat-tomcat-loader-0.7.0-jdk8.jar
或者comsat-tomcat-loader-0.7.0.jar
加入到tomcat的common/lib
或者lib
中,然后在你的web應用META-INF/context.xml
中加入:
1
|
<
Loader
loaderClass
=
"co.paralleluniverse.comsat.tomcat.QuasarWebAppClassLoader"
/>
|
Jetty
如果使用Jetty,則把comsat-jetty-loader-0.7.0-jdk8.jar
或者comsat-jetty-loader-0.7.0.jar
加入到Jetty的lib中,然后在你的context.xml中加入<Set name="classLoader">
:
1
2
3
4
5
6
7
8
9
10
11
|
<
Configure
id
=
"ctx"
class
=
"org.eclipse.jetty.webapp.WebAppContext"
>
<
Set
name
=
"war"
>./build/wars/dep.war</
Set
>
<!--use custom classloader in order to instrument classes by quasar-->
<
Set
name
=
"classLoader"
>
<
New
class
=
"co.paralleluniverse.comsat.jetty.QuasarWebAppClassLoader"
>
<
Arg
>
<
Ref
id
=
"ctx"
/>
</
Arg
>
</
New
>
</
Set
>
</
Configure
>
|
總之,通過實現一個定制的ClassLoader實現instrumentation。
4 Auto Suspendables Detection
quasar提供了一個ant task,可以實現自動偵測suspendable
方法,並可以把它們寫入到`META-INF/suspendables和
META-INF/suspendable-supers`。
但是官方並沒有詳細的介紹,而且也沒有相應的maven插件可以使用。
我們可以看看在gradle如何使用的,我們可以把偵測結果復制到maven中使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
apply plugin: 'java'
apply plugin: 'maven'
group = 'com.colobu.fiber'
version = '1.0'
description = """"""
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
maven { url "http://repo.maven.apache.org/maven2" }
}
dependencies {
compile group: 'co.paralleluniverse', name: 'quasar-core', version:'0.7.5', classifier:'jdk8'
compile group: 'co.paralleluniverse', name: 'comsat-httpclient', version:'0.7.0'
testCompile group: 'junit', name: 'junit', version:'4.12'
}
classes {
doFirst {
ant.taskdef(name: 'scanSuspendables',
classname: 'co.paralleluniverse.fibers.instrument.SuspendablesScanner',
classpath: "build/classes/main:build/resources/main:${configurations.runtime.asPath}")
ant.scanSuspendables(auto: true,
suspendablesFile: "$sourceSets.main.output.resourcesDir/META-INF/suspendables",
supersFile: "$sourceSets.main.output.resourcesDir/META-INF/suspendable-supers",
append: true) {
fileset(dir: sourceSets.main.output.classesDir)
}
}
}
|
我們可以看一下官方的庫comsat的一些`META-INF/suspendables`例子:
1、comsat-okhttp
/META-INF/suspendables:
1
|
com.squareup.okhttp.apache.OkApacheClient.execute
|
2、comsat-httpclient
/META-INF/suspendables
1
2
|
org.apache.http.impl.client.CloseableHttpClient.doExecute
org.apache.http.impl.client.CloseableHttpClient.execute
|
5 故障檢測
當前quasar依賴字節碼的instrumentation,所以suspendable方法必須在運行之前進行標記。
Quasar開發組和OpenJDK協作,將在JDK9中移除這個限制,將會有效地自動地實現instrumentation。
如果你忘記將一個方法標記為suspendable
(throws SuspendExecution、@Suspendable或者META-INF/suspendables/META-INF/suspendable-supers),你可能會遇到一些奇怪的錯誤。
環境變量co.paralleluniverse.fibers.verifyInstrumentation
設為true可以檢查未標記的方法。但是在生產環境中不要設置它。
UnableToInstrumentException
異常表明quasar不能instrument一些方法如synchronized
或者阻塞的線程調用。verbose(v), debug(d) 和 check(c)可以打印出詳細信息。
更多的調試可以參考:troubleshooting。
6 其它
Fiber可以序列化。
Fiber也可以打印它的堆棧進行調試。
Fiber也有Actor和Channel的實現,並且可以運行在集群上。
轉自:
http://www.importnew.com/23314.html
http://docs.paralleluniverse.co/quasar/#specifying-the-java-agent-with-gradle
附:
1 <!-- quasar-core --> 2 <dependency> 3 <groupId>co.paralleluniverse</groupId> 4 <artifactId>quasar-core</artifactId> 5 <version>${quasar.version}</version> 6 </dependency> 7 <!-- comsat-httpclient --> 8 <dependency> 9 <groupId>co.paralleluniverse</groupId> 10 <artifactId>comsat-httpclient</artifactId> 11 <version>${comsat.version}</version> 12 </dependency> 13 <!-- comsat-spring-boot --> 14 <dependency> 15 <groupId>co.paralleluniverse</groupId> 16 <artifactId>comsat-spring-boot</artifactId> 17 <version>${comsat.version}</version> 18 <exclusions> 19 <exclusion> 20 <groupId>org.slf4j</groupId> 21 <artifactId>log4j-over-slf4j</artifactId> 22 </exclusion> 23 </exclusions> 24 </dependency>