摘要:Gossip協議是Cassandra維護各節點狀態的一個重要組件,下面我們以Gossip協議三次握手為線索逐步分析Gossip協議源碼。
Gossip協議是Cassandra維護各節點狀態的一個重要組件,下面我們以Gossip協議三次握手為線索逐步分析Gossip協議源碼。
Gossip協議通過判斷節點的generation和version 來確認節點狀態信息新舊,如果節點重啟,則generation加一,version每次從零開始計算。所以 generation是大版本號,version為小版本號,理解這個概念對后面的握手邏輯有很大幫助。
Gossip協議最重要的一個屬性是endpointStateMap ,這個map以address為key,以EndpointState為value維護節點自身狀態信息。EndopointState 包含了節點 net_version,host_id,rpc_address,release_version,dc,rack,load,status,tokens 信息。總體來說,所有節點維護的endpointStateMap應該是一致的,如果出現不一致信息或者新增,替換,刪除節點 ,這中間的狀態維護就要靠Gossip來實現了。
另外一個重要屬性subscribers ,當節點狀態變更時候,gossip 會通知各個subscribers。
Gossip啟動時候,會每隔一秒會在集群中隨機選擇一個節點發送一條GossipDigestSyn消息,開始和其他節點的通信,如下圖:

接下來我們根據上面的了流程圖一步步分析gossip代碼,GossipDigestSyn 消息是在GossipTask構造的。
1 // syn消息包含 集群名字,分區器,和gDigests消息 2 GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),DatabaseDescriptor.getPartitionerName(),gDigests); 3 4 MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,digestSynMessage,GossipDigestSyn.serializer);
GossipDigestSyn 消息的主要部分在gDigests里面,gDigests是通過方法Gossiper.instance.makeRandomGossipDigest(gDigests) 生成的。
private void makeRandomGossipDigest(List<GossipDigest> gDigests) 02 { 03 EndpointState epState; 04 int generation = 0 ; 05 int maxVersion = 0 ; 06 07 // local epstate will be part of endpointStateMap 08 //當前節點維護的節點列表 09 List<InetAddress> endpoints = new ArrayList<InetAddress>(endpointStateMap.keySet()); 10 //亂序處理 11 Collections.shuffle(endpoints, random); 12 for (InetAddress endpoint : endpoints) 13 { 14 epState = endpointStateMap.get(endpoint); 15 if (epState != null ) 16 { 17 //獲取generation版本號 18 generation = epState.getHeartBeatState().getGeneration(); 19 //EndpointState包含了tokens,hostid,status,load等信息,所以冒泡排序獲取其中最大的maxVersion 20 maxVersion = getMaxEndpointStateVersion(epState); 21 } 22 gDigests.add( new GossipDigest(endpoint, generation, maxVersion)); 23 } 24 25 if (logger.isTraceEnabled()) 26 { 27 StringBuilder sb = new StringBuilder(); 28 for (GossipDigest gDigest : gDigests) 29 { 30 sb.append(gDigest); 31 sb.append( " " ); 32 } 33 logger.trace( "Gossip Digests are : {}" , sb); 34 } 35 }
A節點發出GossipDigestSyn后,B節點會通過GossipDigestSynVerbHandler 來處理GossipDigestSyn 消息,具體處理邏輯在Gossiper.instance.examineGossiper中,


上面方法對比版本號以后,主要處理邏輯在senall方法和requestAll方法,繼續跟進:
1 private void requestAll(GossipDigest gDigest, List<GossipDigest> deltaGossipDigestList, int remoteGeneration) 2 { 3 /* We are here since we have no data for this endpoint locally so request everthing. */ 4 //生成一個Digest,等待對方節點發送消息 5 deltaGossipDigestList.add( new GossipDigest(gDigest.getEndpoint(), remoteGeneration, 0 )); 6 if (logger.isTraceEnabled()) 7 logger.trace( "requestAll for {}" , gDigest.getEndpoint()); 8 } 1 private void sendAll(GossipDigest gDigest, Map<InetAddress, EndpointState> deltaEpStateMap, int maxRemoteVersion) 2 { 3 EndpointState localEpStatePtr = getStateForVersionBiggerThan(gDigest.getEndpoint(), maxRemoteVersion); 4 if (localEpStatePtr != null ) 5 //將endpintState信息通過ack 消息發送給對方 6 deltaEpStateMap.put(gDigest.getEndpoint(), localEpStatePtr); 7 }
到這里我們發現向對方節點發送的ack消息已經構造完成了,包含了deltaGossipDigestList(對方節點信息最新,我們需要對方節點給我們發endpointState) 和 deltaEpStateMap(當前節點新,我們發送給對方節點) 。
Gossip 通過GossipDigestAckVerbHandler 處理ack消息,主要邏輯有兩塊:
1.如果deltaEpStateMap有數據,則說明需要更新本地applicationState,執行Gossiper.instance.applyStateLocally方法
2.如果deltaGossipDigestList 有數據,則說明對方節點需要更新,構造EndpointState,並發送ack2消息給對方
GossipDigestAck2VerbHandler 用來處理 ack2消息,主要邏輯也在Gossiper.instance.applyStateLocally中,我們看一下Gossiper.instance.applyStateLocally的邏輯:


到這里Gossip 三次握手的全過程就分析完了(由於平台字數限制,部分代碼以圖片形式展示,可點擊放大查看哦)。
