項目實戰 從 0 到 1 學習之Flink(15)Flink讀取kafka數據並寫入HDFS


1.概述

最近有同學留言咨詢,Flink消費Kafka的一些問題,今天筆者將用一個小案例來為大家介紹如何將Kafka中的數據,通過Flink任務來消費並存儲到HDFS上。

2.內容

這里舉個消費Kafka的數據的場景。比如,電商平台、游戲平台產生的用戶數據,入庫到Kafka中的Topic進行存儲,然后采用Flink去實時消費積累到HDFS上,積累后的數據可以構建數據倉庫(如Hive)做數據分析,或是用於數據訓練(算法模型)。如下圖所示:

 

2.1 環境依賴

整個流程,需要依賴的組件有Kafka、Flink、Hadoop。由於Flink提交需要依賴Hadoop的計算資源和存儲資源,所以Hadoop的YARN和HDFS均需要啟動。各個組件版本如下:

組件 版本
Kafka 2.4.0
Flink 1.10.0
Hadoop 2.10.0

 

2.2 代碼實現

Flink消費Kafka集群中的數據,需要依賴Flink包,依賴如下:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.12</artifactId>
    <version>${flink.connector.version}</version>
 </dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>${flink.kafka.version}</version>
 </dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>${flink.streaming.version}</version>
 </dependency>

編寫消費Topic的Flink代碼,這里不對Topic中的數據做邏輯處理,直接消費並存儲到HDFS上。代碼如下:

/**
 * Flink consumer topic data and store into hdfs.
 * 
 * @author smartloli.
 *
 *         Created by Mar 15, 2020
 */
public class Kafka2Hdfs {

    private static Logger LOG = LoggerFactory.getLogger(Kafka2Hdfs.class);

    public static void main(String[] args) {
        if (args.length != 3) {
            LOG.error("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be exist.");
            return;
        }
        String bootStrapServer = args[0];
        String hdfsPath = args[1];
        int parallelism = Integer.parseInt(args[2]);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(parallelism);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<>("test_bll_data", new SimpleStringSchema(), configByKafkaServer(bootStrapServer)));

        // Storage into hdfs
        BucketingSink<String> sink = new BucketingSink<>(hdfsPath);

        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd"));

        sink.setBatchSize(1024 * 1024 * 1024); // this is 1GB
        sink.setBatchRolloverInterval(1000 * 60 * 60); // one hour producer a file into hdfs
        transction.addSink(sink);

        env.execute("Kafka2Hdfs");
    }

    private static Object configByKafkaServer(String bootStrapServer) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootStrapServer);
        props.setProperty("group.id", "test_bll_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

}

2.3 注意事項

  • 存儲到HDFS時,不用添加其他HDFS依賴,只需要Flink采用yarn-cluster模式提交即可;
  • 采用FSDataOutputStream寫入時,會先寫入緩沖區,放在內存中;
  • Flink每次做Checkpoint的時候,會Flush緩沖區的數據,以及將Pending(已經完成的文件,但為被Checkpoint記錄,可以通過sink.setPendingSuffix("xxx")來設置)結尾的文件記錄下來
  • Flink每60秒(可以通過sink.setInactiveBucketCheckInterval(60 * 1000)來進行設置)檢測,如果一個文件的FSDataOutputStream在60秒內(可以通過sink.setInactiveBucketThreshold(60 * 1000)來設置),都還沒有接收到數據,Flink就會認為該文件是不活躍的Bucket,那么就會被Flush后關閉該文件;
  • 我們再深入一點查看代碼,實際上只是在processingTimeService中注冊了當前的時間(currentProcessingTime)+ 60秒不寫入的時間(inactiveBucketCheckInterval)。接着通過onProcessIngTime方法去不停的判斷是否滿足60秒不寫入,同時也會判斷是否到了滾動時間。代碼如下:
public void onProcessingTime(long timestamp) throws Exception {
        long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); 
        closePartFilesByTime(currentProcessingTime);
        processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
}        
  • 在Flink內部封裝了一個集合Map<String, BucketState<T>> bucketStates = new HashMap<>();用來記錄當前正在使用的文件,key是文件的路徑,BucketState內部封裝了該文件的所有信息,包括創建時間,最后一次寫入時間(這里的寫入指的是寫入緩存區的時間,不是Flush的時間)。當前文件是打開還是關閉,寫緩沖區的方法。都在這里。每次Flink要對文件進行操作的時候,都會從這里拿到文件的封裝對象;
  • 當程序被取消的時候,當前正在操作的文件,會被Flush,然后關閉。然后將文件的后綴名從in-progress改為pending。這個前后綴都是可以設置,但如果沒有什么特殊需求,默認即可。這里拿文件,用的就是上面說的bucketStates這個map。它在close方法中,會去遍歷這個map,去做上述的操作;代碼如下:
public void close() throws Exception {
        if (state != null) {
            for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
                closeCurrentPartFile(entry.getValue());
            }
        }
}
  • 每次寫入的時候,都是會bucketStates這個map中獲取對應的對象,如果沒有,就會new一個該對象。然后先判斷是否需要滾動(通過當前文件大小和滾動時間去判斷),然后才將數據寫入緩沖區,更新最后寫入時間,代碼如下:
public void invoke(T value) throws Exception {
        Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);
 
        long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
 
        BucketState<T> bucketState = state.getBucketState(bucketPath);
        if (bucketState == null) {
            bucketState = new BucketState<>(currentProcessingTime);
            state.addBucketState(bucketPath, bucketState);
        }
 
        if (shouldRoll(bucketState, currentProcessingTime)) {
            openNewPartFile(bucketPath, bucketState);
        }
 
        bucketState.writer.write(value);
        bucketState.lastWrittenToTime = currentProcessingTime;
}
  • 寫入和關閉HDFS是通過異步的方式的,異步的超時時間默認是60秒,可以通過 sink.setAsyncTimeout(60 * 1000)去設置

3.總結

Flink消費Kafka數據並寫到HDFS的代碼實現是比較簡短了,沒有太多復雜的邏輯。實現的時候,注意Kafka的地址、反序列化需要在屬性中配置、以及Flink任務提交的時候,設置yarn-cluster模式、設置好內存和CPU、HDFS存儲路徑等信息


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM