vertx核心類之VertxImpl


在Vert.x中,Vertx接口是最為重要的一個接口,vertx-core的基礎功能都在此接口中提供。這篇文章中我們就來分析一下Vertx接口體系的內部實現以及創建流程。本文對應Vert.x的版本為 3.2.1

Vertx接口體系

我們來看一下Vertx接口的UML關系:

可以看到有VertxImpl <:< VertxInternal <:< Vertx這個繼承鏈。這里我們先不研究MeasuredMetricsProvider這兩個接口。我們先來看一下VertxInternal的結構:

可以看到里邊包含了各種操作多線程、執行回調等等的方法。VertxInternal接口僅供vertx-core內部調用。

VertxImpl類是對VertxInternalVertx接口的實現。我們創建的Vertx實例都是VertxImpl

Vertx創建流程以及內部實現

通常,我們都會通過Vertx接口中的靜態方法vertx創建Vertx實例:

1
Vertx vertx = Vertx.vertx();

vertx方法底層通過工廠模式創建VertxImpl實例:

1
2
3
static Vertx vertx() {
 return factory.vertx();
}
1
2
3
4
5
6
7
8
public class VertxFactoryImpl implements VertxFactory {

 @Override
 public Vertx vertx() {
   return new VertxImpl();
 }
 // ...
}

下面我們來探究一下VertxImpl類的創建流程和內部實現。我們首先來看一下VertxImpl類的實例成員:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final FileSystem fileSystem = getFileSystem();
private final SharedData sharedData;
private final VertxMetrics metrics;
private final ConcurrentMap<Long, InternalTimerHandler> timeouts = new ConcurrentHashMap<>();
private final AtomicLong timeoutCounter = new AtomicLong(0);
private final ClusterManager clusterManager;
private final DeploymentManager deploymentManager;
private final FileResolver fileResolver;
private final Map<ServerID, HttpServerImpl> sharedHttpServers = new HashMap<>();
private final Map<ServerID, NetServerImpl> sharedNetServers = new HashMap<>();
private final ExecutorService workerPool;
private final ExecutorService internalBlockingPool;
private final OrderedExecutorFactory workerOrderedFact;
private final OrderedExecutorFactory internalOrderedFact;
private final ThreadFactory eventLoopThreadFactory;
private final NioEventLoopGroup eventLoopGroup;
private final NioEventLoopGroup acceptorEventLoopGroup;
private final BlockedThreadChecker checker;
private final boolean haEnabled;
private EventBus eventBus;
private HAManager haManager;
private boolean closed;

這里面包含了一系列重要的類。我們將在初始化部分來分析這些成員的作用。下面我們來看一下構造函數:

1
2
3
4
5
6
7
8
9
10
11
VertxImpl() {
 this(new VertxOptions());
}

VertxImpl(VertxOptions options) {
 this(options, null);
}

VertxImpl(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
 // ...
}

可以看到最終都會調用到VertxImpl(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler)這個構造函數,下面我們就來分析一下。

首先,Vertx會檢查當前是否有Vertx實例運行(通過factory.context()方法)。如果有實例運行的話就會給出警告。

1
2
3
4
// Sanity check
if (Vertx.currentContext() != null) {
 log.warn("You're already on a Vert.x context, are you sure you want to create a new Vertx instance?");
}

接着Vertx會初始化checker成員,它是一個BlockedThreadChecker,作用是檢查vertx context中是否有阻塞的線程,如果有線程阻塞則給出警告。

1
2
checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(), options.getMaxEventLoopExecuteTime(),
                                  options.getMaxWorkerExecuteTime(), options.getWarningExceptionTime());

接下來,Vertx會初始化EventLoop線程工廠eventLoopThreadFactory,它用於產生EventLoop線程。然后初始化eventLoopGroup並進行配置。NioEventLoopGroup是Netty里的概念,將在稍后進行介紹。

1
2
3
eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false);
eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory);
eventLoopGroup.setIoRatio(NETTY_IO_RATIO);

接下來,Vertx會初始化Acceptor EventLoop線程工廠,並對其進行配置。然后對workerPoolinternalBlockingPool這兩個線程池進行初始化。其中workerPool用於執行worker線程,internalBlockingPool用於執行阻塞操作線程。

1
2
3
4
5
6
7
8
9
ThreadFactory acceptorEventLoopThreadFactory = new VertxThreadFactory("vert.x-acceptor-thread-", checker, false);
// The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections
// under a lot of load
acceptorEventLoopGroup = new NioEventLoopGroup(1, acceptorEventLoopThreadFactory);
acceptorEventLoopGroup.setIoRatio(100);
workerPool = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
                                         new VertxThreadFactory("vert.x-worker-thread-", checker, true));
internalBlockingPool = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
                                                   new VertxThreadFactory("vert.x-internal-blocking-", checker, true));

然后,Vertx會初始化兩個線程池工廠workerOrderedFactinternalOrderedFact,它們的類型是OrderedExecutorFactory,里邊包含一種能夠按照次序執行線程的線程池。

1
2
workerOrderedFact = new OrderedExecutorFactory(workerPool);
internalOrderedFact = new OrderedExecutorFactory(internalBlockingPool);

接下來,Vertx會依次對文件解析器fileResolver、部署管理器deploymentManager、SPI管理器metrics進行初始化,並且根據配置來決定是否初始化集群管理器clusterManager和高可用管理器haManager。然后Vertx會調用createAndStartEventBus(options, resultHandler)方法,創建並啟動EventBus。最后對共享數據成員sharedData進行初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
this.fileResolver = new FileResolver(this);
this.deploymentManager = new DeploymentManager(this);
this.metrics = initialiseMetrics(options);
this.haEnabled = options.isClustered() && options.isHAEnabled();
if (options.isClustered()) {
 this.clusterManager = getClusterManager(options);
 this.clusterManager.setVertx(this);
 this.clusterManager.join(ar -> {
   if (ar.failed()) {
     log.error("Failed to join cluster", ar.cause());
   } else {
     // Provide a memory barrier as we are setting from a different thread
     synchronized (VertxImpl.this) {
       haManager = new HAManager(this, deploymentManager, clusterManager, options.getQuorumSize(),
                                 options.getHAGroup(), haEnabled);
       createAndStartEventBus(options, resultHandler);
     }
   }
 });
} else {
 this.clusterManager = null;
 createAndStartEventBus(options, resultHandler);
}
this.sharedData = new SharedDataImpl(this, clusterManager);
}

經過一系列的構造后,VertxImpl創建完成。

以上。

以下為鏈接:

來自vertx中國用戶組,

(不同版本,代碼可能有少許不同之處(我目前使用的是3.3.2),但不妨礙閱讀和理解)

http://mp.weixin.qq.com/s?__biz=MzA4MjUwNzQ0NQ==&mid=2650547613&idx=1&sn=5435d20496e4c9f57d958a45855f9f59&chksm=878c2647b0fbaf51bb8a937c2468117a159e2e0acc8a5bf4ba6404b6282d1f6fd1241f01de25&mpshare=1&scene=23&srcid=1206gxIsVTqNU9B885NQYJzL#rd


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM