【Zookeeper】源碼分析之持久化(二)之FileSnap


一、前言

  前篇博文已經分析了FileTxnLog的源碼,現在接着分析持久化中的FileSnap,其主要提供了快照相應的接口。

二、SnapShot源碼分析

  SnapShot是FileTxnLog的父類,接口類型,其方法如下  

public interface SnapShot {
    
    /**
     * deserialize a data tree from the last valid snapshot and 
     * return the last zxid that was deserialized
     * @param dt the datatree to be deserialized into
     * @param sessions the sessions to be deserialized into
     * @return the last zxid that was deserialized from the snapshot
     * @throws IOException
     */
    // 反序列化
    long deserialize(DataTree dt, Map<Long, Integer> sessions) 
        throws IOException;
    
    /**
     * persist the datatree and the sessions into a persistence storage
     * @param dt the datatree to be serialized
     * @param sessions 
     * @throws IOException
     */
    // 序列化
    void serialize(DataTree dt, Map<Long, Integer> sessions, 
            File name) 
        throws IOException;
    
    /**
     * find the most recent snapshot file
     * @return the most recent snapshot file
     * @throws IOException
     */
    // 查找最新的snapshot文件
    File findMostRecentSnapshot() throws IOException;
    
    /**
     * free resources from this snapshot immediately
     * @throws IOException
     */
    // 釋放資源
    void close() throws IOException;
} 

  說明:可以看到SnapShot只定義了四個方法,反序列化、序列化、查找最新的snapshot文件、釋放資源。

三、FileSnap源碼分析

  FileSnap實現了SnapShot接口,主要用作存儲、序列化、反序列化、訪問相應snapshot文件。

  3.1 類的屬性 

public class FileSnap implements SnapShot {
    // snapshot目錄文件
    File snapDir;
    // 是否已經關閉標識
    private volatile boolean close = false;
    // 版本號
    private static final int VERSION=2;
    // database id
    private static final long dbId=-1;
    // Logger
    private static final Logger LOG = LoggerFactory.getLogger(FileSnap.class);
    // snapshot文件的魔數(類似class文件的魔數)
    public final static int SNAP_MAGIC
        = ByteBuffer.wrap("ZKSN".getBytes()).getInt();
}

  說明:FileSnap主要的屬性包含了是否已經關閉標識。

  3.2 類的核心函數

  1. deserialize函數

  函數簽名如下:

  public long deserialize(DataTree dt, Map<Long, Integer> sessions),是對SnapShot的deserialize函數的實現。其源碼如下  

    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
            throws IOException {
        // we run through 100 snapshots (not all of them)
        // if we cannot get it running within 100 snapshots
        // we should  give up
        // 查找100個合法的snapshot文件
        List<File> snapList = findNValidSnapshots(100);
        if (snapList.size() == 0) { // 無snapshot文件,直接返回
            return -1L;
        }
        // 
        File snap = null;
        // 默認為不合法
        boolean foundValid = false;
        for (int i = 0; i < snapList.size(); i++) { // 遍歷snapList
            snap = snapList.get(i);
            // 輸入流
            InputStream snapIS = null;
            CheckedInputStream crcIn = null;
            try {
                LOG.info("Reading snapshot " + snap);
                // 讀取指定的snapshot文件
                snapIS = new BufferedInputStream(new FileInputStream(snap));
                // 驗證
                crcIn = new CheckedInputStream(snapIS, new Adler32());
                InputArchive ia = BinaryInputArchive.getArchive(crcIn);
                // 反序列化
                deserialize(dt,sessions, ia);
                // 獲取驗證的值Checksum
                long checkSum = crcIn.getChecksum().getValue();
                // 從文件中讀取val值
                long val = ia.readLong("val");
                if (val != checkSum) { // 比較驗證,不相等,拋出異常
                    throw new IOException("CRC corruption in snapshot :  " + snap);
                }
                // 合法
                foundValid = true;
                // 跳出循環
                break;
            } catch(IOException e) {
                LOG.warn("problem reading snap file " + snap, e);
            } finally { // 關閉流
                if (snapIS != null) 
                    snapIS.close();
                if (crcIn != null) 
                    crcIn.close();
            } 
        }
        if (!foundValid) { // 遍歷所有文件都未驗證成功
            throw new IOException("Not able to find valid snapshots in " + snapDir);
        }
        // 從文件名中解析出zxid
        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
        return dt.lastProcessedZxid;
    }

  說明:deserialize主要用作反序列化,並將反序列化結果保存至dt和sessions中。 其大致步驟如下

  ① 獲取100個合法的snapshot文件,並且snapshot文件已經通過zxid進行降序排序,進入②

  ② 遍歷100個snapshot文件,從zxid最大的開始,讀取該文件,並創建相應的InputArchive,進入③

  ③ 調用deserialize(dt,sessions, ia)函數完成反序列化操作,進入④

  ④ 驗證從文件中讀取的Checksum是否與新生的Checksum相等,若不等,則拋出異常,否則,進入⑤

  ⑤ 跳出循環並關閉相應的輸入流,並從文件名中解析出相應的zxid返回。

  ⑥ 在遍歷100個snapshot文件后仍然無法找到通過驗證的文件,則拋出異常。

  在deserialize函數中,會調用findNValidSnapshots以及同名的deserialize(dt,sessions, ia)函數,findNValidSnapshots函數源碼如下  

    private List<File> findNValidSnapshots(int n) throws IOException {
        // 按照zxid對snapshot文件進行降序排序
        List<File> files = Util.sortDataDir(snapDir.listFiles(),"snapshot", false);
        int count = 0;
        List<File> list = new ArrayList<File>();
        for (File f : files) { // 遍歷snapshot文件
            // we should catch the exceptions
            // from the valid snapshot and continue
            // until we find a valid one
            try {
                // 驗證文件是否合法,在寫snapshot文件時服務器宕機
                // 此時的snapshot文件非法;非snapshot文件也非法
                if (Util.isValidSnapshot(f)) {
                    // 合法則添加
                    list.add(f);
                    // 計數器加一
                    count++;
                    if (count == n) { // 等於n則跳出循環
                        break;
                    }
                }
            } catch (IOException e) {
                LOG.info("invalid snapshot " + f, e);
            }
        }
        return list;
    }

  說明:該函數主要是查找N個合法的snapshot文件並進行降序排序后返回,Util的isValidSnapshot函數主要是從文件名和文件的結尾符號是否是"/"來判斷snapshot文件是否合法。其源碼如下 

    public static boolean isValidSnapshot(File f) throws IOException {
        // 文件為空或者非snapshot文件,則返回false
        if (f==null || Util.getZxidFromName(f.getName(), "snapshot") == -1)
            return false;

        // Check for a valid snapshot
        // 隨機訪問文件
        RandomAccessFile raf = new RandomAccessFile(f, "r");
        try {
            // including the header and the last / bytes
            // the snapshot should be atleast 10 bytes
            if (raf.length() < 10) { // 文件大小小於10個字節,返回false
                return false;
            }
            // 移動至倒數第五個字節
            raf.seek(raf.length() - 5);
            byte bytes[] = new byte[5];
            int readlen = 0;
            int l;
            while(readlen < 5 &&
                  (l = raf.read(bytes, readlen, bytes.length - readlen)) >= 0) { // 將最后五個字節存入bytes中
                readlen += l;
            }
            if (readlen != bytes.length) {
                LOG.info("Invalid snapshot " + f
                        + " too short, len = " + readlen);
                return false;
            }
            ByteBuffer bb = ByteBuffer.wrap(bytes);
            int len = bb.getInt();
            byte b = bb.get();
            if (len != 1 || b != '/') { // 最后字符不為"/",不合法
                LOG.info("Invalid snapshot " + f + " len = " + len
                        + " byte = " + (b & 0xff));
                return false;
            }
        } finally {
            raf.close();
        }

        return true;
    }

  deserialize(dt,sessions, ia)函數的源碼如下  

    public void deserialize(DataTree dt, Map<Long, Integer> sessions,
            InputArchive ia) throws IOException {
        FileHeader header = new FileHeader();
        // 反序列化至header
        header.deserialize(ia, "fileheader");
        if (header.getMagic() != SNAP_MAGIC) { // 驗證魔數是否相等
            throw new IOException("mismatching magic headers "
                    + header.getMagic() + 
                    " !=  " + FileSnap.SNAP_MAGIC);
        }
        // 反序列化至dt、sessions
        SerializeUtils.deserializeSnapshot(dt,ia,sessions);
    }

  說明:該函數主要作用反序列化,並將反序列化結果保存至header和sessions中。其中會驗證header的魔數是否相等。

  2. serialize函數 

  函數簽名如下:protected void serialize(DataTree dt,Map<Long, Integer> sessions, OutputArchive oa, FileHeader header) throws IOException

    protected void serialize(DataTree dt,Map<Long, Integer> sessions,
            OutputArchive oa, FileHeader header) throws IOException {
        // this is really a programmatic error and not something that can
        // happen at runtime
        if(header==null) // 文件頭為null
            throw new IllegalStateException(
                    "Snapshot's not open for writing: uninitialized header");
        // 將header序列化
        header.serialize(oa, "fileheader");
        // 將dt、sessions序列化
        SerializeUtils.serializeSnapshot(dt,oa,sessions);
    }

  說明:該函數主要用於序列化dt、sessions和header,其中,首先會檢查header是否為空,然后依次序列化header,sessions和dt。

  3. serialize函數

  函數簽名如下:public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot) throws IOException  

    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
            throws IOException {
        if (!close) { // 未關閉
            // 輸出流
            OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
            CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32());
            //CheckedOutputStream cout = new CheckedOutputStream()
            OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
            // 新生文件頭
            FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
            // 序列化dt、sessions、header
            serialize(dt,sessions,oa, header);
            // 獲取驗證的值
            long val = crcOut.getChecksum().getValue();
            // 寫入值
            oa.writeLong(val, "val");
            // 寫入"/"
            oa.writeString("/", "path");
            // 強制刷新
            sessOS.flush();
            crcOut.close();
            sessOS.close();
        }
    }

  說明:該函數用於將header、sessions、dt序列化至本地snapshot文件中,並且在最后會寫入"/"字符。該方法是同步的,即是線程安全的。

四、總結

  FileSnap源碼相對較簡單,其主要是用於操作snapshot文件,也謝謝各位園友的觀看~  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM