LAB1 mapreduce
mapreduce中包含了兩個角色,coordinator和worker,其中,前者掌管任務的分發和回收,后者執行任務。mapreduce分為兩個階段,map階段和reduce階段。
map階段對應的是map任務。coordinator將會把任務分成多個部分,例如,有多個文件待處理,則每個文件的處理是一個任務。coordinator根據待處理文件生成多個任務,將這些任務用available管道暫存,供worker取用。worker將任務完成之后,需要告知coordinator,coordinator需要記錄任務的狀態。為了標識任務,每個任務需要有唯一的taskId。coordinator可以用taskId為key的map來存儲所有task,worker完成一個task之后,這個task就沒有必要保存,coordinator可以從map中刪除該task。coordinator存儲未完成的task,除了供worker比對之外,還可以用來重新分發超時的任務。worker調用coordinator的applyForTask函數,來從avaliable隊列中得到新的任務。在map階段,worker收到任務后會調用mapf函數,這個函數是用戶傳入的參數,指向任務的具體執行過程。對mapf的執行結果,worker根據reduce的個數,將執行結果hash成reduce份。例如,對於wordcount任務,每個文件中的詞的統計數量將根據詞分為reduce份,保存在reduce個文件中。
reduce階段對應的是reduce任務。coordinator將生成reduce個新的任務,每個任務處理一個hash桶中的內容。同樣用available管道供worker取用。當然,這時worker只需要知道自己取到的是第幾個hash桶對應的reduce任務,即可通過共享文件和統一的文件命名規則獲取到此時需要處理的文件。根據用戶reducef函數的輸入,worker將輸入文件中的內容排序之后,將相同key的value存儲成數組,輸入reducef函數處理。
值得探討的點:
-
worker通知coordinator任務完成:worker對任務完成的通知可以不必發一個新的包,因為worker每次完成任務的同時都會立即向coordinator請求新的任務,因此可以在請求包中附送上一個已經完成的taskId。coordinator經過比對taskId和workerId確認無誤之后,在分發新任務之前就可以處理舊的已完成任務。
-
超時任務檢測:有兩種選擇,一是worker接收任務之后定時發心跳包,但是這種方式較為繁瑣。另一種是coordinator定時檢查,task的map中對每個task維護一個ddl,若當前時刻已經超過了ddl時間,就視為超時。
-
available管道初始化容量:不初始化容量的話,管道會阻塞。
-
任務結果文件重命名:worker處理階段,為了防止其他worker也在處理這一文件導致的寫沖突,會將處理結果文件命名中加上workerId,但reduce階段不需要知道map結果是由哪個worker生成的,因此coordinator確認任務完成后會對結果文件重新命名,去掉workerId的標記。reduce階段同理。
mapreduce框架的一個重要瓶頸就是可能有大量的數據需要在服務器之間進行傳遞,其論文中詳細討論了這一點及其解決方案。
LAB2 raft
raft是一個分布式共識算法。分為領導選舉【Leader election】、日志復制【Log replication】和安全【Safety】。
分布式共識的應用:
- 邏輯時間的共識,用來決定事件發生的順序;
- 互斥性的共識,用來決定當下誰正擁有訪問的資源;
- 協調者的共識,誰是當下的leader。
在一個raft集群中,server總是在三種狀態之間轉換,follower、candidate、leader,且保證任何時刻系統中最多只有一個leader。系統將時間划分為多個term,term順序遞增,candidate進行選舉的時候會先將自身的term加一,表示自己認為已經可以開始新的term了。在一個term內的穩定狀態下,raft集群中只有一個leader,其余的server是follower,系統所處term的切換意味着leader的切換。leader定時向其余服務器發送一個heart beat心跳信息,表示自己仍然存活,此外,接收外界對raft系統的數據請求,提供對外服務,生成日志條目,並且將日志條目復制給其他的follower,以此實現數據的多存儲;follower接受leader發送的heart beat,確認當前系統存在leader,並且接收leader發來的日志條目副本,更新本地的日志。
【Leader election】若follower的heart beat超時,即,在一段時間內都沒有收到leader發來的heart beat。此時,這台follower認為leader已經掛掉,於是自動轉化為candidate狀態,開始競選成為新一期的leader。candidate將自身的term加一,投票給自己,同時向所有server發送requestVote的請求,對於收到requestVote請求的服務器來說,只要它們在這個term沒有投出票,則投給這個candidate,換句話說,一個server在一個term只能投一次票。在一輪投票中,若所得票數大於總服務器數量的一半,則贏得選舉,成為本期leader,同時立即發送一條heart beat宣布上任,系統回到穩定狀態。同一時刻允許同時存在多個candidate,此時可能會出現選票平分的情況,這時無法選出新的leader,candidate將重新發起投票,並且term再加一。重新投票將會影響系統的性能,為了減小同時出現多個candidate的可能性,每台server的heart beat超時時間(等待heart beat的時間)將設置為一個區間范圍內的隨機數。一般要求:heart beat時間<<選舉超時時間<<平均故障時間。
由於raft集群的server總是在三種狀態之間切換,不同狀態執行不同的任務,因此將使用狀態機來實現。server之間互相發送的包是心跳包和requestVote包以及它們的reply。
主線任務
-
leader:【發送心跳包給follower和candidate,收到不合法的心跳則拒收】向集群中其他所有成員定時發送heart beat,確認存活,同時接收其他成員反饋的reply信息。對於reply信息,有多種情況:
- reply.Success = true:成員承認本leader;
- reply.Success = false:成員拒絕承認本leader。原因是該成員的term>leader.term,本leader的任期已過,集群已經在新的term了。於是這台機器退位,降級為follower,並更新自身的term等信息,保持與集群同步。
-
follower:【從leader接收心跳包,從candidate接收requestVote包】
- 接收投票要求
- 如果投票的term大於自己,說明有人發現leader掛了,在發起新一輪的投票,投票,同時視為收到了心跳;
- 否則,拒絕投票,並且告訴通過reply.term告知candidate本機認為當前所處的term;
- 接收心跳:重置心跳超時計時器;
- 檢查是否心跳超時:若超時,成為candidate,並且立即發起投票;
- 接收投票要求
-
candidate:【從leader接收心跳,發送requestVote包給follower和其他candidate,從其他candidate接收requestVote包】
-
發起投票。對於投票結果:
- 若超過半數同意,則立即成為leader並且執行leader任務;
- 若有人拒絕:查看reply.term,如果reply.term>=自己,說明是自己out了,降級為follower,取消本輪投票;否則就是單純的不投我,那就算了;
-
檢查投票是否超時,若超時,重新發起投票;
-
接收心跳,如果在投票過程中收到term>=自己的心跳,說明現在已經有leader了,降級到follower狀態,取消本輪投票。
關於投票取消的時候可能發生的異常討論
follower同意投票的同時,將term更新,立即視為進入了新的term並且將這個candidate視為當前term的leader,這是沒有問題的。如果candidate選舉成功,顯然是沒問題的;如果candidate選舉不成功,即,取消投票,有以下情況:收到reply.term>=candidate.term的選舉回復,說明系統正在試圖開啟更大的term;收到term>=自己的心跳,說明當前系統中正處於更大的term,並且已經處於有leader的穩定狀態。不論是試圖開啟還是已經達到,當這個更大的term達到穩定的狀態時,其leader會發送心跳,心跳的term大於candidate的term,投票給candidate的server不會拒絕這些心跳,並且會立即響應進入新的term,從前的錯誤投票在新的term下毫無影響。
-
附:檢測和修復data race https://www.sohamkamani.com/golang/data-races/
——————————————
(重構)
對於一台server,需要做的事情有三個方面:選舉、日志復制、apply。其中,選舉和apply兩項是所有server都主動進行的,因此在初始化的時候使用兩個goroutine來控制,日志復制應該是由client調用start來控制進行的。
timeout一直在倒計時,一旦超出了倒計時就稱為candidate開始選舉,倒計時期間,可能由於收到leader或者任期更大的server的消息而reset倒計時。
logApplier不斷地推動lastApplied追上commitIndex,通過發送ApplyMsg給applyCh通道接口來apply日志,如果已經兩者已經一致了,就wait直到有新的commit。如何檢查有新的commit呢?可以使用sync.cond條件變量,等commitIndex更新的時候用broadcast喚醒這個cond,從而疏通堵塞。
附: 關於sync.cond https://ieevee.com/tech/2019/06/15/cond.html
【選舉】
投票條件:
- 候選人最后一條Log條目的任期號大於本地最后一條Log條目的任期號;
- 或者,候選人最后一條Log條目的任期號等於本地最后一條Log條目的任期號,且候選人的Log記錄長度大於等於本地Log記錄的長度
becomeCandidate時,立即開始選舉,當然,這時候需要一些前序步驟:將term++標識進入了新的term,將votedfor置為me表示投票給自己了。
選舉方:選舉過程需要一個“得票數”的變量votesRcvd來記錄已得票數(在分布式系統中,它的增加需要原子操作,因此用一個鎖sync.cond鎖來保護),此外,還要用一個finish變量來確定已經做出回答的server有多少。每當得到一枚票,就喚醒(broadcast)一次cond鎖,堵塞疏通,做出“繼續等待/處理最終票數/直接return”的選擇。其中,繼續等待是當票數不夠一半,但還有server沒有做出回復的時候。處理最終票數是剩余情況。直接return比較特殊,因為可能在等待得票的過程中,本candidate已經不是candidate了,可能降級為follower了。處理最終票數就很簡單了,如果夠一半就升級為leader(開始心跳goroutine),不夠就變成follower(此時是因為所有server都已經做出了回復所以開始處理最終票數的),處理最終票數的過程中,要通過判斷和加鎖的方式,確保本candidate仍然是candidate,且當前任期和得票的任期一樣。
接待員(中間函數):構造args和reply,調用投票方的投票函數。對返回結果,只在voteGranted為true的時候返回true,否則返回false,如果reply.term更大,就令candidate降級為follower。同時,在處理期間也要保證本candidate是candidate的時候才有必要繼續進行,但繼續進行的時候,非必要不得對candidate加鎖,否則容易形成死鎖。
投票方:先檢查args.term,如果比自己大,那就先承認一下自己的follower地位, 如果args.term比自己小,那就voteGranted置為false,讓選舉方承認自己follower的地位,並且返回,沒必要再理會這次選舉。繼續處理的是args.term>=自己的情況。如果還沒投,或者已經投給了這個candidate,並且term相同的話選舉方log更長,那就投給選舉方,並且reset選舉超時計時器。否則不投。簡而言之,投票要檢查term,term相等的話看log是不是新於自己,以及票是不是已經投出去了。
設計技巧:
- 將單個詢問、處理回復和分發、回收分為兩個過程。前者是接待員,為單個投票方提供單個接待服務,后者是總管,給各個投票方分配出各自的接待員。
- 盡量不加鎖,或者鎖粒度盡可能小,在處理的時候判斷一下是不是狀態還未過時。
【日志復制】
接待員(發送方/中間函數):取出目標server對應的nextIndex和matchIndex。如果nextIndex,即即將發送的entries的開始位置,<=snapshotLastIndex,就是已經被壓縮了,那就將snapshot發送給目標server,返回。如果nextIndex在log里,就構造AppendEntriesArgs,把nextIndex后面所有的entries全發送過去,這時,要附帶nextIndex-1這一條的index和term,用來給目標server做一致性檢查。對於返回值,首先檢查term判斷是否本leader需要降級為follower,然后再判斷是否成功。如果成功,就更新nextIndex和matchIndex,再看看需不需要commit。如果不成功,那就是一致性檢查出問題了,找到沖突點,重新執行接待任務。
快速回退法: 發生沖突的時候,讓follower返回足夠的信息給leader,這樣leader可以以term為單位來回退,而不用每次只回退一條log條目,因此當log不匹配的時候,leader只需要在每個不同的term發送一條appendEntries,這是一種加速策略。
沖突點回溯:找到args.PrevlogTerm的第一條log的index,就是目前看來的沖突index。不會往之前的term找,因為無法確定那里是不是沖突了。這個沖突index可能會有點悲觀,這里會增加網絡負載,可以優化。
設計技巧:
- matchIndex只在發送成功的時候更新,並且是為了commit設置的。follower的commitIndex始終是隨着AppendEntriesArgs帶來的leader的commitIndex更新的,自己不能主動判斷更新。另,commit的時候會喚醒applyCond。
- nextIndex總是很樂觀的,靠一致性檢查和沖突點回溯來防止錯誤。
- 一條log entry的index和它在log中的下標不是同一個東西。
- 對log的操作可能很多,設計一個log類來專門管理這些操作,像cmu數據庫一樣寫一些基本的常用操作函數。
- 向管道中塞東西,可能會發生堵塞,因此要使用goroutine。例如 go rf.applyCh<-msg
- appendNewEntry時的index
【關於日志復制時可能出現的異常情況討論】
如果leader正常工作,raft系統中不會出現什么問題,follower只需要接收leader發來的日志信息,將log的狀態與leader的log狀態靠齊即可。
一個舊leader故障之后,新的leader是否可以使系統達到一致?
假設現在系統中有三台機器,S1,S2和S3,其中S3是舊的leader,且系統此刻是一致的。S3可能引發不一致的故障時刻有三種:
- 將新條目添加到本地log之后立即故障:根據多數選舉的規則,S1和S2中可以出現新的leader,系統繼續服務。
- 將新條目添加到S1之后故障:S1可以成為leader,系統繼續服務,S1會將這條條目傳遞給其他機器並且提交。
- 將新條目添加到S1並且提交之后故障:同上。
因此,舊leader S3故障之后,剩下的團體也可以正常服務。如果此時舊leader重新與集群建立了聯系,系統將會如何?
不論中間經過了多少個term,假設現在的leader是S1,舊leader是S3,S3重新加入集群的時候,首先S3肯定會降級為follower,如果S3可以立即被選舉為leader,那么就可以視為S3沒有發生過故障。S1會發送新條目給S3的時候,S3會進行不一致性檢查,經過多次發送並嘗試append條目,S1會令S3的log狀態與自己的達成一致。
【關於已經commit的log是否會丟失的進一步討論】
假設當前leader是S3。已知leader選舉,當term一致的時候,只能給log長於自己的投選舉票。那么只有log長於其中超過半數機器的機器可以成為leader。已知leader永遠不會丟棄自己已有的log,那么存在於leader中的被commit的log肯定不會被丟棄。丟棄的情況只會是一條log被大多數機器記錄,但leader沒有記錄。
假設該log的index是i1,term是t1。根據log append的連續性,S3至多接收到i1-1之后就沒有接收到t1的其他任何log了。進一步地,由於i1被保留到了S3入選的term,因此t1之后的leader都有i1記錄,因此S3至多接收到i1-1之后就再也沒有收到到選舉為止的其他任何log了。在這種情況下,還要保證經歷了所有的term(才能term與其他選舉者一致),即使之后的所有term都不再append條目到任何機器上,那也有大多數機器比S3多了i1這條log,S3不可能選舉成功。推出矛盾,因此commit的log不會丟失。
【persist】
persist類是raft類中的一個成員。其作用應該是為了保存state信息和snapshot信息,state信息包括currentTerm,votedFor,log。只有這三者需要被持久化存儲,log是唯一記錄了應用程序狀態的地方,其中存儲的一系列操作是唯一能在斷電重啟之后用來重建應用程序狀態的信息;votedfor和currenterm是為了保證每個任期最多只有一個leader。其他的狀態,例如lastApplied和commitIndex都可以通過leader和follower之間的交流來重新獲得。
【snapshot】
每個server會自己創建自己的snapshot,也會接受並install leader發送的snapshot(這發生在日志同步的時候nextIndex<=ssLastshot時)。只有leader可以讓其他server install自己的snapshot,這和只有leader可以讓其他server appendEntries一樣,因此,發送處理和接收處理之前都必須check發送方的leader身份,並且可以以此來代替加鎖。
收到installSnapshot和收到AppendEntries類似,都需要有檢查leader身份,確認自己follower身份和reset election timer等操作。將得到的snapshot發送到applyCh即可。
假死問題:由於網絡原因導致的心跳超時,認為leader已死,但其實leader還活着。
腦裂問題:指的是分布式集群系統中由於網絡故障等原因,選舉出了兩個leader,集群分裂成兩個集群。出現腦裂問題的原因是分布式算法中沒有考慮過半機制。腦裂問題對分布式系統是致命的,兩個集群同時對外提供服務,會出現各種不一致問題,如果兩個集群突然可以聯通了,將不得不面對數據合並、數據沖突的解決等問題。
為了解決腦裂問題,通常有四種做法:
- zookeeper和raft中使用的過半原則;
- 添加心跳線。集群中采取多種通信方式,防止一種通信方式失效導致集群中的節點無法通信,比如原來只有一條心跳線路,此時若斷開,則判斷對方已死亡,若有兩條心跳線,一條斷開,另一條仍然可以收發心跳,保證集群服務正常運行,備用線路與主線路可以互相監測,正常情況下備用線路為了節約資源而不起作。
- 使用磁盤鎖的形式,保證集群中只能有一個Leader獲取磁盤鎖,對外提供服務,避免數據錯亂發生。但是,也會存在一個問題,若該Leader節點宕機,則不能主動釋放鎖,那么其他的Follower就永遠獲取不了共享資源。於是有人在HA中設計了"智能"鎖。正在服務的一方只有在發現心跳線全部斷開(察覺不到對端)時才啟用磁盤鎖。平時就不上鎖了。
- 仲裁機制。比如提供一個參考的IP地址,心跳機制斷開時,節點各自ping一下參考IP,如果ping不通,那么表示該節點網絡已經出現問題,則該節點需要自行退出爭搶資源,釋放占有的共享資源,將服務的提供功能讓給功能更全面的節點。
過半原則:根據鴿巢原理,raft中任意一個操作都需要過半的服務器的認同,這樣能保證始終只有一個leader。此外,服務器通常選擇奇數台機器部署,這樣可以用較少的機器實現相同的集群容忍度。
快速領導者選舉算法:在選舉的過程中進行過半驗證,這樣不需要等待所有server都認同,速度比較快。
Lab3 KV-raft
在此,從一個比lab2更高層次的角度看待分布式系統。lab2中的raft是用於機器之間互相溝通形成一致的log和state,但機器之間並不關心log中存儲的command是什么,因此全部使用interface{}作為command的接口。lab3中,我們要實現的是client調用Get()、Append()、Put(),server通過raft達成集群內的一致,然后將raft apply的command正式執行。raft系統在這一過程中,只起到了一致性的作用,是命令的被調用和真正執行之間的一層。
這里需要注意的是線性一致性,為了實現這一點,給command遞增的index(由raft調用start后返回),使用一個map記錄每個client最近最后一個被執行command的index以及執行結果,由此可以推測出command序列執行到哪一條了,防止重復執行。
另外,由於raft系統在start和apply之間需要一定的時間,因此,客戶端調用讀寫函數,讀寫函數調用start通知raft集群之后,注冊一個index對應的待相應result channel,存儲在以index為key的map中。當raft系統達成一致,apply這條命令的時候,從apply函數調用真正的讀寫過程,執行結果push到index對應的channel中。於是,客戶端調用的讀寫函數只需要直接去result channel中取出這條命令的執行結果。這樣做非常的簡潔流暢,用channel阻塞的時間來等待raft系統一致、apply執行讀寫。
關於start和apply之間leader被更換的討論:
一條command,在其start和apply中間,可能raft系統已經更換了leader,對於新的leader來說,它沒有為這條command創建channel(start不是通過新leader進行的),試圖將result放入channel的時候會失敗,導致直接返回。然而,舊leader雖然降級為follower,但仍然會對這條apply,因此即使更換leader也沒關系,但需要注意的是,從channel取出result的時候,就不必判斷這個機器是不是leader了,只要在start的時候判斷了就可以了。
關於command的index是否會發生變化的討論:
command的index是由start調用的時候,leader的log中當前log最后一條entry的index+1決定的順序index,如果這條command的entry被覆蓋,那就會超時,client將更換server重新執行,如果沒有被覆蓋,將會保持這個index。
如果command的entry被覆蓋了,且這個index對應的map中仍然有channel在等待答案(發生於leader降級,被新的leader清除了index對應位置,並且沒有覆蓋,leader又當選為leader,並建立了新的index位置),那么將會發生不匹配,因此,應該在從channel中取出result的時候檢查op是否是在等待的那個。
如果op正好與在等待的那個一致,但是seq又不是那個呢?沒有關系,只要執行內容一致就可以了。client中等待之后那條op結果的timer會超時,重新執行之后那條op。
B部分是壓縮,kv中有一個變量maxraftState限制了log的長度,若即將超過這個長度,就對log進行壓縮。同時,kv的data和peocessed也應該被持久化存儲。
此外,LAB3可以使用init函數完成logger注冊,並記錄。當然,這不是必需的。
關於golang中的init函數
golang里的main函數是程序的入口函數,main函數返回后,程序也就結束了。golang還有另外一個特殊的函數init函數,先於main函數執行,實現包級別的一些初始化操作。
init函數的主要作用:
- 初始化不能采用初始化表達式初始化的變量。
- 程序運行前的注冊。
- 實現sync.Once功能。
- 其他
init函數的主要特點:
- init函數先於main函數自動執行,不能被其他函數調用;
- init函數沒有輸入參數、返回值;
- 每個包可以有多個init函數;
- 包的每個源文件也可以有多個init函數,這點比較特殊;
- 同一個包的init執行順序,golang沒有明確定義,編程時要注意程序不要依賴這個執行順序。
- 不同包的init函數按照包導入的依賴關系決定執行順序。