ping是es中集群發現的基本手段,通過在局域網中廣播或者指定ping的某些節點(單播)獲取集群信息和節點加入集群等操作。ZenDiscovery
機制實現了兩種ping機制:
- 廣播,當es實例啟動的時候,它發送了廣播的ping請求到地址
224.2.2.4:54328
。而其他的es實例使用同樣的集群名稱響應了這個請求。 - 單播,各節點通過單播列表來發現彼此從而加入同一個集群。
廣播的原理很簡單,當一個節點啟動后向局域網發送廣播信息,任何收到節點只要集群名稱和該節點相同,就會對此廣播作出回應。這樣這個節點就能獲取集群相關的信息。它定義了一個action:discovery/zen/multicast
和廣播的信息頭INTERNAL_HEADER
。
在之前說過,nettyTransport
是cluster的通信基礎,但是廣播卻沒有使用 ,而是采用了java的multicastsocket
。而multicastsocket
是一個UDP的socket,用來進行多個數據包的廣播。它將節點間的通信組成一個組。任何multicastsocket
都可以加入進來,組內的socket發送信息會被組內其他的節點接收。es將其進一步封裝成multicastchannel
。
mutlicastZenPing
共定義了4個內部類,共同完成廣播功能:
- finalizingPingCollection是一個pingresponse的容器,用來存儲所有的響應。
- multicastPingResponseRequestHander是response處理類,類似於之前說的nettytransporthandler,這里雖然沒有使用netty,但是也定義了一個messageReceived方法,當收到一個請求時直接返回一個response。
- multicastPingResponse是一個響應類。
- Received類處理消息邏輯,也是最重要的一個類。
剛才說了,因為廣播沒有使用nettytransport,所以對於消息的邏輯處理都在received類中。在初始化的時候,multicastZenPing
時會將received注冊進去:
protected void doStart() throws ElasticsearchException {
try {
....
multicastChannel = MulticastChannel.getChannel(nodeName(), shared,
new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)),
new Receiver());//將receiver注冊到channel中
} catch (Throwable t) {
....
}
}
received
類繼承了listener
,實現了3個方法,消息經過onMessage
方法區分,如果是內部的ping則使用handlerNodePingRequest
方法處理,否則使用handlerExternalPingRequest
處理。那怎么區分這個消息到底是內部ping還是外部的ping呢?區分方法也很簡單,就是讀取消息中的關於INTERNAL_HEADER
信息頭,下面是nodePing
的相關代碼:
private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {
....
final DiscoveryNodes discoveryNodes = contextProvider.nodes();
final DiscoveryNode requestingNode = requestingNodeX;
if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
// 自身發出的ping,忽略
return;
}
//只接受本集群ping
if (!requestClusterName.equals(clusterName)) {
...return;
}
// 兩個client間不需要ping
if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {return;
}
//新建一個response
final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
multicastPingResponse.id = id;
multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
//無法連接的情況
if (!transportService.nodeConnected(requestingNode)) {
// do the connect and send on a thread pool
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
// connect to the node if possible
try {
transportService.connectToNode(requestingNode);
transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
});
} catch (Exception e) {
if (lifecycle.started()) {
logger.warn("failed to connect to requesting node {}", e, requestingNode);
}
}
}
});
} else {
transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
if (lifecycle.started()) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
}
});
}
}
}
上述代碼描述了如何處理內部ping,接下來再說說如何處理來自外部的ping信息。當收到其他節點的響應信息后,它會把本節點及集群的master節點相關信息返回廣播節點。這樣廣播節點就獲知了集群信息。
在multicastZenPing
類中還有一個類multicastPingResponseRequestHandler
,它的作用是廣播節點對於其他節點廣播信息響應的回應。廣播節點的第二次發送信息的過程,它跟其他transportRequestHandler
一樣有messageReceived
方法。在啟動時注冊到transportserver
中,只處理一類actioninternal:discovery/zen/multicast
。
我們再來看ping請求的發送策略代碼:
public void ping(final PingListener listener, final TimeValue timeout) {
....
//產生一個id
final int id = pingIdGenerator.incrementAndGet();
try {
receivedResponses.put(id, new PingCollection());
sendPingRequest(id);//第一次發送ping請求
// 等待時間的1/2后再次發送一個請求
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send second ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
sendPingRequest(id);
//再過1/2時間再次發送一個請求
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send third ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
// make one last ping, but finalize as soon as all nodes have responded or a timeout has past
PingCollection collection = receivedResponses.get(id);
FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
receivedResponses.put(id, finalizingPingCollection);
logger.trace("[{}] sending last pings", id);
sendPingRequest(id);
//最后一次發送請求,超時的1/4后
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to finalize ping", t, id);
}
@Override
protected void doRun() throws Exception {
finalizePingCycle(id, listener);
}
});
}
});
}
});
} catch (Exception e) {
logger.warn("failed to ping", e);
finalizePingCycle(id, listener);
}
}
ping的過程主要是調用sendPingRequest(id)
方法,在該方法中將id、版本、本地節點信息一起寫入到BytesStreamOutput
中,然后將其進行廣播。這個廣播信息會被其他機器上的Received
接收並處理,並且響應ping請求。另外一個需要關注的是上述代碼中注釋的部分。它通過鏈式定期發送請求,在等待的時間內可能會發送4次請求,但這也帶來了一些問題,這種發送方式會造成大量的ping請求重復,但幸運的是ping請求資源消耗較小。並且帶來的好處也顯而易見,因為這樣盡可能的保證了在timeout的時間段內,集群新增節點都能收到這個ping信息,這種方式應用於單播發現中。
簡要來說,廣播使用了java的multicastsocket
,在timeout時間內發送4次ping請求,該請求包括一個id、信息頭、本地節點信息。這些信息在被其他響應節點接收,交給Received
處理,Received
會將集群的master和本節點的相關信息通過transport
返回給廣播節點。廣播節點收到這些信息后會立即使用transport
返回一個空的response。至此一個廣播過程完成。
廣播雖好,但我選擇單播!因為當節點在分布在多個網段時,廣播模式就失效了,因為廣播信息不可達!這個時候就要使用單播去向指定的節點發送ping請求獲取cluster的相關信息。這就是單播的用處與優點。
單播使用的是nettytransport
,它會使用跟廣播一樣,通過鏈式請求向指定的節點發送請求,信息的處理方式是nettytransport
標准的信息處理過程。
歡迎斧正,that's all 本文參考:[zendiscovery 的Ping機制](https://www.cnblogs.com/zziawanblog/p/6551549.html)