Bring up a Kafka-based Ordering Service
這篇文章假設讀者對怎樣設置Kafka集群和ZooKeeper集合已經初步了解。這篇文章的目的是講解部署一個基於Kafka集群的ordering service的步驟,以對你的區塊鏈網絡提供ordering服務。
關於Kafka和Zookeeper的介紹摘取百度百科如下:
Kafka:Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。
Kafka相關術語:
Broker:Kafka集群包含一個或多個服務器,這種服務器被稱為broker
Topic:每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處)
Partition:Partition是物理上的概念,每個Topic包含一個或多個Partition.
Producer:負責發布消息到Kafka broker
Consumer:消息消費者,向Kafka broker讀取消息的客戶端。
Consumer Group:每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)。
Zookeeper:ZooKeeper是一個分布式的,開放源碼的分布式應用程序協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要組件。它是一個為分布式應用提供一致性服務的軟件,提供的功能包括:配置維護、域名服務、分布式同步、組服務等。
ZooKeeper的基本運轉流程:
1、選舉Leader。
2、同步數據。
3、選舉Leader過程中算法有很多,但要達到的選舉標准是一致的。
4、Leader要具有最高的執行ID,類似root權限。
5、集群中大多數的機器得到響應並follow選出的Leader。
=====================================分割線=============================================
每個channel對應Kafka中一個單獨分區的主題。當一個OSN(ordering service node)通過broadcast調用接收到交易時,首先需要驗證確認broadcast client有寫這個channel的權限,然后轉發這些交易到適當的分區。分區也會被OSN消費(生產-消費模型,針對轉發的交易),OSN將收到的交易分組到本地的區塊中,將區塊持久化到本地的賬本中同時通過deliver調用分發區塊到peer。
我們使用K和Z分別代表Kafka集群和ZooKeeper集群的節點個數。
1)K的最小值應該被設置為4(我們將會在第4步中解釋,這是為了滿足crash容錯的最小節點數。如果有4個代理,那么可以容錯一個代理崩潰,一個代理停止服務后,channel仍然可以繼續讀寫,新的channel可以被創建)
2)Z可以為3,5或是7。它的值需要是一個奇數避免腦裂(split-brain)情況,同時選擇大於1的值為了避免單點故障。超過7個ZooKeeper servers會被認為overkill。
操作如下:
1、Orderers:將Kafka相關的信息編碼進入到網絡的genesis block。如果你使用工具configtxgen,編輯configtx.yaml或者其他用於配置系統channel的Genesis block的預先設置文件。
A、Orderer.OrdererType設置為Kafka
B、Orderer.Kafka.Brokers包含Kafka集群中至少兩個代理的地址信息(IP:port),這個list不需要是完全的(這些是你的種子代理),這個代理表示當前Order所要連接的Kafka代理。
2、Orderers:設置最大的區塊大小。每個區塊最大有Orderer.AbsoluteMaxBytes個字節(不包括頭部),這個值可以在configtx.yaml中設置。假定這里你設置的值為A,記住這個值,這會影響你在第4步怎樣配置Kafka代理。
3、Orderers:創建genesis block。使用configtxgen,其中第1步和第2步的設置都是系統級設置,它們應用於整個網絡的所有OSNs。記住genesis block的位置。
4、Kafka集群:配置Kafka代理。確保每個Kafka代理都配置了如下屬性:
A、unclean.leader.election.enable = false---數據一致性在區塊鏈環境中是至關重要的。我們不能從in-sync replica(ISR)集合之外選取channel leader,否則我們將會面臨對於之前的leader產生的offsets覆蓋的風險,這樣的結果是,orderers產生的區塊可能會重新寫入區塊鏈。
B、min.insync.replicas = M---設置一個M值(例如1<M<N,查看下面的
default.replication.factor)。數據提交時會寫入至少M個副本(這些數據然后會被同步並且歸屬到in-sync replica集合或ISR)。其它情況,寫入操作會返回一個錯誤。接下來:
1)如果channel寫入的數據多達N-M個副本變的不可用,操作可以正常執行。
2)如果有更多的副本不可用,Kafka不可以維護一個有M數量的ISR集合,因此Kafka停止接收寫操作。Channel只有當同步M個副本后才可以重新可以寫。
C、default.replication.factor = N---設置一個值N,N<K。設置replication factor參數為N代表着每個channel都保存N個副本的數據到Kafka的代理上。這些都是一個channel的ISR集合的候選。如同在上邊min.insync.replicas section設置部分所描述的,不是所有的代理(orderer)在任何時候都是可用的。N的值必須小於K,如果少於N個代理的話,channel的創建是不能成功的。因此,如果設置N的值為K,一個代理失效后,那么區塊鏈網絡將不能再創建新的channel---orderering service的crash容錯也就不存在了。
D、message.max.bytes 和replica.fetch.max.bytes應該設置一個大於A(第2步設置的Orderer.AbsoluteMaxBytes值)的值。為header增加一些緩沖區空間---1MB已經足夠大。上述不同設置值之間滿足如下關系:
Orderer.AbsoluteMaxBytes < replica.fetch.max.bytes <= message.max.bytes
(更完整的是,message.max.bytes應該嚴格小於socket.request.max.bytes的值,socket.request.max.bytes的值默認被設置為100MB。如果你想要區塊的大小大於100MB,需要編輯fabric/orderer/kafka/config.go文件里硬編碼的值brokerConfig.Producer.MaxMessageBytes,修改后重新編譯源碼得到二進制文件,這種設置是不建議的。)
E、log.retention.ms = -1。除非orderering service對Kafka日志的修剪增加支持,否則你需要關閉基於時間的日志保留方式並且避免分段到期(基於大小的日志保留方式log.retention.bytes在寫本文章時在Kafka中已經默認關閉,因此不需要再次明確設置這個配置)。
基於上面的描述,M和N的最小值分別為2和3。這個配置允許新的channel創建,所有的channel也繼續可寫。
5、Orderers:指向每個OSN的genesis block。編輯orderer.yaml中的General.GenesisFile配置設定指向第3步時所生成的genesis block。
6、Orderers:調整輪詢間隔和超時(可選步驟)
A、orderer.yaml配置文件中的Kafka.Retry配置允許調整metadata/producer/consumer請求的頻率,同樣可以設置超時時間。(這些配置是作為一個Kafka生產者和消費者所需關注的所有配置)
B、另外,當一個新的channel被創建或者當一個已經存在的channel重新加載(假設一個剛剛重啟的orderer),orderer和Kafka集群的交互方式如下:
1)orderer為該channel對應的Kafka分區創建一個producer(writer)
2)使用producer發一個空操作連接信息到這個分區
3)為這個分區創建一個consumer
如果上述步驟中任意一步失敗了,你可以調整上述步驟重復的頻率。特定的,上述過程會在每個Kafka.Retry.ShortInterval重新嘗試,嘗試共計Kafka.Retry.ShortTotal時間。然后每個Kafka.Retry.LongInterval重新嘗試一次,嘗試總計Kafka.Retry.LongTotal時間,直到上述步驟成功執行。注意在上述步驟成功執行之前,orderer是不能對一個channel進行讀寫操作的。
7、設置OSNs和Kafka集群通過SSL通信。(可選步驟,但是高度推薦。)參考 the Confluent guide(http://docs.confluent.io/2.0.0/kafka/ssl.html)Kafka集群的配置值,在每個OSN上設置orderer.yaml中Kafka.TLS下面的key的值。
8、按照如下順序啟動節點:啟動Zookeeper集合,Kafka集群,orderering service nodes。
其他注意事項
1、建議消息大小。第2步中,可以通過設置Orderer.Batchsize.PreferredMaxBytes鍵的值設置每個區塊建議的大小。Kafka對於相對小的消息提供更高的吞吐量;區塊大小最好不要超過1MB。
2、通過環境變量覆蓋設置。當使用Hyperledger提供的Kafka和Zookeeper的docker鏡像時(查看images/kafka和images/zookeeper相應的)。可以使用環境變量覆蓋一個Kafka代理(broker)和Zookeeper server的配置。環境變量設置舉例如下:
KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=false 將會修改默認值
unclean.leader.election.enable。ORDERER_KAFKA_RETRY_SHORTINTERVAL=1s 將會修改默認值 Orderer.Kafka.Retry.ShortInterval.
支持的Kafka版本和升級
支持V1的Kafka版本是0.9和0.10。(Hyperledger Fabric使用sarama client library,支持Kafka0.9和0.10)
默認的Kafka版本設置為0.9.0.1。Hyperledger Fabric提供的Kafka鏡像和默認版本一致。如果你不是使用Hyperledger Fabric所提供的Kafka鏡像,確保你在orderer.yaml文件中指定了Kafka.Version的Kafka版本。
目前支持的Kafka版本包括
Version : 0.9.0.1
Version : 0.10.0.0
Version : 0.10.0.1
Version : 0.10.1.0
調試
設置General.LogLevel的值為DEBUG,orderer.yaml中Kafka.Verbose的值為true。
實例
按照上述建議配置進行配置對應的docker compose配置文件可以在fabric/bddtests目錄中找到。查看dc-orderer-kafka-base.yml和dc-orderer-kafka.yml