線性一致性
CAP
什么是CAP
在聊什么是線性一致性的時候,我們先來看看什么是CAP
CAP理論:一個分布式系統最多只能同時滿足一致性(Consistency)、可用性(Availability)和分區容錯性(Partition tolerance)這三項中的兩項。
- 1、一致性(Consistency)
一致性表示所有客戶端同時看到相同的數據,無論它們連接到哪個節點。 要做到這一點,每當數據寫入一個節點時,就必須立即將其轉發或復制到系統中的所有其他節點,然后寫操作才被視為“成功”。
- 2、可用性(Availability)
可用性表示發出數據請求的任何客戶端都會得到響應,即使一個或多個節點宕機。 可用性的另一種狀態:分布式系統中的所有工作節點都返回任何請求的有效響應,而不會發生異常。
- 3、分區容錯性(Partition tolerance)
分區是分布式系統中的通信中斷 - 兩個節點之間的丟失連接或連接臨時延遲。 分區容錯意味着集群必須持續工作,無論系統中的節點之間有任何數量的通信中斷。
CAP的權衡
根據定理,分布式系統只能滿足三項中的兩項而不可能滿足全部三項。
AP wihtout C
允許分區下的高可用,就需要放棄一致性。一旦分區發生,節點之間可能會失去聯系,為了高可用,每個節點只能用本地數據提供服務,而這樣會導致全局數據的不一致性。
CA without P
如果不會出現分區,一直性和可用性是可以同時保證的。但是我們現在的系統基本上是都是分布式的,也就是我們的服務肯定是被多台機器所提供的,所以分區就難以避免。
CP without A
如果不要求A(可用),相當於每個請求都需要在Server之間強一致,而P(分區)會導致同步時間無限延長,如此CP也是可以保證的。
線性一致性
線性一致性又叫做原子一致性,強一致性。線性一致性可以看做只有一個單核處理器,或者可以看做只有一個數據副本,並且所有操作都是原子的。在可線性化的分布式系統中,如果某個節點更新了數據,那么在其他節點如果都能讀取到這個最新的數據。可以看見線性一致性和我們的CAP中的C是一致的。
舉個非線性一致性的例子,比如有個秒殺活動,你和你的朋友同時去搶購一樣東西,有可能他那里的庫存已經沒了,但是在你手機上顯示還有幾件,這個就違反了線性一致性,哪怕過了一會你的手機也顯示庫存沒有,也依然是違反了。
etcd中如何實現線性一致性
線性一致性寫
所有的寫操作,都要經過leader節點,一旦leader被選舉成功,就可以對客戶端提供服務了。客戶端提交每一條命令都會被按順序記錄到leader的日志中,每一條命令都包含term編號和順序索引,然后向其他節點並行發送AppendEntries RPC用以復制命令(如果命令丟失會不斷重發),當復制成功也就是大多數節點成功復制后,leader就會提交命令,即執行該命令並且將執行結果返回客戶端,raft保證已經提交的命令最終也會被其他節點成功執行。具體源碼參見日志同步
因為日志是順序記錄的,並且有嚴格的確認機制,所以可以認為寫是滿足線性一致性的。
由於在Raft算法中,寫操作成功僅僅意味着日志達成了一致(已經落盤),而並不能確保當前狀態機也已經apply了日志。狀態機apply日志的行為在大多數Raft算法的實現中都是異步的,所以此時讀取狀態機並不能准確反應數據的狀態,很可能會讀到過期數據。
如何實現讀取的線性一致性,就需要引入ReadIndex了
線性一致性讀
ReadIndex算法:
每次讀操作的時候記錄此時集群的commited index,當狀態機的apply index大於或等於commited index時才讀取數據並返回。由於此時狀態機已經把讀請求發起時的已提交日志進行了apply動作,所以此時狀態機的狀態就可以反應讀請求發起時的狀態,符合線性一致性讀的要求。
Leader執行ReadIndex大致的流程如下:
- 1、記錄當前的commit index,稱為ReadIndex;
所有的請求都會交給leader,如果follower收到讀請求,會將請求forward給leader
- 2、向 Follower 發起一次心跳,如果大多數節點回復了,那就能確定現在仍然是Leader;
確認當前leader的狀態,避免當前節點狀態切換,數據不能及時被同步更新
比如發生了網絡分區:可參見網絡分區問題
1、當前的leader被分到了小的分區中,然后大的集群中有數據更新,小的集群是無感知的,如果讀的請求被定位到小的集群中,所以讀取就可能讀取到舊的數據。
2、小集群中的數據同樣是不能被寫入信息的,提交給該集群的指令對應的日志因不滿足過半數的條件而無法被提交。
3、說以只有當前節點是集群中有效的leader才可以,也就是能收到大多數節點的回復信息。
- 3、等待狀態機的apply index大於或等於commited index時才讀取數據;
apply index大於或等於commited index就能表示當前狀態機已經把讀請求發起時的已提交日志進行了apply動作,所以此時狀態機的狀態就可以反應讀請求發起時的狀態,滿足一致性讀;
- 4、執行讀請求,將結果返回給Client。
進一步來看下etcd的源碼是如何實現的呢
1、客戶端的get請求
func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) {
r, err := kv.Do(ctx, OpGet(key, opts...))
return r.get, toErr(ctx, err)
}
// OpGet returns "get" operation based on given key and operation options.
func OpGet(key string, opts ...OpOption) Op {
// WithPrefix and WithFromKey are not supported together
if IsOptsWithPrefix(opts) && IsOptsWithFromKey(opts) {
panic("`WithPrefix` and `WithFromKey` cannot be set at the same time, choose one")
}
ret := Op{t: tRange, key: []byte(key)}
ret.applyOpts(opts)
return ret
}
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
var err error
switch op.t {
case tRange:
var resp *pb.RangeResponse
resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...)
if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil
}
...
}
return OpResponse{}, toErr(ctx, err)
}
func (c *kVClient) Range(ctx context.Context, in *RangeRequest, opts ...grpc.CallOption) (*RangeResponse, error) {
out := new(RangeResponse)
err := c.cc.Invoke(ctx, "/etcdserverpb.KV/Range", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
service KV {
// Range gets the keys in the range from the key-value store.
rpc Range(RangeRequest) returns (RangeResponse) {
option (google.api.http) = {
post: "/v3/kv/range"
body: "*"
};
}
}
可以看到get的請求最終通過通過rpc發送到Range
2、服務端響應讀取請求
// etcd/server/etcdserver/v3_server.go
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
...
// 判斷是否需要serializable read
// Serializable為true表示需要serializable read
// serializable read 會直接讀取當前節點的數據返回給客戶端,它並不能保證返回給客戶端的數據是最新的
// Serializable為false表示需要linearizable read
// Linearizable Read 需要阻塞等待直到讀到最新的數據
if !r.Serializable {
err = s.linearizableReadNotify(ctx)
trace.Step("agreement among raft nodes before linearized reading")
if err != nil {
return nil, err
}
}
...
return resp, err
}
// etcd/server/etcdserver/v3_server.go
func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
s.readMu.RLock()
nc := s.readNotifier
s.readMu.RUnlock()
// linearizableReadLoop會阻塞監聽readwaitc
// 這邊寫入一個空結構體到readwaitc中,linearizableReadLoop就會開始結束阻塞開始工作
select {
case s.readwaitc <- struct{}{}:
default:
}
// wait for read state notification
select {
case <-nc.c:
return nc.err
case <-ctx.Done():
return ctx.Err()
case <-s.done:
return ErrStopped
}
}
// start會啟動一個linearizableReadLoop
func (s *EtcdServer) Start() {
...
s.GoAttach(s.linearizableReadLoop)
...
}
// etcd/server/etcdserver/v3_server.go
func (s *EtcdServer) linearizableReadLoop() {
for {
requestId := s.reqIDGen.Next()
leaderChangedNotifier := s.LeaderChangedNotify()
select {
case <-leaderChangedNotifier:
continue
// 在client發起一次Linearizable Read的時候,會向readwaitc寫入一個空的結構體作為信號
case <-s.readwaitc:
case <-s.stopping:
return
}
...
// 處理不同的消息
// 這里會監聽readwaitc,等待MsgReadIndex信息的處理結果
// 同時獲取當前已提交的日志索引
confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
if isStopped(err) {
return
}
if err != nil {
nr.notify(err)
continue
}
...
// 此處是重點
// 等待 apply index >= read index
if appliedIndex < confirmedIndex {
select {
case <-s.applyWait.Wait(confirmedIndex):
case <-s.stopping:
return
}
}
// 發出可以進行讀取狀態機的信號
nr.notify(nil)
...
}
}
總結:
服務端對於讀的操作,如果是Linearizable Read,也就是線性一致性的讀,最終會通過linearizableReadLoop,監聽readwaitc來觸發,阻塞直到apply index >= read index,最終發送可以讀取的信息。
3、raft中如何處理一個讀的請求
linearizableReadLoop收到readwaitc,最終會調用sendReadIndex
// etcd/server/etcdserver/v3_server.go
func (s *EtcdServer) sendReadIndex(requestIndex uint64) error {
...
err := s.r.ReadIndex(cctx, ctxToSend)
...
return nil
}
// etcd/raft/node.go
func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
}
通過MsgReadIndex的消息來發送讀的請求
-
如果follower收到了客戶端的MsgReadIndex類型的消息,因為客戶端不能處理只讀請求,需要將消息轉發到leader節點進行處理;
-
如果是leader收到了MsgReadIndex;
1、如果消息來自客戶端,直接寫入到readStates,start函數會將readStates中最后的一個放到readStateC,通知上游的處理結果;
2、如果消息來自follower,通過消息MsgReadIndexResp回復follower的響應結果,同時follower也是會將readStates中最后的一個放到readStateC,通知上游的處理結果;
上面的linearizableReadLoop監聽readStateC,當收到請求,獲取當前leader已經提交的日志索引,然后等待直到狀態機已應用索引 (applied index) 大於等於 Leader 的已提交索引時 (committed Index),然后去通知讀請求,數據已趕上 Leader,就可以去狀態機中訪問數據了,處理數據返回給客戶端
我們知道ReadIndex算法中,leader節點需要,向follower節點發起心跳,確認自己的leader地位,具體的就是通過ReadOnly來實現,下面會一一介紹到
如果follower收到只讀的消息
follower會將消息轉發到leader
// etcd/raft/raft.go
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
...
case pb.MsgReadIndex:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
return nil
}
// 目標為leader
m.To = r.lead
// 轉發信息
r.send(m)
}
...
return nil
}
再來看下leader是如何處理的
// etcd/raft/raft.go
func stepLeader(r *raft, m pb.Message) error {
// These message types do not require any progress for m.From.
switch m.Type {
case pb.MsgReadIndex:
...
sendMsgReadIndexResponse(r, m)
return nil
}
return nil
}
// raft結構體中的readOnly作用是批量處理只讀請求,只讀請求有兩種模式,分別是ReadOnlySafe和ReadOnlyLeaseBased
// ReadOnlySafe是ETCD作者推薦的模式,因為這種模式不受節點之間時鍾差異和網絡分區的影響
// 線性一致性讀用的就是ReadOnlySafe
func sendMsgReadIndexResponse(r *raft, m pb.Message) {
switch r.readOnly.option {
// If more than the local vote is needed, go through a full broadcast.
case ReadOnlySafe:
// 清空readOnly中指定消息ID及之前的所有記錄
// 開啟leader向follower的確認機制
r.readOnly.addRequest(r.raftLog.committed, m)
// recvAck通知只讀結構raft狀態機已收到對附加只讀請求上下文的心跳信號的確認。
// 也就是記錄下只讀的請求
r.readOnly.recvAck(r.id, m.Entries[0].Data)
// leader 節點向其他節點發起廣播
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
case ReadOnlyLeaseBased:
if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
r.send(resp)
}
}
}
這里省略follower對leader節點的心跳回應,直接看leader對心跳回執的信息處理
func stepLeader(r *raft, m pb.Message) error {
// All other message types require a progress for m.From (pr).
pr := r.prs.Progress[m.From]
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return nil
}
switch m.Type {
case pb.MsgHeartbeatResp:
...
if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
}
// 判斷leader有沒有收到大多數節點的確認
// 也就是ReadIndex算法中,leader節點得到follower的確認,證明自己目前還是Leader
if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
return nil
}
// 收到了響應節點超過半數,會清空readOnly中指定消息ID及之前的所有記錄
rss := r.readOnly.advance(m)
// 返回follower的心跳回執
for _, rs := range rss {
if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
r.send(resp)
}
}
}
return nil
}
// responseToReadIndexReq 為 `req` 構造一個響應。如果`req`來自對等方
// 本身,將返回一個空值。
func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message {
// 通過from來判斷該消息是否是follower節點轉發到leader中的
...
// 如果是其他follower節點轉發到leader節點的MsgReadIndex消息
// leader會回向follower節點返回響應的MsgReadIndexResp消息,follower會響應給client
return pb.Message{
Type: pb.MsgReadIndexResp,
To: req.From,
Index: readIndex,
Entries: req.Entries,
}
}
然后follower收到響應,將MsgReadIndex消息中的已提交位置和消息id封裝成ReadState實例,添加到readStates
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgReadIndexResp:
if len(m.Entries) != 1 {
r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
return nil
}
// 將MsgReadIndex消息中的已提交位置和消息id封裝成ReadState實例,添加到readStates
// raft 模塊也有一個 for-loop 的 goroutine,來讀取該數組,並對MsgReadIndex進行響應
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
}
return nil
}
raft 模塊有一個for-loop的goroutine,來讀取該數組,並對MsgReadIndex進行響應,將ReadStates中的最后一項將寫入到readStateC中,通知監聽readStateC的linearizableReadLoop函數的結果。
// etcd/server/etcdserver/raft.goetcd/raft/node.go
func (r *raftNode) start(rh *raftReadyHandler) {
internalTimeout := time.Second
go func() {
defer r.onStop()
islead := false
for {
select {
case rd := <-r.Ready():
...
if len(rd.ReadStates) != 0 {
select {
// ReadStates中最后意向將會被寫入到readStateC中
// linearizableReadLoop會監聽readStateC,獲取MsgReadIndex的處理信息
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
case <-time.After(internalTimeout):
r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))
case <-r.stopped:
return
}
}
...
}
}
}()
}
如果leader收到只讀請求
func stepLeader(r *raft, m pb.Message) error {
// All other message types require a progress for m.From (pr).
pr := r.prs.Progress[m.From]
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return nil
}
switch m.Type {
case pb.MsgReadIndex:
// 表示當前只有一個節點,當前節點就是leader
if r.prs.IsSingleton() {
if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
r.send(resp)
}
return nil
}
...
return nil
}
return nil
}
// responseToReadIndexReq 為 `req` 構造一個響應。如果`req`來自對等方
// 本身,將返回一個空值。
func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message {
// 通過from來判斷該消息是否是follower節點轉發到leader中的
// 如果是客戶端直接發到leader節點的消息,將MsgReadIndex消息中的已提交位置和消息id封裝成ReadState實例,添加到readStates
// raft 模塊也有一個 for-loop 的 goroutine,來讀取該數組,並對MsgReadIndex進行響應
if req.From == None || req.From == r.id {
r.readStates = append(r.readStates, ReadState{
Index: readIndex,
RequestCtx: req.Entries[0].Data,
})
return pb.Message{}
}
...
}
如果當前只有一個節點,那么當前的節點也是leader節點,所有的只讀請求,將會發送到leader,leader直接對信息進行處理
如何從狀態機中訪問數據了,就需要了解下MVCC了
MVCC
Multiversion concurrency control簡稱MVCC。這個模塊是為了解決 etcd v2 不支持保存 key 的歷史版本、不支持多 key 事務等問題而產生的。
它核心由內存樹形索引模塊 (treeIndex) 和嵌入式的 KV 持久化存儲庫 boltdb 組成。
那么 etcd 如何基於 boltdb 保存一個 key 的多個歷史版本呢?
每次修改操作,生成一個新的版本號 (revision),以版本號為 key, value 為用戶 key-value 等信息組成的結構體。boltdb 的 key 是全局遞增的版本號 (revision),value 是用戶 key、value 等字段組合成的結構體,然后通過 treeIndex 模塊來保存用戶 key 和版本號的映射關系。
treeIndex 與 boltdb 關系如下面的讀事務流程圖所示,從 treeIndex 中獲取 key hello 的版本號,再以版本號作為 boltdb 的 key,從 boltdb 中獲取其 value 信息。
treeIndex
treeIndex 模塊是基於 Google 開源的內存版 btree 庫實現的
treeIndex 模塊只會保存用戶的 key 和相關版本號信息,用戶 key 的 value 數據存儲在 boltdb 里面,相比 ZooKeeper 和 etcd v2 全內存存儲,etcd v3 對內存要求更低。
buffer
在獲取到版本號信息后,就可從 boltdb 模塊中獲取用戶的 key-value 數據了。不過有一點你要注意,並不是所有請求都一定要從 boltdb 獲取數據。etcd 出於數據一致性、性能等考慮,在訪問 boltdb 前,首先會從一個內存讀事務 buffer 中,二分查找你要訪問 key 是否在 buffer 里面,若命中則直接返回。
boltdb
boltdb 使用 B+ tree 來組織用戶的 key-value 數據,獲取 bucket key 對象后,通過 boltdb 的游標 Cursor 可快速在 B+ tree 找到 key hello 對應的 value 數據,返回給 client。
總結
etcd中對於寫的請求,因為所有的寫請求都是通過leader的,leader的確認機制將會保證消息復制到大多數節點中;
對於只讀的請求,同樣也是需要全部轉發到leader節點中,通過ReadIndex算法,來實現線性一致性讀;
raft執行ReadIndex大致的流程如下:
-
1、記錄當前的commit index,稱為ReadIndex;
-
2、向Follower發起一次心跳,如果大多數節點回復了,那就能確定現在仍然是Leader;
-
3、等待狀態機的apply index大於或等於commited index時才讀取數據;
-
4、執行讀請求,將結果返回給Client。
關於狀態機數據的讀取,首先從treeIndex獲取版本號,然后在buffer是否有對應的值,沒有就去boltdb查詢對應的值
參考
【CAP定理】https://zh.wikipedia.org/wiki/CAP定理
【CAP定理】https://www.ibm.com/cn-zh/cloud/learn/cap-theorem
【線性一致性:什么是線性一致性?】https://zhuanlan.zhihu.com/p/42239873
【什么是數據一致性】https://github.com/javagrowing/JGrowing/blob/master/分布式/談談數據一致性.md
【etcd 中線性一致性讀的具體實現】https://zhengyinyong.com/post/etcd-linearizable-read-implementation/
