Kafka Streams(實時流處理)簡介


 kafka Streams

1 概述

1.1 Kafka Streams

Kafka Streams。Apache Kafka開源項目的一個組成部分。是一個功能強大,易於使用的庫。用於在Kafka上構建高可分布式、拓展性,容錯的應用程序。

1.2 Kafka Streams特點

1.功能強大 

(1)高擴展性,彈性,容錯

2.輕量級 

(1)無需專門的集群 

(2)一個庫,而不是框架

3.完全集成 

(1)100%的Kafka 0.10.0版本兼容

(2)易於集成到現有的應用程序 

4.實時性

(1)毫秒級延遲 

(2)並非微批處理 

(3)窗口允許亂序數據 

(4)允許遲到數據

1.3 為什么要有Kafka Stream

當前已經有非常多的流式處理系統,最知名且應用最多的開源流式處理系統有Spark Streaming和Apache Storm。Apache Storm發展多年,應用廣泛,提供記錄級別的處理能力,當前也支持SQL on Stream。而Spark Streaming基於Apache Spark,可以非常方便與圖計算,SQL處理等集成,功能強大,對於熟悉其它Spark應用開發的用戶而言使用門檻低。另外,目前主流的Hadoop發行版,如Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。

既然Apache Spark與Apache Storm擁用如此多的優勢,那為何還需要Kafka Stream呢?主要有如下原因。

第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個基於Kafka的流式處理類庫。框架要求開發者按照特定的方式去開發邏輯部分,供框架調用。開發者很難了解框架的具體運行方式,從而使得調試成本高,並且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發者調用,整個應用的運行方式主要由開發者控制,方便使用和調試,如圖8-11所示。

 

 

 

圖8-11 Kafka Stream

第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,但是這些框架的部署仍然相對復雜。而Kafka Stream作為類庫,可以非常方便的嵌入應用程序中,它對應用的打包和部署基本沒有任何要求。

第三,就流式處理系統而言,基本都支持Kafka作為數據源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實上,Kafka基本上是主流的流式處理系統的標准數據源。換言之,大部分流式系統中都已部署了Kafka,此時使用Kafka Stream的成本非常低。

第四,使用Storm或Spark Streaming時,需要為框架本身的進程預留資源,如Storm的supervisor和Spark on YARN的node manager。即使對於應用實例而言,框架本身也會占用部分資源,如Spark Streaming需要為shuffle和storage預留內存。但是Kafka作為類庫不占用系統資源。

第五,由於Kafka本身提供數據持久化,因此Kafka Stream提供滾動部署和滾動升級以及重新計算的能力。

第六,由於Kafka Consumer Rebalance機制,Kafka Stream可以在線動態調整並行度。

2 Kafka Stream數據清洗案例

1.需求

       實時處理單詞帶有”>>>”前綴的內容。例如輸入”hotdas>>>ximenqing”,最終處理成“ximenqing”

2.需求分析

如圖8-12所示

 

 

 

圖8-12 數據清洗案例

3.案例實操

(1)創建一個工程,並添加jar包

(2)創建主類

package com.hotdas.kafka.stream;

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;

import org.apache.kafka.streams.StreamsConfig;

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorSupplier;

import org.apache.kafka.streams.processor.TopologyBuilder;

 

public class Application {

 

   public static void main(String[] args) {

 

       // 定義輸入的topic

        String from = "first";

        // 定義輸出的topic

        String to = "second";

 

        // 設置參數

        Properties settings = new Properties();

        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");

        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop02:9092");

 

        StreamsConfig config = new StreamsConfig(settings);

 

        // 構建拓撲

        TopologyBuilder builder = new TopologyBuilder();

 

        builder.addSource("SOURCE", from)

               .addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {

 

                   @Override

                   public Processor<byte[], byte[]> get() {

                       // 具體分析處理

                       return new LogProcessor();

                   }

               }, "SOURCE")

                .addSink("SINK", to, "PROCESS");

 

        // 創建kafka stream

        KafkaStreams streams = new KafkaStreams(builder, config);

        streams.start();

   }

}

(3)具體業務處理

package com.hotdas.kafka.stream;

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorContext;

 

public class LogProcessor implements Processor<byte[], byte[]> {

  

   private ProcessorContext context;

  

   @Override

   public void init(ProcessorContext context) {

       this.context = context;

   }

 

   @Override

   public void process(byte[] key, byte[] value) {

       String input = new String(value);

      

       // 如果包含“>>>”則只保留該標記后面的內容

       if (input.contains(">>>")) {

           input = input.split(">>>")[1].trim();

           // 輸出到下一個topic

           context.forward("logProcessor".getBytes(), input.getBytes());

       }else{

           context.forward("logProcessor".getBytes(), input.getBytes());

       }

   }

 

   @Override

   public void punctuate(long timestamp) {

      

   }

 

   @Override

   public void close() {

      

   }

}

(4)運行程序

(5)在hadoop02上啟動生產者

 

 

(6)在hadoop03上啟動消費者

 

 

 

僅供參考,有錯誤還請指出!

有什么想法,評論區留言,互相指教指教。

覺得不錯的可以點一下右邊的推薦喲

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM