etcd是coreOS使用golang開發的分布式,一致性的kv存儲系統,因其易用性和高可靠性被廣泛運用於服務發現、消息發布和訂閱、分布式鎖和共享配置等方面,也被認為是zookeeper的強有力的競爭者。作為分布式kv,其底層使用raft算法實現多副本數據的強一致性。etcd作為raft開源實現的標桿,在設計上,將 raft 算法邏輯和持久化、網絡、線程等完全抽離出來單獨實現,充分解耦,在工程上,實現了諸多性能優化,是 raft 開源實踐中較早的工業級的實現,很多后來的 raft 實踐者都直接或者間接的參考了 ectd-raft 的設計和實現,例如kubernetes,tiDb等。其廣泛的影響力和優雅的golang代碼實踐也使得ectd成為golang的明星項目。在我們實際的分布式存儲系統的項目開發中,raft也被應用於元信息管理和數據存儲等多個模塊,因此熟悉和理解etcd-raft的實現具有重大意義,本文從raft的基本原理出發,深入淺出地分析了raft在ectd中的具體實現。
raft原理
架構

image
每個節點都包含狀態機,日志模塊和一致性模塊。功能分別是:
- 狀態機:數據一致性指的即是狀態機的一致性,從內部服務看表現為狀態機中的數據都保持一致
- log模塊:保存了所有的操作記錄
- 一致性模塊:一致性模塊算法保證寫入log命令的一致性,是raft的核心內容。
實現一致性的過程可分為Leader選舉(Leader election),日志同步(Log replication),安全性(safty),日志壓縮(Log compaction),成員變更(membership change)
leader 選舉
競選過程
- 節點由Follower變為Candidate,同時設置當前Term。
- Candidate給自己投票,帶上termid 和日志序號,同時向其他節點發送拉票請求
- 等待結果,成為Leader,follower 或者在選舉未成為產生結果的情況下節點狀態保持為Candidatae。
選舉結果
- 成功當選收到超過半數的選票時,成為Leader,定時給其他節點發送心跳,並帶上任期id,其他節點發現當前的任期id小於接收到leader發送過來的id,則將將狀態切換至follower.
- 選舉失敗在Candidate狀態接收到其他節點發送的心跳信息,且心跳中的任期id大於自己,則變為follower。
- 未產生結果沒有一個Candidate所獲得的選票超過半數,未產生leader,則Candidate再進入下一輪投票。為了避免長期沒有leader產生,raft采用如下策略避免:
- 選舉超時時間為隨機值,第一個超時的節點帶着最大的任期id立刻進入新一任的選舉
- 如果存在多個Candidate同時競選的情況,發送拉票請求也是一段隨機延時。
日志同步(Log Replication)

image
Leader選出后接受客戶端請求,Leader把請求日志作為日志條目加入到日志中,然后向其他Follower節點復制日志,但超過半數的日志復制成功,則Leader將日志應用到狀態機並向客戶端返回執行結果,同時Follower也將結果提交。如果存在Follower沒有成功復制日志,Leader會無限重試。
日志同步的關鍵點:
- 日志由有序編號的日志條目組成,每條日志包含創建的任期和用於執行的命令,日志是保證所有節點數據一致的關鍵。
- Leader 負責一致性檢查,同時讓所有的Follower都和自己保持一致。
- 在Leader發生切換時,如何保證各節點日志一致。leader為每一個follower維護一個nextIndex,將index和termid信息發送至follower,從缺失的termid和index 為follow 補齊數據,直至和leader完全一致。
- 只允許主節點提交包含當前term的日志。否則會出現已經commit的日志出現更改的情況
安全性
安全性的原則是一個term只有一個leader,被提交至狀態機的數據不能發生更改。保證安全性主要通過限制leader的選舉來保證:
- Candidate在拉票時需要攜帶本地已持久化的最新的日志信息,如果投票節點發現本地的日志信息比Candidate更新,則拒絕投票。
- 只允許Leader提交當前Term的日志。
- 擁有最新的已提交的log entry的Follower才有資格成為Leader。
raft協議實現
raft的golang的開源實現主要包含兩個:coreOS的raft實現 , 使用的項目如tidb和cockroachdb這兩個經典的newsql。另外一個是hashicrop的raft實現,使用的項目如服務發現解決方案consul和時序數據庫influxdb。對比二者的實現主要有如下特點:
- hashicrop的實現完整度高,包含了snapshot,wal,storage等,在集成時只需要關注業務邏輯
- etcd中的raft模塊則是raft協議的輕量級實現,對於上述功能只定義了相關interface,需要業務方去具體實現,優點是增加靈活性,etcdserver就是集成raft算法並實現snapshot,wal,storage這樣一個應用程序。
etcd/raft 代碼結構
- 日志持久化storage.go:持久化日志保存模塊,以interface的方式定義了實現的方式,並基於內存實現了memoryStorage用於存儲日志數據。log.go:raft算法日志模塊的邏輯log_unstable.go:raft 算法的日志緩存,日志優先寫緩存,待狀態穩定后進行持久化
- 節點node.go: raft集群節點行為的實現,定義了各節點通信方式process.go:從leader的角度,為每個follower維護一個子狀態機,根據狀態的切換決定leader該發什么消息給Follower.
- Raft算法raft.go:raft算法的具體邏輯實現,每個節點都有一個raft實例read_only.go: 實現了線性一致讀(linearizable read),線性一致讀要求讀請求讀到最新提交的數據。針對raft存在的stale read(多leader場景),此模塊通過ReadIndex的方式保證了一致性。
etcd/raft的實現分析
分析raft的實現流程,我們可以從raft的幾個核心問題入手:
- 如何選舉leader?
- 如何實現log的復制?
- 如何進行leadership的transfer?
- 如何實現線性一致讀?
其中leader的選舉、log復制和線性一致讀是raft協議的最基本要求,而leadership的轉移在工程實踐中有重大意義。
核心數據結構
- struct node node 中主要定義一系列channel,raft的實現就是通過channel 傳遞消息,當節點啟動通過select機制監聽上述channel確定相應的狀態切換。
// node is the canonical implementation of the Node interface
type node struct {
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChange
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
logger Logger
}
- interface node定義了node要實現raft算法必須實現的方法
type Node interface {
Tick() //時鍾的實現,選舉超時和心跳超時基於此實現
Campaign(ctx context.Context) error //參與leader競爭
Propose(ctx context.Context, data []byte) error //在日志中追加數據,需要實現方保證數據追加的成功
ProposeConfChange(ctx context.Context, cc pb.ConfChange) error // 集群配置變更
Step(ctx context.Context, msg pb.Message) error //根據消息變更狀態機的狀態
//標志某一狀態的完成,收到狀態變化的節點必須提交變更
Ready() <-chan Ready
//進行狀態的提交,收到完成標志后,必須提交過后節點才會實際進行狀態機的更新。在包含快照的場景,為了避免快照落地帶來的長時間阻塞,允許繼續接受和提交其他狀態,即使之前的快照狀態變更並沒有完成。
Advance()
//進行集群配置變更
ApplyConfChange(cc pb.ConfChange) *pb.ConfState
//變更leader
TransferLeadership(ctx context.Context, lead, transferee uint64)
//保證線性一致性讀,
ReadIndex(ctx context.Context, rctx []byte) error
//狀態機當前的配置
Status() Status
// ReportUnreachable reports the given node is not reachable for the last send.
//上報節點的不可達
ReportUnreachable(id uint64)
//上報快照狀態
ReportSnapshot(id uint64, status SnapshotStatus)
//停止節點
Stop()
}
節點的啟動和運行
節點初始化raft,讀取配置啟動各個各個節點,初始化logindex.啟動后 以for-loop方式循環運行,用select 機制監聽不同的channel 實現對狀態變化的監聽,並執行相應動作。
//啟動
func StartNode(c *Config, peers []Peer) Node {
r := newRaft(c) //初始化raft算法實例
r.becomeFollower(1, None)
//將配置中的節點加入集群
for _, peer := range peers {
...
}
//初始化logindex
r.raftLog.committed = r.raftLog.lastIndex()
for _, peer := range peers {
//初始化節點狀態機(progress)
r.addNode(peer.ID)
}
n := newNode()
n.logger = c.Logger
go n.run(r)
return &n
}
//運行
func (n *node) run(r *raft) {
...
select {
//接收到寫消息
case pm := <-propc:
...
//接收到readindex 請求
case m := <-n.recvc:
...
//配置變更
case cc := <-n.confc:
...
//超時時間到,包括心跳超時和選舉超時等
case <-n.tickc:
...
//數據ready
case readyc <- rd:
...
//可以進行狀態變更和日志提交
case <-advancec:
...
//節點狀態信號
case c := <-n.status:
...
//收到停止信號
case <-n.stop:
...
}
}
}
leader 選舉
初始化node為follower,設置任期為1,並初始化tickElection函數,這是實際參與選舉的函數,同時也初始化step為stepFollower,這是作為follower的核心信息處理函數,后續選舉,日志復制和快照等功能都基於此函數進行:
r := newRaft(c)
r.becomeFollower(1, None)
當節點接收leader的heartbeat超時時(每個節點都有隨機的超時時間),會觸發run函數中的tickc這個channel。發送MsgHup消息,並調用campaign參選, 將自身設置為candidate,並遞增currentTerm,向其他節點發送競選消息。其他節點通過監聽propc channel獲取其他節點發送的投票消息,並調用Step對消息進行判斷,選擇是否投票。
其中投票的判斷邏輯主要分兩步:1.如果投票信息中的任期id 是否 小於自身的id,則直接返回nil。2.通過isUpToDate判斷能否投票,通過和本地已存在的最新log比較,首先要有最大任期id,如果任期id相同則要求有最大的logindex。
candidate節點收到其他節點的回復后,判斷獲取的票數是否超過半數,如果是則設置自身為leader,否則為follower。
func (n *node) run(r *raft) {
...
for {
select {
...
//觸發heartbeat 超時
case <-n.tickc:
r.tick()
...
}
}
}
//超時觸發選舉
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
//隨機超時時間
func (r *raft) pastElectionTimeout() bool {
return r.electionElapsed >= r.randomizedElectionTimeout
}
func (r *raft) resetRandomizedElectionTimeout() {
r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}
//參與選舉
func (r *raft) campaign(t CampaignType) {
var term uint64
var voteMsg pb.MessageType
//成為candicate,將任期id加1
if t == campaignPreElection {
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
term = r.Term + 1
} else {
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
//判斷獲取的票數是否超過半數,如果是當選為leader
if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
//向其他節點發送競選消息
for id := range r.prs {
if id == r.id {
continue
}
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
//節點投票過程
func (r *raft) Step(m pb.Message) error {
...
//比較任期id
case m.Term > r.Term:
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
force := bytes.Equal(m.Context, []byte(campaignTransfer))
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
if !force && inLease {
return nil
}
}
switch m.Type {
case pb.MsgVote, pb.MsgPreVote:
...
//與本地最新的持久化日志比較
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
//發送投票信息
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0