Start HttpServer
/**
* 啟動 HttpServer
* multi instances 采用 synchronized防止線程安全問題
* addHandlers 方法是actor模式的實現(EventLoopPoolSize >= instances):
* 1 instances : 1 verticle(actor) : 1 VertxThread(Eventloop)
*/
public synchronized HttpServer listen(int port, String host, Handler<AsyncResult<HttpServer>> listenHandler) {
//是否有配置requestHandler或webscoket
if (requestStream.handler() == null && wsStream.handler() == null) {
throw new IllegalStateException("Set request or websocket handler first");
}
if (listening) {
throw new IllegalStateException("Already listening");
}
listenContext = vertx.getOrCreateContext(); //根據currentThread 獲取Context,獲取null則create
serverOrigin = (options.isSsl() ? "https" : "http") + "://" + host + ":" + port;//判斷是否啟用ssl
List<HttpVersion> applicationProtocols = options.getAlpnVersions();//獲取協議版本,默認支持1.1和2.0
if (listenContext.isWorkerContext()) {//是否使用 Worker Verticles ,不予許使用HTTP2.0
applicationProtocols = applicationProtocols.stream().filter(v -> v != HttpVersion.HTTP_2).collect(Collectors.toList());
}
sslHelper.setApplicationProtocols(applicationProtocols);//應用協議
synchronized (vertx.sharedHttpServers()) {// 監聽多個不同網絡接口(ip:port) Httpserver 防止並發
this.actualPort = port;
id = new ServerID(port, host);//生成服務id
HttpServerImpl shared = vertx.sharedHttpServers().get(id);
if (shared == null || port == 0) {// mutil instances 的情況,利用 mutli core cpu
/**
* frist instances
*/
serverChannelGroup = new DefaultChannelGroup("vertx-acceptor-channels", GlobalEventExecutor.INSTANCE);
ServerBootstrap bootstrap = new ServerBootstrap();
//定義兩個線程組,accept size 1, 重寫的VertxEventLoopGroup
bootstrap.group(vertx.getAcceptorEventLoopGroup(), availableWorkers);
applyConnectionOptions(bootstrap);//添加Connection Accept之后的附屬選項
sslHelper.validate(vertx);//驗證ssl相關參數
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
/**
* connection accept 調度切換線程后觸發
*/
protected void initChannel(Channel ch) throws Exception {
//限流策略,讀大於寫,導致內存無限擴大,最終 OOM
if (requestStream.isPaused() || wsStream.isPaused()) {
ch.close(); //超過服務承載能力,關閉連接
return;
}
ChannelPipeline pipeline = ch.pipeline();
if (sslHelper.isSSL()) {//是否啟用ssl
io.netty.util.concurrent.Future<Channel> handshakeFuture;
if (options.isSni()) {//是否啟用sni,單服務多證書情況
VertxSniHandler sniHandler = new VertxSniHandler(sslHelper, vertx);
pipeline.addLast(sniHandler);
handshakeFuture = sniHandler.handshakeFuture();
} else {
SslHandler handler = new SslHandler(sslHelper.createEngine(vertx));
pipeline.addLast("ssl", handler);
handshakeFuture = handler.handshakeFuture();
}
//偵聽 TLS handshake
handshakeFuture.addListener(future -> {
if (future.isSuccess()) {// 握手成功
if (options.isUseAlpn()) {//是否啟用alpn,協調使用的protocol
//獲取使用的協議
SslHandler sslHandler = pipeline.get(SslHandler.class);
String protocol = sslHandler.applicationProtocol();
if ("h2".equals(protocol)) {//是否是http2.0
handleHttp2(ch);
} else {
handleHttp1(ch);
}
} else {
handleHttp1(ch);
}
} else {//握手失敗
HandlerHolder<HttpHandlers> handler = httpHandlerMgr.chooseHandler(ch.eventLoop());
handler.context.executeFromIO(() -> handler.handler.exceptionHandler.handle(future.cause()));
}
});
} else {
//是否是啟用http2,通過VM Options: -Dvertx.disableH2c 設置;默認false
if (DISABLE_H2C) {
handleHttp1(ch);
} else {
IdleStateHandler idle;
if (options.getIdleTimeout() > 0) {//是否定義最大空閑時間
pipeline.addLast("idle", idle = new IdleStateHandler(0, 0, options.getIdleTimeout()));
} else {
idle = null;
}
/**直接使用明文的http2.0或1.1處理*/
pipeline.addLast(new Http1xOrH2CHandler() {
@Override
protected void configure(ChannelHandlerContext ctx, boolean h2c) {
if (idle != null) {
//移除idleHandler,重新添加,不用注意次序
pipeline.remove(idle);
}
if (h2c) {//判斷協議,如果定義idle則會重新添加 idleHandler
handleHttp2(ctx.channel());
} else {
handleHttp1(ch);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.ALL_IDLE) {
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
//根據eventloop選中對應的handler進行異常傳播
HandlerHolder<HttpHandlers> handler = httpHandlerMgr.chooseHandler(ctx.channel().eventLoop());
handler.context.executeFromIO(() -> handler.handler.exceptionHandler.handle(cause));
}
});
}
}
}
});
addHandlers(this, listenContext);////添加一個instaces(verticle的HttpHandlers)到httpHandlerMgr中
try {
//listen ip:port
bindFuture = AsyncResolveConnectHelper.doBind(vertx, SocketAddress.inetSocketAddress(port, host), bootstrap);
bindFuture.addListener(res -> {
if (res.failed()) {
vertx.sharedHttpServers().remove(id);
} else {
Channel serverChannel = res.result();
HttpServerImpl.this.actualPort = ((InetSocketAddress) serverChannel.localAddress()).getPort();
serverChannelGroup.add(serverChannel);//添加當前的ServerSocketChannel
//初始化metrcis指標
VertxMetrics metrics = vertx.metricsSPI();
this.metrics = metrics != null ? metrics.createMetrics(this, new SocketAddressImpl(port, host), options) : null;
}
});
} catch (final Throwable t) {
if (listenHandler != null) {
vertx.runOnContext(v -> listenHandler.handle(Future.failedFuture(t)));
} else {
log.error(t);
}
listening = false;
return this;
}
vertx.sharedHttpServers().put(id, this);//啟動的HttpServer服務(verticle)添加到Vertx.sharedHttpMap中
actualServer = this;
} else {//other instances
actualServer = shared;
this.actualPort = shared.actualPort;
//在actualServer基礎上添加一個instaces(verticle的HttpHandlers)到httpHandlerMgr中
addHandlers(actualServer, listenContext);
//初始化metrics
VertxMetrics metrics = vertx.metricsSPI();
this.metrics = metrics != null ? metrics.createMetrics(this, new SocketAddressImpl(port, host), options) : null;
}
//服務 bind 狀態
actualServer.bindFuture.addListener(future -> {
if (listenHandler != null) {
final AsyncResult<HttpServer> res;
if (future.succeeded()) {
res = Future.succeededFuture(HttpServerImpl.this);
} else {
res = Future.failedFuture(future.cause());
listening = false;
}
listenContext.runOnContext((v) -> listenHandler.handle(res));//回調處理
} else if (future.failed()) {
listening = false;
log.error(future.cause());
}
});
}
return this;
}
如何實現隔離(actor模型)
/**
* 添加一個verticle instances handlers
* @param server First Actual Server(multi instances)
* mutil instances 情況下第一個instance啟動成功,other instances 僅僅是
* 利用multi core cpu,所以以 first instances actual Server為主,后續在
* Current HttpServerImpl instance 添加handlers(verticle)
* @param context current Thread context
* multi instances 下EventLoopGroup.next 方法挑選(choose)出一個Eventloop
* 與Context 映射. netty EventExecutor調度DefaultEventExecutorChooserFactory類
* 兩種實現:
* ①求余取模
* ②位運算取模(2的冪)
* 所以防止實例數量大於EventloopGroup數量,Default : 2 * CpuCoreSensor.availableProcessors()
* ,linux下以讀取/proc/self/status 文件為主,而不是Runtime.getRuntime().availableProcessors()
*/
private void addHandlers(HttpServerImpl server, ContextImpl context) {
server.httpHandlerMgr.addHandler(
new HttpHandlers(
requestStream.handler(),
wsStream.handler(),
connectionHandler,
exceptionHandler == null ? DEFAULT_EXCEPTION_HANDLER : exceptionHandler)
, context);
}
public class HttpHandlers {
final Handler<HttpServerRequest> requestHandler;
final Handler<ServerWebSocket> wsHandler;
final Handler<HttpConnection> connectionHandler;
final Handler<Throwable> exceptionHandler;
/**
* @param requestHandler Http Request Handler
* @param wsHandler WebScoket Handler
* @param connectionHander TCP Connection Handler
* @param exceptionHander Exception Handlet
*/
public HttpHandlers(
Handler<HttpServerRequest> requestHandler,
Handler<ServerWebSocket> wsHandler,
Handler<HttpConnection> connectionHandler,
Handler<Throwable> exceptionHandler) {
this.requestHandler = requestHandler;
this.wsHandler = wsHandler;
this.connectionHandler = connectionHandler;
this.exceptionHandler = exceptionHandler;
}
}
public class HandlerManager<T> {
public synchronized void addHandler(T handler, ContextImpl context) {
/**
* 添加一個eventloop(Thread)到 VertxEventLoopGroup 集合中.
* accept狀態后的read/write事件,線程調度在VertxEventLoopGroup類的next方法,
* vertx重寫choose策略
*/
EventLoop worker = context.nettyEventLoop();
availableWorkers.addWorker(worker);
/**
* 添加handlers,並且綁定handler和context映射關系.
* 注意部署的instances size不要超過EventLoopPoolSize,
* 否則出現 1 EventLoop : N handler(verticle)
* 導致一個eventloop上執行 N 個verticle
*/
Handlers<T> handlers = new Handlers<>();
Handlers<T> prev = handlerMap.putIfAbsent(worker, handlers);
if (prev != null) {
handlers = prev;
}
handlers.addHandler(new HandlerHolder<>(context, handler));
hasHandlers = true;
}
}
Connection scheduling process:

add handler to eventloop structure:
- an eventloop corresponds to a handlers
- an eventloop corresponds to multiple instances verticles(HandlerHolder)

HttpServer option
public class HttpServerOptions extends NetServerOptions {
//是否啟用壓縮,默認false
private boolean compressionSupported;
//壓縮級別越高cpu負荷越大,默認gzip
private int compressionLevel;
//websocket最大的 Frame 大小,默認65536
private int maxWebsocketFrameSize;
//websocket 最大消息大小,默認65536*4
private int maxWebsocketMessageSize;
//處理WebSocket消息的約定的子協議
private String websocketSubProtocols;
//是否自動處理100-Continue,默認false
private boolean handle100ContinueAutomatically;
//分段傳輸chunk 大小,默認8192
private int maxChunkSize;
//initial line 最大長度,默認 4096
private int maxInitialLineLength;
//Header 最大大小,默認 8192
private int maxHeaderSize;
//http2.0最大的並發流,默認100
private Http2Settings initialSettings;
//支持alpn的版本,默認Http1.1和Http2.0
private List<HttpVersion> alpnVersions;
//設置連接的窗口大小,默認無限制
private int http2ConnectionWindowSize;
//是否啟用壓縮解碼
private boolean decompressionSupported;
//WebSocket Masked位為true。 PerformingUnMasking將是錯誤的,默認為false
private boolean acceptUnmaskedFrames;
//默認HttpObjectDecoder的初始緩沖區大小,默認128
private int decoderInitialBufferSize;
}
備注
1.建立HttpServer,配置最大的idle時間,默認tcpkeepalive配置是false, 網絡故障等造成TCP揮手交互失敗從而導致epoll的達到FileMax,阻止后續連接,導致 服務器無法提供服務; 或者啟用keepalive,依靠內核TCP模塊去偵測(默認2小時一次). 可用netstat工具查看應用當前網絡狀況 2.啟用HTTP2,使用jetty開源apln-boot jar包,對JDK版本依賴關系強,需下載對應JDK版本的apln; 或者使用openssl,當前服務環境都需安裝,遷移服務麻煩,但是性能稍高. 3.具體 Route和其它HttpServer功能在 Web 模塊中, core 模塊只是實現Tcp相關、TLS、 Choose vertcile.handlers Scheduling 和codec等.