實時流式計算 - Kafka Stream


實時流式計算 - Kafka Stream

2.1 概述

Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature。它是提供了對存儲於Kafka內的數據進行流式處理和分析的功能。

Kafka Stream的特點如下:

  • Kafka Stream提供了一個非常簡單而輕量的Library,它可以非常方便地嵌入任意Java應用中,也可以任意方式打包和部署
  • 除了Kafka外,無任何外部依賴
  • 充分利用Kafka分區機制實現水平擴展和順序性保證
  • 通過可容錯的state store實現高效的狀態操作(如windowed join和aggregation)
  • 支持正好一次處理語義
  • 提供記錄級的處理能力,從而實現毫秒級的低延遲
  • 支持基於事件時間的窗口操作,並且可處理晚到的數據(late arrival of records)
  • 同時提供底層的處理原語Processor(類似於Storm的spout和bolt),以及高層抽象的DSL(類似於Spark的map/group/reduce)

2.2 Kafka Streams的關鍵概念

(1)Stream處理拓撲

  • 是Kafka Stream提出的最重要的抽象概念:它表示一個無限的,不斷更新的數據集。流是一個有序的,可重放(反復的使用),不可變的容錯序列,數據記錄的格式是鍵值對(key-value)。
  • 通過Kafka Streams編寫一個或多個的計算邏輯的處理器拓撲。其中處理器拓撲是一個由流(邊緣)連接的流處理(節點)的圖。
  • 流處理器處理器拓撲中的一個節點;它表示一個處理的步驟,用來轉換流中的數據(從拓撲中的上游處理器一次接受一個輸入消息,並且隨后產生一個或多個輸出消息到其下游處理器中)。

(2)在拓撲中有兩個特別的處理器:

  • 源處理器(Source Processor):源處理器是一個沒有任何上游處理器的特殊類型的流處理器。它從一個或多個kafka主題生成輸入流。通過消費這些主題的消息並將它們轉發到下游處理器。
  • Sink處理器:sink處理器是一個沒有下游流處理器的特殊類型的流處理器。它接收上游流處理器的消息發送到一個指定的Kafka主題

1612186152537

2.3 KStream&KTable

(1)數據結構類似於map,如下圖,key-value鍵值對

1612186167796

(2)KStream

KStream數據流(data stream),即是一段順序的,可以無限長,不斷更新的數據集。
數據流中比較常記錄的是事件,這些事件可以是一次鼠標點擊(click),一次交易,或是傳感器記錄的位置數據。

KStream負責抽象的,就是數據流。與Kafka自身topic中的數據一樣,類似日志,每一次操作都是向其中插入(insert)新數據。

為了說明這一點,讓我們想象一下以下兩個數據記錄正在發送到流中:

(“ alice”,1)->(“” alice“,3)

如果您的流處理應用是要總結每個用戶的價值,它將返回4alice。為什么?因為第二條數據記錄將不被視為先前記錄的更新。(insert)新數據

(3)KTable

KTable傳統數據庫,包含了各種存儲了大量狀態(state)的表格。KTable負責抽象的,就是表狀數據。每一次操作,都是更新插入(update)

為了說明這一點,讓我們想象一下以下兩個數據記錄正在發送到流中:

(“ alice”,1)->(“” alice“,3)

如果您的流處理應用是要總結每個用戶的價值,它將返回3alice。為什么?因為第二條數據記錄將被視為先前記錄的更新。

KStream - 每個新數據都包含了部分信息。

KTable - 每次更新都合並到原記錄上。

2.4 Kafka Stream入門案例編寫

(1)引入依賴

在之前的kafka-demo工程的pom文件中引入

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka.client.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

(2)創建類

package com.itheima.kafka.simple;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.springframework.util.StringUtils;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

/**
 * 統計消息中hello出現的次數
 */
public class KafkaStreamSample {

    private static final String INPUT_TOPIC="article_behavior_input";
    private static final String OUT_TOPIC="article_behavior_out";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"article_behavior_count");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        // 構建過濾聚合條件
        group(builder);
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

    }



    // 如果定義流計算過程
    static void group(final StreamsBuilder builder){
        /**
         * 第一個參數:消費的消息名稱
         * 第二個參數:LATEST  最近的   EARLIEST 更早的
         */
        KStream<String,String> stream = builder.stream(INPUT_TOPIC, Consumed.with(Topology.AutoOffsetReset.LATEST));
        //聚合的時間窗口,10秒中聚合一次
        KStream<String, String> map = stream.groupByKey()
                .windowedBy(TimeWindows.of(10000))
                .aggregate(new Initializer<String>() {
            //初始聚合
            @Override
            public String apply() {
                return "啥都沒有";
            }
        }, new Aggregator<String, String, String>() {
            /**
             * 多次聚合
             * @param aggKey  消息的key值,發送消息的key
             * @param value   消息的value值 發送消息的value值
             * @param aggValue  上面apply的方法的返回值
             *              * @return
             */
            @Override
            public String apply(String aggKey, String value, String aggValue) {
                if(StringUtils.isEmpty(value)){
                    return aggValue;
                }
                int count = 0;
                List<String> strings = Arrays.asList(value.toString().toLowerCase()split(","));
                for (String string : strings) {
                    if(string.equals("hello")){
                        count++;
                    }
                }

                return String.format("hello:%d", count);
            }
        }, Materialized.as("count-article-num-miukoo-1"))
                /**
                 * key : 發消息的key值
                 * value : 上面聚合以后的返回值
                 */
                .toStream().map((key, value) -> {
            return new KeyValue<>(key.key().toString(), value);
        });
        /**
         * 把聚合之后的消息發送到另外一個topic中
         */
        map.to(OUT_TOPIC,Produced.with(Serdes.String(),Serdes.String()));
    }
}

(3)測試

准備

  • 使用生產者在topic為:input_topic中發送多條消息
  • 使用消費者接收topic為:out_topic

結果:

  • 通過流式計算,會把生產者的多條消息匯總成一條發送到消費者中輸出

2.5 SpringBoot集成Kafka Stream

從資料文件夾中把提供好的4個類拷貝到項目的config目錄下

當前kafka-demo項目需要添加lombok的依賴包

<properties>
    <lombok.version>1.18.8</lombok.version>
</properties>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>${lombok.version}</version>
    <scope>provided</scope>
</dependency>

(1)自定配置參數

/**
 * 通過重新注冊KafkaStreamsConfiguration對象,設置自定配置參數
 */
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;

    /**
     * 重新定義默認的KafkaStreams配置屬性,包括:
     * 1、服務器地址
     * 2、應用ID
     * 3、流消息的副本數等配置
     * @return
     */
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // 消息副本數量
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 5_000);
        props.put(StreamsConfig.SEND_BUFFER_CONFIG, 3*MAX_MESSAGE_SIZE);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, Topology.AutoOffsetReset.EARLIEST.name().toLowerCase());
        return new KafkaStreamsConfiguration(props);
    }
}

修改application.yml文件,在最下方添加自定義配置

kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}

(2)定義監聽接口

/**
 * 流數據的監聽消費者實現的接口類,系統自動會通過
 * KafkaStreamListenerFactory類掃描項目中實現該接口的類,
 * 並注冊為流數據的消費端。
 *
 * 其中泛型可是KStream或KTable
 * @param <T>
 */
public interface KafkaStreamListener<T> {

    // 監聽的類型
    String listenerTopic();
    // 處理結果發送的類
    String sendTopic();
    // 對象處理邏輯
    T getService(T stream);

}

(3)KafkaStream自動處理包裝類

/**
 * KafkaStream自動處理包裝類
 */
public class KafkaStreamProcessor {

    // 流構建器
    StreamsBuilder streamsBuilder;
    private String type;
    KafkaStreamListener listener;

    public KafkaStreamProcessor(StreamsBuilder streamsBuilder,KafkaStreamListener kafkaStreamListener){
        this.streamsBuilder = streamsBuilder;
        this.listener = kafkaStreamListener;
        this.parseType();
        Assert.notNull(this.type,"Kafka Stream 監聽器只支持kstream、ktable,當前類型是"+this.type);
    }

    /**
     * 通過泛型類型自動注冊對應類型的流處理器對象
     * 支持KStream、KTable
     * @return
     */
    public Object doAction(){
        if("kstream".equals(this.type)) {
            KStream<?, ?> stream = streamsBuilder.stream(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
            stream=(KStream)listener.getService(stream);
            stream.to(listener.sendTopic());
            return stream;
        }else{
            KTable<?, ?> table = streamsBuilder.table(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
            table = (KTable)listener.getService(table);
            table.toStream().to(listener.sendTopic());
            return table;
        }
    }

    /**
     * 解析傳入listener類的泛型類
     */
    private void parseType(){
        Type[] types = listener.getClass().getGenericInterfaces();
        if(types!=null){
            for (int i = 0; i < types.length; i++) {
                if( types[i] instanceof ParameterizedType){
                    ParameterizedType t = (ParameterizedType)types[i];
                    String name = t.getActualTypeArguments()[0].getTypeName().toLowerCase();
                    if(name.contains("org.apache.kafka.streams.kstream.kstream")||name.contains("org.apache.kafka.streams.kstream.ktable")){
                        this.type = name.substring(0,name.indexOf('<')).replace("org.apache.kafka.streams.kstream.","").trim();
                        break;
                    }
                }
            }
        }
    }
}

(4)KafkaStreamListener掃描和實例化成KafkaStreamProcessor.doAction的返回類,完成監聽器實際注冊的過程

@Component
public class KafkaStreamListenerFactory implements InitializingBean {

    Logger logger = LoggerFactory.getLogger(KafkaStreamListenerFactory.class);

    @Autowired
    DefaultListableBeanFactory defaultListableBeanFactory;

    /**
     * 初始化完成后自動調用
     */
    @Override
    public void afterPropertiesSet() {
        Map<String, KafkaStreamListener> map = defaultListableBeanFactory.getBeansOfType(KafkaStreamListener.class);
        for (String key : map.keySet()) {
            KafkaStreamListener k = map.get(key);
            KafkaStreamProcessor processor = new KafkaStreamProcessor(defaultListableBeanFactory.getBean(StreamsBuilder.class),k);
            String beanName = k.getClass().getSimpleName()+"AutoProcessor" ;
            //注冊baen,並且執行doAction方法
            defaultListableBeanFactory.registerSingleton(beanName,processor.doAction());
            logger.info("add kafka stream auto listener [{}]",beanName);
        }
    }
}

(5)手動創建監聽器

1,該類需要實現KafkaStreamListener接口

2,listenerTopic方法返回需要監聽的topic

3,sendTopic方法返回需要處理完后發送的topic

4,getService方法,主要處理流數據

package com.itheima.kafka.stream;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Arrays;
import java.util.List;

@Component
public class StreamListener implements com.itheima.kafka.config.KafkaStreamListener<KStream<?, String>> {
    @Override
    public String listenerTopic() {
        return "input_topic";
    }

    @Override
    public String sendTopic() {
        return "out_topic";
    }

    @Override
    public KStream<?, String> getService(KStream<?, String> stream) {
        return stream.groupByKey().windowedBy(TimeWindows.of(10000)).aggregate(
                new Initializer<String>() {
                    @Override
                    public String apply() {
                        return "啥也沒有";
                    }
                }, new Aggregator<Object, String, String>() {
                    @Override
                    public String apply(Object key, String value, String aggValue) {
                        System.out.println("spring整合以后的第二次聚合-------");
                        if(StringUtils.isEmpty(value)){
                            return aggValue;
                        }
                        List<String> strings = Arrays.asList(value.toLowerCase().split(","));
                        int count = 0;
                        for (String string : strings) {
                            if(string.equals("hello")){
                                count++;
                            }
                        }
                        return String.format("hello:%d",count);
                    }
                },Materialized.as("kafka-stream-sample-count")
        ).toStream().map((key, value) -> {
            return new KeyValue<>(key.key().toString(), value);
        });
    }
}

測試:

啟動微服務,正常發送消息,可以正常接收到消息


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM