菜鳥系列Fabric源碼學習 — orderer服務啟動


Fabric 1.4 orderer 服務啟動流程

1.提要

orderer提供broadcast和deliver兩個服務接口。orderer節點與各個peer節點通過grpc連接,orderer將所有peer節點通過broadcast發來的交易(Envelope格式,比如peer部署后的數據)按照配置的大小依次封裝到一個個block中並寫入orderer自己的賬本中,然后供各個peer節點的gossip服務通過deliver來消費這個賬本中的數據進行自身結點賬本的同步。

2.初始化過程

先看看main函數。

// Main is the entry point of orderer process
func Main() {
	fullCmd := kingpin.MustParse(app.Parse(os.Args[1:]))

	// "version" command
	if fullCmd == version.FullCommand() {
		fmt.Println(metadata.GetVersionInfo())
		return
	}

	conf, err := localconfig.Load()
	if err != nil {
		logger.Error("failed to parse config: ", err)
		os.Exit(1)
	}
	initializeLogging()
	initializeLocalMsp(conf)

	prettyPrintStruct(conf)
	Start(fullCmd, conf)
}

從中可知 orderer服務命令行是通過kingpin來實現的,基本上只是簡單使用了下,也只實現了3個命令:

  start*
    Start the orderer node
  version
    Show version information
  benchmark
    Run orderer in benchmark mode

並且從上述main函數可知,僅version有對應操作,而orderer 默認為orderer start。

啟動流程為:

  1. 加載配置(orderer.yaml/Defaults/環境變量)
  2. 初始化log(log級別和log格式)
  3. 初始化本地msp
  4. 啟動服務Start()

接下來主要關注第4步。前面基本上是配置初始化第過程。
查看一下start函數:

  1. 從配置文件啟動塊路徑獲取配置塊及驗證是否可作為配置塊(系統通道第一個塊)
  2. 集群相關初始化配置
  3. 判斷是否是raft共識及使用的是最新配置塊,如果是,則進行下列流程:
    1. 獲取所有應用鏈及其創世區塊塊(discoverChannels)
    2. 根據orderer是否在應用鏈配置塊的raft節點中分類(channelsToPull topull/nottopull)
    3. 創建所有的應用通道賬本
    4. 獲取topull應用通道的賬本(從orderer處獲取)
    5. 獲取系統通道賬本
if clusterType && conf.General.GenesisMethod == "file" {
		r.replicateIfNeeded(bootstrapBlock)
	}
	
func (r *Replicator) ReplicateChains() []string {
	var replicatedChains []string
	channels := r.discoverChannels()
	pullHints := r.channelsToPull(channels)
	totalChannelCount := len(pullHints.channelsToPull) + len(pullHints.channelsNotToPull)

	for _, channels := range [][]ChannelGenesisBlock{pullHints.channelsToPull, pullHints.channelsNotToPull} {
		for _, channel := range channels {
			...
			r.appendBlock(gb, ledger, channel.ChannelName)
		}
	}

	for _, channel := range pullHints.channelsToPull {
		err := r.PullChannel(channel.ChannelName)
		...
	}

	// Last, pull the system chain.
	if err := r.PullChannel(r.SystemChannel); err != nil && err != ErrSkipped {
		r.Logger.Panicf("Failed pulling system channel: %v", err)
	}
	return replicatedChains
}
  1. 啟動及初始化必要模塊

    1. 創建系統鏈
    // Are we bootstrapping?
    if len(lf.ChainIDs()) == 0 {
    	initializeBootstrapChannel(genesisBlock, lf)
    } else {
    	logger.Info("Not bootstrapping because of existing chains")
    }
    
    1. 多通道初始化(initializeMultichannelRegistrar)

      • 初始化registrar實例
      registrar := multichannel.NewRegistrar(lf, signer, metricsProvider, callbacks...)
      
      // Registrar serves as a point of access and control for the individual channel resources.
      type Registrar struct {
      	lock   sync.RWMutex
      	//當前所有通道的chain對象
      	chains map[string]*ChainSupport
          //不同共識類型的consenter
      	consenters         map[string]consensus.Consenter
      	//Factory通過chainID檢索或創建新的分類帳
      	ledgerFactory      blockledger.Factory
          //簽名相關
      	signer             crypto.LocalSigner
      	blockcutterMetrics *blockcutter.Metrics
      	//系統鏈id
      	systemChannelID    string
      	//系統鏈chainSupport
      	systemChannel      *ChainSupport
      	//通道配置模版
      	templator          msgprocessor.ChannelConfigTemplator
      	callbacks          []channelconfig.BundleActor
      }
      
      • 初始化共識機制
      consenters["solo"] = solo.New()
      var kafkaMetrics *kafka.Metrics
      consenters["kafka"], kafkaMetrics = kafka.New(conf, metricsProvider, healthChecker, registrar)
      go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil)
      if isClusterType(bootstrapBlock) {
      	initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider)
      }
      
      • 啟動orderer現存的鏈(系統鏈/應用鏈,通過讀取鏈的目錄查看現存鏈),為每條鏈實例化了ChainSupport對象,然后啟動
      chain := newChainSupport(
         	r,
         	ledgerResources,
         	r.consenters,
         	r.signer,
         	r.blockcutterMetrics,
         )
      
      for _, chainID := range existingChains {
              ...
      		chain.start()
      		...
      }
      
    2. 啟動GRPC服務
      server.go中的服務端對象實例server在main.go的main()中由server := NewServer(manager, signer)生成,使用ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)進行了注冊,隨后grpcServer.Start()啟動起來。
      其主要的兩個接口為:

    type AtomicBroadcastServer interface {
        Broadcast(AtomicBroadcast_BroadcastServer) error
        Deliver(AtomicBroadcast_DeliverServer) error
    }
    

    其接口的實現在:orderer/common/server/server.go

3.相關模塊介紹

3.1 ChainSupport

提供支持chain相關操作的資源,既包含賬本本身,也包含了賬本用到的各種工具對象,如分割工具cutter,簽名工具signer,最新配置在chain中的位置信息(lastConfig的值代表當前鏈中最新配置所在的block的編號,lastConfigSeq的值則代表當前鏈中最新配置消息自身的編號)等

type ChainSupport struct {
    // 賬本相關資源 包括賬本的讀寫及配置的獲取
	*ledgerResources
	// 提供從客戶端獲取消息分類處理接口
	msgprocessor.Processor
	// 將區塊寫入磁盤
	*BlockWriter
    // 鏈 提供對messages對處理方法
    //This design allows for two primary flows
    // 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
    // 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
	consensus.Chain
    // 廣播消息接收器 提供切塊方法	
	cutter blockcutter.Receiver
	//簽名
	crypto.LocalSigner
	// chains need to know if they are system or standard channel.
	systemChannel bool
}

3.2 blockcutter模塊

  • 塊分割工具,用於分割block,具體為orderer/common/blockcutter/blockcutter.go中定義的receiver。一條一條消息數據在流水線上被傳送到cutter處,cutter按照configtx.yaml中的配置,把一條條消息打包成一批消息,同時返回整理好的這批消息對應的committer集合,至此cutter的任務完成。
MaxMessageCount指定了block中最多存儲的消息數量
AbsoluteMaxBytes指定了block最大的字節數
PreferredMaxBytes指定了一條消息的最優的最大字節數(blockcutter處理消息的過程中會努力使每一批消息盡量保持在這個值上)。
  1. 若一個Envelope的數據大小(Payload+簽名)大於PreferredMaxBytes時,無論當前緩存如何,立即Cut;
  2. 若一個Envelope被要求單純存儲在一個block(即該消息對應的committer的Isolated()返回為true),要立即Cut
  3. 若一個Envelope的大小加上blockcutter已有的消息的大小之和大於PreferredMaxBytes時,要立即Cut;
  4. 若blockcutter當前緩存的消息數量大於MaxMessageCount了,要立即Cut。
  5. 由configtx.yaml中BatchTimeout配置項(默認2s)控制,當時間超過此值,chain啟動的處理消息的流程中主動觸發的Cut。

3.3 chain start()模塊

主要是對消息進行處理,將交易消息傳輸給block cutter切成塊及寫入賬本。不同的共識機制操作不同。(后續結合consenter模塊一起詳細介紹)

chain := newChainSupport(
				r,
				ledgerResources,
				r.consenters,
				r.signer,
				r.blockcutterMetrics,
			)
r.chains[chainID] = chain
chain.start()

3.4 consenter模塊:

solo/kafka/etcdraft三種共識類型,用於序列化生產(即各個peer點傳送來的Envelope)出來的消息。

參考:
https://blog.csdn.net/idsuf698987/article/details/78639203

有興趣的關注IT程序員客棧哦


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM