Zookeeper學習之Jute序列化以及通信協議詳解


一、Jute序列化工具

  1、Jute概述

  Zookeeper的客戶端與服務端之間會進行一系列的網絡通信來實現數據傳輸,Zookeeper使用Jute組件來完成數據的序列化和反序列化操作,其用於Zookeeper進行網絡數據傳輸和本地磁盤數據存儲的序列化和反序列化工作。

  實體類要使用Jute進行序列化和反序列化步驟:

  • 1.需要實現Record接口的serialize和deserialize方法;
  • 2.構建一個序列化器BinaryOutputArchive;
  • 3.序列化:調用實體類的serialize方法,將對象序列化到指定的tag中去,比如這里將對象序列化到header中;
  • 4.反序列化:調用實體類的deserialize方法,從指定的tag中反序列化出數據內容。

  2、Record接口

  Zookeeper中所需要進行網絡傳輸或是本地磁盤存儲的類型定義,都實現了該接口,是Jute序列化的核心。Record定義了兩個基本的方法,分別是serialize和deserialize,分別用於序列化和反序列化。其中archive是底層真正的序列化器和反序列化器,並且每個archive中可以包含對多個對象的序列化和反序列化,因此兩個接口中都標記了參數tag,用於序列化器和反序列化器標識對象自己的標記。

  3、OutputArchive和InputArchive

  OutputArchive和InputArchive分別是Jute底層的序列化器和反序列化器定義。有BinaryOutputArchive/BinaryInputArchive、CsvOutputArchive/CsvInputArchive和XmlOutputArchive/XmlInputArchive三種實現,無論哪種實現都是基於OutputStream和InputStream進行操作。

  BinaryOutputArchive對數據對象的序列化和反序列化,主要用於進行網絡傳輸和本地磁盤的存儲,是Zookeeper底層最主要的序列化方式。CsvOutputArchive對數據的序列化,更多的是方便數據的可視化展示,因此被用在toString方法中。XmlOutputArchive則是為了將數據對象以xml格式保存和還原,但目前在Zookeeper中基本沒使用到。

  4、測試示例

  首先我們構建一個實體類,實現Record接口的serialize和deserialize方法:

public class MockReHeader implements Record {
    private long sessionId;
    private String type;
    public MockReHeader() {}

    public MockReHeader(long sessionId, String type) {
        this.sessionId = sessionId;
        this.type = type;
    }

    public void setSessionId(long sessionId) {
        this.sessionId = sessionId;
    }

    public void setType(String type) {
        this.type = type;
    }

    public long getSessionId() {
        return sessionId;
    }

    public String getType() {
        return type;
    }

    public void serialize(OutputArchive outputArchive, String tag) throws java.io.IOException {
        outputArchive.startRecord(this, tag);
        outputArchive.writeLong(sessionId, "sessionId"); outputArchive.writeString(type, "type");
        outputArchive.endRecord(this, tag);
    }

    public void deserialize(InputArchive inputArchive, String tag) throws java.io.IOException {
        inputArchive.startRecord(tag);
        this.sessionId = inputArchive.readLong("sessionId"); this.type = inputArchive.readString("type");
        inputArchive.endRecord(tag);
    }

    @Override
    public String toString() {
        return "sessionId = " + sessionId + ", type = " + type;
    }
}

  可以看到MockReHeader實體類需要實現Record接口並且實現serialize和deserialize方法。OutputArchive和InputArchive分別是Jute底層的序列化器和反序列化器。

  接下來測試我們的實體類:

public class JuteTest {

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryOutputArchive binaryOutputArchive = BinaryOutputArchive.getArchive(byteArrayOutputStream);
        new MockReHeader(0x3421eccb92a34el, "ping").serialize(binaryOutputArchive, "header");
        ByteBuffer byteBuffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());

        ByteBufferInputStream byteBufferInputStream = new ByteBufferInputStream(byteBuffer);
        BinaryInputArchive binaryInputArchive = BinaryInputArchive.getArchive(byteBufferInputStream);

        MockReHeader mockReHeader = new MockReHeader();
        System.out.println(mockReHeader);
        mockReHeader.deserialize(binaryInputArchive, "header");
        System.out.println(mockReHeader);
        byteBufferInputStream.close();
        byteArrayOutputStream.close();
    }

}

  輸出如下:

sessionId = 0, type = null
sessionId = 14673999700337486, type = ping

  5、zookeeper.jute

  在Zookeeper的src文件夾下有zookeeper.jute文件,這個文件定義了所有的實體類的所屬包名、類名及類的所有成員變量和類型,該文件會在源代碼編譯時,Jute會使用不同的代碼生成器為這些類定義生成實際編程語言的類文件,如java語言生成的類文件保存在src/java/generated目錄下,每個類都會實現Record接口。zookeeper.jute文件部分源碼如下:

module org.apache.zookeeper.data {
    class Id {
        ustring scheme;
        ustring id;
    }
    class ACL {
        int perms;
        Id id;
    }
    // information shared with the client
    class Stat {
        long czxid;      // created zxid
        long mzxid;      // last modified zxid
        long ctime;      // created
        long mtime;      // last modified
        int version;     // version
        int cversion;    // child version
        int aversion;    // acl version
        long ephemeralOwner; // owner id if ephemeral, 0 otw
        int dataLength;  //length of the data in the node
        int numChildren; //number of children of this node
        long pzxid;      // last modified children
    }
    // information explicitly stored by the server persistently
    class StatPersisted {
        long czxid;      // created zxid
        long mzxid;      // last modified zxid
        long ctime;      // created
        long mtime;      // last modified
        int version;     // version
        int cversion;    // child version
        int aversion;    // acl version
        long ephemeralOwner; // owner id if ephemeral, 0 otw
        long pzxid;      // last modified children
    }
}

module org.apache.zookeeper.proto {
    class ConnectRequest {
        int protocolVersion;
        long lastZxidSeen;
        int timeOut;
        long sessionId;
        buffer passwd;
    }
    class ConnectResponse {
        int protocolVersion;
        int timeOut;
        long sessionId;
        buffer passwd;
    }
    class SetWatches {
        long relativeZxid;
        vector<ustring>dataWatches;
        vector<ustring>existWatches;
        vector<ustring>childWatches;
    }class GetDataRequest {
        ustring path;
        boolean watch;
    }

    class SetDataRequest {
        ustring path;
        buffer data;
        int version;
    }
    class ReconfigRequest {
        ustring joiningServers;
        ustring leavingServers;
        ustring newMembers;
        long curConfigId;
    }
}

module org.apache.zookeeper.server.quorum {
    class LearnerInfo {
        long serverid;
        int protocolVersion;
        long configVersion;
    }
    class QuorumPacket {
        int type; // Request, Ack, Commit, Ping
        long zxid;
        buffer data; // Only significant when type is request
        vector<org.apache.zookeeper.data.Id> authinfo;
    }
    class QuorumAuthPacket {
        long magic;
        int status;
        buffer token;
    }
}

module org.apache.zookeeper.server.persistence {
    class FileHeader {
        int magic;
        int version;
        long dbid;
    }
}

二、ZooKeeper通信協議

  基於TCP/IP協議,Zookeeper實現了自己的通信協議來玩按成客戶端與服務端、服務端與服務端之間的網絡通信,對於請求,主要包含請求頭和請求體,對於響應,主要包含響應頭和響應體。

                        

  1、請求協議

  對於請求協議而言,如下為獲取節點數據請求的完整協議定義:

              

    class RequestHeader {
        int xid;
        int type;
    }

  從zookeeper.jute中可知RequestHeader包含了xid和type,xid用於記錄客戶端請求發起的先后序號,用來確保單個客戶端請求的響應順序,type代表請求的操作類型,如創建節點(OpCode.create)、刪除節點(OpCode.delete)、獲取節點數據(OpCode.getData)。 

  協議的請求主體內容部分,包含了請求的所有操作內容,不同的請求類型請求體不同。對於會話創建而言,其請求體如下:

    class ConnectRequest {
        int protocolVersion;
        long lastZxidSeen;
        int timeOut;
        long sessionId;
        buffer passwd;
    }

  Zookeeper客戶端和服務器在創建會話時,會發送ConnectRequest請求,該請求包含協議版本號protocolVersion、最近一次接收到服務器ZXID lastZxidSeen、會話超時時間timeOut、會話標識sessionId和會話密碼passwd。

  對於獲取節點數據而言,其請求體如下:

    class GetDataRequest {
        ustring path;
        boolean watch;
    }

  Zookeeper客戶端在向服務器發送節點數據請求時,會發送GetDataRequest請求,該請求包含了數據節點路徑path、是否注冊Watcher的標識watch。

  對於更新節點數據而言,其請求體如下:

    class SetDataRequest {
        ustring path;
        buffer data;
        int version;
    }

  Zookeeper客戶端在向服務器發送更新節點數據請求時,會發送SetDataRequest請求,該請求包含了數據節點路徑path、數據內容data、節點數據的期望版本號version。

  針對不同的請求類型,Zookeeper都會定義不同的請求體,可以在zookeeper.jute中查看,所有的請求都會按照此文件的描述進行序列化/反序列化。

  2、響應協議

  對於響應協議而言,如下為獲取節點數據響應的完整協議定義:

            

  響應頭中包含了每個響應最基本的信息,包括xid、zxid和err: 

    class ReplyHeader {
        int xid;
        long zxid;
        int err;
    }

  xid與請求頭中的xid一致,zxid表示Zookeeper服務器上當前最新的事務ID,err則是一個錯誤碼,表示當請求處理過程出現異常情況時,就會在錯誤碼中標識出來,常見的包括處理成功(Code.OK)、節點不存在(Code.NONODE)、沒有權限(Code.NOAUTH)。

  協議的響應主體內容部分,包含了響應的所有數據,不同的響應類型請求體不同。對於會話創建而言,其響應體如下:

    class ConnectResponse {
        int protocolVersion;
        int timeOut;
        long sessionId;
        buffer passwd;
    }

  針對客戶端的會話創建請求,服務端會返回客戶端一個ConnectResponse響應,該響應體包含了版本號protocolVersion、會話的超時時間timeOut、會話標識sessionId和會話密碼passwd。

  對於獲取節點數據而言,其響應體如下:

    class GetDataResponse {
        buffer data;
        org.apache.zookeeper.data.Stat stat;
    }

  針對客戶端的獲取節點數據請求,服務端會返回客戶端一個GetDataResponse響應,該響應體包含了數據節點內容data、節點狀態stat。

  對於更新節點數據而言,其響應體如下:  

    class SetDataResponse {
        org.apache.zookeeper.data.Stat stat;
    }

  針對客戶端的更新節點數據請求,服務端會返回客戶端一個SetDataResponse響應,該響應體包含了最新的節點狀態stat。

  針對不同的響應類型,Zookeeper都會定義不同的響應體,也可以在zookeeper.jute中查看。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM