Kafka Stream數據清洗ETL


Kafka Streams

1.Apache Kafka開源項目的一個組成部分,是一個功能強大,易於使用的庫.用於在Kafka上構建高可分布,可拓展,高容錯的應用程序.

2.Kafka Streams特點

  1)功能強大:高擴展性,彈性,容錯

  2)輕量級:無需專門的集群,一個庫,而不是框架.

  3)完全集成:100%的Kafka 0.10版本兼容;易於集成到現有的程序

  4)實時性:毫秒級延遲,並非微批處理,窗口允許亂序數據,允許遲到數據

3.

當前已經有非常多的流式處理系統,最知名且應用最多的開源流式處理系統有Spark StreamingApache StormApache Storm發展多年,應用廣泛,提供記錄級別的處理能力,當前也支持SQL on Stream。而Spark Streaming基於Apache Spark,可以非常方便與圖計算,SQL處理等集成,功能強大,對於熟悉其它Spark應用開發的用戶而言使用門檻低。另外,目前主流的Hadoop發行版,如ClouderaHortonworks,都集成了Apache StormApache Spark,使得部署更容易。

既然Apache SparkApache Storm擁用如此多的優勢,那為何還需要Kafka Stream呢?主要有如下原因。

第一,SparkStorm都是流式處理框架,而Kafka Stream提供的是一個基於Kafka的流式處理類庫。框架要求開發者按照特定的方式去開發邏輯部分,供框架調用。開發者很難了解框架的具體運行方式,從而使得調試成本高,並且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發者調用,整個應用的運行方式主要由開發者控制,方便使用和調試。

第二,雖然ClouderaHortonworks方便了StormSpark的部署,但是這些框架的部署仍然相對復雜。而Kafka Stream作為類庫,可以非常方便的嵌入應用程序中,它對應用的打包和部署基本沒有任何要求。

第三,就流式處理系統而言,基本都支持Kafka作為數據源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實上,Kafka基本上是主流的流式處理系統的標准數據源。換言之,大部分流式系統中都已部署了Kafka,此時使用Kafka Stream的成本非常低。

第四,使用StormSpark Streaming時,需要為框架本身的進程預留資源,如StormsupervisorSpark on YARNnode manager。即使對於應用實例而言,框架本身也會占用部分資源,如Spark Streaming需要為shufflestorage預留內存。但是Kafka作為類庫不占用系統資源。

第五,由於Kafka本身提供數據持久化,因此Kafka Stream提供滾動部署和滾動升級以及重新計算的能力。

第六,由於Kafka Consumer Rebalance機制,Kafka Stream可以在線動態調整並行度

 

案例:數據清洗

需求描述:

實時處理單詞帶有”>>>”前綴的內容例如輸入”lxz>>>lexue”,最終處理成“lexue”;

0) pom文件導入

 <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>

        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.3</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <!-- Winodws下提交至Yarn上運行,改客戶端是2.6.1s
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.1</version>
        </dependency>
-->

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.mrunit/mrunit MRUnit測試 -->
        <dependency>
            <groupId>org.apache.mrunit</groupId>
            <artifactId>mrunit</artifactId>
            <version>0.9.0-incubating</version>
            <classifier>hadoop2</classifier>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.sun</groupId>
            <artifactId>tools</artifactId>
            <version>1.8.0</version>
            <scope>system</scope>
            <systemPath>${env.JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>1.3.2</version>
            <type>pom</type>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.3.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.2</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
            <type>pom</type>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.glassfish.jersey.core/jersey-client -->
        <dependency>
            <groupId>org.glassfish.jersey.core</groupId>
            <artifactId>jersey-client</artifactId>
            <version>2.26</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.10.0.0</version>
        </dependency>

    </dependencies>

1) 創建主程序

package com.lxz.kafka;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;


import java.util.Properties;

public class Application {
    public static void main(String[] args) {
        //1.定義輸入的topic
        String from = "first";
        //定義輸出的topic
        String to = "second";
        //設置參數
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG,"logFilter");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop1:9092");

        StreamsConfig config = new StreamsConfig(settings);

        //擴建拓撲
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE",from)
                .addProcessor("PROCESS",new ProcessorSupplier<byte[],byte[]>(){
                    public Processor<byte[],byte[]> get(){
                        //具體分析
//                        return new LogProcessor();
                        return new LogProcessor();
                    }
                },"SOURCE")
                .addSink("SINK",to,"PROCESS");

        //創建Kafka stream
        KafkaStreams kafkaStreams = new KafkaStreams(builder, config);
        kafkaStreams.start();
    }
}

2) 具體業務處理

package com.lxz.kafka;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class LogProcessor implements Processor<byte[],byte[]> {
    private ProcessorContext context;


    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(byte[] key, byte[] value) {
        String input = new String(value);
        //如果包含“>>>"則只保留該標記后面的內容
        if (input.contains(">>>")){
             input = input.split(">>>")[1].trim();
             //輸出到下一個topic
            context.forward("logProcessor".getBytes(),input.getBytes());
        }else {
            context.forward("logProcessor".getBytes(),input.getBytes());
        }
    }

    @Override
    public void punctuate(long l) {

    }

    @Override
    public void close() {

    }
}

3) 報錯信息處理

如果遇到log4j提示報警,則是因為缺少了log4j的配置文件,在resources中創建log4j.properties並寫入

log4j.rootLogger=INFO,console,dailyFile
# TODO 發布到阿里雲記得添加,另外控制台不輸出(只輸出warn或者error信息)

# log4j.logger.org.mybatis = INFO
log4j.logger.com.imooc.mapper=INFO

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.encoding=UTF-8
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%l] - [%p] %m%n

# 定期滾動日志文件,每天都會生成日志
log4j.appender.dailyFile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.dailyFile.encoding=UTF-8
log4j.appender.dailyFile.Threshold=INFO
# TODO 本地日志地址,正式環境請務必切換為阿里雲地址
log4j.appender.dailyFile.File=C:/logs/maven-ssm-alipay/log.log4j
log4j.appender.dailyFile.DatePattern='.'yyyy-MM-dd
log4j.appender.dailyFile.layout=org.apache.log4j.PatternLayout
log4j.appender.dailyFile.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%l] - [%p] %m%n

4) 測試

  1.運行Idea主程序

  2.啟動hadoop1 2 3,三台服務器,zk,kafka集群

  3.在hadoop2上啟動生產者: bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic first

  4.在hadoop3上啟動消費者: bin/kafka-console-consumer.sh --zookeeper hadoop1:2181 --from-beginning --topic second

  5.在hadoop2上生產者下輸入:lxz>>>lexue

  6.查看hadoop3上的消費者是否成功消費到了:lexue

 

 生產者端:

 

 消費者端:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM