書接上文Raft Part C | MIT 6.824 Lab2C Persistence。
實驗准備
- 實驗代碼:
git://g.csail.mit.edu/6.824-golabs-2021/src/raft - 如何測試:
go test -run 2D -race - 相關論文:Raft Extended Section 7
- 實驗指導:6.824 Lab 2: Raft (mit.edu)
實驗目標
實現Snapshot、CondInstallSnapshot、InstallSnapshot RPC,並修改之前的代碼以支持本次實驗的內容。
一些提示
- 不要使用論文中的偏移機制為數據分片,每個分片作為一個快照。而是每次RPC發送全部數據作為一個快照。
- 丟棄舊日志的全部引用,以便GC回收。
- 由於保存快照要丟棄部分日志,不能再使用日志長度來作為索引日志的標准。
- 考慮是否需要持久化
lastIncludeTerm和lastIncludeIndex。 - 使用
rf.persister.SaveStateAndSnapshot()持久化快照。
日志壓縮
日志序列不斷擴張,是無法全部存儲在內存中的,對於已經應用到狀態機的部分日志,就不再需要維護在Raft中。
但由於仍可能存在部分Follower的日志序列遠遠落后於Leader,因此這部分日志不能被Leader丟棄,在同步日志時,若Leader中原應被同步的日志在快照中,則將快照發送給Follower。
lastIncluedTerm & lastIncludeIndex
日志壓縮后,Raft需要記錄額外的兩個信息,lastIncludeIndex、lastIncludeTerm表示快照中最后一個log的index和Term。

此處設計新的log類型如下。
type Log struct {
Entries []LogEntry
Base int
}
需要注意的是,Log.Entries從1開始存儲,因此Log.Entries[0].Term用於存儲lastIncludeTerm,Log.Base表示Log.Entries[0]的邏輯位置,也是lastIncludeIndex的值。

本例中,lastIncludeIndex = 4,lastIncludeTerm = 2,snapshot = [1,1,1,2]。
為Log添加相關成員函數。
func (l *Log) size() {
return l.Base + len(l.Entries)
}
func (l *Log) get(i int) {
return l.Entries[i-l.Base]
}
func (l *Log) set(i int, e LogEntry) {
l.[i-l.Base] = e
}
Snapshot()
Snapshot(index int, snapshot []byte)由狀態機調用,傳入的index表示lastIncludeIndex,snapshot由狀態機生成,需要Raft保存,用於發送Follower時需要。
func (rf *Raft) Snapshot(index int, snapshot []byte) {
if index <= rf.log.Base {
return
}
rf.log.Entries = rf.log.Entries[index-rf.log.Base:]
rf.log.Base = index
rf.snapshot = snapshot
rf.saveStateAndSnapshot()
}
index <= rf.log.Base說明傳入的snapshot是一個舊的快照。
InstallSnapshot RPC
首先是heartbeat()應該新增如下邏輯,當Leader中應被同步到Follower的日志在快照中時,將快照發送給Follower。
if next <= rf.log.Base {
go rf.sendSnapshot(i, peer, InstallSnapshotArgs{
Term: rf.currentTerm,
LastIncludeIndex: rf.log.Base,
LastIncludeTerm: rf.log.Entries[0].Term,
Data: rf.snapshot,
})
}
sendSnapshot()和發送日志序列類似。
func (rf *Raft) sendSnapshot(id int, peer *labrpc.ClientEnd, args InstallSnapshotArgs) {
reply := InstallSnapshotReply{}
ok := peer.Call("Raft.InstallSnapshot", &args, &reply)
if !ok {
return
}
if reply.Term > rf.currentTerm {
rf.toFollower(reply.Term)
return
}
rf.nextIndex[id] = args.LastIncludedIndex + 1
rf.matchIndex[id] = args.LastIncludedIndex
}
InstallSnapshot()和AppendEntries()類似,args.LastIncludedIndex <= rf.log.Base也是一樣的,表示一個舊的快照。
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
rf.lastRecv = time.Now()
if args.Term > rf.currentTerm {
rf.toFollower(args.Term)
}
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm || args.LastIncludedIndex <= rf.log.Base {
return
}
rf.applyCh <- ApplyMsg{
SnapshotValid: true,
Snapshot: args.Data,
SnapshotTerm: args.LastIncludedTerm,
SnapshotIndex: args.LastIncludedIndex,
}
}
注意:快照是狀態機中的概念,需要在狀態機中加載快照,因此要通過applyCh將快照發送給狀態機,但是發送后Raft並不立即保存快照,而是等待狀態機調用CondInstallSnapshot(),如果從收到InstallSnapshot()后到收到CondInstallSnapshot()前,沒有新的日志提交到狀態機,則Raft返回True,Raft和狀態機保存快照,否則Raft返回False,兩者都不保存快照。
如此保證了Raft和狀態機保存快照是一個原子操作。當然在InstallSnapshot()將快照發送給狀態機后再將快照保存到Raft,令CondInstallSnap()永遠返回True,也可以保證原子操作,但是這樣做必須等待快照發送給狀態機完成,但是rf.applyCh <- ApplyMsg是有可能阻塞的,由於InstallSnapshot()需要持有全局的互斥鎖,這可能導致整個節點無法工作。
為什么要保證原子操作?因為負責將commit狀態的日志提交到狀態機的goroutine不負責快照部分,因此必須是先保存快照,再同步日志。
本系列文章給出的代碼為了好讀,沒有考慮同步問題,正常來講
applyCh <- ApplyMsg這個操作是需要令起一個goroutine去做的。
如何判斷InstallSnapshot()到CondInstallSnapshot()之間沒有新的日志提交到狀態機呢?這里使用commitIndex來判斷,當lastIncludeIndex <= commitIndex時,說明這期間原本沒有的快照部分的日志補全了,雖然commit狀態並不一定是apply狀態,但這里以commit為准,更安全。
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
if lastIncludedIndex <= rf.commitIndex {
return false
}
if lastIncludedIndex <= rf.log.size()-1 && rf.log.get(lastIncludedIndex).Term == lastIncludedTerm {
rf.log.Entries = append([]LogEntry(nil), rf.log.Entries[lastIncludedIndex-rf.log.Base:]...)
} else {
rf.log.Entries = append([]LogEntry(nil), LogEntry{Term: lastIncludedTerm})
}
rf.log.Base = lastIncludedIndex
rf.snapshot = snapshot
rf.commitIndex = lastIncludedIndex
rf.lastApplied = lastIncludedIndex
rf.saveStateAndSnapshot()
return true
}
需要注意的是,這里截斷
rf.log.Entries的方式,如果使用s = s[i:]這樣的方式,依然維持對底層數組全部元素的引用,是無法被GC回收的。
還有一點要注意的是,不要忘記在Make()中讀取持久化的snapshot,並初始化lastApplied的值。

最后,為了證明我不是在亂寫,附上我的測試結果。
