來源:鳥窩,
colobu.com/2016/03/31/vertx-thread-model/
Vert.x是一個在JVM開發reactive應用的框架,可用於開發異步、可伸縮、高並發的Web應用(雖然不限於web應用)。其目的在於為JVM提供一個Node.js的替代方案。開發者可以通過它使用JavaScript、Ruby、Groovy、Java,甚至是混合語言來編寫應用。
使用Vertx.x框架,可以用JavaScript、CoffeeScript、Ruby、Python、Groovy或Java開發應用程序的組件,最終應用程序可以是混合語言構建的。
本文試圖揭示Vert.x的線程模型的應用,通過源代碼分析Vert.x如何使用線程池處理請求的,以及比較Netty和Vert.x使用線程池的異同。
也許你覺得奇怪,默認啟動一個Vert.x Verticle實例,它只用一個線程處理事件,在多核的情況下你需要創建多個Verticle實例以充分利用多個CPU Core的性能。
Vert.x 實例
首先先啰嗦地介紹一些Vert.x概念,熟悉Vert.x開發的朋友可以跳過這一節
在Vert.x里,如果你不使用Vertx對象,你幾乎是寸步難行。
Vertx對象扮演着Vert.x控制中心的角色,同時它也提供了大量的功能,例如:
編寫TCP客戶端和服務器
編寫HTTP客戶端和服務器,包括websocket
Event bus
共享數據
定時器
發布和卸載Verticle
UDP
DNS client
文件系統訪問
高可用
集群
如果你將Vert.x嵌入到你的應用程序中,你可以向下面這樣獲得一個Vertx對象的引用
Vertx vertx = Vertx.vertx();
當你實例化Vertx對象時,如果你感覺默認的參數不符合你的需求,你可以指定實例化時的參數:
Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));
VertxOptions對象擁有很多關於Vertx實例設置,例如配置集群,高可用設置,線程池大小以及等等其他參數。下面就介紹一下它的線程池。
1 線程池
1、eventLoopGroup
這個對象是NioEventLoopGroup的一個實例,它的線程池的大小由options.getEventLoopPoolSize()決定,如果沒有設置,默認為CPU核數 * 2。
eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false);
eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory);
eventLoopGroup.setIoRatio(NETTY_IO_RATIO);
它的EventLoop和一個Context對應:
protected ContextImpl(……) {
……
EventLoopGroup group = vertx.getEventLoopGroup();
if (group != null) {
this.eventLoop = group.next();
} else {
this.eventLoop = null;
}
……
}
它用來執行標准的Verticle。
2、WorkerPool
用來執行worker Verticle。
workerPool = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
new VertxThreadFactory("vert.x-worker-thread-", checker, true));
3、Internal Blocking Pool
內部使用的線程池,可以用來將阻塞代碼異步化。
internalBlockingPool = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
new VertxThreadFactory("vert.x-internal-blocking-", checker, true));
不要在event loop中執行阻塞操作, 比如訪問數據庫或者網絡資源,這絕對會影響你的應用的性能。對於這些阻塞操作,你可以將它們異步化:
vertx.executeBlocking(future -> {
// 下面這行代碼可能花費很長時間
String result = someAPI.blockingMethod("hello");
future.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});
默認情況下executeBlocking會在同一個context中執行(同一個verticle實例),它們會串行化執行。如果不關心這個執行的順序,可以將ordered參數設為false,它們會在worker pool線程池中並行的執行。
另外一種執行阻塞代碼的方式就是使用worker verticle,worker verticle總是在worker pool線程池中執行。
2 Verticle
Verticle有點類似Actor模型,也可以實現並發的,可擴展的,易於發布的模型。
一個vert.x應用可以包含多個verticle實例,實例之間可以通過event bus通訊。
2.1 三種類型
http://vertx.io/docs/vertx-core/java/#_verticle_types
1、Standard Verticle: 最通用的類型,總是在event loop中執行。
2、Worker Verticle:它們使用worker pool線程池運行。一個verticle實例絕對不會在兩個或者更多線程中並發執行。
3、Multi-threaded worker verticle:它們使用worker pool線程池運行。 一個verticle實例可以在多個線程中並發執行。
實現一個Verticle很簡單:
public class MyVerticle extends AbstractVerticle {
// 當發布verticle時調用
public void start() {
}
// 可以不實現。當 verticle 卸載時調用
public void stop() {
}
}
2.2 發布方式
1、命令行方式
vertx run SomeJavaSourceFile.java
或者通過maven-shade-plugin打包成一個fat包:
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Main-Class>io.vertx.core.Starter</Main-Class>
<Main-Verticle>com.colobu.vertx.Main</Main-Verticle>
</manifestEntries>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/io.vertx.core.spi.VerticleFactory</resource>
</transformer>
</transformers>
然后運行 java -jar xxx-fat.jar,你還可以傳遞一些參數。
2、編程方式
你也可以編程的方式,通過vertx.deployVerticle發布:
public class Main extends AbstractVerticle {
public static void main(String[] args) {
VertxOptions vo = new VertxOptions();
vo.setEventLoopPoolSize(16);
Vertx vertx = Vertx.vertx(vo);
DeploymentOptions options = new DeploymentOptions();
options.setInstances(100);
vertx.deployVerticle(Main.class.getName(), options, e -> {
System.out.println(e.succeeded());
System.out.println(e.failed());
System.out.println(e.cause());
System.out.println(e.result());
});
}
@Override
public void start() {
Handler<HttpServerRequest> handler = e -> {
HttpServerResponse response = e.response();
response.putHeader("content-type", "application/json").end("Hello world");
};
vertx.createHttpServer().requestHandler(handler).listen(8080);
}
}
Verticle發布和Vert.x線程模型
以上比較啰嗦,主要介紹了一些Vert.x的一些概念。下面是我想重點介紹的內容。
本節以實現一個簡單的http server為例(編程方式發布Verticle),分析 vert.x 的線程和Verticle的關系。只分析標准的Verticle。代碼如上。
1 Verticle發布過程
首先先創建一個Vertx實例,可以你可以通過VertxOptions設置線程池的大小。上面的例子中設置Event Loop線程池的大小為16:
vo.setEventLoopPoolSize(16);
因此即使你創建幾百個Verticle,也只會有16個Event Loop處理它們,你可以通過jstack查看這些線程。你會看到多個名為vert.x-eventloop-thread-<num>的線程,一個vertx-blocked-thread-checker線程,一個vert.x-acceptor-thread-0。
調用void deployVerticle(String name, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler)方法發布Verticle。
DeploymentOptions對象可以設置發布參數,比如是否是worker verticle,多線程worker verticle, ha, 隔離組等, 重要的是instances,它用來指定分布的Verticle實例的數量,默認是一個。
底層調用DeploymentManager的doDeployVerticle來實現,它會根據實例數創建相應多的Verticle,然后調用doDeploy發布這些Verticle:
Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
我將doDeploy方法簡化,讓我們看一下關鍵代碼:
private void doDeploy(String identifier, String deploymentID, DeploymentOptions options,
ContextImpl parentContext,
ContextImpl callingContext,
Handler<AsyncResult<String>> completionHandler,
ClassLoader tccl, Verticle... verticles) {
//准備工作
……
for (Verticle verticle: verticles) {
//創建上下文
ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, conf, tccl) :
vertx.createEventLoopContext(deploymentID, conf, tccl);
deployment.addVerticle(new VerticleHolder(verticle, context));
context.runOnContext(v -> {
try {
verticle.init(vertx, context);
Future<Void> startFuture = Future.future();
verticle.start(startFuture);
startFuture.setHandler(……);
} catch (Throwable t) {}
});
}
}
可以看到#11 行創建了一個上下文ContextImpl, 因為本例中我們不用worker模式,所以這個上下文是通過vertx.createEventLoopContext(deploymentID, conf, tccl)創建的。每個verticle都會創建一個新的上下文,因此verticle和上下文是意義對應的。
#17 行初始化verticle,#19 行啟動這個verticle。還記得我們的例子中實現的start方法嗎,它會在這里被調用。
這樣,多個verticle實例被發布了。
2 線程模型
首先插播一下Netty的線程模型,不感興趣的可以略過。
2.1 Netty的線程模型
雖然Vert.x底層籍由Netty實現,但是它的處理方式與Netty NIO的線程模型是不同的。
(以下談論的Netty線程模型是指NIO的情況)
比如下面的Netty代碼片段:
EventLoopGroup parentGroup = new NioEventLoopGroup(1);
EventLoopGroup childGroup = new NioEventLoopGroup(50);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>(){……});
Channel ch = b.bind("0.0.0.0",8080).sync().channel();
ch.closeFuture().sync();
} finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
NioEventLoopGroup代表一組EventLoop,每個EventLoop映射一個線程,每個Channel注冊一個EventLoop,但是一個EventLoop可以關聯多個Channel。
parentGroup用來處理Accept事件,而childGroup用來處理其余的IO事件。當有並發連接的時候,Handler會在childGroup線程池中執行。你可以指定childGroup的線程數量,如果沒有指定,則從系統屬性中讀取”io.netty.eventLoopThreads”,如果這個屬性沒有設置,則使用CPU核數 2 (Runtime.getRuntime().availableProcessors() 2))。一般parentGroup設置為1,我們只需要一個Acceptor處理客戶端的連接即可。
當有多個並發連接的時候,每個連接/Channel被分配到一個EventLoop上。EventLoop選擇是均勻地 (如果線程數是2的n次方,可以用比較快的選擇方法PowerOfTwoEventExecutorChooser):
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[childIndex.getAndIncrement() & children.length - 1];
}
}
private final class GenericEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
}
因此一旦如果某個EventLoop處理慢了,則這個線程上的event可能出現堆積。
比如下面的代碼故意在某個線程上處理慢一些,導致這個EventLoop上出現堆積,Netty並沒有根據壓力將時間分配到其它處理快的EventLoop上。
public class HelloServerHandler extends ChannelInboundHandlerAdapter {
……
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String name = Thread.currentThread().getName();
System.out.println(name);
if (name.endsWith("-5")) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
……
}
輸出結果可以看到nioEventLoopGroup-3-5處理了同樣多的請求,而且都堆積在后面了。
……
nioEventLoopGroup-3-19
nioEventLoopGroup-3-18
nioEventLoopGroup-3-19
nioEventLoopGroup-3-18
nioEventLoopGroup-3-20
nioEventLoopGroup-3-20
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5
nioEventLoopGroup-3-5
因此,我們可以了解到,當啟動一個NIO方式的Netty實例的時候,它會使用一個線程池來處理http請求。
Netty 4.0的線程模型被很好的重定義,一個ChannelHandler實例的方法不會被並發的調用,除非它被@Sharable標記,因此你不應該增加一個ChannelHandler 實例多次。當你增加一個handler到ChannelPipeline中時,你可以指定一個特定的EventExecutorGroup來執行這個handler。如果沒有指定,則使用Channel注冊的EventLoop來執行。如果兩個Handler被指定不同的EventExecutorGroup,則它們會並發執行,因此如果它們會訪問共享數據的化,你需要關注並發控制的問題。更多內容可以查看 Netty的文檔。
2.2 Vert.x的線程模型
Vert.x如何在線程中處理事件的呢,還是以我們的例子分析。
回顧一下我們實現的Verticle的start方法。
@Override
public void start() {
Handler<HttpServerRequest> handler = e -> {
HttpServerResponse response = e.response();
response.putHeader("content-type", "application/json").end("Hello world");
};
vertx.createHttpServer().requestHandler(handler).listen(8080);
}
在這個start方法中,我們創建了一個http server,讓它監聽 8080端口, http request的處理交給handler執行。 那么監聽線程是哪一個?handler又是在哪個線程池中執行的呢?調用多個Verticle實例的方法為什么沒有出現”地址/端口被占用”的異常呢?
首先vertx.createHttpServer()會創建一個HttpServerImpl對象,可以通過HttpServerOptions配置更多的參數,每個Verticle實例都會創建一個HttpServerImpl對象。requestHandler(handler)方法設置處理器,你還可以使用Vert.x-Web設置路由的功能。
listen(8080)啟動http 服務器,它實際調用netty實現的。
我將listen方法簡化,去除一些檢查代碼和回調處理,只保留關鍵代碼如下:
public synchronized HttpServer listen(int port, String host, Handler<AsyncResult<HttpServer>> listenHandler) {
listenContext = vertx.getOrCreateContext();
listening = true;
synchronized (vertx.sharedHttpServers()) {
id = new ServerID(port, host);
HttpServerImpl shared = vertx.sharedHttpServers().get(id);
if (shared == null) {
serverChannelGroup = new DefaultChannelGroup("vertx-acceptor-channels", GlobalEventExecutor.INSTANCE);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(vertx.getAcceptorEventLoopGroup(), availableWorkers);
bootstrap.channelFactory(new VertxNioServerChannelFactory());
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
……
pipeline.addLast("handler", new ServerHandler());
}
});
addHandlers(this, listenContext);
vertx.sharedHttpServers().put(id, this);
actualServer = this;
} else {
// Server already exists with that host/port - we will use that
actualServer = shared;
addHandlers(actualServer, listenContext);
metrics = vertx.metricsSPI().createMetrics(this, new SocketAddressImpl(port, host), options);
}
}
return this;
}
#6 行可以看到它會檢查使用這個IP地址和端口的http server是否存在,如果存在的化直接跳到# 27行。因此回答上面的問題,多個Verticle實例不會引起沖突,因為它們會共享同一個http server。
這個http server通過netty ServerBootstrap創建。#10 行可以看到acceptor是一個單線程執行的,acceptorEventLoopGroup在VertxImpl中定義。
acceptorEventLoopGroup = new NioEventLoopGroup(1, acceptorEventLoopThreadFactory);
#10 行還顯示,netty的IO worker線程池由availableWorkers確定,它是一個VertxEventLoopGroup對象。VertxEventLoopGroup類擴展AbstractEventExecutorGroup,實現了EventLoopGroup接口:
……
@Override
public synchronized EventLoop next() {
if (workers.isEmpty()) {
throw new IllegalStateException();
} else {
EventLoop worker = workers.get(pos).worker;
pos++;
checkPos();
return worker;
}
}
public synchronized void addWorker(EventLoop worker) {
EventLoopHolder holder = findHolder(worker);
if (holder == null) {
workers.add(new EventLoopHolder(worker));
} else {
holder.count++;
}
}
……
線程的數量由worker的數量決定,worker的類型是EventLoop,對應一個線程,有多少worker就會有多少線程。
通過addWorker可以增加線程的數量,worker不會重復。
回到剛才的listen方法, #21 行addHandlers方法會配置handler在哪一個event loop中執行:
private void addHandlers(HttpServerImpl server, ContextImpl context) {
if (requestStream.handler() != null) {
server.reqHandlerManager.addHandler(requestStream.handler(), context);
}
if (wsStream.handler() != null) {
server.wsHandlerManager.addHandler(wsStream.handler(), context);
}
}
server.reqHandlerManager.addHandler方法如下:
public synchronized void addHandler(Handler<T> handler, ContextImpl context) {
EventLoop worker = context.nettyEventLoop();
availableWorkers.addWorker(worker);
Handlers<T> handlers = new Handlers<>();
Handlers<T> prev = handlerMap.putIfAbsent(worker, handlers);
if (prev != null) {
handlers = prev;
}
handlers.addHandler(new HandlerHolder<>(context, handler));
hasHandlers = true;
}
#2 行得到這個上下文的EventLoop。 還記得上下文的EventLoop怎么創建出來的嗎?每個Verticle實例關聯一個上下文,因此一個Verticle實例只會創建一個worker。
把這個worker加入到availableWorkers,這樣就增加了一個事件處理線程。
因此我們可以看出正常情況下Vert.x的每個Verticle實例只會用一個線程處理請求,在多核情況下一定要配置instance的數量。
如果配置的instance的數量大於eventLoopPoolSize數量,那么就會有一個Event Loop處理多個instance的情況。 線程配置的過多有時不會帶來性能的提升,由於線程也有context swicthing,反而會帶來性能的降低。