ZooKeeper (3): Persistence


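To prevent data loss when a server crashes or restarts, ZooKeeper persists its data to disk. It uses two persistence mechanisms:

  1. Every transaction is recorded in a transaction log file, so the data can be recovered by replaying these log files.

  2. To speed up recovery, ZooKeeper also takes snapshots of its data tree and session information.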

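Transaction log files

  A transaction log file records every transaction processed by the ZooKeeper server.

  Log file name format: log.ZXID. The ZXID (written in hexadecimal) matters: it is the ID of the first transaction stored in that file.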
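  A minimal sketch of how these names can be used, assuming a directory listing that contains only log.<hex zxid> names (LogFileNames is a hypothetical helper, not ZooKeeper's API): parse the hexadecimal suffix, then, to replay everything after a given ZXID, take the last file that starts at or before that ZXID plus every later file, because that earlier file may still contain transactions past the given point.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper for illustration; not part of ZooKeeper's API.
final class LogFileNames {
    private LogFileNames() {}

    // "log.1a" -> 0x1a: the suffix is the first transaction's ZXID in hex.
    static long zxidOf(String name, String prefix) {
        if (!name.startsWith(prefix + ".")) {
            throw new IllegalArgumentException("not a " + prefix + " file: " + name);
        }
        return Long.parseUnsignedLong(name.substring(prefix.length() + 1), 16);
    }

    // Returns the log files that may contain transactions with zxid > fromZxid,
    // sorted by their starting ZXID.
    static List<String> logsToReplay(List<String> names, long fromZxid) {
        List<String> sorted = new ArrayList<>(names);
        sorted.sort(Comparator.comparingLong(n -> zxidOf(n, "log")));
        // Remember the last file that starts at or before fromZxid; it may
        // still hold transactions after fromZxid, so replay starts there.
        int start = 0;
        for (int i = 0; i < sorted.size(); i++) {
            if (zxidOf(sorted.get(i), "log") <= fromZxid) {
                start = i;
            }
        }
        return new ArrayList<>(sorted.subList(start, sorted.size()));
    }
}
```

  For example, logsToReplay(Arrays.asList("log.1", "log.64", "log.c8"), 0x70) returns [log.64, log.c8]: log.64 starts at 0x64, before 0x70, so it may hold 0x71 onwards.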

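Snapshots

  A snapshot records the complete in-memory data of a ZooKeeper server at a given moment and writes it to a file on disk.

  Snapshot file name format: snapshot.ZXID. The ZXID matters here too: ZooKeeper uses it to decide where recovery starts, i.e. it loads the snapshot and then replays only the transactions after that ZXID.

  A snapshot file mainly stores ZooKeeper's data tree and session information.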
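  Because a snapshot's ZXID marks the last transaction it contains, recovery can start from the newest snapshot and replay only later transactions. A simplified sketch under that assumption (SnapshotNames is hypothetical; ZooKeeper itself also checks that a snapshot file is valid and may fall back to an older one, which this sketch skips):

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helper for illustration; not part of ZooKeeper's API.
final class SnapshotNames {
    private SnapshotNames() {}

    // Returns the ZXID of the newest "snapshot.<hex zxid>" name, if any.
    static OptionalLong newestSnapshotZxid(List<String> names) {
        return names.stream()
                .filter(n -> n.startsWith("snapshot."))
                .mapToLong(n -> Long.parseUnsignedLong(n.substring("snapshot.".length()), 16))
                .max();
    }

    // Replay starts with the transaction right after the snapshot.
    // With no snapshot at all, replay the logs from the beginning (zxid 0).
    static long replayStartZxid(List<String> names) {
        return newestSnapshotZxid(names).orElse(-1L) + 1;
    }
}
```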

[Figure: class diagram]

FileTxnSnapLog

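  The core persistence class. Underneath, it uses TxnLog to operate on the transaction log files and SnapShot to operate on the snapshots.

  Saving a snapshot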

public void save(DataTree dataTree,
        ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
        throws IOException {
    long lastZxid = dataTree.lastProcessedZxid;
    LOG.info("Snapshotting: " + Long.toHexString(lastZxid));
    // The snapshot file is named after the last transaction it contains.
    File snapshot = new File(
            snapDir, Util.makeSnapshotName(lastZxid));
    snapLog.serialize(dataTree, sessionsWithTimeouts, snapshot);
}
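  To make the save path concrete without ZooKeeper's classes, here is a toy version (ToySnapshot and its file layout are hypothetical; the real FileSnap format has its own header and checksum): the file is named "snapshot." plus the last processed ZXID in hex, assumed to match Util.makeSnapshotName, and holds the sessions with their timeouts followed by the node data.

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;

// Toy snapshot writer for illustration; the file layout is hypothetical.
final class ToySnapshot {
    private ToySnapshot() {}

    // "snapshot." + hex zxid, following the naming convention described above.
    static String snapshotName(long zxid) {
        return "snapshot." + Long.toHexString(zxid);
    }

    static File save(File snapDir, long lastZxid, Map<String, byte[]> nodes,
                     Map<Long, Integer> sessionsWithTimeouts) throws IOException {
        File snapshot = new File(snapDir, snapshotName(lastZxid));
        try (DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(new FileOutputStream(snapshot)))) {
            out.writeLong(lastZxid);
            // Sessions: count, then (sessionId, timeout) pairs.
            out.writeInt(sessionsWithTimeouts.size());
            for (Map.Entry<Long, Integer> e : sessionsWithTimeouts.entrySet()) {
                out.writeLong(e.getKey());
                out.writeInt(e.getValue());
            }
            // Nodes: count, then (path, length-prefixed data) pairs.
            out.writeInt(nodes.size());
            for (Map.Entry<String, byte[]> e : nodes.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeInt(e.getValue().length);
                out.write(e.getValue());
            }
        }
        return snapshot;
    }
}
```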

  Transaction log operations

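// Append a transaction to the current log file (buffered).
public boolean append(Request si) throws IOException {
    return txnLog.append(si.hdr, si.txn);
}
// Flush the appended transactions to disk.
public void commit() throws IOException {
    txnLog.commit();
}
// Switch to a new log file for subsequent transactions.
public void rollLog() throws IOException {
    txnLog.rollLog();
}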
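  The three methods delegate to TxnLog. Their division of labor can be sketched with a toy log (ToyTxnLog and its entry layout are hypothetical): append only writes into a buffered stream and lazily creates log.<hex zxid> for the first transaction; commit flushes the buffer and forces the bytes to disk; rollLog finishes the current file so that the next append starts a new one. Closing the file immediately on roll is a simplification of this sketch.

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

// Toy transaction log for illustration; names and layout are hypothetical.
final class ToyTxnLog {
    private final File logDir;
    private FileOutputStream fos;   // underlying file, kept for fsync
    private DataOutputStream out;   // buffered stream; null means "no open file"

    ToyTxnLog(File logDir) {
        this.logDir = logDir;
    }

    // Appends one entry; opens log.<hex zxid> if no file is currently open.
    synchronized void append(long zxid, byte[] txn) throws IOException {
        if (out == null) {
            fos = new FileOutputStream(new File(logDir, "log." + Long.toHexString(zxid)));
            out = new DataOutputStream(new BufferedOutputStream(fos));
        }
        out.writeLong(zxid);
        out.writeInt(txn.length);
        out.write(txn);
    }

    // Makes everything appended so far durable.
    synchronized void commit() throws IOException {
        if (out != null) {
            out.flush();                    // push buffered bytes to the OS
            fos.getChannel().force(false);  // ask the OS to write them to disk
        }
    }

    // Finishes the current file; the next append starts a new log file.
    synchronized void rollLog() throws IOException {
        if (out != null) {
            commit();
            out.close();
            out = null;
            fos = null;
        }
    }
}
```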

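  Data recovery

  restore first deserializes the latest snapshot into the DataTree and the session map, then replays every logged transaction after the snapshot's lastProcessedZxid, and returns the highest ZXID it has seen.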

public long restore(DataTree dt, Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
    // 1. Load the most recent snapshot into dt and sessions.
    snapLog.deserialize(dt, sessions);
    // 2. Position the log iterator at the first transaction after the snapshot.
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
    long highestZxid = dt.lastProcessedZxid;
    TxnHeader hdr;
    // 3. Replay each transaction, tracking the highest ZXID seen.
    while (true) {
        hdr = itr.getHeader();
        ...
        if (hdr.getZxid() < highestZxid && highestZxid != 0) {
            LOG.error(highestZxid + "(higestZxid) > "
                    + hdr.getZxid() + "(next log) for type "
                    + hdr.getType());
        } else {
            highestZxid = hdr.getZxid();
        }
        try {
            processTransaction(hdr, dt, sessions, itr.getTxn());
        } catch (KeeperException.NoNodeException e) {
            throw new IOException("Failed to process transaction type: " +
                    hdr.getType() + " error: " + e.getMessage());
        }
        if (!itr.next())
            break;
    }
    return highestZxid;
}
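  Stripped of file handling, the logic above is: load the snapshot, replay only the logged transactions whose ZXID is greater than the snapshot's lastProcessedZxid, and return the highest ZXID applied. A minimal in-memory sketch with hypothetical types (a Map stands in for DataTree, Txn for a logged transaction), tracking the highest ZXID with Math.max instead of the error logging above:

```java
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory illustration of snapshot + log replay.
final class ToyRestore {
    // One logged transaction: set path to value (null value = delete).
    static final class Txn {
        final long zxid;
        final String path;
        final String value;

        Txn(long zxid, String path, String value) {
            this.zxid = zxid;
            this.path = path;
            this.value = value;
        }
    }

    private ToyRestore() {}

    // Loads the snapshot into tree, replays newer txns, returns the highest zxid.
    static long restore(Map<String, String> tree, Map<String, String> snapshot,
                        long snapshotZxid, List<Txn> log) {
        tree.clear();
        tree.putAll(snapshot);              // step 1: "deserialize" the snapshot
        long highestZxid = snapshotZxid;
        for (Txn t : log) {
            if (t.zxid <= snapshotZxid) {
                continue;                   // already reflected in the snapshot
            }
            highestZxid = Math.max(highestZxid, t.zxid);
            if (t.value == null) {          // step 2: apply the transaction
                tree.remove(t.path);
            } else {
                tree.put(t.path, t.value);
            }
        }
        return highestZxid;
    }
}
```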

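FileTxnLog

  Provides the external interface to the transaction log, including writing and reading log entries.

  Writing to the transaction log

  1. If a log file is currently open, use it; otherwise create a new log file whose suffix is this transaction's zxid (in hex), and write the file header to it.

  2. If less than 4 KB of preallocated space remains in the current log file, extend the file by another 64 MB (the default preallocation size), filled with zeros. Preallocation improves I/O efficiency because the file does not have to be grown on every append.

  3. Serialize the transaction header and transaction body.

  4. Compute a checksum of the serialized bytes.

  5. Write the checksum and the serialized transaction to the file stream.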
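  Step 2 can be sketched with java.nio's FileChannel (LogPadding is a hypothetical class; the 4 KB threshold and 64 MB default come from the description above). Writing a single byte at the new last offset grows the file; on common file systems the gap reads back as zeros, although the FileChannel contract leaves those bytes unspecified.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Sketch of log preallocation; the method itself is illustrative.
final class LogPadding {
    static final long DEFAULT_PREALLOC_SIZE = 64L * 1024 * 1024;  // 64 MB
    static final long THRESHOLD = 4096;                            // 4 KB

    private LogPadding() {}

    // currentSize is the length the file has been preallocated to so far.
    // Returns the (possibly larger) new preallocated length.
    static long padIfNeeded(FileChannel ch, long currentSize, long preAllocSize)
            throws IOException {
        long position = ch.position();
        if (position + THRESHOLD >= currentSize) {
            long newSize = currentSize + preAllocSize;
            // Absolute write of one zero byte: grows the file to newSize
            // without moving the channel's write position.
            ch.write(ByteBuffer.allocate(1), newSize - 1);
            return newSize;
        }
        return currentSize;
    }
}
```

  Note that currentSize tracks the preallocated length, not the amount of data written; the data position is read from the channel.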

public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException {
    if (hdr != null) {
        ...
        if (logStream == null) {
            ...
            // Step 1: no open log file, so create log.<hex zxid of this txn>.
            logFileWrite = new File(logDir, ("log." +
                    Long.toHexString(hdr.getZxid())));
            fos = new FileOutputStream(logFileWrite);
            logStream = new BufferedOutputStream(fos);
            oa = BinaryOutputArchive.getArchive(logStream);
            FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
            fhdr.serialize(oa, "fileheader");
            // Make sure that the magic number is written before padding.
            logStream.flush();
            currentSize = fos.getChannel().position();
            streamsToFlush.add(fos);
        }
        // Step 2: preallocate more space if the file is nearly full.
        padFile(fos);
        // Step 3: serialize the header and the body.
        byte[] buf = Util.marshallTxnEntry(hdr, txn);
        ...
        // Step 4: checksum of the serialized entry.
        Checksum crc = makeChecksumAlgorithm();
        crc.update(buf, 0, buf.length);
        // Step 5: write the checksum, then the entry itself.
        oa.writeLong(crc.getValue(), "txnEntryCRC");
        Util.writeTxnBytes(oa, buf);
        return true;
    }
    return false;
}
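  Steps 3 to 5 frame each entry so that a reader can detect a torn or corrupted write. A simplified sketch of that framing, [checksum][length][bytes], using java.util.zip.Adler32 (assumed here to match FileTxnLog.makeChecksumAlgorithm; check your version). TxnFraming is hypothetical, and ZooKeeper's exact on-disk entry layout may differ in details.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

// Hypothetical entry framing: [long checksum][int length][bytes].
final class TxnFraming {
    private TxnFraming() {}

    static Checksum makeChecksum() {
        return new Adler32();
    }

    static void writeEntry(DataOutputStream out, byte[] buf) throws IOException {
        Checksum crc = makeChecksum();
        crc.update(buf, 0, buf.length);
        out.writeLong(crc.getValue());   // checksum first, as in append()
        out.writeInt(buf.length);        // then the length-prefixed payload
        out.write(buf);
    }

    // Reads one entry and verifies its checksum.
    static byte[] readEntry(DataInputStream in) throws IOException {
        long expected = in.readLong();
        int len = in.readInt();
        if (len < 0) {
            throw new IOException("Invalid entry length " + len);
        }
        byte[] buf = new byte[len];
        in.readFully(buf);
        Checksum crc = makeChecksum();
        crc.update(buf, 0, buf.length);
        if (crc.getValue() != expected) {
            throw new IOException("CRC check failed");
        }
        return buf;
    }
}
```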

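  In essence, persistence stores in-memory objects on disk in binary form; under the hood, ZooKeeper uses Jute for this serialization.

  Serialization and deserialization convert between objects and data streams. Jute's design principle is that each object to be serialized defines its own serialization protocol: such an object implements the Record interface, which requires serialize and deserialize methods. Jute also abstracts the streams: OutputArchive represents an output stream and InputArchive an input stream, and each concrete stream format is supported by implementing these two interfaces. In short, implementing Record defines the protocol (what an object writes and reads, in what order), while implementing OutputArchive and InputArchive defines how the data is actually stored and read back.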

The Record interface:

public interface Record {
    public void serialize(OutputArchive archive, String tag)
        throws IOException;
    public void deserialize(InputArchive archive, String tag)
        throws IOException;
}

The OutputArchive interface:

public interface OutputArchive {
    public void writeByte(byte b, String tag) throws IOException;
    public void writeBool(boolean b, String tag) throws IOException;
    public void writeInt(int i, String tag) throws IOException;
    public void writeLong(long l, String tag) throws IOException;
    public void writeFloat(float f, String tag) throws IOException;
    public void writeDouble(double d, String tag) throws IOException;
    public void writeString(String s, String tag) throws IOException;
    public void writeBuffer(byte buf[], String tag)
        throws IOException;
    public void writeRecord(Record r, String tag) throws IOException;
    public void startRecord(Record r, String tag) throws IOException;
    public void endRecord(Record r, String tag) throws IOException;
    public void startVector(List v, String tag) throws IOException;
    public void endVector(List v, String tag) throws IOException;
    public void startMap(TreeMap v, String tag) throws IOException;
    public void endMap(TreeMap v, String tag) throws IOException;
}

The InputArchive interface:

public interface InputArchive {
    public byte readByte(String tag) throws IOException;
    public boolean readBool(String tag) throws IOException;
    public int readInt(String tag) throws IOException;
    public long readLong(String tag) throws IOException;
    public float readFloat(String tag) throws IOException;
    public double readDouble(String tag) throws IOException;
    public String readString(String tag) throws IOException;
    public byte[] readBuffer(String tag) throws IOException;
    public void readRecord(Record r, String tag) throws IOException;
    public void startRecord(String tag) throws IOException;
    public void endRecord(String tag) throws IOException;
    public Index startVector(String tag) throws IOException;
    public void endVector(String tag) throws IOException;
    public Index startMap(String tag) throws IOException;
    public void endMap(String tag) throws IOException;
}

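  For example, FileHeader implements Record: its serialize and deserialize methods define the protocol (which fields are written and in what order), and calling them serializes or deserializes the object.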

public class FileHeader implements Record {
  private int magic;
  private int version;
  private long dbid;
  public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(this,tag);
    a_.writeInt(magic,"magic");
    a_.writeInt(version,"version");
    a_.writeLong(dbid,"dbid");
    a_.endRecord(this,tag);
  }
  public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(tag);
    magic=a_.readInt("magic");
    version=a_.readInt("version");
    dbid=a_.readLong("dbid");
    a_.endRecord(tag);
  }
}

  What byte format an object is actually serialized into, and deserialized from, depends on the OutputArchive and InputArchive implementations.
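  To see that split in action, here is a self-contained sketch with trimmed-down, hypothetical stand-ins for Record and OutputArchive (only writeInt and writeLong): the same FileHeader-like record is written once through a binary archive and once through a text archive. The record fixes the fields and their order; the archive fixes the encoding.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Trimmed-down, hypothetical stand-ins for jute's interfaces.
interface MiniOutputArchive {
    void writeInt(int i, String tag) throws IOException;
    void writeLong(long l, String tag) throws IOException;
}

interface MiniRecord {
    void serialize(MiniOutputArchive a, String tag) throws IOException;
}

// Binary: big-endian fixed-width values, tags ignored (like BinaryOutputArchive).
final class MiniBinaryArchive implements MiniOutputArchive {
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    private final DataOutputStream out = new DataOutputStream(bytes);

    public void writeInt(int i, String tag) throws IOException { out.writeInt(i); }
    public void writeLong(long l, String tag) throws IOException { out.writeLong(l); }
}

// Text: one "tag=value" line per field, to show a different encoding.
final class MiniTextArchive implements MiniOutputArchive {
    final StringBuilder text = new StringBuilder();

    public void writeInt(int i, String tag) { text.append(tag).append('=').append(i).append('\n'); }
    public void writeLong(long l, String tag) { text.append(tag).append('=').append(l).append('\n'); }
}

// FileHeader-like record: it decides which fields are written, and in what order.
final class MiniFileHeader implements MiniRecord {
    final int magic;
    final int version;
    final long dbid;

    MiniFileHeader(int magic, int version, long dbid) {
        this.magic = magic;
        this.version = version;
        this.dbid = dbid;
    }

    public void serialize(MiniOutputArchive a, String tag) throws IOException {
        a.writeInt(magic, "magic");
        a.writeInt(version, "version");
        a.writeLong(dbid, "dbid");
    }
}
```

  With magic = 0x5a4b4c47 (the ASCII bytes "ZKLG"), version = 2 and dbid = 0, the binary archive produces 16 bytes (4 + 4 + 8) beginning with ZKLG, while the text archive produces three tag=value lines.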

Binary stream implementations:

BinaryOutputArchive:

public class BinaryOutputArchive implements OutputArchive {
    private ByteBuffer bb = ByteBuffer.allocate(1024);
    private DataOutput out;
    public static BinaryOutputArchive getArchive(OutputStream strm) {
        return new BinaryOutputArchive(new DataOutputStream(strm));
    }
    public BinaryOutputArchive(DataOutput out) {
        this.out = out;
    }
    public void writeByte(byte b, String tag) throws IOException {
        out.writeByte(b);
    }
    public void writeBool(boolean b, String tag) throws IOException {
        out.writeBoolean(b);
    }
    public void writeInt(int i, String tag) throws IOException {
        out.writeInt(i);
    }
    public void writeLong(long l, String tag) throws IOException {
        out.writeLong(l);
    }
    public void writeFloat(float f, String tag) throws IOException {
        out.writeFloat(f);
    }
    public void writeDouble(double d, String tag) throws IOException {
        out.writeDouble(d);
    }

    /**
     * create our own char encoder to utf8. This is faster
     * than string.getbytes(UTF8).
     * @param s the string to encode into utf8
     * @return utf8 byte sequence.
     */
    final private ByteBuffer stringToByteBuffer(CharSequence s) {
        bb.clear();
        final int len = s.length();
        for (int i = 0; i < len; i++) {
            if (bb.remaining() < 3) {
                ByteBuffer n = ByteBuffer.allocate(bb.capacity() << 1);
                bb.flip();
                n.put(bb);
                bb = n;
            }
            char c = s.charAt(i);
            if (c < 0x80) {
                bb.put((byte) c);
            } else if (c < 0x800) {
                bb.put((byte) (0xc0 | (c >> 6)));
                bb.put((byte) (0x80 | (c & 0x3f)));
            } else {
                bb.put((byte) (0xe0 | (c >> 12)));
                bb.put((byte) (0x80 | ((c >> 6) & 0x3f)));
                bb.put((byte) (0x80 | (c & 0x3f)));
            }
        }
        bb.flip();
        return bb;
    }

    public void writeString(String s, String tag) throws IOException {
        if (s == null) {
            writeInt(-1, "len");
            return;
        }
        ByteBuffer bb = stringToByteBuffer(s);
        writeInt(bb.remaining(), "len");
        out.write(bb.array(), bb.position(), bb.limit());
    }

    public void writeBuffer(byte barr[], String tag)
    throws IOException {
        if (barr == null) {
            out.writeInt(-1);
            return;
        }
        out.writeInt(barr.length);
        out.write(barr);
    }

    public void writeRecord(Record r, String tag) throws IOException {
        r.serialize(this, tag);
    }
    public void startRecord(Record r, String tag) throws IOException {}

    public void endRecord(Record r, String tag) throws IOException {}

    public void startVector(List v, String tag) throws IOException {
        if (v == null) {
            writeInt(-1, tag);
            return;
        }
        writeInt(v.size(), tag);
    }
    public void endVector(List v, String tag) throws IOException {}

    public void startMap(TreeMap v, String tag) throws IOException {
        writeInt(v.size(), tag);
    }
    public void endMap(TreeMap v, String tag) throws IOException {}
}
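  The string layout produced by writeString, and read back by BinaryInputArchive.readString, is a 4-byte big-endian byte count (-1 for null) followed by the UTF-8 bytes. A self-contained sketch of that layout (JuteStrings is hypothetical; it uses the JDK's UTF-8 encoder instead of the hand-written one above, and the two agree except for supplementary characters, which the hand-written encoder writes as two 3-byte surrogate sequences):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper reproducing the binary string layout:
// [int length in bytes, -1 for null][UTF-8 bytes].
final class JuteStrings {
    private JuteStrings() {}

    static byte[] encode(String s) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        if (s == null) {
            out.writeInt(-1);
        } else {
            byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
            out.writeInt(utf8.length);   // length in bytes, not in chars
            out.write(utf8);
        }
        return bytes.toByteArray();
    }

    static String decode(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int len = in.readInt();
        if (len == -1) {
            return null;
        }
        byte[] utf8 = new byte[len];
        in.readFully(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }
}
```

  For instance, encode("zk") yields 00 00 00 02 7a 6b and encode(null) yields ff ff ff ff.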

BinaryInputArchive:

public class BinaryInputArchive implements InputArchive {

    private DataInput in;

    static public BinaryInputArchive getArchive(InputStream strm) {
        return new BinaryInputArchive(new DataInputStream(strm));
    }

    static private class BinaryIndex implements Index {
        private int nelems;
        BinaryIndex(int nelems) {
            this.nelems = nelems;
        }
        public boolean done() {
            return (nelems <= 0);
        }
        public void incr() {
            nelems--;
        }
    }
    /** Creates a new instance of BinaryInputArchive */
    public BinaryInputArchive(DataInput in) {
        this.in = in;
    }

    public byte readByte(String tag) throws IOException {
        return in.readByte();
    }

    public boolean readBool(String tag) throws IOException {
        return in.readBoolean();
    }

    public int readInt(String tag) throws IOException {
        return in.readInt();
    }

    public long readLong(String tag) throws IOException {
        return in.readLong();
    }

    public float readFloat(String tag) throws IOException {
        return in.readFloat();
    }

    public double readDouble(String tag) throws IOException {
        return in.readDouble();
    }

    public String readString(String tag) throws IOException {
        int len = in.readInt();
        if (len == -1) return null;
        byte b[] = new byte[len];
        in.readFully(b);
        return new String(b, "UTF8");
    }

    // Upper bound on a single buffer, from the jute.maxbuffer system
    // property; defaults to 0xfffff bytes.
    static public final int maxBuffer = determineMaxBuffer();
    private static int determineMaxBuffer() {
        String maxBufferString = System.getProperty("jute.maxbuffer");
        try {
            return Integer.parseInt(maxBufferString);
        } catch(Exception e) {
            return 0xfffff;
        }

    }
    public byte[] readBuffer(String tag) throws IOException {
        int len = readInt(tag);
        if (len == -1) return null;
        if (len < 0 || len > maxBuffer) {
            throw new IOException("Unreasonable length = " + len);
        }
        byte[] arr = new byte[len];
        in.readFully(arr);
        return arr;
    }

    public void readRecord(Record r, String tag) throws IOException {
        r.deserialize(this, tag);
    }

    public void startRecord(String tag) throws IOException {}

    public void endRecord(String tag) throws IOException {}

    public Index startVector(String tag) throws IOException {
        int len = readInt(tag);
        if (len == -1) {
            return null;
        }
        return new BinaryIndex(len);
    }

    public void endVector(String tag) throws IOException {}

    public Index startMap(String tag) throws IOException {
        return new BinaryIndex(readInt(tag));
    }

    public void endMap(String tag) throws IOException {}

}
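  Vectors follow the same pattern: the writer's startVector emits the element count (or -1 for a null list), and the reader's startVector wraps that count in a BinaryIndex that the caller counts down. A self-contained sketch for a list of longs (LongVectors and CountdownIndex are hypothetical; CountdownIndex mirrors BinaryIndex above):

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Mirrors BinaryInputArchive.BinaryIndex: counts down the remaining elements.
final class CountdownIndex {
    private int nelems;

    CountdownIndex(int nelems) { this.nelems = nelems; }
    boolean done() { return nelems <= 0; }
    void incr() { nelems--; }
}

// Hypothetical helpers showing the vector layout: [int count, -1 for null][elements].
final class LongVectors {
    private LongVectors() {}

    static void write(DataOutput out, List<Long> v) throws IOException {
        if (v == null) {
            out.writeInt(-1);              // like startVector(null, tag)
            return;
        }
        out.writeInt(v.size());            // like startVector(v, tag)
        for (long x : v) {
            out.writeLong(x);
        }
    }

    static List<Long> read(DataInput in) throws IOException {
        int len = in.readInt();
        if (len == -1) {
            return null;                   // startVector returned null
        }
        List<Long> v = new ArrayList<>();
        for (CountdownIndex idx = new CountdownIndex(len); !idx.done(); idx.incr()) {
            v.add(in.readLong());
        }
        return v;
    }
}
```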

  Other implementations include CSV (CsvInputArchive, CsvOutputArchive) and XML (XmlInputArchive, XmlOutputArchive).

 

