MIT 6.824 Lab2D Raft之日志壓縮


書接上文Raft Part C | MIT 6.824 Lab2C Persistence

實驗准備

  1. 實驗代碼:git://g.csail.mit.edu/6.824-golabs-2021/src/raft
  2. 如何測試:go test -run 2D -race
  3. 相關論文:Raft Extended Section 7
  4. 實驗指導:6.824 Lab 2: Raft (mit.edu)

實驗目標

實現SnapshotCondInstallSnapshotInstallSnapshot RPC,並修改之前的代碼以支持本次實驗的內容。

一些提示

  1. 不要使用論文中的偏移機制為數據分片,每個分片作為一個快照。而是每次RPC發送全部數據作為一個快照。
  2. 丟棄舊日志的全部引用,以便GC回收。
  3. 由於保存快照要丟棄部分日志,不能再使用日志長度來作為索引日志的標准。
  4. 考慮是否需要持久化lastIncludeTermlastIncludeIndex
  5. 使用rf.persister.SaveStateAndSnapshot()持久化快照。

日志壓縮

日志序列不斷擴張,是無法全部存儲在內存中的,對於已經應用到狀態機的部分日志,就不再需要維護在Raft中。

但由於仍可能存在部分Follower的日志序列遠遠落后於Leader,因此這部分日志不能被Leader丟棄,在同步日志時,若Leader中原應被同步的日志在快照中,則將快照發送給Follower。

lastIncluedTerm & lastIncludeIndex

日志壓縮后,Raft需要記錄額外的兩個信息,lastIncludeIndexlastIncludeTerm表示快照中最后一個log的index和Term。

image.png

此處設計新的log類型如下。

type Log struct {
    Entries []LogEntry
    Base    int
}

需要注意的是,Log.Entries從1開始存儲,因此Log.Entries[0].Term用於存儲lastIncludeTermLog.Base表示Log.Entries[0]的邏輯位置,也是lastIncludeIndex的值。

image.png

本例中,lastIncludeIndex = 4,lastIncludeTerm = 2,snapshot = [1,1,1,2]。

為Log添加相關成員函數。

func (l *Log) size() {
    return l.Base + len(l.Entries)
}

func (l *Log) get(i int) {
    return l.Entries[i-l.Base]
}

func (l *Log) set(i int, e LogEntry) {
    l.[i-l.Base] = e
}

Snapshot()

Snapshot(index int, snapshot []byte)由狀態機調用,傳入的index表示lastIncludeIndexsnapshot由狀態機生成,需要Raft保存,用於發送Follower時需要。

func (rf *Raft) Snapshot(index int, snapshot []byte) {
    if index <= rf.log.Base {
        return
    }
    rf.log.Entries = rf.log.Entries[index-rf.log.Base:]
    rf.log.Base = index
    rf.snapshot = snapshot
    rf.saveStateAndSnapshot()
}

index <= rf.log.Base說明傳入的snapshot是一個舊的快照。

InstallSnapshot RPC

首先是heartbeat()應該新增如下邏輯,當Leader中應被同步到Follower的日志在快照中時,將快照發送給Follower。

if next <= rf.log.Base {
    go rf.sendSnapshot(i, peer, InstallSnapshotArgs{
        Term: rf.currentTerm,
        LastIncludeIndex: rf.log.Base,
        LastIncludeTerm: rf.log.Entries[0].Term,
        Data: rf.snapshot,
    })
}

sendSnapshot()和發送日志序列類似。

func (rf *Raft) sendSnapshot(id int, peer *labrpc.ClientEnd, args InstallSnapshotArgs) {
	reply := InstallSnapshotReply{}
	ok := peer.Call("Raft.InstallSnapshot", &args, &reply)
	if !ok {
		return
	}

	if reply.Term > rf.currentTerm {
		rf.toFollower(reply.Term)
		return
	}

	rf.nextIndex[id] = args.LastIncludedIndex + 1
	rf.matchIndex[id] = args.LastIncludedIndex
}

InstallSnapshot()AppendEntries()類似,args.LastIncludedIndex <= rf.log.Base也是一樣的,表示一個舊的快照。

func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
	rf.lastRecv = time.Now()

	if args.Term > rf.currentTerm {
		rf.toFollower(args.Term)
	}
	reply.Term = rf.currentTerm

	if args.Term < rf.currentTerm || args.LastIncludedIndex <= rf.log.Base {
		return
	}

	rf.applyCh <- ApplyMsg{
		SnapshotValid: true,
		Snapshot:      args.Data,
		SnapshotTerm:  args.LastIncludedTerm,
		SnapshotIndex: args.LastIncludedIndex,
	}
}

注意:快照是狀態機中的概念,需要在狀態機中加載快照,因此要通過applyCh將快照發送給狀態機,但是發送后Raft並不立即保存快照,而是等待狀態機調用CondInstallSnapshot(),如果從收到InstallSnapshot()后到收到CondInstallSnapshot()前,沒有新的日志提交到狀態機,則Raft返回True,Raft和狀態機保存快照,否則Raft返回False,兩者都不保存快照。

如此保證了Raft和狀態機保存快照是一個原子操作。當然在InstallSnapshot()將快照發送給狀態機后再將快照保存到Raft,令CondInstallSnap()永遠返回True,也可以保證原子操作,但是這樣做必須等待快照發送給狀態機完成,但是rf.applyCh <- ApplyMsg是有可能阻塞的,由於InstallSnapshot()需要持有全局的互斥鎖,這可能導致整個節點無法工作。

為什么要保證原子操作?因為負責將commit狀態的日志提交到狀態機的goroutine不負責快照部分,因此必須是先保存快照,再同步日志。

本系列文章給出的代碼為了好讀,沒有考慮同步問題,正常來講applyCh <- ApplyMsg這個操作是需要令起一個goroutine去做的。

如何判斷InstallSnapshot()CondInstallSnapshot()之間沒有新的日志提交到狀態機呢?這里使用commitIndex來判斷,當lastIncludeIndex <= commitIndex時,說明這期間原本沒有的快照部分的日志補全了,雖然commit狀態並不一定是apply狀態,但這里以commit為准,更安全。

func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
	if lastIncludedIndex <= rf.commitIndex {
		return false
	}

	if lastIncludedIndex <= rf.log.size()-1 && rf.log.get(lastIncludedIndex).Term == lastIncludedTerm {
		rf.log.Entries = append([]LogEntry(nil), rf.log.Entries[lastIncludedIndex-rf.log.Base:]...)
	} else {
		rf.log.Entries = append([]LogEntry(nil), LogEntry{Term: lastIncludedTerm})
	}

	rf.log.Base = lastIncludedIndex
	rf.snapshot = snapshot
	rf.commitIndex = lastIncludedIndex
	rf.lastApplied = lastIncludedIndex
	rf.saveStateAndSnapshot()
	return true
}

需要注意的是,這里截斷rf.log.Entries的方式,如果使用s = s[i:]這樣的方式,依然維持對底層數組全部元素的引用,是無法被GC回收的。

還有一點要注意的是,不要忘記在Make()中讀取持久化的snapshot,並初始化lastApplied的值。

QQ截圖20211111201612.png

最后,為了證明我不是在亂寫,附上我的測試結果。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM