ping是es中集群发现的基本手段,通过在局域网中广播或者指定ping的某些节点(单播)获取集群信息和节点加入集群等操作。ZenDiscovery
机制实现了两种ping机制:
- 广播,当es实例启动的时候,它发送了广播的ping请求到地址
224.2.2.4:54328
。而其他的es实例使用同样的集群名称响应了这个请求。 - 单播,各节点通过单播列表来发现彼此从而加入同一个集群。
广播的原理很简单,当一个节点启动后向局域网发送广播信息,任何收到节点只要集群名称和该节点相同,就会对此广播作出回应。这样这个节点就能获取集群相关的信息。它定义了一个action:discovery/zen/multicast
和广播的信息头INTERNAL_HEADER
。
在之前说过,nettyTransport
是cluster的通信基础,但是广播却没有使用 ,而是采用了java的multicastsocket
。而multicastsocket
是一个UDP的socket,用来进行多个数据包的广播。它将节点间的通信组成一个组。任何multicastsocket
都可以加入进来,组内的socket发送信息会被组内其他的节点接收。es将其进一步封装成multicastchannel
。
mutlicastZenPing
共定义了4个内部类,共同完成广播功能:
- finalizingPingCollection是一个pingresponse的容器,用来存储所有的响应。
- multicastPingResponseRequestHander是response处理类,类似于之前说的nettytransporthandler,这里虽然没有使用netty,但是也定义了一个messageReceived方法,当收到一个请求时直接返回一个response。
- multicastPingResponse是一个响应类。
- Received类处理消息逻辑,也是最重要的一个类。
刚才说了,因为广播没有使用nettytransport,所以对于消息的逻辑处理都在received类中。在初始化的时候,multicastZenPing
时会将received注册进去:
protected void doStart() throws ElasticsearchException {
try {
....
multicastChannel = MulticastChannel.getChannel(nodeName(), shared,
new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)),
new Receiver());//将receiver注册到channel中
} catch (Throwable t) {
....
}
}
received
类继承了listener
,实现了3个方法,消息经过onMessage
方法区分,如果是内部的ping则使用handlerNodePingRequest
方法处理,否则使用handlerExternalPingRequest
处理。那怎么区分这个消息到底是内部ping还是外部的ping呢?区分方法也很简单,就是读取消息中的关于INTERNAL_HEADER
信息头,下面是nodePing
的相关代码:
private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {
....
final DiscoveryNodes discoveryNodes = contextProvider.nodes();
final DiscoveryNode requestingNode = requestingNodeX;
if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
// 自身发出的ping,忽略
return;
}
//只接受本集群ping
if (!requestClusterName.equals(clusterName)) {
...return;
}
// 两个client间不需要ping
if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {return;
}
//新建一个response
final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
multicastPingResponse.id = id;
multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
//无法连接的情况
if (!transportService.nodeConnected(requestingNode)) {
// do the connect and send on a thread pool
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
// connect to the node if possible
try {
transportService.connectToNode(requestingNode);
transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
});
} catch (Exception e) {
if (lifecycle.started()) {
logger.warn("failed to connect to requesting node {}", e, requestingNode);
}
}
}
});
} else {
transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
if (lifecycle.started()) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
}
});
}
}
}
上述代码描述了如何处理内部ping,接下来再说说如何处理来自外部的ping信息。当收到其他节点的响应信息后,它会把本节点及集群的master节点相关信息返回广播节点。这样广播节点就获知了集群信息。
在multicastZenPing
类中还有一个类multicastPingResponseRequestHandler
,它的作用是广播节点对于其他节点广播信息响应的回应。广播节点的第二次发送信息的过程,它跟其他transportRequestHandler
一样有messageReceived
方法。在启动时注册到transportserver
中,只处理一类actioninternal:discovery/zen/multicast
。
我们再来看ping请求的发送策略代码:
public void ping(final PingListener listener, final TimeValue timeout) {
....
//产生一个id
final int id = pingIdGenerator.incrementAndGet();
try {
receivedResponses.put(id, new PingCollection());
sendPingRequest(id);//第一次发送ping请求
// 等待时间的1/2后再次发送一个请求
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send second ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
sendPingRequest(id);
//再过1/2时间再次发送一个请求
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send third ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
// make one last ping, but finalize as soon as all nodes have responded or a timeout has past
PingCollection collection = receivedResponses.get(id);
FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
receivedResponses.put(id, finalizingPingCollection);
logger.trace("[{}] sending last pings", id);
sendPingRequest(id);
//最后一次发送请求,超时的1/4后
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to finalize ping", t, id);
}
@Override
protected void doRun() throws Exception {
finalizePingCycle(id, listener);
}
});
}
});
}
});
} catch (Exception e) {
logger.warn("failed to ping", e);
finalizePingCycle(id, listener);
}
}
ping的过程主要是调用sendPingRequest(id)
方法,在该方法中将id、版本、本地节点信息一起写入到BytesStreamOutput
中,然后将其进行广播。这个广播信息会被其他机器上的Received
接收并处理,并且响应ping请求。另外一个需要关注的是上述代码中注释的部分。它通过链式定期发送请求,在等待的时间内可能会发送4次请求,但这也带来了一些问题,这种发送方式会造成大量的ping请求重复,但幸运的是ping请求资源消耗较小。并且带来的好处也显而易见,因为这样尽可能的保证了在timeout的时间段内,集群新增节点都能收到这个ping信息,这种方式应用于单播发现中。
简要来说,广播使用了java的multicastsocket
,在timeout时间内发送4次ping请求,该请求包括一个id、信息头、本地节点信息。这些信息在被其他响应节点接收,交给Received
处理,Received
会将集群的master和本节点的相关信息通过transport
返回给广播节点。广播节点收到这些信息后会立即使用transport
返回一个空的response。至此一个广播过程完成。
广播虽好,但我选择单播!因为当节点在分布在多个网段时,广播模式就失效了,因为广播信息不可达!这个时候就要使用单播去向指定的节点发送ping请求获取cluster的相关信息。这就是单播的用处与优点。
单播使用的是nettytransport
,它会使用跟广播一样,通过链式请求向指定的节点发送请求,信息的处理方式是nettytransport
标准的信息处理过程。
欢迎斧正,that's all 本文参考:[zendiscovery 的Ping机制](https://www.cnblogs.com/zziawanblog/p/6551549.html)