ETCD整體機制
etcd 是一個分布式的、可靠的 key-value 存儲系統,它適用於存儲分布式系統中的關鍵數據。
etcd 集群中多個節點之間通過Raft算法完成分布式一致性協同,算法會選舉出一個主節點作為 leader,由 leader 負責數據的同步與分發。當 leader 出現故障后系統會自動地重新選取另一個節點成為 leader,並重新完成數據的同步。
etcd集群實現高可用主要是基於quorum機制,即:集群中半數以上的節點可用時,集群才可繼續提供服務,quorum機制在分布式一致性算法中應用非常廣泛,此處不再詳細闡述。
raft數據更新和etcd調用是基於兩階段機制:
第一階段 leader記錄log (uncommited);日志復制到follower;follower響應,操作成功,響應客戶端;調用者調用leader,leader會將kv數據存儲在日志中,並利用實時算法raft進行復制
第二階段 leader commit;通知follower;當復制給了N+1個節點后,本地提交,返回給客戶端,最后leader異步通知follower完成通知
ETCD核心API分析
etcd提供的api主要有kv相關、lease相關及watch,查看其源碼可知:
kv相關接口:
type KV interface {
// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte{0x10, 0x20}).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
// Delete deletes a key, or optionally using WithRange(end), [key, end).
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
// Compact compacts etcd KV history before the given rev.
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
// Txn creates a transaction.
Txn(ctx context.Context) Txn
}
主要有Put、Get、Delete、Compact、Do和Txn方法;Put用於向etcd集群中寫入消息,以key value的形式存儲;Get可以根據key查看其對應存儲在etcd中的數據;Delete通過刪除key來刪除etcd中的數據;Compact 方法用於壓縮 etcd 鍵值對存儲中的事件歷史,避免事件歷史無限制的持續增長;Txn 方法在單個事務中處理多個請求,etcd事務模式為:
if compare
then op
else op
commit
lease相關接口:
type Lease interface {
// Grant creates a new lease.
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
// Revoke revokes the given lease.
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
// TimeToLive retrieves the lease information of the given lease ID.
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
// Leases retrieves all leases.
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
// KeepAlive keeps the given lease alive forever. If the keepalive response
// posted to the channel is not consumed immediately, the lease client will
// continue sending keep alive requests to the etcd server at least every
// second until latest response is consumed.
//
// The returned "LeaseKeepAliveResponse" channel closes if underlying keep
// alive stream is interrupted in some way the client cannot handle itself;
// given context "ctx" is canceled or timed out. "LeaseKeepAliveResponse"
// from this closed channel is nil.
//
// If client keep alive loop halts with an unexpected error (e.g. "etcdserver:
// no leader") or canceled by the caller (e.g. context.Canceled), the error
// is returned. Otherwise, it retries.
//
// TODO(v4.0): post errors to last keep alive message before closing
// (see https://github.com/coreos/etcd/pull/7866)
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
// KeepAliveOnce renews the lease once. The response corresponds to the
// first message from calling KeepAlive. If the response has a recoverable
// error, KeepAliveOnce will retry the RPC with a new keep alive message.
//
// In most of the cases, Keepalive should be used instead of KeepAliveOnce.
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
// Close releases all resources Lease keeps for efficient communication
// with the etcd server.
Close() error
}
lease 是分布式系統中一個常見的概念,用於代表一個分布式租約。典型情況下,在分布式系統中需要去檢測一個節點是否存活的時,就需要租約機制。
Grant方法用於創建一個租約,當服務器在給定 time to live 時間內沒有接收到 keepAlive 時租約過期;Revoke撤銷一個租約,所有附加到租約的key將過期並被刪除;TimeToLive 獲取租約信息;KeepAlive 通過從客戶端到服務器端的流化的 keep alive 請求和從服務器端到客戶端的流化的 keep alive 應答來維持租約;檢測分布式系統中一個進程是否存活,可以在進程中去創建一個租約,並在該進程中周期性的調用 KeepAlive 的方法。如果一切正常,該節點的租約會一致保持,如果這個進程掛掉了,最終這個租約就會自動過期,在 etcd 中,允許將多個 key 關聯在同一個 lease 之上,可以大幅減少 lease 對象刷新帶來的開銷。
watch相關接口:
type Watcher interface {
// Watch watches on a key or prefix. The watched events will be returned
// through the returned channel. If revisions waiting to be sent over the
// watch are compacted, then the watch will be canceled by the server, the
// client will post a compacted error watch response, and the channel will close.
// If the context "ctx" is canceled or timed out, returned "WatchChan" is closed,
// and "WatchResponse" from this closed channel has zero events and nil "Err()".
// The context "ctx" MUST be canceled, as soon as watcher is no longer being used,
// to release the associated resources.
//
// If the context is "context.Background/TODO", returned "WatchChan" will
// not be closed and block until event is triggered, except when server
// returns a non-recoverable error (e.g. ErrCompacted).
// For example, when context passed with "WithRequireLeader" and the
// connected server has no leader (e.g. due to network partition),
// error "etcdserver: no leader" (ErrNoLeader) will be returned,
// and then "WatchChan" is closed with non-nil "Err()".
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
//
// Otherwise, as long as the context has not been canceled or timed out,
// watch will retry on other recoverable errors forever until reconnected.
//
// TODO: explicitly set context error in the last "WatchResponse" message and close channel?
// Currently, client contexts are overwritten with "valCtx" that never closes.
// TODO(v3.4): configure watch retry policy, limit maximum retry number
// (see https://github.com/etcd-io/etcd/issues/8980)
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
// RequestProgress requests a progress notify response be sent in all watch channels.
RequestProgress(ctx context.Context) error
// Close closes the watcher and cancels all watch requests.
Close() error
}
etcd 的Watch 機制可以實時地訂閱到 etcd 中增量的數據更新,watch 支持指定單個 key,也可以指定一個 key 的前綴。Watch 觀察將要發生或者已經發生的事件,輸入和輸出都是流;輸入流用於創建和取消觀察,輸出流發送事件。一個觀察 RPC 可以在一次性在多個key范圍上觀察,並為多個觀察流化事件,整個事件歷史可以從最后壓縮修訂版本開始觀察。
ETCD數據版本機制
etcd數據版本中主要有term表示leader的任期,revision 代表的是全局數據的版本。當集群發生 Leader 切換,term 的值就會 +1,在節點故障,或者 Leader 節點網絡出現問題,再或者是將整個集群停止后再次拉起,都會發生 Leader 的切換;當數據發生變更,包括創建、修改、刪除,其 revision 對應的都會 +1,在集群中跨 Leader 任期之間,revision 都會保持全局單調遞增,集群中任意一次的修改都對應着一個唯一的 revision,因此我們可以通過 revision 來支持數據的 MVCC,也可以支持數據的 Watch。
對於每一個 KeyValue 數據節點,etcd 中都記錄了三個版本:
- 第一個版本叫做 create_revision,是 KeyValue 在創建時對應的 revision;
- 第二個叫做 mod_revision,是其數據被操作的時候對應的 revision;
- 第三個 version 就是一個計數器,代表了 KeyValue 被修改了多少次。
在同一個 Leader 任期之內,所有的修改操作,其對應的 term 值始終相等,而 revision 則保持單調遞增。當重啟集群之后,所有的修改操作對應的 term 值都加1了。
ETCD之MVCC並發控制
說起mvcc大家都不陌生,mysql的innodb中就使用mvcc實現高並發的數據訪問,對數據進行多版本處理,並通過事務的可見性來保證事務能看到自己應該看到的數據版本,同樣,在etcd中也使用mvcc進行並發控制。
etcd支持對同一個 Key 發起多次數據修改,每次數據修改都對應一個版本號。etcd記錄了每一次修改對應的數據,即一個 key 在 etcd 中存在多個歷史版本。在查詢數據的時候如果不指定版本號,etcd 會返回 Key 對應的最新版本,同時etcd 也支持指定一個版本號來查詢歷史數據。
etcd將每一次修改都記錄下來,使用 watch訂閱數據時,可以支持從任意歷史時刻(指定 revision)開始創建一個 watcher,在客戶端與 etcd 之間建立一個數據管道,etcd 會推送從指定 revision 開始的所有數據變更。etcd 提供的 watch 機制保證,該 Key 的數據后續的被修改之后,通過這個數據管道即時的推送給客戶端。
分析其源碼可知:
type revision struct {
// main is the main revision of a set of changes that happen atomically.
main int64
// sub is the the sub revision of a change in a set of changes that happen
// atomically. Each change has different increasing sub revision in that
// set.
sub int64
}
func (a revision) GreaterThan(b revision) bool {
if a.main > b.main {
return true
}
if a.main < b.main {
return false
}
return a.sub > b.sub
}
在etcd的mvcc實現中有一個revision結構體,main 表示當前操作的事務 id,全局自增的邏輯時間戳,sub 表示當前操作在事務內部的子 id,事務內自增,從 0 開始;通過GreaterThan方法進行事務版本的比較。
ETCD存儲數據結構
etcd 中所有的數據都存儲在一個 btree的數據結構中,該btree保存在磁盤中,並通過mmap的方式映射到內存用來支持快速的訪問,treeIndex的定義如下:
type treeIndex struct {
sync.RWMutex
tree *btree.BTree
}
func newTreeIndex() index {
return &treeIndex{
tree: btree.New(32),
}
}
index所綁定對btree的操作有Put、Get、Revision、Range及Visit等,以Put方法為例,其源碼如下:
func (ti *treeIndex) Put(key []byte, rev revision) {
keyi := &keyIndex{key: key}
ti.Lock()
defer ti.Unlock()
item := ti.tree.Get(keyi)
if item == nil {
keyi.put(rev.main, rev.sub)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi := item.(*keyIndex)
okeyi.put(rev.main, rev.sub)
}
通過源碼可知對btree數據的讀寫操作都是在加鎖下完成的,從而來保證並發下數據的一致性。
