看一下Peer
節點的啟動過程,通常在Fabric網絡中,Peer
節點的啟動方式有兩種,通過Docker容器啟動,或者是通過執行命令直接啟動。
一般情況下,我們都是執行docker-compose -f docker-*.yaml up
命令通過容器啟動了Peer
節點,而如果直接啟動Peer
節點則是執行了peer node start
這條命令。看起來,這兩種方式所使用的命令毫無關系,但事實上,在Docker容器中啟動Peer
節點也是通過執行了peer node start
這條命令來啟動Peer
節點,只不過是Docker替我們執行了,這條命令就在之前通過啟動Docker容器的那個文件中寫到。所以說,無論是哪種方式啟動Peer
節點,都是通過peer node start
這條命令,接下來,我們就分析一下執行完這條命令后,Peer
節點的啟動過程。
和之前一樣,首先找到切入點,在/fabric/peer/main.go
文件中,第46行:
mainCmd.AddCommand(node.Cmd())
這里包含了與對Peer
節點進行相關操作的命令集合,其中就有啟動Peer
節點的命令,我們點進行看一下:
func Cmd() *cobra.Command {
nodeCmd.AddCommand(startCmd())
nodeCmd.AddCommand(statusCmd())
return nodeCmd
}
共有兩條命令:啟動Peer
節點,以及查看節點的狀態,我們看一下啟動Peer
節點這條命令,首先調用了peer/node/start.go
文件中的startCmd()
,之后轉到了nodeStartCmd
,以及serve(args)
這個方法。其中,serve(args)
這個方法就是本文要說明了主要方法,我們就從這里開始分析,在peer/node/start.go
文件中第125行:
func serve(args []string) error {
#首先獲取MSP的類型,msp指的是成員關系服務提供者,相當於許可證
mspType := mgmt.GetLocalMSP().GetType()
#如果MSP的類型不是FABRIC,返回錯誤信息
if mspType != msp.FABRIC {
panic("Unsupported msp type " + msp.ProviderTypeToString(mspType))
}
...
#創建ACL提供者,access control list訪問控制列表
aclProvider := aclmgmt.NewACLProvider(
aclmgmt.ResourceGetter(peer.GetStableChannelConfig),
)
#平台注冊,可以使用的語言類型,最后一個car不太理解,可能和官方的一個例子有關
pr := platforms.NewRegistry(
&golang.Platform{},
&node.Platform{},
&java.Platform{},
&car.Platform{},
)
定義一個用於部署鏈碼的Provider結構體:
deployedCCInfoProvider := &lscc.DeployedCCInfoProvider{}
==========================DeployedCCInfoProvider==========================
type DeployedChaincodeInfoProvider interface {
Namespaces() []string #命名空間
UpdatedChaincodes(stateUpdates map[string][]*kvrwset.KVWrite) ([]*ChaincodeLifecycleInfo, error) #保存更新的鏈碼
ChaincodeInfo(chaincodeName string, qe SimpleQueryExecutor) (*DeployedChaincodeInfo, error) #保存鏈碼信息
CollectionInfo(chaincodeName, collectionName string, qe SimpleQueryExecutor) (*common.StaticCollectionConfig, error)
} #保存鏈碼數據信息
==========================DeployedCCInfoProvider==========================
下面是對Peer節點的一些屬性的設置了:
identityDeserializerFactory := func(chainID string) msp.IdentityDeserializer {
#獲取通道管理者
return mgmt.GetManagerForChain(chainID)
}
#相當於配置Peer節點的運行環境了,主要就是保存Peer節點的IP地址,端口,證書等相關基本信息
opsSystem := newOperationsSystem()
err := opsSystem.Start()
if err != nil {
return errors.WithMessage(err, "failed to initialize operations subystems")
}
defer opsSystem.Stop()
metricsProvider := opsSystem.Provider
#創建觀察者,對Peer節點進行記錄
logObserver := floggingmetrics.NewObserver(metricsProvider)
flogging.Global.SetObserver(logObserver)
#創建成員關系信息Provider,簡單來說就是保存其他Peer節點的信息,以便通信等等
membershipInfoProvider := privdata.NewMembershipInfoProvider(createSelfSignedData(), identityDeserializerFactory)
#賬本管理器初始化,主要就是之前所定義的一些屬性
ledgermgmt.Initialize(
&ledgermgmt.Initializer{
#與Tx處理相關
CustomTxProcessors: peer.ConfigTxProcessors,
#之前定義的所使用的語言
PlatformRegistry: pr,
#與鏈碼相關
DeployedChaincodeInfoProvider: deployedCCInfoProvider,
#與Peer節點交互相關
MembershipInfoProvider: membershipInfoProvider,
#這個不太清楚,與Peer節點的屬性相關?
MetricsProvider: metricsProvider,
#健康檢查
HealthCheckRegistry: opsSystem,
},
)
#判斷是否處於開發模式下
if chaincodeDevMode {
logger.Info("Running in chaincode development mode")
logger.Info("Disable loading validity system chaincode")
viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)
}
#里面有兩個方法,分別是獲取本地地址與獲取當前Peer節點實例地址,將地址進行緩存
if err := peer.CacheConfiguration(); err != nil {
return err
}
#獲取當前Peer節點實例地址,如果沒有進行緩存,則會執行上一步的CacheConfiguration()方法
peerEndpoint, err := peer.GetPeerEndpoint()
if err != nil {
err = fmt.Errorf("Failed to get Peer Endpoint: %s", err)
return err
}
#簡單的字符串操作,獲取Host
peerHost, _, err := net.SplitHostPort(peerEndpoint.Address)
if err != nil {
return fmt.Errorf("peer address is not in the format of host:port: %v", err)
}
#獲取監聽地址,該屬性在opsSystem中定義過
listenAddr := viper.GetString("peer.listenAddress")
#返回當前Peer節點的gRPC服務器配置,該方法主要就是設置TLS與心跳信息,在/core/peer/config.go文件中第128行。
serverConfig, err := peer.GetServerConfig()
if err != nil {
logger.Fatalf("Error loading secure config for peer (%s)", err)
}
#設置gRPC最大並發 grpcMaxConcurrency=2500
throttle := comm.NewThrottle(grpcMaxConcurrency)
#設置日志信息
serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")
serverConfig.MetricsProvider = metricsProvider
#設置攔截器,不再細說
serverConfig.UnaryInterceptors = append(
serverConfig.UnaryInterceptors,
grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
throttle.UnaryServerIntercptor,
)
serverConfig.StreamInterceptors = append(
serverConfig.StreamInterceptors,
grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
throttle.StreamServerInterceptor,
)
到這里創建了Peer節點的gRPC服務器,將之前的監聽地址與服務器配置傳了進去:
peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
if err != nil {
logger.Fatalf("Failed to create peer server (%s)", err)
}
關於權限的一些配置:
#TLS的相關設置
if serverConfig.SecOpts.UseTLS {
logger.Info("Starting peer with TLS enabled")
// set up credential support
cs := comm.GetCredentialSupport()
roots, err := peer.GetServerRootCAs()
if err != nil {
logger.Fatalf("Failed to set TLS server root CAs: %s", err)
}
cs.ServerRootCAs = roots
// set the cert to use if client auth is requested by remote endpoints
clientCert, err := peer.GetClientCertificate()
if err != nil {
logger.Fatalf("Failed to set TLS client certificate: %s", err)
}
comm.GetCredentialSupport().SetClientCertificate(clientCert)
}
mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
#策略檢查Provider,看傳入的參數就比較清楚了,Envelope,通道ID,環境變量
policyCheckerProvider := func(resourceName string) deliver.PolicyCheckerFunc {
return func(env *cb.Envelope, channelID string) error {
return aclProvider.CheckACL(resourceName, channelID, env)
}
}
創建了另一個服務器,與上面的權限設置相關,用於交付與過濾區塊的事件服務器:
abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
#將之前創建的gRPC服務器與用於交付與過濾區塊的事件服務器注冊到這里
pb.RegisterDeliverServer(peerServer.Server(), abServer)
接下來是與鏈碼相關的操作:
#啟動與鏈碼相關的服務器,看傳入的值 Peer節點的主機名,訪問控制列表Provider,pr是之前提到與語言相關的,以及之前的運行環境
#主要完成三個操作:1.設置本地鏈碼安裝路徑,2.創建自簽名CA,3,啟動鏈碼gRPC監聽服務,該方法在本文件中第709行
chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
logger.Debugf("Running peer")
#啟動管理員服務,這個不太懂干嘛的
startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
privDataDist := func(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
#看這個方法是分發私有數據到其他節點
return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt)
}
========================TxPvtReadWriteSetWithConfigInfo==========================
#看這里,主要是私有的讀寫集以及配置信息
type TxPvtReadWriteSetWithConfigInfo struct {
EndorsedAt uint64 `protobuf:"varint,1,opt,name=endorsed_at,json=endorsedAt,proto3" json:"endorsed_at,omitempty"`
PvtRwset *rwset.TxPvtReadWriteSet `protobuf:"bytes,2,opt,name=pvt_rwset,json=pvtRwset,proto3" json:"pvt_rwset,omitempty"`
CollectionConfigs map[string]*common.CollectionConfigPackage `protobuf:"bytes,3,rep,name=collection_configs,json=collectionConfigs,proto3" json:"collection_configs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
============================TxPvtReadWriteSetWithConfigInfo==========================
#獲取本地的已簽名的身份信息,主要是看當前節點具有的功能,比如背書,驗證
signingIdentity := mgmt.GetLocalSigningIdentityOrPanic()
serializedIdentity, err := signingIdentity.Serialize()
if err != nil {
logger.Panicf("Failed serializing self identity: %v", err)
}
#
libConf := library.Config{}
================================Config=============================
type Config struct {
#權限過濾
AuthFilters []*HandlerConfig `mapstructure:"authFilters" yaml:"authFilters"`
#這個不清楚
Decorators []*HandlerConfig `mapstructure:"decorators" yaml:"decorators"`
#背書
Endorsers PluginMapping `mapstructure:"endorsers" yaml:"endorsers"`
#驗證
Validators PluginMapping `mapstructure:"validators" yaml:"validators"`
}
==================================Config=============================
if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers", &libConf); err != nil {
return errors.WithMessage(err, "could not load YAML config")
}
#創建一個Registry實例,將上面的配置注冊到這里
reg := library.InitRegistry(libConf)
#這一部分是背書操作的相關設置,不貼出來了
...
#設置完之后注冊背書服務
pb.RegisterEndorserServer(peerServer.Server(), auth)
#創建通道策略管理者,比如哪些節點或用戶具有可讀,可寫,可操作的權限,都是由它管理
policyMgr := peer.NewChannelPolicyManagerGetter()
#創建用於廣播的服務,就是區塊鏈中用於向其他節點發送消息的服務
err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
到這里,鏈碼的相關配置已經差不多了,到了部署系統鏈碼的地方了:
#這一行代碼就是將系統鏈碼部署上去
sccp.DeploySysCCs("", ccp)
logger.Infof("Deployed system chaincodes")
installedCCs := func() ([]ccdef.InstalledChaincode, error) {
#查看已經安裝的鏈碼
return packageProvider.ListInstalledChaincodes()
}
#與鏈碼的生命周期相關
lifecycle, err := cc.NewLifeCycle(cc.Enumerate(installedCCs))
if err != nil {
logger.Panicf("Failed creating lifecycle: +%v", err)
}
#處理鏈碼的元數據更新,由其他節點廣播
onUpdate := cc.HandleMetadataUpdate(func(channel string, chaincodes ccdef.MetadataSet) {
service.GetGossipService().UpdateChaincodes(chaincodes.AsChaincodes(), gossipcommon.ChainID(channel))
})
#添加監聽器監聽鏈碼元數據更新
lifecycle.AddListener(onUpdate)
這一部分是與通道的初始化相關的內容:
peer.Initialize(func(cid string) {
logger.Debugf("Deploying system CC, for channel <%s>", cid)
sccp.DeploySysCCs(cid, ccp)
#獲取通道的描述信息,就是通道的基本屬性
sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) {
#根據通道ID獲取賬本的查詢執行器
return peer.GetLedger(cid).NewQueryExecutor()
}))
if err != nil {
logger.Panicf("Failed subscribing to chaincode lifecycle updates")
}
#為通道注冊監聽器
cceventmgmt.GetMgr().Register(cid, sub)
}, ccp, sccp, txvalidator.MapBasedPluginMapper(validationPluginsByName),
pr, deployedCCInfoProvider, membershipInfoProvider, metricsProvider)
#當前節點狀態改變后是否可以被發現
if viper.GetBool("peer.discovery.enabled") {
registerDiscoveryService(peerServer, policyMgr, lifecycle)
}
#獲取Peer節點加入的網絡ID
networkID := viper.GetString("peer.networkId")
logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)
#查看是否已經定義了配置文件
profileEnabled := viper.GetBool("peer.profile.enabled")
profileListenAddress := viper.GetString("peer.profile.listenAddress")
#創建進程啟動gRPC服務器
serve := make(chan error)
go func() {
var grpcErr error
if grpcErr = peerServer.Start(); grpcErr != nil {
grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
} else {
logger.Info("peer server exited")
}
serve <- grpcErr
}()
#如果已經定義了配置文件,則啟動監聽服務
if profileEnabled {
go func() {
logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {
logger.Errorf("Error starting profiler: %s", profileErr)
}
}()
}
#開始處理接收到的消息了
go handleSignals(addPlatformSignals(map[os.Signal]func(){
syscall.SIGINT: func() { serve <- nil },
syscall.SIGTERM: func() { serve <- nil },
}))
logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)
#阻塞在這里,除非gRPC服務停止
return <-serve
}
到這里Peer
節點已經啟動完成了,過程還是很復雜的,這里總結一下整體的過程:
- 首先就是讀取配置信息,創建Cache結構,以及檢測其他
Peer
節點的信息。CacheConfiguration()
,主要保存其他Peer
節點的相關信息。
- 創建
PeerServer
。peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
- 創建
DeliverEventsServer
。abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
pb.RegisterDeliverServer(peerServer.Server(), abServer)
fabric/core/peer/deliverevents.go
,該服務主要用於區塊的交付與過濾,主要方法:Deliver(),DeliverFiltered()
- 啟動
ChaincodeServer
。chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
core/chaincode/chaincode_support.go
,返回了ChaincodeSupport:為Peer提供執行鏈碼的接口,主要功能有Launch():啟動一個停止運行的鏈碼,Stop():停止鏈碼的運行,HandleChaincodeStream():處理鏈碼流信息,Register():將鏈碼注冊到當前Peer節點 ,createCCMessage():創建一個交易,ExecuteLegacyInit():鏈碼的實例化,Execute():執行鏈碼並返回回原始的響應,processChaincodeExecutionResult():處理鏈碼的執行結果,InvokeInit():調用鏈碼的Init方法,Invoke():調用鏈碼,execute():執行一個交易
- 啟動
AdminServer
。startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
core/protos/peer/admin.go
文件,具有GetStatus(),StartServer(),GetModuleLogLevel(),SetModuleLogLevel()
等方法
- 創建
EndorserServer
。pb.RegisterEndorserServer(peerServer.Server(), auth)
core/endorser/endorser.go
文件,注冊背書服務器,提供了一個很重要的方法:ProcessProposal()
,這個方法值得看一下。
- 創建
GossipService
。err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
gossip/service/gossip_service.go
,具有InitializeChannel(),createSelfSignedData(),updateAnchors(),AddPayload()
等方法
- 部署系統鏈碼。
- 初始化通道。
- 啟動gRPC服務。
- 如果啟用了profile,還會啟動監聽服務。
流程圖:,由於Fabric在不斷更新,所以代碼和圖中還是有一些不同的。
參考:這里