實驗背景與目的
官網:6.824 Lab 4: Sharded Key/Value Service
在Lab2
和Lab3
,實現了基於單RAFT
(單一集群)的多節點間數據一致性、支持增刪查改、數據同步和快照保存的KV
數據庫。但忽視了集群負載問題,隨着數據增長到一定程度時,所有的數據請求都集中在leader
上,增加集群壓力,延長請求響應時。
Lab4
的內容就是將數據按照某種方式分開存儲到不同的RAFT
集群(Group
)上,分片(shard
)的策略有很多,比如:所有以a
開頭的鍵是一個分片,所有以b
開頭的鍵是一個分片。保證相應數據請求引流到對應的集群,降低單一集群的壓力,提供更為高效、更為健壯的服務。
整體架構如下圖:
-
具體的
lab4
要實現一個支持multi-raft
分片 、分片數據動態遷移的線性一致性分布式KV
存儲服務。 -
shard
表示互不相交並且組成完整數據庫的每一個數據庫子集。group
表示server
的集合,包含一個或多個server
。一個shard
只可屬於一個group
,一個group
可包含(管理)多個shard
。 -
lab4A
實現ShardCtrler
服務,作用:提供高可用的集群配置管理服務,實現分片的負載均衡,並盡可能少地移動分片。記錄了每組(Group
)ShardKVServer
的集群信息和每個分片(shard
)服務於哪組(Group
)ShardKVServer
。具體實現通過Raft
維護 一個Configs
數組,單個config
具體內容如下:Num
:config number
,Num=0
表示configuration
無效,邊界條件。Shards
:shard -> gid
,分片位置信息,Shards[3]=2
,說明分片序號為3
的分片負貴的集群是Group2
(gid=2
)Groups
:gid -> servers[]
,集群成員信息,Group[3]=['server1','server2']
,說明gid = 3
的集群Group3
包含兩台名稱為server1 & server2
的機器
-
lab4B
實現ShardKVServer
服務,ShardKVServer
則需要實現所有分片的讀寫任務,相比於MIT 6.824 Lab3 RaftKV的提供基礎的讀寫服務,還需要功能和難點為配置更新,分片數據遷移,分片數據清理,空日志檢測。
Lab 4A
介紹
lab4
系列除了構建一個 鍵值存儲系統,還需要將系統按鍵 分片(shard) 或對一組副本進行分區;分片的策略有很多,比如:所有以 “a” 開頭的鍵是一個分片,所有以 “b” 開頭的鍵是一個分片,等等
為什么需要分片?從性能考慮,每個副本只處理一部分分片的 put 和 get,並且這些副本之間是支持並行操作的;因此,系統的總吞吐量和副本的數量成比例增加
shard kv 系統組成
分片鍵值對存儲系統將由兩個組件組成;首先是一組副本,每個副本負責處理分片的一部分。一個副本由使用 raft 副本組分片的服務器組成。
第二個組件是 分片控制器,分片控制器決定哪個副本組對應服務哪個分片,即管理配置信息。配置隨時間變化,客戶端咨詢 分片控制器,尋找對應 key 的副本組,而 副本組會請求控制器,以找到需要裝載的分片。分片控制器是單例,采用 raft 實現容錯。
shard kv 系統功能
分片存儲系統必須能在多個副本組之間轉移分片,需要支持該功能的原因是:一某些組的負載可能會比其他組重,移動分片來實現負載均衡;二是某些組可能會加入或退出系統,又或者增加新的組以增加分片容量,或將已有的副本組下線修復實驗挑戰
處理重新配置功能,即將分片重新分配給副本組。在一個副本組中,所有組成員必須就客戶端的請求在進行重新配置時達成一致。如,請求在重配置的同時到達,重配置導致副本組不再對持有該鍵的分片負責。所以組中的所有副本服務器必須就請求在重配置前還是后達成一致,如果在重配置前,則請求生效,分片的新所有者需要看到生效效果。否則,請求無法生效,客戶端必須在新所有者處重新請求。推薦的方法是,讓每個副本組使用
raft
,不僅記錄請求的序列,還記錄重配置的序列;需要確保在任何時間最多只有一個副本組為所有分片提供服務。重配置還涉及到副本組之間的交互,如在配置編號10中,G1組負責分片S1,配置11中,G2組負責分配S1。在10到11 的配置變換過程中,G1和G2需要使用RPC將分片S1內容做遷移。
只有RPC可以用於客戶端和服務器之間的交互。服務器的不同實例不允許共享Go變量或文件,邏輯上是物理分離的
重配置將分片分配給副本組,重配置不是 raft 集群成員變更,我們不需要實現 raft 集群成員變更
實驗提示
- 從一個簡化的
kvraft
服務器副本開始- 實現對分片控制器的rpc的重復客戶端請求檢測。
shardctrler
的測試沒有測試這個,但是shardkv
測試稍后會在一個不可靠的網絡上使用shardctrler;如果shardctrler沒有過濾掉重復的rpc,那么將不能通過
對於golang
的map
:map
的迭代順序是不確定的,其次,且map
是引用對象,需要用深拷貝做復制.go test -race
很好用
lab4A
實現ShardCtrler
服務,作用:提供高可用的集群配置管理服務,記錄了每組(Group
)ShardKVServer
的集群信息和每個分片(shard
)服務於哪組(Group
)ShardKVServer
。具體實現通過Raft
維護 一個Configs
數組,具體內容如下:
Num
:config number
Shards
:shard -> gid
,分片位置信息,Shards[3]=2
,說明分片序號為3
的分片負責的集群是Group2
(gid=2
)Groups
:gid -> servers[]
,集群成員信息,Group[3]=['server1','server2']
,說明gid = 3
的集群Group3
包含兩台名稱為server1 & server2
的機器
代碼實現基本與Lab3
類似,可以直接照抄復制MIT 6.824 Lab3 RaftKV,且不需要實現快照服務,具體根據實現 Join, Leave, Move, Query
服務。
Query
: 查詢最新的Config
信息。Move
將數據庫子集Shard
分配給GID
的Group
。Join
: 新加入的Group
信息,要求在每一個group
平衡分布shard
,即任意兩個group
之間的shard
數目相差不能為1
,具體實現每一次找出含有shard
數目最多的和最少的,最多的給最少的一個,循環直到滿足條件為止。坑為:GID = 0
是無效配置,一開始所有分片分配給GID=0
,需要優先分配;map
的迭代時無序的,不確定順序的話,同一個命令在不同節點上計算出來的新配置不一致,按sort
排序之后遍歷即可。且map
是引用對象,需要用深拷貝做復制。Leave
: 移除Group
,同樣別忘記實現均衡,將移除的Group
的shard
每一次分配給數目最小的Group
就行,如果全部刪除,別忘記將shard
置為無效的0。
其他代碼與MIT 6.824 Lab3 RaftKV完全一致,關鍵代碼實現:
func (mcf *MemoryConfig) Query(num int) (Config, Err) {
// 如果該數字為 -1 或大於已知的最大配置數字,則 shardctrler 應回復最新配置。
if num < 0 || num >= len(mcf.Configs) {
return mcf.Configs[len(mcf.Configs)-1], OK
}
return mcf.Configs[num], OK
}
func (mcf *MemoryConfig) Move(shard int, gid int) Err {
lastConfig := mcf.Configs[len(mcf.Configs)-1]
newConfig := Config{len(mcf.Configs), lastConfig.Shards, deepCopy(lastConfig.Groups)}
newConfig.Shards[shard] = gid
mcf.Configs = append(mcf.Configs, newConfig)
return OK
}
func (mcf *MemoryConfig) Join(groups map[int][]string) Err {
lastConfig := mcf.Configs[len(mcf.Configs)-1]
newConfig := Config{len(mcf.Configs), lastConfig.Shards, deepCopy(lastConfig.Groups)}
for gid, servers := range groups {
if _, ok := newConfig.Groups[gid]; !ok {
newServers := make([]string, len(servers))
copy(newServers, servers)
newConfig.Groups[gid] = newServers
}
}
// balance
g2s := groupToShards(newConfig)
for {
s, t := getMaxNumShardByGid(g2s), getMinNumShardByGid(g2s)
if s != 0 && len(g2s[s])-len(g2s[t]) <= 1 {
break
}
g2s[t] = append(g2s[t], g2s[s][0])
g2s[s] = g2s[s][1:]
}
var newShards [NShards]int
for gid, shards := range g2s {
for _, shardId := range shards {
newShards[shardId] = gid
}
}
newConfig.Shards = newShards
mcf.Configs = append(mcf.Configs, newConfig)
return OK
}
func (mcf *MemoryConfig) Leave(gids []int) Err {
lastConfig := mcf.Configs[len(mcf.Configs)-1]
newConfig := Config{len(mcf.Configs), lastConfig.Shards, deepCopy(lastConfig.Groups)}
g2s := groupToShards(newConfig)
noUsedShards := make([]int, 0)
for _, gid := range gids {
if _, ok := newConfig.Groups[gid]; ok {
delete(newConfig.Groups, gid)
}
if shards, ok := g2s[gid]; ok {
noUsedShards = append(noUsedShards, shards...)
delete(g2s, gid)
}
}
var newShards [NShards]int
if len(newConfig.Groups) > 0 {
for _, shardId := range noUsedShards {
t := getMinNumShardByGid(g2s)
g2s[t] = append(g2s[t], shardId)
}
for gid, shards := range g2s {
for _, shardId := range shards {
newShards[shardId] = gid
}
}
}
newConfig.Shards = newShards
mcf.Configs = append(mcf.Configs, newConfig)
return OK
}
func getMinNumShardByGid(g2s map[int][]int) int {
// 不固定順序的話,可能會導致兩次的config不同
gids := make([]int, 0)
for key := range g2s {
gids = append(gids, key)
}
sort.Ints(gids)
min, index := NShards+1, -1
for _, gid := range gids {
if gid != 0 && len(g2s[gid]) < min {
min = len(g2s[gid])
index = gid
}
}
return index
}
func getMaxNumShardByGid(g2s map[int][]int) int {
// GID = 0 是無效配置,一開始所有分片分配給GID=0
if shards, ok := g2s[0]; ok && len(shards) > 0 {
return 0
}
gids := make([]int, 0)
for key := range g2s {
gids = append(gids, key)
}
sort.Ints(gids)
max, index := -1, -1
for _, gid := range gids {
if len(g2s[gid]) > max {
max = len(g2s[gid])
index = gid
}
}
return index
}
func groupToShards(config Config) map[int][]int {
g2s := make(map[int][]int)
for gid := range config.Groups {
g2s[gid] = make([]int, 0)
}
for shardId, gid := range config.Shards {
g2s[gid] = append(g2s[gid], shardId)
}
return g2s
}
func deepCopy(groups map[int][]string) map[int][]string {
newGroups := make(map[int][]string)
for gid, servers := range groups {
newServers := make([]string, len(servers))
copy(newServers, servers)
newGroups[gid] = newServers
}
return newGroups
}
Lab 4B
實驗提示
服務器不需要調用分片控制器的
Join()
,tester
才會去調用;服務器將需要定期輪詢 shardctrler 以監聽新的配置。預期大約每100毫秒輪詢一次;可以更頻繁,但過少可能會導致 bug。
服務器需要互相發送rpc,以便在配置更改期間傳輸分片。shardctrler的Config結構包含服務器名,一個 Server 需要一個labrpc.ClientEnd,以便發送RPC。使用make_end()函數傳給StartServer()函數將服務器名轉換為ClientEnd。shardkv /client.go需要實現這些邏輯。
在server.go中添加代碼去周期性從 shardctrler 拉取最新的配置,並且當請求分片不屬於自身時,拒絕請求
當被請求到錯誤分片時,需要返回ErrWrongGroup給客戶端,並確保Get, Put, Append在面臨並發重配置時能正確作出決定
重配置需要按流程執行唯一一次
labgob 的提示錯誤不能忽視,它可能導致實驗不過
分片重分配的請求也需要做重復請求檢測
若客戶端收到ErrWrongGroup,是否更改請求序列號?若服務器執行請求時返回ErrWrongGroup,是否更新客戶端信息?
當服務器轉移到新配置后,它可以繼續存儲它不再負責的分片(生產環境中這是不允許的),但這個可以簡化實現
當 G1 在配置變更時需要來自 G2 的分片數據,G2 處理日志條目的哪個時間點將分片發送給 G1 是最好的?
你可以在整個 rpc 請求或回復中發送整個 map,這可以簡化分片傳輸
map 是引用類型,所以在發送 map 的時候,建議先拷貝一次,避免 data race(在 labrpc 框架下,接收 map 時也需要拷貝)
在配置更改期間,一對組可能需要互相傳送分片,這可能會發生死鎖
challenge
如果想達到生產環境系統級別,如下兩個挑戰是需要實現的
challenge1:Garbage collection of state
當一個副本組失去一個分片的所有權時,副本組需要刪除該分片數據。但這給遷移帶來一些問題,考慮兩個組G1 和 G2,並且新配置C 將分片從 G1 移動到 G2,若 G1 在轉換配置到C時刪除了數據庫中的分片,當G2 轉換到C時,如何獲取 G1 的數據實驗要求
使每個副本組保留舊分片的時長不再是無限時長,即使副本組(如上面的G1)中的所有服務器崩潰並恢復正常,解決方案也必須工作。如果您通過TestChallenge1Delete
,您就完成了這個挑戰。解決方案
分片遷移成功之后,立馬進行分片GC
了,GC
完畢后再進入到配置更新階段。chanllenge2:Client requests during configuration changes
配置更改期間最簡單的方式是禁止所有客戶端操作直到轉換完成,雖然簡單但是不滿足於生產環境要求,這將導致客戶端長時間停滯,最好可以繼續為不受當前配置更改的分片提供服務上述優化還能更好,若 G3 在過渡到配置C時,需要來自G1 的分片S1 和 G2 的分片S2。希望 G3 能在收到其中一個分片后可以立即開始接收針對該分片的請求。如G1宕機了,G3在收到G2的分片數據后,可以立即為 S2 分片提供服務,而不需要等待 C 配置轉換完全完成
實驗要求
修改您的解決方案,以便在配置更改期間繼續執行不受影響的分片中的 key 的客戶端操作。當您通過TestChallenge2Unaffected
測試時,您已經完成了這個挑戰。修改您的解決方案,在配置轉換進行中,副本組也可以立即開始提供分片服務。當您通過
TestChallenge2Partial
測試時,您已經完成了這個挑戰。解決方案
分片遷移以group
為單位,這樣即使一個group
掛了,也不會影響到另一個group
中的分片遷移。
本實驗設計方案主要參考:https://github.com/OneSizeFitsQuorum/MIT6.824-2021/blob/master/docs/lab4.md(這個大佬真的頂,設計方案優秀,代碼邏輯清晰,膜拜。)
上面的實驗ShardCtrler
集群組實現了配置更新,分片均勻分配等任務,ShardKVServer
則需要承載所有分片的讀寫任務,相比於MIT 6.824 Lab3 RaftKV的提供基礎的讀寫服務,還需要功能為配置更新,分片數據遷移,分片數據清理,空日志檢測。
客戶端Clerk
主要實現為請求邏輯:
- 使用
key2shard()
去找到一個key
對應哪個分片Shard
; - 根據
Shard
從當前配置config
中獲取的gid
; - 根據
gid
從當前配置config
中獲取group
信息; - 在
group
循環查找leaderId
,直到返回請求成功、ErrWrongGroup
或整個 group 都遍歷請求過; Query
最新的配置,回到步驟1循環重復;
func MakeClerk(ctrlers []*labrpc.ClientEnd, makeEnd func(string) *labrpc.ClientEnd) *Clerk {
ck := &Clerk{
sm: shardctrler.MakeClerk(ctrlers),
makeEnd: makeEnd,
leaderIds: make(map[int]int),
clientId: nrand(),
commandId: 0,
}
ck.config = ck.sm.Query(-1)
return ck
}
func (ck *Clerk) Get(key string) string {
return ck.Command(&CommandRequest{Key: key, Op: OpGet})
}
func (ck *Clerk) Put(key string, value string) {
ck.Command(&CommandRequest{Key: key, Value: value, Op: OpPut})
}
func (ck *Clerk) Append(key string, value string) {
ck.Command(&CommandRequest{Key: key, Value: value, Op: OpAppend})
}
func (ck *Clerk) Command(request *CommandRequest) string {
request.ClientId, request.CommandId = ck.clientId, ck.commandId
for {
shard := key2shard(request.Key)
gid := ck.config.Shards[shard]
if servers, ok := ck.config.Groups[gid]; ok {
if _, ok = ck.leaderIds[gid]; !ok {
ck.leaderIds[gid] = 0
}
oldLeaderId := ck.leaderIds[gid]
newLeaderId := oldLeaderId
for {
var response CommandResponse
ok := ck.makeEnd(servers[newLeaderId]).Call("ShardKV.Command", request, &response)
if ok && (response.Err == OK || response.Err == ErrNoKey) {
ck.commandId++
return response.Value
} else if ok && response.Err == ErrWrongGroup {
break
} else {
newLeaderId = (newLeaderId + 1) % len(servers)
if newLeaderId == oldLeaderId {
break
}
continue
}
}
}
time.Sleep(100 * time.Millisecond)
// 獲取最新配置
ck.config = ck.sm.Query(-1)
}
}
服務端Server
首先明確整體系統的運行方式:
- 客戶端首先和
ShardCtrler
交互,獲取最新的配置,根據最新配置找到對應key
的shard
,請求該shard
的group
。 - 服務端
ShardKVServer
會創建多個raft
組來承載所有分片的讀寫任務。 - 服務端
ShardKVServer
需要定期和ShardCtrler
交互,保證更新到最新配置(monitor
)。 - 服務端
ShardKVServer
需要根據最新配置完成配置更新,分片數據遷移,分片數據清理,空日志檢測等功能。
結構
首先ShardKVServer
給出結構體,相比於MIT 6.824 Lab3 RaftKV的多了currentConfig
和lastConfig
數據,這樣其他協程便能夠通過其計算需要需要向誰拉取分片或者需要讓誰去刪分片。
啟動了五個協程:apply
協程,配置更新協程,數據遷移協程,數據清理協程,空日志檢測協程來實現功能。四個協程都需要 leader 來執行,因此抽象出了一個簡單地周期執行函數 Monitor
。
type ShardKV struct {
mu sync.RWMutex
dead int32
rf *raft.Raft
applyCh chan raft.ApplyMsg
makeEnd func(string) *labrpc.ClientEnd
gid int
sc *shardctrler.Clerk
maxRaftState int // snapshot if log grows this big
lastApplied int // record the lastApplied to prevent stateMachine from rollback
lastConfig shardctrler.Config
currentConfig shardctrler.Config
stateMachines map[int]*Shard // 服務器數據存儲(key,value)
lastOperations map[int64]OperationContext // 客戶端id最后的命令id和回復內容 (clientId,{最后的commdId,最后的LastReply})
notifyChans map[int]chan *CommandResponse // Leader回復給客戶端的響應(日志Index, CommandReply)
}
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxRaftState int, gid int, ctrlers []*labrpc.ClientEnd, makeEnd func(string) *labrpc.ClientEnd) *ShardKV {
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
labgob.Register(Command{})
labgob.Register(CommandRequest{})
labgob.Register(shardctrler.Config{})
labgob.Register(ShardOperationResponse{})
labgob.Register(ShardOperationRequest{})
applyCh := make(chan raft.ApplyMsg)
kv := &ShardKV{
dead: 0,
rf: raft.Make(servers, me, persister, applyCh),
applyCh: applyCh,
makeEnd: makeEnd,
gid: gid,
sc: shardctrler.MakeClerk(ctrlers),
lastApplied: 0,
maxRaftState: maxRaftState,
currentConfig: shardctrler.DefaultConfig(),
lastConfig: shardctrler.DefaultConfig(),
stateMachines: make(map[int]*Shard),
lastOperations: make(map[int64]OperationContext),
notifyChans: make(map[int]chan *CommandResponse),
}
kv.restoreSnapshot(persister.ReadSnapshot())
// 將 committed logs 應用到 stateMachine
go kv.applier()
// 開啟協程獲取最新配置
go kv.Monitor(kv.configureAction, ConfigureMonitorTimeout)
// 開啟協程拉取相關分片
go kv.Monitor(kv.migrationAction, MigrationMonitorTimeout)
// 開啟協程 刪除遠程組中無用的分片
go kv.Monitor(kv.gcAction, GCMonitorTimeout)
// 開啟協程在當前term中附加空條目來提高 commitIndex 以避免活鎖
go kv.Monitor(kv.checkEntryInCurrentTermAction, EmptyEntryDetectorTimeout)
DPrintf("{Node %v}{Group %v} has started", kv.rf.Me(), kv.gid)
return kv
}
// applier協程:將提交的日志應用到 stateMachine,並實現快照
func (kv *ShardKV) applier() {
for kv.killed() == false {
select {
case message := <-kv.applyCh:
if message.CommandValid {
kv.mu.Lock()
if message.CommandIndex <= kv.lastApplied {
kv.mu.Unlock()
continue
}
kv.lastApplied = message.CommandIndex
var response *CommandResponse
command := message.Command.(Command)
switch command.Op {
case Operation:
operation := command.Data.(CommandRequest)
response = kv.applyOperation(&message, &operation)
case Configuration:
nextConfig := command.Data.(shardctrler.Config)
response = kv.applyConfiguration(&nextConfig)
case InsertShards:
shardsInfo := command.Data.(ShardOperationResponse)
response = kv.applyInsertShards(&shardsInfo)
case DeleteShards:
shardsInfo := command.Data.(ShardOperationRequest)
response = kv.applyDeleteShards(&shardsInfo)
case EmptyEntry:
response = kv.applyEmptyEntry()
}
if currentTerm, isLeader := kv.rf.GetState(); isLeader && message.CommandTerm == currentTerm {
ch := kv.getNotifyChan(message.CommandIndex)
ch <- response
}
needSnapshot := kv.needSnapshot()
if needSnapshot {
kv.takeSnapshot(message.CommandIndex)
}
kv.mu.Unlock()
} else if message.SnapshotValid {
kv.mu.Lock()
if kv.rf.CondInstallSnapshot(message.SnapshotTerm, message.SnapshotIndex, message.Snapshot) {
kv.restoreSnapshot(message.Snapshot)
kv.lastApplied = message.SnapshotIndex
}
kv.mu.Unlock()
} else {
panic(fmt.Sprintf("unexpected Message %v", message))
}
}
}
}
相比於MIT 6.824 Lab3 RaftKV需要維護分片Shard
的狀態變量來完成該實驗,原因如下:
- 這樣可以防止分片
Shard
中間狀態被覆蓋,從而導致任務被丟棄。只有所有分片Shard
的狀態都為默認狀態才能拉取最新配置。 - 實驗
challenge2
限制,challenge2
不僅要求apply
協程不被阻塞,還要求配置的更新和分片的狀態變化彼此獨立。即需要不同raft
組所屬的分片數據獨立起來,分別提交多條raft
日志來維護狀態,因此需要維護狀態變量。 - 必須使用單獨的協程異步根據分片的狀態來進行分片的遷移和清理等任務。如果不采用上述方案,
apply
協程更新配置的時候由leader
異步啟動對應的協程,讓其獨立的根據raft
組為粒度拉取數據?讓設想這樣一個場景:leader apply 了新配置后便掛了,然后此時 follower 也 apply 了該配置但並不會啟動該任務,在該raft
組的新leader
選出來后,該任務已經無法被執行了。所有apply配置的時候只應該更新shard
的狀態。
每個分片共有 4 種狀態:
Serving
:分片的默認狀態,如果當前raft
組在當前config
下負責管理此分片,則該分片可以提供讀寫服務,否則該分片暫不可以提供讀寫服務,但不會阻塞配置更新協程拉取新配置。Pulling
:表示當前raft
組在當前config
下負責管理此分片,暫不可以提供讀寫服務,需要當前raft
組從上一個配置該分片所屬raft
組拉數據過來之后才可以提供讀寫服務,系統會有一個分片遷移協程檢測所有分片的Pulling
狀態,接着以raft
組為單位去對應raft
組拉取數據,接着嘗試重放該分片的所有數據到本地並將分片狀態置為Serving
,以繼續提供服務。BePulling
:表示當前raft
組在當前config
下不負責管理此分片,不可以提供讀寫服務,但當前raft
組在上一個config
時復制管理此分片,因此當前config
下負責管理此分片的raft
組拉取完數據后會向本raft
組發送分片清理的rpc
,接着本raft
組將數據清空並重置為serving
狀態即可。GCing
:表示當前raft
組在當前config
下負責管理此分片,可以提供讀寫服務,但需要清理掉上一個配置該分片所屬raft
組的數據。系統會有一個分片清理協程檢測所有分片的GCing
狀態,接着以raft
組為單位去對應raft
組刪除數據,一旦遠程raft
組刪除數據成功,則本地會嘗試將相關分片的狀態置為Serving
。
type ShardStatus uint8
const (
Serving ShardStatus = iota
Pulling
BePulling
GCing
)
type Shard struct {
KV map[string]string
Status ShardStatus
}
func NewShard() *Shard {
return &Shard{make(map[string]string), Serving}
}
func (shard *Shard) Get(key string) (string, Err) {
if value, ok := shard.KV[key]; ok {
return value, OK
}
return "", ErrNoKey
}
func (shard *Shard) Put(key, value string) Err {
shard.KV[key] = value
return OK
}
func (shard *Shard) Append(key, value string) Err {
shard.KV[key] += value
return OK
}
func (shard *Shard) deepCopy() map[string]string {
newShard := make(map[string]string)
for k, v := range shard.KV {
newShard[k] = v
}
return newShard
}
日志類型
在 lab3
中,客戶端的請求會被包裝成一個 Op
傳給 Raft
層,則在 lab4
中,不難想到,Servers
之間的交互,也可以看做是包裝成 Op
傳給 Raft
層;定義了五種類型的日志:
-
Operation
:客戶端傳來的讀寫操作日志,有Put
,Get
,Append
等請求。 -
Configuration
:配置更新日志,包含一個配置。 -
InsertShards
:分片更新日志,包含至少一個分片的數據和配置版本。 -
DeleteShards
:分片刪除日志,包含至少一個分片的 id 和配置版本。 -
EmptyEntry
:空日志,Data
為空,使得狀態機達到最新。
type Command struct {
Op CommandType
Data interface{}
}
func (command Command) String() string {
return fmt.Sprintf("{Type:%v,Data:%v}", command.Op, command.Data)
}
func NewOperationCommand(request *CommandRequest) Command {
return Command{Operation, *request}
}
func NewConfigurationCommand(config *shardctrler.Config) Command {
return Command{Configuration, *config}
}
func NewInsertShardsCommand(response *ShardOperationResponse) Command {
return Command{InsertShards, *response}
}
func NewDeleteShardsCommand(request *ShardOperationRequest) Command {
return Command{DeleteShards, *request}
}
func NewEmptyEntryCommand() Command {
return Command{EmptyEntry, nil}
}
type CommandType uint8
const (
Operation CommandType = iota
Configuration
InsertShards
DeleteShards
EmptyEntry
)
讀寫服務
讀寫操作的基本邏輯相比於MIT 6.824 Lab3 RaftKV基本一致,需要增加分片狀態判斷。根據上述定義,分片的狀態為 Serving
或 GCing
,當前 raft
組在當前 config
下負責管理此分片,本 raft
組才可以為該分片提供讀寫服務,否則返回 ErrWrongGroup
讓客戶端重新拉取最新的 config
並重試即可。
canServe
的判斷需要在向 raft
提交前和 apply
時都檢測一遍以保證正確性並盡可能提升性能。
// 檢查該raft group 目前是否可以服務該shard
func (kv *ShardKV) canServe(shardID int) bool {
return kv.currentConfig.Shards[shardID] == kv.gid && (kv.stateMachines[shardID].Status == Serving || kv.stateMachines[shardID].Status == GCing)
}
func (kv *ShardKV) Command(request *CommandRequest, response *CommandResponse) {
kv.mu.RLock()
if request.Op != OpGet && kv.isDuplicateRequest(request.ClientId, request.CommandId) {
lastResponse := kv.lastOperations[request.ClientId].LastResponse
response.Value, response.Err = lastResponse.Value, lastResponse.Err
kv.mu.RUnlock()
return
}
// 如果當前分片無法提供服務,則返回 ErrWrongGroup 讓客戶端獲取最新配置,
if !kv.canServe(key2shard(request.Key)) {
response.Err = ErrWrongGroup
kv.mu.RUnlock()
return
}
kv.mu.RUnlock()
kv.Execute(NewOperationCommand(request), response)
}
func (kv *ShardKV) Execute(command Command, response *CommandResponse) {
// 不持有鎖以提高吞吐量
// 當 KVServer 持有鎖進行快照時,底層 raft 仍然可以提交 raft 日志
index, _, isLeader := kv.rf.Start(command)
if !isLeader {
response.Err = ErrWrongLeader
return
}
//defer DPrintf("{Node %v}{Group %v} processes Command %v with CommandResponse %v", kv.rf.Me(), kv.gid, command, response)
kv.mu.Lock()
ch := kv.getNotifyChan(index)
kv.mu.Unlock()
select {
case result := <-ch:
response.Value, response.Err = result.Value, result.Err
case <-time.After(ExecuteTimeout):
response.Err = ErrTimeout
}
// 釋放 notifyChan 以減少內存占用
// 異步為了提高吞吐量,這里不需要阻塞客戶端請求
go func() {
kv.mu.Lock()
kv.removeOutdatedNotifyChan(index)
kv.mu.Unlock()
}()
}
func (kv *ShardKV) applyOperation(message *raft.ApplyMsg, operation *CommandRequest) *CommandResponse {
var response *CommandResponse
shardID := key2shard(operation.Key)
if kv.canServe(shardID) {
if operation.Op != OpGet && kv.isDuplicateRequest(operation.ClientId, operation.CommandId) {
DPrintf("{Node %v}{Group %v} doesn't apply duplicated message %v to stateMachine because maxAppliedCommandId is %v for client %v", kv.rf.Me(), kv.gid, message, kv.lastOperations[operation.ClientId], operation.ClientId)
return kv.lastOperations[operation.ClientId].LastResponse
} else {
response = kv.applyLogToStateMachines(operation, shardID)
if operation.Op != OpGet {
kv.lastOperations[operation.ClientId] = OperationContext{operation.CommandId, response}
}
return response
}
}
return &CommandResponse{ErrWrongGroup, ""}
}
接下來就只能瞻仰大佬的思路,目前的想法是先照着大佬實現pull
方案實現,且網上大部分方案是pull
,但我感覺push
方案稍微優異點,因為push
模式可以在發送方在收到接收方應用成功reply
的時候就,發送方可以直接進行GC
,等我按照大佬思路實現完pull
方案,汲取到經驗之后,就實現push
方案。
配置更新
配置更新協程負責定時檢測所有分片的狀態,一旦存在至少一個分片的狀態不為默認狀態,則預示其他協程仍然還沒有完成任務,那么此時需要阻塞新配置的拉取和提交。
在 apply
配置更新日志時需要保證冪等性:
- 不同版本的配置更新日志:
apply
時僅可逐步遞增的去更新配置,否則返回失敗。 - 相同版本的配置更新日志:由於配置更新日志僅由配置更新協程提交,而配置更新協程只有檢測到比本地更大地配置時才會提交配置更新日志,所以該情形不會出現。
func (kv *ShardKV) configureAction() {
canPerformNextConfig := true
kv.mu.RLock()
for _, shard := range kv.stateMachines {
if shard.Status != Serving {
canPerformNextConfig = false
DPrintf("{Node %v}{Group %v} will not try to fetch latest configuration because shards status are %v when currentConfig is %v", kv.rf.Me(), kv.gid, kv.getShardStatus(), kv.currentConfig)
break
}
}
currentConfigNum := kv.currentConfig.Num
kv.mu.RUnlock()
if canPerformNextConfig {
nextConfig := kv.sc.Query(currentConfigNum + 1)
if nextConfig.Num == currentConfigNum+1 {
DPrintf("{Node %v}{Group %v} fetches latest configuration %v when currentConfigNum is %v", kv.rf.Me(), kv.gid, nextConfig, currentConfigNum)
kv.Execute(NewConfigurationCommand(&nextConfig), &CommandResponse{})
}
}
}
func (kv *ShardKV) applyConfiguration(nextConfig *shardctrler.Config) *CommandResponse {
if nextConfig.Num == kv.currentConfig.Num+1 {
DPrintf("{Node %v}{Group %v} updates currentConfig from %v to %v", kv.rf.Me(), kv.gid, kv.currentConfig, nextConfig)
kv.updateShardStatus(nextConfig)
kv.lastConfig = kv.currentConfig
kv.currentConfig = *nextConfig
return &CommandResponse{OK, ""}
}
DPrintf("{Node %v}{Group %v} rejects outdated config %v when currentConfig is %v", kv.rf.Me(), kv.gid, nextConfig, kv.currentConfig)
return &CommandResponse{ErrOutDated, ""}
}
分片遷移
分片遷移協程負責定時檢測分片的 Pulling
狀態,利用 lastConfig
計算出對應 raft
組的 gid
和要拉取的分片,然后並行地去拉取數據。
注意這里使用了 waitGroup
來保證所有獨立地任務完成后才會進行下一次任務。此外 wg.Wait()
一定要在釋放讀鎖之后,否則無法滿足 challenge2
的要求。
在拉取分片的 handler
中,首先僅可由 leader
處理該請求,其次如果發現請求中的配置版本大於本地的版本,那說明請求拉取的是未來的數據,則返回 ErrNotReady
讓其稍后重試,否則將分片數據和去重表都深度拷貝到 response
即可。
在 apply
分片更新日志時需要保證冪等性:
- 不同版本的配置更新日志:僅可執行與當前配置版本相同地分片更新日志,否則返回 ErrOutDated。
- 相同版本的配置更新日志:僅在對應分片狀態為
Pulling
時為第一次應用,此時覆蓋狀態機即可並修改狀態為GCing
,以讓分片清理協程檢測到GCing
狀態並嘗試刪除遠端的分片。否則說明已經應用過,直接break
即可。
func (kv *ShardKV) migrationAction() {
kv.mu.RLock()
gid2shardIDs := kv.getShardIDsByStatus(Pulling)
var wg sync.WaitGroup
for gid, shardIDs := range gid2shardIDs {
DPrintf("{Node %v}{Group %v} starts a PullTask to get shards %v from group %v when config is %v", kv.rf.Me(), kv.gid, shardIDs, gid, kv.currentConfig)
wg.Add(1)
go func(servers []string, configNum int, shardIDs []int) {
defer wg.Done()
pullTaskRequest := ShardOperationRequest{configNum, shardIDs}
for _, server := range servers {
var pullTaskResponse ShardOperationResponse
srv := kv.makeEnd(server)
if srv.Call("ShardKV.GetShardsData", &pullTaskRequest, &pullTaskResponse) && pullTaskResponse.Err == OK {
DPrintf("{Node %v}{Group %v} gets a PullTaskResponse %v and tries to commit it when currentConfigNum is %v", kv.rf.Me(), kv.gid, pullTaskResponse, configNum)
kv.Execute(NewInsertShardsCommand(&pullTaskResponse), &CommandResponse{})
}
}
}(kv.lastConfig.Groups[gid], kv.currentConfig.Num, shardIDs)
}
kv.mu.RUnlock()
wg.Wait()
}
func (kv *ShardKV) GetShardsData(request *ShardOperationRequest, response *ShardOperationResponse) {
// only pull shards from leader
if _, isLeader := kv.rf.GetState(); !isLeader {
response.Err = ErrWrongLeader
return
}
kv.mu.RLock()
defer kv.mu.RUnlock()
defer DPrintf("{Node %v}{Group %v} processes PullTaskRequest %v with response %v", kv.rf.Me(), kv.gid, request, response)
if kv.currentConfig.Num < request.ConfigNum {
response.Err = ErrNotReady
return
}
response.Shards = make(map[int]map[string]string)
for _, shardID := range request.ShardIDs {
response.Shards[shardID] = kv.stateMachines[shardID].deepCopy()
}
response.LastOperations = make(map[int64]OperationContext)
for clientID, operation := range kv.lastOperations {
response.LastOperations[clientID] = operation.deepCopy()
}
response.ConfigNum, response.Err = request.ConfigNum, OK
}
func (kv *ShardKV) applyInsertShards(shardsInfo *ShardOperationResponse) *CommandResponse {
if shardsInfo.ConfigNum == kv.currentConfig.Num {
DPrintf("{Node %v}{Group %v} accepts shards insertion %v when currentConfig is %v", kv.rf.Me(), kv.gid, shardsInfo, kv.currentConfig)
for shardId, shardData := range shardsInfo.Shards {
shard := kv.stateMachines[shardId]
if shard.Status == Pulling {
for key, value := range shardData {
shard.KV[key] = value
}
shard.Status = GCing
} else {
DPrintf("{Node %v}{Group %v} encounters duplicated shards insertion %v when currentConfig is %v", kv.rf.Me(), kv.gid, shardsInfo, kv.currentConfig)
break
}
}
for clientId, operationContext := range shardsInfo.LastOperations {
if lastOperation, ok := kv.lastOperations[clientId]; !ok || lastOperation.MaxAppliedCommandId < operationContext.MaxAppliedCommandId {
kv.lastOperations[clientId] = operationContext
}
}
return &CommandResponse{OK, ""}
}
DPrintf("{Node %v}{Group %v} rejects outdated shards insertion %v when currentConfig is %v", kv.rf.Me(), kv.gid, shardsInfo, kv.currentConfig)
return &CommandResponse{ErrOutDated, ""}
}
分片清理
分片清理協程負責定時檢測分片的 GCing
狀態,利用 lastConfig
計算出對應 raft
組的 gid
和要拉取的分片,然后並行地去刪除分片。
注意這里使用了 waitGroup
來保證所有獨立地任務完成后才會進行下一次任務。此外 wg.Wait()
一定要在釋放讀鎖之后,否則無法滿足 challenge2
的要求。
在刪除分片的 handler
中,首先僅可由 leader
處理該請求,其次如果發現請求中的配置版本小於本地的版本,那說明該請求已經執行過,否則本地的 config
也無法增大,此時直接返回 OK 即可,否則在本地提交一個刪除分片的日志。
在 apply
分片刪除日志時需要保證冪等性:
- 不同版本的配置更新日志:僅可執行與當前配置版本相同地分片刪除日志,否則已經刪除過,直接返回 OK 即可。
- 相同版本的配置更新日志:如果分片狀態為
GCing
,說明是本raft
組已成功刪除遠端raft
組的數據,現需要更新分片狀態為默認狀態以支持配置的進一步更新;否則如果分片狀態為BePulling
,則說明本raft
組第一次刪除該分片的數據,此時直接重置分片即可。否則說明該請求已經應用過,直接break
返回 OK 即可。
func (kv *ShardKV) gcAction() {
kv.mu.RLock()
gid2shardIDs := kv.getShardIDsByStatus(GCing)
var wg sync.WaitGroup
for gid, shardIDs := range gid2shardIDs {
DPrintf("{Node %v}{Group %v} starts a GCTask to delete shards %v in group %v when config is %v", kv.rf.Me(), kv.gid, shardIDs, gid, kv.currentConfig)
wg.Add(1)
go func(servers []string, configNum int, shardIDs []int) {
defer wg.Done()
gcTaskRequest := ShardOperationRequest{configNum, shardIDs}
for _, server := range servers {
var gcTaskResponse ShardOperationResponse
srv := kv.makeEnd(server)
if srv.Call("ShardKV.DeleteShardsData", &gcTaskRequest, &gcTaskResponse) && gcTaskResponse.Err == OK {
DPrintf("{Node %v}{Group %v} deletes shards %v in remote group successfully when currentConfigNum is %v", kv.rf.Me(), kv.gid, shardIDs, configNum)
kv.Execute(NewDeleteShardsCommand(&gcTaskRequest), &CommandResponse{})
}
}
}(kv.lastConfig.Groups[gid], kv.currentConfig.Num, shardIDs)
}
kv.mu.RUnlock()
wg.Wait()
}
func (kv *ShardKV) DeleteShardsData(request *ShardOperationRequest, response *ShardOperationResponse) {
// only delete shards when role is leader
if _, isLeader := kv.rf.GetState(); !isLeader {
response.Err = ErrWrongLeader
return
}
defer DPrintf("{Node %v}{Group %v} processes GCTaskRequest %v with response %v", kv.rf.Me(), kv.gid, request, response)
kv.mu.RLock()
if kv.currentConfig.Num > request.ConfigNum {
DPrintf("{Node %v}{Group %v}'s encounters duplicated shards deletion %v when currentConfig is %v", kv.rf.Me(), kv.gid, request, kv.currentConfig)
response.Err = OK
kv.mu.RUnlock()
return
}
kv.mu.RUnlock()
var commandResponse CommandResponse
kv.Execute(NewDeleteShardsCommand(request), &commandResponse)
response.Err = commandResponse.Err
}
func (kv *ShardKV) applyDeleteShards(shardsInfo *ShardOperationRequest) *CommandResponse {
if shardsInfo.ConfigNum == kv.currentConfig.Num {
DPrintf("{Node %v}{Group %v}'s shards status are %v before accepting shards deletion %v when currentConfig is %v", kv.rf.Me(), kv.gid, kv.getShardStatus(), shardsInfo, kv.currentConfig)
for _, shardId := range shardsInfo.ShardIDs {
shard := kv.stateMachines[shardId]
if shard.Status == GCing {
shard.Status = Serving
} else if shard.Status == BePulling {
kv.stateMachines[shardId] = NewShard()
} else {
DPrintf("{Node %v}{Group %v} encounters duplicated shards deletion %v when currentConfig is %v", kv.rf.Me(), kv.gid, shardsInfo, kv.currentConfig)
break
}
}
DPrintf("{Node %v}{Group %v}'s shards status are %v after accepting shards deletion %v when currentConfig is %v", kv.rf.Me(), kv.gid, kv.getShardStatus(), shardsInfo, kv.currentConfig)
return &CommandResponse{OK, ""}
}
DPrintf("{Node %v}{Group %v}'s encounters duplicated shards deletion %v when currentConfig is %v", kv.rf.Me(), kv.gid, shardsInfo, kv.currentConfig)
return &CommandResponse{OK, ""}
}
空日志檢測
分片清理協程負責定時檢測 raft
層的 leader
是否擁有當前 term
的日志,如果沒有則提交一條空日志,這使得新 leader
的狀態機能夠迅速達到最新狀態,從而避免多 raft
組間的活鎖狀態。
func (kv *ShardKV) checkEntryInCurrentTermAction() {
if !kv.rf.HasLogInCurrentTerm() {
kv.Execute(NewEmptyEntryCommand(), &CommandResponse{})
}
}
func (kv *ShardKV) applyEmptyEntry() *CommandResponse {
return &CommandResponse{OK, ""}
}