最近針對ZK一些比較疑惑的問題,再看了一下相關代碼,列舉如下。這里只列官方文檔中沒有的,或者不清晰的。以zookeeper-3.3.3為基准。以下用ZK表示ZooKeeper。
一個ZooKeeper對象,代表一個ZK Client。應用通過ZooKeeper對象中的讀寫API與ZK集群進行交互。一個簡單的創建一條數據的例子,只需如下兩行代碼:
ZooKeeper zk = new ZooKeeper(serverList, sessionTimeout, watcher); zk.create("/test", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Client和ZK集群的連接和Session的建立過程
ZooKeeper對象一旦創建,就會啟動一個線程(ClientCnxn)去連接ZK集群。ZooKeeper內部維護了一個Client端狀態。
public enum States { CONNECTING, ASSOCIATING, CONNECTED, CLOSED, AUTH_FAILED; …}
第一次連接ZK集群時,首先將狀態置為CONNECTING,然后挨個嘗試連接serverlist中的每一台Server。Serverlist在初始化時,順序已經被隨機打亂:
Collections.shuffle(serverAddrsList)
這樣可以避免多個client以同樣的順序重連server。重連的間隔毫秒數是0-1000之間的一個隨機數。
一旦連接上一台server,首先發送一個ConnectRequest包,將ZooKeeper構造函數傳入的sessionTimeout數值發動給Server。ZooKeeper Server有兩個配置項:
minSessionTimeout 單位毫秒。默認2倍tickTime
maxSessionTimeout 單位毫秒。默認20倍tickTime
(tickTime也是一個配置項。是Server內部控制時間邏輯的最小時間單位)
如果客戶端發來的sessionTimeout超過min-max這個范圍,server會自動截取為min或max,然后為這個Client新建一個Session對象。Session對象包含sessionId、timeout、tickTime三個屬性。其中sessionId是Server端維護的一個原子自增long型(8字節)整數;啟動時Leader將其初始化為1個字節的leader Server Id+當前時間的后5個字節+2個字節的0;這個可以保證在leader切換中,sessionId的唯一性(只要leader兩次切換為同一個Server的時間間隔中session建立數不超過( 2的16次方)*間隔毫秒數。。。不可能達到的數值)。
ZK Server端維護如下3個Map結構,Session創建后相關數據分別放入這三個Map中:
Map<Long[sessionId], Session> sessionsById
Map<Long[sessionId], Integer> sessionsWithTimeout
Map<Long[tickTime], SessionSet> sessionSets
其中sessionsById簡單用來存放Session對象及校驗sessionId是否過期。sessionsWithTimeout用來維護session的持久化:數據會寫入snapshot,在Server重啟時會從snapshot恢復到sessionsWithTimeout,從而能夠維持跨重啟的session狀態。
Session對象的tickTime屬性表示session的過期時間。sessionSets這個Map會以過期時間為key,將所有過期時間相同的session收集為一個集合。Server每次接到Client的一個請求或者心跳時,會根據當前時間和其sessionTimeout重新計算過期時間並更新Session對象和sessionSets。計算出的過期時間點會向上取整為ZKServer的屬性tickTime的整數倍。Server啟動時會啟動一個獨立的線程負責將大於當前時間的所有tickTime對應的Session全部清除關閉。
Leader收到連接請求后,會發起一個createSession的Proposal,如果表決成功,最終所有的Server都會在其內存中建立同樣的Session,並作同樣的過期管理。等表決通過后,與客戶端建立連接的Server為這個session生成一個password,連同sessionId,sessionTimeOut一起返回給客戶端(ConnectResponse)。客戶端如果需要重連Server,可以新建一個ZooKeeper對象,將上一個成功連接的ZooKeeper 對象的sessionId和password傳給Server
ZooKeeper zk = new ZooKeeper(serverList, sessionTimeout, watcher, sessionId,passwd);
ZKServer會根據sessionId和password為同一個client恢復session,如果還沒有過期的話。
Server生成password的算法比較有意思:
new Random(sessionId ^ superSecret).nextBytes(byte[] passwd)
superSecret是一個固定的常量。Server不保存password,每次在返回client的ConnectRequest應答時計算生成。在客戶端重連時再重新計算,與傳入的password作比較。因為Random相同的seed隨機生成的序列是完全相同的!
Client發送完ConnectRequest包,會緊接着發送authInfo包(OpCode.auth)和setWatches 包OpCode.setWatches;authInfo列表由ZooKeeper的addAuthInfo()方法添加,用來進行自定義的認證和授權。
最后當zookeeper.disableAutoWatchReset為false時,若建立連接時ZooKeeper注冊的Watcher不為空,那么會通過setWatches告訴ZKServer重新注冊這些Watcher。這個用來在Client自動切換ZKServer或重練時,尚未觸發的Watcher能夠帶到新的Server上
以上是連接初始化的時候做的事情。
關於ACL
之前看到很多例子里
zk.create(“/test”, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
中Ids.OPEN_ACL_UNSAFE的地方用 Ids.CREATOR_ALL_ACL,在zookeeper-3.3.3上面跑直接就掛了,報下面的錯:
org.apache.zookeeper.KeeperException$InvalidACLException: KeeperErrorCode = InvalidACL for /test
at org.apache.zookeeper.KeeperException.create(KeeperException.java:112)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:637)
是因為3.3.3的ACL進行了細微的調整。先來看下ACL的數據結構:
每一個znode節點上都可以設置一個訪問控制列表,數據結構為List
ACL +--perms int (allow What) +--id Id +--scheme String (Who) +--id String (How)
一個ACL對象就是一個Id和permission對,用來表示哪個/哪些范圍的Id(Who)在通過了怎樣的鑒權(How)之后,就允許進行那些操作(What):Who How What;permission(What)就是一個int表示的位碼,每一位代表一個對應操作的允許狀態。類似unix的文件權限,不同的是共有5種操作:CREATE、READ、WRITE、DELETE、ADMIN(對應更改ACL的權限);Id由scheme(Who)和一個具體的字符串鑒權表達式id(How)構成,用來描述哪個/哪些范圍的Id應該怎樣被鑒權。Scheme事實上是所使用的鑒權插件的標識。id的具體格式和語義由scheme對應的鑒權實現決定。不管是內置還是自定義的鑒權插件都要實現AuthenticationProvider接口(以下簡稱AP)。自定義的鑒權插件由zookeeper.authProvider開頭的系統屬性指定其類名,例如:
authProvider.1=com.f.MyAuth
authProvider.2=com.f.MyAuth2
AP接口的getScheme()方法定義了其對應的scheme
客戶端與Server建立連接時,會將ZooKeeper.addAuthInfo()方法添加的每個authInfo都發送給ZKServer。
void addAuthInfo(String scheme, byte auth[])
addAuthInfo 方法本身也會直接將authInfo發送給ZKServer。ZKServer接受到authInfo請求后,首先根據scheme找到對應的AP,然后調用其handleAuthentication()方法將auth數據傳入。對應的AP將auth數據解析為一個Id,將其加入連接上綁定的authInfo列表(List)中。Server在接入客戶端連接時,首先會自動在連接上加上一個默認的scheme為ip的authIndo:authInfo.add(new Id(“ip”, client-ip));
鑒權時調用AP的matches()方法判斷進行該操作的當前連接上綁定的authInfo是否與所操作的znode的ACL列表匹配。
ZK有4個內置的scheme:
• world 只有一個唯一的id:anyone;表示任何人都可以做對應的操作。這個scheme沒有對應的鑒權實現。只要一個znode的ACL list中包含有這個scheme的Id,其對應的操作就運行執行
• auth 沒有對應的id,或者只有一個空串””id。這個scheme沒有對應的鑒權實現。語義是當前連接綁定的適合做創建者鑒權的autoInfo (通過調用autoInfo的scheme對應的AP的isAuthenticated()得知)都擁有對應的權限。遇到這個auth后,Server會根據當前連接綁定的符合要求的autoInfo生成ACL加入到所操作znode的acl列表中。
• digest 使用username:password格式的字符串生成MD5 hash 作為ACL ID。 具體格式為:username:base64 encoded SHA1 password digest.對應內置的鑒權插件:DigestAuthenticationProvider
• ip 用IP通配符匹配客戶端ip。對應內置鑒權插件IPAuthenticationProvider
只有兩類API會改變Znode的ACL列表:一個是create(),一個是setACL()。所以這兩個方法都要求傳入一個List。Server接到這兩種更新請求后,會判斷指定的每一個ACL中,scheme對應的AuthenticationProvider是否存在,如果存在,調用其isValid(String)方法判斷對應的id表達式是否合法。。。具體參見PrepRequestProcessor.fixupACL()方法。上文的那個報錯是因為CREATOR_ALL_ACL只包含一個ACL : Perms.ALL, Id(“auth”, “”),而auth要求將連接上適合做創建者鑒權的autoInfo都加入節點的acl中,而此時連接上只有一個默認加入的Id(“ip”, client-ip),其對應的IPAuthenticationProvider的isAuthenticated()是返回false的,表示不用來鑒權node的創建者。
tbd:具體例子
關於Watcher
先來看一下ZooKeeper的API: 讀API包括exists,getData,getChildren四種
Stat exists(String path, Watcher watcher) Stat exists(String path, boolean watch) void exists(String path, Watcher watcher, StatCallback cb, Object ctx) void exists(String path, boolean watch , StatCallback cb, Object ctx) byte[] getData(String path, Watcher watcher, Stat stat) byte[] getData(String path, boolean watch , Stat stat) void getData(String path, Watcher watcher, DataCallback cb, Object ctx) void getData(String path, boolean watch , DataCallback cb, Object ctx) List<String> getChildren(String path, Watcher watcher) List<String> getChildren(String path, boolean watch ) void getChildren(String path, Watcher watcher, ChildrenCallback cb, Object ctx) void getChildren(String path, boolean watch , ChildrenCallback cb, Object ctx) List<String> getChildren(String path, Watcher watcher, Stat stat) List<String> getChildren(String path, boolean watch , Stat stat) void getChildren(String path, Watcher watcher, Children2Callback cb, Object ctx) void getChildren(String path, boolean watch , Children2Callback cb, Object ctx)
每一種按同步還是異步,添加指定watcher還是默認watcher又分為4種。默認watcher是只在ZooKeeper zk = new ZooKeeper(serverList, sessionTimeout, watcher)中指定的watch。如果包含boolean watch的讀方法傳入true則將默認watcher注冊為所關注事件的watch。如果傳入false則不注冊任何watch
寫API包括create、delete、setData、setACL四種,每一種根據同步還是異步又分為兩種:
String create(String path, byte data[], List<ACL> acl, CreateMode createMode) void create(String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx) void delete(String path, int version) void delete(String path, int version, VoidCallback cb, Object ctx) Stat setData(String path, byte data[], int version) void setData(String path, byte data[], int version, StatCallback cb, Object ctx) Stat setACL(String path, List<ACL> acl, int version) void setACL(String path, List<ACL> acl, int version, StatCallback cb, Object ctx)
一個讀寫交互,或者說pub/sub的簡單描述如下圖:
更詳細一點:
可見Watcher機制的輕量性:通知的只是事件。Client和server端額外傳輸的只是個boolean值。對於讀寫api操作來說,path和eventType的信息本身就有了。只有在notify的時候才需要加上path、eventType的信息。內部存儲上,Server端只維護一個Map(當然會根據watcher的類型分為兩個),key為path,value為本身以及存在的連接對象。所以存儲上也不是負擔。不會隨着watcher的增加無限制的增大
Watcher的一次性設計也大大的減輕了服務器的負擔和風險。假設watcher不是一次性,那么在更新很頻繁的時候,大量的通知要不要丟棄?精簡?並發怎么處理?都是一個問題。一次性的話,這些問題就都丟給了Client端。並且Client端事實上並不需要每次密集更新都去處理。
如果一個znode上大量Client都注冊了watcher,那么觸發的時候是串行的。這里可能會有一些延遲。
關於Log文件和snapshot
Follower/Leader每接收到一個PROPOSAL消息之后,都會寫入log文件。log文件的在配置項dataLogDir指定的目錄下面。文件名為log.+第一條記錄對應的zxid
[linxuan@test036081 conf]$ ls /usr/zkdataLogDir/version-2/
log.100000001 log.200000001
ZooKeeper在每次寫入log文件時會做檢查,當文件剩余大小不足4k的時候,默認會一次性預擴展64M大小。這個預擴展大小可以通過系統屬性zookeeper.preAllocSize或配置參數preAllocSize指定,單位為K;
會為每條記錄計算checksum,放在實際數據前面
每寫1000條log做一次flush(調用BufferedOutputStream.flush()和FileChannel.force(false))。這個次數直到3.3.3都是硬編碼的,無法配置
每當log寫到一定數目時,ZooKeeper會將當前數據的快照輸出為一個snapshot文件:
randRoll = Random.nextInt(snapCount/2); if (logCount > (snapCount / 2 + randRoll)) { rollLog(); take_a_snapshot_in_a_new_started_thread(); }
這個randRoll是一個隨機數,為了避免幾台Zk Server在同一時間都做snapshot
輸出快照的log數目閥值snapCount可以通過zookeeper.snapCount系統屬性設置,默認是100000條。輸出snapshot文件的操作在新創建的單獨線程里進行。任一時刻只會有一個snapshot線程。Snapshot文件在配置項dataDir指定的目錄下面生成,命名格式為snapshot.+最后一個更新的zxid。
如指定dataDir=/home/linxuan/zookeeper-3.3.3/data,則snapshot文件為:
[linxuan@test036081 version-2]$ ls /home/linxuan/zookeeper-3.3.3/data/version-2
snapshot.0 snapshot.100000002
每個snapshot文件也都會寫入全部數據的一個checksum。
ZK在每次啟動snapshot線程前都會將當前的log文件刷出,在下次寫入時創建一個新的log文件。不管當前的log文件有沒有寫滿。舊的log文件句柄會在下一次commit(也就是flush的時候)再順便關閉。
所以這種機制下,log文件會有一定的空間浪費,大多情況下會沒有寫滿就換到下一個文件了。可以通過調整preAllocSize和snapCount兩個參數來減少這種浪費。但是定時自動刪除沒用的log文件還是必須的,只保留最新的即可。
為了保證消息的安全,排隊的消息在沒有flush到log文件之前不會提交到下一個環節。而為了提高log文件寫入的效率,又必須做批量flush。所以更新消息實際上也是和批量flushlog文件的操作一起,批量提交到下一個協議環節的。當請求比較少時(包括讀請求),每個更新會很快刷出,即使沒有寫夠1000條。當請求壓力很大時,才會一直等堆積到1000條才刷出log文件,同時送出消息到下一個環節。這里的實現比較細致,實質上是在壓力大時,不光是寫log,連同消息處理都做了一個批量操作。具體實現細節在SyncRequestProcessor中
Client和ZK集群的完整交互
ZK整體上來說,通過單線程和大量的隊列來達到消息在集群內完成一致性協議的情況下,仍然能保證全局順序。下面是一個線程和queue的全景圖:
這個圖中,除了個別的之外,每個節點都要么代表一個Thread,要么代表一個queue
其他
ZKServer內部通過大量的queue來處理消息,保證順序。這些queue的大小本身都不設上限。有一個配置屬性globalOutstandingLimit用來指定Server的最大請求堆積數。ZKServer在讀入消息時如果發覺內部的全局消息計數大於這個值,就會直接關閉當前連接上的讀取來保護服務端。(取消與當前Client的Nio連接上的讀取事件注冊)
人人安家網(http://www.renrenaj.com) 技術架構 老徐 開源倡議 http://www.renrenaj.com/opensource/