elasticsearch之discovery ping機制


目錄

    ping是es中集群發現的基本手段,通過在局域網中廣播或者指定ping的某些節點(單播)獲取集群信息和節點加入集群等操作。ZenDiscovery機制實現了兩種ping機制:

    • 廣播,當es實例啟動的時候,它發送了廣播的ping請求到地址224.2.2.4:54328。而其他的es實例使用同樣的集群名稱響應了這個請求。
    • 單播,各節點通過單播列表來發現彼此從而加入同一個集群。

    廣播的原理很簡單,當一個節點啟動后向局域網發送廣播信息,任何收到節點只要集群名稱和該節點相同,就會對此廣播作出回應。這樣這個節點就能獲取集群相關的信息。它定義了一個action:discovery/zen/multicast和廣播的信息頭INTERNAL_HEADER
    在之前說過,nettyTransport是cluster的通信基礎,但是廣播卻沒有使用 ,而是采用了java的multicastsocket。而multicastsocket是一個UDP的socket,用來進行多個數據包的廣播。它將節點間的通信組成一個。任何multicastsocket都可以加入進來,組內的socket發送信息會被組內其他的節點接收。es將其進一步封裝成multicastchannel
    mutlicastZenPing共定義了4個內部類,共同完成廣播功能:

    • finalizingPingCollection是一個pingresponse的容器,用來存儲所有的響應。
    • multicastPingResponseRequestHander是response處理類,類似於之前說的nettytransporthandler,這里雖然沒有使用netty,但是也定義了一個messageReceived方法,當收到一個請求時直接返回一個response。
    • multicastPingResponse是一個響應類。
    • Received類處理消息邏輯,也是最重要的一個類。

    剛才說了,因為廣播沒有使用nettytransport,所以對於消息的邏輯處理都在received類中。在初始化的時候,multicastZenPing時會將received注冊進去:

    protected void doStart() throws ElasticsearchException {
            try {
                ....
                multicastChannel = MulticastChannel.getChannel(nodeName(), shared,
                        new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)),
                        new Receiver());//將receiver注冊到channel中
            } catch (Throwable t) {
              ....
            }
        }
    

    received類繼承了listener,實現了3個方法,消息經過onMessage方法區分,如果是內部的ping則使用handlerNodePingRequest方法處理,否則使用handlerExternalPingRequest處理。那怎么區分這個消息到底是內部ping還是外部的ping呢?區分方法也很簡單,就是讀取消息中的關於INTERNAL_HEADER信息頭,下面是nodePing的相關代碼:

    private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {
               ....
                final DiscoveryNodes discoveryNodes = contextProvider.nodes();
                final DiscoveryNode requestingNode = requestingNodeX;
                if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
                    // 自身發出的ping,忽略
                    return;
                }
            //只接受本集群ping
                if (!requestClusterName.equals(clusterName)) {
                ...return;
                }
                // 兩個client間不需要ping
                if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {return;
                }
            //新建一個response
                final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
                multicastPingResponse.id = id;
                multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
            //無法連接的情況
                if (!transportService.nodeConnected(requestingNode)) {
                    // do the connect and send on a thread pool
                    threadPool.generic().execute(new Runnable() {
                        @Override
                        public void run() {
                            // connect to the node if possible
                            try {
                                transportService.connectToNode(requestingNode);
                                transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                                    @Override
                                    public void handleException(TransportException exp) {
                                        logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                                    }
                                });
                            } catch (Exception e) {
                                if (lifecycle.started()) {
                                    logger.warn("failed to connect to requesting node {}", e, requestingNode);
                                }
                            }
                        }
                    });
                } else {
                    transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                        @Override
                        public void handleException(TransportException exp) {
                            if (lifecycle.started()) {
                                logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                            }
                        }
                    });
                }
            }
        }
    

    上述代碼描述了如何處理內部ping,接下來再說說如何處理來自外部的ping信息。當收到其他節點的響應信息后,它會把本節點及集群的master節點相關信息返回廣播節點。這樣廣播節點就獲知了集群信息。
    multicastZenPing類中還有一個類multicastPingResponseRequestHandler,它的作用是廣播節點對於其他節點廣播信息響應的回應。廣播節點的第二次發送信息的過程,它跟其他transportRequestHandler一樣有messageReceived方法。在啟動時注冊到transportserver中,只處理一類actioninternal:discovery/zen/multicast
    我們再來看ping請求的發送策略代碼:

    public void ping(final PingListener listener, final TimeValue timeout) {
           ....
        
        //產生一個id
            final int id = pingIdGenerator.incrementAndGet();
            try {
                receivedResponses.put(id, new PingCollection());
                sendPingRequest(id);//第一次發送ping請求
                // 等待時間的1/2后再次發送一個請求
                threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                    @Override
                    public void onFailure(Throwable t) {
                        logger.warn("[{}] failed to send second ping request", t, id);
                        finalizePingCycle(id, listener);
                    }
    
                    @Override
                    public void doRun() {
                        sendPingRequest(id);
                //再過1/2時間再次發送一個請求
                        threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                            @Override
                            public void onFailure(Throwable t) {
                                logger.warn("[{}] failed to send third ping request", t, id);
                                finalizePingCycle(id, listener);
                            }
    
                            @Override
                            public void doRun() {
                                // make one last ping, but finalize as soon as all nodes have responded or a timeout has past
                                PingCollection collection = receivedResponses.get(id);
                                FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
                                receivedResponses.put(id, finalizingPingCollection);
                                logger.trace("[{}] sending last pings", id);
                                sendPingRequest(id);
                    //最后一次發送請求,超時的1/4后
                                threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                                    @Override
                                    public void onFailure(Throwable t) {
                                        logger.warn("[{}] failed to finalize ping", t, id);
                                    }
    
                                    @Override
                                    protected void doRun() throws Exception {
                                        finalizePingCycle(id, listener);
                                    }
                                });
                            }
                        });
                    }
                });
            } catch (Exception e) {
                logger.warn("failed to ping", e);
                finalizePingCycle(id, listener);
            }
        }
    

    ping的過程主要是調用sendPingRequest(id)方法,在該方法中將id、版本、本地節點信息一起寫入到BytesStreamOutput中,然后將其進行廣播。這個廣播信息會被其他機器上的Received接收並處理,並且響應ping請求。另外一個需要關注的是上述代碼中注釋的部分。它通過鏈式定期發送請求,在等待的時間內可能會發送4次請求,但這也帶來了一些問題,這種發送方式會造成大量的ping請求重復,但幸運的是ping請求資源消耗較小。並且帶來的好處也顯而易見,因為這樣盡可能的保證了在timeout的時間段內,集群新增節點都能收到這個ping信息,這種方式應用於單播發現中。
    簡要來說,廣播使用了java的multicastsocket,在timeout時間內發送4次ping請求,該請求包括一個id、信息頭、本地節點信息。這些信息在被其他響應節點接收,交給Received處理,Received會將集群的master和本節點的相關信息通過transport返回給廣播節點。廣播節點收到這些信息后會立即使用transport返回一個空的response。至此一個廣播過程完成。
    廣播雖好,但我選擇單播!因為當節點在分布在多個網段時,廣播模式就失效了,因為廣播信息不可達!這個時候就要使用單播去向指定的節點發送ping請求獲取cluster的相關信息。這就是單播的用處與優點。
    單播使用的是nettytransport,它會使用跟廣播一樣,通過鏈式請求向指定的節點發送請求,信息的處理方式是nettytransport標准的信息處理過程。


    歡迎斧正,that's all 本文參考:[zendiscovery 的Ping機制](https://www.cnblogs.com/zziawanblog/p/6551549.html)


    免責聲明!

    本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



     
    粵ICP備18138465號   © 2018-2025 CODEPRJ.COM