1、數據清洗業務類LogProcessor
package com.css.kafka.kafka_stream; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; /** * 數據清洗*/ public class LogProcessor implements Processor<byte[], byte[]>{ private ProcessorContext context; //初始化 public void init(ProcessorContext context) { //傳輸 this.context = context; } //具體業務邏輯 public void process(byte[] key, byte[] value) { //1.拿到消息數據,轉成字符串 String message = new String(value); //2.如果包含- 去除 if (message.contains("-")) { //3.把- 去掉 之后去掉左側數據 message = message.split("-")[1]; } //4.發送數據 context.forward(key, message.getBytes()); } //釋放資源 public void close() { } }
2、Application類
package com.css.kafka.kafka_stream; import java.util.Properties; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorSupplier; /** * 需求:對數據進行清洗操作 * * 思路:wo-henshuai 把-和wo清洗掉*/ public class Application { public static void main(String[] args) { //1.定義主題 發送到 另外一個主題中 數據清洗 String oneTopic = "t1"; String twoTopic = "t2"; //2.設置屬性 Properties prop = new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092,192.168.146.133:9092,192.168.146.134:9092"); //3.實例對象 StreamsConfig config = new StreamsConfig(prop); //4.流計算 拓撲 Topology builder = new Topology(); //5.定義kafka組件數據源 builder.addSource("Source", oneTopic).addProcessor("Processor", new ProcessorSupplier<byte[], byte[]>() { public Processor<byte[], byte[]> get() { return new LogProcessor(); } //從哪里來 }, "Source") //到哪里去 .addSink("Sink", twoTopic, "Processor"); //6.實例化kafkaStream KafkaStreams kafkaStreams = new KafkaStreams(builder, prop); kafkaStreams.start(); } }
3、運行Application類的main方法
4、在hd09-1機器上創建主題t1
# Create source topic t1 with 1 partition and replication factor 3 (needs at least 3 brokers).
# The partition-count option is --partitions. --zookeeper was removed from kafka-topics.sh in
# Kafka 3.0; on newer versions use --bootstrap-server hd09-1:9092 instead.
bin/kafka-topics.sh --zookeeper hd09-1:2181 --create --replication-factor 3 --partitions 1 --topic t1
5、在hd09-2機器上啟動消費者
# Read the cleaned records from sink topic t2, starting from the earliest offset.
bin/kafka-console-consumer.sh \
  --bootstrap-server hd09-2:9092 \
  --topic t2 \
  --from-beginning \
  --consumer.config config/consumer.properties
6、在hd09-1機器上啟動生產者
# Interactive producer: each line typed is sent as one record to source topic t1.
bin/kafka-console-producer.sh \
  --broker-list hd09-1:9092 \
  --topic t1
7、此時在hd09-1機器的Kafka生產者中輸入wo-henshuai，hd09-2機器上的消費者便會顯示henshuai，即完成了數據清洗，如下圖所示。