Kafka設計解析(七)Kafka Stream


轉載自 技術世界,原文鏈接 Kafka設計解析(七)- Kafka Stream

 

本文介紹了Kafka Stream的背景,如Kafka Stream是什么,什么是流式計算,以及為什么要有Kafka Stream。接着介紹了Kafka Stream的整體架構,並行模型,狀態存儲,以及主要的兩種數據集KStream和KTable。並且分析了Kafka Stream如何解決流式系統中的關鍵問題,如時間定義,窗口操作,Join操作,聚合操作,以及如何處理亂序和提供容錯能力。最后結合示例講解了如何使用Kafka Stream。

目錄

一、Kafka Stream背景

1. Kafka Stream是什么

2. 什么是流式計算

3. 為什么要有Kafka Stream

二、Kafka Stream架構

1. Kafka Stream整體架構

2. Processor Topology

3. Kafka Stream並行模型

4. KTable vs. KStream

5. State store

三、Kafka Stream如何解決流式系統中關鍵問題

1. 時間

2. 窗口

3. Join

4. 聚合與亂序處理

5. 容錯

四、Kafka Stream應用示例

五、總結

一、Kafka Stream背景

1. Kafka Stream是什么

Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature。它是提供了對存儲於Kafka內的數據進行流式處理和分析的功能。

Kafka Stream的特點如下:

  • Kafka Stream提供了一個非常簡單而輕量的Library,它可以非常方便地嵌入任意Java應用中,也可以任意方式打包和部署
  • 除了Kafka外,無任何外部依賴
  • 充分利用Kafka分區機制實現水平擴展和順序性保證
  • 通過可容錯的state store實現高效的狀態操作(如windowed join和aggregation)
  • 支持正好一次處理語義
  • 提供記錄級的處理能力,從而實現毫秒級的低延遲
  • 支持基於事件時間的窗口操作,並且可處理晚到的數據(late arrival of records)
  • 同時提供底層的處理原語Processor(類似於Storm的spout和bolt),以及高層抽象的DSL(類似於Spark的map/group/reduce)

2. 什么是流式計算

一般流式計算會與批量計算相比較。在流式計算模型中,輸入是持續的,可以認為在時間上是無界的,也就意味着,永遠拿不到全量數據去做計算。同時,計算結果是持續輸出的,也即計算結果在時間上也是無界的。流式計算一般對實時性要求較高,同時一般是先定義目標計算,然后數據到來之后將計算邏輯應用於數據。同時為了提高計算效率,往往盡可能采用增量計算代替全量計算。

批量處理模型中,一般先有全量數據集,然后定義計算邏輯,並將計算應用於全量數據。特點是全量計算,並且計算結果一次性全量輸出。

3. 為什么要有Kafka Stream

當前已經有非常多的流式處理系統,最知名且應用最多的開源流式處理系統有Spark Streaming和Apache Storm。Apache Storm發展多年,應用廣泛,提供記錄級別的處理能力,當前也支持SQL on Stream。而Spark Streaming基於Apache Spark,可以非常方便與圖計算,SQL處理等集成,功能強大,對於熟悉其它Spark應用開發的用戶而言使用門檻低。另外,目前主流的Hadoop發行版,如MapR,Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。

既然Apache Spark與Apache Storm擁用如此多的優勢,那為何還需要Kafka Stream呢?筆者認為主要有如下原因。

第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個基於Kafka的流式處理類庫。框架要求開發者按照特定的方式去開發邏輯部分,供框架調用。開發者很難了解框架的具體運行方式,從而使得調試成本高,並且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發者調用,整個應用的運行方式主要由開發者控制,方便使用和調試。

第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,但是這些框架的部署仍然相對復雜。而Kafka Stream作為類庫,可以非常方便的嵌入應用程序中,它對應用的打包和部署基本沒有任何要求。更為重要的是,Kafka Stream充分利用了Kafka的分區機制Consumer的Rebalance機制,使得Kafka Stream可以非常方便的水平擴展,並且各個實例可以使用不同的部署方式。具體來說,每個運行Kafka Stream的應用程序實例都包含了Kafka Consumer實例,多個同一應用的實例之間並行處理數據集。而不同實例之間的部署方式並不要求一致,比如部分實例可以運行在Web容器中,部分實例可運行在Docker或Kubernetes中。

第三,就流式處理系統而言,基本都支持Kafka作為數據源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實上,Kafka基本上是主流的流式處理系統的標准數據源。換言之,大部分流式系統中都已部署了Kafka,此時使用Kafka Stream的成本非常低。

第四,使用Storm或Spark Streaming時,需要為框架本身的進程預留資源,如Storm的supervisor和Spark on YARN的node manager。即使對於應用實例而言,框架本身也會占用部分資源,如Spark Streaming需要為shuffle和storage預留內存。

第五,由於Kafka本身提供數據持久化,因此Kafka Stream提供滾動部署和滾動升級以及重新計算的能力。

第六,由於Kafka Consumer Rebalance機制,Kafka Stream可以在線動態調整並行度。

二、Kafka Stream架構

1. Kafka Stream整體架構

Kafka Stream的整體架構圖如下所示。

目前(Kafka 0.11.0.0)Kafka Stream的數據源只能如上圖所示是Kafka。但是處理結果並不一定要如上圖所示輸出到Kafka。實際上KStream和Ktable的實例化都需要指定Topic。

KStream<String, String> stream = builder.stream("words-stream");
KTable<String, String> table = builder.table("words-table", "words-store");

另外,上圖中的Consumer和Producer並不需要開發者在應用中顯示實例化,而是由Kafka Stream根據參數隱式實例化和管理,從而降低了使用門檻。開發者只需要專注於開發核心業務邏輯,也即上圖中Task內的部分。

2. Processor Topology

基於Kafka Stream的流式應用的業務邏輯全部通過一個被稱為Processor Topology的地方執行。它與Storm的Topology和Spark的DAG類似,都定義了數據在各個處理單元(在Kafka Stream中被稱作Processor)間的流動方式,或者說定義了數據的處理邏輯。

下面是一個Processor的示例,它實現了Word Count功能,並且每秒輸出一次結果。

public class WordCountProcessor implements Processor<String, String> {
  private ProcessorContext context;
  private KeyValueStore<String, Integer> kvStore;
  @SuppressWarnings("unchecked")
  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.context.schedule(1000);
    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
  }
  @Override
  public void process(String key, String value) {
    Stream.of(value.toLowerCase().split(" ")).forEach((String word) -> {
      Optional<Integer> counts = Optional.ofNullable(kvStore.get(word));
      int count = counts.map(wordcount -> wordcount + 1).orElse(1);
      kvStore.put(word, count);
    });
  }
  @Override
  public void punctuate(long timestamp) {
    KeyValueIterator<String, Integer> iterator = this.kvStore.all();
    iterator.forEachRemaining(entry -> {
      context.forward(entry.key, entry.value);
      this.kvStore.delete(entry.key);
    });
    context.commit();
  }
  @Override
  public void close() {
    this.kvStore.close();
  }
}

從上述代碼中可見

  • process定義了對每條記錄的處理邏輯,也印證了Kafka可具有記錄級的數據處理能力。
  • context.scheduler定義了punctuate被執行的周期,從而提供了實現窗口操作的能力。
  • context.getStateStore提供的狀態存儲為有狀態計算(如窗口,聚合)提供了可能。

3. Kafka Stream並行模型

Kafka Stream的並行模型中,最小粒度為Task,而每個Task包含一個特定子Topology的所有Processor。因此每個Task所執行的代碼完全一樣,唯一的不同在於所處理的數據集互補。這一點跟Storm的Topology完全不一樣。Storm的Topology的每一個Task只包含一個Spout或Bolt的實例。因此Storm的一個Topology內的不同Task之間需要通過網絡通信傳遞數據,而Kafka Stream的Task包含了完整的子Topology,所以Task之間不需要傳遞數據,也就不需要網絡通信。這一點降低了系統復雜度,也提高了處理效率。

如果某個Stream的輸入Topic有多個(比如2個Topic,1個Partition數為4,另一個Partition數為3),則總的Task數等於Partition數最多的那個Topic的Partition數(max(4,3)=4)。這是因為Kafka Stream使用了Consumer的Rebalance機制,每個Partition對應一個Task。

下圖展示了在一個進程(Instance)中以2個Topic(Partition數均為4)為數據源的Kafka Stream應用的並行模型。從圖中可以看到,由於Kafka Stream應用的默認線程數為1,所以4個Task全部在一個線程中運行。

為了充分利用多線程的優勢,可以設置Kafka Stream的線程數。下圖展示了線程數為2時的並行模型。

前文有提到,Kafka Stream可被嵌入任意Java應用(理論上基於JVM的應用都可以)中,下圖展示了在同一台機器的不同進程中同時啟動同一Kafka Stream應用時的並行模型。注意,這里要保證兩個進程的StreamsConfig.APPLICATION_ID_CONFIG完全一樣。因為Kafka Stream將APPLICATION_ID_CONFIG作為隱式啟動的Consumer的Group ID。只有保證APPLICATION_ID_CONFIG相同,才能保證這兩個進程的Consumer屬於同一個Group,從而可以通過Consumer Rebalance機制拿到互補的數據集。

既然實現了多進程部署,可以以同樣的方式實現多機器部署。該部署方式也要求所有進程的APPLICATION_ID_CONFIG完全一樣。從圖上也可以看到,每個實例中的線程數並不要求一樣。但是無論如何部署,Task總數總會保證一致。

注意:Kafka Stream的並行模型,非常依賴於《Kafka設計解析(一)- Kafka背景及架構介紹》一文中介紹的Kafka分區機制和《Kafka設計解析(四)- Kafka Consumer設計解析》中介紹的Consumer的Rebalance機制。強烈建議不太熟悉這兩種機制的朋友,先行閱讀這兩篇文章。

這里對比一下Kafka Stream的Processor Topology與Storm的Topology。

  • Storm的Topology由Spout和Bolt組成,Spout提供數據源,而Bolt提供計算和數據導出。Kafka Stream的Processor Topology完全由Processor組成,因為它的數據固定由Kafka的Topic提供。
  • Storm的不同Bolt運行在不同的Executor中,很可能位於不同的機器,需要通過網絡通信傳輸數據。而Kafka Stream的Processor Topology的不同Processor完全運行於同一個Task中,也就完全處於同一個線程,無需網絡通信。
  • Storm的Topology可以同時包含Shuffle部分和非Shuffle部分,並且往往一個Topology就是一個完整的應用。而Kafka Stream的一個物理Topology只包含非Shuffle部分,而Shuffle部分需要通過through操作顯示完成,該操作將一個大的Topology分成了2個子Topology。
  • Storm的Topology內,不同Bolt/Spout的並行度可以不一樣,而Kafka Stream的子Topology內,所有Processor的並行度完全一樣。
  • Storm的一個Task只包含一個Spout或者Bolt的實例,而Kafka Stream的一個Task包含了一個子Topology的所有Processor。

4. KTable vs. KStream

KTable和KStream是Kafka Stream中非常重要的兩個概念,它們是Kafka實現各種語義的基礎。因此這里有必要分析下二者的區別。

KStream是一個數據流,可以認為所有記錄都通過Insert only的方式插入進這個數據流里。而KTable代表一個完整的數據集,可以理解為數據庫中的表。由於每條記錄都是Key-Value對,這里可以將Key理解為數據庫中的Primary Key,而Value可以理解為一行記錄。可以認為KTable中的數據都是通過Update only的方式進入的。也就意味着,如果KTable對應的Topic中新進入的數據的Key已經存在,那么從KTable只會取出同一Key對應的最后一條數據,相當於新的數據更新了舊的數據。

以下圖為例,假設有一個KStream和KTable,基於同一個Topic創建,並且該Topic中包含如下圖所示5條數據。此時遍歷KStream將得到與Topic內數據完全一樣的所有5條數據,且順序不變。而此時遍歷KTable時,因為這5條記錄中有3個不同的Key,所以將得到3條記錄,每個Key對應最新的值,並且這三條數據之間的順序與原來在Topic中的順序保持一致。這一點與Kafka的日志compact相同。

此時如果對該KStream和KTable分別基於key做Group,對Value進行Sum,得到的結果將會不同。對KStream的計算結果是<Jack,4>,<Lily,7>,<Mike,4>。而對Ktable的計算結果是<Mike,4>,<Jack,3>,<Lily,5>。

5. State store

流式處理中,部分操作是無狀態的,例如過濾操作(Kafka Stream DSL中用filer方法實現)。而部分操作是有狀態的,需要記錄中間狀態,如Window操作和聚合計算。State store被用來存儲中間狀態。它可以是一個持久化的Key-Value存儲,也可以是內存中的HashMap,或者是數據庫。Kafka提供了基於Topic的狀態存儲。

Topic中存儲的數據記錄本身是Key-Value形式的,同時Kafka的log compaction機制可對歷史數據做compact操作,保留每個Key對應的最后一個Value,從而在保證Key不丟失的前提下,減少總數據量,從而提高查詢效率。

構造KTable時,需要指定其state store name。默認情況下,該名字也即用於存儲該KTable的狀態的Topic的名字,遍歷KTable的過程,實際就是遍歷它對應的state store,或者說遍歷Topic的所有key,並取每個Key最新值的過程。為了使得該過程更加高效,默認情況下會對該Topic進行compact操作。

另外,除了KTable,所有狀態計算,都需要指定state store name,從而記錄中間狀態。

三、Kafka Stream如何解決流式系統中關鍵問題

1. 時間

在流式數據處理中,時間是數據的一個非常重要的屬性。從Kafka 0.10開始,每條記錄除了Key和Value外,還增加了timestamp屬性。目前Kafka Stream支持三種時間

  • 事件發生時間。事件發生的時間,包含在數據記錄中。發生時間由Producer在構造ProducerRecord時指定。並且需要Broker或者Topic將message.timestamp.type設置為CreateTime(默認值)才能生效。
  • 消息接收時間。也即消息存入Broker的時間。當Broker或Topic將message.timestamp.type設置為LogAppendTime時生效。此時Broker會在接收到消息后,存入磁盤前,將其timestamp屬性值設置為當前機器時間。一般消息接收時間比較接近於事件發生時間,部分場景下可代替事件發生時間。
  • 消息處理時間。也即Kafka Stream處理消息時的時間。

注:Kafka Stream允許通過實現org.apache.kafka.streams.processor.TimestampExtractor接口自定義記錄時間。

2. 窗口

前文提到,流式數據是在時間上無界的數據。而聚合操作只能作用在特定的數據集,也即有界的數據集上。因此需要通過某種方式從無界的數據集上按特定的語義選取出有界的數據。窗口是一種非常常用的設定計算邊界的方式。不同的流式處理系統支持的窗口類似,但不盡相同。

Kafka Stream支持的窗口如下。

(1)Hopping Time Window 該窗口定義如下圖所示。它有兩個屬性,一個是Window size,一個是Advance interval。Window size指定了窗口的大小,也即每次計算的數據集的大小。而Advance interval定義輸出的時間間隔。一個典型的應用場景是,每隔5秒鍾輸出一次過去1個小時內網站的PV或者UV。

 

(2)Tumbling Time Window該窗口定義如下圖所示。可以認為它是Hopping Time Window的一種特例,也即Window size和Advance interval相等。它的特點是各個Window之間完全不相交。

(3)Sliding Window該窗口只用於2個KStream進行Join計算時。該窗口的大小定義了Join兩側KStream的數據記錄被認為在同一個窗口的最大時間差。假設該窗口的大小為5秒,則參與Join的2個KStream中,記錄時間差小於5的記錄被認為在同一個窗口中,可以進行Join計算。

(4)Session Window該窗口用於對Key做Group后的聚合操作中。它需要對Key做分組,然后對組內的數據根據業務需求定義一個窗口的起始點和結束點。一個典型的案例是,希望通過Session Window計算某個用戶訪問網站的時間。對於一個特定的用戶(用Key表示)而言,當發生登錄操作時,該用戶(Key)的窗口即開始,當發生退出操作或者超時時,該用戶(Key)的窗口即結束。窗口結束時,可計算該用戶的訪問時間或者點擊次數等。

3. Join

Kafka Stream由於包含KStream和Ktable兩種數據集,因此提供如下Join計算

  • KTable Join KTable 結果仍為KTable。任意一邊有更新,結果KTable都會更新。
  • KStream Join KStream 結果為KStream。必須帶窗口操作,否則會造成Join操作一直不結束。
  • KStream Join KTable / GlobalKTable 結果為KStream。只有當KStream中有新數據時,才會觸發Join計算並輸出結果。KStream無新數據時,KTable的更新並不會觸發Join計算,也不會輸出數據。並且該更新只對下次Join生效。一個典型的使用場景是,KStream中的訂單信息與KTable中的用戶信息做關聯計算。

對於Join操作,如果要得到正確的計算結果,需要保證參與Join的KTable或KStream中Key相同的數據被分配到同一個Task。具體方法是

  • 參與Join的KTable或KStream的Key類型相同(實際上,業務含意也應該相同)
  • 參與Join的KTable或KStream對應的Topic的Partition數相同
  • Partitioner策略的最終結果等效(實現不需要完全一樣,只要效果一樣即可),也即Key相同的情況下,被分配到ID相同的Partition內

如果上述條件不滿足,可通過調用如下方法使得它滿足上述條件。

KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic)

4. 聚合與亂序處理

聚合操作可應用於KStream和KTable。當聚合發生在KStream上時必須指定窗口,從而限定計算的目標數據集。

需要說明的是,聚合操作的結果肯定是KTable。因為KTable是可更新的,可以在晚到的數據到來時(也即發生數據亂序時)更新結果KTable。

這里舉例說明。假設對KStream以5秒為窗口大小,進行Tumbling Time Window上的Count操作。並且KStream先后出現時間為1秒, 3秒, 5秒的數據,此時5秒的窗口已達上限,Kafka Stream關閉該窗口,觸發Count操作並將結果3輸出到KTable中(假設該結果表示為<1-5,3>)。若1秒后,又收到了時間為2秒的記錄,由於1-5秒的窗口已關閉,若直接拋棄該數據,則可認為之前的結果<1-5,3>不准確。而如果直接將完整的結果<1-5,4>輸出到KStream中,則KStream中將會包含該窗口的2條記錄,<1-5,3>, <1-5,4>,也會存在骯數據。因此Kafka Stream選擇將聚合結果存於KTable中,此時新的結果<1-5,4>會替代舊的結果<1-5,3>。用戶可得到完整的正確的結果。

這種方式保證了數據准確性,同時也提高了容錯性。

但需要說明的是,Kafka Stream並不會對所有晚到的數據都重新計算並更新結果集,而是讓用戶設置一個retention period,將每個窗口的結果集在內存中保留一定時間,該窗口內的數據晚到時,直接合並計算,並更新結果KTable。超過retention period后,該窗口結果將從內存中刪除,並且晚到的數據即使落入窗口,也會被直接丟棄。

5. 容錯

Kafka Stream從如下幾個方面進行容錯

  • 高可用的Partition保證無數據丟失。每個Task計算一個Partition,而Kafka數據復制機制保證了Partition內數據的高可用性,故無數據丟失風險。同時由於數據是持久化的,即使任務失敗,依然可以重新計算。
  • 狀態存儲實現快速故障恢復和從故障點繼續處理。對於Join和聚合及窗口等有狀態計算,狀態存儲可保存中間狀態。即使發生Failover或Consumer Rebalance,仍然可以通過狀態存儲恢復中間狀態,從而可以繼續從Failover或Consumer Rebalance前的點繼續計算。
  • KTable與retention period提供了對亂序數據的處理能力。

四、Kafka Stream應用示例

下面結合一個案例來講解如何開發Kafka Stream應用。本例完整代碼可從作者Github獲取。

訂單KStream(名為orderStream),底層Topic的Partition數為3,Key為用戶名,Value包含用戶名,商品名,訂單時間,數量。用戶KTable(名為userTable),底層Topic的Partition數為3,Key為用戶名,Value包含性別,地址和年齡。商品KTable(名為itemTable),底層Topic的Partition數為6,Key為商品名,價格,種類和產地。現在希望計算每小時購買產地與自己所在地相同的用戶總數。

首先由於希望使用訂單時間,而它包含在orderStream的Value中,需要通過提供一個實現TimestampExtractor接口的類從orderStream對應的Topic中抽取出訂單時間。

public class OrderTimestampExtractor implements TimestampExtractor {
  @Override
  public long extract(ConsumerRecord<Object, Object> record) {
    if(record instanceof Order) {
      return ((Order)record).getTS();
    } else {
      return 0;
    }
  }
}

接着通過將orderStream與userTable進行Join,來獲取訂單用戶所在地。由於二者對應的Topic的Partition數相同,且Key都為用戶名,再假設Producer往這兩個Topic寫數據時所用的Partitioner實現相同,則此時上文所述Join條件滿足,可直接進行Join。

orderUserStream = orderStream
    .leftJoin(userTable, 
         // 該lamda表達式定義了如何從orderStream與userTable生成結果集的Value
        (Order order, User user) -> OrderUser.fromOrderUser(order, user), 
         // 結果集Key序列化方式
        Serdes.String(),
         // 結果集Value序列化方式
         SerdesFactory.serdFrom(Order.class))
    .filter((String userName, OrderUser orderUser) -> orderUser.userAddress != null)

從上述代碼中,可以看到,Join時需要指定如何從參與Join雙方的記錄生成結果記錄的Value。Key不需要指定,因為結果記錄的Key與Join Key相同,故無須指定。Join結果存於名為orderUserStream的KStream中。

接下來需要將orderUserStream與itemTable進行Join,從而獲取商品產地。此時orderUserStream的Key仍為用戶名,而itemTable對應的Topic的Key為產品名,並且二者的Partition數不一樣,因此無法直接Join。此時需要通過through方法,對其中一方或雙方進行重新分區,使得二者滿足Join條件。這一過程相當於Spark的Shuffle過程和Storm的FieldGrouping。

orderUserStrea
    .through(
        // Key的序列化方式
        Serdes.String(),
        // Value的序列化方式 
        SerdesFactory.serdFrom(OrderUser.class), 
        // 重新按照商品名進行分區,具體取商品名的哈希值,然后對分區數取模
        (String key, OrderUser orderUser, int numPartitions) -> (orderUser.getItemName().hashCode() & 0x7FFFFFFF) % numPartitions, 
        "orderuser-repartition-by-item")
    .leftJoin(itemTable, (OrderUser orderUser, Item item) -> OrderUserItem.fromOrderUser(orderUser, item), Serdes.String(), SerdesFactory.serdFrom(OrderUser.class))

從上述代碼可見,through時需要指定Key的序列化器,Value的序列化器,以及分區方式和結果集所在的Topic。這里要注意,該Topic(orderuser-repartition-by-item)的Partition數必須與itemTable對應Topic的Partition數相同,並且through使用的分區方法必須與iteamTable對應Topic的分區方式一樣。經過這種through操作,orderUserStream與itemTable滿足了Join條件,可直接進行Join。

五、總結

  • Kafka Stream的並行模型完全基於Kafka的分區機制和Rebalance機制,實現了在線動態調整並行度
  • 同一Task包含了一個子Topology的所有Processor,使得所有處理邏輯都在同一線程內完成,避免了不必的網絡通信開銷,從而提高了效率。
  • through方法提供了類似Spark的Shuffle機制,為使用不同分區策略的數據提供了Join的可能
  • log compact提高了基於Kafka的state store的加載效率
  • state store為狀態計算提供了可能
  • 基於offset的計算進度管理以及基於state store的中間狀態管理為發生Consumer rebalance或Failover時從斷點處繼續處理提供了可能,並為系統容錯性提供了保障
  • KTable的引入,使得聚合計算擁用了處理亂序問題的能力


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM