elasticsearch之discovery節點探測


目錄

    在es的設計中,一個集群必須有一個主節點(master node)。用來處理請求、索引的創建、修改、節點管理等。
    當有了master節點,該節點就要對各子節點進行周期性(心跳機制)的探測,保證整個集群的健康。
    主節點和各節點之間都會進行心跳檢測,比如mater要確保各節點健康狀況、是否宕機等,而子節點也要要確保master的健康狀況,一旦master宕機,各子節點要重新選舉新的master。這種相互間的心跳檢測就是cluster的faultdetection。下圖展示了faultdetection繼承關系。

    faultdetection有兩種實現方式,分別是master探測其他節點和其他節點對master的探測。faultdetection抽象了方法handleTransportDisconnect,該方法在內部類FDConnectionListener 中被調用。es中大量使用了listener的異步方式,因此可以大大的提升系統性能:

    private class FDConnectionListener implements TransportConnectionListener {
            @Override
            public void onNodeConnected(DiscoveryNode node) {
            }
    
            @Override
            public void onNodeDisconnected(DiscoveryNode node) {
                handleTransportDisconnect(node);
            }
        }
    

    faultdetection啟動時就會注冊相應的FDConnectionListener,在周期性檢測時,發現有節點失聯,會通過onNodeDisconnected方法回調handleTransportDisconnect進行處理。先來看masterFaultdetection的啟動代碼:

    private void innerStart(final DiscoveryNode masterNode) {
        this.masterNode = masterNode;
                this.retryCount = 0;
                this.notifiedMasterFailure.set(false);
    
                // 嘗試連接master節點
                try {
                    transportService.connectToNode(masterNode);
                } catch (final Exception e) {
                    // 連接失敗通知masterNode失敗
    
                    notifyMasterFailure(masterNode, "failed to perform initial connect [" + e.getMessage() + "]");
                    return;
                }
            //關閉之前的masterping,重啟新的masterping
                if (masterPinger != null) {
                    masterPinger.stop();
                }
                this.masterPinger = new MasterPinger();
    
                // 周期之后啟動masterPing,這里並沒有周期啟動masterPing,只是設定了延遲時間。
                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
            }
    

    再來看master連接失敗的處理邏輯:

    private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
            if (notifiedMasterFailure.compareAndSet(false, true)) {
                threadPool.generic().execute(new Runnable() {
                    @Override
                    public void run() {
                //通知所有listener master丟失
                        for (Listener listener : listeners) {
                            listener.onMasterFailure(masterNode, reason);
                        }
                    }
                });
                stop("master failure, " + reason);
            }
        }
    

    zen discovery機制實現了listener.onMasterFailure接口,處理master失聯的相關問題。下面是部分示例代碼:

    private class MasterPinger implements Runnable {
    
            private volatile boolean running = true;
    
            public void stop() {
                this.running = false;
            }
    
            @Override
            public void run() {
                if (!running) {
                    // return and don't spawn...
                    return;
                }
                final DiscoveryNode masterToPing = masterNode;
       final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
                final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
                transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {
    
                            @Override
                            public MasterPingResponseResponse newInstance() {
                                return new MasterPingResponseResponse();
                            }
    
                            @Override
                            public void handleResponse(MasterPingResponseResponse response) {
                                if (!running) {
                                    return;
                                }
                                // reset the counter, we got a good result
                                MasterFaultDetection.this.retryCount = 0;
                                // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
                                if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                    // 啟動新的ping周期
                                    threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
                                }
                            }
    
                            @Override
                            public void handleException(TransportException exp) {
                                if (!running) {
                                    return;
                                }
                                synchronized (masterNodeMutex) {
                                    // check if the master node did not get switched on us...
                                    if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                        if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                                            handleTransportDisconnect(masterToPing);
                                            return;
                                        } else if (exp.getCause() instanceof NoLongerMasterException) {
                                            logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
                                            notifyMasterFailure(masterToPing, "no longer master");
                                            return;
                                        } else if (exp.getCause() instanceof NotMasterException) {
                                            logger.debug("[master] pinging a master {} that is not the master", masterNode);
                                            notifyMasterFailure(masterToPing, "not master");
                                            return;
                                        } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
                                            logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode);
                                            notifyMasterFailure(masterToPing, "do not exists on master, act as master failure");
                                            return;
                                        }
    
                                        int retryCount = ++MasterFaultDetection.this.retryCount;
                                        logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount);
                                        if (retryCount >= pingRetryCount) {
                                            logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", masterNode, pingRetryCount, pingRetryTimeout);
                                            // not good, failure
                                            notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with  maximum [" + pingRetryTimeout + "] timeout");
                                        } else {
                                             // resend the request, not reschedule, rely on send timeout
                                            transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
                                        }
                                    }
                                }
                            }
    
                );
            }
        }
    

    masterPing是一個線程,在innerStart的方法中沒有設定周期啟動masterPing,但是由於masterPing需要進行心跳檢測,這個問題就交給了上例的run方法。如果ping成功就會重啟一個新的ping,這樣既保證了ping線程的唯一性同時也保證了ping的順序和間隔。ping的方式同樣是通過transport發送一個masterPingRequest進行連接,節點收到該請求后,如果該節點已不再是master就會拋出一個NotMasterException。否則會響應notifyMasterFailure方法。對於網絡問題導致的無響應情況,會調用handleTransportDisconnect(masterToPing)方法處理:

    protected void handleTransportDisconnect(DiscoveryNode node) {
        //這里需要同步
            synchronized (masterNodeMutex) {
            //master 已經換成其它節點,就沒必要再連接
                if (!node.equals(this.masterNode)) {
                    return;
                }
                if (connectOnNetworkDisconnect) {
                    try {
                //嘗試再次連接
                        transportService.connectToNode(node);
                        // if all is well, make sure we restart the pinger
                        if (masterPinger != null) {
                            masterPinger.stop();
                        }
                //連接成功啟動新的masterping
                        this.masterPinger = new MasterPinger();
                        // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
                        threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger);
                    } catch (Exception e) {
                //連接出現異常,啟動master節點丟失通知
                        logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
                        notifyMasterFailure(masterNode, "transport disconnected (with verified connect)");
                    }
                } else {
              //不需要重連,通知master丟失。
                    logger.trace("[master] [{}] transport disconnected", node);
                    notifyMasterFailure(node, "transport disconnected");
                }
            }
        }
    

    就是masterfaultDetection的整個流程:
    啟動中如果master節點失聯則通知節點丟失,否則在一定延遲(3s)后啟動masterPingmasterPing線程嘗試連接master節點,如果master節點仍然失聯,則再次嘗試連接。master節點收到masterPingRequest請求后首先看一下自己還是不是master,如果不是則拋出異常,否則正常回應。節點如果收到響應式異常則啟動master丟失通知,否則此次ping結束。在一定時間后重新啟動新的masterPing線程。
    這里只是說master的faultdetection,而node的faultdetection跟master邏輯相似。區別主要在於ping異常處理上。
    在node的faultdetection中,當某個node出現異常或者沒有響應,會啟動node丟失機制,只是具體的處理邏輯不同。


    歡迎斧正,that's all see also:[cluster discovery概述及FaultDetection分析](https://www.cnblogs.com/zziawanblog/p/6533731.html)


    免責聲明!

    本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



     
    粵ICP備18138465號   © 2018-2025 CODEPRJ.COM