transport通信基礎
transport是集群間通信的基礎,它有兩種實現:
- localtransport,主要用於jvm中的節點通信,因為在同一個jvm上的網絡模擬,localtransport的實現也相對簡單,但實際用處在es中有限。
- nettytransport,一種基於netty實現的transport,同樣用於節點間的通信。
我們這里以nettytransport來展開。
transport是集群通信的基本通道,無論是集群的狀態信息,還是索引請求信息,都由transport傳送。es定義了包括transport接口在內的所有基礎接口,NettyTransport也實現了該接口。
來簡要的說下Netty的用法,Netty的使用依賴三個模塊:
- ServerBootStrap,啟動服務。
- ClientBootStrap,啟動客戶端並建立於服務端的連接。
- MessageHandler,負責主要的業務邏輯。
NettyTransport在doStart方法中調用ServerBootStrap和ClientBootStrap並綁定ip:
protected void doStart() throws ElasticsearchException {
clientBootstrap = createClientBootstrap();//根據配置啟動客戶端
//省略了無關分代碼
createServerBootstrap(name, mergedSettings);//啟動server端
bindServerBootstrap(name, mergedSettings);//綁定ip
}
bindServerBootstrap將本地ip綁定到netty同時設定好export host。然后啟動client和server的過程將mergedSettings注入到channelpipeline中,至此啟動過程結束,但需要注意的是,現在client端並未連接server端,這個連接過程是在節點啟動后才進行連接。
public void connectToNode(DiscoveryNode node, boolean light) {
//transport的模塊必須要啟動
if (!lifecycle.started()) {
throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
}
//獲取讀鎖,每個節點可以和多個節點建立連接,因此這里用讀鎖
globalLock.readLock().lock();
try {
//以node.id為基礎獲取一個鎖,這保證對於每個node只能建立一次連接
connectionLock.acquire(node.id());
try {
if (!lifecycle.started()) {
throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
}
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}
try {
if (light) {//這里的light,就是對該節點只獲取一個channel,所有類型(5種連接類型下面會說到)都使用者一個channel
nodeChannels = connectToChannelsLight(node);
} else {
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
try {
connectToChannels(nodeChannels, node);
} catch (Throwable e) {
logger.trace("failed to connect to [{}], cleaning dangling connections", e, node);
nodeChannels.close();
throw e;
}
}
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, nodeChannels);
if (logger.isDebugEnabled()) {
logger.debug("connected to node [{}]", node);
}
transportServiceAdapter.raiseNodeConnected(node);
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
throw new ConnectTransportException(node, "general node connection failure", e);
}
} finally {
connectionLock.release(node.id());
}
} finally {
globalLock.readLock().unlock();
}
}
在每個server和client之間都有5個連接,每個連接承擔着不同的任務:
protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
//五種連接方式,不同的連接方式對應不同的集群操作
ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length];
ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length];
ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length];
ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length];
ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
//嘗試建立連接
for (int i = 0; i < connectRecovery.length; i++) {
connectRecovery[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectBulk.length; i++) {
connectBulk[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectReg.length; i++) {
connectReg[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectState.length; i++) {
connectState[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectPing.length; i++) {
connectPing[i] = clientBootstrap.connect(address);
}
//獲取每個連接的channel存入到相應的channels中便於后面使用。
try {
for (int i = 0; i < connectRecovery.length; i++) {
connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectRecovery[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause());
}
nodeChannels.recovery[i] = connectRecovery[i].getChannel();
nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
for (int i = 0; i < connectBulk.length; i++) {
connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectBulk[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause());
}
nodeChannels.bulk[i] = connectBulk[i].getChannel();
nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
for (int i = 0; i < connectReg.length; i++) {
connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectReg[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause());
}
nodeChannels.reg[i] = connectReg[i].getChannel();
nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
for (int i = 0; i < connectState.length; i++) {
connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectState[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause());
}
nodeChannels.state[i] = connectState[i].getChannel();
nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
for (int i = 0; i < connectPing.length; i++) {
connectPing[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectPing[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectPing[i].getCause());
}
nodeChannels.ping[i] = connectPing[i].getChannel();
nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
if (nodeChannels.recovery.length == 0) {
if (nodeChannels.bulk.length > 0) {
nodeChannels.recovery = nodeChannels.bulk;
} else {
nodeChannels.recovery = nodeChannels.reg;
}
}
if (nodeChannels.bulk.length == 0) {
nodeChannels.bulk = nodeChannels.reg;
}
} catch (RuntimeException e) {
// clean the futures
for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) {
future.cancel();
if (future.getChannel() != null && future.getChannel().isOpen()) {
try {
future.getChannel().close();
} catch (Exception e1) {
// ignore
}
}
}
throw e;
}
}
上例為節點建立連接的過程,每一對client和server間都會建立一定數量的不同連接,為什么要有這些不同的連接呢?是因為不同的操作消耗的資源也不同,請求的頻率也不相同,對於資源消耗少、請求頻率高的ping請求,可以多建立一些連接,來確保並發,對於資源消耗大的操作如bulk操作,則要少建立一些連接,防止機器負載過大可能導致節點失聯。
總的來說,當nettytransport的連接過程,就是分別啟動client和server,同時將messagechandler。並且只有當節點啟動時,client會連接server,獲取集群信息,包括之前的5個連接。
transport除了上述這些功能之外,還負責處理request相關請求。
transport處理請求
之前我們聊了transport的啟動及連接,當這一切成功之后,transport還會負責處理請求。比如集群中master確認節點是否存在,節點獲取集群的狀態等。
為了保證信息傳輸,es定義了一個19個字節長度的信息頭:
HEADER_SIZE = 2 + 4 + 8 + 1 + 4
以E
和S
開頭,緊接着是4個字節的int類型信息長度,然后是8個字節的long類型的信息id,再是一個字節的status,最后是4個字節int類型的version。所有節點間的信息交互都以這個19個字節的頭部開始。同時,es對於節點間的所有action都定義了名字,如對master的周期檢測型action。每個action對應着相應的messagehandler:
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
//參數說明:node發送的目的節點,requestId請求id,action action名稱,request請求,options包括以下幾種操作 RECOVERY,BULK,REG,STATE,PING;
Channel targetChannel = nodeChannel(node, options);//獲取對應節點的channel,channel在連接節點時初始化完成(請參考上一篇)
if (compress) {
options.withCompress(true);
}
byte status = 0;
//設置status 包括以下幾種STATUS_REQRES = 1 << 0; STATUS_ERROR = 1 << 1; STATUS_COMPRESS = 1 << 2;
status = TransportStatus.setRequest(status);
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);//初始寫出流
boolean addedReleaseListener = false;
try {
bStream.skip(NettyHeader.HEADER_SIZE);//留出message header的位置
StreamOutput stream = bStream;
// only compress if asked, and, the request is not bytes, since then only
// the header part is compressed, and the "body" can't be extracted as compressed
if (options.compress() && (!(request instanceof BytesTransportRequest))) {
status = TransportStatus.setCompress(status);
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
}
stream = new HandlesStreamOutput(stream);
// we pick the smallest of the 2, to support both backward and forward compatibility
// note, this is the only place we need to do this, since from here on, we use the serialized version
// as the version to use also when the node receiving this request will send the response with
Version version = Version.smallest(this.version, node.version());
stream.setVersion(version);
stream.writeString(transportServiceAdapter.action(action, version));
ReleasableBytesReference bytes;
ChannelBuffer buffer;
// it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
// that create paged channel buffers, but its tricky to know when to do it (where this option is
// more explicit).
if (request instanceof BytesTransportRequest) {
BytesTransportRequest bRequest = (BytesTransportRequest) request;
assert node.version().equals(bRequest.version());
bRequest.writeThin(stream);
stream.close();
bytes = bStream.bytes();
ChannelBuffer headerBuffer = bytes.toChannelBuffer();
ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);
} else {
request.writeTo(stream);
stream.close();
bytes = bStream.bytes();
buffer = bytes.toChannelBuffer();
}
NettyHeader.writeHeader(buffer, requestId, status, version);//寫信息頭
ChannelFuture future = targetChannel.write(buffer);//寫buffer同時獲取future,發送信息發生在這里
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
future.addListener(listener);//添加listener
addedReleaseListener = true;
transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
} finally {
if (!addedReleaseListener) {
Releasables.close(bStream.bytes());
}
}
}
上例展示了request的發送過程,獲取目標node的channel封裝請求寫入信息頭,然后發送並使用listener監聽,這里的transportRequest是一個抽象類,繼承了transportMessage並同時實現了streamable接口,各個功能都有相應的request。
有發就有收,transport仍將接收交給message處理,而message則轉交給messageHandler處理,因為nettytransport的信息處理邏輯都在messageCHannelHandler的messageReceived方法中:
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Transports.assertTransportThread();
Object m = e.getMessage();
if (!(m instanceof ChannelBuffer)) {//非buffer之間返回
ctx.sendUpstream(e);
return;
}
//解析message頭
ChannelBuffer buffer = (ChannelBuffer) m;
int size = buffer.getInt(buffer.readerIndex() - 4);
transportServiceAdapter.received(size + 6);
// we have additional bytes to read, outside of the header
boolean hasMessageBytesToRead = (size - (NettyHeader.HEADER_SIZE - 6)) != 0;
int markedReaderIndex = buffer.readerIndex();
int expectedIndexReader = markedReaderIndex + size;
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
// buffer, or in the cumlation buffer, which is cleaned each time
StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
//讀取信息頭中的幾個重要元數據
long requestId = buffer.readLong();
byte status = buffer.readByte();
Version version = Version.fromId(buffer.readInt());
StreamInput wrappedStream;
…………
if (TransportStatus.isRequest(status)) {//處理請求
String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
if (buffer.readerIndex() != expectedIndexReader) {
if (buffer.readerIndex() < expectedIndexReader) {
logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
} else {
logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);
}
buffer.readerIndex(expectedIndexReader);
}
} else {//處理響應
TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
if (TransportStatus.isError(status)) {
handlerResponseError(wrappedStream, handler);
} else {
handleResponse(ctx.getChannel(), wrappedStream, handler);
}
} else {
// if its null, skip those bytes
buffer.readerIndex(markedReaderIndex + size);
}
…………
wrappedStream.close();
}
上述代碼展示了信息處理邏輯。
那么,request和response是如何被處理的呢?先看request的處理代碼:
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readString();//讀出action的名字
transportServiceAdapter.onRequestReceived(requestId, action);
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);
try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action, version);//獲取處理該信息的handler
if (handler == null) {
throw new ActionNotFoundTransportException(action);
}
final TransportRequest request = handler.newInstance();
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
request.readFrom(buffer);
if (handler.executor() == ThreadPool.Names.SAME) {
//noinspection unchecked
handler.messageReceived(request, transportChannel);//使用該handler處理信息。
} else {
threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
}
} catch (Throwable e) {
try {
transportChannel.sendResponse(e);
} catch (IOException e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e);
logger.warn("Actual Exception", e1);
}
}
return action;
}
上例雖然在關鍵部分已經加了標注,但是仍不能看到請求是如何處理的,因為集群中存在各種請求,比如ping、discovery等等。因此要對應多種處理方式,所以,request最終被提交給handler處理。
每個功能都有自己的handler,當request被提交handler時,會自動的調用相應的方法來處理。
request的完整處理流程是:messageRecevied方法收到信息判斷時,將request轉發給transportServiceApdater的handler方法。handler根據請求類型分發給對應的方法處理。
那response的處理流程是什么呢?response通過handlerResponse方法處理:
protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
final TransportResponse response = handler.newInstance();
response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
response.remoteAddress();
try {
response.readFrom(buffer);
} catch (Throwable e) {
handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
return;
}
try {
if (handler.executor() == ThreadPool.Names.SAME) {
//noinspection unchecked
handler.handleResponse(response);//轉發給對應的handler
} else {
threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
}
} catch (Throwable e) {
handleException(handler, new ResponseHandlerFailureTransportException(e));
}
}
response的處理流程跟request類似。response也有對應的handler處理響應request。
最后,來總結一下nettytransport的信息處理流程:
- 信息通過request方法發送到目標節點。
- 目標節點的messageHandler收到該信息,確定是request還是response,然后將它們轉發給transportServicedAdapter,transportServicedAdapter根據request或response類型交給對應的handler處理並反饋。
cluster discovery概述
es的cluster實現了自己的發現(discovery)機制Zen,discovery的功能包括:
- mater選舉。
- master錯誤探測。
- cluster中節點探測。
- 單播廣播的ping。
discovery是可配置模塊,除了默認機制Zen,還有支持其他機制包括:
- Azure classic discovery 插件方式,廣播。
- EC2 discovery 插件方式,廣播。
- Google Compute Engine (GCE) discovery 插件方式,廣播。
- Zen discovery 默認實現,廣播/單播。
我們可以根據各插件規則配置自己的發現機制。該機制通過實現guice
的discoveryModule
對外提供注冊和啟動。發現模塊對外提供接口discoveryService
。它本質上是一個discovery
的一個代理,所有的功能最終都由所綁定的discovery
實現。
當節點啟動時通過discoveryModule
獲取discoveryService
並啟動它。這也是其他機制的實現方式。通過discoveryModule
對外提供綁定和獲取,通過discoveryService
接口對外提供功能。
節點探測:discovery faultdetection
在es的設計中,一個集群必須有一個主節點(master node)。用來處理請求、索引的創建、修改、節點管理等。
當有了master節點,該節點就要對各子節點進行周期性(心跳機制)的探測,保證整個集群的健康。
主節點和各節點之間都會進行心跳檢測,比如mater要確保各節點健康狀況、是否宕機等,而子節點也要要確保master的健康狀況,一旦master宕機,各子節點要重新選舉新的master。這種相互間的心跳檢測就是cluster的faultdetection
。下圖展示了faultdetection
繼承關系。
faultdetection
有兩種實現方式,分別是master探測其他節點和其他節點對master的探測。faultdetection
抽象了方法handleTransportDisconnect
,該方法在內部類FDConnectionListener
中被調用。es中大量使用了listener的異步方式,因此可以大大的提升系統性能:
private class FDConnectionListener implements TransportConnectionListener {
@Override
public void onNodeConnected(DiscoveryNode node) {
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleTransportDisconnect(node);
}
}
當faultdetection
啟動時就會注冊相應的FDConnectionListener
,在周期性檢測時,發現有節點失聯,會通過onNodeDisconnected
方法回調handleTransportDisconnect
進行處理。先來看masterFaultdetection
的啟動代碼:
private void innerStart(final DiscoveryNode masterNode) {
this.masterNode = masterNode;
this.retryCount = 0;
this.notifiedMasterFailure.set(false);
// 嘗試連接master節點
try {
transportService.connectToNode(masterNode);
} catch (final Exception e) {
// 連接失敗通知masterNode失敗
notifyMasterFailure(masterNode, "failed to perform initial connect [" + e.getMessage() + "]");
return;
}
//關閉之前的masterping,重啟新的masterping
if (masterPinger != null) {
masterPinger.stop();
}
this.masterPinger = new MasterPinger();
// 周期之后啟動masterPing,這里並沒有周期啟動masterPing,只是設定了延遲時間。
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
}
再來看master連接失敗的處理邏輯:
private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
if (notifiedMasterFailure.compareAndSet(false, true)) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
//通知所有listener master丟失
for (Listener listener : listeners) {
listener.onMasterFailure(masterNode, reason);
}
}
});
stop("master failure, " + reason);
}
}
zen discovery
機制實現了listener.onMasterFailure
接口,處理master失聯的相關問題。下面是部分示例代碼:
private class MasterPinger implements Runnable {
private volatile boolean running = true;
public void stop() {
this.running = false;
}
@Override
public void run() {
if (!running) {
// return and don't spawn...
return;
}
final DiscoveryNode masterToPing = masterNode;
final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {
@Override
public MasterPingResponseResponse newInstance() {
return new MasterPingResponseResponse();
}
@Override
public void handleResponse(MasterPingResponseResponse response) {
if (!running) {
return;
}
// reset the counter, we got a good result
MasterFaultDetection.this.retryCount = 0;
// check if the master node did not get switched on us..., if it did, we simply return with no reschedule
if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
// 啟動新的ping周期
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
}
}
@Override
public void handleException(TransportException exp) {
if (!running) {
return;
}
synchronized (masterNodeMutex) {
// check if the master node did not get switched on us...
if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
handleTransportDisconnect(masterToPing);
return;
} else if (exp.getCause() instanceof NoLongerMasterException) {
logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
notifyMasterFailure(masterToPing, "no longer master");
return;
} else if (exp.getCause() instanceof NotMasterException) {
logger.debug("[master] pinging a master {} that is not the master", masterNode);
notifyMasterFailure(masterToPing, "not master");
return;
} else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode);
notifyMasterFailure(masterToPing, "do not exists on master, act as master failure");
return;
}
int retryCount = ++MasterFaultDetection.this.retryCount;
logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount);
if (retryCount >= pingRetryCount) {
logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", masterNode, pingRetryCount, pingRetryTimeout);
// not good, failure
notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
}
}
}
}
);
}
}
masterPing
是一個線程,在innerStart
的方法中沒有設定周期啟動masterPing
,但是由於masterPing
需要進行心跳檢測,這個問題就交給了上例的run
方法。如果ping成功就會重啟一個新的ping,這樣既保證了ping線程的唯一性同時也保證了ping的順序和間隔。ping的方式同樣是通過transport
發送一個masterPingRequest
進行連接,節點收到該請求后,如果該節點已不再是master就會拋出一個NotMasterException
。否則會響應notifyMasterFailure
方法。對於網絡問題導致的無響應情況,會調用handleTransportDisconnect(masterToPing)
方法處理:
protected void handleTransportDisconnect(DiscoveryNode node) {
//這里需要同步
synchronized (masterNodeMutex) {
//master 已經換成其它節點,就沒必要再連接
if (!node.equals(this.masterNode)) {
return;
}
if (connectOnNetworkDisconnect) {
try {
//嘗試再次連接
transportService.connectToNode(node);
// if all is well, make sure we restart the pinger
if (masterPinger != null) {
masterPinger.stop();
}
//連接成功啟動新的masterping
this.masterPinger = new MasterPinger();
// we use schedule with a 0 time value to run the pinger on the pool as it will run on later
threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger);
} catch (Exception e) {
//連接出現異常,啟動master節點丟失通知
logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
notifyMasterFailure(masterNode, "transport disconnected (with verified connect)");
}
} else {
//不需要重連,通知master丟失。
logger.trace("[master] [{}] transport disconnected", node);
notifyMasterFailure(node, "transport disconnected");
}
}
}
就是masterfaultDetection
的整個流程:
啟動中如果master節點失聯則通知節點丟失,否則在一定延遲(3s)后啟動masterPing
,masterPing
線程嘗試連接master節點,如果master節點仍然失聯,則再次嘗試連接。master節點收到masterPingRequest
請求后首先看一下自己還是不是master,如果不是則拋出異常,否則正常回應。節點如果收到響應式異常則啟動master丟失通知,否則此次ping結束。在一定時間后重新啟動新的masterPing
線程。
這里只是說master的faultdetection,而node的faultdetection跟master邏輯相似。區別主要在於ping異常處理上。
在node的faultdetection中,當某個node出現異常或者沒有響應,會啟動node丟失機制,只是具體的處理邏輯不同。
discovery ping機制
ping是es中集群發現的基本手段,通過在局域網中廣播或者指定ping的某些節點(單播)獲取集群信息和節點加入集群等操作。ZenDiscovery
機制實現了兩種ping機制:
- 廣播,當es實例啟動的時候,它發送了廣播的ping請求到地址
224.2.2.4:54328
。而其他的es實例使用同樣的集群名稱響應了這個請求。 - 單播,各節點通過單播列表來發現彼此從而加入同一個集群。
廣播的原理很簡單,當一個節點啟動后向局域網發送廣播信息,任何收到節點只要集群名稱和該節點相同,就會對此廣播作出回應。這樣這個節點就能獲取集群相關的信息。它定義了一個action:discovery/zen/multicast
和廣播的信息頭INTERNAL_HEADER
。
在之前說過,nettyTransport
是cluster的通信基礎,但是廣播卻沒有使用 ,而是采用了java的multicastsocket
。而multicastsocket
是一個UDP的socket,用來進行多個數據包的廣播。它將節點間的通信組成一個組。任何multicastsocket
都可以加入進來,組內的socket發送信息會被組內其他的節點接收。es將其進一步封裝成multicastchannel
。
mutlicastZenPing
共定義了4個內部類,共同完成廣播功能:
- finalizingPingCollection是一個pingresponse的容器,用來存儲所有的響應。
- multicastPingResponseRequestHander是response處理類,類似於之前說的nettytransporthandler,這里雖然沒有使用netty,但是也定義了一個messageReceived方法,當收到一個請求時直接返回一個response。
- multicastPingResponse是一個響應類。
- Received類處理消息邏輯,也是最重要的一個類。
剛才說了,因為廣播沒有使用nettytransport,所以對於消息的邏輯處理都在received類中。在初始化的時候,multicastZenPing
時會將received注冊進去:
protected void doStart() throws ElasticsearchException {
try {
....
multicastChannel = MulticastChannel.getChannel(nodeName(), shared,
new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)),
new Receiver());//將receiver注冊到channel中
} catch (Throwable t) {
....
}
}
received
類繼承了listener
,實現了3個方法,消息經過onMessage
方法區分,如果是內部的ping則使用handlerNodePingRequest
方法處理,否則使用handlerExternalPingRequest
處理。那怎么區分這個消息到底是內部ping還是外部的ping呢?區分方法也很簡單,就是讀取消息中的關於INTERNAL_HEADER
信息頭,下面是nodePing
的相關代碼:
private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {
....
final DiscoveryNodes discoveryNodes = contextProvider.nodes();
final DiscoveryNode requestingNode = requestingNodeX;
if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
// 自身發出的ping,忽略
return;
}
//只接受本集群ping
if (!requestClusterName.equals(clusterName)) {
...return;
}
// 兩個client間不需要ping
if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {return;
}
//新建一個response
final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
multicastPingResponse.id = id;
multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
//無法連接的情況
if (!transportService.nodeConnected(requestingNode)) {
// do the connect and send on a thread pool
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
// connect to the node if possible
try {
transportService.connectToNode(requestingNode);
transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
});
} catch (Exception e) {
if (lifecycle.started()) {
logger.warn("failed to connect to requesting node {}", e, requestingNode);
}
}
}
});
} else {
transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
if (lifecycle.started()) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
}
});
}
}
}
上述代碼描述了如何處理內部ping,接下來再說說如何處理來自外部的ping信息。當收到其他節點的響應信息后,它會把本節點及集群的master節點相關信息返回廣播節點。這樣廣播節點就獲知了集群信息。
在multicastZenPing
類中還有一個類multicastPingResponseRequestHandler
,它的作用是廣播節點對於其他節點廣播信息響應的回應。廣播節點的第二次發送信息的過程,它跟其他transportRequestHandler
一樣有messageReceived
方法。在啟動時注冊到transportserver
中,只處理一類actioninternal:discovery/zen/multicast
。
我們再來看ping請求的發送策略代碼:
public void ping(final PingListener listener, final TimeValue timeout) {
....
//產生一個id
final int id = pingIdGenerator.incrementAndGet();
try {
receivedResponses.put(id, new PingCollection());
sendPingRequest(id);//第一次發送ping請求
// 等待時間的1/2后再次發送一個請求
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send second ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
sendPingRequest(id);
//再過1/2時間再次發送一個請求
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send third ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
// make one last ping, but finalize as soon as all nodes have responded or a timeout has past
PingCollection collection = receivedResponses.get(id);
FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
receivedResponses.put(id, finalizingPingCollection);
logger.trace("[{}] sending last pings", id);
sendPingRequest(id);
//最后一次發送請求,超時的1/4后
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to finalize ping", t, id);
}
@Override
protected void doRun() throws Exception {
finalizePingCycle(id, listener);
}
});
}
});
}
});
} catch (Exception e) {
logger.warn("failed to ping", e);
finalizePingCycle(id, listener);
}
}
ping的過程主要是調用sendPingRequest(id)
方法,在該方法中將id、版本、本地節點信息一起寫入到BytesStreamOutput
中,然后將其進行廣播。這個廣播信息會被其他機器上的Received
接收並處理,並且響應ping請求。另外一個需要關注的是上述代碼中注釋的部分。它通過鏈式定期發送請求,在等待的時間內可能會發送4次請求,但這也帶來了一些問題,這種發送方式會造成大量的ping請求重復,但幸運的是ping請求資源消耗較小。並且帶來的好處也顯而易見,因為這樣盡可能的保證了在timeout的時間段內,集群新增節點都能收到這個ping信息,這種方式應用於單播發現中。
簡要來說,廣播使用了java的multicastsocket
,在timeout時間內發送4次ping請求,該請求包括一個id、信息頭、本地節點信息。這些信息在被其他響應節點接收,交給Received
處理,Received
會將集群的master和本節點的相關信息通過transport
返回給廣播節點。廣播節點收到這些信息后會立即使用transport
返回一個空的response。至此一個廣播過程完成。
廣播雖好,但我選擇單播!因為當節點在分布在多個網段時,廣播模式就失效了,因為廣播信息不可達!這個時候就要使用單播去向指定的節點發送ping請求獲取cluster的相關信息。這就是單播的用處與優點。
單播使用的是nettytransport
,它會使用跟廣播一樣,通過鏈式請求向指定的節點發送請求,信息的處理方式是nettytransport
標准的信息處理過程。
歡迎斧正,that's all 本文主要參考:[elasticsearch節點間通信的基礎transport](https://www.cnblogs.com/zziawanblog/p/6523706.html) | [elasticsearch transport 請求發送和處理](https://www.cnblogs.com/zziawanblog/p/6528616.html) | [cluster discovery概述及FaultDetection分析](https://www.cnblogs.com/zziawanblog/p/6533731.html) | [zendiscovery 的Ping機制](https://www.cnblogs.com/zziawanblog/p/6551549.html)