io.vertx.core.impl.BlockedThreadChecker 警告: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 2204 ms, time limit is 2000 二月 02, 2020 1:10:01 上午 io.vertx.core.impl.BlockedThreadChecker 警告: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 3210 ms, time limit is 2000 二月 02, 2020 1:10:02 上午 io.vertx.core.impl.BlockedThreadChecker 警告: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 4210 ms, time limit is 2000 二月 02, 2020 1:10:03 上午 io.vertx.core.impl.BlockedThreadChecker 警告: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 5211 ms, time limit is 2000 io.vertx.core.VertxException: Thread blocked at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:254) at io.netty.channel.DefaultChannelPromise.awaitUninterruptibly(DefaultChannelPromise.java:135) at io.netty.channel.DefaultChannelPromise.awaitUninterruptibly(DefaultChannelPromise.java:28)
解決方案:運行ip地址與本機ip地址不一至,修改成本機ip地址
config.setJdbcUrl("jdbc:mysql://localhost:3306/endv_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8");
ip 應為 localhost 原因 127.0.0.1 可能未允許連接,換成可連接的數據庫地址
Thread vertx-eventloop-thread-3 has been blocked for 20458 ms
快速解決:
WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool"); executor.executeBlocking(future -> { // 調用一些需要耗費顯著執行時間返回結果的阻塞式API String result = someAPI.blockingMethod("hello"); future.complete(result); }, res -> { System.out.println("The result is: " + res.result()); });
了解更多:
https://vertxchina.github.io/vertx-translation-chinese/core/Core.html
如果您的應用程序沒有響應,可能這是一個跡象,表明您在某個地方阻塞了Event Loop。為了幫助您診斷類似問題,若 Vert.x 檢測到 Event Loop 有一段時間沒有響應,將會自動記錄這種警告。若您在日志中看到類似警告,那么您需要檢查您的代碼。比如:
Thread vertx-eventloop-thread-3 has been blocked for 20458 ms
Vert.x 還將提供堆棧跟蹤,以精確定位發生阻塞的位置。
如果想關閉這些警告或更改設置,您可以在創建 Vertx
對象之前在 VertxOptions
中完成此操作。
運行阻塞式代碼
在一個完美的世界中,不存在戰爭和飢餓,所有的API都將使用異步方式編寫,兔兔和小羊羔將會在陽光明媚的綠色草地上手牽手地跳舞。
但是……真實世界並非如此(您最近看新聞了吧?)
事實是,很多,也非所有的庫,特別是在JVM生態系統中有很多同步API,這些API中許多方法都是阻塞式的。一個很好的例子就是 JDBC API,它本質上是同步的,無論多么努力地去嘗試,Vert.x都不能像魔法小精靈撒塵變法一樣將它轉換成異步API。
我們不會將所有的內容重寫成異步方式,所以我們為您提供一種在 Vert.x 應用中安全調用"傳統"阻塞API的方法。
如之前討論,您不能在 Event Loop 中直接調用阻塞式操作,因為這樣做會阻止 Event Loop 執行其他有用的任務。那您該怎么做?
可以通過調用 executeBlocking
方法來指定阻塞式代碼的執行以及阻塞式代碼執行后處理結果的異步回調。
vertx.executeBlocking(future -> { // 調用一些需要耗費顯著執行時間返回結果的阻塞式API String result = someAPI.blockingMethod("hello"); future.complete(result); }, res -> { System.out.println("The result is: " + res.result()); });
默認情況下,如果 executeBlocking
在同一個上下文環境中(如:同一個 Verticle 實例)被調用了多次,那么這些不同的 executeBlocking
代碼塊會 順序執行(一個接一個)。
若您不需要關心您調用 executeBlocking
的順序,可以將 ordered
參數的值設為 false
。這樣任何executeBlocking
都會在 Worker Pool 中並行執行。
另外一種運行阻塞式代碼的方法是使用 Worker Verticle。
一個 Worker Verticle 始終會使用 Worker Pool 中的某個線程來執行。
默認的阻塞式代碼會在 Vert.x 的 Worker Pool 中執行,通過 setWorkerPoolSize
配置。
可以為不同的用途創建不同的池:
WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool"); executor.executeBlocking(future -> { // 調用一些需要耗費顯著執行時間返回結果的阻塞式API String result = someAPI.blockingMethod("hello"); future.complete(result); }, res -> { System.out.println("The result is: " + res.result()); });
Worker Executor 在不需要的時候必須被關閉:
executor.close();
當使用同一個名字創建了許多 worker 時,它們將共享同一個 pool。當所有的 worker executor 調用了close
方法被關閉過后,對應的 worker pool 會被銷毀。
如果 Worker Executor 在 Verticle 中創建,那么 Verticle 實例銷毀的同時 Vert.x 將會自動關閉這個 Worker Executor。
Worker Executor 可以在創建的時候配置:
int poolSize = 10; // 2分鍾 long maxExecuteTime = 120000; WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool", poolSize, maxExecuteTime);
請注意:這個配置信息在 worker pool 創建的時候設置。
線程模型概述
Vert.x 的線程模型設計的非常巧妙。總的來說,Vert.x 中主要有兩種線程:Event Loop 線程 和 Worker 線程。其中,Event Loop 線程結合了 Netty 的 EventLoop
,用於處理事件。每一個 EventLoop
都與唯一的線程相綁定,這個線程就叫 Event Loop 線程。Event Loop 線程不能被阻塞,否則事件將無法被處理。
Worker 線程用於執行阻塞任務,這樣既可以執行阻塞任務而又不阻塞 Event Loop 線程。
如果像 Node.js 一樣只有單個 Event Loop 的話就不能充分利用多核 CPU 的性能了。為了充分利用多核 CPU 的性能,Vert.x 中提供了一組 Event Loop 線程。每個 Event Loop 線程都可以處理事件。為了保證線程安全,防止資源爭用,Vert.x 保證了某一個 Handler
總是被同一個 Event Loop 線程執行,這樣不僅可以保證線程安全,而且還可以在底層對鎖進行優化提升性能。所以,只要開發者遵循 Vert.x 的線程模型,開發者就不需要再擔心線程安全的問題,這是非常方便的。
本篇文章將底層的角度來解析 Vert.x 的線程模型。對應的 Vert.x 版本為 3.3.3。
Event Loop 線程
首先回顧一下 Event Loop 線程,它會不斷地輪詢獲取事件,並將獲取到的事件分發到對應的事件處理器中進行處理:
Vert.x 線程模型中最重要的一點就是:永遠不要阻塞 Event Loop 線程。因為一旦處理事件的線程被阻塞了,事件就會一直積壓着不能被處理,整個應用也就不能正常工作了。
Vert.x 中內置一種用於檢測 Event Loop 是否阻塞的線程:vertx-blocked-thread-checker
。一旦 Event Loop 處理某個事件的時間超過一定閾值(默認為 2000 ms)就會警告,如果阻塞的時間過長就會拋出異常。Block Checker 的實現原理比較簡單,底層借助了 JUC 的 TimerTask
,定時計算每個 Event Loop 線程的處理事件消耗的時間,如果超時就進行相應的警告。
Vert.x Thread
Vert.x 中的 Event Loop 線程及 Worker 線程都用 VertxThread
類表示,並通過 VertxThreadFactory
線程工廠來創建。VertxThreadFactory
創建 Vert.x 線程的過程非常簡單:
1
2
3
4
5
6
7
8
9
10
11
12
|
public Thread newThread(Runnable runnable) {
VertxThread t =
new VertxThread(runnable, prefix + threadCount.getAndIncrement(), worker, maxExecTime);
if (checker != null) {
checker.registerThread(t);
}
addToMap(t);
t.setDaemon(
false);
return t;
}
|
除了創建 VertxThread
線程之外,VertxThreadFactory
還會將此線程注冊至 Block Checker 線程中以監視線程的阻塞情況,並且將此線程添加至內部的 weakMap
中。這個 weakMap
作用只有一個,就是在注銷對應的 Verticle 的時候可以將每個 VertxThread
中的 Context
實例清除(unset)。為了保證資源不被一直占用,這里使用了 WeakHashMap
來存儲每一個 VertxThread
。當里面的 VertxThread
的引用不被其他實例持有的時候,它就會被標記為可清除的對象,等待 GC。
至於 VertxThread
,它其實就是在普通線程的基礎上存儲了額外的數據(如對應的 Vert.x Context,最大執行時長,當前執行時間,是否為 Worker 線程等),這里就不多講了。
Vert.x Context
Vert.x 底層中一個重要的概念就是 Context
,每個 Context
都會綁定着一個 Event Loop 線程(而一個 Event Loop 線程可以對應多個 Context
)。我們可以把 Context
看作是控制一系列的 Handler
的執行作用域及順序的上下文對象。
每當 Vert.x 底層將事件分發至 Handler
的時候,Vert.x 都會給此 Handler
綁定一個 Context
用於處理任務:
- 如果當前線程是 Vert.x 線程(
VertxThread
),那么 Vert.x 就會復用此線程上綁定的Context
;如果沒有對應的Context
就創建新的 - 如果當前線程是普通線程,就創建新的
Context
Vert.x 中存在三種 Context
,與之前的線程種類相對應:
EventLoopContext
WorkerContext
MultiThreadedWorkerContext
Event Loop Context
每個 Event Loop Context 都會對應着唯一的一個 EventLoop
,即一個 Event Loop Context 只會在同一個 Event Loop 線程上執行任務。在創建 Context
的時候,Vert.x 會自動根據輪詢策略選擇對應的 EventLoop
:
1
2
3
4
5
6
7
8
9
10
11
|
protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,
ClassLoader tccl) {
// ...
EventLoopGroup group = vertx.getEventLoopGroup();
if (group != null) {
this.eventLoop = group.next();
}
else {
this.eventLoop = null;
}
// ...
}
|
在 Netty 中,EventLoopGroup
代表一組 EventLoop
,而從中獲取 EventLoop
的方法則是 next
方法。EventLoopGroup
中 EventLoop
的數量由 CPU 核數所確定。Vert.x 這里使用了 Netty NIO 對應的 NioEventLoop
:
1
2
|
eventLoopGroup =
new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory);
eventLoopGroup.setIoRatio(NETTY_IO_RATIO);
|
對應的輪詢算法:
1
2
3
4
5
6
7
8
|
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
}
else {
return new GenericEventExecutorChooser(executors);
}
}
|
可以看到,正常情況下 Netty 會用輪詢策略選擇 EventLoop
。特別地,如果 EventLoop
的個數是 2 的倍數的話,選擇的會快一些:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
// ...
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
// ...
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
|
我們可以在 Embedded 模式下測試一下 Event Loop 線程的分配:
1
2
3
4
5
6
7
|
System.out.println(Thread.currentThread());
Vertx vertx = Vertx.vertx();
for (int i = 0; i < 20; i++) {
int index = i;
vertx.setTimer(
1, t -> {
System.out.println(index +
":" + Thread.currentThread());
});
|
運行結果(不同機器運行順序、Event Loop線程數可能不同):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
Thread[main,
5,main]
0:Thread[vert.x-eventloop-thread-0,5,main]
1:Thread[vert.x-eventloop-thread-1,5,main]
2:Thread[vert.x-eventloop-thread-2,5,main]
3:Thread[vert.x-eventloop-thread-3,5,main]
5:Thread[vert.x-eventloop-thread-5,5,main]
6:Thread[vert.x-eventloop-thread-6,5,main]
8:Thread[vert.x-eventloop-thread-8,5,main]
7:Thread[vert.x-eventloop-thread-7,5,main]
10:Thread[vert.x-eventloop-thread-10,5,main]
9:Thread[vert.x-eventloop-thread-9,5,main]
4:Thread[vert.x-eventloop-thread-4,5,main]
11:Thread[vert.x-eventloop-thread-11,5,main]
12:Thread[vert.x-eventloop-thread-12,5,main]
13:Thread[vert.x-eventloop-thread-13,5,main]
14:Thread[vert.x-eventloop-thread-14,5,main]
16:Thread[vert.x-eventloop-thread-0,5,main]
17:Thread[vert.x-eventloop-thread-1,5,main]
15:Thread[vert.x-eventloop-thread-15,5,main]
18:Thread[vert.x-eventloop-thread-2,5,main]
19:Thread[vert.x-eventloop-thread-3,5,main]
|
可以看到盡管每個 Context
對應唯一的 Event Loop 線程,而每個 Event Loop 線程卻可能對應多個 Context
。
Event Loop Context 會在對應的 EventLoop
中執行 Handler
進行事件的處理(IO 事件,非阻塞)。Vert.x 會保證同一個 Handler
會一直在同一個 Event Loop 線程中執行,這樣可以簡化線程模型,讓開發者在寫 Handler
的時候不需要考慮並發的問題,非常方便。
我們來看一下 Handler
是如何在 EventLoop
上執行的。EventLoopContext
中實現了 executeAsync
方法用於包裝 Handler
中事件處理的邏輯並將其提交至對應的 EventLoop
中進行執行:
1
2
3
4
|
public void executeAsync(Handler<Void> task) {
// No metrics, we are on the event loop.
nettyEventLoop().execute(wrapTask(
null, task, true, null));
}
|
這里 Vert.x 使用了 wrapTask
方法將 Handler
封裝成了一個 Runnable
用於向 EventLoop
中提交。代碼比較直觀,大致就是檢查當前線程是否為 Vert.x 線程,然后記錄事件處理開始的時間,給當前的 Vert.x 線程設置 Context
,並且調用 Handler
里面的事件處理方法。具體請參考源碼,這里就不貼出來了。
那么把封裝好的 task 提交到 EventLoop
以后,EventLoop
是怎么處理的呢?這就需要更多的 Netty 相關的知識了。根據Netty 的模型,Event Loop 線程需要處理 IO 事件,普通事件(即我們的 Handler
)以及定時事件(比如 Vert.x 的 setTimer
)。Vert.x 會提供一個 NETTY_IO_RATIO
給Netty代表 EventLoop
處理 IO 事件時間占用的百分比(默認為 50,即 IO事件時間占用:非IO事件時間占用 = 1:1)。當 EventLoop
啟動的時候,它會不斷輪詢 IO 事件及其它事件並進行處理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(
false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys =
0;
needsToSelectAgain =
false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
}
else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (
100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
}
catch (Throwable t) {
// process the error
// ...
}
}
}
|
這里面 Netty 會調用 processSelectedKeys
方法進行 IO 事件的處理,並且會計算出處理 IO 時間所用的事件然后計算出給非 IO 事件處理分配的時間,然后調用 runAllTasks
方法執行所有的非 IO 任務(這里面就有我們的各個 Handler
)。
runAllTasks
會按順序從內部的任務隊列中取出任務(Runnable
)然后進行安全執行。而我們剛才調用的 NioEventLoop
的 execute
方法其實就是將包裝好的 Handler
置入 NioEventLoop
內部的任務隊列中等待執行。
Worker Context
顧名思義,Worker Context 用於跑阻塞任務。與 Event Loop Context 相似,每一個 Handler
都只會跑在固定的 Worker 線程下。
Vert.x 還提供一種 Multi-threaded worker context 可以在多個 Worker 線程下並發執行任務,這樣就會出現並發問題,需要開發者自行解決並發問題。因此一般情況下我們用不到 Multi-threaded worker context。
Verticle
我們再來討論一下 Verticle
中的 Context
。在部署 Verticle
的時候,Vert.x 會根據配置來創建 Context
並綁定到 Verticle 上,此后此 Verticle 上所有綁定的 Handler
都會在此 Context
上執行。相關實現位於 doDeploy
方法,這里摘取核心部分:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
for (Verticle verticle: verticles) {
WorkerExecutorImpl workerExec = poolName !=
null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize()) : null;
WorkerPool pool = workerExec !=
null ? workerExec.getPool() : null;
// 根據配置創建Context
ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
vertx.createEventLoopContext(deploymentID, pool, conf, tccl);
if (workerExec != null) {
context.addCloseHook(workerExec);
}
context.setDeployment(deployment);
deployment.addVerticle(
new VerticleHolder(verticle, context));
// 此Verticle上的Handler都會在創建的context作用域內執行
context.runOnContext(v -> {
try {
verticle.init(vertx, context);
Future<Void> startFuture = Future.future();
// 大家熟悉的start方法的執行點
verticle.start(startFuture);
startFuture.setHandler(ar -> {
if (ar.succeeded()) {
if (parent != null) {
parent.addChild(deployment);
deployment.child =
true;
}
vertx.metricsSPI().verticleDeployed(verticle);
deployments.put(deploymentID, deployment);
if (deployCount.incrementAndGet() == verticles.length) {
reportSuccess(deploymentID, callingContext, completionHandler);
}
}
else if (!failureReported.get()) {
reportFailure(ar.cause(), callingContext, completionHandler);
}
});
}
catch (Throwable t) {
reportFailure(t, callingContext, completionHandler);
}
});
}
|
通過這樣一種方式,Vert.x保證了Verticle
的線程安全 —— 即某個Verticle
上的所有Handler
都會在同一個Vert.x線程上執行,這樣也保證了Verticle
內部成員的安全(沒有race condition問題)。比如下面Verticle中處理IO及事件的處理都一直是在同一個Vert.x線程下執行的,每次打印出的線程名稱應該是一樣的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public class TcpClientVerticle extends AbstractVerticle {
int i = 0;
public void start() throws Exception {
vertx.createNetClient().connect(
6666, "localhost", ar -> {
if (ar.succeeded()) {
NetSocket socket = ar.result();
System.out.println(Thread.currentThread().getName());
socket.handler(buffer -> {
i++;
System.out.println(Thread.currentThread().getName());
System.out.println(
"Net client receiving: " + buffer.toString("UTF-8"));
});
socket.write(
"+1s\n");
}
else {
ar.cause().printStackTrace();
}
});
}
}
|
線程池
Event Loop 線程池
之前我們已經提到過,Event Loop 線程池的類型為 Netty 中的NioEventLoopGroup
,里面的線程通過 Vert.x 自己的線程工廠VertxThreadFactory
進行創建:
1
2
3
|
eventLoopThreadFactory =
new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, options.getMaxEventLoopExecuteTime());
eventLoopGroup =
new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory);
eventLoopGroup.setIoRatio(NETTY_IO_RATIO);
|
其中 Event Loop 線程的數目可以在配置中指定。
Worker 線程池
在之前講 executeBlocking
底層實現的文章中我們已經提到過 Worker 線程池,它其實就是一種 Fixed Thread Pool:
1
2
3
4
5
|
ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime()));
PoolMetrics workerPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(workerExec,
"worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null;
workerPool =
new WorkerPool(workerExec, workerPoolMetrics);
|
Worker線程同樣由VertxThreadFactory
構造,類型為VertxThread
,用於執行阻塞任務。我們同樣可以在配置中指定其數目。
內部阻塞線程池
1
2
3
4
|
ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime()));
|