Cassandra Gossip協議的二三事兒


摘要:Gossip協議是Cassandra維護各節點狀態的一個重要組件,下面我們以Gossip協議三次握手為線索逐步分析Gossip協議源碼。

Gossip協議是Cassandra維護各節點狀態的一個重要組件,下面我們以Gossip協議三次握手為線索逐步分析Gossip協議源碼。

Gossip協議通過判斷節點的generationversion 來確認節點狀態信息新舊,如果節點重啟,則generation加一,version每次從零開始計算。所以 generation是大版本號,version為小版本號,理解這個概念對后面的握手邏輯有很大幫助。

Gossip協議最重要的一個屬性是endpointStateMap ,這個map以address為key,以EndpointState為value維護節點自身狀態信息。EndopointState 包含了節點 net_version,host_id,rpc_address,release_version,dc,rack,load,status,tokens 信息。總體來說,所有節點維護的endpointStateMap應該是一致的,如果出現不一致信息或者新增,替換,刪除節點 ,這中間的狀態維護就要靠Gossip來實現了。

另外一個重要屬性subscribers ,當節點狀態變更時候,gossip 會通知各個subscribers。

Gossip啟動時候,會每隔一秒會在集群中隨機選擇一個節點發送一條GossipDigestSyn消息,開始和其他節點的通信,如下圖:

接下來我們根據上面的了流程圖一步步分析gossip代碼,GossipDigestSyn 消息是在GossipTask構造的。

1  // syn消息包含 集群名字,分區器,和gDigests消息    
2  GossipDigestSyn digestSynMessage =  new  GossipDigestSyn(DatabaseDescriptor.getClusterName(),DatabaseDescriptor.getPartitionerName(),gDigests);    
3 
4  MessageOut<GossipDigestSyn> message =  new  MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,digestSynMessage,GossipDigestSyn.serializer); 

GossipDigestSyn 消息的主要部分在gDigests里面,gDigests是通過方法Gossiper.instance.makeRandomGossipDigest(gDigests) 生成的。

private  void  makeRandomGossipDigest(List<GossipDigest> gDigests)    
02  {    
03  EndpointState epState;    
04  int  generation =  0 ;    
05  int  maxVersion =  0 ;    
06 
07  // local epstate will be part of endpointStateMap    
08  //當前節點維護的節點列表    
09  List<InetAddress> endpoints =  new  ArrayList<InetAddress>(endpointStateMap.keySet());    
10  //亂序處理    
11  Collections.shuffle(endpoints, random);    
12  for  (InetAddress endpoint : endpoints)    
13  {    
14  epState = endpointStateMap.get(endpoint);    
15  if  (epState !=  null )    
16  {    
17  //獲取generation版本號    
18  generation = epState.getHeartBeatState().getGeneration();    
19  //EndpointState包含了tokens,hostid,status,load等信息,所以冒泡排序獲取其中最大的maxVersion    
20  maxVersion = getMaxEndpointStateVersion(epState);    
21  }    
22  gDigests.add( new  GossipDigest(endpoint, generation, maxVersion));    
23  }    
24 
25  if  (logger.isTraceEnabled())    
26  {    
27  StringBuilder sb =  new  StringBuilder();    
28  for  (GossipDigest gDigest : gDigests)    
29  {    
30  sb.append(gDigest);    
31  sb.append( " " );    
32  }    
33  logger.trace( "Gossip Digests are : {}" , sb);    
34  }    
35  } 

A節點發出GossipDigestSyn后,B節點會通過GossipDigestSynVerbHandler 來處理GossipDigestSyn 消息,具體處理邏輯在Gossiper.instance.examineGossiper中,

上面方法對比版本號以后,主要處理邏輯在senall方法和requestAll方法,繼續跟進:

1  private  void  requestAll(GossipDigest gDigest, List<GossipDigest> deltaGossipDigestList,  int  remoteGeneration)    
2  {    
3  /* We are here since we have no data for this endpoint locally so request everthing. */ 
4  //生成一個Digest,等待對方節點發送消息    
5  deltaGossipDigestList.add( new  GossipDigest(gDigest.getEndpoint(), remoteGeneration,  0 ));    
6  if  (logger.isTraceEnabled())    
7  logger.trace( "requestAll for {}" , gDigest.getEndpoint());    
8  }    
1  private  void  sendAll(GossipDigest gDigest, Map<InetAddress, EndpointState> deltaEpStateMap,  int  maxRemoteVersion)    
2  {    
3  EndpointState localEpStatePtr = getStateForVersionBiggerThan(gDigest.getEndpoint(), maxRemoteVersion);    
4  if  (localEpStatePtr !=  null )    
5  //將endpintState信息通過ack 消息發送給對方    
6  deltaEpStateMap.put(gDigest.getEndpoint(), localEpStatePtr);    
7  } 

到這里我們發現向對方節點發送的ack消息已經構造完成了,包含了deltaGossipDigestList(對方節點信息最新,我們需要對方節點給我們發endpointState) 和 deltaEpStateMap(當前節點新,我們發送給對方節點) 。

Gossip 通過GossipDigestAckVerbHandler 處理ack消息,主要邏輯有兩塊:

1.如果deltaEpStateMap有數據,則說明需要更新本地applicationState,執行Gossiper.instance.applyStateLocally方法

2.如果deltaGossipDigestList 有數據,則說明對方節點需要更新,構造EndpointState,並發送ack2消息給對方

GossipDigestAck2VerbHandler 用來處理 ack2消息,主要邏輯也在Gossiper.instance.applyStateLocally中,我們看一下Gossiper.instance.applyStateLocally的邏輯:

到這里Gossip 三次握手的全過程就分析完了(由於平台字數限制,部分代碼以圖片形式展示,可點擊放大查看哦)。

 

點擊關注,第一時間了解華為雲新鮮技術~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM