隨着數據時代的到來,數據的實時計算也越來越被大家重視。實時計算的一個重要方向就是實時流計算,目前關於流計算的有很多成熟的技術實現方案,比如Storm、Spark Streaming、flink等。
我今天要講的kafka streams體量上來說沒有那么大,都算不上一個框架,只是kafka的一個類庫。麻雀雖小,五臟俱全。kafka streams能提供強大的流處理的功能,並且具備一些大框架不具備的靈活特點。
這篇文章的目標是把流計算這個事講清楚,並介紹kafka streams是如何來做流計算的如有欠妥之處,歡迎指出。
大綱
- 什么是流計算
- 什么是kafka streams
- kafka streams的特點、架構、關鍵問題處理
- word count示例
一、什么是流計算
在介紹流計算之前,我們先把在它之前的批計算講一下。
批計算是在計算之前將這次計算的源數據一次性到位,按數據塊來處理數據,每一個task接收一定大小的數據塊,然后經過批計算在這次計算的結果一次性返還給調用者。
批計算的處理的對象是有限數據(bound data),得到的結果也是一個有限結果集,因此批量計算中的每個任務都是短任務,任務在處理完其負責的數據后關閉。
流計算與之相反,流計算處理的對象是無限數據(unbound data),流式計算的上游算子處理完一條數據后,會立馬發送給下游算子,所以一條數據從進入流式系統到輸出結果的時間間隔較短,經過流計算得到的結果也是無限的結果集。
流式計算往往是長任務,每個work一直運行,持續接受數據源傳過來的數據。
二、什么是kafka streams
說到流計算,很多人會想到Storm、Spark Streaming、Flink。確實這些框架目前都已經完美的支持流計算,並且很多大廠都有相應的使用案例,但是這些框架使用起來門檻很高,首先要學習框架的使用,各種規范,然后要講業務遷移到框架中,其次線上使用這些流計算框架,部署也是一個很頭疼的事。但是今天要講的主角Kafka Streams,是Kafka 在0.10版本加入的一個新的類庫,官方定位是輕量級的流計算類庫。簡單體現在以下幾個方面:
1)由於Kafka Streams是Kafka的一個lib,所以實現的程序不依賴單獨的環境
2)基於功能實現時比較簡潔,只需要基於規范實現業務邏輯即可,規模和Failover等問題有Kafka本身的特性保證。
三、Kafka Streams的特點、組件及架構
1、Kafka Streams的特點
1) 輕量級java應用,除了kafka,無需依賴資源調度框架
2) 毫秒級延遲
3)支持stateful(有狀態的)處理,如join,aggregation等。
4)試錯成本很低,相比較其他框架,
5)支持exactly-once語義支持
2、組件
1)Stream Topology:Processor 處理后后結果輸出
2)Processor: Stream Topology中的節點,是一個基本的計算節點
3)State Store:本地信息存儲
類型:
1)Key-value based
2)Window based
容錯性
1)本地RocksDB備份
2)遠程由changelog topic備份在broker上
3、架構圖
(該圖摘自Confluent官網)
4、流計算的一些關鍵問題的處理
1)故障恢復(Fault Tolerance)
Kafka Streams的容錯構建在Kafka本之上,由於Kafka Consumer Group已經實現HA,因此當出現消費異常的導致流處理任務失敗的時候,會轉移到其他機器繼續消費處理,中間的過程數據不會丟失,但是要考慮重復消費的問題。
2)狀態處理(Stateful Compute)
Kafka Stream 提供了一個抽象概念KTable,KStream來解決狀態存儲和數據變化的問題。這里簡單介紹下Kafka Stream如何實現有狀態的處理,為了實現狀態的概念,Kafka Streams有兩個重要抽象:KStream 和 KTable。分別對應數據流和數據庫,區別在於key-value對值如何被解釋。Kafka Streams作為流處理技術,很好的將存儲狀態的表(table)和作為記錄的流(stream)無縫地結合在了一起。
3)亂序問題處理(Out-of-Order Handling)
無序數據對於無狀態處理其實沒什么影響,對於有狀態的處理,則直接導致處理邏輯是否正確,比如聚合操作。通常流處理中,數據有三個時間屬性:
i)事件時間(Event Time):數據產生時間
ii)處理時間(Processing Time):數據被處理時間
iii)攝取時間(Ingest Time): 數據存儲到kafka分區的時間。
在處理無序的數據通過架將中間聚合結果保存在KTable中,后來的數據計算會覆蓋之前的,這種處理方式類似Spark和Flink中的watermark機制,等待一個給定時間后,開始計算,后來的數據將會舍棄。
四、來一個wordcount例子
一般編程領域學習一個新技術都會以hello-world開始,但是在大數據計算,則是以word-count開始,顧名思義,統計單詞數量。
1、啟動zookeeper
zkServer.cmd
2、啟動kafka
kafka-server-start.bat d:\soft\tool\Kafka\kafka_2.12-2.1.0\config\server.properties
3、創建一個用於存儲輸入數據的topic
kafka-console-producer.bat --broker-list localhost:9092 --topic streams-file-input < file-input.txt
為了方便演示,其中file-input.txt我是直接放到kafka的bin目錄下
4、在idea中創建一個簡單的項目,書寫以下代碼:
/** * ymm56.com Inc. * Copyright (c) 2013-2019 All Rights Reserved. */ package wikiedits; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import java.util.Arrays; import java.util.Properties; /** * @author LvHuiKang * @version $Id: KafkaStreamTest.java, v 0.1 2019-03-26 19:45 LvHuiKang Exp $$ */ public class KafkaStreamTest { public static void main(String[] args) { Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); Serde<String> sdeStr = Serdes.String(); Serde<Long> sdeLong = Serdes.Long(); KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> inputLines = builder.stream(sdeStr, sdeStr, "streams-file-input"); KTable<String, Long> wordCounts = inputLines.flatMapValues(inputLine -> Arrays.asList(inputLine.toLowerCase().split("\\W+"))).groupBy((key, word) -> word).count("Counts"); wordCounts.to(sdeStr, sdeLong, "streams-wordcount-output"); KafkaStreams streams = new KafkaStreams(builder, config); streams.start(); System.out.println(); } }
pom 依賴如下:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>0.11.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.11.0.0</version> </dependency>
然后啟動main方法,運行如下:
5、啟動consumer:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
展示如下:
五、總結
本文簡單介紹了kafka streams這個作為輕量級流計算引擎的架構、主要組件已經區別其他流計算引擎的特點,並通過word-count簡單演示了kafka streams的使用。本文也是在我研究流計算時無意發現的一個技術,仍有很多關鍵的技術點沒有吃透並給大家講解,后續研究后會追加。感謝你的閱讀,歡迎指正不足,並進行討論。