Bring up a Kafka-based Ordering Service


Bring up a Kafka-based Ordering Service

這篇文章假設讀者對怎樣設置Kafka集群和ZooKeeper集合已經初步了解。這篇文章的目的是講解部署一個基於Kafka集群的ordering service的步驟,以對你的區塊鏈網絡提供ordering服務。

關於KafkaZookeeper的介紹摘取百度百科如下:

KafkaKafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。

Kafka相關術語:

Broker:Kafka集群包含一個或多個服務器,這種服務器被稱為broker

Topic:每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處)

Partition:Partition是物理上的概念,每個Topic包含一個或多個Partition.

Producer:負責發布消息到Kafka broker

Consumer:消息消費者,向Kafka broker讀取消息的客戶端。

Consumer Group:每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)。

ZookeeperZooKeeper是一個分布式的,開放源碼的分布式應用程序協調服務,是GoogleChubby一個開源的實現,是HadoopHbase的重要組件。它是一個為分布式應用提供一致性服務的軟件,提供的功能包括:配置維護、域名服務、分布式同步、組服務等。

ZooKeeper的基本運轉流程:

1、選舉Leader

2、同步數據。

3、選舉Leader過程中算法有很多,但要達到的選舉標准是一致的。

4Leader要具有最高的執行ID,類似root權限。

5、集群中大多數的機器得到響應並follow選出的Leader

 =====================================分割線=============================================

   每個channel對應Kafka中一個單獨分區的主題。當一個OSNordering service node)通過broadcast調用接收到交易時,首先需要驗證確認broadcast client有寫這個channel的權限,然后轉發這些交易到適當的分區。分區也會被OSN消費(生產-消費模型,針對轉發的交易),OSN將收到的交易分組到本地的區塊中,將區塊持久化到本地的賬本中同時通過deliver調用分發區塊到peer

我們使用KZ分別代表Kafka集群和ZooKeeper集群的節點個數。

1)K的最小值應該被設置為4(我們將會在第4步中解釋,這是為了滿足crash容錯的最小節點數。如果有4個代理,那么可以容錯一個代理崩潰,一個代理停止服務后,channel仍然可以繼續讀寫,新的channel可以被創建)

2)Z可以為3,5或是7。它的值需要是一個奇數避免腦裂(split-brain)情況,同時選擇大於1的值為了避免單點故障。超過7ZooKeeper servers會被認為overkill

操作如下:

1、Orderers:將Kafka相關的信息編碼進入到網絡的genesis block如果你使用工具configtxgen,編輯configtx.yaml或者其他用於配置系統channelGenesis block的預先設置文件。

A、Orderer.OrdererType設置為Kafka

B、Orderer.Kafka.Brokers包含Kafka集群中至少兩個代理的地址信息(IPport),這個list不需要是完全的(這些是你的種子代理),這個代理表示當前Order所要連接的Kafka代理

2、Orderers:設置最大的區塊大小。每個區塊最大有Orderer.AbsoluteMaxBytes個字節(不包括頭部),這個值可以在configtx.yaml中設置。假定這里你設置的值為A,記住這個值,這會影響你在第4步怎樣配置Kafka代理。

3、Orderers:創建genesis block使用configtxgen,其中第1步和第2步的設置都是系統級設置,它們應用於整個網絡的所有OSNs。記住genesis block的位置。

4、Kafka集群:配置Kafka代理。確保每個Kafka代理都配置了如下屬性:

A、unclean.leader.election.enable = false---數據一致性在區塊鏈環境中是至關重要的。我們不能從in-sync replicaISR)集合之外選取channel leader,否則我們將會面臨對於之前的leader產生的offsets覆蓋的風險,這樣的結果是,orderers產生的區塊可能會重新寫入區塊鏈。

B、min.insync.replicas = M---設置一個M值(例如1<M<N,查看下面的

default.replication.factor)。數據提交時會寫入至少M個副本(這些數據然后會被同步並且歸屬到in-sync replica集合或ISR)。其它情況,寫入操作會返回一個錯誤。接下來:

1)如果channel寫入的數據多達N-M個副本變的不可用,操作可以正常執行。

2)如果有更多的副本不可用,Kafka不可以維護一個有M數量的ISR集合,因此Kafka停止接收寫操作。Channel只有當同步M個副本后才可以重新可以寫。

Cdefault.replication.factor = N---設置一個值NN<K。設置replication factor參數為N代表着每個channel都保存N個副本的數據到Kafka的代理上。這些都是一個channelISR集合的候選。如同在上邊min.insync.replicas section設置部分所描述的,不是所有的代理(orderer)在任何時候都是可用的。N的值必須小於K,如果少於N個代理的話,channel的創建是不能成功的。因此,如果設置N的值為K,一個代理失效后,那么區塊鏈網絡將不能再創建新的channel---orderering servicecrash容錯也就不存在了。

Dmessage.max.bytes 和replica.fetch.max.bytes應該設置一個大於A(第2步設置的Orderer.AbsoluteMaxBytes值)的值。為header增加一些緩沖區空間---1MB已經足夠大。上述不同設置值之間滿足如下關系:

Orderer.AbsoluteMaxBytes < replica.fetch.max.bytes <= message.max.bytes

(更完整的是,message.max.bytes應該嚴格小於socket.request.max.bytes的值,socket.request.max.bytes的值默認被設置為100MB。如果你想要區塊的大小大於100MB,需要編輯fabric/orderer/kafka/config.go文件里硬編碼的值brokerConfig.Producer.MaxMessageBytes,修改后重新編譯源碼得到二進制文件,這種設置是不建議的。)

Elog.retention.ms = -1。除非orderering serviceKafka日志的修剪增加支持,否則你需要關閉基於時間的日志保留方式並且避免分段到期(基於大小的日志保留方式log.retention.bytes在寫本文章時在Kafka中已經默認關閉,因此不需要再次明確設置這個配置)。

基於上面的描述,MN的最小值分別為23。這個配置允許新的channel創建,所有的channel也繼續可寫。

5、Orderers:指向每個OSNgenesis block編輯orderer.yaml中的General.GenesisFile配置設定指向第3步時所生成的genesis block

6、Orderers:調整輪詢間隔和超時(可選步驟)

A、orderer.yaml配置文件中的Kafka.Retry配置允許調整metadata/producer/consumer請求的頻率,同樣可以設置超時時間。(這些配置是作為一個Kafka生產者和消費者所需關注的所有配置)

B、另外,當一個新的channel被創建或者當一個已經存在的channel重新加載(假設一個剛剛重啟的orderer),ordererKafka集群的交互方式如下:

1)orderer為該channel對應的Kafka分區創建一個producerwriter

2)使用producer發一個空操作連接信息到這個分區

3)為這個分區創建一個consumer

如果上述步驟中任意一步失敗了,你可以調整上述步驟重復的頻率。特定的,上述過程會在每個Kafka.Retry.ShortInterval重新嘗試,嘗試共計Kafka.Retry.ShortTotal時間。然后每個Kafka.Retry.LongInterval重新嘗試一次,嘗試總計Kafka.Retry.LongTotal時間,直到上述步驟成功執行。注意在上述步驟成功執行之前,orderer是不能對一個channel進行讀寫操作的。

7、設置OSNsKafka集群通過SSL通信。(可選步驟,但是高度推薦。)參考 the Confluent guidehttp://docs.confluent.io/2.0.0/kafka/ssl.htmlKafka集群的配置值,在每個OSN上設置orderer.yamlKafka.TLS下面的key的值。

8、按照如下順序啟動節點:啟動Zookeeper集合,Kafka集群,orderering service nodes

其他注意事項

1、建議消息大小。第2步中,可以通過設置Orderer.Batchsize.PreferredMaxBytes鍵的值設置每個區塊建議的大小。Kafka對於相對小的消息提供更高的吞吐量;區塊大小最好不要超過1MB

2、通過環境變量覆蓋設置。當使用Hyperledger提供的KafkaZookeeperdocker鏡像時(查看images/kafkaimages/zookeeper相應的)。可以使用環境變量覆蓋一個Kafka代理(broker)和Zookeeper server的配置。環境變量設置舉例如下:

KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=false 將會修改默認值

unclean.leader.election.enableORDERER_KAFKA_RETRY_SHORTINTERVAL=1s 將會修改默認值 Orderer.Kafka.Retry.ShortInterval.

支持的Kafka版本和升級

支持V1Kafka版本是0.90.10。(Hyperledger Fabric使用sarama client library,支持Kafka0.90.10

默認的Kafka版本設置為0.9.0.1Hyperledger Fabric提供的Kafka鏡像和默認版本一致。如果你不是使用Hyperledger Fabric所提供的Kafka鏡像,確保你在orderer.yaml文件中指定了Kafka.VersionKafka版本。

目前支持的Kafka版本包括

Version : 0.9.0.1

Version : 0.10.0.0

Version : 0.10.0.1

Version : 0.10.1.0

調試

設置General.LogLevel的值為DEBUGorderer.yamlKafka.Verbose的值為true

實例

按照上述建議配置進行配置對應的docker compose配置文件可以在fabric/bddtests目錄中找到。查看dc-orderer-kafka-base.yml和dc-orderer-kafka.yml


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM