etcd中的Lease
前言
之前我們了解過grpc使用etcd做服務發現
之前的服務發現我們使用了 Lease,每次注冊一個服務分配一個租約,通過 Lease 自動上報機模式,實現了一種活性檢測機制,保證了故障機器的及時剔除。這次我們來想寫的學習 Lease 租約的實現。
Lease
Lease 整體架構
這里放一個來自【etcd實戰課程】的一張圖片

來看下服務端中Lease中的幾個主要函數
// etcd/server/lease/lessor.go
// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
type Lessor interface {
...
// Grant 表示創建一個 TTL 為你指定秒數的 Lease
Grant(id LeaseID, ttl int64) (*Lease, error)
// Revoke 撤銷具有給定 ID 的租約
Revoke(id LeaseID) error
// 將給定的租約附加到具有給定 LeaseID 的租約。
Attach(id LeaseID, items []LeaseItem) error
// Renew 使用給定的 ID 續訂租約。它返回更新后的 TTL
Renew(id LeaseID) (int64, error)
...
}
同時對於客戶端 Lease 也提供了下面幾個API
// etcd/client/v3/lease.go
type Lease interface {
// Grant 表示創建一個 TTL 為你指定秒數的 Lease,Lessor 會將 Lease 信息持久化存儲在 boltdb 中;
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
// 表示撤銷 Lease 並刪除其關聯的數據;
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
// 表示獲取一個 Lease 的有效期、剩余時間;
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
// Leases retrieves all leases.
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
// 表示為 Lease 續期
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
// 使用once只在第一次調用
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
// Close releases all resources Lease keeps for efficient communication
// with the etcd server.
Close() error
}
服務端在啟動 Lessor 模塊的時候,會啟動兩個 goroutine ,revokeExpiredLeases()
和 checkpointScheduledLeases()
。
-
revokeExpiredLeases: 定時檢查是否有過期 Lease,發起撤銷過期的 Lease 操作;
-
checkpointScheduledLeases: 定時觸發更新 Lease 的剩余到期時間的操作;
func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
...
l := &lessor{
...
}
l.initAndRecover()
go l.runLoop()
return l
}
func (le *lessor) runLoop() {
defer close(le.doneC)
for {
le.revokeExpiredLeases()
le.checkpointScheduledLeases()
select {
case <-time.After(500 * time.Millisecond):
case <-le.stopC:
return
}
}
}
// revokeExpiredLeases 找到所有過期的租約,並將它們發送到過期通道被撤銷
func (le *lessor) revokeExpiredLeases() {
var ls []*Lease
// rate limit
revokeLimit := leaseRevokeRate / 2
le.mu.RLock()
if le.isPrimary() {
// 在leaseExpiredNotifier中的小頂堆中刪除過期的lease
ls = le.findExpiredLeases(revokeLimit)
}
le.mu.RUnlock()
if len(ls) != 0 {
select {
case <-le.stopC:
return
case le.expiredC <- ls:
default:
// the receiver of expiredC is probably busy handling
// other stuff
// let's try this next time after 500ms
}
}
}
// checkpointScheduledLeases 查找所有到期的預定租約檢查點將它們提交給檢查點以將它們持久化到共識日志中。
func (le *lessor) checkpointScheduledLeases() {
var cps []*pb.LeaseCheckpoint
// rate limit
for i := 0; i < leaseCheckpointRate/2; i++ {
le.mu.Lock()
if le.isPrimary() {
cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize)
}
le.mu.Unlock()
if len(cps) != 0 {
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps})
}
if len(cps) < maxLeaseCheckpointBatchSize {
return
}
}
}
我們可以看到對於revokeExpiredLeases()
和 checkpointScheduledLeases()
的操作,定時是500毫秒處理一次,直到收到退出的信息。
key 如何關聯 Lease
然后我們來分析下一個基於 Lease 特性實現檢測一個節點存活的過程
客戶端通過 Grant 創建一個 TTL 時間的 Lease
// etcd/client/v3/lease.go
func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
r := &pb.LeaseGrantRequest{TTL: ttl}
// 通過grpc低啊用服務端的創建函數
resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
if err == nil {
gresp := &LeaseGrantResponse{
ResponseHeader: resp.GetHeader(),
ID: LeaseID(resp.ID),
TTL: resp.TTL,
Error: resp.Error,
}
return gresp, nil
}
return nil, toErr(ctx, err)
}
客戶端創建的時候會通過 LeaseGrant 也就是grpc調用服務端的 Grant 的創建函數
來看下服務端的 Grant
// etcd/server/lease/lessor.go
func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
...
// TODO: when lessor is under high load, it should give out lease
// with longer TTL to reduce renew load.
l := &Lease{
ID: id,
ttl: ttl,
itemSet: make(map[LeaseItem]struct{}),
revokec: make(chan struct{}),
}
le.mu.Lock()
defer le.mu.Unlock()
// 檢查內存leaseMap是否存在這個lease
if _, ok := le.leaseMap[id]; ok {
return nil, ErrLeaseExists
}
// 這里有個ttl的最小值
if l.ttl < le.minLeaseTTL {
l.ttl = le.minLeaseTTL
}
if le.isPrimary() {
l.refresh(0)
} else {
l.forever()
}
le.leaseMap[id] = l
// 將 Lease 數據保存到 boltdb 的 Lease bucket 中
l.persistTo(le.b)
...
return l, nil
}
首先 Lessor 的 Grant 接口會把 Lease 保存到內存的 ItemMap 數據結構中,將數據數據保存到 boltdb 的 Lease bucket 中,返回給客戶端 leaseId
當然 Grant 只是申請了一個 Lease,將 key 和 Lease 進行關聯的操作是在 Attach 中完成的
// 將給定的租約附加到具有給定 LeaseID 的租約。
func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
le.mu.Lock()
defer le.mu.Unlock()
l := le.leaseMap[id]
if l == nil {
return ErrLeaseNotFound
}
l.mu.Lock()
// 將租約放到itemMap
// 一個租約是可以關聯多個key的
for _, it := range items {
l.itemSet[it] = struct{}{}
le.itemMap[it] = id
}
l.mu.Unlock()
return nil
}
一個 Lease 關聯的 key 集合是保存在內存中的,那么 etcd 重啟時,是如何知道每個 Lease 上關聯了哪些 key 呢?
答案是 etcd 的 MVCC 模塊在持久化存儲 key-value 的時候,保存到 boltdb 的 value 是個結構體(mvccpb.KeyValue), 它不僅包含你的 key-value 數據,還包含了關聯的 LeaseID 等信息。因此當 etcd 重啟時,可根據此信息,重建關聯各個 Lease 的 key 集合列表。
func (le *lessor) initAndRecover() {
tx := le.b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Lease)
_, vs := tx.UnsafeRange(buckets.Lease, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
for i := range vs {
var lpb leasepb.Lease
err := lpb.Unmarshal(vs[i])
if err != nil {
tx.Unlock()
panic("failed to unmarshal lease proto item")
}
ID := LeaseID(lpb.ID)
if lpb.TTL < le.minLeaseTTL {
lpb.TTL = le.minLeaseTTL
}
le.leaseMap[ID] = &Lease{
ID: ID,
ttl: lpb.TTL,
// itemSet will be filled in when recover key-value pairs
// set expiry to forever, refresh when promoted
itemSet: make(map[LeaseItem]struct{}),
expiry: forever,
revokec: make(chan struct{}),
}
}
le.leaseExpiredNotifier.Init()
heap.Init(&le.leaseCheckpointHeap)
tx.Unlock()
le.b.ForceCommit()
}
Lease的續期
續期我們通過定期發送 KeepAlive 請求給 etcd 續期健康狀態的 Lease
// etcd/client/v3/lease.go
// KeepAlive嘗試保持給定的租約永久alive
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
l.mu.Lock()
// ensure that recvKeepAliveLoop is still running
select {
case <-l.donec:
err := l.loopErr
l.mu.Unlock()
close(ch)
return ch, ErrKeepAliveHalted{Reason: err}
default:
}
ka, ok := l.keepAlives[id]
if !ok {
// create fresh keep alive
ka = &keepAlive{
chs: []chan<- *LeaseKeepAliveResponse{ch},
ctxs: []context.Context{ctx},
deadline: time.Now().Add(l.firstKeepAliveTimeout),
nextKeepAlive: time.Now(),
donec: make(chan struct{}),
}
l.keepAlives[id] = ka
} else {
// add channel and context to existing keep alive
ka.ctxs = append(ka.ctxs, ctx)
ka.chs = append(ka.chs, ch)
}
l.mu.Unlock()
go l.keepAliveCtxCloser(ctx, id, ka.donec)
// 使用once只在第一次調用
l.firstKeepAliveOnce.Do(func() {
// 500毫秒一次,不斷的發送保持活動請求
go l.recvKeepAliveLoop()
// 刪除等待太久沒反饋的租約
go l.deadlineLoop()
})
return ch, nil
}
// deadlineLoop獲取在租約TTL中沒有收到響應的任何保持活動的通道
func (l *lessor) deadlineLoop() {
for {
select {
case <-time.After(time.Second):
// donec 關閉,當 recvKeepAliveLoop 停止時設置 loopErr
case <-l.donec:
return
}
now := time.Now()
l.mu.Lock()
for id, ka := range l.keepAlives {
if ka.deadline.Before(now) {
// 等待響應太久;租約可能已過期
ka.close()
delete(l.keepAlives, id)
}
}
l.mu.Unlock()
}
}
func (l *lessor) recvKeepAliveLoop() (gerr error) {
defer func() {
l.mu.Lock()
close(l.donec)
l.loopErr = gerr
for _, ka := range l.keepAlives {
ka.close()
}
l.keepAlives = make(map[LeaseID]*keepAlive)
l.mu.Unlock()
}()
for {
// resetRecv 打開一個新的lease stream並開始發送保持活動請求。
stream, err := l.resetRecv()
if err != nil {
if canceledByCaller(l.stopCtx, err) {
return err
}
} else {
for {
// 接收lease stream的返回返回
resp, err := stream.Recv()
if err != nil {
if canceledByCaller(l.stopCtx, err) {
return err
}
if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
l.closeRequireLeader()
}
break
}
// 根據LeaseKeepAliveResponse更新租約
// 如果租約過期刪除所有alive channels
l.recvKeepAlive(resp)
}
}
select {
case <-time.After(retryConnWait):
continue
case <-l.stopCtx.Done():
return l.stopCtx.Err()
}
}
}
// resetRecv 打開一個新的lease stream並開始發送保持活動請求。
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
sctx, cancel := context.WithCancel(l.stopCtx)
// 建立服務端和客戶端連接的lease stream
stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
if err != nil {
cancel()
return nil, err
}
l.mu.Lock()
defer l.mu.Unlock()
if l.stream != nil && l.streamCancel != nil {
l.streamCancel()
}
l.streamCancel = cancel
l.stream = stream
go l.sendKeepAliveLoop(stream)
return stream, nil
}
// sendKeepAliveLoop 在給定流的生命周期內發送保持活動請求
func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
for {
var tosend []LeaseID
now := time.Now()
l.mu.Lock()
for id, ka := range l.keepAlives {
if ka.nextKeepAlive.Before(now) {
tosend = append(tosend, id)
}
}
l.mu.Unlock()
for _, id := range tosend {
r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
if err := stream.Send(r); err != nil {
// TODO do something with this error?
return
}
}
select {
// 每500毫秒執行一次
case <-time.After(500 * time.Millisecond):
case <-stream.Context().Done():
return
case <-l.donec:
return
case <-l.stopCtx.Done():
return
}
}
}
關於續期的新能優化
對於 TTL 的選擇,TTL 過長會導致節點異常后,無法及時從 etcd 中刪除,影響服務可用性,而過短,則要求 client 頻繁發送續期請求。
etcd v3 通過復用 lease 和引入 gRPC,提高了續期的效率
1、etcd v3 版本引入了 lease,上面的代碼我們也可以看到,不同 key 若 TTL 相同,可復用同一個 Lease, 顯著減少了 Lease 數。
2、同時 etcd v3 版本引入了 gRPC 。通過 gRPC HTTP/2 實現了多路復用,流式傳輸,同一連接可支持為多個 Lease 續期,能夠大大減少連接數,提高續期的效率。
過期 Lease 的刪除
上面的代碼我們介紹了 etcd 在啟動 lease 的時候會啟動一個 goroutine revokeExpiredLeases(),他會沒500毫秒執行一次清除操作。
func (le *lessor) runLoop() {
defer close(le.doneC)
for {
// 函數最終調用expireExists來完成清除操作
le.revokeExpiredLeases()
le.checkpointScheduledLeases()
select {
case <-time.After(500 * time.Millisecond):
case <-le.stopC:
return
}
}
}
// expireExists returns true if expiry items exist.
// It pops only when expiry item exists.
// "next" is true, to indicate that it may exist in next attempt.
func (le *lessor) expireExists() (l *Lease, ok bool, next bool) {
if le.leaseExpiredNotifier.Len() == 0 {
return nil, false, false
}
item := le.leaseExpiredNotifier.Poll()
l = le.leaseMap[item.id]
if l == nil {
// lease has expired or been revoked
// no need to revoke (nothing is expiry)
le.leaseExpiredNotifier.Unregister() // O(log N)
return nil, false, true
}
now := time.Now()
if now.Before(item.time) /* item.time: expiration time */ {
// Candidate expirations are caught up, reinsert this item
// and no need to revoke (nothing is expiry)
return l, false, false
}
// recheck if revoke is complete after retry interval
item.time = now.Add(le.expiredLeaseRetryInterval)
le.leaseExpiredNotifier.RegisterOrUpdate(item)
return l, true, false
}
etcd Lease 高效淘汰方案最小堆的實現方法的,每次新增 Lease、續期的時候,它會插入、更新一個對象到最小堆中,對象含有 LeaseID 和其到期時間 unixnano,對象之間按到期時間升序排序。
etcd Lessor 主循環每隔 500ms 執行一次撤銷 Lease 檢查(RevokeExpiredLease),每次輪詢堆頂的元素,若已過期則加入到待淘汰列表,直到堆頂的 Lease 過期時間大於當前,則結束本輪輪詢。
Lessor 模塊會將已確認過期的 LeaseID,保存在一個名為 expiredC 的 channel 中,而 etcd server 的主循環會定期從 channel 中獲取 LeaseID,發起 revoke 請求,通過 Raft Log 傳遞給 Follower 節點。
各個節點收到 revoke Lease 請求后,獲取關聯到此 Lease 上的 key 列表,從 boltdb 中刪除 key,從 Lessor 的 Lease map 內存中刪除此 Lease 對象,最后還需要從 boltdb 的 Lease bucket 中刪除這個 Lease。
// revokeExpiredLeases finds all leases past their expiry and sends them to expired channel for
// to be revoked.
func (le *lessor) revokeExpiredLeases() {
var ls []*Lease
// rate limit
revokeLimit := leaseRevokeRate / 2
le.mu.RLock()
if le.isPrimary() {
ls = le.findExpiredLeases(revokeLimit)
}
le.mu.RUnlock()
if len(ls) != 0 {
select {
case <-le.stopC:
return
// 已經過期的lease會被放入到expiredC中,然后被上游進行處理
case le.expiredC <- ls:
default:
// the receiver of expiredC is probably busy handling
// other stuff
// let's try this next time after 500ms
}
}
}
checkpoint 機制
對於 lease 的處理都是發生在 leader 節點,如果leader節點掛掉了呢?我們知道會重新發起選舉選出新的 leader,那么問題就來了
當你的集群發生 Leader 切換后,新的 Leader 基於 Lease map 信息,按 Lease 過期時間構建一個最小堆時,etcd 早期版本為了優化性能,並未持久化存儲 Lease 剩余 TTL 信息,因此重建的時候就會自動給所有 Lease 自動續期了。
然而若較頻繁出現 Leader 切換,切換時間小於 Lease 的 TTL,這會導致 Lease 永遠無法刪除,大量 key 堆積,db 大小超過配額等異常。
為了解決這個問題,所以引入了 checkpoint 機制
一方面,etcd 啟動的時候,Leader 節點后台會運行此異步任務,定期批量地將 Lease 剩余的 TTL 基於 Raft Log 同步給 Follower 節點,Follower 節點收到 CheckPoint 請求后,更新內存數據結構 LeaseMap 的剩余 TTL 信息。
另一方面,當 Leader 節點收到 KeepAlive 請求的時候,它也會通過 checkpoint 機制把此 Lease 的剩余 TTL 重置,並同步給 Follower 節點,盡量確保續期后集群各個節點的 Lease 剩余 TTL 一致性。
總結
對於 TTL 的選擇,TTL 過長會導致節點異常后,無法及時從 etcd 中刪除,影響服務可用性,而過短,則要求 client 頻繁發送續期請求。
etcd v3 通過復用 lease 和引入 gRPC,提高了續期的效率
1、etcd v3 版本引入了 lease,上面的代碼我們也可以看到,不同 key 若 TTL 相同,可復用同一個 Lease, 顯著減少了 Lease 數。
2、同時 etcd v3 版本引入了 gRPC 。通過 gRPC HTTP/2 實現了多路復用,流式傳輸,同一連接可支持為多個 Lease 續期,能夠大大減少連接數,提高續期的效率。
Lease 中過期的刪除,使用的結構是最小堆,主循環每隔 500ms 執行一次撤銷 Lease 檢查(RevokeExpiredLease),每次輪詢堆頂的元素,若已過期則加入到待淘汰列表,直到堆頂的 Lease 過期時間大於當前,則結束本輪輪詢。
如果 leader 發生頻繁節點切換,切換時間小於 Lease 的 TTL,這會導致 Lease 永遠無法刪除,大量 key 堆積,db 大小超過配額等異常,引入了 checkpoint 機制。
參考
【Load Balancing in gRPC】https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
【文中的代碼示例】https://github.com/boilingfrog/etcd-learning/tree/main/discovery
【06 | 租約:如何檢測你的客戶端存活?】https://time.geekbang.org/column/article/339337
【ETCD中Lease如何續期】https://boilingfrog.github.io/2021/09/06/etcd中lease的續期/