Fabric1.4源碼解析:客戶端創建通道過程


在使用Fabric創建通道的時候,通常我們執行一條命令完成,這篇文章就解析一下執行這條命令后Fabric源碼中執行的流程。

peer channel create -o orderer.example.com:7050 -c mychannel -f ./channel-artifacts/channel.tx --tls true --cafile $ORDERER_CA

整個流程的切入點在fabric/peer/main.go文件中的main()方法 (本文中使用的是Fabric1.4版本,不同版本中內容可能不同)。這個方法中也定義了Peer節點可以執行的命令,有關於版本的:version.Cmd(),關於節點狀態的:node.Cmd(),關於鏈碼的:chaincode.Cmd(nil),關於客戶端日志的:clilogging.Cmd(nil),最后一個就是關於通道的:channel.Cmd(nil)。所以我們就從這里入手,看一下創建通道的整體流程是什么樣的。
點進行后,轉到了peer/channel/channel.go文件中第49行,其中定義了Peer節點可以執行的對通道進行操作的相關命令:

func Cmd(cf *ChannelCmdFactory) *cobra.Command {
	AddFlags(channelCmd)
    
    #創建通道
	channelCmd.AddCommand(createCmd(cf))  
    #從通道獲取區塊
	channelCmd.AddCommand(fetchCmd(cf))
    #加入通道
	channelCmd.AddCommand(joinCmd(cf))
    #列出當前節點所加入的通道
	channelCmd.AddCommand(listCmd(cf))
    #簽名並更新通道配置信息
	channelCmd.AddCommand(updateCmd(cf))
    #只對通道配置信息進行簽名
	channelCmd.AddCommand(signconfigtxCmd(cf))
    #獲取通道信息
	channelCmd.AddCommand(getinfoCmd(cf))

	return channelCmd
}

具體的Peer節點命令使用方法可以參考Fabric官方文檔,這里不在一一解釋。
我們看一下createCmd(cf)方法,該方法轉到了peer/channel/create.go文件中第51行,看文件名字就知道和創建通道相關了。

func createCmd(cf *ChannelCmdFactory) *cobra.Command {
	createCmd := &cobra.Command{
		Use:   "create",   #使用create關鍵詞創建通道
		Short: "Create a channel",  
		Long:  "Create a channel and write the genesis block to a file.",   #創建通道並將創世區塊寫入文件
		RunE: func(cmd *cobra.Command, args []string) error {
            #這一行命令就是對通道進行創建了,點進行看一下
			return create(cmd, args, cf)
		},
	}
...
}

create(cmd, args, cf)方法在本文件中第227行:

func create(cmd *cobra.Command, args []string, cf *ChannelCmdFactory) error {
	// the global chainID filled by the "-c" command
    #官方注釋用-c來表明通道ID
	if channelID == common.UndefinedParamValue {
        #UndefinedParamValue="",如果通道ID等於空
		return errors.New("must supply channel ID")
	}

	// Parsing of the command line is done so silence cmd usage
	cmd.SilenceUsage = true

	var err error
	if cf == nil {
        #如果ChannelCmdFactory為空,則初始化一個
		cf, err = InitCmdFactory(EndorserNotRequired, PeerDeliverNotRequired, OrdererRequired)
		if err != nil {
			return err
		}
	}
    #最后將ChannelCmdFactory傳入該方法,進行通道的創建
	return executeCreate(cf)
}

首先看一下InitCmdFactory()做了哪些工作,在peer/channel/channel.go文件中第126行:

func InitCmdFactory(isEndorserRequired, isPeerDeliverRequired, isOrdererRequired bool) (*ChannelCmdFactory, error) {
	#這里的意思就是只能有一個交付源,要么是Peer要么是Orderer
    if isPeerDeliverRequired && isOrdererRequired {
		return nil, errors.New("ERROR - only a single deliver source is currently supported")
	}

	var err error
    #初始化ChannelCmdFactory,看一下該結構體的內容
	cf := &ChannelCmdFactory{}

直接拿到這里來好了:

 type ChannelCmdFactory struct {
    #用於背書的客戶端
	EndorserClient   pb.EndorserClient
    #簽名者
	Signer           msp.SigningIdentity
    #用於廣播的客戶端
	BroadcastClient  common.BroadcastClient
    #用於交付的客戶端
	DeliverClient    deliverClientIntf
    #創建用於廣播的客戶端的工廠
	BroadcastFactory BroadcastClientFactory
}

再往下看:

    #獲取默認的簽名者,通常是Peer節點
	cf.Signer, err = common.GetDefaultSignerFnc()
	if err != nil {
		return nil, errors.WithMessage(err, "error getting default signer")
	}

	cf.BroadcastFactory = func() (common.BroadcastClient, error) {
        #根據ChannelCmdFactory結構體中的BroadcastFactory獲取BroadcastClient
		return common.GetBroadcastClientFnc()
	}

	// for join and list, we need the endorser as well
    #我們這里是完成對通道的創建,所以只使用了最后一個isOrdererRequired
	if isEndorserRequired {
        #創建一個用於背書的客戶端
		cf.EndorserClient, err = common.GetEndorserClientFnc(common.UndefinedParamValue, common.UndefinedParamValue)
		if err != nil {
			return nil, errors.WithMessage(err, "error getting endorser client for channel")
		}
	}

	// for fetching blocks from a peer
	if isPeerDeliverRequired {
        #從Peer節點創建一個用於交付的客戶端
		cf.DeliverClient, err = common.NewDeliverClientForPeer(channelID, bestEffort)
		if err != nil {
			return nil, errors.WithMessage(err, "error getting deliver client for channel")
		}
	}

	// for create and fetch, we need the orderer as well
	if isOrdererRequired {
		if len(strings.Split(common.OrderingEndpoint, ":")) != 2 {
			return nil, errors.Errorf("ordering service endpoint %s is not valid or missing", common.OrderingEndpoint)
		}
        #從Order節點創建一個一個用於交付的客戶端
		cf.DeliverClient, err = common.NewDeliverClientForOrderer(channelID, bestEffort)
		if err != nil {
			return nil, err
		}
	}
	logger.Infof("Endorser and orderer connections initialized")
	return cf, nil
}

返回create()方法:

#到了最后一行代碼,傳入之前創建的ChannelCmdFactory,開始進行通道的創建
return executeCreate(cf)

該方法在peer/channel/create.go文件中的第174行:

#方法比較清晰,一共完成了以下幾個步驟
func executeCreate(cf *ChannelCmdFactory) error {
    #發送創建通道的Transaction到Order節點
	err := sendCreateChainTransaction(cf)
	if err != nil {
		return err
	}
    #獲取該通道內的創世區塊(該過程在Order節點共識完成之后)
	block, err := getGenesisBlock(cf)
	if err != nil {
		return err
	}
    #序列化區塊信息
	b, err := proto.Marshal(block)
	if err != nil {
		return err
	}
	file := channelID + ".block"
	if outputBlock != common.UndefinedParamValue {
		file = outputBlock
	}
    #將區塊信息寫入本地文件中
	err = ioutil.WriteFile(file, b, 0644)
	if err != nil {
		return err
	}
	return nil
}

1.Peer節點創建用於創建通道的Envelope文件

首先我們看一下sendCreateChainTransaction()這個方法,又回到了peer/channel/create.go文件中,在第144行:

func sendCreateChainTransaction(cf *ChannelCmdFactory) error {
	var err error
    #定義了一個Envelope結構體
	var chCrtEnv *cb.Envelope

Envelope結構體:

type Envelope struct {
	#主要就是保存被序列化的有效載荷
	Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
    #由創建者進行的簽名信息
	Signature            []byte   `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

回到sendCreateChainTransaction()這個方法,繼續往下:

	if channelTxFile != "" {
        #如果指定了channelTxFile,則使用指定的文件創建通道,這個方法很簡單,從文件中讀取數據,反序列化后返回chCrtEnv.對於我們啟動Fabric網絡之前曾創建過一個channel.tx文件,指的就是這個
		if chCrtEnv, err = createChannelFromConfigTx(channelTxFile); err != nil {
			return err
		}
	} else {
        #如果沒有指定,則使用默認的配置創建通道,看一下這個方法,在71行
		if chCrtEnv, err = createChannelFromDefaults(cf); err != nil {
			return err
		}
	}
-------------------------------createChannelFromDefaults()-------
func createChannelFromDefaults(cf *ChannelCmdFactory) (*cb.Envelope, error) {
    #主要就這一個方法,點進去
	chCrtEnv, err := encoder.MakeChannelCreationTransaction(channelID, localsigner.NewSigner(), genesisconfig.Load(genesisconfig.SampleSingleMSPChannelProfile))
	if err != nil {
		return nil, err
	}
	return chCrtEnv, nil
}

MakeChannelCreationTransaction()方法傳入了通道的ID,並創建了一個簽名者,以及默認的配置文件,方法在common/tools/configtxgen/encoder/encoder.go文件中第502行:

func MakeChannelCreationTransaction(channelID string, signer crypto.LocalSigner, conf *genesisconfig.Profile) (*cb.Envelope, error) {
    #從名字可以看到是使用了默認的配置模板,對各種策略進行配置,里面就不再細看了
	template, err := DefaultConfigTemplate(conf)
	if err != nil {
		return nil, errors.WithMessage(err, "could not generate default config template")
	}
    #看一下這個方法,從模板中創建一個用於創建通道的Transaction
	return MakeChannelCreationTransactionFromTemplate(channelID, signer, conf, template)
}

MakeChannelCreationTransactionFromTemplate()方法在第530行:

func MakeChannelCreationTransactionFromTemplate(channelID string, signer crypto.LocalSigner, conf *genesisconfig.Profile, template *cb.ConfigGroup) (*cb.Envelope, error) {
	newChannelConfigUpdate, err := NewChannelCreateConfigUpdate(channelID, conf, template)
    ...
    #創建一個用於配置更新的結構體
	newConfigUpdateEnv := &cb.ConfigUpdateEnvelope{
		ConfigUpdate: utils.MarshalOrPanic(newChannelConfigUpdate),
	}

	if signer != nil {
        #如果簽名者不為空,創建簽名Header
		sigHeader, err := signer.NewSignatureHeader()
		...
		newConfigUpdateEnv.Signatures = []*cb.ConfigSignature{{
			SignatureHeader: utils.MarshalOrPanic(sigHeader),
		}}
        ...
        #進行簽名
		newConfigUpdateEnv.Signatures[0].Signature, err = signer.Sign(util.ConcatenateBytes(newConfigUpdateEnv.Signatures[0].SignatureHeader, newConfigUpdateEnv.ConfigUpdate))
		...
	}
    #創建被簽名的Envelope,然后一直返回到最外面的方法
	return utils.CreateSignedEnvelope(cb.HeaderType_CONFIG_UPDATE, channelID, signer, newConfigUpdateEnv, msgVersion, epoch)
}

到這里,用於創建通道的Envelope已經創建好了,sendCreateChainTransaction()繼續往下看:

...
    #該方法主要是對剛剛創建的Envelope進行驗證
	if chCrtEnv, err = sanityCheckAndSignConfigTx(chCrtEnv); err != nil {
		return err
	}
    var broadcastClient common.BroadcastClient
    #驗證完成后,創建一個用於廣播信息的客戶端
	broadcastClient, err = cf.BroadcastFactory()
	if err != nil {
		return errors.WithMessage(err, "error getting broadcast client")
	}

	defer broadcastClient.Close()
    #將創建通道的Envelope信息廣播出去
	err = broadcastClient.Send(chCrtEnv)
	return err
}

到這里,sendCreateChainTransaction()方法結束了,總結一下該方法所做的工作:

  1. 定義了個Envelope結構體
  2. 判斷channelTxFile文件(啟動網絡之前創建的channel.tx)是否存在,一般都是存在的。
  3. 如果存在的話從該文件中讀取配置信息,不存在的話從默認的模板創建,最后返回Envelope
  4. 對Envelope文件進行驗證
  5. 創建用於廣播信息的客戶端,將創建的Envelope文件廣播出去.

2 Order節點對Envelope文件處理

至於獲取創世區塊以及將文件保存到本地不再說明,接下來我們看一下Peer節點將創建通道的Envelope廣播出去后,Order節點做了什么。
方法在/order/common/server/server.go中第141行:

func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
	...
    #主要在這一行代碼,Handle方法對接收到的信息進行處理
	return s.bh.Handle(&broadcastMsgTracer{
		AtomicBroadcast_BroadcastServer: srv,
		msgTracer: msgTracer{
			debug:    s.debug,
			function: "Broadcast",
		},
	})
}

對於Handler()方法,在/order/common/broadcast/broadcast.go文件中第66行:

func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
    #首先獲取消息的源地址
	addr := util.ExtractRemoteAddress(srv.Context())
	...
	for {
        #接收消息
		msg, err := srv.Recv()
		...
        #處理接收到的消息,我們看一下這個方法
		resp := bh.ProcessMessage(msg, addr)
        #最后將響應信息廣播出去
		err = srv.Send(resp)
		...
	}
}

ProcessMessage(msg, addr)方法的傳入參數為接收到的消息以及消息的源地址,該方法比較重要,是Order節點對消息進行處理的主方法。在第136行:

func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
    #這個結構體應該理解為記錄器,記錄消息的相關信息
	tracker := &MetricsTracker{
		ChannelID: "unknown",
		TxType:    "unknown",
		Metrics:   bh.Metrics,
	}
	defer func() {
		// This looks a little unnecessary, but if done directly as
		// a defer, resp gets the (always nil) current state of resp
		// and not the return value
		tracker.Record(resp)
	}()
    #記錄處理消息的開始時間
	tracker.BeginValidate()
    #該方法獲取接收到的消息的Header,並判斷是否為配置信息
	chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg)
    ...
    #由於之前Peer節點發送的為創建通道的信息,所以消息類型為配置信息
    if !isConfig {
        ...
        #而對於普通的交易信息的處理方法這里就不再看了,主要關注於創建通道的消息的處理
    } else { // isConfig
		logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr)
        #到了這里,對配置更新消息進行處理,主要方法,點進行看一下
		config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)

ProcessConfigUpdateMsg(msg)方法在orderer/common/msgprocessor/systemchannel.go文件中第84行:

#這個地方有些不懂,為什么會調用systemchannel.ProcessConfigUpdateMsg()而不是standardchannel.ProcessConfigUpdateMsg()方法?是因為這個結構體的原因?
===========================SystemChannel=======================
type SystemChannel struct {
	*StandardChannel
	templator ChannelConfigTemplator
}
===========================SystemChannel=======================
func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
    #首先從消息體中獲取通道ID
	channelID, err := utils.ChannelID(envConfigUpdate)
	...
    #判斷獲取到的通道ID是否為已經存在的用戶通道ID,如果是的話轉到StandardChannel中的ProcessConfigUpdateMsg()方法進行處理
	if channelID == s.support.ChainID() {
		return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate)
	}
    ...
    #由於之前由Peer節點發送的為創建通道的Tx,所以對於通道ID是不存在的,因此到了這個方法,點進行看一下
    bundle, err := s.templator.NewChannelConfig(envConfigUpdate)

NewChannelConfig()方法在第215行,比較重要的方法,完成通道的配置:

func (dt *DefaultTemplator) NewChannelConfig(envConfigUpdate *cb.Envelope) (channelconfig.Resources, error) {
    #首先反序列化有效載荷
    configUpdatePayload, err := utils.UnmarshalPayload(envConfigUpdate.Payload)
    ...
    #反序列化配置更新信息Envelope
    configUpdateEnv, err := configtx.UnmarshalConfigUpdateEnvelope(configUpdatePayload.Data)s
    ...
    #獲取通道頭信息
    channelHeader, err := utils.UnmarshalChannelHeader(configUpdatePayload.Header.ChannelHeader)
    ...
    #反序列化配置更新信息
    configUpdate, err := configtx.UnmarshalConfigUpdate(configUpdateEnv.ConfigUpdate)
    ...
    #以下具體的不再說了,就是根據之前定義的各項策略對通道進行配置,具體的策略可以看configtx.yaml文件
    consortiumConfigValue, ok := configUpdate.WriteSet.Values[channelconfig.ConsortiumKey]
    ...
    consortium := &cb.Consortium{}
	err = proto.Unmarshal(consortiumConfigValue.Value, consortium)
    ...
    applicationGroup := cb.NewConfigGroup()
	consortiumsConfig, ok := dt.support.ConsortiumsConfig()
    ...
    consortiumConf, ok := consortiumsConfig.Consortiums()[consortium.Name]
    ...
    applicationGroup.Policies[channelconfig.ChannelCreationPolicyKey] = &cb.ConfigPolicy{
		Policy: consortiumConf.ChannelCreationPolicy(),
	}
	applicationGroup.ModPolicy = channelconfig.ChannelCreationPolicyKey
    #獲取當前系統通道配置信息
	systemChannelGroup := dt.support.ConfigtxValidator().ConfigProto().ChannelGroup
	if len(systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups) > 0 &&
		len(configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Groups) == 0 {
		return nil, fmt.Errorf("Proposed configuration has no application group members, but consortium contains members")
	}
    if len(systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups) > 0 {
		for orgName := range configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Groups {
			consortiumGroup, ok := systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups[orgName]
			if !ok {
				return nil, fmt.Errorf("Attempted to include a member which is not in the consortium")
			}
			applicationGroup.Groups[orgName] = proto.Clone(consortiumGroup).(*cb.ConfigGroup)
		}
	}
    channelGroup := cb.NewConfigGroup()
    #將系統通道配置信息復制
    for key, value := range systemChannelGroup.Values {
		channelGroup.Values[key] = proto.Clone(value).(*cb.ConfigValue)
		if key == channelconfig.ConsortiumKey {
			// Do not set the consortium name, we do this later
			continue
		}
	}

	for key, policy := range systemChannelGroup.Policies {
		channelGroup.Policies[key] = proto.Clone(policy).(*cb.ConfigPolicy)
	}
    #新的配置信息中Order組配置使用系統通道的配置,同時將定義的application組配置賦值到新的配置信息
    channelGroup.Groups[channelconfig.OrdererGroupKey] = proto.Clone(systemChannelGroup.Groups[channelconfig.OrdererGroupKey]).(*cb.ConfigGroup)
	channelGroup.Groups[channelconfig.ApplicationGroupKey] = applicationGroup
	channelGroup.Values[channelconfig.ConsortiumKey] = &cb.ConfigValue{
		Value:     utils.MarshalOrPanic(channelconfig.ConsortiumValue(consortium.Name).Value()),
		ModPolicy: channelconfig.AdminsPolicyKey,
	}
    if oc, ok := dt.support.OrdererConfig(); ok && oc.Capabilities().PredictableChannelTemplate() {
		channelGroup.ModPolicy = systemChannelGroup.ModPolicy
		zeroVersions(channelGroup)
	}
    #將創建的新的配置打包為Bundle
	bundle, err := channelconfig.NewBundle(channelHeader.ChannelId, &cb.Config{
		ChannelGroup: channelGroup,
	})
    ...
    return bundle, nil
}

接下來我們回到ProcessConfigUpdateMsg()方法:

    ...
    #創建一個配置驗證器對該方法的傳入參數進行驗證操作
    newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)
    ...
    #創建一個簽名的Envelope,此次為Header類型為HeaderType_CONFIG進行簽名
    newChannelEnvConfig, err := utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)
    #創建一個簽名的Transaction,此次為Header類型為HeaderType_ORDERER_TRANSACTION進行簽名
    wrappedOrdererTransaction, err := utils.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChainID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)
    ...
    #過濾器進行過濾,主要檢查是否創建的Transaction過大,以及簽名檢查,確保Order節點使用正確的證書進行簽名
    err = s.StandardChannel.filters.Apply(wrappedOrdererTransaction)
    ...
    #將Transaction返回 
    return wrappedOrdererTransaction, s.support.Sequence(), nil
}

到這里,消息處理完畢,返回到ProcessMessage()方法:

    config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
	...
    #記錄消息處理完畢時間
	tracker.EndValidate()
    #開始進行入隊操作
	tracker.BeginEnqueue()
    #waitReady()是一個阻塞方法,等待入隊完成或出現異常
	if err = processor.WaitReady(); err != nil {
	logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)
	return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}
	}
    #共識方法,具體看定義的Fabric網絡使用了哪種共識
	err = processor.Configure(config, configSeq)
	...
    #最后返回操作成功的響應
	return &ab.BroadcastResponse{Status: cb.Status_SUCCESS}
}

到這里,由客戶端發送的創建通道的Transaction就結束了。總共分為兩個部分,一個是Peer節點對創建通道的Envelope進行創建的過程,一個是Order節點接收到該Envelope進行配置的過程,最后總結一下整體流程:
Peer節點方:

  1. 創建一個用於創建通道的Transaction
    1. 判斷是否存在channel.tx文件,如果有的話直接從文件中讀取已配置好的信息,一般都會存在
    2. 沒有的話根據默認的模板進行配置
    3. 對剛創建的用於配置更新的Envelope進行相關檢查包括各項數據是否為空,創建的通道是否已經存在,配置信息是否正確,以及進行簽名,封裝為HeaderTypeCONFIG_UPDATEEnvelope
    4. 將創建的Envelope廣播出去。
  2. 生成創世區塊(該步驟在Order節點成功共識之后)
  3. 將創世區塊寫入本地文件中

Order節點方:

  1. 接收到該Envelope信息,進行相關驗證並判斷是否為配置信息
  2. 判斷通道Id是否已經存在,如果存在的話為配置更新信息,否則為通道創建信息
  3. Envelope中讀取各項策略配置
  4. 對策略配置信息進行驗證
  5. Header類型為CONFIG的Envelope進行簽名
  6. Header類型為ORDERER_TRANSACTIONEnvelope進行簽名生成Transaction
  7. 對生成的Transaction進行過濾,主要是Tx大小,Order節點的證書信息是否正確
  8. 進行入隊操作,等待入隊完成然后進行共識
  9. 廣播成功響應

整個創建通道的過程也是比較長的,能力有限,所以有些地方並沒有分析太清晰。不過整體還是可以把握住的。
最后附上參考文檔:傳送門
以及Fabric源碼地址:傳送門


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM