淺談kafka streams


  隨着數據時代的到來,數據的實時計算也越來越被大家重視。實時計算的一個重要方向就是實時流計算,目前關於流計算的有很多成熟的技術實現方案,比如Storm、Spark Streaming、flink等。
我今天要講的kafka streams體量上來說沒有那么大,都算不上一個框架,只是kafka的一個類庫。麻雀雖小,五臟俱全。kafka streams能提供強大的流處理的功能,並且具備一些大框架不具備的靈活特點。
這篇文章的目標是把流計算這個事講清楚,並介紹kafka streams是如何來做流計算的如有欠妥之處,歡迎指出。

大綱

  • 什么是流計算
  • 什么是kafka streams
  • kafka streams的特點、架構、關鍵問題處理
  • word count示例

一、什么是流計算

在介紹流計算之前,我們先把在它之前的批計算講一下。

批計算是在計算之前將這次計算的源數據一次性到位,按數據塊來處理數據,每一個task接收一定大小的數據塊,然后經過批計算在這次計算的結果一次性返還給調用者。

批計算的處理的對象是有限數據(bound data),得到的結果也是一個有限結果集,因此批量計算中的每個任務都是短任務,任務在處理完其負責的數據后關閉。
流計算與之相反,流計算處理的對象是無限數據(unbound data),流式計算的上游算子處理完一條數據后,會立馬發送給下游算子,所以一條數據從進入流式系統到輸出結果的時間間隔較短,經過流計算得到的結果也是無限的結果集。

流式計算往往是長任務,每個work一直運行,持續接受數據源傳過來的數據。

                           

 

二、什么是kafka streams

  說到流計算,很多人會想到Storm、Spark Streaming、Flink。確實這些框架目前都已經完美的支持流計算,並且很多大廠都有相應的使用案例,但是這些框架使用起來門檻很高,首先要學習框架的使用,各種規范,然后要講業務遷移到框架中,其次線上使用這些流計算框架,部署也是一個很頭疼的事。但是今天要講的主角Kafka Streams,是Kafka 在0.10版本加入的一個新的類庫,官方定位是輕量級的流計算類庫。簡單體現在以下幾個方面:

  1)由於Kafka Streams是Kafka的一個lib,所以實現的程序不依賴單獨的環境

  2)基於功能實現時比較簡潔,只需要基於規范實現業務邏輯即可,規模和Failover等問題有Kafka本身的特性保證。

                   

三、Kafka Streams的特點、組件架構

  1、Kafka Streams的特點   

    1) 輕量級java應用,除了kafka,無需依賴資源調度框架

         2) 毫秒級延遲

    3)支持stateful(有狀態的)處理,如join,aggregation等。

    4)試錯成本很低,相比較其他框架,

    5)支持exactly-once語義支持

  2、組件  

    1)Stream Topology:Processor 處理后后結果輸出

    2)Processor: Stream Topology中的節點,是一個基本的計算節點

    3)State Store:本地信息存儲

      類型:
        1)Key-value based
        2)Window based
      容錯性
        1)本地RocksDB備份
        2)遠程由changelog topic備份在broker上

        

    3、架構圖

           

                          (該圖摘自Confluent官網)

  4、流計算的一些關鍵問題的處理

    1)故障恢復(Fault Tolerance)

    Kafka Streams的容錯構建在Kafka本之上,由於Kafka Consumer Group已經實現HA,因此當出現消費異常的導致流處理任務失敗的時候,會轉移到其他機器繼續消費處理,中間的過程數據不會丟失,但是要考慮重復消費的問題。

    2)狀態處理(Stateful Compute)

    Kafka Stream 提供了一個抽象概念KTable,KStream來解決狀態存儲和數據變化的問題。這里簡單介紹下Kafka Stream如何實現有狀態的處理,為了實現狀態的概念,Kafka Streams有兩個重要抽象:KStream 和 KTable。分別對應數據流和數據庫,區別在於key-value對值如何被解釋。Kafka Streams作為流處理技術,很好的將存儲狀態的表(table)和作為記錄的流(stream)無縫地結合在了一起。

    3)亂序問題處理(Out-of-Order Handling)

    

 

    無序數據對於無狀態處理其實沒什么影響,對於有狀態的處理,則直接導致處理邏輯是否正確,比如聚合操作。通常流處理中,數據有三個時間屬性:

     i)事件時間(Event Time):數據產生時間

     ii)處理時間(Processing Time):數據被處理時間

     iii)攝取時間(Ingest Time): 數據存儲到kafka分區的時間。

    在處理無序的數據通過架將中間聚合結果保存在KTable中,后來的數據計算會覆蓋之前的,這種處理方式類似Spark和Flink中的watermark機制,等待一個給定時間后,開始計算,后來的數據將會舍棄。

 

 

四、來一個wordcount例子

  一般編程領域學習一個新技術都會以hello-world開始,但是在大數據計算,則是以word-count開始,顧名思義,統計單詞數量。

  1、啟動zookeeper

    zkServer.cmd

  2、啟動kafka

    kafka-server-start.bat d:\soft\tool\Kafka\kafka_2.12-2.1.0\config\server.properties

  3、創建一個用於存儲輸入數據的topic

    kafka-console-producer.bat --broker-list localhost:9092 --topic streams-file-input < file-input.txt

    為了方便演示,其中file-input.txt我是直接放到kafka的bin目錄下

  4、在idea中創建一個簡單的項目,書寫以下代碼:

    

/**
 * ymm56.com Inc.
 * Copyright (c) 2013-2019 All Rights Reserved.
 */
package wikiedits;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;
import java.util.Properties;

/**
 * @author LvHuiKang
 * @version $Id: KafkaStreamTest.java, v 0.1 2019-03-26 19:45 LvHuiKang Exp $$
 */
public class KafkaStreamTest {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        Serde<String> sdeStr = Serdes.String();
        Serde<Long> sdeLong = Serdes.Long();
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> inputLines = builder.stream(sdeStr, sdeStr, "streams-file-input");
        KTable<String, Long> wordCounts = inputLines.flatMapValues(inputLine -> Arrays.asList(inputLine.toLowerCase().split("\\W+"))).groupBy((key, word) -> word).count("Counts");
        wordCounts.to(sdeStr, sdeLong, "streams-wordcount-output");
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
        System.out.println();

    }
}

pom 依賴如下:

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>0.11.0.0</version>
</dependency>
<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.11.0.0</version>
</dependency>

然后啟動main方法,運行如下:

 

  5、啟動consumer:

    kafka-console-consumer.bat  --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

  展示如下:

  

五、總結

  本文簡單介紹了kafka streams這個作為輕量級流計算引擎的架構、主要組件已經區別其他流計算引擎的特點,並通過word-count簡單演示了kafka streams的使用。本文也是在我研究流計算時無意發現的一個技術,仍有很多關鍵的技術點沒有吃透並給大家講解,后續研究后會追加。感謝你的閱讀,歡迎指正不足,並進行討論。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM