MIT 6.824 Lab3 RaftKV


Raft 博士論文的翻譯

實驗內容

在lab2的Raft函數庫之上,搭建一個能夠容錯的key/value存儲服務,需要提供強一致性保證。

強一致性的解釋如下:對於單個請求,整個服務需要表現得像個單機服務,並且對狀態機的修改基於之前所有的請求。對於並發的請求,返回的值和最終的狀態必須相同,就好像所有請求都是串行的一樣。即使有些請求發生在了同一時間,那么也應當一個一個響應。此外,在一個請求被執行之前,這之前的請求都必須已經被完成(在技術上我們也叫着線性化(linearizability))。

kv服務支持三種操作:Put, Append, Get。通過在內存維護一個簡單的鍵/值對數據庫,鍵和值都是字符串;

整體架構

image-20211123154210343

image-20211204184921567

Part A - 不需要日志壓縮的key/value服務

Clerk客戶端實現

客戶端實現比較簡單

需要解決的問題:客戶端發送請求,服務端同步成功並且提交,接着apply后返回給客戶端執行結果,但返回客戶端時候rpc丟失,客戶端只能進行重試直到明確地寫入成功或失敗為止,但該操作可能已經在服務端應用過了,違背了線性一致性。

clientId和commandId來唯一的標識一個客戶端,從而保證線性一致性。RAFT原文介紹需要保證日志僅被執行一次,即它可以被 commit 多次,但一定只能 apply 一次。

可以看一下:

鏈接:https://www.zhihu.com/question/278551592/answer/400962941

在Raft論文的6.3節,這個問題有詳細討論。

用普通后台術語就是冪等。Raft作者把這歸為實現linearizable semantics所需要處理的一部分。Raft論文里,也給出了具體的通用解決辦法。基本思路是:

  • 每個要做proposal的client需要一個唯一的identifier,它的每個不同proposal需要有一個順序遞增的序列號,client id和這個序列號由此可以唯一確定一個不同的proposal,從而使得各個raft節點可以記錄保存各proposal應用以后的結果。
  • 當一個proposal超時,client不提高proposal的序列號,使用原proposal序列號重試。
  • 當一個proposal被成功提交並應用且被成功回復給client以后,client順序提高proposal的序列號,並記錄下收到的成功回復的proposal的序列號。raft節點收到一個proposal請求以后,得到請求中夾帶的這個最大成功回復的proposal的序列號,它和它之前所有的應用結果都可以刪去。proposal序列號和client id可用於判斷這個proposal是否應用過,如果已經應用過,則不再再次應用,直接返回已保存的結果。等於是每個不同的proposal可以被commit多次,在log中出現多次,但永遠只會被apply一次。
  • 系統維護一定數量允許的client數量,比如可以用LRU策略淘汰。請求過來了,而client已經被LRU淘汰掉了,則讓client直接fail掉。
  • 這些已經注冊的client信息,包括和這些client配套的上述proposal結果、各序列號等等,需要在raft組內一致的維護。也就是說,上述各raft端數據結構和它們的操作實際是state machine的一部分。在做snapshotting的時候,它們同樣需要被保存與恢復。

可能感覺讓這樣重試的request被commit多次有奇怪。其實不奇怪,實際操作中,它們對狀態機而言是個NO-OP。原文中的原話也清楚列明這些log entry會在raft log中重復出現,由狀態機來負責過濾掉,狀態機能看到接觸到的自然是commit以后的log entry。

The Raft log provides a serial order in which commands are applied on every server. Commands take effect instantaneously and exactly once according to their first appearance in the Raft log, since any subsequent appearances are filtered out by the state machines as described above.

不是所有的應用都需要這樣的功能。最直接的例子就是membership change本身。membership change的時候,比如有一個node的id是XYZ,因為超時你試圖去再次提交一個membership remove操作,再次去刪除這個id為XYZ的節點,它並不帶來實際損害(很多raft庫不允許一個已經被刪除的節點再次以相同node id加入回來)。

還需要注意的是:請求服務端超時或請求的服務端不為主節點時,能嘗試連接其他服務端。

參考清華大佬代碼結構實現,重構了PutAndAppend函數

type Clerk struct {
	servers []*labrpc.ClientEnd
	// You will have to modify this struct.
	leaderId int64
	clientId int64
	commandId int64

}

func nrand() int64 {
	max := big.NewInt(int64(1) << 62)
	bigx, _ := rand.Int(rand.Reader, max)
	x := bigx.Int64()
	return x
}

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
	ck := new(Clerk)
	ck.servers = servers
	// You'll have to add code here.
	ck.leaderId = 0
	ck.clientId = nrand()
	ck.commandId = 0
	return ck
}

//
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
//
// you can send an RPC with code like this:
// ok := ck.servers[i].Call("KVServer.Get", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
//
func (ck *Clerk) Get(key string) string {
	// You will have to modify this function.
	return ck.Command(&CommandArgs{Key : key, Op: OpGet})
}

func (ck *Clerk) Put(key string, value string) {
	ck.Command(&CommandArgs{Key : key, Value : value, Op: OpPut})
	//ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
	ck.Command(&CommandArgs{Key : key, Value : value, Op: OpAppend})
	//ck.PutAppend(key, value, "Append")
}

//
// shared by Get, Put and Append.
//
// you can send an RPC with code like this:
// ok := ck.servers[i].Call("KVServer.Command", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
//
//func (ck *Clerk) PutAppend(key string, value string, op string) {
//	// You will have to modify this function.
//}
func (ck *Clerk) Command(args *CommandArgs) string {
	args.CommandId, args.ClientId = ck.commandId, ck.clientId
	DPrintf("command is %v", args)
	for{
		var reply CommandReply
		if !ck.servers[ck.leaderId].Call("KVServer.Command", args, &reply) || reply.Err == ErrTimeout || reply.Err == ErrWrongLeader {
			ck.leaderId = (ck.leaderId + 1) % int64(len(ck.servers))
			continue
		}
		ck.commandId++
		return reply.Value
	}
}

const Debug = false

func DPrintf(format string, a ...interface{}) (n int, err error) {
	if Debug {
		log.Printf(format, a...)
	}
	return
}
type OperationContext struct {
	MaxAppliedCommandId int64
	LastReply       *CommandReply
}
type Err uint8
const (
	OK Err = iota
	ErrNoKey
	ErrWrongLeader
	ErrTimeout
)
func (err Err) String() string {
	switch err {
	case OK:
		return "Ok"
	case ErrNoKey:
		return "ErrNoKey"
	case ErrWrongLeader:
		return "ErrWrongLeader"
	case ErrTimeout:
		return "ErrTimeout"
	}
	// 手動觸發宕機
	panic(fmt.Sprintf("unexpected Err %d", err))
}

type OperationOp uint8
const (
	OpGet OperationOp = iota
	OpPut
	OpAppend
)
func (op OperationOp) String() string {
	switch op {
	case OpPut:
		return "OpPut"
	case OpAppend:
		return "OpAppend"
	case OpGet:
		return "OpGet"
	}
	panic(fmt.Sprintf("unexpected OperationOp %d", op))
}

type Command struct {
	*CommandArgs
}

type CommandArgs struct{
	Key string
	Value string
	Op OperationOp
	ClientId int64
	CommandId int64
}

func (request CommandArgs) String() string {
	return fmt.Sprintf("{Key:%v,Value:%v,Op:%v,ClientId:%v,CommandId:%v}", request.Key, request.Value, request.Op, request.ClientId, request.CommandId)
}

type CommandReply struct{
	Err Err
	Value string
}

func (response CommandReply) String() string {
	return fmt.Sprintf("{Err:%v,Value:%v}", response.Err, response.Value)
}

Server服務器實現

一圖勝千言

image-20211204184921567

server結構體與初始化代碼實現:

  1. 一個存儲kv的map,即狀態機,但這里實現一個基於內存版本KV即可的,但實際生產環境下必然不可能把數據全部存在內存當中,系統往往采用的是 LSM 的架構,例如 RocksDB 等,抽象成KVStateMachine 的接口。
  2. 一個能記錄某一個客戶端最后一次操作序號和應用結果的map
  3. 一個能記錄每個raft同步操作結果的map
type KVServer struct {
	mu      sync.RWMutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg
	dead    int32 // set by Kill()

	maxraftstate int // snapshot if log grows this big

	// Your definitions here.
	lastApplied    int                        //
	stateMachine   KVStateMachine             // 服務器數據存儲(key,value)
	lastOperations map[int64]OperationContext // 客戶端id最后的命令id和回復內容 (clientId,{最后的commdId,最后的LastReply})
	notifyChans    map[int]chan *CommandReply // Leader回復給客戶端的響應(日志Index, CommandReply)
}
//
// servers[] contains the ports of the set of
// servers that will cooperate via Raft to
// form the fault-tolerant key/value service.
// me is the index of the current server in servers[].
// the k/v server should store snapshots through the underlying Raft
// implementation, which should call persister.SaveStateAndSnapshot() to
// atomically save the Raft state along with the snapshot.
// the k/v server should snapshot when Raft's saved state exceeds maxraftstate bytes,
// in order to allow Raft to garbage-collect its log. if maxraftstate is -1,
// you don't need to snapshot.
// StartKVServer() must return quickly, so it should start goroutines
// for any long-running work.
//
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
	// call labgob.Register on structures you want
	// Go's RPC library to marshall/unmarshall.

	labgob.Register(Command{})

	kv := new(KVServer)
	kv.me = me
	kv.maxraftstate = maxraftstate

	// You may need initialization code here.
	kv.applyCh = make(chan raft.ApplyMsg)
	kv.rf = raft.Make(servers, me, persister, kv.applyCh)
	kv.dead = 0
	kv.lastApplied = 0
	kv.stateMachine = NewMemoryKV()
	kv.lastOperations = make(map[int64]OperationContext)
	kv.notifyChans = make(map[int]chan *CommandReply)

	// You may need initialization code here.
	go kv.applier()

	DPrintf("{Node %v} has started", kv.rf.Me())
	return kv
}

狀態機抽象類:

type KVStateMachine interface {
	Get(key string) (string, Err)
	Put(key, value string) Err
	Append(key, value string) Err
}

type MemoryKV struct {
	KV map[string]string
}

func NewMemoryKV() *MemoryKV {
	return &MemoryKV{make(map[string]string)}
}

func (memoryKV *MemoryKV) Get(key string) (string, Err) {
	if value, ok := memoryKV.KV[key]; ok {
		return value, OK
	}
	return "", ErrNoKey
}

func (memoryKV *MemoryKV) Put(key, value string) Err {
	memoryKV.KV[key] = value
	return OK
}

func (memoryKV *MemoryKV) Append(key, value string) Err {
	memoryKV.KV[key] += value
	return OK
}

kv.applier協程:單獨開一個go routine來遠程監視apply channel,一旦底層的Raft commit一個到apply channel,狀態機就立馬執行且通過commandIndex通知到該客戶端的NotifyChan,Command函數取消阻塞返回給客戶端。

kv.applier協程主要實現要點:

  1. raft同步完成后,也需要判斷請求是否為重復請求。因為同一請求可能由於重試會被同步多次。
  2. 當要通過channel返回操作結果時,需判斷當前節點為主才返回操作結果,否則返回WrongLeader。
  3. 為了保證強一致性,僅對當前 term 日志的 notifyChan 進行通知,讓之前 term 的客戶端協程都超時重試。避免leader 降級為 follower 后又迅速重新當選了 leader,而此時依然有客戶端協程未超時在阻塞等待,那么此時 apply 日志后,根據 index 獲得 channel 並向其中 push 執行結果就可能出錯,因為可能並不對應。
  4. 在目前的實現中,讀(Get)請求也會生成一條 raft 日志去同步,最簡單粗暴的方式保證線性一致性,即LogRead方法。但是,這樣子實現的讀性能會相當的差,實際生產級別的 raft 讀請求實現一般都采用了 Read Index 或者 Lease Read 的方式,具體原理可以參考此博客,具體實現可以參照 SOFAJRaft 的實現博客
func (kv *KVServer) applier() {
	for kv.killed() == false {
		select {
		case message := <-kv.applyCh:
			if message.CommandValid {
				kv.mu.Lock()
				if message.CommandIndex <= kv.lastApplied {
					kv.mu.Unlock()
					continue
				}
				kv.lastApplied = message.CommandIndex

				var reply *CommandReply
				command := message.Command.(Command)
				if command.Op != OpGet && kv.isDuplicateRequest(command.ClientId, command.CommandId) {
					reply = kv.lastOperations[command.ClientId].LastReply
				} else {
					reply = kv.applyLogToStateMachine(command)
					if command.Op != OpGet {
						kv.lastOperations[command.ClientId] = OperationContext{command.CommandId, reply}
					}
				}
				if currentTerm, isLeader := kv.rf.GetState(); isLeader && message.CommandTerm == currentTerm {
					ch := kv.getNotifyChan(message.CommandIndex)
					ch <- reply
				}
				kv.mu.Unlock()
			} else {
				panic(fmt.Sprintf("unexpected Message %v", message))
			}

		}
	}
}
func (kv *KVServer) getNotifyChan(index int) chan *CommandReply {
	if _, ok := kv.notifyChans[index]; !ok {
		kv.notifyChans[index] = make(chan *CommandReply, 1)
	}
	return kv.notifyChans[index]
}
//每個RPC都意味着客戶端已經看到了它之前的RPC的回復。
//因此,我們只需要判斷一個clientId的最新commandId是否滿足條件
func (kv *KVServer) isDuplicateRequest(clientId int64, commandId int64) bool {
	operationContext, ok := kv.lastOperations[clientId]
	return ok && commandId <= operationContext.MaxAppliedCommandId
}

leader 比 follower 多出一個 notifyChan 環節,是因為 leader 需要處理 rpc 請求響應,而 follower 不用,一個很簡單的流程其實就是 client -> kvservice -> Start() -> applyCh -> kvservice -> client,但是applyCh是逐個 commit 一個一個返回,所以需要明確返回的 commit 對應的是哪一個請求,即通過 commitIndex唯一確定一個請求,然后通知該請求執行流程可以返回了。

對於讀請求,由於其不影響系統狀態,所以直接去狀態機執行即可,當然,其結果也不需要再記錄到去重的數據結構中。

// 處理並返回客戶端結果
func (kv *KVServer) Command(args *CommandArgs, reply *CommandReply) {
	defer DPrintf("{Node %v} processes CommandRequest %v with CommandResponse %v", kv.rf.Me(), args, reply)
	kv.mu.RLock()
	if args.Op != OpGet && kv.isDuplicateRequest(args.ClientId, args.CommandId) {
		lastReply := kv.lastOperations[args.ClientId].LastReply
		reply.Err, reply.Value = lastReply.Err, lastReply.Value
		kv.mu.RUnlock()
		return
	}
	kv.mu.RUnlock()
	index, _, isLeader := kv.rf.Start(Command{args})
	if !isLeader {
		reply.Err = ErrWrongLeader
		return
	}

	kv.mu.Lock()
	ch := kv.getNotifyChan(index)
	kv.mu.Unlock()
	select {
	case res := <-ch:
		reply.Err, reply.Value = res.Err, res.Value
	case <-time.After(ExecuteTimeout):
		reply.Err = ErrTimeout
	}
	go func() {
		kv.mu.Lock()
		delete(kv.notifyChans, index)
		kv.mu.Unlock()
	}()
}



func (kv *KVServer) applyLogToStateMachine(command Command) *CommandReply {
	var value string
	var err Err
	switch command.Op {
	case OpGet:
		value, err = kv.stateMachine.Get(command.Key)
	case OpPut:
		err = kv.stateMachine.Put(command.Key, command.Value)
	case OpAppend:
		err = kv.stateMachine.Append(command.Key, command.Value)
	}
	return &CommandReply{err, value}
}

測試

image-20211205152416859

Part B - 包含日志壓縮的key/value服務

將服務端的一些重要變量編碼后生成snapshot,然后通知raft進行日志壓縮。

所需要持久化的狀態由:

  • 狀態機kv鍵值對stateMachine
  • 去重的 lastOperations 哈希表

有兩種情況的持久化,分為主動和被動

  • 主動:這里我的實現是,從applyCh中每次拿到 msg 之后,如果是CommandValid,則說明raft 的狀態增加了,主動檢測一次是否達到了maxraftstate,是則主動調用Snapshot
  • 被動:從applyCh中拿到的是SnapshotValid,則證明是 leader 發來的安裝快照信息,主動調用CondInstallSnapshot,並根據返回的 ok 確認是否安裝該快照
func (kv *KVServer) applier() {
	for kv.killed() == false {
		select {
		case message := <-kv.applyCh:
			if message.CommandValid {
				kv.mu.Lock()
				if message.CommandIndex <= kv.lastApplied {
					kv.mu.Unlock()
					continue
				}
				kv.lastApplied = message.CommandIndex

				var reply *CommandReply
				command := message.Command.(Command)
				if command.Op != OpGet && kv.isDuplicateRequest(command.ClientId, command.CommandId) {
					reply = kv.lastOperations[command.ClientId].LastReply
				} else {
					reply = kv.applyLogToStateMachine(command)
					if command.Op != OpGet {
						kv.lastOperations[command.ClientId] = OperationContext{command.CommandId, reply}
					}
				}
				if currentTerm, isLeader := kv.rf.GetState(); isLeader && message.CommandTerm == currentTerm {
					ch := kv.getNotifyChan(message.CommandIndex)
					ch <- reply
				}
				needSnapshot := kv.needSnapshot()
				if needSnapshot {
					kv.takeSnapshot(message.CommandIndex)
				}
				kv.mu.Unlock()
			}else if message.SnapshotValid {
				kv.mu.Lock()
				if kv.rf.CondInstallSnapshot(message.SnapshotTerm, message.SnapshotIndex, message.Snapshot) {
					kv.restoreSnapshot(message.Snapshot)
					kv.lastApplied = message.SnapshotIndex
				}
				kv.mu.Unlock()
			} else {
				panic(fmt.Sprintf("unexpected Message %v", message))
			}

		}
	}
}
// 持久化
func (kv *KVServer) restoreSnapshot(snapshot []byte) {
	if snapshot == nil || len(snapshot) == 0 {
		return
	}
	r := bytes.NewBuffer(snapshot)
	d := labgob.NewDecoder(r)
	var stateMachine MemoryKV
	var lastOperations map[int64]OperationContext
	if d.Decode(&stateMachine) != nil ||
		d.Decode(&lastOperations) != nil {
	}
	kv.stateMachine, kv.lastOperations = &stateMachine, lastOperations
}
func (kv *KVServer) needSnapshot() bool {
	return kv.maxraftstate != -1 && kv.rf.GetRaftStateSize() >= kv.maxraftstate
}

func (kv *KVServer) takeSnapshot(index int) {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(kv.stateMachine)
	e.Encode(kv.lastOperations)
	kv.rf.Snapshot(index, w.Bytes())
}

func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
	// call labgob.Register on structures you want
	// Go's RPC library to marshall/unmarshall.

	labgob.Register(Command{})

	kv := new(KVServer)
	kv.me = me
	kv.maxraftstate = maxraftstate

	// You may need initialization code here.
	kv.applyCh = make(chan raft.ApplyMsg)
	kv.rf = raft.Make(servers, me, persister, kv.applyCh)
	kv.dead = 0
	kv.lastApplied = 0
	kv.stateMachine = NewMemoryKV()
	kv.lastOperations = make(map[int64]OperationContext)
	kv.notifyChans = make(map[int]chan *CommandReply)

	// You may need initialization code here.
	kv.restoreSnapshot(persister.ReadSnapshot())
	go kv.applier()

	DPrintf("{Node %v} has started", kv.rf.Me())
	return kv
}

image-20211205154609831


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM