Fabric 1.4 源碼分析 背書節點和鏈碼容器交互
本文檔主要介紹背書節點和鏈碼容器交互流程,在Endorser背書節點章節中,無論是deploy、upgrade或者調用鏈碼,最后都會調用ChaincodeSupport.LaunchInit()/Launch()以及ChaincodeSupport.execute()方法。其中Launch()方法啟動鏈碼容器,execute()方法調用鏈碼。
1. 准備
ChaincodeSupport.Launch()首先進行判斷,根據peer側該版本鏈碼的Handler是否存在,存在則表示已運行。若不存在,則調用lscc鏈碼方法cs.Lifecycle.ChaincodeContainerInfo()獲取啟動鏈碼所需的數據ChaincodeContainerInfo。再調用cs.Launcher.Launch()方法啟動鏈碼。再判斷是否注冊了handler。
func (cs *ChaincodeSupport) Launch(chainID, chaincodeName, chaincodeVersion string, qe ledger.QueryExecutor) (*Handler, error) {
cname := chaincodeName + ":" + chaincodeVersion
if h := cs.HandlerRegistry.Handler(cname); h != nil {
return h, nil
}
ccci, err := cs.Lifecycle.ChaincodeContainerInfo(chaincodeName, qe)
if err != nil {
// TODO: There has to be a better way to do this...
if cs.UserRunsCC {
chaincodeLogger.Error(
"You are attempting to perform an action other than Deploy on Chaincode that is not ready and you are in developer mode. Did you forget to Deploy your chaincode?",
)
}
return nil, errors.Wrapf(err, "[channel %s] failed to get chaincode container info for %s", chainID, cname)
}
if err := cs.Launcher.Launch(ccci); err != nil {
return nil, errors.Wrapf(err, "[channel %s] could not launch chaincode %s", chainID, cname)
}
h := cs.HandlerRegistry.Handler(cname)
if h == nil {
return nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", chainID, cname)
}
return h, nil
}
type ChaincodeContainerInfo struct {
Name string
Version string
Path string
Type string
CodePackage []byte
// ContainerType is not a great name, but 'DOCKER' and 'SYSTEM' are the valid types
ContainerType string
}
Launch()主要實現方法在core/chaincode/runtime_launcher.go Launch()方法。在該方法中,會調用r.Runtime.Start(ccci, codePackage)啟動鏈碼,在該方法中,首先會調用c.LaunchConfig(cname, ccci.Type)生成創建鏈碼所需的參數LaunchConfig(鏈碼類型go/java/nodejs,以及TLS配置),然后構造啟動鏈碼容器請求StartContainerReq。接着調用c.Processor.Process(ccci.ContainerType, scr)正式啟動鏈碼容器。操作完成后,通過Launch()里面的select—case語句阻塞獲取結果,並結束程序運行。
func (r *RuntimeLauncher) Launch(ccci *ccprovider.ChaincodeContainerInfo) error {
...
if !alreadyStarted {
...
go func() {
if err := r.Runtime.Start(ccci, codePackage); err != nil {
startFailCh <- errors.WithMessage(err, "error starting container")
return
}
exitCode, err := r.Runtime.Wait(ccci)
if err != nil {
launchState.Notify(errors.Wrap(err, "failed to wait on container exit"))
}
launchState.Notify(errors.Errorf("container exited with %d", exitCode))
}()
}
var err error
select {
case <-launchState.Done():
err = errors.WithMessage(launchState.Err(), "chaincode registration failed")
case err = <-startFailCh:
launchState.Notify(err)
r.Metrics.LaunchFailures.With("chaincode", cname).Add(1)
case <-timeoutCh:
err = errors.Errorf("timeout expired while starting chaincode %s for transaction", cname)
launchState.Notify(err)
r.Metrics.LaunchTimeouts.With("chaincode", cname).Add(1)
}
...
return err
}
經上面可知,在啟動鏈碼容器時會調用c.Processor.Process()方法,其中會調用req.Do(v)。存在3個實現,分別是StartContainerReq、WaitContainerReq、StopContainerReq。啟動時是調用StartContainerReq。
func (si StartContainerReq) Do(v VM) error {
return v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder)
}
2. 啟動系統鏈碼
啟動系統鏈碼(進程模式)的話,則v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder)的實現是在core/container/inproccontroller/inproccontroller.go start()方法。
func (vm *InprocVM) Start(ccid ccintf.CCID, args []string, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
path := ccid.GetName() // name=Name-Version
// 獲取已注冊的inprocContainer模版
ipctemplate := vm.registry.getType(path)
...
instName := vm.GetVMName(ccid)
// 構建chaincode實例ipc
ipc, err := vm.getInstance(ipctemplate, instName, args, env)
// 判斷鏈碼是否運行
if ipc.running {
return fmt.Errorf(fmt.Sprintf("chaincode running %s", path))
}
ipc.running = true
go func() {
defer func() {
if r := recover(); r != nil {
inprocLogger.Criticalf("caught panic from chaincode %s", instName)
}
}()
// 進程模式運行鏈碼
ipc.launchInProc(instName, args, env)
}()
return nil
}
在start()方法方法中,首先會獲取ccid的name,然后根據name獲取已注冊的系統鏈碼模版ipctemplate,根據模版及args、env等參數構建系統鏈碼實例ipc,然后再判斷是否運行了系統鏈碼,如果沒有運行,則開啟協程調用launchInProc()方法進程模式啟動系統鏈碼。
在launchInProc()中開啟了2個協程,協程一主要執行shimStartInProc()方法,協程二主要執行HandleChaincodeStream()方法。並且新建了2個通道,便於peer側和鏈碼側通信。
func (ipc *inprocContainer) launchInProc(id string, args []string, env []string) error {
if ipc.ChaincodeSupport == nil {
inprocLogger.Panicf("Chaincode support is nil, most likely you forgot to set it immediately after calling inproccontroller.NewRegsitry()")
}
// 建立peer側接收鏈碼側發送通道
peerRcvCCSend := make(chan *pb.ChaincodeMessage)
// 建立鏈碼側接收peer側發送通道
ccRcvPeerSend := make(chan *pb.ChaincodeMessage)
var err error
// 傳遞鏈碼側Handler對象運行狀態的通道
ccchan := make(chan struct{}, 1)
// 傳遞peer側Handler對象運行狀態的通道
ccsupportchan := make(chan struct{}, 1)
shimStartInProc := _shimStartInProc // shadow to avoid race in test
go func() {
defer close(ccchan)
inprocLogger.Debugf("chaincode started for %s", id)
if args == nil {
args = ipc.args
}
if env == nil {
env = ipc.env
}
// 啟動系統鏈碼
err := shimStartInProc(env, args, ipc.chaincode, ccRcvPeerSend, peerRcvCCSend)
if err != nil {
err = fmt.Errorf("chaincode-support ended with err: %s", err)
_inprocLoggerErrorf("%s", err)
}
inprocLogger.Debugf("chaincode ended for %s with err: %s", id, err)
}()
// shadow function to avoid data race
inprocLoggerErrorf := _inprocLoggerErrorf
go func() {
defer close(ccsupportchan)
inprocStream := newInProcStream(peerRcvCCSend, ccRcvPeerSend)
inprocLogger.Debugf("chaincode-support started for %s", id)
// 啟動peer側Handler處理句柄,創建消息循環,處理鏈碼側發送的消息
err := ipc.ChaincodeSupport.HandleChaincodeStream(inprocStream)
if err != nil {
err = fmt.Errorf("chaincode ended with err: %s", err)
inprocLoggerErrorf("%s", err)
}
inprocLogger.Debugf("chaincode-support ended for %s with err: %s", id, err)
}()
// 阻塞等待消息處理
select {
// 鏈碼側退出,關閉peer側接收鏈碼側發送通道
case <-ccchan:
close(peerRcvCCSend)
inprocLogger.Debugf("chaincode %s quit", id)
// peer側chaincode support退出
case <-ccsupportchan:
close(ccRcvPeerSend)
inprocLogger.Debugf("chaincode support %s quit", id)
case <-ipc.stopChan:
close(ccRcvPeerSend)
close(peerRcvCCSend)
inprocLogger.Debugf("chaincode %s stopped", id)
}
return err
}
- 鏈碼側:
shimStartInProc()方法本質上是執行StartInProc()方法,首先遍歷環境變量,獲取CORE_CHAINCODE_ID_NAME,在執行newInProcStream()創建通信流,本質上只是將鏈碼側和peer側發送接收的兩個通道綁定。再執行chatWithPeer()方法與peer側交互。chatWithPeer()首先調用newChaincodeHandler()創建鏈碼側Handler,然后發送第一個注冊消息,然后開啟消息循環進行處理。
// Register on the stream
chaincodeLogger.Debugf("Registering.. sending %s", pb.ChaincodeMessage_REGISTER)
if err = handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload}); err != nil {
return errors.WithMessage(err, "error sending chaincode REGISTER")
}
- peer側:
該協程中,首先newInProcStream()創建通信流此處和鏈碼側剛剛相反。再調用HandleChaincodeStream()方法,首先創建peer側Handle,再調用handler.ProcessStream(stream)對通信流進行處理(里面也有個循環)。
具體交互流程后續介紹。
3. 啟動應用鏈碼
當啟動應用鏈碼(docker容器模式)時,Start()接口實現為core/container/dockercontroller/dockercontroller.go Start()方法。
在Start()方法中,首先調用GetVMNameForDocker方法生成鏡像名networkId-peerid-name-version-Hash(networkId-peerid-name-version),在調用GetVMName()方法生成容器名(networkId-peerid-name-version)。在調用getClientFnc()獲取docker客戶端,判斷當前是否運行鏈碼容器,運行則停止當前運行的容器。接着調用createContainer()創建容器,如果報不存在鏡像,則構建鏡像,再創建鏈碼容器。如果需要配置TLS,則調用UploadToContainer()方法提交TLS證書文件。再調用StartContainer()正式啟動鏈碼容器。
當鏈碼容器啟動后,會執行shim.start()方法。首先會獲取通信流與peer側通信。再調用chatWithPeer()方法。此處介紹獲取通信流方法。
func userChaincodeStreamGetter(name string) (PeerChaincodeStream, error) {
flag.StringVar(&peerAddress, "peer.address", "", "peer address")
...
// Establish connection with validating peer
// 與peer建立連接
clientConn, err := newPeerClientConnection()
...
// 創建鏈碼支持服務客戶端
chaincodeSupportClient := pb.NewChaincodeSupportClient(clientConn)
...
// Establish stream with validating peer
// 調用Register()接口獲取通信流
stream, err := chaincodeSupportClient.Register(context.Background())
return stream, nil
}
當執行chaincodeSupportClient.Register()方法時peer側會執行HandleChaincodeStream()方法。
func (cs *ChaincodeSupport) Register(stream pb.ChaincodeSupport_RegisterServer) error {
return cs.HandleChaincodeStream(stream)
}
4. 背書節點和鏈碼交互
4.1 准備
在構建系統鏈碼和應用鏈碼流程中,peer側執行HandleChaincodeStream()方法,鏈碼側執行chatWithPeer()方法,並通過通信流來進行交互。其中,兩個方法中對消息處理的方法為handleMessage()
- 鏈碼側
switch handler.state {
case ready:
err = handler.handleReady(msg, errc)
case established:
err = handler.handleEstablished(msg, errc)
case created:
err = handler.handleCreated(msg, errc)
default:
err = errors.Errorf("[%s] Chaincode handler cannot handle message (%s) with payload size (%d) while in state: %s", msg.Txid, msg.Type, len(msg.Payload), handler.state)
}
- peer側
switch h.state {
case Created:
return h.handleMessageCreatedState(msg)
case Ready:
return h.handleMessageReadyState(msg)
default:
return errors.Errorf("handle message: invalid state %s for transaction %s", h.state, msg.Txid)
}
接下來按照消息流程介紹
- 鏈碼側發送REGISTER消息
- 首先進行各項基本配置,然后建立起與Peer節點的gRPC連接。
- 創建Handler,並更改Handler狀態為“Created”。
- 發送REGISTER消息到peer節點。
- 等待peer節點返回的信息
- peer側接收REGISTER消息
- 此時peer側Handler狀態為“Created”,調用handleMessageCreatedState()里面的HandleRegister()方法。
- peer側注冊Handler,並發送REGISTERED消息給鏈碼側
- 更新peer側Handler狀態為“Established”
- 並且會調用notifyRegistry()方法,發送READY消息給鏈碼側,並更新狀態為“Ready”
- 鏈碼側接收消息
- 當鏈碼側接收REGISTERED消息,更新狀態為Handler狀態為“Established”
- 當鏈碼側接收READY消息,更新狀態為Handler狀態為“Ready”
至此,鏈碼容器與peer節點已完成連接准備操作。
4.2 執行鏈碼
主要實現是Execute()方法。在背書節點介紹中,存在兩種消息類型:ChaincodeMessage_TRANSACTION/ChaincodeMessage_INIT。分別對應調用鏈碼和實例化鏈碼/升級鏈碼操作。此時鏈碼側和peer側Handler都處於Ready狀態。在該交互流程中,本質上是peer側發送消息給鏈碼側通過調用鏈碼的Init()/Invoke()方法完成,然后將消息返回給鏈碼側。
4.2.1 實例化鏈碼/升級鏈碼操作
則peer側發送的消息類型為ChaincodeMessage_INIT。在ChaincodeSupport.execute()中會調用handler.execute()方法。
func (h *Handler) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, msg *pb.ChaincodeMessage, timeout time.Duration) (*pb.ChaincodeMessage, error) {
txParams.CollectionStore = h.getCollectionStore(msg.ChannelId)
txParams.IsInitTransaction = (msg.Type == pb.ChaincodeMessage_INIT)
// 創建交易上下文
txctx, err := h.TXContexts.Create(txParams)
if err != nil {
return nil, err
}
// 刪除交易上下文
defer h.TXContexts.Delete(msg.ChannelId, msg.Txid)
if err := h.setChaincodeProposal(txParams.SignedProp, txParams.Proposal, msg); err != nil {
return nil, err
}
// 異步發送消息
h.serialSendAsync(msg)
var ccresp *pb.ChaincodeMessage
// 等待鏈碼側響應
select {
case ccresp = <-txctx.ResponseNotifier:
// response is sent to user or calling chaincode. ChaincodeMessage_ERROR
// are typically treated as error
case <-time.After(timeout):
err = errors.New("timeout expired while executing transaction")
ccName := cccid.Name + ":" + cccid.Version
h.Metrics.ExecuteTimeouts.With(
"chaincode", ccName,
).Add(1)
}
return ccresp, err
}
當鏈碼側接收到ChaincodeMessage_INIT類型消息時會調用handler.handleInit(msg, errc)方法。
case pb.ChaincodeMessage_INIT:
chaincodeLogger.Debugf("[%s] Received %s, initializing chaincode", shorttxid(msg.Txid), msg.Type)
// Call the chaincode's Run function to initialize
handler.handleInit(msg, errc)
return nil
// handleInit handles request to initialize chaincode.
func (handler *Handler) handleInit(msg *pb.ChaincodeMessage, errc chan error) {
go func() {
var nextStateMsg *pb.ChaincodeMessage
defer func() {
// 協程結束時執行
handler.triggerNextState(nextStateMsg, errc)
}()
...
// Get the function and args from Payload
// 獲取方法和參數
input := &pb.ChaincodeInput{}
unmarshalErr := proto.Unmarshal(msg.Payload, input)
// Call chaincode's Run
// Create the ChaincodeStub which the chaincode can use to callback
stub := new(ChaincodeStub)
err := stub.init(handler, msg.ChannelId, msg.Txid, input, msg.Proposal)
// 執行鏈碼的Init方法
res := handler.cc.Init(stub)
// Send COMPLETED message to chaincode support and change state
nextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelId}
chaincodeLogger.Debugf("[%s] Init succeeded. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_COMPLETED)
}()
}
在handleInit(msg, errc)方法中,會反序列化msg.Payload為鏈碼的輸入,其中包含Args。然后調用鏈碼的Init()方法,執行鏈碼初始化流程。並將返回結果、鏈碼事件、交易id以及通道id封裝成ChaincodeMessage_COMPLETED類型的ChaincodeMessage發送給peer側(triggerNextState()方法調用serialSendAsync()發送給peer)
當peer側接收到對應消息。core/chaincode/handler.go handleMessageReadyState()。此時會調用Notify()方法把消息寫入ResponseNotifier通道返回response。從而完成鏈碼實例化/升級流程。
switch msg.Type {
case pb.ChaincodeMessage_COMPLETED, pb.ChaincodeMessage_ERROR:
h.Notify(msg)
4.2.2 調用鏈碼
peer側發送的消息類型為ChaincodeMessage_TRANSACTION。同理鏈碼側獲取到ChaincodeMessage_TRANSACTION消息進行處理。會調用handler.handleTransaction(msg, errc)方法處理該類型消息。該類型消息執行流程和上述流程類似,只是此時調用的是鏈碼的Invoke方法。再調用過程中會與狀態數據庫存在交互,因此會發送消息給peer側,peer側與狀態數據庫交互進行處理,完成后發送消息給鏈碼側,鏈碼側處理完成后發送ChaincodeMessage_COMPLETED消息給peer側。
res := handler.cc.Invoke(stub)
- 鏈碼側:
當在鏈碼執行過程中,需要從狀態數據庫獲取消息時,例如
func (stub *ChaincodeStub) GetState(key string) ([]byte, error) {
// Access public data by setting the collection to empty string
collection := ""
return stub.handler.handleGetState(collection, key, stub.ChannelId, stub.TxID)
}
會在handleGetState()方法中調用callPeerWithChaincodeMsg()方法,再調用handler.sendReceive(msg, respChan)將消息類型ChaincodeMessage_GET_STATE的消息發送給peer側。等待peer側的消息返回,然后進行處理。處理完成后發送ChaincodeMessage_COMPLETED消息給peer側。
- peer側:
當peer側獲取到對應消息時會調用h.HandleTransaction(msg, h.HandleGetState)進行處理。最后將對應的消息封裝成ChaincodeMessage_RESPONSE類型消息給鏈碼側。
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: res, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
h.serialSendAsync(resp)
當鏈碼側處理完成后發送ChaincodeMessage_COMPLETED消息給peer側。peer側再通過notify()方法返回消息給上層接口。
其他消息類型暫不介紹,詳情請看源碼。
上述消息交互過程當中,Peer 和鏈碼側還會進行一項操作,那就是定期相互發送ChaincodeMessage_KEEPALIVE消息給對方,以確保彼此是在線狀態。
總結
本節主要介紹了背書節點和鏈碼之間的交互流程。首先本節介紹了系統鏈碼和應用鏈碼的創建流程,然后介紹了鏈碼和背書節點之間是如何建立連接、如何發送消息的。