深入淺出etcd之raft實現


etcd是coreOS使用golang開發的分布式,一致性的kv存儲系統,因其易用性和高可靠性被廣泛運用於服務發現、消息發布和訂閱、分布式鎖和共享配置等方面,也被認為是zookeeper的強有力的競爭者。作為分布式kv,其底層使用raft算法實現多副本數據的強一致性。etcd作為raft開源實現的標桿,在設計上,將 raft 算法邏輯和持久化、網絡、線程等完全抽離出來單獨實現,充分解耦,在工程上,實現了諸多性能優化,是 raft 開源實踐中較早的工業級的實現,很多后來的 raft 實踐者都直接或者間接的參考了 ectd-raft 的設計和實現,例如kubernetes,tiDb等。其廣泛的影響力和優雅的golang代碼實踐也使得ectd成為golang的明星項目。在我們實際的分布式存儲系統的項目開發中,raft也被應用於元信息管理和數據存儲等多個模塊,因此熟悉和理解etcd-raft的實現具有重大意義,本文從raft的基本原理出發,深入淺出地分析了raft在ectd中的具體實現。

raft原理

架構

深入淺出etcd之raft實現

 

image

每個節點都包含狀態機,日志模塊和一致性模塊。功能分別是:

  • 狀態機:數據一致性指的即是狀態機的一致性,從內部服務看表現為狀態機中的數據都保持一致
  • log模塊:保存了所有的操作記錄
  • 一致性模塊:一致性模塊算法保證寫入log命令的一致性,是raft的核心內容。

實現一致性的過程可分為Leader選舉(Leader election),日志同步(Log replication),安全性(safty),日志壓縮(Log compaction),成員變更(membership change)

leader 選舉

競選過程

  • 節點由Follower變為Candidate,同時設置當前Term。
  • Candidate給自己投票,帶上termid 和日志序號,同時向其他節點發送拉票請求
  • 等待結果,成為Leader,follower 或者在選舉未成為產生結果的情況下節點狀態保持為Candidatae。

選舉結果

  • 成功當選收到超過半數的選票時,成為Leader,定時給其他節點發送心跳,並帶上任期id,其他節點發現當前的任期id小於接收到leader發送過來的id,則將將狀態切換至follower.
  • 選舉失敗在Candidate狀態接收到其他節點發送的心跳信息,且心跳中的任期id大於自己,則變為follower。
  • 未產生結果沒有一個Candidate所獲得的選票超過半數,未產生leader,則Candidate再進入下一輪投票。為了避免長期沒有leader產生,raft采用如下策略避免:
  • 選舉超時時間為隨機值,第一個超時的節點帶着最大的任期id立刻進入新一任的選舉
  • 如果存在多個Candidate同時競選的情況,發送拉票請求也是一段隨機延時。

日志同步(Log Replication)

深入淺出etcd之raft實現

 

image

Leader選出后接受客戶端請求,Leader把請求日志作為日志條目加入到日志中,然后向其他Follower節點復制日志,但超過半數的日志復制成功,則Leader將日志應用到狀態機並向客戶端返回執行結果,同時Follower也將結果提交。如果存在Follower沒有成功復制日志,Leader會無限重試。

日志同步的關鍵點:

  • 日志由有序編號的日志條目組成,每條日志包含創建的任期和用於執行的命令,日志是保證所有節點數據一致的關鍵。
  • Leader 負責一致性檢查,同時讓所有的Follower都和自己保持一致。
  • 在Leader發生切換時,如何保證各節點日志一致。leader為每一個follower維護一個nextIndex,將index和termid信息發送至follower,從缺失的termid和index 為follow 補齊數據,直至和leader完全一致。
  • 只允許主節點提交包含當前term的日志。否則會出現已經commit的日志出現更改的情況

安全性

安全性的原則是一個term只有一個leader,被提交至狀態機的數據不能發生更改。保證安全性主要通過限制leader的選舉來保證:

  • Candidate在拉票時需要攜帶本地已持久化的最新的日志信息,如果投票節點發現本地的日志信息比Candidate更新,則拒絕投票。
  • 只允許Leader提交當前Term的日志。
  • 擁有最新的已提交的log entry的Follower才有資格成為Leader。

raft協議實現

raft的golang的開源實現主要包含兩個:coreOS的raft實現 , 使用的項目如tidb和cockroachdb這兩個經典的newsql。另外一個是hashicrop的raft實現,使用的項目如服務發現解決方案consul和時序數據庫influxdb。對比二者的實現主要有如下特點:

  • hashicrop的實現完整度高,包含了snapshot,wal,storage等,在集成時只需要關注業務邏輯
  • etcd中的raft模塊則是raft協議的輕量級實現,對於上述功能只定義了相關interface,需要業務方去具體實現,優點是增加靈活性,etcdserver就是集成raft算法並實現snapshot,wal,storage這樣一個應用程序。

etcd/raft 代碼結構

  • 日志持久化storage.go:持久化日志保存模塊,以interface的方式定義了實現的方式,並基於內存實現了memoryStorage用於存儲日志數據。log.go:raft算法日志模塊的邏輯log_unstable.go:raft 算法的日志緩存,日志優先寫緩存,待狀態穩定后進行持久化
  • 節點node.go: raft集群節點行為的實現,定義了各節點通信方式process.go:從leader的角度,為每個follower維護一個子狀態機,根據狀態的切換決定leader該發什么消息給Follower.
  • Raft算法raft.go:raft算法的具體邏輯實現,每個節點都有一個raft實例read_only.go: 實現了線性一致讀(linearizable read),線性一致讀要求讀請求讀到最新提交的數據。針對raft存在的stale read(多leader場景),此模塊通過ReadIndex的方式保證了一致性。

etcd/raft的實現分析

分析raft的實現流程,我們可以從raft的幾個核心問題入手:

  • 如何選舉leader?
  • 如何實現log的復制?
  • 如何進行leadership的transfer?
  • 如何實現線性一致讀?

其中leader的選舉、log復制和線性一致讀是raft協議的最基本要求,而leadership的轉移在工程實踐中有重大意義。

核心數據結構

  • struct node node 中主要定義一系列channel,raft的實現就是通過channel 傳遞消息,當節點啟動通過select機制監聽上述channel確定相應的狀態切換。
// node is the canonical implementation of the Node interface
type node struct {
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChange
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status

logger Logger
}
  • interface node定義了node要實現raft算法必須實現的方法
type Node interface {
Tick() //時鍾的實現,選舉超時和心跳超時基於此實現
Campaign(ctx context.Context) error //參與leader競爭
Propose(ctx context.Context, data []byte) error //在日志中追加數據,需要實現方保證數據追加的成功
ProposeConfChange(ctx context.Context, cc pb.ConfChange) error // 集群配置變更
Step(ctx context.Context, msg pb.Message) error //根據消息變更狀態機的狀態
//標志某一狀態的完成,收到狀態變化的節點必須提交變更
Ready() <-chan Ready
//進行狀態的提交,收到完成標志后,必須提交過后節點才會實際進行狀態機的更新。在包含快照的場景,為了避免快照落地帶來的長時間阻塞,允許繼續接受和提交其他狀態,即使之前的快照狀態變更並沒有完成。
Advance()
//進行集群配置變更
ApplyConfChange(cc pb.ConfChange) *pb.ConfState
//變更leader
TransferLeadership(ctx context.Context, lead, transferee uint64)
//保證線性一致性讀,
ReadIndex(ctx context.Context, rctx []byte) error
//狀態機當前的配置
Status() Status
// ReportUnreachable reports the given node is not reachable for the last send.
//上報節點的不可達
ReportUnreachable(id uint64)
//上報快照狀態
ReportSnapshot(id uint64, status SnapshotStatus)
//停止節點
Stop()
}

節點的啟動和運行

節點初始化raft,讀取配置啟動各個各個節點,初始化logindex.啟動后 以for-loop方式循環運行,用select 機制監聽不同的channel 實現對狀態變化的監聽,並執行相應動作。

//啟動
func StartNode(c *Config, peers []Peer) Node {
r := newRaft(c) //初始化raft算法實例
r.becomeFollower(1, None)
//將配置中的節點加入集群
for _, peer := range peers {
...
}
//初始化logindex
r.raftLog.committed = r.raftLog.lastIndex()
for _, peer := range peers {
//初始化節點狀態機(progress)
r.addNode(peer.ID)
}
n := newNode()
n.logger = c.Logger
go n.run(r)
return &n
}

//運行
func (n *node) run(r *raft) {
...

select {
//接收到寫消息
case pm := <-propc:
...
//接收到readindex 請求
case m := <-n.recvc:
...
//配置變更
case cc := <-n.confc:
...
//超時時間到,包括心跳超時和選舉超時等
case <-n.tickc:
...
//數據ready
case readyc <- rd:
...
//可以進行狀態變更和日志提交
case <-advancec:
...
//節點狀態信號
case c := <-n.status:
...
//收到停止信號
case <-n.stop:
...
}
}
}

leader 選舉

初始化node為follower,設置任期為1,並初始化tickElection函數,這是實際參與選舉的函數,同時也初始化step為stepFollower,這是作為follower的核心信息處理函數,后續選舉,日志復制和快照等功能都基於此函數進行:

	r := newRaft(c)
r.becomeFollower(1, None)

當節點接收leader的heartbeat超時時(每個節點都有隨機的超時時間),會觸發run函數中的tickc這個channel。發送MsgHup消息,並調用campaign參選, 將自身設置為candidate,並遞增currentTerm,向其他節點發送競選消息。其他節點通過監聽propc channel獲取其他節點發送的投票消息,並調用Step對消息進行判斷,選擇是否投票。

其中投票的判斷邏輯主要分兩步:1.如果投票信息中的任期id 是否 小於自身的id,則直接返回nil。2.通過isUpToDate判斷能否投票,通過和本地已存在的最新log比較,首先要有最大任期id,如果任期id相同則要求有最大的logindex。

candidate節點收到其他節點的回復后,判斷獲取的票數是否超過半數,如果是則設置自身為leader,否則為follower。

func (n *node) run(r *raft) {
...
for {
select {
...
//觸發heartbeat 超時
case <-n.tickc:
r.tick()
...
}
}
}

//超時觸發選舉
func (r *raft) tickElection() {
r.electionElapsed++

if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}

//隨機超時時間
func (r *raft) pastElectionTimeout() bool {
return r.electionElapsed >= r.randomizedElectionTimeout
}

func (r *raft) resetRandomizedElectionTimeout() {
r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}


//參與選舉
func (r *raft) campaign(t CampaignType) {
var term uint64
var voteMsg pb.MessageType
//成為candicate,將任期id加1
if t == campaignPreElection {
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
term = r.Term + 1
} else {
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
//判斷獲取的票數是否超過半數,如果是當選為leader
if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
//向其他節點發送競選消息
for id := range r.prs {
if id == r.id {
continue
}
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}

//節點投票過程
func (r *raft) Step(m pb.Message) error {
...
//比較任期id
case m.Term > r.Term:
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
force := bytes.Equal(m.Context, []byte(campaignTransfer))
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
if !force && inLease {
return nil
}
}

switch m.Type {
case pb.MsgVote, pb.MsgPreVote:
...
//與本地最新的持久化日志比較
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
//發送投票信息
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
r.Vote = m.From
}
}
...
return nil
}

func (l *raftLog) isUpToDate(lasti, term uint64) bool {
return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
}

//投票結果判斷
case myVoteRespType:
gr := r.poll(m.From, m.Type, !m.Reject)
//計算票數是否超過半數
switch r.quorum() {
case gr:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
case len(r.votes) - gr:
r.becomeFollower(r.Term, None)
}

日志復制

node節點為外界提供了日志提交接口 Propose,在ectd的server對該接口進行了封裝。Propose 內部具體調用stepWithWaitOption實現日志消息的傳遞,並阻塞/非阻塞地等待結果的返回。

func (n *node) Propose(ctx context.Context, data []byte) error {
return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
...
//提交日志數據至 node的 propc channel 隊列
select {
case ch <- pm:
if !wait {
//非阻塞直接返回
return nil
}
case <-ctx.Done():
return ctx.Err()
case <-n.done:
return ErrStopped
}
select {
//等待結果的返回
case rsp := <-pm.result:
if rsp != nil {
return rsp
}
case <-ctx.Done():
return ctx.Err()
case <-n.done:
return ErrStopped
}
return nil
}

proc消息進入stepFollower處理,因為只有leader才能處理客戶端提交的信息,因此將消息的接收者設置為leader后轉發。在stepLeader中調用appendEntry將消息追到leader的raftLog之中,但不進行數據的commit。之后調用bcastAppend 將消息廣播至其他follower節點。

func stepLeader(r *raft, m pb.Message) error {
case pb.MsgProp:
...
if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
...
}

follower節點接收到請求后,調用handleAppendEntries判斷是否接受leader提交的日志。判斷邏輯如下:如果leader提交的logindex小於本地已經提交的logindex則將本地的logindex回復給leader。查找追加的日志和本地log的沖突,如果有沖突,則先找到沖突的位置,用leader的日志從沖突位置開始進行覆蓋,日志追加成功后,返回最新的logindex至leader。如何任期信息不一致,則直接拒絕leader的追加請求。

func (r *raft) handleAppendEntries(m pb.Message) {
//leader提交的logindex小於本地已經提交的logindex
if m.Index < r.raftLog.committed {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}
//追加日志,可能存在沖突的情況,需要找到沖突的位置用leader的日志進行覆蓋
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
//mlastIndex表示最佳成功的最新位置
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
//任期信息不一致,拒絕此次追加請求,並把最新的logindex回復給leader,便於進行追加
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
}

leader接收到follower的請求后,針對拒絕和接收的兩個場景有不同的處理邏輯,這也是保證follower一致性的關鍵環節。

  • follower 正常接收append請求 當leader 確認follower已經接收了append請求后,則調用maybeCommit進行提交,在提交過程中確認各個節點的matchindex,排序后取中間值比較,如果中間值都都比本地的commitindex大,就認為超過半數已經認可此次提交,可以進行commit,之后調用sendAppend向所有節點廣播消息,follower接收到請求后調用maybeAppend進行日志的提交。值得注意的是,日志的append過程可能由於之前的請求被拒絕,等待snapshot或者消息發送窗口(inflight)已滿導致中止,這時需要重新向follower節點發送最新的append請求。
   func stepLeader(r *raft, m pb.Message) error {
case pb.MsgAppResp:
pr.RecentActive = true

if m.Reject {
...
} else {
oldPaused := pr.IsPaused()
//更新索引信息,更新該follower的match index 和next index.
if pr.maybeUpdate(m.Index) {
switch {
//日志追加成功,狀態由復制探測狀態變成復制狀態,加快日志的追加
case pr.State == ProgressStateProbe:
pr.becomeReplicate()
case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
pr.becomeProbe()
//pr.ins用於限制消息發送的速率,用於統計當前處於發送狀態的日志數量
case pr.State == ProgressStateReplicate:
pr.ins.freeTo(m.Index)
}
//leader進行本地的提交
if r.maybeCommit() {
//廣播至所有follower 通知進行log的提交
r.bcastAppend()
} else if oldPaused {
//append請求被中止,則重新發送最新的請求
r.sendAppend(m.From)
}
}
}
}
}

func (r *raft) maybeCommit() bool {
if cap(r.matchBuf) < len(r.prs) {
r.matchBuf = make(uint64Slice, len(r.prs))
}
mis := r.matchBuf[:len(r.prs)]
idx := 0
for _, p := range r.prs {
mis[idx] = p.Match
idx++
}
//排序取取中間值
sort.Sort(mis)
mci := mis[len(mis)-r.quorum()]
return r.raftLog.maybeCommit(mci, r.Term)
}

func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
//match的中間值是否已經大於本地已經commit的matchindex
if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
l.commitTo(maxIndex)
return true
}
return false
}
  • follower拒絕leader的append請求 在異常情況下,follower會拒絕leader的append請求。其判斷邏輯主要位於matchTerm,當leader append請求中的logindex在當前節點已提交的日志中到不到對應的任期,或者任期與leader提交的任期不一致時follower會拒絕當前append請求。leader接收到拒絕請求后會進入探測狀態,探測follower最新匹配的位置。
   //follower接收leader的請求
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
if l.matchTerm(index, logTerm) {
...
}
//拒絕leader當前的append請求
return 0, false
}
//對leader提交append請求中的logindex和termid進行判斷
func (l *raftLog) matchTerm(i, term uint64) bool {
t, err := l.term(i)
if err != nil {
return false
}
return t == term
}

func stepLeader(r *raft, m pb.Message) error {
case pb.MsgAppResp:
pr.RecentActive = true

if m.Reject {
if pr.maybeDecrTo(m.Index, m.RejectHint) {
//由復制狀態進入探測狀態,探測follower最新的匹配位置
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
r.sendAppend(m.From)
}
}

下面來分析leader接收到拒絕請求后的處理邏輯。由於各種原因可能導致follower節點的日志與leader不一致,如下圖所示:

深入淺出etcd之raft實現

 

日志同步

在raft的論文中提出通過遍歷index和term的方式保證日志的一致性。具體的實現位於maybeDecrTo,因為follower在拒絕請求時帶上了當前最新的logindex,因此在進行日志補推時,直接將next至為follower中最新的logindex 和當前index中的最小值。

func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
if pr.State == ProgressStateReplicate {
if rejected <= pr.Match {
return false
}
// directly decrease next to match + 1
//復制狀態將pr的next置為當前匹配位置+1
pr.Next = pr.Match + 1
return true
}

if pr.Next-1 != rejected {
return false
}

//如果是探測狀態,則將next置為follower中最新的logindex 和當前index中的最小值。
if pr.Next = min(rejected, last+1); pr.Next < 1 {
pr.Next = 1
}
pr.resume()
return true
}
日志推送的具體實現位於maybeSendAppend.func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.getProgress(to)
if pr.IsPaused() {
return false
}
m := pb.Message{}
m.To = to

//發送給follower的最后一條日志對應的任期
term, errt := r.raftLog.term(pr.Next - 1)
//需要發送給follower的日志條數
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
if len(ents) == 0 && !sendIfEmpty {
return false
}

if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
...
} else {
m.Type = pb.MsgApp
m.Index = pr.Next - 1
m.LogTerm = term
m.Entries = ents
//leader 已經提交的最新index
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
switch pr.State {
//在日志復制狀態,樂觀地增加next, 加快日志的推送速度
case ProgressStateReplicate:
last := m.Entries[n-1].Index
pr.optimisticUpdate(last)
pr.ins.add(last)
case ProgressStateProbe:
pr.pause()
default:
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
}
}
r.send(m)
return true
}

至此raft集群的日志復制基本已經完成,但是僅限於raft協議層面,日志和快照目前還是保存在Ready結構中,並放入了readyc隊列,等待上游的模塊處理。之前提到過etcd-raft 只是協議層的實現,提供了WAL,snapshot和storage等模塊的擴展接口,應用層需要實現上述接口最終實現的數據的落地。

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
...
//日志數據
rd := Ready{
Entries: r.raftLog.unstableEntries(),
CommittedEntries: r.raftLog.nextEnts(),
Messages: r.msgs,
}
...
}

leadership transfer

leadership transfer 指的是leader身份的轉換,raft提供接口允許客戶端進行leader切換,此功能可用來做負載均衡,讓客戶端有機會結合實際的機器和負載情況去選擇最優的leader;同時也是multi-raft實現的基礎。下面具體分析transfer的實現。

raft協議提供了transferLeaderShip方法供應用層使用用於觸發leader的轉換,transferLeaderShip會發送MsgTransferLeader類型消息至recvc消息隊列中(channel)。當follower收到TransferLeader消息后不處理將消息轉發至leader進行處理。

 //etcd/raft/raft.go
func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {
select {
//通過recvc發送MsgTransferLeader消息至集群中節點
case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}:
case <-n.done:
case <-ctx.Done():
}
}

leader收到transfer消息后,如果發現當前正在進行leader切換或者不發生leader變換則直接放棄。一個節點要成為leader的要求是有最新的日志數據。如果有則立即發送MsgTimeoutNow消息,transfee收到消息后立即調用campaign方法進行選擇,而不是像正常leader選舉時需要等待超時,而且也不需要采用預投票的方式,之后的選舉流程與正常選舉過程一致。如果transfee沒有最新的日志數據,則leader進行日志的同步,當同步完成收到回復且正處在leader transfer的過程中,發送MsgTimeoutNow,之后與上述流程一致。

 //etcd/raft/raft.go
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
...
case pb.MsgTransferLeader:
if pr.IsLearner {
r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
return nil
}
leadTransferee := m.From
lastLeadTransferee := r.leadTransferee
//上一次transfer正在進行
if lastLeadTransferee != None {
if lastLeadTransferee == leadTransferee {
r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
r.id, r.Term, leadTransferee, leadTransferee)
return nil
}
r.abortLeaderTransfer()
r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
}
//transfee和當前leader相同
if leadTransferee == r.id {
r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
return nil
}
// Transfer leadership to third party.
// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
r.electionElapsed = 0
r.leadTransferee = leadTransferee
if pr.Match == r.raftLog.lastIndex() {
//transfee的日志已經是最新和leader保持一致了,則立刻發送MsgTimeoutNow,觸發選舉
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
//日志非最新進行日志的同步
r.sendAppend(leadTransferee)
}
}
return nil
}

線性一致讀

線性一致性讀是分布式系統的基本要求,在raft中leader和follower都可以接受讀請求,但在以下場景下可能出現數據的不一致:

  • Leader和Follower復制期間的狀態不一致
  • 因為網絡分區導致多個leader的存在,不同leader間的狀態不一致,即腦裂(split-brain)現象。如果請求分別被新舊leader處理,所得的結果也不一致

為解決raft的線性一致讀問題,etcd-raft提供了兩種實現方案:

  • ReadIndex(ReadOnlySafe)。其原理是接收到客戶端請求后,向集群發起ReadIndex請求來讀取commitedIndex,Leader收到請求后向節點發送心跳,當收到大多數節點的確認自己仍是leader后,回復ReadIndex請求並告知最新的commitedIndex。ReadIndex是etcd-raft的默認方案。
  • Lease read方案(ReadOnlyLeaseBased)。其原理是通過維護leader的租期,確認leader的唯一性,不需要通過心跳來進行leader的確認。其風險在於需要全局一直的時鍾來保證lease機制的准確性。etcd-raft不推薦采用此方案,pingcap開源的分布式數據庫tidb中的pd 模塊在實現TSO(Timestamp Oracle)的前提下,采用此方案。
ReadIndex實現分析

在raft初始化的過程中完成了linearizable read的配置,包括需要采用的方案。

   func newRaft(c *Config) *raft {
...
}
r := &raft{
id: c.ID,
...
//初始化readOnly配置
readOnly: newReadOnly(c.ReadOnlyOption),
disableProposalForwarding: c.DisableProposalForwarding,
}
}

const (
//ReadIndex方案
ReadOnlySafe ReadOnlyOption = iota
//leaseRead方案
ReadOnlyLeaseBased
)

阻塞的recvc channel收到ReadIndex請求后,將請求加入隊列,初始化ReadIndex狀態。之后發送廣播心跳。

   	func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
...
case pb.MsgReadIndex:
switch r.readOnly.option {
case ReadOnlySafe:
//加入請求隊列
r.readOnly.addRequest(r.raftLog.committed, m)
//廣播心跳消息
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
}
} else {
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
}
}
}

func (ro *readOnly) addRequest(index uint64, m pb.Message) {
ctx := string(m.Entries[0].Data)
if _, ok := ro.pendingReadIndex[ctx]; ok {
return
}
//index是當前集群的committedIndex,acks 用來收集節點心跳回復包
ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
ro.readIndexQueue = append(ro.readIndexQueue, ctx)
}

當leader收到心跳回復后,對心跳進行統計,如果是本地請求直接將消息追加到readstatus中,最終會由newReady函數將消息發送到readyc channel,監聽ready channel的客戶端會最終回復請求。

   	func stepLeader(r *raft, m pb.Message) error {
case pb.MsgHeartbeatResp:
...
}
//統計回復結果,如果未超過半數則直接返回
ackCount := r.readOnly.recvAck(m)
if ackCount < r.quorum() {
return nil
}

rss := r.readOnly.advance(m)
for _, rs := range rss {
req := rs.req
//如果是本地的請求
if req.From == None || req.From == r.id { // from local member
r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
} else {
//如果是來自follower的請求,將結果返回給follower
r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
}
}
}

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
rd := Ready{
Entries: r.raftLog.unstableEntries(),
CommittedEntries: r.raftLog.nextEnts(),
Messages: r.msgs,
}
...
//readIndex消息追加
if len(r.readStates) != 0 {
rd.ReadStates = r.readStates
}
rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
return rd
}

func (n *node) run(r *raft) {
....
for {
if advancec != nil {
readyc = nil
} else {
//消息加入readyc隊列
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
}
....
}

如果是follower接收到ReadIndex請求,直接將消息轉發至leader,leader按上述流程處理,follower接收到消息后采用上述類似機制加入readyc隊列,異步回復客戶端。

   func stepFollower(r *raft, m pb.Message) error { 
...
case pb.MsgReadIndex:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
return nil
}
//將ReadIndex請求轉發給leader
m.To = r.lead
r.send(m)
case pb.MsgReadIndexResp:
if len(m.Entries) != 1 {
r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
return nil
}
//收到leader回復后將消息加入readStatus
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
...
}

總結

本文從raft算法的基本原理出發,簡單的分析了leader選舉和日志復制的實現過程。之后從工程實踐的角度出發分析了etcd-raft的代碼實現,重點剖析了leader選舉,日志復制,leadership transfer和線性一致讀的核心流程。而raft算法博大精深,etcd也是工業級的完整實現,除了本文介紹的幾個核心環節外,leader的預選舉(prevote)、節點成員變更、配置變更和日志的批量追加等也是raft的關鍵環節,因篇幅所限就不再一一介紹。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM