一、Jute序列化工具
1、Jute概述
Zookeeper的客戶端與服務端之間會進行一系列的網絡通信來實現數據傳輸,Zookeeper使用Jute組件來完成數據的序列化和反序列化操作,其用於Zookeeper進行網絡數據傳輸和本地磁盤數據存儲的序列化和反序列化工作。
實體類要使用Jute進行序列化和反序列化步驟:
- 1.需要實現Record接口的serialize和deserialize方法;
- 2.構建一個序列化器BinaryOutputArchive;
- 3.序列化:調用實體類的serialize方法,將對象序列化到指定的tag中去,比如這里將對象序列化到header中;
- 4.反序列化:調用實體類的deserialize方法,從指定的tag中反序列化出數據內容。
2、Record接口
Zookeeper中所需要進行網絡傳輸或是本地磁盤存儲的類型定義,都實現了該接口,是Jute序列化的核心。Record定義了兩個基本的方法,分別是serialize和deserialize,分別用於序列化和反序列化。其中archive是底層真正的序列化器和反序列化器,並且每個archive中可以包含對多個對象的序列化和反序列化,因此兩個接口中都標記了參數tag,用於序列化器和反序列化器標識對象自己的標記。
3、OutputArchive和InputArchive
OutputArchive和InputArchive分別是Jute底層的序列化器和反序列化器定義。有BinaryOutputArchive/BinaryInputArchive、CsvOutputArchive/CsvInputArchive和XmlOutputArchive/XmlInputArchive三種實現,無論哪種實現都是基於OutputStream和InputStream進行操作。
BinaryOutputArchive對數據對象的序列化和反序列化,主要用於進行網絡傳輸和本地磁盤的存儲,是Zookeeper底層最主要的序列化方式。CsvOutputArchive對數據的序列化,更多的是方便數據的可視化展示,因此被用在toString方法中。XmlOutputArchive則是為了將數據對象以xml格式保存和還原,但目前在Zookeeper中基本沒使用到。
4、測試示例
首先我們構建一個實體類,實現Record接口的serialize和deserialize方法:
public class MockReHeader implements Record { private long sessionId; private String type; public MockReHeader() {} public MockReHeader(long sessionId, String type) { this.sessionId = sessionId; this.type = type; } public void setSessionId(long sessionId) { this.sessionId = sessionId; } public void setType(String type) { this.type = type; } public long getSessionId() { return sessionId; } public String getType() { return type; } public void serialize(OutputArchive outputArchive, String tag) throws java.io.IOException { outputArchive.startRecord(this, tag); outputArchive.writeLong(sessionId, "sessionId"); outputArchive.writeString(type, "type"); outputArchive.endRecord(this, tag); } public void deserialize(InputArchive inputArchive, String tag) throws java.io.IOException { inputArchive.startRecord(tag); this.sessionId = inputArchive.readLong("sessionId"); this.type = inputArchive.readString("type"); inputArchive.endRecord(tag); } @Override public String toString() { return "sessionId = " + sessionId + ", type = " + type; } }
可以看到MockReHeader實體類需要實現Record接口並且實現serialize和deserialize方法。OutputArchive和InputArchive分別是Jute底層的序列化器和反序列化器。
接下來測試我們的實體類:
public class JuteTest { public static void main(String[] args) throws IOException { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); BinaryOutputArchive binaryOutputArchive = BinaryOutputArchive.getArchive(byteArrayOutputStream); new MockReHeader(0x3421eccb92a34el, "ping").serialize(binaryOutputArchive, "header"); ByteBuffer byteBuffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray()); ByteBufferInputStream byteBufferInputStream = new ByteBufferInputStream(byteBuffer); BinaryInputArchive binaryInputArchive = BinaryInputArchive.getArchive(byteBufferInputStream); MockReHeader mockReHeader = new MockReHeader(); System.out.println(mockReHeader); mockReHeader.deserialize(binaryInputArchive, "header"); System.out.println(mockReHeader); byteBufferInputStream.close(); byteArrayOutputStream.close(); } }
輸出如下:
sessionId = 0, type = null sessionId = 14673999700337486, type = ping
5、zookeeper.jute
在Zookeeper的src文件夾下有zookeeper.jute文件,這個文件定義了所有的實體類的所屬包名、類名及類的所有成員變量和類型,該文件會在源代碼編譯時,Jute會使用不同的代碼生成器為這些類定義生成實際編程語言的類文件,如java語言生成的類文件保存在src/java/generated目錄下,每個類都會實現Record接口。zookeeper.jute文件部分源碼如下:
module org.apache.zookeeper.data { class Id { ustring scheme; ustring id; } class ACL { int perms; Id id; } // information shared with the client class Stat { long czxid; // created zxid long mzxid; // last modified zxid long ctime; // created long mtime; // last modified int version; // version int cversion; // child version int aversion; // acl version long ephemeralOwner; // owner id if ephemeral, 0 otw int dataLength; //length of the data in the node int numChildren; //number of children of this node long pzxid; // last modified children } // information explicitly stored by the server persistently class StatPersisted { long czxid; // created zxid long mzxid; // last modified zxid long ctime; // created long mtime; // last modified int version; // version int cversion; // child version int aversion; // acl version long ephemeralOwner; // owner id if ephemeral, 0 otw long pzxid; // last modified children } } module org.apache.zookeeper.proto { class ConnectRequest { int protocolVersion; long lastZxidSeen; int timeOut; long sessionId; buffer passwd; } class ConnectResponse { int protocolVersion; int timeOut; long sessionId; buffer passwd; } class SetWatches { long relativeZxid; vector<ustring>dataWatches; vector<ustring>existWatches; vector<ustring>childWatches; }class GetDataRequest { ustring path; boolean watch; } class SetDataRequest { ustring path; buffer data; int version; } class ReconfigRequest { ustring joiningServers; ustring leavingServers; ustring newMembers; long curConfigId; } } module org.apache.zookeeper.server.quorum { class LearnerInfo { long serverid; int protocolVersion; long configVersion; } class QuorumPacket { int type; // Request, Ack, Commit, Ping long zxid; buffer data; // Only significant when type is request vector<org.apache.zookeeper.data.Id> authinfo; } class QuorumAuthPacket { long magic; int status; buffer token; } } module org.apache.zookeeper.server.persistence { class FileHeader { int magic; int version; long dbid; } }
二、ZooKeeper通信協議
基於TCP/IP協議,Zookeeper實現了自己的通信協議來玩按成客戶端與服務端、服務端與服務端之間的網絡通信,對於請求,主要包含請求頭和請求體,對於響應,主要包含響應頭和響應體。
1、請求協議
對於請求協議而言,如下為獲取節點數據請求的完整協議定義:
class RequestHeader { int xid; int type; }
從zookeeper.jute中可知RequestHeader包含了xid和type,xid用於記錄客戶端請求發起的先后序號,用來確保單個客戶端請求的響應順序,type代表請求的操作類型,如創建節點(OpCode.create)、刪除節點(OpCode.delete)、獲取節點數據(OpCode.getData)。
協議的請求主體內容部分,包含了請求的所有操作內容,不同的請求類型請求體不同。對於會話創建而言,其請求體如下:
class ConnectRequest { int protocolVersion; long lastZxidSeen; int timeOut; long sessionId; buffer passwd; }
Zookeeper客戶端和服務器在創建會話時,會發送ConnectRequest請求,該請求包含協議版本號protocolVersion、最近一次接收到服務器ZXID lastZxidSeen、會話超時時間timeOut、會話標識sessionId和會話密碼passwd。
對於獲取節點數據而言,其請求體如下:
class GetDataRequest { ustring path; boolean watch; }
Zookeeper客戶端在向服務器發送節點數據請求時,會發送GetDataRequest請求,該請求包含了數據節點路徑path、是否注冊Watcher的標識watch。
對於更新節點數據而言,其請求體如下:
class SetDataRequest { ustring path; buffer data; int version; }
Zookeeper客戶端在向服務器發送更新節點數據請求時,會發送SetDataRequest請求,該請求包含了數據節點路徑path、數據內容data、節點數據的期望版本號version。
針對不同的請求類型,Zookeeper都會定義不同的請求體,可以在zookeeper.jute中查看,所有的請求都會按照此文件的描述進行序列化/反序列化。
2、響應協議
對於響應協議而言,如下為獲取節點數據響應的完整協議定義:
響應頭中包含了每個響應最基本的信息,包括xid、zxid和err:
class ReplyHeader { int xid; long zxid; int err; }
xid與請求頭中的xid一致,zxid表示Zookeeper服務器上當前最新的事務ID,err則是一個錯誤碼,表示當請求處理過程出現異常情況時,就會在錯誤碼中標識出來,常見的包括處理成功(Code.OK)、節點不存在(Code.NONODE)、沒有權限(Code.NOAUTH)。
協議的響應主體內容部分,包含了響應的所有數據,不同的響應類型請求體不同。對於會話創建而言,其響應體如下:
class ConnectResponse { int protocolVersion; int timeOut; long sessionId; buffer passwd; }
針對客戶端的會話創建請求,服務端會返回客戶端一個ConnectResponse響應,該響應體包含了版本號protocolVersion、會話的超時時間timeOut、會話標識sessionId和會話密碼passwd。
對於獲取節點數據而言,其響應體如下:
class GetDataResponse { buffer data; org.apache.zookeeper.data.Stat stat; }
針對客戶端的獲取節點數據請求,服務端會返回客戶端一個GetDataResponse響應,該響應體包含了數據節點內容data、節點狀態stat。
對於更新節點數據而言,其響應體如下:
class SetDataResponse { org.apache.zookeeper.data.Stat stat; }
針對客戶端的更新節點數據請求,服務端會返回客戶端一個SetDataResponse響應,該響應體包含了最新的節點狀態stat。
針對不同的響應類型,Zookeeper都會定義不同的響應體,也可以在zookeeper.jute中查看。