Zookeeper內存結構
Zookeeper是怎么存儲數據的,什么機制保證集群中數據是一致性,在網絡異常,當機以及停電等異常情況下恢復數據的,我們知道數據庫給我們提供了這些功能,其實zookeeper也實現了類似數據庫的功能。
1. Zookeeper內存結構
Zookeeper數據在內存中的結構類似於linux的目錄結構
DataTree代表這個目錄結構, DataNode代表一個節點 DataTree: 默認初始化三目錄 1)"" 2) "/zookeeper" 3) "/zookeeper/quota" DataNode 表示一個節點 1) 存儲了父節點的引用 2) 節點的權限信息 3) 子節點路徑集合
Snapshot
Snapshot是datatree在內存中某一時刻的影像,zookeeper有一定的機制會定時生成datatree的snapshot。FileSnap實現了SnapShot接口負責將數據寫入文件中,下面我們來看看snap相關內容。
2.1 snapshot文件格式
Snapshot是以二進制形式存在在文件的,我們用ue打開一個新的snapshot文件
Snapshot文件的中數據大體可以分為兩部分header和body。
Header數據格式:
FileHeader{ int magic //魔數 常量ZKSN 代表zookeeper snapshot文件 int version //版本 常量 2 long dbid //dbid 常量 -1 }
這里很奇怪 version和dbid都是常量,那還有什么意思,也許是保留字段為后續版本使用。
由頭部字段可以計算出頭部信息占用 4 + 4 + 8 =16bit的固定長度
5A 4B 53 4E 就是魔術ZKSN
00 00 00 02 就是dbid號2
FF FF FF FF FF FF FF FF就是十六進制的-1
public voidserialize(OutputArchive oa, String tag) throws IOException { scount = 0; serializeList(longKeyMap, oa); serializeNode(oa, newStringBuilder("")); if (root != null) { oa.writeString("/","path"); } } 2.1)序列化longKeyMap是存儲在datatree中的acl權限集合 readInt("map") //acl的映射個數? while (map > 0) { readLong("long") //這個long值longKeyMap的key,作用?是這一組acl的key readInt("acls") while (acls) { readInt("perms") readString("scheme") readString("id") } } 2.2)存儲datatree中數據節點 readString("path")//第一個datanode "" while(!path.equals("/")) { // "/"代表路徑結束 readRecord(node, "node")包括: readBuffer("data") readLong("acl") deserialize(archive,"statpersisted") 狀態存儲包括: readLong("czxid") //createNode時事務號 readLong("mzxid") //createNode時與Czxid同,setData時的事務號 readLong("ctime") // 創建節點時間 readLong("mtime") //createNode時與ctime相同,setData時間 readInt("version") //createNode版本為0,setData的數據版本號 readInt("cversion") //createNode版本為0,增加/刪除子節點時父節點+1 readInt("aversion") //createNode版本為0,setACL時節點的版本號 readLong("ephemeralOwner") // 臨時節點表示sessionid,非臨時節點這個值為0 readLong("pzxid") //createNode時與Czxid同,增加/刪除子節點時為子節點事務號 readString("path") //讀取下一個路徑 } 3) 文件尾部校驗數據 00 00 00 01 2F snapshot文件結尾5位數據用來校驗snapshot文件是否有效 00 00 00 01一個int的數值就是數字1,代表后面1一個字符數據 2F 就是snapshot的結束符/
Body數據格式:
Snapshot文件中頭部信息之后,緊接着就是body部分的信息,body數據大小是動態不是固定。
1) Map<Long, Integer> sessionWithTimeoutbody信息前面部分存儲的是內存中活着的session以及session的超時時間
oa.writeInt(sessSnap.size(),"count");
for (Entry<Long, Integer> entry :sessSnap.entrySet()) {
oa.writeLong(entry.getKey().longValue(), "id");
oa.writeInt(entry.getValue().intValue(),"timeout");
}
由上面序列到文件代碼可以看出先寫入一個int類型字段用來存儲sessionWithTimeout的個數,然后在遍歷集合以一個long一個int的形式寫入
2) 緊接着就是對datatree序列化到文件了
我們看下datatree的序列化方法
4)Snapshot序列化
5)Snapshot反序列化
5)TxnLog事務日志
事務日志文件用來記錄事物操作,每一個事務操作如添加,刪除節點等等,都會在事務日志中記錄一條記錄,用來在zookeeper異常情況下,通過txnlog和snapshot文件來恢復數據,下面我們來看下txnLog事務日志文件的格式
打開一個事務日志文件看看
一個日志文件LogFile: FileHeader TxnList ZeroPad三部分組成
1) 日志文件頭FileHeader: {
magic 4bytes (ZKLG) //常量代表
version 4bytes //常量2
dbid 8bytes //這個沒啥用,就是默認值0
}
頭文件是固定長度 16 = 4 + 4 + 8數據,它的值也固定
2) TxnList代表記錄記錄集合,txn代表一條記錄
Txn:checksum Txnlen TxnHeaderRecord 0x42由順序的五部分組成
//序列化TxnHeader Record記錄到byte[]
byte[] buf = Util.marshallTxnEntry(hdr, txn);
Checksum crc = makeChecksumAlgorithm();
//根據指定數組更新校驗值
crc.update(buf, 0, buf.length);
//將校驗嗎寫入輸出流
oa.writeLong(crc.getValue(), "txnEntryCRC");
//將TxnHeader Record數據寫入到輸出流
//1.先計算buf數據長度寫入
//2.寫入buf數組數據
//3.記錄尾部以’B’字符結尾,寫入0x42
Util.writeTxnBytes(oa, buf);
2.1) checksum校驗位計算,是由Adler32校驗算法計算TxnHeader Record序列化后的字節碼(跟文檔說明有出入,文檔說是由Txnlen TxnHeaderRecord 0x42計算出來的, 可是看代碼不是,難道我理解錯了????????)
2.2) TxnLen:記錄數據長度包括記錄頭TxnHeader和記錄Record
2.3)TxnHeader: {
sessionid 8bytes
cxid 4bytes // 與客戶端交互的xid
zxid 8bytes // 服務器端生成的事務id
time 8bytes // 時間
type 4bytes // 事務操作的類型
}
2.4)Record:事務記錄的內容,由jute規范定義了序列化反序列化流程,各個事務操作都實現了Record接口,下面看下創建的事務操作記錄
public class CreateTxn implements Record {
privateString path; //創建路徑
privatebyte[] data; //節點數據內容
privatejava.util.List<org.apache.zookeeper.data.ACL> acl; //節點權限
privateboolean ephemeral; //是否臨時節點
privateint parentCVersion; //父節點的版本號
//下面過程就是序列化過程,反序列化類似
publicvoid serialize(OutputArchive a_, String tag) throws Java.io.IOException {
a_.startRecord(this,tag);
a_.writeString(path,"path");
a_.writeBuffer(data,"data");
{
a_.startVector(acl,"acl");
if(acl!= null) { int len1 =acl.size();
for(int vidx1 = 0; vidx1<len1;vidx1++) {
org.apache.zookeeper.data.ACL e1 = (org.apache.zookeeper.data.ACL)acl.get(vidx1);
a_.writeRecord(e1,"e1");
}
}
a_.endVector(acl,"acl");
}
a_.writeBool(ephemeral,"ephemeral");
a_.writeInt(parentCVersion,"parentCVersion");
a_.endRecord(this,tag);
}
2.5)0x42:每條事務記錄尾部以’B’字符結尾就是0x42
3) 每個文件尾部都用一個字符 0 填充, 工具Util.padLogFile擴充文件的時候在尾部填寫上
6)FileTxnLog& FileTxnSnapLog工具
FileTxnLog類用來操作事務記錄文件下面我們來看看這個類主要實現方法
4.1)append方法: 用來向文件尾部添加一條記錄
4.1.1)判斷當前輸入流logStream是否已清空(在同步處理器SyncRequestProcessor中據一定算法得出一個count,記錄大於count就要rollLog,開啟一個新的文件,算法是: 100000/2 + random.nextInt(100000/2), 這個十萬是一個默認值可配置),清空開啟一個新的文件寫入
4.1.2)padFile()判斷是否要擴充文件容量預分配buffer,當buffer容量小於等於4k的時候預分配,每次擴充預分配64k容量
4.1.3)數據序列化,寫入輸入流緩存
4.2)read(zxid): 讀取事務日志, 這個方法在服務當機恢復的時候,用來遍歷事務日志來恢復數據。Zxid是事物日志號遞增生成,在FileTxnLog中會將大於zxid的所有日志文件組合成一個FileTxnIterator用來遍歷從zxid開始的所有日志
4.2.1)FileTxnIterator的構造器中調用init方法,init方法中過濾出所有需要讀的日志文件,並利用goToNextLog()方法打開第一個日志日志文件的輸入流
4.2.2)FileTxnIterator的next方法用來從日志文件中讀取一條記錄,校驗並反序列化出來,讀取成功返回true,如果讀到了文件末尾調goToNextLog()讀下一個文件,以此遞歸直到最后
4.2.3)FileTxnIterator的getTxn()方法,返回next()方法中讀取的記錄
4.3)commit()方法,將流中數據刷到硬盤SyncRequestProcessor中任務會定時調用異步刷盤
4.4)truncate(zxid)方法,用來刪除日志,主要是在恢復數據的時候,利用leader的最后有效zxid,來刪除learner的無效多余的事務記錄。類似於數據庫中的truncate操作概念,它並不是一條一條刪除記錄,大於zxid的文件直接將文件刪除掉,zxid所在文件直接修改文件的長度,將文件長度設置到zxid所在的位置
4.5)序列化事務記錄
4.6)反序列化事務記錄
5FileTxnSnapLog
這是一個工具類主要用來操作TxnLog和Snapshot文件,我們主要關注一下restore方法
7)ZKDatabase
ZKDatabase在內存中維護了zookeeper的sessions, datatree和commit logs集合。 當zookeeper server啟動的時候會將txnlogs和snapshots從磁盤讀取到內存中
6.1)loadDatabase: 跟數據庫的啟動類似zookeeper服務啟動結合txnlogs和snapshot, snapshot是內存數據的某個點一份影像,takeSnapshot操作還是很耗時,為了性能根據某算法(在同步處理器SyncRequestProcessor中據一定算法得出一個count,記錄大於count就要takeSnapshot,算法是: 100000/2 + random.nextInt(100000/2),這個十萬是一個默認值可配置)計算出一個點來異步做一次takeSnapShot操作,這個跟數據庫實現原理上很類似, 但是這樣在非正常關機情況下,最新有效的那個snapshot並不是內存中最新的數據,所以需要利用txnLogs來把沒有生成snapshot的操作在內存重新執行一邊來恢復到非正常關閉服務那一刻內存情況。
下面我們來看一下loadDatabase的流程:
6.1.1) 構建一個PlayBackListener對象
6.1.2) snapshot的反序列,倒敘排目錄下的snapshot文件,遍歷查找出最新的那個有效snapshot文件進行反序列化到內存(具體流程查看snapshot那部分介紹),snapshot的反序列后我們會知道snapshot最新的zxid叫做lastProcessedZxid, 這個lastProcessedZxid之前的事務操作,都成功執行並序列到snapshot中可恢復到內存,lastProcessedZxid之后的操作只有事務日志,不能直接通過snapshot恢復。
6.1.2) lastProcessedZxid+1從事務日志文件txnLog讀取事務操作
FileTxnLog txnLog = newFileTxnLog(dataDir);
TxnIterator itr =txnLog.read(dt.lastProcessedZxid+1);
遍歷TxnIterator,執行processTransaction方法,就是把事務操作在內存中在執行一邊把丟失的操作補回來
同時將事務操作通過PlayBackListener添加到commitedLog集合,commitedLog的事務操作在服務恢復的時候會同步到其他leaner server, 因為很有可能其他leaner server也沒有及時的takesnapshot
返回最后的事務日志zxid給database,作為ZKDatabase的最新事物id
6.1.3) 在zookeeperServer成功loadDatabase后,會及時主動的做一次takesnapshot操作來得到一份最新的內存影像
8)數據存儲小結
Zookeeper數據是以文件形式存儲在硬盤上的,以snapshot為主,txnlog為輔。因為當對內存數據進行變更的時候,會保證將事務操作記入log日志,而snapshot只是內存某一個時刻影像,為了性能takeSnapshot生成snapshot並不是實時的,而是由后台線程根據一定規則處理的
來看看snapshot和txnlog在磁盤上的文件
文件名是以log.或者snapshot.加上一串long的16進制數字組成,這個long值就是zxid服務器端事務id
Snapshot文件名生成, FileTxnSnapLog.save方法中
long lastZxid = dataTree.lastProcessedZxid;
FilesnapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
如上代碼創建一個新的snapshot文件,工具Util用來用來創建文件名
public static String makeSnapshotName(long zxid) {
return "snapshot." +Long.toHexString(zxid);
}
日志Log文件生成,在FileTxnLog.apend方法中,如果被執行了rollLog方法,那么文件輸入流會被清空,這里會創建一個新的文件
if (logStream==null) {
logFileWrite = new File(logDir,("log." + Long.toHexString(hdr.getZxid())));
fos = newFileOutputStream(logFileWrite);
………
}
如上代碼可以看出文件名是最新請求的zxid,這里snapshot和log文件都和zxid有關,那么下面我們來看看zxid。
Zxid
當客戶端一個事務請求操作是leader的PrepRequestProcessor處理器會對請求進行預處理包括生成zxid設置到請求中去,zxid的生成是通過調用ZookeeperServer.getNextZxid生成
protected long hzxid = 0;
synchronized long getNextZxid() {
return ++hzxid;
}
它是hzxid一個自增的long值,有沒有奇怪這個變量取名叫做hzixd多了一個h, h我的理解是high的縮寫代表64位long的高32位。Zxid的分為兩部分高32位用來存儲每次選舉的時代epoch,低32位用來存儲事務請求的自增序列。所謂選舉時代就是一個數值,標記代表一次選舉,跟年份一樣是自增的。每次服務器啟動或者zookeeper異常導致重新選舉都會在原來epoch值加一代表一個新的時代,工具類ZxidUtils用來操作前32或者后32位
public class ZxidUtils {
static public long getEpochFromZxid(long zxid) {
return zxid >> 32L;
}
static public long getCounterFromZxid(long zxid) {
return zxid & 0xffffffffL;
}
static public long makeZxid(long epoch,long counter) {
return (epoch << 32L) | (counter & 0xffffffffL);
}
static public String zxidToString(long zxid) {
return Long.toHexString(zxid);
}
}
比如現在epoch=4代表經歷了4次選舉,如果重新選舉后epoch值為5,通過工具類的zxid=hzxid=ZxidUtils.makeZxid(5,0)= 21474836480,此時低32重新開始值為0, 如果這時來了新的請求值為zxid=21474836481=21474836480+ 1 = ZxidUtils.makeZxid(5, 1)