目錄
- 處理寫請求總體過程
- 客戶端發起寫請求
- follower和leader交互過程
- follower發送請求給客戶端
處理寫請求總體過程
zk為了保證分布式數據一致性,使用ZAB協議,在客戶端發起一次寫請求的時候時候,假設該請求請求到的是follower,follower不會直接處理這個請求,而是轉發給leader,由leader發起投票決定該請求最終能否執行成功,所以整個過程client、被請求的follower、leader、其他follower都參與其中。以創建一個節點為例,總體流程如下
從圖中可以看出,創建流程為
- follower接受請求,解析請求
- follower通過FollowerRequestProcessor將請求轉發給leader
- leader接收請求
- leader發送proposal給follower
- follower收到請求記錄txlog、snapshot
- follower發送ack給leader
- leader收到ack后進行commit,並且通知所有的learner,發送commit packet給所有的learner
這里說的follower、leader都是server,在zk里面server總共有這么幾種
由於server角色不同,對於請求所做的處理不同,每種server包含的processor也不同,下面細說下具體有哪些processor。
follower的processor鏈
這里的follower是FollowerZooKeeperServer,通過setupRequestProcessors來設置自己的processor鏈
FollowerRequestProcessor -> CommitProcessor ->FinalRequestProcessor
每個processor對應的功能為:
FollowerRequestProcessor:
作用:將請求先轉發給下一個processor,然后根據不同的OpCode做不同的操作
如果是:sync,先加入org.apache.zookeeper.server.quorum.FollowerZooKeeperServer#pendingSyncs,然后發送給leader
如果是:create等,直接轉發
如果是:createSession或者closeSession,如果不是localsession則轉發給leader
CommitProcessor :
有一個WorkerService,將請求封裝為CommitWorkRequest執行
作用:
轉發請求,讀請求直接轉發給下一個processor
寫請求先放在pendingRequests對應的sessionId下的list中,收到leader返回的commitRequest再處理
- 處理讀請求(不會改變服務器狀態的請求)
- 處理committed的寫請求(經過leader 處理完成的請求)
維護一個線程池服務WorkerService,每個請求都在單獨的線程中處理
- 每個session的請求必須按順序執行
- 寫請求必須按照zxid順序執行
- 確認一個session中寫請求之間沒有競爭
FinalRequestProcessor:
總是processor chain上最后一個processor
作用:
- 實際處理事務請求的processor
- 處理query請求
- 返回response給客戶端
SyncRequestProcessor:
作用:
- 接收leader的proposal進行處理
- 從org.apache.zookeeper.server.SyncRequestProcessor#queuedRequests中取出請求記錄txlog和snapshot
- 然后加入toFlush,從toFlush中取出請求交給org.apache.zookeeper.server.quorum.SendAckRequestProcessor#flush處理
leader的processor鏈
這里的leader就是LeaderZooKeeperServer
通過setupRequestProcessors來設置自己的processor鏈
PrepRequestProcessor -> ProposalRequestProcessor ->CommitProcessor -> Leader.ToBeAppliedRequestProcessor ->FinalRequestProcessor
客戶端發起寫請求
在客戶端啟動的時候會創建Zookeeper實例,client會連接到server,后面client在創建節點的時候就可以直接和server通信,client發起創建創建節點請求的過程是:
org.apache.zookeeper.ZooKeeper#create(java.lang.String, byte[], java.util.List<org.apache.zookeeper.data.ACL>, org.apache.zookeeper.CreateMode)
org.apache.zookeeper.ClientCnxn#submitRequest
org.apache.zookeeper.ClientCnxn#queuePacket
- 在
ZooKeeper#create
方法中構造請求的request - 在
ClientCnxn#queuePacket
方法中將request封裝到packet中,將packet放入發送隊列outgoingQueue中等待發送- SendThread不斷從發送隊列outgoingQueue中取出packet發送
- 通過 packet.wait等待server返回
follower和leader交互過程
client發出請求后,follower會接收並處理該請求。選舉結束后follower確定了自己的角色為follower,一個端口和client通信,一個端口和leader通信。監聽到來自client的連接口建立新的session,監聽對應的socket上的讀寫事件,如果client有請求發到follower,follower會用下面的方法處理
org.apache.zookeeper.server.NIOServerCnxn#readPayload
org.apache.zookeeper.server.NIOServerCnxn#readRequest
readPayload這個方法里面會判斷是連接請求還是非連接請求,連接請求在之前session建立的文章中介紹過,這里從處理非連接請求開始。
follower接收client請求
follower收到請求之后,先請求請求的opCode類型(這里是create)構造對應的request,然后交給第一個processor執行,follower的第一個processor是FollowerRequestProcessor.
follower轉發請求給leader
由於在zk中follower是不能處理寫請求的,需要轉交給leader處理,在FollowerRequestProcessor中將請求轉發給leader,轉發請求的調用堆棧是
serialize(OutputArchive, String):82, QuorumPacket (org.apache.zookeeper.server.quorum), QuorumPacket.java
writeRecord(Record, String):123, BinaryOutputArchive (org.apache.jute), BinaryOutputArchive.java
writePacket(QuorumPacket, boolean):139, Learner (org.apache.zookeeper.server.quorum), Learner.java
request(Request):191, Learner (org.apache.zookeeper.server.quorum), Learner.java
run():96, FollowerRequestProcessor (org.apache.zookeeper.server.quorum), FollowerRequestProcessor.java
FollowerRequestProcessor是一個線程在zk啟動的時候就開始運行,主要邏輯在run方法里面,run方法的主要邏輯是
先把請求提交給CommitProcessor(后面leader發送給follower的commit請求對應到這里),然后將請求轉發給leader,轉發給leader的過程就是構造一個QuorumPacket,通過之前選舉通信的端口發送給leader。
leader接收follower請求
leader獲取leader地位以后,啟動learnhandler,然后一直在LearnerHandler#run循環,接收來自learner的packet,處理流程是:
processRequest(Request):1003, org.apache.zookeeper.server.PrepRequestProcessor.java
submitLearnerRequest(Request):150, org.apache.zookeeper.server.quorum.LeaderZooKeeperServer.java
run():625, org.apache.zookeeper.server.quorum.LearnerHandler.java
handler判斷是REQUEST請求的話交給leader的processor鏈處理,將請求放入org.apache.zookeeper.server.PrepRequestProcessor#submittedRequests,即leader的第一個processor。這個processor也是一個線程,從submittedRequests中不斷拿出請求處理
processor主要做了:
- 交給CommitProcessor等待提交
- 交給leader的下一個processor處理:ProposalRequestProcessor
leader 發送proposal給follower
ProposalRequestProcessor主要作用就是講請求交給下一個processor並且發起投票,將proposal發送給所有的follower。
// org.apache.zookeeper.server.quorum.Leader#sendPacket
void sendPacket(QuorumPacket qp) {
synchronized (forwardingFollowers) {
// 所有的follower,observer沒有投票權
for (LearnerHandler f : forwardingFollowers) {
f.queuePacket(qp);
}
}
}
follower 收到proposal
follower處理proposal請求的調用堆棧
processRequest(Request):214, org.apache.zookeeper.server.SyncRequestProcessor.java
logRequest(TxnHeader, Record):89, org.apache.zookeeper.server.quorumFollowerZooKeeperServer.java
processPacket(QuorumPacket):147, org.apache.zookeeper.server.quorum.Follower.java
followLeader():102, org.apache.zookeeper.server.quorum.Follower.java
run():1199, org.apache.zookeeper.server.quorum.QuorumPeer.java
將請求放入org.apache.zookeeper.server.SyncRequestProcessor#queuedRequests
follower 發送ack
線程SyncRequestProcessor#run從org.apache.zookeeper.server.SyncRequestProcessor#toFlush中取出請求flush,處理過程
- follower開始commit,記錄txLog和snapShot
- 發送commit成功請求給leader,也就是follower給leader的ACK
leader收到ack
leader 收到ack后判斷是否收到大多數的follower的ack,如果是說明可以commit,commit后同步給follower
follower收到commit
還是Follower#followLeader里面的while循環收到leader的commit請求后,調用下面的方法處理
org.apache.zookeeper.server.quorum.FollowerZooKeeperServer#commit
最終加入CommitProcessor.committedRequests隊列,CommitProcessor主線程發現隊列不空表明需要把這個request轉發到下一個processor
follower發送請求給客戶端
follower的最后一個processor是FinalRequestProcessor,最后會創建對應的節點並且構造response返回給client
總結
本篇文章主要介紹了client發起一次寫請求,client、follower和leader各自的處理過程。當然了,為了簡單,其中設定了一些具體的場景,比如請求是發送到follower的而不是leader。