etcd學習(9)-etcd中的存儲實現


etcd中的存儲實現

前言

前面了關於etcd的raft相關的實現,這里來看下存儲的相關實現

通過看etcd的代碼我們可以看到目前有兩個大的版本,v2和v3。

V3和V2版本的對比

etcd的v2版本有下面的一些問題

功能局限性

1、etcd v2 不支持范圍查詢和分頁;

2、etcd v2 不支持多 key 事務;

Watch 機制可靠性問題

etcd v2 是內存型、不支持保存 key 歷史版本的數據庫,只在內存中使用滑動窗口保存了最近的 1000 條變更事件,當 etcd server 寫請求較多、網絡波動時等場景,很容易出現事件丟失問題,進而又觸發 client 數據全量拉取,產生大量 expensive request,甚至導致 etcd 雪崩。

性能瓶頸問題

1、etcd v2早起使用的是 HTTP/1.x API。HTTP/1.x 協議沒有壓縮機制,大量的請求可能導致 etcd 出現 CPU 高負載、OOM、丟包等問題;

2、etcd v2 client 會通過 HTTP 長連接輪詢 Watch 事件,當 watcher 較多的時候,因 HTTP/1.x 不支持多路復用,會創建大量的連接,消耗 server 端過多的 socket 和內存資源;

3、對於 key 中的 TTL過期時間,如果大量 key TTL 一樣,也需要分別為每個 key 發起續期操作,當 key 較多的時候,這會顯著增加集群負載、導致集群性能顯著下降;

內存開銷問題

etcd v2 在內存維護了一顆樹來保存所有節點 key 及 value。在數據量場景略大的場景,如配置項較多、存儲了大量 Kubernetes Events, 它會導致較大的內存開銷,同時 etcd 需要定時把全量內存樹持久化到磁盤。這會消耗大量的 CPU 和磁盤 I/O 資源,對系統的穩定性造成一定影響。

etcd v3 的出現就是為了解決以上穩定性、擴展性、性能問題

1、在內存開銷、Watch 事件可靠性、功能局限上,它通過引入 B-tree、boltdb 實現一個 MVCC 數據庫,數據模型從層次型目錄結構改成扁平的 key-value,提供穩定可靠的事件通知,實現了事務,支持多 key 原子更新,同時基於 boltdb 的持久化存儲,顯著降低了 etcd 的內存占用、避免了 etcd v2 定期生成快照時的昂貴的資源開銷;

2、etcd v3 使用了 gRPC API,使用 protobuf 定義消息,消息編解碼性能相比 JSON 超過 2 倍以上,並通過 HTTP/2.0 多路復用機制,減少了大量 watcher 等場景下的連接數;

3、使用 Lease 優化 TTL 機制,每個 Lease 具有一個 TTL,相同的 TTL 的 key 關聯一個 Lease,Lease 過期的時候自動刪除相關聯的所有 key,不再需要為每個 key 單獨續期;

4、etcd v3 支持范圍、分頁查詢,可避免大包等 expensive request。

MVCC

MVCC 機制正是基於多版本技術實現的一種樂觀鎖機制,它樂觀地認為數據不會發生沖突,但是當事務提交時,具備檢測數據是否沖突的能力。

在 MVCC 數據庫中,你更新一個 key-value 數據的時候,它並不會直接覆蓋原數據,而是新增一個版本來存儲新的數據,每個數據都有一個版本號,版本號是一個邏輯時鍾,不會因為服務器時間的差異而受影響。

上面講到了樂觀鎖,我們來再來復習下悲觀鎖是什么意思?

悲觀鎖是一種事先預防機制,它悲觀地認為多個並發事務可能會發生沖突,因此它要求事務必須先獲得鎖,才能進行修改數據操作。但是悲觀鎖粒度過大、高並發場景下大量事務會阻塞等,會導致服務性能較差。

treeIndex 原理

在 treeIndex 中,每個節點的 key 是一個 keyIndex 結構,etcd 就是通過它保存了用戶的 key 與版本號的映射關系。

來看下keyIndex數據結構

// etcd/server/mvcc/key_index.go
type keyIndex struct {
	key         []byte // 用戶的key名稱
	modified    revision // 最后一次修改key時的etcd版本號
	generations []generation // generation保存了一個key若干代版本號信息,每代中包含對key的多次修改的版本號列表
}

generations 表示一個 key 從創建到刪除的過程,每代對應 key 的一個生命周期的開始與結束。當你第一次創建一個 key 時,會生成第 0 代,后續的修改操作都是在往第 0 代中追加修改版本號。當你把 key 刪除后,它就會生成新的第 1 代,一個 key 不斷經歷創建、刪除的過程,它就會生成多個代。

// generation contains multiple revisions of a key.
type generation struct {
	ver     int64 // 表示此key的修改次數
	created revision // 表示generation結構創建時的版本號
	revs    []revision // 每次修改key時的revision追加到此數組
}

再來看下 revision

// A revision indicates modification of the key-value space.
// The set of changes that share same main revision changes the key-value space atomically.
type revision struct {
	// 一個全局遞增的主版本號,隨put/txn/delete事務遞增,一個事務內的key main版本號是一致的
	main int64

	// 一個事務內的子版本號,從0開始隨事務內put/delete操作遞增
	sub int64
}

看完基本的數據結構,我們來看下 mvcc 對 key 的操作流程

MVCC 更新 key

執行 put 操作的時候首先從 treeIndex 模塊中查詢 key 的 keyIndex 索引信息,keyIndex 在上面已經介紹了。

  • 如果首次操作,也就是 treeIndex 中找不到對應的,etcd 會根據當前的全局版本號(空集群啟動時默認為 1)自增,生成 put 操作對應的版本號 revision{2,0},這就是 boltdb 的 key。

  • 如果能找到,在當前的 keyIndex append 一個操作的 revision

// etcd/server/mvcc/index.go
func (ti *treeIndex) Put(key []byte, rev revision) {
	keyi := &keyIndex{key: key}

	ti.Lock()
	defer ti.Unlock()
	item := ti.tree.Get(keyi)
	// 沒有找到
	if item == nil {
		keyi.put(ti.lg, rev.main, rev.sub)
		ti.tree.ReplaceOrInsert(keyi)
		return
	}
	okeyi := item.(*keyIndex)
	okeyi.put(ti.lg, rev.main, rev.sub)
}

// etcd/server/mvcc/key_index.go
// put puts a revision to the keyIndex.
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
	rev := revision{main: main, sub: sub}

	if len(ki.generations) == 0 {
		ki.generations = append(ki.generations, generation{})
	}
	g := &ki.generations[len(ki.generations)-1]
	if len(g.revs) == 0 { // create a new key
		keysGauge.Inc()
		g.created = rev
	}
	g.revs = append(g.revs, rev)
	g.ver++
	ki.modified = rev
}

填充完 treeIndex ,這時候就會將數據保存到 boltdb 的緩存中,並同步更新 buffer

MVCC 查詢 key

在讀事務中,它首先需要根據 key 從 treeIndex 模塊獲取版本號,如果未帶版本號,默認是讀取最新的數據。treeIndex 模塊從 B-tree 中,根據 key 查找到 keyIndex 對象后,匹配有效的 generation,返回 generation 的 revisions 數組中最后一個版本號給讀事務對象。

讀事務對象根據此版本號為 key,通過 Backend 的並發讀事務(ConcurrentReadTx)接口,優先從 buffer 中查詢,命中則直接返回,否則從 boltdb 中查詢此 key 的 value 信息。具體可參見下文的只讀事務。

當然上面是查找最新的數據,如果我們查詢歷史中的某一個版本的信息呢?

處理過程是一樣的,只不過是根據 key 從 treeIndex 模塊獲取版本號,不是獲取最新的,而是獲取小於等於 我們指定的版本號 的最大歷史版本號,然后再去查詢對應的值信息。

// etcd/server/mvcc/index.go
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
	keyi := &keyIndex{key: key}
	ti.RLock()
	defer ti.RUnlock()
	if keyi = ti.keyIndex(keyi); keyi == nil {
		return revision{}, revision{}, 0, ErrRevisionNotFound
	}
	return keyi.get(ti.lg, atRev)
}

// get gets the modified, created revision and version of the key that satisfies the given atRev.
// Rev must be higher than or equal to the given atRev.
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
	if ki.isEmpty() {
		lg.Panic(
			"'get' got an unexpected empty keyIndex",
			zap.String("key", string(ki.key)),
		)
	}
	g := ki.findGeneration(atRev)
	if g.isEmpty() {
		return revision{}, revision{}, 0, ErrRevisionNotFound
	}

	n := g.walk(func(rev revision) bool { return rev.main > atRev })
	if n != -1 {
		return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
	}

	return revision{}, revision{}, 0, ErrRevisionNotFound
}

// 找出給定的 rev 所屬的 generation
func (ki *keyIndex) findGeneration(rev int64) *generation {
	lastg := len(ki.generations) - 1
	cg := lastg

	for cg >= 0 {
		if len(ki.generations[cg].revs) == 0 {
			cg--
			continue
		}
		g := ki.generations[cg]
		if cg != lastg {
			if tomb := g.revs[len(g.revs)-1].main; tomb <= rev {
				return nil
			}
		}
		if g.revs[0].main <= rev {
			return &ki.generations[cg]
		}
		cg--
	}
	return nil
}

關於從獲取 key 的 value 信息的過程可參考下文的 只讀事務。

MVCC 刪除 key

再來看下刪除的邏輯

etcd 中的刪除操作,是延期刪除模式,和更新 key 類似

相比更新操作:

1、生成的 boltdb key 版本號追加了刪除標識(tombstone, 簡寫 t),boltdb value 變成只含用戶 key 的 KeyValue 結構體;

2、treeIndex 模塊也會給此 key hello 對應的 keyIndex 對象,追加一個空的 generation 對象,表示此索引對應的 key 被刪除了;

當你再次查詢對應 key 的時候,treeIndex 模塊根據 key 查找到 keyindex 對象后,若發現其存在空的 generation 對象,並且查詢的版本號大於等於被刪除時的版本號,則會返回空。

那么 key 打上刪除標記后有哪些用途呢?什么時候會真正刪除它呢?

一方面刪除 key 時會生成 events,Watch 模塊根據 key 的刪除標識,會生成對應的 Delete 事件。

另一方面,當你重啟 etcd,遍歷 boltdb 中的 key 構建 treeIndex 內存樹時,你需要知道哪些 key 是已經被刪除的,並為對應的 key 索引生成 tombstone 標識。而真正刪除 treeIndex 中的索引對象、boltdb 中的 key 是通過壓縮 (compactor) 組件異步完成。

正因為 etcd 的刪除 key 操作是基於以上延期刪除原理實現的,因此只要壓縮組件未回收歷史版本,我們就能從 etcd 中找回誤刪的數據。

// etcd/server/mvcc/kvstore_txn.go
func (tw *storeTxnWrite) delete(key []byte) {
	ibytes := newRevBytes()
	idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
	revToBytes(idxRev, ibytes)

	ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)

	kv := mvccpb.KeyValue{Key: key}

	d, err := kv.Marshal()
	if err != nil {
		...
	}

	tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)
	err = tw.s.kvindex.Tombstone(key, idxRev)
	if err != nil {
		...
	}
	tw.changes = append(tw.changes, kv)

	item := lease.LeaseItem{Key: string(key)}
	leaseID := tw.s.le.GetLease(item)

	if leaseID != lease.NoLease {
		err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
		if err != nil {
			...
		}
	}
}

刪除操作會向結構體中的 generation 追加一個新的 tombstone 標記,用於標識當前的 Key 已經被刪除;除此之外,上述方法還會將每一個更新操作的 revision 存到單獨的 keyBucketName 中

var Key = backend.Bucket(bucket{id: 1, name: keyBucketName, safeRangeBucket: true})

壓縮

我們知道 etcd 中的每一次更新、刪除 key 操作,treeIndex 的 keyIndex 索引中都會追加一個版本號,在 boltdb 中會生成一個新版本 boltdb key 和 value。也就是隨着你不停更新、刪除,你的 etcd 進程內存占用和 db 文件就會越來越大。很顯然,這會導致 etcd OOM 和 db 大小增長到最大 db 配額,最終不可寫。

etcd 就是通過壓縮機制來控制 db 的大小的

關於壓縮,etcd 提供了手動和自動的兩種壓縮策略

  • 人工壓縮: client API 提供了人工壓縮的 API,通過手動調用便能完成壓縮

  • 自動壓縮:etcd 本身提供了兩種自動壓縮的方式

1、周期性壓縮

2、版本號壓縮

周期性壓縮

適用於場景

如果我們希望 etcd 只保留最近一段時間寫入的歷史版本時,你就可以選擇配置 etcd 的壓縮模式為 periodic,保留時間為你自定義的 1h 等。

是如何做壓縮的呢?比如我們定義了時間?

etcd server 啟動后,根據你的配置的模式 periodic,會創建 periodic Compactor,它會異步的獲取、記錄過去一段時間的版本號。periodic Compactor 組件獲取你設置的壓縮間隔參數 1h, 並將其划分成 10 個區間,也就是每個區間 6 分鍾。每隔 6 分鍾,它會通過 etcd MVCC 模塊的接口獲取當前的 server 版本號,追加到 rev 數組中。

如果要保留過去 1 個小時的歷史版本,periodic Compactor 組件會通過當前時間減去上一次成功執行 Compact 操作的時間,如果間隔大於一個小時,它會取出 rev 數組的首元素,通過 etcd server 的 Compact 接口,發起壓縮操作。

版本號壓縮

來看下適用場景

如果寫請求比較多,可能產生比較多的歷史版本導致 db 增長時,或者不確定配置 periodic 周期為多少才是最佳的時候,你可以通過設置壓縮模式為 revision,指定保留的歷史版本號數。比如你希望 etcd 盡量只保存 1 萬個歷史版本,那么你可以指定 compaction-mode 為 revision,auto-compaction-retention 為 10000。

是如何去操作的呢?

etcd 啟動后會根據你的壓縮模式 revision,創建 revision Compactor。revision Compactor 會根據你設置的保留版本號數,每隔 5 分鍾定時獲取當前 server 的最大版本號,減去你想保留的歷史版本數,然后通過 etcd server 的 Compact 接口發起如下的壓縮操作即可。

壓縮后 db 的大小會不會減少呢?

boltdb 將 db 文件划分成若干個 page 頁,page 頁又有四種類型,分別是 meta page、branch page、leaf page 以及 freelist page。branch page 保存 B+ tree 的非葉子節點 key 數據,leaf page 保存 bucket 和 key-value 數據,freelist 會記錄哪些頁是空閑的。

當我們通過 boltdb 刪除大量的 key,在事務提交后 B+ tree 經過分裂、平衡,會釋放出若干 branch/leaf page 頁面,然而 boltdb 並不會將其釋放給磁盤,調整 db 大小操作是昂貴的,會對性能有較大的損害。

boltdb 是通過 freelist page 記錄這些空閑頁的分布位置,當收到新的寫請求時,優先從空閑頁數組中申請若干連續頁使用,實現高性能的讀寫(而不是直接擴大 db 大小)。當連續空閑頁申請無法得到滿足的時候, boltdb 才會通過增大 db 大小來補充空閑頁。

所以壓縮之后釋放的空閑頁可以滿足后續的新增寫請求的空閑頁需求,db 的打消會趨於穩定

boltdb 存儲

下來看下 Backend 的細節, etcd 中通過 Backend,很好的封裝了存儲引擎的實現細節,為上層提供一個更一致的接口,方便了 etcd 中其他模塊的使用

// etcd/server/mvcc/backend/backend.go
type Backend interface {
	// ReadTx 返回一個讀事務。它被主數據路徑中的 ConcurrentReadTx 替換
	ReadTx() ReadTx
	BatchTx() BatchTx
	// ConcurrentReadTx returns a non-blocking read transaction.
	ConcurrentReadTx() ReadTx

	Snapshot() Snapshot
	Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
	// Size 返回后端物理分配的當前大小。
	Size() int64
	// SizeInUse 返回邏輯上正在使用的后端的當前大小。
	SizeInUse() int64
	OpenReadTxN() int64
	Defrag() error
	ForceCommit()
	Close() error
}

再來看下 pacakge 內部的 backend 結構體,這是一個實現了 Backend 接口的結構:

// etcd/server/mvcc/backend/backend.go
type backend struct {
	// size and commits are used with atomic operations so they must be
	// 64-bit aligned, otherwise 32-bit tests will crash

	// size is the number of bytes allocated in the backend
	size int64
	// sizeInUse is the number of bytes actually used in the backend
	sizeInUse int64
	// commits counts number of commits since start
	commits int64
	// openReadTxN is the number of currently open read transactions in the backend
	openReadTxN int64
	// mlock prevents backend database file to be swapped
	mlock bool

	mu sync.RWMutex
	db *bolt.DB

	// 默認100ms
	batchInterval time.Duration
	// 默認defaultBatchLimit    = 10000
	batchLimit    int
	batchTx       *batchTxBuffered

	readTx *readTx
	// txReadBufferCache mirrors "txReadBuffer" within "readTx" -- readTx.baseReadTx.buf.
	// When creating "concurrentReadTx":
	// - if the cache is up-to-date, "readTx.baseReadTx.buf" copy can be skipped
	// - if the cache is empty or outdated, "readTx.baseReadTx.buf" copy is required
	txReadBufferCache txReadBufferCache

	stopc chan struct{}
	donec chan struct{}

	hooks Hooks

	lg *zap.Logger
}

readTx 和 batchTx 分別實現了 ReadTx 和 BatchTx 接口,其中 readTx 負責讀請求,batchTx 負責寫請求

// etcd/server/mvcc/backend/read_tx.go
type ReadTx interface {
	Lock()
	Unlock()
	RLock()
	RUnlock()

	UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
	UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
}

// etcd/server/mvcc/backend/batch_tx.go
type BatchTx interface {
	ReadTx
	UnsafeCreateBucket(bucket Bucket)
	UnsafeDeleteBucket(bucket Bucket)
	UnsafePut(bucket Bucket, key []byte, value []byte)
	UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
	UnsafeDelete(bucket Bucket, key []byte)
	// Commit commits a previous tx and begins a new writable one.
	Commit()
	// CommitAndStop commits the previous tx and does not create a new one.
	CommitAndStop()
}

readTx 和 batchTx 的創建在 newBackend 中完成

func newBackend(bcfg BackendConfig) *backend {
	if bcfg.Logger == nil {
		bcfg.Logger = zap.NewNop()
	}

	bopts := &bolt.Options{}
	if boltOpenOptions != nil {
		*bopts = *boltOpenOptions
	}
	bopts.InitialMmapSize = bcfg.mmapSize()
	bopts.FreelistType = bcfg.BackendFreelistType
	bopts.NoSync = bcfg.UnsafeNoFsync
	bopts.NoGrowSync = bcfg.UnsafeNoFsync
	bopts.Mlock = bcfg.Mlock

	db, err := bolt.Open(bcfg.Path, 0600, bopts)
	if err != nil {
		bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
	}

	// In future, may want to make buffering optional for low-concurrency systems
	// or dynamically swap between buffered/non-buffered depending on workload.
	b := &backend{
		db: db,

		batchInterval: bcfg.BatchInterval,
		batchLimit:    bcfg.BatchLimit,
		mlock:         bcfg.Mlock,

		readTx: &readTx{
			baseReadTx: baseReadTx{
				buf: txReadBuffer{
					txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
					bufVersion: 0,
				},
				buckets: make(map[BucketID]*bolt.Bucket),
				txWg:    new(sync.WaitGroup),
				txMu:    new(sync.RWMutex),
			},
		},
		txReadBufferCache: txReadBufferCache{
			mu:         sync.Mutex{},
			bufVersion: 0,
			buf:        nil,
		},

		stopc: make(chan struct{}),
		donec: make(chan struct{}),

		lg: bcfg.Logger,
	}

	b.batchTx = newBatchTxBuffered(b)
	// We set it after newBatchTxBuffered to skip the 'empty' commit.
	b.hooks = bcfg.Hooks

	go b.run()
	return b
}

func (b *backend) run() {
	defer close(b.donec)
	// 定時提交事務
	t := time.NewTimer(b.batchInterval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
		case <-b.stopc:
			b.batchTx.CommitAndStop()
			return
		}
		if b.batchTx.safePending() != 0 {
			b.batchTx.Commit()
		}
		t.Reset(b.batchInterval)
	}
}

newBackend 在啟動的時候會開啟一個 goroutine ,定期的提交事務

只讀事務

來看下只讀事務的現實

// Base type for readTx and concurrentReadTx to eliminate duplicate functions between these
type baseReadTx struct {
	// 保護 txReadBuffer 的訪問
	mu  sync.RWMutex
	buf txReadBuffer

	// 保護 tx
	txMu    *sync.RWMutex
	tx      *bolt.Tx
	buckets map[BucketID]*bolt.Bucket
	// txWg 保護 tx 在批處理間隔結束時不會被回滾,直到使用此 tx 的所有讀取完成。
	txWg *sync.WaitGroup
}

可以引入了兩把讀寫鎖來保護相應的資源,除了用於保護 tx 的 txmu 讀寫鎖之外,還存在另外一個 mu 讀寫鎖,它的作用是保證 buf 中的數據不會出現問題,buf 和結構體中的 buckets 都是用於加速讀效率的緩存。

它對位提供了兩個方法 UnsafeRange 和 UnsafeForEach

UnsafeRange 從名字就可以答題推斷出這個函數的作用就是做范圍查詢。

在 etcd 中無論我們想要后去單個 Key 還是一個范圍內的 Key 最終都是通過 Range 來實現的

// etcd/server/mvcc/backend/read_tx.go
func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
	if endKey == nil {
		// forbid duplicates for single keys
		limit = 1
	}
	if limit <= 0 {
		limit = math.MaxInt64
	}
	if limit > 1 && !bucketType.IsSafeRangeBucket() {
		panic("do not use unsafeRange on non-keys bucket")
	}
	// 首先從緩存中查詢鍵值對
	keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
	// 檢測緩存中返回的鍵值對是否達到Limit的要求,如果達到Limit的指定上限,直接返回緩存的查詢結果
	if int64(len(keys)) == limit {
		return keys, vals
	}

	// find/cache bucket
	bn := bucketType.ID()
	baseReadTx.txMu.RLock()
	bucket, ok := baseReadTx.buckets[bn]
	baseReadTx.txMu.RUnlock()
	lockHeld := false
	if !ok {
		baseReadTx.txMu.Lock()
		lockHeld = true
		bucket = baseReadTx.tx.Bucket(bucketType.Name())
		baseReadTx.buckets[bn] = bucket
	}

	// ignore missing bucket since may have been created in this batch
	if bucket == nil {
		if lockHeld {
			baseReadTx.txMu.Unlock()
		}
		return keys, vals
	}
	if !lockHeld {
		baseReadTx.txMu.Lock()
	}
	c := bucket.Cursor()
	baseReadTx.txMu.Unlock()

	// 將查詢緩存的結采與查詢 BlotDB 的結果合並 然后返回
	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
	return append(k2, keys...), append(v2, vals...)
}

func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
	if limit <= 0 {
		limit = math.MaxInt64
	}
	var isMatch func(b []byte) bool
	if len(endKey) > 0 {
		isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
	} else {
		isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
		limit = 1
	}

	for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
		vs = append(vs, cv)
		keys = append(keys, ck)
		if limit == int64(len(keys)) {
			break
		}
	}
	return keys, vs
}

梳理下流程:

1、首先從 baseReadTx 的 buf 里面查詢,如果從 buf 里面已經拿到了足夠的 KV (入參里面有限制 range 查詢的最大數量),那么就直接返回拿到的 KVs;

2、如果 buf 里面的KV不足以滿足要求,那么這里就會利用 BoltDB 的讀事務接口去 BoltDB 里面查詢 KV,然后返回。

讀寫事務

讀寫事務提供了讀和寫的數據的能力

type batchTx struct {
	sync.Mutex
	tx      *bolt.Tx
	backend *backend

	pending int
}

寫數據的請求會調用 UnsafePut 寫入數據到 BoltDB 中

// go.etcd.io/bbolt@v1.3.6/tx.go
// UnsafePut must be called holding the lock on the tx.
func (t *batchTx) UnsafePut(bucket Bucket, key []byte, value []byte) {
	t.unsafePut(bucket, key, value, false)
}

func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
	// 獲取bucket的實例
	bucket := t.tx.Bucket(bucketType.Name())
	if bucket == nil {
		...
	}
	if seq {
		// 如果順序寫入,將填充率設置成90%
		bucket.FillPercent = 0.9
	}
	// 使用 BoltDB 的 put 寫入數據
	if err := bucket.Put(key, value); err != nil {
        ...
	}
	t.pending++
}

數據存儲到 BoltDB 中,BoltDB 本身提供了 Put 的寫入 API

UnsafeDelete 和這個差不多,跳過

在執行 PUT 和 DELETE 之后,數據沒有提交,我們還需要手動或者等待 etcd 自動將請求提交:

// etcd/server/mvcc/backend/batch_tx.go
// Commit commits a previous tx and begins a new writable one.
func (t *batchTx) Commit() {
	t.Lock()
	t.commit(false)
	t.Unlock()
}

func (t *batchTx) commit(stop bool) {
	// commit the last tx
	if t.tx != nil {
		// 前讀寫事務未做任何修改就無須開啟新的事務
		if t.pending == 0 && !stop {
			return
		}

		start := time.Now()

		// 通過 BoltDB 提供的api提交當前的事務
		err := t.tx.Commit()
		// gofail: var afterCommit struct{}

		rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
		spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
		writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
		commitSec.Observe(time.Since(start).Seconds())
		// 增加 backend.commits 數量
		atomic.AddInt64(&t.backend.commits, 1)

		// 重置 pending 的數量
		t.pending = 0
		if err != nil {
			t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
		}
	}
	if !stop {
		// 開啟新的讀寫事務
		t.tx = t.backend.begin(true)
	}
}

事務的提交到這就介紹完了

總結

1、treeIndex 模塊基於 Google 開源的 btree 庫實現,它的核心數據結構 keyIndex,保存了用戶 key 與版本號關系。每次修改 key 都會生成新的版本號,生成新的 boltdb key-value。boltdb 的 key 為版本號,value 包含用戶 key-value、各種版本號、lease 的 mvccpb.KeyValue 結構體。

2、如果我們不帶版本號查詢的時候,返回的是最新的數據,如果攜帶版本號,將會返回版本對應的快照信息;

3、刪除一個數據時,etcd 並未真正刪除它,而是基於 lazy delete 實現的異步刪除。刪除原理本質上與更新操作類似,只不過 boltdb 的 key 會打上刪除標記,keyIndex 索引中追加空的 generation。真正刪除 key 是通過 etcd 的壓縮組件去異步實現的;

參考

【etcd Backend存儲引擎實現原理】https://blog.csdn.net/u010853261/article/details/109630223
【高可用分布式存儲 etcd 的實現原理】https://draveness.me/etcd-introduction/
【如何實現多版本並發控制?】https://time.geekbang.org/column/article/340226
【etcd v3.5.0】https://github.com/etcd-io/etcd/releases/tag/v3.5.0
【ETCD中的存儲實現】https://boilingfrog.github.io/2021/09/10/etcd中的存儲實現/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM