菜鳥系列Fabric源碼學習 — 區塊同步


Fabric 1.4 源碼分析 區塊同步

本文主要從源碼層面介紹fabric peer同步區塊過程,peer同步區塊主要有2個過程:
1)peer組織的leader與orderer同步區塊
2)peer組織間peer同步區塊。

1. peer leader和orderer同步區塊

首先,orderer對外主要提供broadcast和deliver兩個服務(參見《orderer服務介紹》)。並且我們知道peer和orderer同步區塊肯定是通過deliver服務實現的,但是到底是peer從orderer拉還是orderer推送給peer呢?由於peer可以知道orderer信息(配置塊)並且deliver是grpc服務,則推斷是peer從orderer拉區塊。如果是拉區塊,那么peer如何獲取區塊,獲取區塊的方式是什么?

1.1 Orderer Deliver服務

首先,查看orderer deliver服務是怎么運行的,是如何同步區塊的。
當deliver服務被調用時,轉到Handle()方法處理

// Handle is the entry point of the deliver service. In this excerpt it loops
// forever: each iteration receives one envelope from srv and passes it to
// deliverBlocks. Lines consisting of "..." mark code elided by the article.
func (h *Handler) Handle(ctx context.Context, srv *Server) error {
    ...
	for {
		logger.Debugf("Attempting to read seek info message from %s", addr)
		// Receive the next envelope sent by the client.
		envelope, err := srv.Recv()
        ...
        // Deliver the blocks requested by this envelope.
		status, err := h.deliverBlocks(ctx, srv, envelope)
		...
	}
}

其中,srv.Recv()接收envelope,再根據envelope信息分發block。

// deliverBlocks serves one deliver request described by envelope.
//
// It unmarshals the payload and channel header, looks up the chain for the
// channel, evaluates access control, opens a block iterator at
// seekInfo.Start, derives the last block number to send from seekInfo.Stop,
// and then sends blocks one by one through srv until that block is reached.
//
// It returns the status to report to the client together with an error; in
// this excerpt a non-nil error is returned only when the context finishes
// or when sending a block fails.
//
// NOTE(review): this is an abridged excerpt. Several error checks are
// missing, and labels and erroredChan are used without a visible
// declaration.
func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) (status cb.Status, err error) {
	addr := util.ExtractRemoteAddress(ctx)
	// NOTE(review): err is not checked here before payload is dereferenced.
	payload, err := utils.UnmarshalPayload(envelope.Payload)
	if payload.Header == nil {
		logger.Warningf("Malformed envelope received from %s with bad header", addr)
		return cb.Status_BAD_REQUEST, nil
	}
	// NOTE(review): the errors from the next two calls are overwritten or ignored in this excerpt.
	chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
	err = h.validateChannelHeader(ctx, chdr)
	chain := h.ChainManager.GetChain(chdr.ChannelId)
	// On return, count the request as completed and label it with whether
	// the final (named) status was SUCCESS.
	defer func() {
		labels := append(labels, "success", strconv.FormatBool(status == cb.Status_SUCCESS))
		h.Metrics.RequestsCompleted.With(labels...).Add(1)
	}()

	accessControl, err := NewSessionAC(chain, envelope, srv.PolicyChecker, chdr.ChannelId, crypto.ExpiresAt)
	if err != nil {
		logger.Warningf("[channel: %s] failed to create access control object due to %s", chdr.ChannelId, err)
		return cb.Status_BAD_REQUEST, nil
	}

	if err := accessControl.Evaluate(); err != nil {
		logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
		return cb.Status_FORBIDDEN, nil
	}

	// NOTE(review): seekInfo is created empty and never filled in this
	// excerpt; the article's step list says it is parsed from payload.data,
	// so the unmarshalling was presumably elided.
	seekInfo := &ab.SeekInfo{}
    // Obtain a block iterator and the number of the first block it will return.
	cursor, number := chain.Reader().Iterator(seekInfo.Start)
	defer cursor.Close()
	// stopNum is the number of the last block to send.
	var stopNum uint64
	switch stop := seekInfo.Stop.Type.(type) {
	case *ab.SeekPosition_Oldest:
		stopNum = number
	case *ab.SeekPosition_Newest:
		stopNum = chain.Reader().Height() - 1
	case *ab.SeekPosition_Specified:
		stopNum = stop.Specified.Number
		if stopNum < number {
			logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
			return cb.Status_BAD_REQUEST, nil
		}
	}

	for {
		// With FAIL_IF_NOT_READY, fail instead of waiting when the next
		// block number is beyond the current chain height.
		if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
			if number > chain.Reader().Height()-1 {
				return cb.Status_NOT_FOUND, nil
			}
		}

		var block *cb.Block
		// This status shadows the named result inside the loop body.
		var status cb.Status

		// Fetch the next block in a separate goroutine so the select below
		// can also react to context cancellation and consenter errors.
		iterCh := make(chan struct{})
		go func() {
		    // Fetch the next block.
			block, status = cursor.Next()
			close(iterCh)
		}()

		select {
		case <-ctx.Done():
			logger.Debugf("Context canceled, aborting wait for next block")
			return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
		case <-erroredChan:
			// TODO, today, the only user of the errorChan is the orderer consensus implementations.  If the peer ever reports
			// this error, we will need to update this error message, possibly finding a way to signal what error text to return.
			logger.Warningf("Aborting deliver for request because the backing consensus implementation indicates an error")
			return cb.Status_SERVICE_UNAVAILABLE, nil
		case <-iterCh:
			// Iterator has set the block and status vars
		}

		if status != cb.Status_SUCCESS {
			logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
			return status, nil
		}

		// increment block number to support FAIL_IF_NOT_READY deliver behavior
		number++

		// Re-evaluate access control before every block that is sent.
		if err := accessControl.Evaluate(); err != nil {
			logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
			return cb.Status_FORBIDDEN, nil
		}

		logger.Debugf("[channel: %s] Delivering block for (%p) for %s", chdr.ChannelId, seekInfo, addr)
        // Send the block to the client.
		if err := srv.SendBlockResponse(block); err != nil {
			logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
			return cb.Status_INTERNAL_SERVER_ERROR, err
		}

		h.Metrics.BlocksSent.With(labels...).Add(1)
        // Leave the loop once the last block requested by the client has been sent.
		if stopNum == block.Header.Number {
			break
		}
	}

	logger.Debugf("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo)

	return cb.Status_SUCCESS, nil
}
  1. 反序列化envelope.Payload
  2. 對payload.Header和ChannelHeader進行驗證
  3. 根據通道獲取對應的chain
  4. 訪問控制相關驗證,policy,signature
  5. 從payload.data解析出SeekInfo
  6. 新建一個迭代器cursor
  7. 通過seekInfo.Stop.Type確定stopNum(最后一個要發送的區塊號)
  8. cursor.Next()獲取下個區塊及SendBlockResponse()發送區塊
  9. 判斷是否達到請求的最后區塊,是就跳出循環
// Chain encapsulates chain operations and data.
//
// Each method exposes one aspect of a chain: its config sequence number,
// its policy manager, its block reader, and a channel reporting consenter
// failure.
type Chain interface {
	// Sequence returns the current config sequence number, can be used to detect config changes
	Sequence() uint64

	// PolicyManager returns the current policy manager as specified by the chain configuration
	PolicyManager() policies.Manager

	// Reader returns the chain Reader for the chain
	Reader() blockledger.Reader

	// Errored returns a channel which closes when the backing consenter has errored
	Errored() <-chan struct{}
}
// SeekInfo describes which blocks a deliver request asks for.
//
// The struct tags show it is a protobuf message; the XXX_ fields are
// excluded from JSON (json:"-") and are presumably bookkeeping added by the
// protobuf code generator.
type SeekInfo struct {
	// Start is the position at which delivery begins (protobuf field 1).
	Start                *SeekPosition         `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"`
	// Stop is the position at which delivery ends (protobuf field 2).
	Stop                 *SeekPosition         `protobuf:"bytes,2,opt,name=stop,proto3" json:"stop,omitempty"`
	// Behavior is an orderer.SeekInfo_SeekBehavior enum value (protobuf field 3).
	Behavior             SeekInfo_SeekBehavior `protobuf:"varint,3,opt,name=behavior,proto3,enum=orderer.SeekInfo_SeekBehavior" json:"behavior,omitempty"`
	XXX_NoUnkeyedLiteral struct{}              `json:"-"`
	XXX_unrecognized     []byte                `json:"-"`
	XXX_sizecache        int32                 `json:"-"`
}

1.2 peer從orderer同步區塊

這里主要解決1個問題:peer如何觸發orderer deliver服務?即peer和orderer怎么同步區塊的?
在介紹之前參閱peer節點啟動流程。在peer節點啟動過程中會執行peer.Initialize()方法,對peer所在的所有chain實例化。其中調用了createChain()接口創建鏈對象。在createChain()方法中調用了GossipService.InitializeChannel()方法。然后調用g.deliveryService[chainID].StartDeliverForChannel()方法獲取區塊。

// StartDeliverForChannel registers a block provider for chainID and launches
// it in its own goroutine.
//
// ledgerInfo is handed to the newly created client; finalizer is passed on
// to launchBlockProvider. The whole call runs under d.lock.
//
// It returns an error, and starts nothing, when the delivery service is
// already stopping or when a block provider is already registered for
// chainID.
func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
	d.lock.Lock()
	defer d.lock.Unlock()

	if d.stopping {
		errMsg := fmt.Sprintf("Delivery service is stopping cannot join a new channel %s", chainID)
		// errMsg embeds chainID, so it must be logged as data and never be
		// interpreted as a format string.
		logger.Errorf("%s", errMsg)
		return errors.New(errMsg)
	}
	if _, exist := d.blockProviders[chainID]; exist {
		errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID)
		logger.Errorf("%s", errMsg)
		return errors.New(errMsg)
	}

	client := d.newClient(chainID, ledgerInfo)
	logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID)
	// Create the block provider for this channel.
	d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
	// Run it in the background.
	go d.launchBlockProvider(chainID, finalizer)
	return nil
}

其中newClient()創建一個broadcastClient,傳入參數為requester.RequestBlocks(ledgerInfoProvider)方法。很顯然,peer是通過該方法獲取區塊的,那么該方法主要實現是什么?

// RequestBlocks asks the ordering service for blocks on b.chainID.
//
// It reads the current ledger height from ledgerInfoProvider. A positive
// height starts the request via seekLatestFromCommitter(height); otherwise
// the request starts from the oldest block via seekOldest. Any error from
// the height lookup or from the seek call is returned to the caller.
func (b *blocksRequester) RequestBlocks(ledgerInfoProvider blocksprovider.LedgerInfo) error {
	ledgerHeight, heightErr := ledgerInfoProvider.LedgerHeight()
	if heightErr != nil {
		logger.Errorf("Can't get ledger height for channel %s from committer [%s]", b.chainID, heightErr)
		return heightErr
	}

	if ledgerHeight > 0 {
		logger.Debugf("Starting deliver with block [%d] for channel %s", ledgerHeight, b.chainID)
		return b.seekLatestFromCommitter(ledgerHeight)
	}

	logger.Debugf("Starting deliver with oldest block for channel %s", b.chainID)
	return b.seekOldest()
}

調用了seek_XXX方法,其中

// SeekInfo is the request body a peer sends to the deliver service to say
// which blocks it wants.
//
// The struct tags show it is a protobuf message; the XXX_ fields are
// excluded from JSON (json:"-") and are presumably bookkeeping added by the
// protobuf code generator.
type SeekInfo struct {
	// Start is the position of the first requested block (protobuf field 1).
	Start                *SeekPosition         `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"`
	// Stop is the position of the last requested block (protobuf field 2).
	Stop                 *SeekPosition         `protobuf:"bytes,2,opt,name=stop,proto3" json:"stop,omitempty"`
	// Behavior is an orderer.SeekInfo_SeekBehavior enum value (protobuf field 3).
	Behavior             SeekInfo_SeekBehavior `protobuf:"varint,3,opt,name=behavior,proto3,enum=orderer.SeekInfo_SeekBehavior" json:"behavior,omitempty"`
	XXX_NoUnkeyedLiteral struct{}              `json:"-"`
	XXX_unrecognized     []byte                `json:"-"`
	XXX_sizecache        int32                 `json:"-"`
}
// seekOldest requests every block of b.chainID, starting from the oldest.
//
// It builds a SeekInfo running from the oldest block up to block number
// math.MaxUint64 with BLOCK_UNTIL_READY behavior, wraps it in a signed
// DELIVER_SEEK_INFO envelope bound to the TLS certificate hash, and sends
// that envelope through b.client. Errors from creating or sending the
// envelope are returned.
func (b *blocksRequester) seekOldest() error {
	fromOldest := &orderer.SeekPosition{
		Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}},
	}
	toMaxBlock := &orderer.SeekPosition{
		Type: &orderer.SeekPosition_Specified{
			Specified: &orderer.SeekSpecified{Number: math.MaxUint64},
		},
	}
	request := &orderer.SeekInfo{
		Start:    fromOldest,
		Stop:     toMaxBlock,
		Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
	}

	// TODO: epoch and msgVersion may need to be obtained for now, following
	// the usage in orderer/configupdate/configupdate.go.
	var (
		version int32
		epochNo uint64
	)
	certHash := b.getTLSCertHash()
	signed, err := utils.CreateSignedEnvelopeWithTLSBinding(common.HeaderType_DELIVER_SEEK_INFO, b.chainID, localmsp.NewSigner(), request, version, epochNo, certHash)
	if err != nil {
		return err
	}

	// Send the envelope to the orderer to start receiving blocks.
	return b.client.Send(signed)
}

從RequestBlocks()調用的2個方法可知,seekInfo的stopNum都為math.MaxUint64,則該方法會持續請求區塊直到最大值(可以看作從現在到未來的所有區塊)。

上文可知,broadcastClient已經實例化,並且通過調用broadcastClient.onConnect向orderer發送獲取區塊的envelope。在實例化后,調用launchBlockProvider。然后會調用 pb.DeliverBlocks()方法(開始獲取區塊)。

// broadcastClient is the peer-side client used to receive messages from the
// ordering service (see its Recv method).
//
// NOTE(review): only a few fields can be explained from the visible code;
// confirm the rest against the full source.
type broadcastClient struct {
	stopFlag     int32
	stopChan     chan struct{}
	// createClient builds the client used to open the Deliver stream;
	// the surrounding article says connect() calls createClient(conn).Deliver(ctx).
	createClient clientFactory
	shouldRetry  retryPolicy
	// onConnect is called by the broadcastClient immediately after each
	// successful connection to the ordering service.
	onConnect    broadcastSetup
	prod         comm.ConnectionProducer

	mutex           sync.Mutex
	blocksDeliverer blocksprovider.BlocksDeliverer
	conn            *connection
	endpoint        string
}
// DeliverBlocks pulls blocks from the ordering service and distributes them
// across peers.
//
// It loops until b.isDone() reports true, receiving one message per
// iteration from b.client:
//   - A status message is treated as an error condition. SUCCESS ends the
//     loop, because the seek is not expected to complete. Any other status
//     triggers a backoff sleep followed by a disconnect, after which the
//     loop continues.
//   - A block message is marshaled, verified, added to the local payload
//     buffer and gossiped to other peers.
//
// The client is closed when the function returns.
func (b *blocksProviderImpl) DeliverBlocks() {
	// errorStatusCounter counts consecutive BAD_REQUEST/FORBIDDEN statuses.
	errorStatusCounter := 0
	// statusCounter is the exponent of the backoff delay below.
	statusCounter := 0
	defer b.client.Close()
	for !b.isDone() {
	    // Receive the next message delivered by the orderer.
		msg, err := b.client.Recv()
		if err != nil {
			logger.Warningf("[%s] Receive error: %s", b.chainID, err.Error())
			return
		}
		switch t := msg.Type.(type) {
		case *orderer.DeliverResponse_Status:
			if t.Status == common.Status_SUCCESS {
				logger.Warningf("[%s] ERROR! Received success for a seek that should never complete", b.chainID)
				return
			}
			if t.Status == common.Status_BAD_REQUEST || t.Status == common.Status_FORBIDDEN {
				logger.Errorf("[%s] Got error %v", b.chainID, t)
				errorStatusCounter++
				// Give up once more than wrongStatusThreshold such statuses
				// have arrived in a row.
				if errorStatusCounter > b.wrongStatusThreshold {
					logger.Criticalf("[%s] Wrong statuses threshold passed, stopping block provider", b.chainID)
					return
				}
			} else {
				errorStatusCounter = 0
				logger.Warningf("[%s] Got error %v", b.chainID, t)
			}
			// Back off for 100ms * 2^statusCounter, capped at maxRetryDelay;
			// the exponent stops growing once the cap is reached.
			maxDelay := float64(maxRetryDelay)
			currDelay := float64(time.Duration(math.Pow(2, float64(statusCounter))) * 100 * time.Millisecond)
			time.Sleep(time.Duration(math.Min(maxDelay, currDelay)))
			if currDelay < maxDelay {
				statusCounter++
			}
			// NOTE(review): the meaning of Disconnect's bool argument is not
			// visible here; it is false only for BAD_REQUEST.
			if t.Status == common.Status_BAD_REQUEST {
				b.client.Disconnect(false)
			} else {
				b.client.Disconnect(true)
			}
			continue
		case *orderer.DeliverResponse_Block:
			// A block arrived, so reset both error counters.
			errorStatusCounter = 0
			statusCounter = 0
			blockNum := t.Block.Header.Number

			marshaledBlock, err := proto.Marshal(t.Block)
			if err != nil {
				logger.Errorf("[%s] Error serializing block with sequence number %d, due to %s", b.chainID, blockNum, err)
				continue
			}
			// Blocks that fail verification are skipped, not gossiped.
			if err := b.mcs.VerifyBlock(gossipcommon.ChainID(b.chainID), blockNum, marshaledBlock); err != nil {
				logger.Errorf("[%s] Error verifying block with sequnce number %d, due to %s", b.chainID, blockNum, err)
				continue
			}

			numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))
			// Create payload with a block received
			payload := createPayload(blockNum, marshaledBlock)
			// Use payload to create gossip message
			gossipMsg := createGossipMsg(b.chainID, payload)

			logger.Debugf("[%s] Adding payload to local buffer, blockNum = [%d]", b.chainID, blockNum)
			// Add payload to local state payloads buffer
			if err := b.gossip.AddPayload(b.chainID, payload); err != nil {
				logger.Warningf("Block [%d] received from ordering service wasn't added to payload buffer: %v", blockNum, err)
			}

			// Gossip messages with other nodes
			logger.Debugf("[%s] Gossiping block [%d], peers number [%d]", b.chainID, blockNum, numberOfPeers)
		
			if !b.isDone() {
			    // Spread the block to the other peers via gossip.
				b.gossip.Gossip(gossipMsg)
			}
		default:
			logger.Warningf("[%s] Received unknown: %v", b.chainID, t)
			return
		}
	}
}

DeliverBlocks()方法介紹,首先調用 b.client.Recv()接收orderer傳過來的響應,

// Recv returns the next DeliverResponse from the ordering service.
//
// The receive attempt is run through bc.try. An attempt fails immediately
// with a "closing" error when the client should stop; otherwise it
// delegates to bc.tryReceive. Any error reported by bc.try is returned
// with a nil response.
func (bc *broadcastClient) Recv() (*orderer.DeliverResponse, error) {
	receiveOnce := func() (interface{}, error) {
		if bc.shouldStop() {
			return nil, errors.New("closing")
		}
		return bc.tryReceive()
	}

	result, err := bc.try(receiveOnce)
	if err != nil {
		return nil, err
	}

	response := result.(*orderer.DeliverResponse)
	return response, nil
}

這里我們知道大致是peer從orderer這里拉區塊的,但是還存在疑問,那就是peer如何觸發orderer的deliver服務的?peer是如何調用RequestBlocks方法的?

  • peer 調用deliver服務
    從Recv()接口會調用try(),try()會調用bc.doAction(action, resetAttemptCounter),然后調用bc.connect(),里面調用bc.createClient(conn).Deliver(ctx),從而peer調用了orderer的deliver服務。
  • peer 調用RequestBlocks方法
    從上面可知,RequestBlocks被賦值給broadcastSetup類型的onConnect字段,而onConnect在每次成功連接orderer后會立即被調用。
// broadcastSetup is a function that is called by the broadcastClient immediately after each
// successful connection to the ordering service

1.3 組織間peer同步區塊

peer間同步區塊是通過gossip服務來同步的,並且通過上述代碼可知,leader和orderer同步區塊也是伴隨着gossip服務啟動(不過是屬於leader的)。

	// Gossip messages with other nodes
	logger.Debugf("[%s] Gossiping block [%d], peers number [%d]", b.chainID, blockNum, numberOfPeers)

	if !b.isDone() {
	    // Spread the block to the other peers via gossip.
		b.gossip.Gossip(gossipMsg)
	}

1.4 peer是如何寫區塊

首先,其他peer是通過gossip服務同步區塊,則保存區塊應該是在gossip服務里面調用的,回到peer啟動時gossip服務的設置

service.GetGossipService().InitializeChannel(bundle.ConfigtxValidator().ChainID(), ordererAddresses, service.Support{
		Validator:            validator,
		Committer:            c,
		Store:                store,
		Cs:                   simpleCollectionStore,
		IdDeserializeFactory: csStoreSupport,
	})
	

里面會調用

	g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapter, coordinator,
		g.metrics.StateMetrics, getStateConfiguration())

里面會調用

	// Listen for incoming communication
	go s.listen()
	// Deliver in order messages into the incoming channel
	go s.deliverPayloads()

deliverPayloads()會將gossip.payload 區塊給寫入賬本。

// deliverPayloads commits buffered payloads (blocks) to the ledger.
//
// It waits until s.payloads signals that the next expected sequence number
// is ready, then pops payloads one after another, unmarshals each into a
// block (plus optional private data) and commits it via s.commitBlock.
// Payloads that cannot be decoded, or that lack a header or data, are
// logged and dropped.
//
// The loop ends when s.stopCh fires, or when a commit fails with a VSCC
// execution error. Any other commit error is logged with Panicf.
func (s *GossipStateProviderImpl) deliverPayloads() {
	// Presumably s.done is a WaitGroup that shutdown waits on; confirm.
	defer s.done.Done()

	for {
		select {
		// Wait for notification that next seq has arrived
		case <-s.payloads.Ready():
			logger.Debugf("[%s] Ready to transfer payloads (blocks) to the ledger, next block number is = [%d]", s.chainID, s.payloads.Next())
			// Collect all subsequent payloads
			for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() {
				rawBlock := &common.Block{}
				if err := pb.Unmarshal(payload.Data, rawBlock); err != nil {
					logger.Errorf("Error getting block with seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
					continue
				}
				if rawBlock.Data == nil || rawBlock.Header == nil {
					logger.Errorf("Block with claimed sequence %d has no header (%v) or data (%v)",
						payload.SeqNum, rawBlock.Header, rawBlock.Data)
					continue
				}
				logger.Debugf("[%s] Transferring block [%d] with %d transaction(s) to the ledger", s.chainID, payload.SeqNum, len(rawBlock.Data.Data))

				// Read all private data into slice
				var p util.PvtDataCollections
				if payload.PrivateData != nil {
					err := p.Unmarshal(payload.PrivateData)
					if err != nil {
						logger.Errorf("Wasn't able to unmarshal private data for block seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err))
						continue
					}
				}
				// Commit the block, with its private data, to the ledger.
				if err := s.commitBlock(rawBlock, p); err != nil {
					// A VSCC execution failure stops this goroutine; any
					// other commit failure is logged with Panicf.
					if executionErr, isExecutionErr := err.(*vsccErrors.VSCCExecutionFailureError); isExecutionErr {
						logger.Errorf("Failed executing VSCC due to %v. Aborting chain processing", executionErr)
						return
					}
					logger.Panicf("Cannot commit block to the ledger due to %+v", errors.WithStack(err))
				}
			}
		case <-s.stopCh:
			// The stop signal is sent back onto stopCh, presumably so that
			// another goroutine waiting on the same channel also stops; confirm.
			s.stopCh <- struct{}{}
			logger.Debug("State provider has been stopped, finishing to push new blocks.")
			return
		}
	}
}

總結

  1. peer啟動時會啟動gossip服務模塊
  2. leader啟動gossip服務時會從orderer源源不斷的請求塊
  3. peer之間通過gossip同步區塊,當leader獲取區塊后會通過gossip將區塊分發給其他peer
  4. peer的gossip模塊會將gossip.payload中的區塊信息寫入賬本
有興趣的關注IT程序員客棧哦


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM