Fabric 1.4 orderer 服務啟動流程
1.提要
orderer提供broadcast和deliver兩個服務接口。orderer節點與各個peer節點通過grpc連接,orderer將所有peer節點通過broadcast發來的交易(Envelope格式,比如peer部署后的數據)按照配置的大小依次封裝到一個個block中並寫入orderer自己的賬本中,然后供各個peer節點的gossip服務通過deliver來消費這個賬本中的數據進行自身結點賬本的同步。
2.初始化過程
先看看main函數。
// Main is the entry point of orderer process
func Main() {
fullCmd := kingpin.MustParse(app.Parse(os.Args[1:]))
// "version" command
if fullCmd == version.FullCommand() {
fmt.Println(metadata.GetVersionInfo())
return
}
conf, err := localconfig.Load()
if err != nil {
logger.Error("failed to parse config: ", err)
os.Exit(1)
}
initializeLogging()
initializeLocalMsp(conf)
prettyPrintStruct(conf)
Start(fullCmd, conf)
}
從中可知 orderer服務命令行是通過kingpin來實現的,基本上只是簡單使用了下,也只實現了3個命令:
start*
Start the orderer node
version
Show version information
benchmark
Run orderer in benchmark mode
並且從上述main函數可知,僅version有對應操作,而orderer 默認為orderer start。
啟動流程為:
- 加載配置(orderer.yaml/Defaults/環境變量)
- 初始化log(log級別和log格式)
- 初始化本地msp
- 啟動服務Start()
接下來主要關注第4步。前面基本上是配置初始化第過程。
查看一下start函數:
- 從配置文件啟動塊路徑獲取配置塊及驗證是否可作為配置塊(系統通道第一個塊)
- 集群相關初始化配置
- 判斷是否是raft共識及使用的是最新配置塊,如果是,則進行下列流程:
- 獲取所有應用鏈及其創世區塊塊(discoverChannels)
- 根據orderer是否在應用鏈配置塊的raft節點中分類(channelsToPull topull/nottopull)
- 創建所有的應用通道賬本
- 獲取topull應用通道的賬本(從orderer處獲取)
- 獲取系統通道賬本
if clusterType && conf.General.GenesisMethod == "file" {
r.replicateIfNeeded(bootstrapBlock)
}
func (r *Replicator) ReplicateChains() []string {
var replicatedChains []string
channels := r.discoverChannels()
pullHints := r.channelsToPull(channels)
totalChannelCount := len(pullHints.channelsToPull) + len(pullHints.channelsNotToPull)
for _, channels := range [][]ChannelGenesisBlock{pullHints.channelsToPull, pullHints.channelsNotToPull} {
for _, channel := range channels {
...
r.appendBlock(gb, ledger, channel.ChannelName)
}
}
for _, channel := range pullHints.channelsToPull {
err := r.PullChannel(channel.ChannelName)
...
}
// Last, pull the system chain.
if err := r.PullChannel(r.SystemChannel); err != nil && err != ErrSkipped {
r.Logger.Panicf("Failed pulling system channel: %v", err)
}
return replicatedChains
}
-
啟動及初始化必要模塊
- 創建系統鏈
// Are we bootstrapping? if len(lf.ChainIDs()) == 0 { initializeBootstrapChannel(genesisBlock, lf) } else { logger.Info("Not bootstrapping because of existing chains") }
-
多通道初始化(initializeMultichannelRegistrar)
- 初始化registrar實例
registrar := multichannel.NewRegistrar(lf, signer, metricsProvider, callbacks...)
// Registrar serves as a point of access and control for the individual channel resources. type Registrar struct { lock sync.RWMutex //當前所有通道的chain對象 chains map[string]*ChainSupport //不同共識類型的consenter consenters map[string]consensus.Consenter //Factory通過chainID檢索或創建新的分類帳 ledgerFactory blockledger.Factory //簽名相關 signer crypto.LocalSigner blockcutterMetrics *blockcutter.Metrics //系統鏈id systemChannelID string //系統鏈chainSupport systemChannel *ChainSupport //通道配置模版 templator msgprocessor.ChannelConfigTemplator callbacks []channelconfig.BundleActor }
- 初始化共識機制
consenters["solo"] = solo.New() var kafkaMetrics *kafka.Metrics consenters["kafka"], kafkaMetrics = kafka.New(conf, metricsProvider, healthChecker, registrar) go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil) if isClusterType(bootstrapBlock) { initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider) }
- 啟動orderer現存的鏈(系統鏈/應用鏈,通過讀取鏈的目錄查看現存鏈),為每條鏈實例化了ChainSupport對象,然后啟動
chain := newChainSupport( r, ledgerResources, r.consenters, r.signer, r.blockcutterMetrics, )
for _, chainID := range existingChains { ... chain.start() ... }
-
啟動GRPC服務
server.go中的服務端對象實例server在main.go的main()中由server := NewServer(manager, signer)生成,使用ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)進行了注冊,隨后grpcServer.Start()啟動起來。
其主要的兩個接口為:
type AtomicBroadcastServer interface { Broadcast(AtomicBroadcast_BroadcastServer) error Deliver(AtomicBroadcast_DeliverServer) error }
其接口的實現在:orderer/common/server/server.go
3.相關模塊介紹
3.1 ChainSupport
提供支持chain相關操作的資源,既包含賬本本身,也包含了賬本用到的各種工具對象,如分割工具cutter,簽名工具signer,最新配置在chain中的位置信息(lastConfig的值代表當前鏈中最新配置所在的block的編號,lastConfigSeq的值則代表當前鏈中最新配置消息自身的編號)等
type ChainSupport struct {
// 賬本相關資源 包括賬本的讀寫及配置的獲取
*ledgerResources
// 提供從客戶端獲取消息分類處理接口
msgprocessor.Processor
// 將區塊寫入磁盤
*BlockWriter
// 鏈 提供對messages對處理方法
//This design allows for two primary flows
// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
consensus.Chain
// 廣播消息接收器 提供切塊方法
cutter blockcutter.Receiver
//簽名
crypto.LocalSigner
// chains need to know if they are system or standard channel.
systemChannel bool
}
3.2 blockcutter模塊
- 塊分割工具,用於分割block,具體為orderer/common/blockcutter/blockcutter.go中定義的receiver。一條一條消息數據在流水線上被傳送到cutter處,cutter按照configtx.yaml中的配置,把一條條消息打包成一批消息,同時返回整理好的這批消息對應的committer集合,至此cutter的任務完成。
MaxMessageCount指定了block中最多存儲的消息數量
AbsoluteMaxBytes指定了block最大的字節數
PreferredMaxBytes指定了一條消息的最優的最大字節數(blockcutter處理消息的過程中會努力使每一批消息盡量保持在這個值上)。
- 若一個Envelope的數據大小(Payload+簽名)大於PreferredMaxBytes時,無論當前緩存如何,立即Cut;
- 若一個Envelope被要求單純存儲在一個block(即該消息對應的committer的Isolated()返回為true),要立即Cut
- 若一個Envelope的大小加上blockcutter已有的消息的大小之和大於PreferredMaxBytes時,要立即Cut;
- 若blockcutter當前緩存的消息數量大於MaxMessageCount了,要立即Cut。
- 由configtx.yaml中BatchTimeout配置項(默認2s)控制,當時間超過此值,chain啟動的處理消息的流程中主動觸發的Cut。
3.3 chain start()模塊
主要是對消息進行處理,將交易消息傳輸給block cutter切成塊及寫入賬本。不同的共識機制操作不同。(后續結合consenter模塊一起詳細介紹)
chain := newChainSupport(
r,
ledgerResources,
r.consenters,
r.signer,
r.blockcutterMetrics,
)
r.chains[chainID] = chain
chain.start()
3.4 consenter模塊:
solo/kafka/etcdraft三種共識類型,用於序列化生產(即各個peer點傳送來的Envelope)出來的消息。
參考:
https://blog.csdn.net/idsuf698987/article/details/78639203