MIT 6.824 Lab4 Sharded KeyValue Service


實驗背景與目的

官網:6.824 Lab 4: Sharded Key/Value Service

Lab2Lab3,實現了基於單RAFT(單一集群)的多節點間數據一致性、支持增刪查改、數據同步和快照保存的KV數據庫。但忽視了集群負載問題,隨着數據增長到一定程度時,所有的數據請求都集中在leader上,增加集群壓力,延長請求響應時。

Lab4的內容就是將數據按照某種方式分開存儲到不同的RAFT集群(Group)上,分片(shard)的策略有很多,比如:所有以a開頭的鍵是一個分片,所有以b開頭的鍵是一個分片。保證相應數據請求引流到對應的集群,降低單一集群的壓力,提供更為高效、更為健壯的服務。

​ 整體架構如下圖:

  1. 具體的lab4要實現一個支持 multi-raft分片 、分片數據動態遷移的線性一致性分布式 KV 存儲服務。

  2. shard表示互不相交並且組成完整數據庫的每一個數據庫子集。group表示server的集合,包含一個或多個server。一個shard只可屬於一個group,一個group可包含(管理)多個shard

  3. lab4A實現ShardCtrler服務,作用:提供高可用的集群配置管理服務,實現分片的負載均衡,並盡可能少地移動分片。記錄了每組(GroupShardKVServer的集群信息和每個分片(shard)服務於哪組(GroupShardKVServer。具體實現通過Raft維護 一個Configs數組,單個config具體內容如下:

    • Numconfig numberNum=0表示configuration無效,邊界條件
    • Shardsshard -> gid,分片位置信息,Shards[3]=2,說明分片序號為3的分片負貴的集群是Group2gid=2
    • Groupsgid -> servers[],集群成員信息,Group[3]=['server1','server2'],說明gid = 3的集群Group3包含兩台名稱為server1 & server2的機器
  4. lab4B實現ShardKVServer服務,ShardKVServer則需要實現所有分片的讀寫任務,相比於MIT 6.824 Lab3 RaftKV的提供基礎的讀寫服務,還需要功能和難點為配置更新,分片數據遷移,分片數據清理,空日志檢測

Lab 4A

介紹

lab4 系列除了構建一個 鍵值存儲系統,還需要將系統按鍵 分片(shard) 或對一組副本進行分區;

分片的策略有很多,比如:所有以 “a” 開頭的鍵是一個分片,所有以 “b” 開頭的鍵是一個分片,等等

為什么需要分片?從性能考慮,每個副本只處理一部分分片的 put 和 get,並且這些副本之間是支持並行操作的;因此,系統的總吞吐量和副本的數量成比例增加

shard kv 系統組成
分片鍵值對存儲系統將由兩個組件組成;

首先是一組副本,每個副本負責處理分片的一部分。一個副本由使用 raft 副本組分片的服務器組成。

第二個組件是 分片控制器,分片控制器決定哪個副本組對應服務哪個分片,即管理配置信息。配置隨時間變化,客戶端咨詢 分片控制器,尋找對應 key 的副本組,而 副本組會請求控制器,以找到需要裝載的分片。分片控制器是單例,采用 raft 實現容錯。

shard kv 系統功能
分片存儲系統必須能在多個副本組之間轉移分片,需要支持該功能的原因是:一某些組的負載可能會比其他組重,移動分片來實現負載均衡;二是某些組可能會加入或退出系統,又或者增加新的組以增加分片容量,或將已有的副本組下線修復

實驗挑戰
處理重新配置功能,即將分片重新分配給副本組。在一個副本組中,所有組成員必須就客戶端的請求在進行重新配置時達成一致。如,請求在重配置的同時到達,重配置導致副本組不再對持有該鍵的分片負責。所以組中的所有副本服務器必須就請求在重配置前還是后達成一致,如果在重配置前,則請求生效,分片的新所有者需要看到生效效果。否則,請求無法生效,客戶端必須在新所有者處重新請求。

推薦的方法是,讓每個副本組使用 raft,不僅記錄請求的序列,還記錄重配置的序列;需要確保在任何時間最多只有一個副本組為所有分片提供服務。

重配置還涉及到副本組之間的交互,如在配置編號10中,G1組負責分片S1,配置11中,G2組負責分配S1。在10到11 的配置變換過程中,G1和G2需要使用RPC將分片S1內容做遷移。

  • 只有RPC可以用於客戶端和服務器之間的交互。服務器的不同實例不允許共享Go變量或文件,邏輯上是物理分離的

  • 重配置將分片分配給副本組,重配置不是 raft 集群成員變更,我們不需要實現 raft 集群成員變更

實驗提示

  • 從一個簡化的 kvraft 服務器副本開始
  • 實現對分片控制器的rpc的重復客戶端請求檢測。shardctrler 的測試沒有測試這個,但是shardkv測試稍后會在一個不可靠的網絡上使用shardctrler;如果shardctrler沒有過濾掉重復的rpc,那么將不能通過
    對於 golangmapmap 的迭代順序是不確定的,其次,且 map 是引用對象,需要用深拷貝做復制.
  • go test -race很好用

lab4A實現ShardCtrler服務,作用:提供高可用的集群配置管理服務,記錄了每組(GroupShardKVServer的集群信息和每個分片(shard)服務於哪組(GroupShardKVServer。具體實現通過Raft維護 一個Configs數組,具體內容如下:

  • Numconfig number
  • Shardsshard -> gid,分片位置信息,Shards[3]=2,說明分片序號為3的分片負責的集群是Group2gid=2
  • Groupsgid -> servers[],集群成員信息,Group[3]=['server1','server2'],說明gid = 3的集群Group3包含兩台名稱為server1 & server2的機器

​ 代碼實現基本與Lab3 類似,可以直接照抄復制MIT 6.824 Lab3 RaftKV,且不需要實現快照服務,具體根據實現 Join, Leave, Move, Query 服務。

  1. Query: 查詢最新的Config信息。
  2. Move 將數據庫子集Shard分配給GIDGroup
  3. Join: 新加入的Group信息,要求在每一個group平衡分布shard,即任意兩個group之間的shard數目相差不能為1,具體實現每一次找出含有shard數目最多的和最少的,最多的給最少的一個,循環直到滿足條件為止。坑為:GID = 0 是無效配置,一開始所有分片分配給GID=0,需要優先分配;map的迭代時無序的,不確定順序的話,同一個命令在不同節點上計算出來的新配置不一致,按sort排序之后遍歷即可。且 map 是引用對象,需要用深拷貝做復制。
  4. Leave: 移除Group,同樣別忘記實現均衡,將移除的Groupshard每一次分配給數目最小的Group就行,如果全部刪除,別忘記將shard置為無效的0。

其他代碼與MIT 6.824 Lab3 RaftKV完全一致,關鍵代碼實現:

func (mcf *MemoryConfig) Query(num int) (Config, Err) {
	// 如果該數字為 -1 或大於已知的最大配置數字,則 shardctrler 應回復最新配置。
	if num < 0 || num >= len(mcf.Configs) {
		return mcf.Configs[len(mcf.Configs)-1], OK
	}
	return mcf.Configs[num], OK
}

func (mcf *MemoryConfig) Move(shard int, gid int) Err {
	lastConfig := mcf.Configs[len(mcf.Configs)-1]
	newConfig := Config{len(mcf.Configs), lastConfig.Shards, deepCopy(lastConfig.Groups)}
	newConfig.Shards[shard] = gid
	mcf.Configs = append(mcf.Configs, newConfig)
	return OK
}

func (mcf *MemoryConfig) Join(groups map[int][]string) Err {
	lastConfig := mcf.Configs[len(mcf.Configs)-1]
	newConfig := Config{len(mcf.Configs), lastConfig.Shards, deepCopy(lastConfig.Groups)}

	for gid, servers := range groups {
		if _, ok := newConfig.Groups[gid]; !ok {
			newServers := make([]string, len(servers))
			copy(newServers, servers)
			newConfig.Groups[gid] = newServers
		}
	}

	// balance
	g2s := groupToShards(newConfig)

	for {
		s, t := getMaxNumShardByGid(g2s), getMinNumShardByGid(g2s)
		if s != 0 && len(g2s[s])-len(g2s[t]) <= 1 {
			break
		}
		g2s[t] = append(g2s[t], g2s[s][0])
		g2s[s] = g2s[s][1:]
	}

	var newShards [NShards]int
	for gid, shards := range g2s {
		for _, shardId := range shards {
			newShards[shardId] = gid
		}
	}
	newConfig.Shards = newShards
	mcf.Configs = append(mcf.Configs, newConfig)
	return OK
}

func (mcf *MemoryConfig) Leave(gids []int) Err {
	lastConfig := mcf.Configs[len(mcf.Configs)-1]
	newConfig := Config{len(mcf.Configs), lastConfig.Shards, deepCopy(lastConfig.Groups)}

	g2s := groupToShards(newConfig)

	noUsedShards := make([]int, 0)
	for _, gid := range gids {
		if _, ok := newConfig.Groups[gid]; ok {
			delete(newConfig.Groups, gid)
		}
		if shards, ok := g2s[gid]; ok {
			noUsedShards = append(noUsedShards, shards...)
			delete(g2s, gid)
		}
	}

	var newShards [NShards]int
	if len(newConfig.Groups) > 0 {
		for _, shardId := range noUsedShards {
			t := getMinNumShardByGid(g2s)
			g2s[t] = append(g2s[t], shardId)
		}

		for gid, shards := range g2s {
			for _, shardId := range shards {
				newShards[shardId] = gid
			}
		}
	}
	newConfig.Shards = newShards
	mcf.Configs = append(mcf.Configs, newConfig)
	return OK
}

func getMinNumShardByGid(g2s map[int][]int) int {
	// 不固定順序的話,可能會導致兩次的config不同
	gids := make([]int, 0)
	for key := range g2s {
		gids = append(gids, key)
	}

	sort.Ints(gids)

	min, index := NShards+1, -1
	for _, gid := range gids {
		if gid != 0 && len(g2s[gid]) < min {
			min = len(g2s[gid])
			index = gid
		}
	}
	return index
}

func getMaxNumShardByGid(g2s map[int][]int) int {
	// GID = 0 是無效配置,一開始所有分片分配給GID=0
	if shards, ok := g2s[0]; ok && len(shards) > 0 {
		return 0
	}

	gids := make([]int, 0)
	for key := range g2s {
		gids = append(gids, key)
	}

	sort.Ints(gids)

	max, index := -1, -1
	for _, gid := range gids {
		if len(g2s[gid]) > max {
			max = len(g2s[gid])
			index = gid
		}
	}
	return index
}
func groupToShards(config Config) map[int][]int {
	g2s := make(map[int][]int)
	for gid := range config.Groups {
		g2s[gid] = make([]int, 0)
	}
	for shardId, gid := range config.Shards {
		g2s[gid] = append(g2s[gid], shardId)
	}
	return g2s
}

func deepCopy(groups map[int][]string) map[int][]string {
	newGroups := make(map[int][]string)
	for gid, servers := range groups {
		newServers := make([]string, len(servers))
		copy(newServers, servers)
		newGroups[gid] = newServers
	}
	return newGroups
}

Lab 4B

實驗提示

  • 服務器不需要調用分片控制器的Join()tester 才會去調用;

  • 服務器將需要定期輪詢 shardctrler 以監聽新的配置。預期大約每100毫秒輪詢一次;可以更頻繁,但過少可能會導致 bug。

  • 服務器需要互相發送rpc,以便在配置更改期間傳輸分片。shardctrler的Config結構包含服務器名,一個 Server 需要一個labrpc.ClientEnd,以便發送RPC。使用make_end()函數傳給StartServer()函數將服務器名轉換為ClientEnd。shardkv /client.go需要實現這些邏輯。

  • 在server.go中添加代碼去周期性從 shardctrler 拉取最新的配置,並且當請求分片不屬於自身時,拒絕請求

  • 當被請求到錯誤分片時,需要返回ErrWrongGroup給客戶端,並確保Get, Put, Append在面臨並發重配置時能正確作出決定

  • 重配置需要按流程執行唯一一次

  • labgob 的提示錯誤不能忽視,它可能導致實驗不過

  • 分片重分配的請求也需要做重復請求檢測

  • 若客戶端收到ErrWrongGroup,是否更改請求序列號?若服務器執行請求時返回ErrWrongGroup,是否更新客戶端信息?

  • 當服務器轉移到新配置后,它可以繼續存儲它不再負責的分片(生產環境中這是不允許的),但這個可以簡化實現

  • 當 G1 在配置變更時需要來自 G2 的分片數據,G2 處理日志條目的哪個時間點將分片發送給 G1 是最好的?

  • 你可以在整個 rpc 請求或回復中發送整個 map,這可以簡化分片傳輸

  • map 是引用類型,所以在發送 map 的時候,建議先拷貝一次,避免 data race(在 labrpc 框架下,接收 map 時也需要拷貝)

  • 在配置更改期間,一對組可能需要互相傳送分片,這可能會發生死鎖

challenge

如果想達到生產環境系統級別,如下兩個挑戰是需要實現的

challenge1:Garbage collection of state
當一個副本組失去一個分片的所有權時,副本組需要刪除該分片數據。但這給遷移帶來一些問題,考慮兩個組G1 和 G2,並且新配置C 將分片從 G1 移動到 G2,若 G1 在轉換配置到C時刪除了數據庫中的分片,當G2 轉換到C時,如何獲取 G1 的數據

實驗要求
使每個副本組保留舊分片的時長不再是無限時長,即使副本組(如上面的G1)中的所有服務器崩潰並恢復正常,解決方案也必須工作。如果您通過TestChallenge1Delete,您就完成了這個挑戰。

解決方案
分片遷移成功之后,立馬進行分片 GC 了,GC 完畢后再進入到配置更新階段。

chanllenge2:Client requests during configuration changes
配置更改期間最簡單的方式是禁止所有客戶端操作直到轉換完成,雖然簡單但是不滿足於生產環境要求,這將導致客戶端長時間停滯,最好可以繼續為不受當前配置更改的分片提供服務

上述優化還能更好,若 G3 在過渡到配置C時,需要來自G1 的分片S1 和 G2 的分片S2。希望 G3 能在收到其中一個分片后可以立即開始接收針對該分片的請求。如G1宕機了,G3在收到G2的分片數據后,可以立即為 S2 分片提供服務,而不需要等待 C 配置轉換完全完成

實驗要求
修改您的解決方案,以便在配置更改期間繼續執行不受影響的分片中的 key 的客戶端操作。當您通過 TestChallenge2Unaffected 測試時,您已經完成了這個挑戰。

修改您的解決方案,在配置轉換進行中,副本組也可以立即開始提供分片服務。當您通過TestChallenge2Partial測試時,您已經完成了這個挑戰。

解決方案
分片遷移以 group 為單位,這樣即使一個 group掛了,也不會影響到另一個 group中的分片遷移。

本實驗設計方案主要參考:https://github.com/OneSizeFitsQuorum/MIT6.824-2021/blob/master/docs/lab4.md(這個大佬真的頂,設計方案優秀,代碼邏輯清晰,膜拜。)

上面的實驗ShardCtrler 集群組實現了配置更新,分片均勻分配等任務,ShardKVServer則需要承載所有分片的讀寫任務,相比於MIT 6.824 Lab3 RaftKV的提供基礎的讀寫服務,還需要功能為配置更新,分片數據遷移,分片數據清理,空日志檢測

客戶端Clerk

主要實現為請求邏輯:

  1. 使用key2shard()去找到一個 key 對應哪個分片Shard
  2. 根據Shard從當前配置config中獲取的 gid
  3. 根據gid從當前配置config中獲取 group 信息;
  4. group循環查找leaderId,直到返回請求成功、ErrWrongGroup或整個 group 都遍歷請求過;
  5. Query 最新的配置,回到步驟1循環重復;
func MakeClerk(ctrlers []*labrpc.ClientEnd, makeEnd func(string) *labrpc.ClientEnd) *Clerk {
	ck := &Clerk{
		sm:        shardctrler.MakeClerk(ctrlers),
		makeEnd:   makeEnd,
		leaderIds: make(map[int]int),
		clientId:  nrand(),
		commandId: 0,
	}
	ck.config = ck.sm.Query(-1)
	return ck
}

func (ck *Clerk) Get(key string) string {
	return ck.Command(&CommandRequest{Key: key, Op: OpGet})
}

func (ck *Clerk) Put(key string, value string) {
	ck.Command(&CommandRequest{Key: key, Value: value, Op: OpPut})
}
func (ck *Clerk) Append(key string, value string) {
	ck.Command(&CommandRequest{Key: key, Value: value, Op: OpAppend})
}

func (ck *Clerk) Command(request *CommandRequest) string {
	request.ClientId, request.CommandId = ck.clientId, ck.commandId
	for {
		shard := key2shard(request.Key)
		gid := ck.config.Shards[shard]
		if servers, ok := ck.config.Groups[gid]; ok {
			if _, ok = ck.leaderIds[gid]; !ok {
				ck.leaderIds[gid] = 0
			}
			oldLeaderId := ck.leaderIds[gid]
			newLeaderId := oldLeaderId
			for {
				var response CommandResponse
				ok := ck.makeEnd(servers[newLeaderId]).Call("ShardKV.Command", request, &response)
				if ok && (response.Err == OK || response.Err == ErrNoKey) {
					ck.commandId++
					return response.Value
				} else if ok && response.Err == ErrWrongGroup {
					break
				} else {
					newLeaderId = (newLeaderId + 1) % len(servers)
					if newLeaderId == oldLeaderId {
						break
					}
					continue
				}
			}
		}
		time.Sleep(100 * time.Millisecond)
		// 獲取最新配置
		ck.config = ck.sm.Query(-1)
	}
}

服務端Server

首先明確整體系統的運行方式:

  • 客戶端首先和ShardCtrler交互,獲取最新的配置,根據最新配置找到對應keyshard,請求該shardgroup
  • 服務端ShardKVServer會創建多個 raft 組來承載所有分片的讀寫任務。
  • 服務端ShardKVServer需要定期和ShardCtrler交互,保證更新到最新配置(monitor)。
  • 服務端ShardKVServer需要根據最新配置完成配置更新,分片數據遷移,分片數據清理,空日志檢測等功能。

結構

首先ShardKVServer 給出結構體,相比於MIT 6.824 Lab3 RaftKV的多了currentConfiglastConfig數據,這樣其他協程便能夠通過其計算需要需要向誰拉取分片或者需要讓誰去刪分片。

啟動了五個協程:apply 協程,配置更新協程,數據遷移協程,數據清理協程,空日志檢測協程來實現功能。四個協程都需要 leader 來執行,因此抽象出了一個簡單地周期執行函數 Monitor

type ShardKV struct {
	mu      sync.RWMutex
	dead    int32
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg

	makeEnd func(string) *labrpc.ClientEnd
	gid     int
	sc      *shardctrler.Clerk

	maxRaftState int // snapshot if log grows this big
	lastApplied  int // record the lastApplied to prevent stateMachine from rollback

	lastConfig    shardctrler.Config
	currentConfig shardctrler.Config

	stateMachines  map[int]*Shard                // 服務器數據存儲(key,value)
	lastOperations map[int64]OperationContext    // 客戶端id最后的命令id和回復內容 (clientId,{最后的commdId,最后的LastReply})
	notifyChans    map[int]chan *CommandResponse // Leader回復給客戶端的響應(日志Index, CommandReply)
}
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxRaftState int, gid int, ctrlers []*labrpc.ClientEnd, makeEnd func(string) *labrpc.ClientEnd) *ShardKV {
	// call labgob.Register on structures you want
	// Go's RPC library to marshall/unmarshall.
	labgob.Register(Command{})
	labgob.Register(CommandRequest{})
	labgob.Register(shardctrler.Config{})
	labgob.Register(ShardOperationResponse{})
	labgob.Register(ShardOperationRequest{})

	applyCh := make(chan raft.ApplyMsg)

	kv := &ShardKV{
		dead:           0,
		rf:             raft.Make(servers, me, persister, applyCh),
		applyCh:        applyCh,
		makeEnd:        makeEnd,
		gid:            gid,
		sc:             shardctrler.MakeClerk(ctrlers),
		lastApplied:    0,
		maxRaftState:   maxRaftState,
		currentConfig:  shardctrler.DefaultConfig(),
		lastConfig:     shardctrler.DefaultConfig(),
		stateMachines:  make(map[int]*Shard),
		lastOperations: make(map[int64]OperationContext),
		notifyChans:    make(map[int]chan *CommandResponse),
	}
	kv.restoreSnapshot(persister.ReadSnapshot())
	// 將 committed logs 應用到 stateMachine
	go kv.applier()
	// 開啟協程獲取最新配置
	go kv.Monitor(kv.configureAction, ConfigureMonitorTimeout)
	// 開啟協程拉取相關分片
	go kv.Monitor(kv.migrationAction, MigrationMonitorTimeout)
	// 開啟協程 刪除遠程組中無用的分片
	go kv.Monitor(kv.gcAction, GCMonitorTimeout)
	// 開啟協程在當前term中附加空條目來提高 commitIndex 以避免活鎖
	go kv.Monitor(kv.checkEntryInCurrentTermAction, EmptyEntryDetectorTimeout)

	DPrintf("{Node %v}{Group %v} has started", kv.rf.Me(), kv.gid)
	return kv
}

// applier協程:將提交的日志應用到 stateMachine,並實現快照
func (kv *ShardKV) applier() {
	for kv.killed() == false {
		select {
		case message := <-kv.applyCh:
			if message.CommandValid {
				kv.mu.Lock()
				if message.CommandIndex <= kv.lastApplied {
					kv.mu.Unlock()
					continue
				}
				kv.lastApplied = message.CommandIndex

				var response *CommandResponse
				command := message.Command.(Command)
				switch command.Op {
				case Operation:
					operation := command.Data.(CommandRequest)
					response = kv.applyOperation(&message, &operation)
				case Configuration:
					nextConfig := command.Data.(shardctrler.Config)
					response = kv.applyConfiguration(&nextConfig)
				case InsertShards:
					shardsInfo := command.Data.(ShardOperationResponse)
					response = kv.applyInsertShards(&shardsInfo)
				case DeleteShards:
					shardsInfo := command.Data.(ShardOperationRequest)
					response = kv.applyDeleteShards(&shardsInfo)
				case EmptyEntry:
					response = kv.applyEmptyEntry()
				}

				
				if currentTerm, isLeader := kv.rf.GetState(); isLeader && message.CommandTerm == currentTerm {
					ch := kv.getNotifyChan(message.CommandIndex)
					ch <- response
				}

				needSnapshot := kv.needSnapshot()
				if needSnapshot {
					kv.takeSnapshot(message.CommandIndex)
				}
				kv.mu.Unlock()
			} else if message.SnapshotValid {
				kv.mu.Lock()
				if kv.rf.CondInstallSnapshot(message.SnapshotTerm, message.SnapshotIndex, message.Snapshot) {
					kv.restoreSnapshot(message.Snapshot)
					kv.lastApplied = message.SnapshotIndex
				}
				kv.mu.Unlock()
			} else {
				panic(fmt.Sprintf("unexpected Message %v", message))
			}
		}
	}
}

相比於MIT 6.824 Lab3 RaftKV需要維護分片Shard的狀態變量來完成該實驗,原因如下:

  1. 這樣可以防止分片Shard中間狀態被覆蓋,從而導致任務被丟棄。只有所有分片Shard的狀態都為默認狀態才能拉取最新配置。
  2. 實驗challenge2限制, challenge2 不僅要求 apply 協程不被阻塞,還要求配置的更新和分片的狀態變化彼此獨立。即需要不同 raft 組所屬的分片數據獨立起來,分別提交多條 raft 日志來維護狀態,因此需要維護狀態變量。
  3. 必須使用單獨的協程異步根據分片的狀態來進行分片的遷移和清理等任務。如果不采用上述方案,apply 協程更新配置的時候由 leader 異步啟動對應的協程,讓其獨立的根據 raft 組為粒度拉取數據?讓設想這樣一個場景:leader apply 了新配置后便掛了,然后此時 follower 也 apply 了該配置但並不會啟動該任務,在該 raft 組的新 leader 選出來后,該任務已經無法被執行了。所有apply配置的時候只應該更新shard 的狀態。

每個分片共有 4 種狀態:

  • Serving:分片的默認狀態,如果當前 raft 組在當前 config 下負責管理此分片,則該分片可以提供讀寫服務,否則該分片暫不可以提供讀寫服務,但不會阻塞配置更新協程拉取新配置。
  • Pulling:表示當前 raft 組在當前 config 下負責管理此分片,暫不可以提供讀寫服務,需要當前 raft 組從上一個配置該分片所屬 raft 組拉數據過來之后才可以提供讀寫服務,系統會有一個分片遷移協程檢測所有分片的 Pulling 狀態,接着以 raft 組為單位去對應 raft 組拉取數據,接着嘗試重放該分片的所有數據到本地並將分片狀態置為 Serving,以繼續提供服務。
  • BePulling:表示當前 raft 組在當前 config 下不負責管理此分片,不可以提供讀寫服務,但當前 raft 組在上一個 config 時復制管理此分片,因此當前 config 下負責管理此分片的 raft 組拉取完數據后會向本 raft 組發送分片清理的 rpc,接着本 raft 組將數據清空並重置為 serving 狀態即可。
  • GCing:表示當前 raft 組在當前 config 下負責管理此分片,可以提供讀寫服務,但需要清理掉上一個配置該分片所屬 raft 組的數據。系統會有一個分片清理協程檢測所有分片的 GCing 狀態,接着以 raft 組為單位去對應 raft 組刪除數據,一旦遠程 raft 組刪除數據成功,則本地會嘗試將相關分片的狀態置為 Serving
type ShardStatus uint8

const (
	Serving ShardStatus = iota
	Pulling
	BePulling
	GCing
)

type Shard struct {
	KV     map[string]string
	Status ShardStatus
}

func NewShard() *Shard {
	return &Shard{make(map[string]string), Serving}
}

func (shard *Shard) Get(key string) (string, Err) {
	if value, ok := shard.KV[key]; ok {
		return value, OK
	}
	return "", ErrNoKey
}

func (shard *Shard) Put(key, value string) Err {
	shard.KV[key] = value
	return OK
}

func (shard *Shard) Append(key, value string) Err {
	shard.KV[key] += value
	return OK
}

func (shard *Shard) deepCopy() map[string]string {
	newShard := make(map[string]string)
	for k, v := range shard.KV {
		newShard[k] = v
	}
	return newShard
}

日志類型

lab3 中,客戶端的請求會被包裝成一個 Op 傳給 Raft 層,則在 lab4 中,不難想到,Servers 之間的交互,也可以看做是包裝成 Op 傳給 Raft 層;定義了五種類型的日志:

  • Operation:客戶端傳來的讀寫操作日志,有 PutGetAppend 等請求。

  • Configuration:配置更新日志,包含一個配置。

  • InsertShards:分片更新日志,包含至少一個分片的數據和配置版本。

  • DeleteShards:分片刪除日志,包含至少一個分片的 id 和配置版本。

  • EmptyEntry:空日志,Data 為空,使得狀態機達到最新。

type Command struct {
	Op   CommandType
	Data interface{}
}

func (command Command) String() string {
	return fmt.Sprintf("{Type:%v,Data:%v}", command.Op, command.Data)
}

func NewOperationCommand(request *CommandRequest) Command {
	return Command{Operation, *request}
}

func NewConfigurationCommand(config *shardctrler.Config) Command {
	return Command{Configuration, *config}
}

func NewInsertShardsCommand(response *ShardOperationResponse) Command {
	return Command{InsertShards, *response}
}

func NewDeleteShardsCommand(request *ShardOperationRequest) Command {
	return Command{DeleteShards, *request}
}

func NewEmptyEntryCommand() Command {
	return Command{EmptyEntry, nil}
}

type CommandType uint8

const (
	Operation CommandType = iota
	Configuration
	InsertShards
	DeleteShards
	EmptyEntry
)

讀寫服務

讀寫操作的基本邏輯相比於MIT 6.824 Lab3 RaftKV基本一致,需要增加分片狀態判斷。根據上述定義,分片的狀態為 ServingGCing,當前 raft 組在當前 config 下負責管理此分片,本 raft 組才可以為該分片提供讀寫服務,否則返回 ErrWrongGroup 讓客戶端重新拉取最新的 config 並重試即可。

canServe 的判斷需要在向 raft 提交前和 apply 時都檢測一遍以保證正確性並盡可能提升性能。

// 檢查該raft group 目前是否可以服務該shard
func (kv *ShardKV) canServe(shardID int) bool {
	return kv.currentConfig.Shards[shardID] == kv.gid && (kv.stateMachines[shardID].Status == Serving || kv.stateMachines[shardID].Status == GCing)
}
func (kv *ShardKV) Command(request *CommandRequest, response *CommandResponse) {
	kv.mu.RLock()
	if request.Op != OpGet && kv.isDuplicateRequest(request.ClientId, request.CommandId) {
		lastResponse := kv.lastOperations[request.ClientId].LastResponse
		response.Value, response.Err = lastResponse.Value, lastResponse.Err
		kv.mu.RUnlock()
		return
	}
	// 如果當前分片無法提供服務,則返回 ErrWrongGroup 讓客戶端獲取最新配置,
	if !kv.canServe(key2shard(request.Key)) {
		response.Err = ErrWrongGroup
		kv.mu.RUnlock()
		return
	}
	kv.mu.RUnlock()
	kv.Execute(NewOperationCommand(request), response)
}

func (kv *ShardKV) Execute(command Command, response *CommandResponse) {
	// 不持有鎖以提高吞吐量
	// 當 KVServer 持有鎖進行快照時,底層 raft 仍然可以提交 raft 日志
	index, _, isLeader := kv.rf.Start(command)
	if !isLeader {
		response.Err = ErrWrongLeader
		return
	}
	//defer DPrintf("{Node %v}{Group %v} processes Command %v with CommandResponse %v", kv.rf.Me(), kv.gid, command, response)
	kv.mu.Lock()
	ch := kv.getNotifyChan(index)
	kv.mu.Unlock()
	select {
	case result := <-ch:
		response.Value, response.Err = result.Value, result.Err
	case <-time.After(ExecuteTimeout):
		response.Err = ErrTimeout
	}

	// 釋放 notifyChan 以減少內存占用
	// 異步為了提高吞吐量,這里不需要阻塞客戶端請求
	go func() {
		kv.mu.Lock()
		kv.removeOutdatedNotifyChan(index)
		kv.mu.Unlock()
	}()
}
func (kv *ShardKV) applyOperation(message *raft.ApplyMsg, operation *CommandRequest) *CommandResponse {
	var response *CommandResponse
	shardID := key2shard(operation.Key)
	if kv.canServe(shardID) {
		if operation.Op != OpGet && kv.isDuplicateRequest(operation.ClientId, operation.CommandId) {
			DPrintf("{Node %v}{Group %v} doesn't apply duplicated message %v to stateMachine because maxAppliedCommandId is %v for client %v", kv.rf.Me(), kv.gid, message, kv.lastOperations[operation.ClientId], operation.ClientId)
			return kv.lastOperations[operation.ClientId].LastResponse
		} else {
			response = kv.applyLogToStateMachines(operation, shardID)
			if operation.Op != OpGet {
				kv.lastOperations[operation.ClientId] = OperationContext{operation.CommandId, response}
			}
			return response
		}
	}
	return &CommandResponse{ErrWrongGroup, ""}
}

接下來就只能瞻仰大佬的思路,目前的想法是先照着大佬實現pull方案實現,且網上大部分方案是pull,但我感覺push方案稍微優異點,因為push模式可以在發送方在收到接收方應用成功reply的時候就,發送方可以直接進行GC,等我按照大佬思路實現完pull方案,汲取到經驗之后,就實現push方案。

配置更新

配置更新協程負責定時檢測所有分片的狀態,一旦存在至少一個分片的狀態不為默認狀態,則預示其他協程仍然還沒有完成任務,那么此時需要阻塞新配置的拉取和提交。

apply 配置更新日志時需要保證冪等性:

  • 不同版本的配置更新日志:apply 時僅可逐步遞增的去更新配置,否則返回失敗。
  • 相同版本的配置更新日志:由於配置更新日志僅由配置更新協程提交,而配置更新協程只有檢測到比本地更大地配置時才會提交配置更新日志,所以該情形不會出現。
func (kv *ShardKV) configureAction() {
	canPerformNextConfig := true
	kv.mu.RLock()
	for _, shard := range kv.stateMachines {
		if shard.Status != Serving {
			canPerformNextConfig = false
			DPrintf("{Node %v}{Group %v} will not try to fetch latest configuration because shards status are %v when currentConfig is %v", kv.rf.Me(), kv.gid, kv.getShardStatus(), kv.currentConfig)
			break
		}
	}
	currentConfigNum := kv.currentConfig.Num
	kv.mu.RUnlock()
	if canPerformNextConfig {
		nextConfig := kv.sc.Query(currentConfigNum + 1)
		if nextConfig.Num == currentConfigNum+1 {
			DPrintf("{Node %v}{Group %v} fetches latest configuration %v when currentConfigNum is %v", kv.rf.Me(), kv.gid, nextConfig, currentConfigNum)
			kv.Execute(NewConfigurationCommand(&nextConfig), &CommandResponse{})
		}
	}
}

func (kv *ShardKV) applyConfiguration(nextConfig *shardctrler.Config) *CommandResponse {
	if nextConfig.Num == kv.currentConfig.Num+1 {
		DPrintf("{Node %v}{Group %v} updates currentConfig from %v to %v", kv.rf.Me(), kv.gid, kv.currentConfig, nextConfig)
		kv.updateShardStatus(nextConfig)
		kv.lastConfig = kv.currentConfig
		kv.currentConfig = *nextConfig
		return &CommandResponse{OK, ""}
	}
	DPrintf("{Node %v}{Group %v} rejects outdated config %v when currentConfig is %v", kv.rf.Me(), kv.gid, nextConfig, kv.currentConfig)
	return &CommandResponse{ErrOutDated, ""}
}

分片遷移

分片遷移協程負責定時檢測分片的 Pulling 狀態,利用 lastConfig 計算出對應 raft 組的 gid 和要拉取的分片,然后並行地去拉取數據。

注意這里使用了 waitGroup 來保證所有獨立地任務完成后才會進行下一次任務。此外 wg.Wait() 一定要在釋放讀鎖之后,否則無法滿足 challenge2 的要求。

在拉取分片的 handler 中,首先僅可由 leader 處理該請求,其次如果發現請求中的配置版本大於本地的版本,那說明請求拉取的是未來的數據,則返回 ErrNotReady 讓其稍后重試,否則將分片數據和去重表都深度拷貝到 response 即可。

apply 分片更新日志時需要保證冪等性:

  • 不同版本的配置更新日志:僅可執行與當前配置版本相同地分片更新日志,否則返回 ErrOutDated。
  • 相同版本的配置更新日志:僅在對應分片狀態為 Pulling 時為第一次應用,此時覆蓋狀態機即可並修改狀態為 GCing,以讓分片清理協程檢測到 GCing 狀態並嘗試刪除遠端的分片。否則說明已經應用過,直接 break 即可。
func (kv *ShardKV) migrationAction() {
	kv.mu.RLock()
	gid2shardIDs := kv.getShardIDsByStatus(Pulling)
	var wg sync.WaitGroup
	for gid, shardIDs := range gid2shardIDs {
		DPrintf("{Node %v}{Group %v} starts a PullTask to get shards %v from group %v when config is %v", kv.rf.Me(), kv.gid, shardIDs, gid, kv.currentConfig)
		wg.Add(1)
		go func(servers []string, configNum int, shardIDs []int) {
			defer wg.Done()
			pullTaskRequest := ShardOperationRequest{configNum, shardIDs}
			for _, server := range servers {
				var pullTaskResponse ShardOperationResponse
				srv := kv.makeEnd(server)
				if srv.Call("ShardKV.GetShardsData", &pullTaskRequest, &pullTaskResponse) && pullTaskResponse.Err == OK {
					DPrintf("{Node %v}{Group %v} gets a PullTaskResponse %v and tries to commit it when currentConfigNum is %v", kv.rf.Me(), kv.gid, pullTaskResponse, configNum)
					kv.Execute(NewInsertShardsCommand(&pullTaskResponse), &CommandResponse{})
				}
			}
		}(kv.lastConfig.Groups[gid], kv.currentConfig.Num, shardIDs)
	}
	kv.mu.RUnlock()
	wg.Wait()
}

func (kv *ShardKV) GetShardsData(request *ShardOperationRequest, response *ShardOperationResponse) {
	// only pull shards from leader
	if _, isLeader := kv.rf.GetState(); !isLeader {
		response.Err = ErrWrongLeader
		return
	}
	kv.mu.RLock()
	defer kv.mu.RUnlock()
	defer DPrintf("{Node %v}{Group %v} processes PullTaskRequest %v with response %v", kv.rf.Me(), kv.gid, request, response)

	if kv.currentConfig.Num < request.ConfigNum {
		response.Err = ErrNotReady
		return
	}

	response.Shards = make(map[int]map[string]string)
	for _, shardID := range request.ShardIDs {
		response.Shards[shardID] = kv.stateMachines[shardID].deepCopy()
	}

	response.LastOperations = make(map[int64]OperationContext)
	for clientID, operation := range kv.lastOperations {
		response.LastOperations[clientID] = operation.deepCopy()
	}

	response.ConfigNum, response.Err = request.ConfigNum, OK
}

func (kv *ShardKV) applyInsertShards(shardsInfo *ShardOperationResponse) *CommandResponse {
	if shardsInfo.ConfigNum == kv.currentConfig.Num {
		DPrintf("{Node %v}{Group %v} accepts shards insertion %v when currentConfig is %v", kv.rf.Me(), kv.gid, shardsInfo, kv.currentConfig)
		for shardId, shardData := range shardsInfo.Shards {
			shard := kv.stateMachines[shardId]
			if shard.Status == Pulling {
				for key, value := range shardData {
					shard.KV[key] = value
				}
				shard.Status = GCing
			} else {
				DPrintf("{Node %v}{Group %v} encounters duplicated shards insertion %v when currentConfig is %v", kv.rf.Me(), kv.gid, shardsInfo, kv.currentConfig)
				break
			}
		}
		for clientId, operationContext := range shardsInfo.LastOperations {
			if lastOperation, ok := kv.lastOperations[clientId]; !ok || lastOperation.MaxAppliedCommandId < operationContext.MaxAppliedCommandId {
				kv.lastOperations[clientId] = operationContext
			}
		}
		return &CommandResponse{OK, ""}
	}
	DPrintf("{Node %v}{Group %v} rejects outdated shards insertion %v when currentConfig is %v", kv.rf.Me(), kv.gid, shardsInfo, kv.currentConfig)
	return &CommandResponse{ErrOutDated, ""}
}

分片清理

分片清理協程負責定時檢測分片的 GCing 狀態,利用 lastConfig 計算出對應 raft 組的 gid 和要拉取的分片,然后並行地去刪除分片。

注意這里使用了 waitGroup 來保證所有獨立地任務完成后才會進行下一次任務。此外 wg.Wait() 一定要在釋放讀鎖之后,否則無法滿足 challenge2 的要求。

在刪除分片的 handler 中,首先僅可由 leader 處理該請求,其次如果發現請求中的配置版本小於本地的版本,那說明該請求已經執行過,否則本地的 config 也無法增大,此時直接返回 OK 即可,否則在本地提交一個刪除分片的日志。

apply 分片刪除日志時需要保證冪等性:

  • 不同版本的配置更新日志:僅可執行與當前配置版本相同地分片刪除日志,否則已經刪除過,直接返回 OK 即可。
  • 相同版本的配置更新日志:如果分片狀態為 GCing,說明是本 raft 組已成功刪除遠端 raft 組的數據,現需要更新分片狀態為默認狀態以支持配置的進一步更新;否則如果分片狀態為 BePulling,則說明本 raft 組第一次刪除該分片的數據,此時直接重置分片即可。否則說明該請求已經應用過,直接 break 返回 OK 即可。
func (kv *ShardKV) gcAction() {
	kv.mu.RLock()
	gid2shardIDs := kv.getShardIDsByStatus(GCing)
	var wg sync.WaitGroup
	for gid, shardIDs := range gid2shardIDs {
		DPrintf("{Node %v}{Group %v} starts a GCTask to delete shards %v in group %v when config is %v", kv.rf.Me(), kv.gid, shardIDs, gid, kv.currentConfig)
		wg.Add(1)
		go func(servers []string, configNum int, shardIDs []int) {
			defer wg.Done()
			gcTaskRequest := ShardOperationRequest{configNum, shardIDs}
			for _, server := range servers {
				var gcTaskResponse ShardOperationResponse
				srv := kv.makeEnd(server)
				if srv.Call("ShardKV.DeleteShardsData", &gcTaskRequest, &gcTaskResponse) && gcTaskResponse.Err == OK {
					DPrintf("{Node %v}{Group %v} deletes shards %v in remote group successfully when currentConfigNum is %v", kv.rf.Me(), kv.gid, shardIDs, configNum)
					kv.Execute(NewDeleteShardsCommand(&gcTaskRequest), &CommandResponse{})
				}
			}
		}(kv.lastConfig.Groups[gid], kv.currentConfig.Num, shardIDs)
	}
	kv.mu.RUnlock()
	wg.Wait()
}

func (kv *ShardKV) DeleteShardsData(request *ShardOperationRequest, response *ShardOperationResponse) {
	// only delete shards when role is leader
	if _, isLeader := kv.rf.GetState(); !isLeader {
		response.Err = ErrWrongLeader
		return
	}

	defer DPrintf("{Node %v}{Group %v} processes GCTaskRequest %v with response %v", kv.rf.Me(), kv.gid, request, response)

	kv.mu.RLock()
	if kv.currentConfig.Num > request.ConfigNum {
		DPrintf("{Node %v}{Group %v}'s encounters duplicated shards deletion %v when currentConfig is %v", kv.rf.Me(), kv.gid, request, kv.currentConfig)
		response.Err = OK
		kv.mu.RUnlock()
		return
	}
	kv.mu.RUnlock()

	var commandResponse CommandResponse
	kv.Execute(NewDeleteShardsCommand(request), &commandResponse)

	response.Err = commandResponse.Err
}

func (kv *ShardKV) applyDeleteShards(shardsInfo *ShardOperationRequest) *CommandResponse {
	if shardsInfo.ConfigNum == kv.currentConfig.Num {
		DPrintf("{Node %v}{Group %v}'s shards status are %v before accepting shards deletion %v when currentConfig is %v", kv.rf.Me(), kv.gid, kv.getShardStatus(), shardsInfo, kv.currentConfig)
		for _, shardId := range shardsInfo.ShardIDs {
			shard := kv.stateMachines[shardId]
			if shard.Status == GCing {
				shard.Status = Serving
			} else if shard.Status == BePulling {
				kv.stateMachines[shardId] = NewShard()
			} else {
				DPrintf("{Node %v}{Group %v} encounters duplicated shards deletion %v when currentConfig is %v", kv.rf.Me(), kv.gid, shardsInfo, kv.currentConfig)
				break
			}
		}
		DPrintf("{Node %v}{Group %v}'s shards status are %v after accepting shards deletion %v when currentConfig is %v", kv.rf.Me(), kv.gid, kv.getShardStatus(), shardsInfo, kv.currentConfig)
		return &CommandResponse{OK, ""}
	}
	DPrintf("{Node %v}{Group %v}'s encounters duplicated shards deletion %v when currentConfig is %v", kv.rf.Me(), kv.gid, shardsInfo, kv.currentConfig)
	return &CommandResponse{OK, ""}
}

空日志檢測

分片清理協程負責定時檢測 raft 層的 leader 是否擁有當前 term 的日志,如果沒有則提交一條空日志,這使得新 leader 的狀態機能夠迅速達到最新狀態,從而避免多 raft 組間的活鎖狀態。

func (kv *ShardKV) checkEntryInCurrentTermAction() {
	if !kv.rf.HasLogInCurrentTerm() {
		kv.Execute(NewEmptyEntryCommand(), &CommandResponse{})
	}
}

func (kv *ShardKV) applyEmptyEntry() *CommandResponse {
	return &CommandResponse{OK, ""}
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM