菜鳥系列Fabric源碼學習 — kafka共識機制


有興趣的關注IT程序員客棧哦

Fabric 1.4源碼分析 kafka共識機制

本文檔主要介紹kafka共識機制流程。在查看文檔之前可以先閱覽raft共識流程以及orderer服務啟動流程。

1. kafka 簡介

Kafka是最初由Linkedin公司開發,是一個分布式、分區的、多副本的、多訂閱者,基於zookeeper協調的分布式日志系統,一種高吞吐量的分布式發布訂閱消息系統。kafka詳細介紹可以參考這一篇博客。kafka介紹

2. kafka共識

kafka共識當中,每個orderer節點即是生產者Producer也是消費者Consumer,在具體設計當中,每個channel對應一個topic,並且為了保證順序性,只設置了一個patition。(參見orderer啟動初始化kafka共識代碼kafka.New(conf, metricsProvider, healthChecker, registrar)),關於kafka共識,這里推薦一篇博客,可以看看設計思路以及實現流程。The ABCs of Kafka in Hyperledger Fabric

實現共識算法需要實現的接口。

type Consenter interface {
    // 處理普通交易
	Order(env *cb.Envelope, configSeq uint64) error
    // 處理配置交易
	Configure(config *cb.Envelope, configSeq uint64) error
	WaitReady() error
}

而接口chain在Consenter接口基礎上增加來部分接口

type Chain interface {
	Order(env *cb.Envelope, configSeq uint64) error
	Configure(config *cb.Envelope, configSeq uint64) error
	WaitReady() error
	Errored() <-chan struct{}
	// 分配資源
	Start()
	// 釋放資源
	Halt()
	MigrationStatus() migration.Status
}

kafka共識實現代碼路徑為:orderer/consensus/kafka/chain.go;首先,在創建chain時會調用start()方法分配資源,在kafka共識中,會初始化生產者producer、消費者Consumer以及一些配置。后續重點通過源碼來介紹producer和consumer模塊實現以及kafka共識整個交易的流程,即主要介紹交易排序整個流程。在此基礎上,解決個人的一些疑問。

3. 交易排序處理

3.1 orderer作為生產者

首先,當發送一個交易給orderer時,會調用orderer模塊的broadcast()服務,其中會調用bh.ProcessMessage(msg, addr)方法根據交易類型調用不同的方法處理。

其中無論是配置交易還是普通交易都會調用chain.enqueue()方法,通過chain.producer.SendMessage(message)方法將交易寫入kafka。從而orderer作為生產者角色功能就是將客戶端發過來的交易寫入kafka。再次強調一下,每個通道對應一個topic,每個topic只有一個patition。

func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool {
	logger.Debugf("[channel: %s] Enqueueing envelope...", chain.ChainID())
	select {
	case <-chain.startChan: // The Start phase has completed
		select {
		case <-chain.haltChan: // The chain has been halted, stop here
			logger.Warningf("[channel: %s] consenter for this channel has been halted", chain.ChainID())
			return false
		default: // The post path
			payload, err := utils.Marshal(kafkaMsg)
			if err != nil {
				logger.Errorf("[channel: %s] unable to marshal Kafka message because = %s", chain.ChainID(), err)
				return false
			}
			message := newProducerMessage(chain.channel, payload)
			if _, _, err = chain.producer.SendMessage(message); err != nil {
				logger.Errorf("[channel: %s] cannot enqueue envelope because = %s", chain.ChainID(), err)
				return false
			}
			logger.Debugf("[channel: %s] Envelope enqueued successfully", chain.ChainID())
			return true
		}
	default: // Not ready yet
		logger.Warningf("[channel: %s] Will not enqueue, consenter for this channel hasn't started yet", chain.ChainID())
		return false
	}
}

3.2 orderer作為消費者

orderer作為消費者的功能為:將kafka對應topic里面的交易打包成區塊,並寫入賬本。

其中,在orderer創建對應chain的時候調用chain.start()。

func (chain *chainImpl) Start() {
	go startThread(chain)
}

kafka會開啟協程go startThread(chain),其中會對kafka進行一系列初始化工作。最后會調用chain.processMessagesToBlocks()方法,生成對應區塊。

func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
    ...
	for {
		select {
	    ...
	   	case in, ok := <-chain.channelConsumer.Messages():
            ...
			switch msg.Type.(type) {
            ...
			case *ab.KafkaMessage_Regular:
				if err := chain.processRegular(msg.GetRegular(), in.Offset); err != nil {
					logger.Warningf("[channel: %s] Error when processing incoming message of type REGULAR = %s", chain.ChainID(), err)
					counts[indexProcessRegularError]++
				} else {
					counts[indexProcessRegularPass]++
				}
			}
            ...
	}
}

其中,會對chain.processRegular(msg.GetRegular(), in.Offset)消息進行處理。

其中,會針對配置交易和普通交易進行分別處理。普通交易會調用chain.BlockCutter().Ordered(message)生成對應的batchs,配置交易會一個交易一個區塊,直接調用chain.BlockCutter().Cut()生成batch。然后再生成區塊、寫入賬本。

4. 問題思考

  1. kafka共識模式下動態更新系統通道配置添加orderer,是否就可提供排序服務。
    經查看代碼,在orderer啟動過程中,只有raft共識會判斷該orderer(raft節點)是否在對應對consenter集群中。如果不在則會創建inactivechain,無法提供排序服務(必須更新consenter才行)。但是kafka不存在上述過程,在orderer啟動后,會從kafka同步系統通道區塊,當區塊包括創建通道交易時,會創建應用通道,同步應用通道區塊(該流程類似raft共識寫賬本流程)。因此,該orderer可以提供服務,但是如果是每個組織提供orderer的場景、由於沒有更新應用通道排序組織,從而導致無法通過服務發現獲取該orderer信息。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM