本文從以下四個方面手把手教你寫Kafka Streams程序:
一. 設置Maven項目
二. 編寫第一個Streams應用程序:Pipe
三. 編寫第二個Streams應用程序:Line Split
四. 編寫第三個Streams應用程序:Wordcount
一. 設置Maven項目
我們將使用Kafka Streams Maven Archetype來創建Streams項目結構:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.kafka \ -DarchetypeArtifactId=streams-quickstart-java \ -DarchetypeVersion=1.1.0 \ -DgroupId=streams.examples \ -DartifactId=streams.examples \ -Dversion=0.1 \ -Dpackage=myapps
如果你需要,您可以為groupId,artifactId和package設置不同的值。假設您使用上述參數值,該命令將創建一個如下所示的項目結構:
> tree streams.examples
streams-quickstart
|-- pom.xml |-- src |-- main |-- java | |-- myapps | |-- LineSplit.java | |-- Pipe.java | |-- WordCount.java |-- resources |-- log4j.properties
項目中包含的pom.xml文件已經定義了Streams依賴項,並且在src/main/java已經有幾個Streams示例程序。 既然我們要從頭開始編寫這樣的程序,現在我們先刪除這些例子:
> cd streams-quickstart > rm src/main/java/myapps/*.java
二. 編寫第一個Streams應用程序:Pipe
It's coding time now! Feel free to open your favorite IDE and import this Maven project, or simply open a text editor and create a java file under src/main/java. Let's name it Pipe.java:
現在是編碼時間! 隨意打開你最喜歡的IDE並導入這個Maven項目,或者直接打開一個文本編輯器並在src/main/java
下創建一個java文件。 我們將其命名為Pipe.java
:
package myapps; public class Pipe { public static void main(String[] args) throws Exception { } }
我們在main中來編寫這個pipe程序。請注意,由於IDE通常可以自動添加導入語句,因此我們不會列出導入語句。但是,如果您使用的是文本編輯器,則需要手動添加導入,並且在本節末尾,我們將為您顯示帶有導入語句的完整代碼段。
編寫Streams應用程序的第一步是創建一個java.util.Properties
映射來指定StreamsConfig中定義的不同Streams執行配置值。 需要設置的幾個重要配置值:StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
,它指定用於建立初始連接到Kafka集群的host/port列表,以及StreamsConfig.APPLICATION_ID_CONFIG
,它提供了Streams的唯一標識符應用程序與其他應用程序進行區分:
Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assuming that the Kafka broker
假設這個應用程序和集群在同一台機器運行。
另外,你也可以自定義其他配置,例如設置消息key-value對的默認序列化和反序列:
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
有關Kafka Streams的完整配置列表,請參閱這里。
接下來我們將定義Streams應用程序的計算邏輯。在Kafka Streams中,這種計算邏輯被定義為連接處理器節點的拓撲結構。我們可以使用拓撲構建器來構建這樣的拓撲,
final StreamsBuilder builder = new StreamsBuilder();
然后使用此拓撲構建器,創建主題為streams-plaintext-input
的源流
(ps:就是數據的來源):
KStream<String, String> source = builder.stream("streams-plaintext-input");
現在我們得到一個KStream,它不斷的從來源主題streams-plaintext-input
獲取消息。消息是String類型的key-value對。我們可以用這個流做的最簡單的事情就是將它寫入另一個Kafka主題streams-pipe-output
中:
source.to("streams-pipe-output");
請注意,我們也可以將上面兩行連接成一行,如下所示:
builder.stream("streams-plaintext-input").to("streams-pipe-output");
我們可以通過執行以下操作來檢查此構建器創建的拓撲結構類型:
final Topology topology = builder.build();
將描述輸出:
System.out.println(topology.describe());
如果我們現在編譯並運行程序,它會輸出以下信息:
> mvn clean package > mvn exec:java -Dexec.mainClass=myapps.Pipe Sub-topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-SINK-0000000001 Sink: KSTREAM-SINK-0000000001(topic: streams-pipe-output) <-- KSTREAM-SOURCE-0000000000 Global Stores: none
如上所示,它說明構建的拓撲有兩個處理器節點,源節點KSTREAM-SOURCE-0000000000
和sink節點KSTREAM-SINK-0000000001
。KSTREAM-SOURCE-0000000000
連續讀取Kafka主題streams-plaintext-input
的消息,並將它們傳送到其下游節點KSTREAM-SINK-0000000001
; KSTREAM-SINK-0000000001
會將其接收到的每條消息寫入另一個Kafka主題streams-pipe-output中( -->
和<--
箭頭指示該節點的下游和上游處理器節點,即在拓撲圖中的“children”和“parents“)。 它還說明,這種簡單的拓撲沒有與之相關聯的全局狀態存儲(我們將在后面的章節中更多地討論狀態存儲)。
請注意,我們總是可以像在上面那樣在任何給定點上描述拓撲,而我們正在代碼中構建它,因此作為用戶,您可以交互式地“嘗試並品嘗”拓撲中定義的計算邏輯,直到你滿意為止。假設我們已經完成了這個簡單的拓撲結構,它只是以一種無盡的流式方式將數據從一個Kafka主題管道傳輸到另一個主題,我們現在可以使用我們剛剛構建的兩個組件構建Streams客戶端:配置map和拓撲對象(也可以從props map構造一個StreamsConfig對象,然后將該對象傳遞給構造函數,可以重載KafkaStreams構造函數來實現任一類型)。
final KafkaStreams streams = new KafkaStreams(topology, props);
通過調用它的start()
函數,我們可以觸發這個客戶端的執行。在此客戶端上調用close()
之前,執行不會停止。 例如,我們可以添加一個帶有倒計時的shutdown hook來捕獲用戶中斷,並在終止該程序時關閉客戶端:
final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0);
到目前為止,完整的代碼如下所示:
package myapps; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class Pipe { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); builder.stream("streams-plaintext-input").to("streams-pipe-output"); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
如果您已經在localhost:9092
上運行了Kafka,並且創建了主題streams-plaintext-input
和streams-pipe-output
,則可以在IDE或命令行上使用Maven運行此代碼:
> mvn clean package > mvn exec:java -Dexec.mainClass=myapps.Pipe
有關如何運行Streams應用程序並觀察計算結果的詳細說明,請閱讀Play with a Streams部分。本節的其余部分我們不會談論這一點。
三. 編寫第二個Streams應用程序:Line Split
我們已經學會了如何構建Streams客戶端及其兩個關鍵組件:StreamsConfig和Topology。 現在讓我們繼續通過增加當前拓撲來添加一些實際的處理邏輯。我們可以首先復制現有的Pipe.java
類來創建另一個程序:
> cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java
並更改其類名以及應用程序ID配置以,與之前的程序區分開來:
public class LineSplit { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit"); // ... } }
由於每個源流的消息都是一個字符串類型的鍵值對,因此讓我們將值字符串視為文本行,並使用FlatMapValues運算符將其分成單詞:
KStream<String, String> source = builder.stream("streams-plaintext-input"); KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override public Iterable<String> apply(String value) { return Arrays.asList(value.split("\\W+")); } });
操作員將把源流作為輸入,並通過按順序處理源流中的每條消息並將其值字符串分解為一個單詞列表,並生成每個單詞作為輸出的新消息,從而生成一個名為單詞的新流。這是一個無狀態的操作,無需跟蹤以前收到的消息或處理結果。請注意,如果您使用的是JDK 8,則可以使用lambda表達式並簡化上面的代碼:
KStream<String, String> source = builder.stream("streams-plaintext-input"); KStream<String, String> words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));
最后,我們可以將單詞流寫回另一個Kafka主題,比如說stream-linesplit-output。 再次,這兩個步驟可以如下所示連接(假設使用lambda表達式):
KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))) .to("streams-linesplit-output");
如果我們現在將此擴展拓撲描述打印出來System.out.println(topology.describe())
,我們將得到以下結果:
> mvn clean package > mvn exec:java -Dexec.mainClass=myapps.LineSplit Sub-topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001 Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-SINK-0000000002 <-- KSTREAM-SOURCE-0000000000 Sink: KSTREAM-SINK-0000000002(topic: streams-linesplit-output) <-- KSTREAM-FLATMAPVALUES-0000000001 Global Stores: none
正如我們上面看到的,一個新的處理器節點KSTREAM-FLATMAPVALUES-0000000001被注入到原始源節點和sink節點之間的拓撲中。 它將源節點作為其父節點,將sink節點作為其子節點。換句話說,源節點獲取的每個消息,將首先遍歷新加入的KSTREAM-FLATMAPVALUES-0000000001
節點進行處理,並且結果將生成一個或多個新消息。它們將繼續往下走到sink節點回寫給kafka。注意這個處理器節點是“無狀態的”,因為它不與任何倉庫相關聯(即(stores:[]))。
完整的代碼如下所示(假設使用lambda表達式):
package myapps; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class LineSplit { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))) .to("streams-linesplit-output"); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // ... same as Pipe.java above } }
四. 編寫第三個Streams應用程序:Wordcount
現在讓我們進一步通過計算源文本流中單詞的出現,來向拓撲中添加一些“有狀態”計算。按照類似的步驟,我們創建另一個基於LineSplit.java類的程序:
public class WordCount { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); // ... } }
為了計算單詞,我們可以首先修改flatMapValues,將它們全部作為小寫字母(假設使用lambda表達式):
source.flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override public Iterable<String> apply(String value) { return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); } });
我們必須首先指定我們要關鍵流的字符串value,即小寫單詞,用groupBy操作。該運算符生成一個新的分組流,然后可以由一個計數操作員匯總,該操作員可以在每個分組鍵上生成一個運行計數:
KTable<String, Long> counts = source.flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override public Iterable<String> apply(String value) { return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); } }) .groupBy(new KeyValueMapper<String, String, String>() { @Override public String apply(String key, String value) { return value; } }) // Materialize the result into a KeyValueStore named "counts-store". // The Materialized store is always of type <Bytes, byte[]> as this is the format of the inner most store. .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));
請注意,count運算符有Materialized參數,該參數指定運行計數應存儲在名為counts-store的狀態存儲中。 此Counts倉庫可以實時查詢,詳情請參閱開發者手冊。
請注意,為了從主題streams-wordcount-output
讀取changelog流,需要將值反序列化設置為org.apache.kafka.common.serialization.LongDeserializer。假設可以使用JDK 8的lambda表達式,上面的代碼可以簡化為:
KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) .toStream() .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
如果我們再次將這種擴展拓撲描述為System.out.println(topology.describe()),我們將得到以下結果:
> mvn clean package
> mvn exec:java -Dexec.mainClass=myapps.WordCount
Sub-topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001 Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000 Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001 Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002 Sink: KSTREAM-SINK-0000000004(topic: Counts-repartition) <-- KSTREAM-FILTER-0000000005 Sub-topology: 1 Source: KSTREAM-SOURCE-0000000006(topics: Counts-repartition) --> KSTREAM-AGGREGATE-0000000003 Processor: KSTREAM-AGGREGATE-0000000003(stores: [Counts]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006 Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003 Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007 Global Stores: none
如上所述,拓撲現在包含兩個斷開的子拓撲。第一個子拓撲的接收節點KSTREAM-SINK-0000000004將寫入一個重新分區主題Counts-repartition
,它將由第二個子拓撲的源節點KSTREAM-SOURCE-0000000006
讀取。重分區topic通過使用聚合鍵“shuffle”的源流,在這種情況下,聚合鍵為值字符串。此外,在第一個子拓撲結構內部,在分組KSTREAM-KEY-SELECT-0000000002
節點和sink節點之間注入無狀態的KSTREAM-FILTER-0000000005
節點,以過濾出聚合key為空的任何中間記錄。
在第二個子拓撲中,聚合節點KSTREAM-AGGREGATE-0000000003
與名為Counts
的狀態存儲相關聯(名稱由用戶在count運算符中指定)。在即將到來的流源節點接收到每個消息時,聚合處理器將首先查詢其關聯的Counts存儲以獲得該密鑰的當前計數,並將其增加1,然后將新計數寫回倉庫。將每個更新的key計數傳送到KTABLE-TOSTREAM-0000000007節點,KTABLE-TOSTREAM-0000000007節點將該更新流解釋為消息流,然后再傳輸到匯聚節點KSTREAM-SINK-0000000008以寫回Kafka。
完整的代碼如下所示(假設使用lambda表達式):
package myapps;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import java.util.Arrays; import java.util.Locale; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class WordCount { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) .toStream() .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // ... same as Pipe.java above } }
本文轉發自 http://orchome.com/957
在本指南中,我們將從頭開始幫助你搭建自己的Kafka Streams流處理程序。 強烈建議您首先閱讀快速入門,了解如何運行使用Kafka Streams編寫的Streams應用程序(如果尚未這樣做)。
關於Kafka深入學習視頻, 如Kafka領導選舉, offset管理, Streams接口, 高性能之道, 監控運維, 性能測試等,
請關注個人微信公眾號: 求學之旅, 發送Kafka, 即可收獲Kafka學習視頻大禮包一枚。