1. jar包導入(Maven 依賴)
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency> </dependencies>
2. 代碼實現
package com.atguigu.kafkastream;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
// NOTE(review): this internal-API import is unused by the class below; it is kept only
// so that the file's import list is not changed by this edit.
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;

import java.util.Properties;

/**
 * Minimal Kafka Streams application: reads string records from the {@code abc} topic,
 * keeps the text that follows the {@code "abc:"} marker in each value, and writes the
 * result to the {@code recommender} topic.
 *
 * <p>Records whose value is {@code null}, does not contain the marker, or has nothing
 * after the marker are dropped instead of crashing the stream thread.
 */
public class Application {

    /** Marker that separates the prefix from the payload in each input value. */
    private static final String PAYLOAD_DELIMITER = "abc:";

    /**
     * Builds the topology, starts the streams client and registers a shutdown hook
     * that closes it when the JVM exits.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String input = "abc";          // input topic
        String output = "recommender"; // output topic

        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092");

        // Serdes supplies ready-made Serde instances for String, byte array, Long,
        // Integer and Double; keys and values here are both plain strings.
        Serde<String> stringSerde = Serdes.String();

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> sourceStream =
                builder.stream(input, Consumed.with(stringSerde, stringSerde));

        // Extract the payload that follows "abc:"; extractPayload returns null for
        // values that cannot be parsed, and those records are filtered out so a single
        // malformed message cannot kill the stream thread.
        KStream<String, String> payloadStream = sourceStream
                .mapValues(Application::extractPayload)
                .filter((key, value) -> value != null);

        // Write the transformed records to the output topic.
        payloadStream.to(output, Produced.with(stringSerde, stringSerde));

        // Create and start the streams client.
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);

        // Close the client (and release its resources) when the JVM shuts down, e.g. on Ctrl+C.
        Runtime.getRuntime().addShutdownHook(
                new Thread(kafkaStreams::close, "streams-shutdown-hook"));

        kafkaStreams.start();
    }

    /**
     * Returns the segment at index 1 after splitting {@code line} on {@code "abc:"},
     * i.e. the text that follows the first marker (up to the next marker, if any).
     *
     * @param line the raw record value; may be {@code null}
     * @return the extracted payload, or {@code null} when {@code line} is {@code null}
     *         or the split yields fewer than two segments
     */
    static String extractPayload(String line) {
        if (line == null) {
            return null;
        }
        String[] parts = line.split(PAYLOAD_DELIMITER);
        return parts.length > 1 ? parts[1] : null;
    }
}
3. 測試
1)啟動 2 中程序(注意:需先完成下面第 2、3 步,確保 kafka 已啟動且 abc topic 已創建,否則程序無法正常連接與消費)
2)啟動 kafka
3)創建一個名稱為 abc 的 topic
bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 1 --partitions 1 --topic abc
4)啟動一個名稱為 recommender 的 topic 消費者
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic recommender
5)啟動 abc topic 的生產者(bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic abc),並輸入字符串(如: abc:22|33|44|55)
6)recommender 的消費者中就可收到過濾後的字符串 22|33|44|55