Start HttpServer
/** * 啟動 HttpServer * multi instances 采用 synchronized防止線程安全問題 * addHandlers 方法是actor模式的實現(EventLoopPoolSize >= instances): * 1 instances : 1 verticle(actor) : 1 VertxThread(Eventloop) */ public synchronized HttpServer listen(int port, String host, Handler<AsyncResult<HttpServer>> listenHandler) { //是否有配置requestHandler或webscoket if (requestStream.handler() == null && wsStream.handler() == null) { throw new IllegalStateException("Set request or websocket handler first"); } if (listening) { throw new IllegalStateException("Already listening"); } listenContext = vertx.getOrCreateContext(); //根據currentThread 獲取Context,獲取null則create serverOrigin = (options.isSsl() ? "https" : "http") + "://" + host + ":" + port;//判斷是否啟用ssl List<HttpVersion> applicationProtocols = options.getAlpnVersions();//獲取協議版本,默認支持1.1和2.0 if (listenContext.isWorkerContext()) {//是否使用 Worker Verticles ,不予許使用HTTP2.0 applicationProtocols = applicationProtocols.stream().filter(v -> v != HttpVersion.HTTP_2).collect(Collectors.toList()); } sslHelper.setApplicationProtocols(applicationProtocols);//應用協議 synchronized (vertx.sharedHttpServers()) {// 監聽多個不同網絡接口(ip:port) Httpserver 防止並發 this.actualPort = port; id = new ServerID(port, host);//生成服務id HttpServerImpl shared = vertx.sharedHttpServers().get(id); if (shared == null || port == 0) {// mutil instances 的情況,利用 mutli core cpu /** * frist instances */ serverChannelGroup = new DefaultChannelGroup("vertx-acceptor-channels", GlobalEventExecutor.INSTANCE); ServerBootstrap bootstrap = new ServerBootstrap(); //定義兩個線程組,accept size 1, 重寫的VertxEventLoopGroup bootstrap.group(vertx.getAcceptorEventLoopGroup(), availableWorkers); applyConnectionOptions(bootstrap);//添加Connection Accept之后的附屬選項 sslHelper.validate(vertx);//驗證ssl相關參數 bootstrap.childHandler(new ChannelInitializer<Channel>() { @Override /** * connection accept 調度切換線程后觸發 */ protected void initChannel(Channel ch) throws Exception { //限流策略,讀大於寫,導致內存無限擴大,最終 OOM if (requestStream.isPaused() || wsStream.isPaused()) { 
ch.close(); //超過服務承載能力,關閉連接 return; } ChannelPipeline pipeline = ch.pipeline(); if (sslHelper.isSSL()) {//是否啟用ssl io.netty.util.concurrent.Future<Channel> handshakeFuture; if (options.isSni()) {//是否啟用sni,單服務多證書情況 VertxSniHandler sniHandler = new VertxSniHandler(sslHelper, vertx); pipeline.addLast(sniHandler); handshakeFuture = sniHandler.handshakeFuture(); } else { SslHandler handler = new SslHandler(sslHelper.createEngine(vertx)); pipeline.addLast("ssl", handler); handshakeFuture = handler.handshakeFuture(); } //偵聽 TLS handshake handshakeFuture.addListener(future -> { if (future.isSuccess()) {// 握手成功 if (options.isUseAlpn()) {//是否啟用alpn,協調使用的protocol //獲取使用的協議 SslHandler sslHandler = pipeline.get(SslHandler.class); String protocol = sslHandler.applicationProtocol(); if ("h2".equals(protocol)) {//是否是http2.0 handleHttp2(ch); } else { handleHttp1(ch); } } else { handleHttp1(ch); } } else {//握手失敗 HandlerHolder<HttpHandlers> handler = httpHandlerMgr.chooseHandler(ch.eventLoop()); handler.context.executeFromIO(() -> handler.handler.exceptionHandler.handle(future.cause())); } }); } else { //是否是啟用http2,通過VM Options: -Dvertx.disableH2c 設置;默認false if (DISABLE_H2C) { handleHttp1(ch); } else { IdleStateHandler idle; if (options.getIdleTimeout() > 0) {//是否定義最大空閑時間 pipeline.addLast("idle", idle = new IdleStateHandler(0, 0, options.getIdleTimeout())); } else { idle = null; } /**直接使用明文的http2.0或1.1處理*/ pipeline.addLast(new Http1xOrH2CHandler() { @Override protected void configure(ChannelHandlerContext ctx, boolean h2c) { if (idle != null) { //移除idleHandler,重新添加,不用注意次序 pipeline.remove(idle); } if (h2c) {//判斷協議,如果定義idle則會重新添加 idleHandler handleHttp2(ctx.channel()); } else { handleHttp1(ch); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.ALL_IDLE) { ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception { super.exceptionCaught(ctx, cause); //根據eventloop選中對應的handler進行異常傳播 HandlerHolder<HttpHandlers> handler = httpHandlerMgr.chooseHandler(ctx.channel().eventLoop()); handler.context.executeFromIO(() -> handler.handler.exceptionHandler.handle(cause)); } }); } } } }); addHandlers(this, listenContext);////添加一個instaces(verticle的HttpHandlers)到httpHandlerMgr中 try { //listen ip:port bindFuture = AsyncResolveConnectHelper.doBind(vertx, SocketAddress.inetSocketAddress(port, host), bootstrap); bindFuture.addListener(res -> { if (res.failed()) { vertx.sharedHttpServers().remove(id); } else { Channel serverChannel = res.result(); HttpServerImpl.this.actualPort = ((InetSocketAddress) serverChannel.localAddress()).getPort(); serverChannelGroup.add(serverChannel);//添加當前的ServerSocketChannel //初始化metrcis指標 VertxMetrics metrics = vertx.metricsSPI(); this.metrics = metrics != null ? metrics.createMetrics(this, new SocketAddressImpl(port, host), options) : null; } }); } catch (final Throwable t) { if (listenHandler != null) { vertx.runOnContext(v -> listenHandler.handle(Future.failedFuture(t))); } else { log.error(t); } listening = false; return this; } vertx.sharedHttpServers().put(id, this);//啟動的HttpServer服務(verticle)添加到Vertx.sharedHttpMap中 actualServer = this; } else {//other instances actualServer = shared; this.actualPort = shared.actualPort; //在actualServer基礎上添加一個instaces(verticle的HttpHandlers)到httpHandlerMgr中 addHandlers(actualServer, listenContext); //初始化metrics VertxMetrics metrics = vertx.metricsSPI(); this.metrics = metrics != null ? 
metrics.createMetrics(this, new SocketAddressImpl(port, host), options) : null; } //服務 bind 狀態 actualServer.bindFuture.addListener(future -> { if (listenHandler != null) { final AsyncResult<HttpServer> res; if (future.succeeded()) { res = Future.succeededFuture(HttpServerImpl.this); } else { res = Future.failedFuture(future.cause()); listening = false; } listenContext.runOnContext((v) -> listenHandler.handle(res));//回調處理 } else if (future.failed()) { listening = false; log.error(future.cause()); } }); } return this; }
如何實現隔離(actor模型)
/**
 * Registers the handlers of this server instance (one verticle instance) with the handler
 * manager of {@code server}.
 *
 * <p>Original author's notes: with multiple instances only the first one actually starts a
 * server; the others exist to use the remaining CPU cores and merely add their handlers to that
 * first "actual" server. Each context is mapped to one event loop picked by
 * {@code EventLoopGroup.next()}; Netty's {@code DefaultEventExecutorChooserFactory} chooses
 * either by modulo or, for power-of-two sizes, by bit masking. The instance count should
 * therefore not exceed the event-loop pool size (default {@code 2 *
 * CpuCoreSensor.availableProcessors()}, which on Linux reads {@code /proc/self/status} rather
 * than {@code Runtime.getRuntime().availableProcessors()}).
 *
 * @param server  the actual server whose handler manager receives the handlers
 * @param context the context of the current instance
 */
private void addHandlers(HttpServerImpl server, ContextImpl context) {
  // Fall back to the default exception handler when none was configured.
  Handler<Throwable> onError = exceptionHandler;
  if (onError == null) {
    onError = DEFAULT_EXCEPTION_HANDLER;
  }
  HttpHandlers bundle = new HttpHandlers(requestStream.handler(), wsStream.handler(), connectionHandler, onError);
  server.httpHandlerMgr.addHandler(bundle, context);
}

/**
 * Immutable bundle of the handlers belonging to one server instance.
 */
public class HttpHandlers {

  final Handler<HttpServerRequest> requestHandler;
  final Handler<ServerWebSocket> wsHandler;
  final Handler<HttpConnection> connectionHandler;
  final Handler<Throwable> exceptionHandler;

  /**
   * @param requestHandler    HTTP request handler
   * @param wsHandler         WebSocket handler
   * @param connectionHandler connection handler
   * @param exceptionHandler  exception handler
   */
  public HttpHandlers(
      Handler<HttpServerRequest> requestHandler,
      Handler<ServerWebSocket> wsHandler,
      Handler<HttpConnection> connectionHandler,
      Handler<Throwable> exceptionHandler) {
    this.requestHandler = requestHandler;
    this.wsHandler = wsHandler;
    this.connectionHandler = connectionHandler;
    this.exceptionHandler = exceptionHandler;
  }
}

/**
 * Keeps track of which handlers are attached to which event loop.
 * (Excerpt: the fields used below are not shown in this snippet.)
 */
public class HandlerManager<T> {

  /**
   * Attaches {@code handler} to the event loop of {@code context}.
   *
   * <p>Original author's note: do not deploy more instances than the event-loop pool size,
   * otherwise one event loop ends up executing several handlers (verticles).
   *
   * @param handler the handler to register
   * @param context the context whose event loop will serve the handler
   */
  public synchronized void addHandler(T handler, ContextImpl context) {
    // Make the context's event loop available to the worker group (original note: after
    // accept, read/write events are dispatched by VertxEventLoopGroup.next(), whose choosing
    // strategy Vert.x overrides).
    final EventLoop eventLoop = context.nettyEventLoop();
    availableWorkers.addWorker(eventLoop);

    // Reuse the Handlers already mapped to this event loop, or install a fresh one.
    final Handlers<T> fresh = new Handlers<>();
    final Handlers<T> existing = handlerMap.putIfAbsent(eventLoop, fresh);
    final Handlers<T> target = existing != null ? existing : fresh;

    // Bind the handler to its context.
    target.addHandler(new HandlerHolder<>(context, handler));
    hasHandlers = true;
  }
}
Connection scheduling process:
add handler to eventloop structure:
- each event loop corresponds to exactly one Handlers object
- each event loop can serve multiple verticle instances (one HandlerHolder per instance)
HttpServer option
public class HttpServerOptions extends NetServerOptions { //是否啟用壓縮,默認false private boolean compressionSupported; //壓縮級別越高cpu負荷越大,默認gzip private int compressionLevel; //websocket最大的 Frame 大小,默認65536 private int maxWebsocketFrameSize; //websocket 最大消息大小,默認65536*4 private int maxWebsocketMessageSize; //處理WebSocket消息的約定的子協議 private String websocketSubProtocols; //是否自動處理100-Continue,默認false private boolean handle100ContinueAutomatically; //分段傳輸chunk 大小,默認8192 private int maxChunkSize; //initial line 最大長度,默認 4096 private int maxInitialLineLength; //Header 最大大小,默認 8192 private int maxHeaderSize; //http2.0最大的並發流,默認100 private Http2Settings initialSettings; //支持alpn的版本,默認Http1.1和Http2.0 private List<HttpVersion> alpnVersions; //設置連接的窗口大小,默認無限制 private int http2ConnectionWindowSize; //是否啟用壓縮解碼 private boolean decompressionSupported; //WebSocket Masked位為true。 PerformingUnMasking將是錯誤的,默認為false private boolean acceptUnmaskedFrames; //默認HttpObjectDecoder的初始緩沖區大小,默認128 private int decoderInitialBufferSize; }
備注
1.建立HttpServer時應配置最大的idle時間: 默認tcpkeepalive配置是false, 網絡故障等造成TCP揮手交互失敗, 從而導致epoll的文件描述符數量達到FileMax, 阻止后續連接, 導致服務器無法提供服務; 或者啟用keepalive, 依靠內核TCP模塊去偵測(默認2小時一次). 可用netstat工具查看應用當前網絡狀況.
2.啟用HTTP2: 使用jetty開源alpn-boot jar包, 對JDK版本依賴關系強, 需下載對應JDK版本的alpn-boot; 或者使用openssl, 當前服務環境都需安裝, 遷移服務麻煩, 但是性能稍高.
3.具體Route和其它HttpServer功能在Web模塊中, core模塊只是實現Tcp相關、TLS、Choose verticle.handlers Scheduling和codec等.