Elasticsearch's cluster communication mechanism: transport


Transport communication basics

Transport is the foundation of communication between nodes in the cluster, and it has two implementations:

  • LocalTransport, used mainly for communication between nodes inside the same JVM. Since it only simulates a network within a single JVM, its implementation is fairly simple, but its practical use in ES is limited.
  • NettyTransport, a Netty-based transport, used for real inter-node communication.

We will use NettyTransport as the running example.
Transport is the basic channel of cluster communication: both cluster state information and indexing requests travel over it. ES defines the Transport interface among its base interfaces, and NettyTransport is an implementation of that interface.
Let's briefly recap how Netty is used. Netty usage rests on three pieces (a minimal sketch follows the list):

  • ServerBootstrap, which starts the server side.
  • ClientBootstrap, which starts the client side and establishes connections to the server.
  • a message handler, which carries the actual business logic.

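The following minimal, self-contained Netty 3.x sketch (the Netty generation this era of ES builds on) shows how those three pieces fit together. It is not ES code: the EchoHandler class, the port 9999 and the overall wiring are illustrative only.

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class NettyThreePieceSketch {

    // The handler carries the business logic, analogous to ES's MessageChannelHandler.
    static class EchoHandler extends SimpleChannelUpstreamHandler {
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            ctx.getChannel().write(e.getMessage()); // echo back whatever arrives
        }
    }

    public static void main(String[] args) {
        // Server side: install the handler into the pipeline and bind a port.
        ServerBootstrap server = new ServerBootstrap(new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
        server.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(new EchoHandler());
            }
        });
        server.bind(new InetSocketAddress(9999));

        // Client side: connect to the server; the ChannelFuture completes asynchronously.
        ClientBootstrap client = new ClientBootstrap(new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
        client.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(); // a real client would install its own handler here
            }
        });
        ChannelFuture connect = client.connect(new InetSocketAddress("127.0.0.1", 9999));
        connect.awaitUninterruptibly(); // wait for the connect attempt to finish
    }
}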
In its doStart method, NettyTransport creates the ServerBootstrap and the ClientBootstrap and binds the address:

protected void doStart() throws ElasticsearchException {
    clientBootstrap = createClientBootstrap();    // build the client bootstrap from the settings
    // ... unrelated code omitted ...
    createServerBootstrap(name, mergedSettings);  // build the server bootstrap
    bindServerBootstrap(name, mergedSettings);    // bind the address
}

bindServerBootstrap binds the local address in Netty and sets up the publish host; while the client and server bootstraps are being created, the channel pipelines are also configured (this is where the message handler is installed). At that point start-up is finished, but note that the client side has not connected to any server yet: connections are only established after the node starts, in connectToNode:

public void connectToNode(DiscoveryNode node, boolean light) {
        // the transport module must already be started
        if (!lifecycle.started()) {
            throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
        }
        // take the read lock: a node may hold connections to many nodes at once, so a read lock suffices here

        globalLock.readLock().lock();
        try {
            // acquire a per-node lock keyed on node.id(), which guarantees that only one connection attempt per node runs at a time
            connectionLock.acquire(node.id());
            try {
                if (!lifecycle.started()) {
                    throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
                }
                NodeChannels nodeChannels = connectedNodes.get(node);
                if (nodeChannels != null) {
                    return;
                }
                try {
                    if (light) { // a "light" connection opens only a single channel to the node; all five connection types (described below) share it
                        nodeChannels = connectToChannelsLight(node);
                    } else {
                        nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
                        try {
                            connectToChannels(nodeChannels, node);
                        } catch (Throwable e) {
                            logger.trace("failed to connect to [{}], cleaning dangling connections", e, node);
                            nodeChannels.close();
                            throw e;
                        }
                    }
                    // we acquire a connection lock, so no way there is an existing connection
                    connectedNodes.put(node, nodeChannels);
                    if (logger.isDebugEnabled()) {
                        logger.debug("connected to node [{}]", node);
                    }
                    transportServiceAdapter.raiseNodeConnected(node);
                } catch (ConnectTransportException e) {
                    throw e;
                } catch (Exception e) {
                    throw new ConnectTransportException(node, "general node connection failure", e);
                }
            } finally {
                connectionLock.release(node.id());
            }
        } finally {
            globalLock.readLock().unlock();
        }
    }

Between each pair of nodes there are five types of connections, each carrying a different kind of work:

protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
        // five connection types; each type serves a different kind of cluster operation
        ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length];
        ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length];
        ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length];
        ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length];
        ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];
        InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
        // initiate the connection attempts
        for (int i = 0; i < connectRecovery.length; i++) {
            connectRecovery[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectBulk.length; i++) {
            connectBulk[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectReg.length; i++) {
            connectReg[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectState.length; i++) {
            connectState[i] = clientBootstrap.connect(address);
        }
        for (int i = 0; i < connectPing.length; i++) {
            connectPing[i] = clientBootstrap.connect(address);
        }
        // wait for each connect to complete and store the resulting channels for later use
        try {
            for (int i = 0; i < connectRecovery.length; i++) {
                connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                if (!connectRecovery[i].isSuccess()) {
                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause());
                }
                nodeChannels.recovery[i] = connectRecovery[i].getChannel();
                nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node));
            }

            for (int i = 0; i < connectBulk.length; i++) {
                connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                if (!connectBulk[i].isSuccess()) {
                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause());
                }
                nodeChannels.bulk[i] = connectBulk[i].getChannel();
                nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node));
            }

            for (int i = 0; i < connectReg.length; i++) {
                connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                if (!connectReg[i].isSuccess()) {
                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause());
                }
                nodeChannels.reg[i] = connectReg[i].getChannel();
                nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node));
            }

            for (int i = 0; i < connectState.length; i++) {
                connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                if (!connectState[i].isSuccess()) {
                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause());
                }
                nodeChannels.state[i] = connectState[i].getChannel();
                nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node));
            }

            for (int i = 0; i < connectPing.length; i++) {
                connectPing[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                if (!connectPing[i].isSuccess()) {
                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectPing[i].getCause());
                }
                nodeChannels.ping[i] = connectPing[i].getChannel();
                nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node));
            }

            if (nodeChannels.recovery.length == 0) {
                if (nodeChannels.bulk.length > 0) {
                    nodeChannels.recovery = nodeChannels.bulk;
                } else {
                    nodeChannels.recovery = nodeChannels.reg;
                }
            }
            if (nodeChannels.bulk.length == 0) {
                nodeChannels.bulk = nodeChannels.reg;
            }
        } catch (RuntimeException e) {
            // clean the futures
            for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) {
                future.cancel();
                if (future.getChannel() != null && future.getChannel().isOpen()) {
                    try {
                        future.getChannel().close();
                    } catch (Exception e1) {
                        // ignore
                    }
                }
            }
            throw e;
        }
    }

The code above shows how the connections to a node are established: each client/server pair opens a configurable number of connections of each type. Why the different types? Because operations differ in cost and in frequency. For cheap, frequent requests such as ping, more connections can be opened to improve concurrency; for expensive operations such as bulk, fewer connections are opened, so the machine does not get overloaded to the point where the node drops out of the cluster.
To sum up, NettyTransport's start-up amounts to starting the client and server sides and installing the message handler into their pipelines; only once the node itself starts does the client connect to other nodes, set up the five connection types described above, and obtain cluster information.
Besides all of this, transport is also responsible for handling requests.

How transport handles requests

We have covered how transport starts up and connects. Once that is in place, transport also handles the requests themselves, for example the master checking whether a node is still there, or a node fetching the cluster state.
To frame messages on the wire, ES defines a 19-byte message header:

HEADER_SIZE = 2 + 4 + 8 + 1 + 4

The header starts with the two bytes 'E' and 'S', followed by a 4-byte int message length, an 8-byte long message id, a 1-byte status and finally a 4-byte int version. Every message exchanged between nodes begins with this 19-byte header. ES also gives every inter-node action a name (for example the periodic master-check action), and each action maps to a corresponding message handler. A small worked sketch of the header layout follows.
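The sketch below writes such a header with a plain java.nio.ByteBuffer. It is an illustration of the layout only, not ES's NettyHeader code, and the field values are made up.

import java.nio.ByteBuffer;

public class HeaderSketch {
    static final int HEADER_SIZE = 2 + 4 + 8 + 1 + 4; // = 19 bytes

    // Writes a header in the layout described above: 'E' 'S', length, message id, status, version.
    static byte[] writeHeader(int messageLength, long requestId, byte status, int versionId) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE);
        buf.put((byte) 'E').put((byte) 'S'); // 2-byte marker
        buf.putInt(messageLength);           // 4-byte message length
        buf.putLong(requestId);              // 8-byte message id
        buf.put(status);                     // 1-byte status
        buf.putInt(versionId);               // 4-byte version id
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] header = writeHeader(128, 42L, (byte) 0, 1070499); // illustrative values
        System.out.println("header length = " + header.length);   // prints 19
    }
}

With the header format in mind, sendRequest below shows how a request is framed and sent over one of the channels established earlier: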

public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
        // parameters: node - the destination node; requestId - the request id; action - the action name;
        // request - the request body; options - carries the connection type: RECOVERY, BULK, REG, STATE or PING
        Channel targetChannel = nodeChannel(node, options); // pick a channel to the target node; the channels were set up when we connected to it (see the connection code above)

        if (compress) {
            options.withCompress(true);
        }

        byte status = 0;
        // set the status flags; the defined bits are STATUS_REQRES = 1 << 0, STATUS_ERROR = 1 << 1, STATUS_COMPRESS = 1 << 2
        status = TransportStatus.setRequest(status);
        ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); // allocate the output stream
        boolean addedReleaseListener = false;
        try {
            bStream.skip(NettyHeader.HEADER_SIZE); // reserve space for the message header
            StreamOutput stream = bStream;
            // only compress if asked, and, the request is not bytes, since then only
            // the header part is compressed, and the "body" can't be extracted as compressed
            if (options.compress() && (!(request instanceof BytesTransportRequest))) {
                status = TransportStatus.setCompress(status);
                stream = CompressorFactory.defaultCompressor().streamOutput(stream);
            }
            stream = new HandlesStreamOutput(stream);

            // we pick the smallest of the 2, to support both backward and forward compatibility
            // note, this is the only place we need to do this, since from here on, we use the serialized version
            // as the version to use also when the node receiving this request will send the response with
            Version version = Version.smallest(this.version, node.version());

            stream.setVersion(version);
            stream.writeString(transportServiceAdapter.action(action, version));

            ReleasableBytesReference bytes;
            ChannelBuffer buffer;
            // it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
            // that create paged channel buffers, but its tricky to know when to do it (where this option is
            // more explicit).
            if (request instanceof BytesTransportRequest) {
                BytesTransportRequest bRequest = (BytesTransportRequest) request;
                assert node.version().equals(bRequest.version());
                bRequest.writeThin(stream);
                stream.close();
                bytes = bStream.bytes();
                ChannelBuffer headerBuffer = bytes.toChannelBuffer();
                ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
                buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);
            } else {
                request.writeTo(stream);
                stream.close();
                bytes = bStream.bytes();
                buffer = bytes.toChannelBuffer();
            }
            NettyHeader.writeHeader(buffer, requestId, status, version); // write the message header
            ChannelFuture future = targetChannel.write(buffer); // write the buffer to the channel; this is where the message actually goes out
            ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
            future.addListener(listener); // release the bytes once the write completes
            addedReleaseListener = true;
            transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
        } finally {
            if (!addedReleaseListener) {
                Releasables.close(bStream.bytes());
            }
        }
    }
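A quick aside on the status byte before moving on: it is a small bit field. The sketch below shows the general idea of such flag helpers. It is a generic illustration that mirrors the constants listed in the comment above; it does not claim to reproduce the exact bit polarity of ES's TransportStatus class.

public final class StatusFlags {
    static final byte STATUS_REQRES   = 1 << 0; // request/response marker
    static final byte STATUS_ERROR    = 1 << 1; // the payload is an error
    static final byte STATUS_COMPRESS = 1 << 2; // the payload is compressed

    static byte set(byte status, byte flag)      { return (byte) (status | flag); }
    static byte clear(byte status, byte flag)    { return (byte) (status & ~flag); }
    static boolean isSet(byte status, byte flag) { return (status & flag) != 0; }

    public static void main(String[] args) {
        byte status = 0;
        status = set(status, STATUS_COMPRESS);              // mark the body as compressed
        System.out.println(isSet(status, STATUS_COMPRESS)); // true
        System.out.println(isSet(status, STATUS_ERROR));    // false
    }
}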

Returning to sendRequest: it shows how a request is sent. A channel to the target node is picked, the request is serialized, the message header is written, the buffer is sent and a listener is attached. TransportRequest here is an abstract class that extends TransportMessage and implements Streamable; every feature defines its own concrete request type.
Sending implies receiving. On the receiving side the message is handed to the message handler: NettyTransport's processing logic lives in MessageChannelHandler's messageReceived method:

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Transports.assertTransportThread();
        Object m = e.getMessage();
        if (!(m instanceof ChannelBuffer)) { // not a ChannelBuffer: pass it upstream and return
            ctx.sendUpstream(e);
            return;
        }
        // parse the message header
        ChannelBuffer buffer = (ChannelBuffer) m;
        int size = buffer.getInt(buffer.readerIndex() - 4);
        transportServiceAdapter.received(size + 6);

        // we have additional bytes to read, outside of the header
        boolean hasMessageBytesToRead = (size - (NettyHeader.HEADER_SIZE - 6)) != 0;

        int markedReaderIndex = buffer.readerIndex();
        int expectedIndexReader = markedReaderIndex + size;

        // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
        // buffer, or in the cumlation buffer, which is cleaned each time
        StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
        // read the key metadata out of the header
        long requestId = buffer.readLong();
        byte status = buffer.readByte();
        Version version = Version.fromId(buffer.readInt());

        StreamInput wrappedStream;
        // ... (code omitted) ...
        if (TransportStatus.isRequest(status)) { // handle a request
            String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
            if (buffer.readerIndex() != expectedIndexReader) {
                if (buffer.readerIndex() < expectedIndexReader) {
                    logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
                } else {
                    logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);
                }
                buffer.readerIndex(expectedIndexReader);
            }
        } else { // handle a response
            TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
            // ignore if its null, the adapter logs it
            if (handler != null) {
                if (TransportStatus.isError(status)) {
                    handlerResponseError(wrappedStream, handler);
                } else {
                    handleResponse(ctx.getChannel(), wrappedStream, handler);
                }
            } else {
                // if its null, skip those bytes
                buffer.readerIndex(markedReaderIndex + size);
            }
            // ... (code omitted) ...
        wrappedStream.close();
    }

The code above shows the message-dispatch logic.
So how are requests and responses actually processed? Let's start with request handling:

protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
        final String action = buffer.readString(); // read the action name
        transportServiceAdapter.onRequestReceived(requestId, action);
        final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);
        try {
            final TransportRequestHandler handler = transportServiceAdapter.handler(action, version); // look up the handler registered for this action
            if (handler == null) {
                throw new ActionNotFoundTransportException(action);
            }
            final TransportRequest request = handler.newInstance();
            request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
            request.readFrom(buffer);
            if (handler.executor() == ThreadPool.Names.SAME) {
                //noinspection unchecked
                handler.messageReceived(request, transportChannel); // let the handler process the request
            } else {
                threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
            }
        } catch (Throwable e) {
            try {
                transportChannel.sendResponse(e);
            } catch (IOException e1) {
                logger.warn("Failed to send error message back to client for action [" + action + "]", e);
                logger.warn("Actual Exception", e1);
            }
        }
        return action;
    }

Even with the key points annotated, the code above still does not show how a request is really processed: the cluster carries many kinds of requests (ping, discovery and so on), each needing its own treatment, which is exactly why the request is handed off to a handler.
Every feature registers its own handler, and when a request reaches its handler the appropriate method is invoked automatically.
The complete request path is therefore: messageReceived receives the message, recognizes it as a request, and forwards it to the handler obtained through the transportServiceAdapter; the handler then dispatches it to the matching processing method. A hedged sketch of what such a handler looks like follows below.
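For orientation, the sketch below shows the general shape of defining and registering an action handler in the ES 1.x-era transport API. It is written from memory and should be read as an assumption rather than as ES source: the action name internal:example/echo, the EchoRequest class and the echo behaviour are purely illustrative.

import java.io.IOException;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

public class EchoActionSketch {

    // A request type: Streamable serialization through readFrom/writeTo.
    public static class EchoRequest extends TransportRequest {
        String payload;

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            payload = in.readString();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeString(payload);
        }
    }

    // The handler: newInstance() creates an empty request, messageReceived() does the work,
    // executor() names the thread pool the work should run on.
    public static class EchoHandler extends BaseTransportRequestHandler<EchoRequest> {
        @Override
        public EchoRequest newInstance() {
            return new EchoRequest();
        }

        @Override
        public void messageReceived(EchoRequest request, TransportChannel channel) throws Exception {
            // business logic would go here; reply with an empty acknowledgement
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        }

        @Override
        public String executor() {
            return ThreadPool.Names.GENERIC;
        }
    }

    public static void register(TransportService transportService) {
        // handlers are looked up by action name when a request arrives
        transportService.registerHandler("internal:example/echo", new EchoHandler());
    }
}

This is the shape handleRequest above relies on: it looks the handler up by the action name read from the wire, then calls newInstance(), readFrom() and finally messageReceived() on the executor the handler names.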
What about responses? They are processed by handleResponse:

protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
        final TransportResponse response = handler.newInstance();
        response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
        response.remoteAddress();
        try {
            response.readFrom(buffer);
        } catch (Throwable e) {
            handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
            return;
        }
        try {
            if (handler.executor() == ThreadPool.Names.SAME) {
                //noinspection unchecked
                handler.handleResponse(response); // hand the response to its handler
            } else {
                threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
            }
        } catch (Throwable e) {
            handleException(handler, new ResponseHandlerFailureTransportException(e));
        }
    }

Response handling mirrors request handling: each response is also dispatched to a handler, namely the TransportResponseHandler that was registered when the request was sent.
Finally, a summary of NettyTransport's message flow:

  • A message is sent to the target node via sendRequest.
  • The target node's MessageChannelHandler receives it, decides whether it is a request or a response, and forwards it to the transportServiceAdapter, which hands it to the corresponding handler for processing and replies as needed.

Cluster discovery overview

ES's cluster layer implements its own discovery mechanism, Zen. Discovery covers:

  • master election.
  • master fault detection.
  • fault detection of the other nodes in the cluster.
  • unicast and multicast ping.

Discovery is a pluggable module. Besides the default Zen mechanism, other mechanisms are supported:

  • Azure classic discovery, provided as a plugin.
  • EC2 discovery, provided as a plugin.
  • Google Compute Engine (GCE) discovery, provided as a plugin.
  • Zen discovery, the default implementation, supporting multicast and unicast.

We can plug in our own discovery mechanism by following each plugin's rules. A mechanism registers and starts itself through a Guice module (DiscoveryModule) and exposes its functionality through the DiscoveryService interface. DiscoveryService is essentially a proxy around a Discovery implementation: all of its functionality is ultimately delegated to whichever Discovery is bound.
When a node starts, it obtains the DiscoveryService through the DiscoveryModule and starts it. Other discovery mechanisms work the same way: binding and lookup go through the DiscoveryModule, and functionality is exposed through the DiscoveryService interface.
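To make the Guice part concrete, here is a minimal, generic Guice sketch of binding an interface to an implementation, which is the same pattern DiscoveryModule uses; the Discovery and ZenDiscoveryImpl names below are stand-ins, not ES's actual module code (the real DiscoveryModule picks the implementation class from the node settings first).

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;

public class DiscoveryBindingSketch {

    public interface Discovery { String name(); }

    public static class ZenDiscoveryImpl implements Discovery {
        public String name() { return "zen"; }
    }

    // Bind the interface to a concrete implementation as a singleton.
    public static class DiscoverySketchModule extends AbstractModule {
        @Override
        protected void configure() {
            bind(Discovery.class).to(ZenDiscoveryImpl.class).asEagerSingleton();
        }
    }

    public static void main(String[] args) {
        Injector injector = Guice.createInjector(new DiscoverySketchModule());
        Discovery discovery = injector.getInstance(Discovery.class);
        System.out.println(discovery.name()); // prints "zen"
    }
}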

Node probing: discovery fault detection

In ES's design a cluster must have a master node, which handles requests, index creation and modification, node management and so on.
Once a master exists, it probes the other nodes periodically (a heartbeat) to keep the whole cluster healthy.
The heartbeat runs in both directions: the master checks that each node is healthy and has not gone down, and every other node checks the master's health; if the master goes down, the remaining nodes must elect a new one. This mutual heartbeat checking is the cluster's fault detection.

FaultDetection has two implementations: the master probing the other nodes (NodesFaultDetection) and the other nodes probing the master (MasterFaultDetection). FaultDetection declares the abstract method handleTransportDisconnect, which is invoked from the inner class FDConnectionListener. ES makes heavy use of asynchronous listeners like this one, which helps performance considerably:

private class FDConnectionListener implements TransportConnectionListener {
        @Override
        public void onNodeConnected(DiscoveryNode node) {
        }

        @Override
        public void onNodeDisconnected(DiscoveryNode node) {
            handleTransportDisconnect(node);
        }
    }

When fault detection starts it registers an FDConnectionListener; when a periodic check finds that a node has dropped off, the onNodeDisconnected callback invokes handleTransportDisconnect to deal with it. Let's look at MasterFaultDetection's start-up code first:

private void innerStart(final DiscoveryNode masterNode) {
    this.masterNode = masterNode;
    this.retryCount = 0;
    this.notifiedMasterFailure.set(false);

    // try to connect to the master node
    try {
        transportService.connectToNode(masterNode);
    } catch (final Exception e) {
        // the connection failed: report the master as failed
        notifyMasterFailure(masterNode, "failed to perform initial connect [" + e.getMessage() + "]");
        return;
    }

    // stop any previous master pinger and start a fresh one
    if (masterPinger != null) {
        masterPinger.stop();
    }
    this.masterPinger = new MasterPinger();

    // schedule the pinger after one ping interval; note this is a one-shot delay, not a recurring schedule
    threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
}

Next, the handling when the connection to the master fails:

private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
        if (notifiedMasterFailure.compareAndSet(false, true)) {
            threadPool.generic().execute(new Runnable() {
                @Override
                public void run() {
                    // notify every listener that the master has been lost
                    for (Listener listener : listeners) {
                        listener.onMasterFailure(masterNode, reason);
                    }
                }
            });
            stop("master failure, " + reason);
        }
    }

ZenDiscovery implements the listener's onMasterFailure callback to deal with losing the master. The actual pinging is done by the MasterPinger; part of its code follows:

private class MasterPinger implements Runnable {

        private volatile boolean running = true;

        public void stop() {
            this.running = false;
        }

        @Override
        public void run() {
            if (!running) {
                // return and don't spawn...
                return;
            }
            final DiscoveryNode masterToPing = masterNode;
   final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
            final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
            transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {

                        @Override
                        public MasterPingResponseResponse newInstance() {
                            return new MasterPingResponseResponse();
                        }

                        @Override
                        public void handleResponse(MasterPingResponseResponse response) {
                            if (!running) {
                                return;
                            }
                            // reset the counter, we got a good result
                            MasterFaultDetection.this.retryCount = 0;
                            // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
                            if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                // schedule the next ping cycle
                                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
                            }
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            if (!running) {
                                return;
                            }
                            synchronized (masterNodeMutex) {
                                // check if the master node did not get switched on us...
                                if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                    if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                                        handleTransportDisconnect(masterToPing);
                                        return;
                                    } else if (exp.getCause() instanceof NoLongerMasterException) {
                                        logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
                                        notifyMasterFailure(masterToPing, "no longer master");
                                        return;
                                    } else if (exp.getCause() instanceof NotMasterException) {
                                        logger.debug("[master] pinging a master {} that is not the master", masterNode);
                                        notifyMasterFailure(masterToPing, "not master");
                                        return;
                                    } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
                                        logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode);
                                        notifyMasterFailure(masterToPing, "do not exists on master, act as master failure");
                                        return;
                                    }

                                    int retryCount = ++MasterFaultDetection.this.retryCount;
                                    logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount);
                                    if (retryCount >= pingRetryCount) {
                                        logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", masterNode, pingRetryCount, pingRetryTimeout);
                                        // not good, failure
                                        notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with  maximum [" + pingRetryTimeout + "] timeout");
                                    } else {
                                         // resend the request, not reschedule, rely on send timeout
                                        transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
                                    }
                                }
                            }
                        }

            });
        }
    }

MasterPinger is a Runnable. innerStart does not schedule it periodically; the heartbeat is driven by the run method above instead: each successful ping schedules the next one, which both guarantees that only one pinger is active and keeps the pings ordered and evenly spaced. The ping itself is a MasterPingRequest sent over transport; when the target node receives it, it throws a NotMasterException if it is no longer the master, in which case the pinging node triggers notifyMasterFailure, and otherwise it responds normally. When there is no response at all because of network problems, handleTransportDisconnect(masterToPing) takes over:

protected void handleTransportDisconnect(DiscoveryNode node) {
    // this must be synchronized
    synchronized (masterNodeMutex) {
        // the master has already changed to another node, so there is no need to reconnect
        if (!node.equals(this.masterNode)) {
            return;
        }
        if (connectOnNetworkDisconnect) {
            try {
                // try to reconnect
                transportService.connectToNode(node);
                // if all is well, make sure we restart the pinger
                if (masterPinger != null) {
                    masterPinger.stop();
                }
                // reconnected successfully: start a new master pinger
                this.masterPinger = new MasterPinger();
                // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
                threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger);
            } catch (Exception e) {
                // the reconnect failed: raise the master-failure notification
                logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
                notifyMasterFailure(masterNode, "transport disconnected (with verified connect)");
            }
        } else {
            // reconnect not configured: report the master as lost
            logger.trace("[master] [{}] transport disconnected", node);
            notifyMasterFailure(node, "transport disconnected");
        }
    }
}

That is the whole MasterFaultDetection flow:
During start-up, if the master cannot be reached, a master-failure notification is raised; otherwise the MasterPinger is scheduled after one ping interval. The MasterPinger thread then pings the master and, if the master is unreachable, tries to reconnect. When the master receives a MasterPingRequest it first checks whether it is still the master: if not it throws an exception, otherwise it responds normally. If the pinging node gets an exception back it raises the master-failure notification; otherwise the ping cycle ends and a new MasterPinger run is scheduled after the interval.
This covers only master fault detection; node fault detection follows similar logic, differing mainly in how ping failures are handled.
In node fault detection, when a node throws an exception or stops responding, the node-failure path is triggered instead; only the concrete handling differs.

The discovery ping mechanism

Ping is the basic means of cluster discovery in ES: by multicasting on the local network, or by pinging an explicit list of nodes (unicast), a node obtains cluster information and joins the cluster. ZenDiscovery implements both ping mechanisms:

  • Multicast: when an ES instance starts, it sends a multicast ping request to 224.2.2.4:54328, and any other ES instance with the same cluster name responds to it.
  • Unicast: nodes discover each other through a configured unicast host list and join the same cluster.

The multicast principle is simple: when a node starts it multicasts a message on the local network, and any node that receives it and has the same cluster name responds, so the new node learns about the cluster. For this purpose an action named discovery/zen/multicast is defined, along with a multicast message header, INTERNAL_HEADER.
As mentioned earlier, NettyTransport is the basis of cluster communication, yet multicast does not use it; it uses Java's MulticastSocket instead, a UDP socket for sending datagrams to a group. The participating nodes form a multicast group: any MulticastSocket may join, and a message sent by one member is received by every other member. ES wraps this in its MulticastChannel class.
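For reference, here is a minimal standalone sketch of Java's MulticastSocket API (plain JDK, not ES code); the group address and port match the defaults mentioned above, and the payload is made up.

import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;

public class MulticastSketch {
    public static void main(String[] args) throws Exception {
        InetAddress group = InetAddress.getByName("224.2.2.4"); // multicast group address
        int port = 54328;

        // Join the multicast group; every socket in the group receives what any member sends.
        MulticastSocket socket = new MulticastSocket(port);
        socket.joinGroup(group);
        socket.setSoTimeout(2000); // don't block forever while waiting for a datagram

        // Send a datagram to the group (a real ping would carry the serialized node information).
        byte[] payload = "ping".getBytes("UTF-8");
        socket.send(new DatagramPacket(payload, payload.length, group, port));

        // Receive whatever arrives on the group (usually our own message here, via loopback).
        byte[] buf = new byte[1024];
        DatagramPacket received = new DatagramPacket(buf, buf.length);
        socket.receive(received);
        System.out.println("received " + received.getLength() + " bytes");

        socket.leaveGroup(group);
        socket.close();
    }
}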
MulticastZenPing defines four inner classes that together implement multicast ping:

  • FinalizingPingCollection, a container for ping responses that stores all of the responses.
  • MulticastPingResponseRequestHandler, the response-handling class, similar to the NettyTransport handlers described earlier; even though Netty is not used for the multicast itself, it still defines a messageReceived method that returns a response as soon as a request arrives.
  • MulticastPingResponse, the response class.
  • Receiver, which contains the message-handling logic and is the most important of the four.

As just noted, because multicast bypasses NettyTransport, all of its message handling lives in the Receiver class. When MulticastZenPing is initialized, the Receiver is registered with the channel:

protected void doStart() throws ElasticsearchException {
        try {
            ....
            multicastChannel = MulticastChannel.getChannel(nodeName(), shared,
                    new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)),
                    new Receiver()); // register the Receiver with the channel
        } catch (Throwable t) {
          ....
        }
    }

The Receiver implements a listener with three methods. Incoming messages pass through onMessage, which tells them apart: internal pings are handled by handleNodePingRequest, everything else by handleExternalPingRequest. How does it know whether a message is an internal or an external ping? Simply by checking for the INTERNAL_HEADER marker in the message. Here is the node-ping handling code:

private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {
           ....
            final DiscoveryNodes discoveryNodes = contextProvider.nodes();
            final DiscoveryNode requestingNode = requestingNodeX;
            if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
                // a ping we sent ourselves: ignore it
                return;
            }
            // only accept pings from our own cluster
            if (!requestClusterName.equals(clusterName)) {
                // ...
                return;
            }
            // two client nodes do not need to ping each other
            if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {
                return;
            }
            // build the response
            final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
            multicastPingResponse.id = id;
            multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
            // the requesting node is not connected yet: connect and reply on a separate thread
            if (!transportService.nodeConnected(requestingNode)) {
                // do the connect and send on a thread pool
                threadPool.generic().execute(new Runnable() {
                    @Override
                    public void run() {
                        // connect to the node if possible
                        try {
                            transportService.connectToNode(requestingNode);
                            transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                                @Override
                                public void handleException(TransportException exp) {
                                    logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                                }
                            });
                        } catch (Exception e) {
                            if (lifecycle.started()) {
                                logger.warn("failed to connect to requesting node {}", e, requestingNode);
                            }
                        }
                    }
                });
            } else {
                transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                    @Override
                    public void handleException(TransportException exp) {
                        if (lifecycle.started()) {
                            logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                        }
                    }
                });
            }
        }
    }

The code above shows how internal pings are handled. As for the return path: when a node receives a multicast ping, it sends information about itself and about the cluster's master back to the multicasting node, which is how the multicasting node learns about the cluster.
MulticastZenPing also contains the MulticastPingResponseRequestHandler class, which handles the multicasting node's acknowledgement of the responses it received from other nodes, i.e. the multicasting node's second round of messages. Like any other TransportRequestHandler it has a messageReceived method; it is registered with the transport service at start-up and handles a single action, internal:discovery/zen/multicast.
Now let's look at how ping requests are scheduled and sent:

public void ping(final PingListener listener, final TimeValue timeout) {
       ....
    
        // generate an id for this ping round
        final int id = pingIdGenerator.incrementAndGet();
        try {
            receivedResponses.put(id, new PingCollection());
            sendPingRequest(id); // first ping request of this round
            // schedule a second ping after half of the timeout
            threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                @Override
                public void onFailure(Throwable t) {
                    logger.warn("[{}] failed to send second ping request", t, id);
                    finalizePingCycle(id, listener);
                }

                @Override
                public void doRun() {
                    sendPingRequest(id);
                    // schedule a third ping after another half of the timeout
                    threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                        @Override
                        public void onFailure(Throwable t) {
                            logger.warn("[{}] failed to send third ping request", t, id);
                            finalizePingCycle(id, listener);
                        }

                        @Override
                        public void doRun() {
                            // make one last ping, but finalize as soon as all nodes have responded or a timeout has past
                            PingCollection collection = receivedResponses.get(id);
                            FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
                            receivedResponses.put(id, finalizingPingCollection);
                            logger.trace("[{}] sending last pings", id);
                            sendPingRequest(id);
                            // the last ping was just sent above; finalize after a further quarter of the timeout
                            threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                                @Override
                                public void onFailure(Throwable t) {
                                    logger.warn("[{}] failed to finalize ping", t, id);
                                }

                                @Override
                                protected void doRun() throws Exception {
                                    finalizePingCycle(id, listener);
                                }
                            });
                        }
                    });
                }
            });
        } catch (Exception e) {
            logger.warn("failed to ping", e);
            finalizePingCycle(id, listener);
        }
    }

The ping process mainly calls sendPingRequest(id), which writes the id, the version and the local node information into a BytesStreamOutput and multicasts it. That multicast message is received by the Receiver on the other machines, which handle it and respond to the ping. Also note the scheduling highlighted in the comments above: the method chains scheduled sends, so several ping requests (three in the code shown) go out within the timeout window. This produces a fair amount of duplicate ping traffic, but ping requests are cheap, and the benefit is obvious: any node that joins the cluster during the timeout window is very likely to see at least one ping. The same chained-send approach is also used for unicast discovery.
Briefly: multicast uses Java's MulticastSocket and sends several ping requests within the timeout, each carrying an id, the message header and the local node information. A node that receives one hands it to its Receiver, which returns the cluster master's and its own node information to the multicasting node over transport. The multicasting node then immediately replies over transport with an empty response, and the multicast round is complete.
Multicast is nice, but unicast is often the better choice: when nodes are spread across multiple network segments, multicast breaks down because the multicast packets simply cannot reach them. In that case unicast is used to send ping requests to an explicitly configured list of nodes and obtain the cluster information from them; that is what unicast is for.
Unicast uses NettyTransport. Like multicast it sends chained ping requests, but to the configured nodes, and the messages go through NettyTransport's standard message-handling path.


Corrections are welcome, that's all. This article mainly references: [elasticsearch節點間通信的基礎transport](https://www.cnblogs.com/zziawanblog/p/6523706.html) | [elasticsearch transport 請求發送和處理](https://www.cnblogs.com/zziawanblog/p/6528616.html) | [cluster discovery概述及FaultDetection分析](https://www.cnblogs.com/zziawanblog/p/6533731.html) | [zendiscovery 的Ping機制](https://www.cnblogs.com/zziawanblog/p/6551549.html)

