1.目標 - 風暴卡夫卡整合
在本Kafka教程中,我們將學習Storm Kafka Integration的概念。此外,我們將在此Kafka Storm集成教程中討論Storm架構,Storm Cluster。因此,為了使Kafka開發人員更容易從Storm拓撲中攝取和發布數據流,我們執行Storm Kafka Integration。
那么,讓我們開始Kafka Storm Integration教程。
2.什么是風暴?
Apache Storm是一個開源,分布式,可靠且容錯的系統。Storm有各種用例,如實時分析,在線機器學習,連續計算和提取轉換負載(ETL)范例。
但是,對於流數據處理,有幾個組件可以協同工作,例如:
- 噴口
spout是流的源,它是連續的日志數據流。
- 螺栓
此外,spout將數據傳遞給組件,我們稱之為bolt。基本上,bolt消耗任意數量的輸入流,進行一些處理,並可能發出新的流。
讓我們探討Apache Kafka用例| Kafka應用程序
下圖描述了Storm Architecture中的spout和bolt:
但是,讓我們假設一個Storm集群是一個螺栓組件鏈。在這里,每個螺栓對噴口流式傳輸的數據執行某種轉換。
此外,作業在Storm集群中稱為拓撲。雖然,這些拓撲結構永遠存在。之后,創建拓撲(計算圖),用於Storm的實時計算。那么,數據如何通過螺栓從噴口流出,拓撲將定義它。
3.什么是Storm Kafka Integration?
一般來說,卡夫卡和風暴相互補充。因此,我們可以說他們強大的合作可以為快速移動的大數據提供實時流分析。因此,為了使開發人員更容易從Storm拓撲中攝取和發布數據流,我們執行Kafka-Storm集成。
下圖描述了Kafka Storm集成工作模型的高級集成視圖:
一個。使用KafkaSpout
基本上,從Kafka集群讀取的常規spout 實現是KafkaSpout。它的基本用法是:
閱讀Apache Kafka Architecture及其基本概念
-
SpoutConfig spoutConfig = new SpoutConfig( ImmutableList.of("kafkahost1", "kafkahost2"), // list of Kafka brokers 8, // number of partitions per host "clicks", // topic to read from "/kafkastorm", // the root path in Zookeeper for the spout to store the consumer offsets "discovery"); // an id for this consumer for storing the consumer offsets in Zookeeper KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
但是,對於每個主機的靜態代理列表和固定數量的分區,將對spout進行參數化。
此外,它還存儲在Zookeeper中消耗的偏移的狀態。此外,為了存儲該特定噴口的偏移量和id,使用根路徑對噴口進行參數化。因此,分區的偏移量將存儲在這些路徑中,其中“0”,“1”是分區的ID:
-
{root path}/{id}/0 {root path}/{id}/1 {root path}/{id}/2 {root path}/{id}/3 …
確保默認情況下,偏移量將存儲在Storm使用的同一Zookeeper群集中。此外,我們可以通過我們的spout配置覆蓋這個,如下所示:
-
spoutConfig.zkServers = ImmutableList.of("otherserver.com"); spoutConfig.zkPort = 2191;
以下配置顯示了強制噴口回卷到先前偏移的能力。我們可以在spout配置上執行forceStartOffsetTime,如下所示:
-
spoutConfig.forceStartOffsetTime(-2);
這將選擇圍繞該時間戳寫的最新偏移量以開始消費。此外,我們可以通過傳入-1來強制噴口始終從最新的偏移開始,並且我們可以通過傳入-2來強制它從最早的偏移開始。
讓我們來討論卡夫卡的優缺點
一世。用於連接Kafka Cluster的參數
此外,KafkaSpout是一個常規的spout實現,它從Kafka集群中讀取數據。此外,為了連接到Kafka集群,它需要以下參數:
a。卡夫卡經紀人名單
b。每個主機的分區數
c。用於拉取消息的主題名稱。
d。ZooKeeper中的根路徑,Spout存儲消費者偏移量
e。在ZooKeeper中存儲使用者偏移量所需的使用者ID在
下面的代碼示例中顯示了使用前面參數的KafkaSpout類實例初始化:
-
Copy SpoutConfig spoutConfig = new SpoutConfig( ImmutableList.of("localhost:9092", "localhost:9093"), 2, " othertopic", "/kafkastorm", "consumID"); KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
此外,為了存儲消息偏移的狀態和段消耗跟蹤(如果消耗),Kafka Spout使用ZooKeeper。
在為ZooKeeper指定的根路徑中,存儲這些偏移量。此外,為了存儲消息偏移量,Storm默認使用自己的ZooKeeper集群。但是,通過設置其他ZooKeeper集群,我們可以在Spout配置中使用它們。
要指定Spout如何通過設置屬性從Kafka集群中獲取消息,Kafka Spout還提供了一個選項,如緩沖區大小和超時。
值得注意的是,為了使用Storm運行Kafka,需要設置Storm和Kafka集群,並且它應該處於運行狀態。
所以,這完全是關於Storm Kafka Integration。希望你喜歡我們的解釋。
4。結論
因此,在這個Storm Kafka集成教程中,我們已經看到了Storm Kafka Integration的概念。在這里,我們討論了Apache Storm,Storm Architecture,Storm Cluster的簡要介紹。最后,我們討論了使用KafkaSpout的實現




