ElasticSearch源碼之——Netty在Elasticsearch中的應用


Elasticsearch作為分布式集群,客戶端到服務端,節點與節點間通信有TCP和Http通信協議,底層實現為Netty框架。不了解Netty的同學先了解Netty基本原理及使用https://www.cnblogs.com/zhxdxf/articles/10340791.html

1.關於啟動

HTTP請求僅提供服務端響應,節點啟動時啟動HTTP服務端,TCP請求時ES的節點即作為服務端,又作為客戶端,需要啟動Transport的服務端、客戶端服務。節點啟動請參考ElasticSearch源碼之啟動類

injector.getInstance(HttpServerTransport.class).start();//提供HttpServerTransport服務的啟動

最終調用Netty4HttpServerTransport中的doStart()方法
serverBootstrap = new ServerBootstrap();//即Netty的服務端啟動方式
serverBootstrap.childHandler(configureServerChannelHandler());//添加服務端接消息的處理類
    this.requestHandler = new Netty4HttpRequestHandler(transport, detailedErrorsEnabled, threadContext);
    ch.pipeline().addLast("handler", requestHandler);
    protected void doStart() {
        boolean success = false;
        try {
            this.serverOpenChannels = new Netty4OpenChannelsHandler(logger);

            serverBootstrap = new ServerBootstrap();
            if (blockingServer) {
                serverBootstrap.group(new OioEventLoopGroup(workerCount, daemonThreadFactory(settings,
                    HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
                serverBootstrap.channel(OioServerSocketChannel.class);
            } else {
                serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
                    HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
                serverBootstrap.channel(NioServerSocketChannel.class);
            }

            serverBootstrap.childHandler(configureServerChannelHandler());

            serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));

            final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
            if (tcpSendBufferSize.getBytes() > 0) {
                serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
            }

            final ByteSizeValue tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
            if (tcpReceiveBufferSize.getBytes() > 0) {
                serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
            }

            serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
            serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);

            final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
            serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
            serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);

            this.boundAddress = createBoundHttpAddress();
            if (logger.isInfoEnabled()) {
                logger.info("{}", boundAddress);
            }
            success = true;
        } finally {
            if (success == false) {
                doStop(); // otherwise we leak threads since we never moved to started
            }
        }
    }
View Code
TransportService transportService = injector.getInstance(TransportService.class);//提供Transport服務的啟動
transportService.start();
最終調用Netty4Transport中的doStart()方法
bootstrap = createBootstrap();//客戶端啟動
    bootstrap.handler(getClientChannelInitializer());//添加客戶端消息處理類
        ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client"));
createServerBootstrap(entry.getKey(), settings);//服務端啟動
    serverBootstrap.childHandler(getServerChannelInitializer(name, settings));//添加服務端消息處理類
        ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
 
         
    @Override
    protected void doStart() {
        boolean success = false;
        try {
            bootstrap = createBootstrap();
            if (NetworkService.NETWORK_SERVER.get(settings)) {
                final Netty4OpenChannelsHandler openChannels = new Netty4OpenChannelsHandler(logger);
                this.serverOpenChannels = openChannels;
                // loop through all profiles and start them up, special handling for default one
                for (Map.Entry<String, Settings> entry : buildProfileSettings().entrySet()) {
                    // merge fallback settings with default settings with profile settings so we have complete settings with default values
                    final Settings settings = Settings.builder()
                        .put(createFallbackSettings())
                        .put(entry.getValue()).build();
                    createServerBootstrap(entry.getKey(), settings);
                    bindServer(entry.getKey(), settings);
                }
            }
            super.doStart();
            success = true;
        } finally {
            if (success == false) {
                doStop();
            }
        }
    }
View Code

 

當客戶端發送消息時,建立連接。(調用過程見下一小節)

    @Override
    public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
                              CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
        throws ConnectTransportException {
        connectionProfile = resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
        if (node == null) {
            throw new ConnectTransportException(null, "can't connect to a null node");
        }
        globalLock.readLock().lock(); // ensure we don't open connections while we are closing
        try {
            ensureOpen();
            try (Releasable ignored = connectionLock.acquire(node.getId())) {
                NodeChannels nodeChannels = connectedNodes.get(node);
                if (nodeChannels != null) {
                    return;
                }
                boolean success = false;
                try {
                    nodeChannels = openConnection(node, connectionProfile);
                    connectionValidator.accept(nodeChannels, connectionProfile);
                    // we acquire a connection lock, so no way there is an existing connection
                    connectedNodes.put(node, nodeChannels);
                    if (logger.isDebugEnabled()) {
                        logger.debug("connected to node [{}]", node);
                    }
                    transportServiceAdapter.onNodeConnected(node);
                    success = true;
                } catch (ConnectTransportException e) {
                    throw e;
                } catch (Exception e) {
                    throw new ConnectTransportException(node, "general node connection failure", e);
                } finally {
                    if (success == false) { // close the connection if there is a failure
                        logger.trace(
                            (Supplier<?>) () -> new ParameterizedMessage(
                                "failed to connect to [{}], cleaning dangling connections", node));
                        IOUtils.closeWhileHandlingException(nodeChannels);
                    }
                }
            }
        } finally {
            globalLock.readLock().unlock();
        }
    }
View Code
Netty4HttpServerTransport類圖                                Netty4ServerTransport類圖

 

2.客戶端發送請求

客戶端類圖

 發送請求后,執行客戶端的execute()方法到doExecute()方法

    @Override
    protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
        proxy.execute(action, request, listener);
    }
final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);//根據action類型不同,生產執行不同的execute方法
nodesService.execute((n, l) -> proxy.execute(n, request, l), listener);
transportService.sendRequest(node, action.name(), request, transportOptions,
    new ActionListenerResponseHandler<>(listener, action::newResponse));
}
Transport.Connection connection = getConnection(node);//節點連接
sendRequest(connection, action, request, options, handler);//發送請求
connection.sendRequest(requestId, action, request, options);
通過channel發送請求
Channel channel = channel(options.type());
sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), (byte)0);
最終執行Netty4Transport的sendMessage方法將請求發送至channel
@Override
protected void sendMessage(Channel channel, BytesReference reference, Runnable sendListener) {
    final ChannelFuture future = channel.writeAndFlush(Netty4Utils.toByteBuf(reference));
    future.addListener(f -> sendListener.run());
}

3.服務端處理請求

服務端通過Netty4HttpRequestHandler的channelRead得到客戶端請求,通過MessageReceived接受並處理請求

transport.messageReceived(reference, ctx.channel(), profileName, remoteAddress, remainingMessageSize);
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);//根據action類型,生產具體的TransportRequest子類
final TransportRequest request = reg.newRequest();
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
        @Override
        protected void doRun() throws Exception {
            reg.processMessageReceived(request, transportChannel);//
        }
                handler.messageReceived(request, channel);//TransportRequestHandler中的messageReceived方法
最終調用TransportHandler中的messageReceived方法
        public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
            // We already got the task created on the network layer - no need to create it again on the transport layer
            execute(task, request, new ActionListener<Response>() {//執行具體的請求處理
                @Override
                public void onResponse(Response response) {
                    try {
                        channel.sendResponse(response);
                    } catch (Exception e) {
                        onFailure(e);
                    }
                }
.......

4.示例bulk請求API

transportClient.bulk(bulkRequest);
依次執行:
    public ActionFuture<BulkResponse> bulk(final BulkRequest request) {
        return execute(BulkAction.INSTANCE, request);
    }
final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);//proxy則根據不同的action,生成不同的TransportActionNodeProxy
nodesService.execute((n, l) -> proxy.execute(n, request, l), listener);//nodesService為TransportClientNodesService實例
    public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) {
        // we first read nodes before checking the closed state; this
        // is because otherwise we could be subject to a race where we
        // read the state as not being closed, and then the client is
        // closed and the nodes list is cleared, and then a
        // NoNodeAvailableException is thrown
        // it is important that the order of first setting the state of
        // closed and then clearing the list of nodes is maintained in
        // the close method
        final List<DiscoveryNode> nodes = this.nodes;
        if (closed) {
            throw new IllegalStateException("transport client is closed");
        }
        ensureNodesAreAvailable(nodes);
        int index = getNodeNumber();
        RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index, hostFailureListener);
        DiscoveryNode node = retryListener.getNode(0);
        try {
            callback.doWithNode(node, retryListener);
        } catch (Exception e) {
            try {
                //this exception can't come from the TransportService as it doesn't throw exception at all
                listener.onFailure(e);
            } finally {
                retryListener.maybeNodeFailed(node, e);
            }
        }
    }
        final DiscoveryNode getNode(int i) {
            return nodes.get((index + i) % nodes.size());
        }
    public void execute(final DiscoveryNode node, final Request request, final ActionListener<Response> listener) {
        ActionRequestValidationException validationException = request.validate();
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }
        transportService.sendRequest(node, action.name(), request, transportOptions,
            new ActionListenerResponseHandler<>(listener, action::newResponse));//根據請求類型調用sendRequest方法
    }
    public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
                                                                final TransportRequest request,
                                                                final TransportRequestOptions options,
                                                                TransportResponseHandler<T> handler) {
        try {
            Transport.Connection connection = getConnection(node);//連接節點
            sendRequest(connection, action, request, options, handler);//處理請求
        } catch (NodeNotConnectedException ex) {
            // the caller might not handle this so we invoke the handler
            handler.handleException(ex);
        }
    }

        @Override
        public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
            throws IOException, TransportException {
            if (closed.get()) {
                throw new NodeNotConnectedException(node, "connection already closed");
            }
            Channel channel = channel(options.type());//根據不同的請求類型,添加不同的過濾器
            sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), (byte)0);
        }
    }
    private boolean internalSendMessage(Channel targetChannel, BytesReference message, Runnable onRequestSent) throws IOException {
        boolean success;
        try {
            sendMessage(targetChannel, message, onRequestSent);//終極調用Netty4Transport的sendMessage方法
            success = true;
        } catch (IOException ex) {
            // passing exception handling to deal with this and raise disconnect events and decide the right logging level
            onException(targetChannel, ex);
            success = false;
        }
        return success;
    }
    @Override
    protected void sendMessage(Channel channel, BytesReference reference, Runnable sendListener) {
        final ChannelFuture future = channel.writeAndFlush(Netty4Utils.toByteBuf(reference));//向服務端發送消息
        future.addListener(f -> sendListener.run());

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM