Kafka Streams usage example


1. Importing the Maven dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.0</version>
    </dependency>
</dependencies>

2. Code implementation

package com.atguigu.kafkastream;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class Application {
    public static void main(String[] args) {
        String input = "abc";   // input topic
        String output = "recommender";  // output topic

        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop1:9092");
        // Use the Serdes class to create the Serde instances needed for serialization/deserialization.
        // Serdes provides default implementations for the following types: String, byte array, Long, Integer, and Double.
        Serde<String> stringSerde = Serdes.String();

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> simpleFirstStream = builder.stream(input, Consumed.with(stringSerde, stringSerde));
        // Use KStream.mapValues to split each input value on "abc:" and keep the part at index 1
        KStream<String, String> splitStream = simpleFirstStream.mapValues(line -> line.split("abc:")[1]);
        // Write the transformed stream to the output topic
        splitStream.to(output, Produced.with(stringSerde, stringSerde));

        // Build the topology and start the KafkaStreams instance
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        kafkaStreams.start();
    }
}
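Note that `line.split("abc:")[1]` throws an `ArrayIndexOutOfBoundsException` on any record that lacks the `abc:` prefix, which would crash the stream thread. A minimal sketch of the same extraction logic with a guard (the pass-through behavior for malformed lines is an assumption, not part of the original example):

```java
public class SplitLogicDemo {
    // Extract the payload after the "abc:" prefix; pass malformed lines
    // through unchanged instead of throwing (assumed fallback behavior).
    static String extractPayload(String line) {
        String[] parts = line.split("abc:");
        return parts.length > 1 ? parts[1] : line;
    }

    public static void main(String[] args) {
        System.out.println(extractPayload("abc:22|33|44|55")); // prints 22|33|44|55
        System.out.println(extractPayload("no-prefix"));       // prints no-prefix
    }
}
```

In the streams application this guard would replace the lambda passed to `mapValues`, so a bad record is forwarded as-is rather than killing the topology.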

3. Testing

  1) Start the program from step 2

  2) Start Kafka

  3) Create a topic named abc, then start a console producer for it

    bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 1 --partitions 1 --topic abc
    bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic abc

  4) Start a consumer for the topic named recommender

    bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic recommender

  5) Type a string into the abc topic (e.g. abc:22|33|44|55)

  6) The recommender consumer then receives the extracted string: 22|33|44|55

