kafka-stream數據清洗


1、數據清洗業務類LogProcessor

package com.css.kafka.kafka_stream;

import java.nio.charset.StandardCharsets;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * 數據清洗*/
public class LogProcessor implements Processor<byte[], byte[]>{

    private ProcessorContext context;
    
    //初始化
    public void init(ProcessorContext context) {
        //傳輸
        this.context = context;
    }

    //具體業務邏輯
    public void process(byte[] key, byte[] value) {
        //1.拿到消息數據,轉成字符串
        String message = new String(value);
        
        //2.如果包含-  去除
        if (message.contains("-")) {
            //3.把- 去掉 之后去掉左側數據
            message = message.split("-")[1];
        }
        //4.發送數據
        context.forward(key, message.getBytes());
    }

    //釋放資源
    public void close() {
    }
}

2、Application類

package com.css.kafka.kafka_stream;

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

/**
 * 需求:對數據進行清洗操作
 * 
 * 思路:wo-henshuai  把-和wo清洗掉*/
public class Application {

    public static void main(String[] args) {
        //1.定義主題 發送到 另外一個主題中 數據清洗
        String oneTopic = "t1";
        String twoTopic = "t2";
        
        //2.設置屬性
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092,192.168.146.133:9092,192.168.146.134:9092");
        
        //3.實例對象
        StreamsConfig config = new StreamsConfig(prop);
        
        //4.流計算 拓撲
        Topology builder = new Topology();
        
        //5.定義kafka組件數據源
        builder.addSource("Source", oneTopic).addProcessor("Processor", new ProcessorSupplier<byte[], byte[]>() {

            public Processor<byte[], byte[]> get() {
                return new LogProcessor();
            }
            //從哪里來
        }, "Source")
        //到哪里去
        .addSink("Sink", twoTopic, "Processor");

        //6.實例化kafkaStream
        KafkaStreams kafkaStreams = new KafkaStreams(builder, prop);
        kafkaStreams.start();
    }
}

3、運行Application類的main方法

4、在hd09-1機器上創建主題t1

bin/kafka-topics.sh --zookeeper hd09-1:2181 --create --replication-factor 3 --partitions 1 --topic t1

5、在hd09-2機器上啟動消費者

bin/kafka-console-consumer.sh --bootstrap-server hd09-2:9092 --topic t2 --from-beginning --consumer.config config/consumer.properties

6、在hd09-1機器上啟動生產者

bin/kafka-console-producer.sh --broker-list hd09-1:9092 --topic t1

7、此時在hd09-1機器kafka生產者上輸入  wo-henshuai,在hd09-2消費者機器上會顯示henshuai,即完成了數據清洗,如下圖。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM