6、Flink的常用Sink


image


1.1 Data Sink 數據輸出

經過一系列Transformation轉換操作后,最后一定要調用Sink操作,才會形成一個完整的DataFlow拓撲。只有調用了Sink操作,才會產生最終的計算結果,這些數據可以寫入到的文件、輸出到指定的網絡端口、消息中間件、外部的文件系統或者是打印到控制台。


1.1.1 print 打印

打印是最簡單的一個Sink,通常是用來做實驗和測試時使用。如果想讓一個DataStream輸出打印的結果,直接可以在該DataStream調用print方法。另外,該方法還有一個重載的方法,可以傳入一個字符,指定一個Sink的標識名稱,如果有多個打印的Sink,用來區分到底是哪一個Sink的輸出。

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;

public class PrintSinkDemo {

    public static void main(String[] args) throws Exception {

        //local模式默認的並行度是當前機器的邏輯核的數量
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        int parallelism0 = env.getParallelism();

        System.out.println("執行環境默認的並行度:" + parallelism0);

        DataStreamSource<String> lines = env.socketTextStream("cs-28-86", 8888);

        //獲取DataStream的並行度
        int parallelism = lines.getParallelism();

        System.out.println("SocketSource的並行度:" + parallelism);

        lines.print();

        //lines.addSink(new MyPrintSink()).name("my-print-sink");

        env.execute();


    }

    public static class MyPrintSink extends RichSinkFunction<String> {

        private int indexOfThisSubtask;
        @Override
        public void open(Configuration parameters) throws Exception {
            RuntimeContext runtimeContext = getRuntimeContext();
            indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {

            System.out.println(indexOfThisSubtask + 1 + "> " + value);
        }
    }
}


下面的結果是WordCount例子中調用print Sink輸出在控制台的結果,細心的讀者會發現,在輸出的單詞和次數之前,有一個數字前綴,我這里是1~4,這個數字是該Sink所在subtask的Index + 1。有的讀者運行的結果數字前綴是1~8,該數字前綴其實是與任務的並行度相關的,由於該任務是以local模式運行,默認的並行度是所在機器可用的邏輯核數即線程數,我的電腦是2核4線程的,所以subtask的Index范圍是0~3,將Index + 1,顯示的數字前綴就是1~4了。這里在來仔細的觀察一下運行的結果發現:相同的單詞輸出結果的數字前綴一定相同,即經過keyBy之后,相同的單詞會被shuffle到同一個subtask中,並且在同一個subtask的同一個組內進行聚合。一個subtask中是可能有零到多個組的,如果是有多個組,每一個組是相互獨立的,累加的結果不會相互干擾。


1.1.2 writerAsText 以文本格式輸出

已writer開頭的sink 現在基本都已過時了;

該方法是將數據以文本格式實時的寫入到指定的目錄中,本質上使用的是TextOutputFormat格式寫入的。每輸出一個元素,在該內容后面同時追加一個換行符,最終以字符的形式寫入到文件中,目錄中的文件名稱是該Sink所在subtask的Index + 1。該方法還有一個重載的方法,可以額外指定一個枚舉類型的參數writeMode,默認是WriteMode.NO_OVERWRITE,如果指定相同輸出目錄下有相同的名稱文件存在,就會出現異常。如果是WriteMode.OVERWRITE,會將以前的文件覆蓋。

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class WriteSinkDemo {

    public static void main(String[] args) throws Exception {

        //local模式默認的並行度是當前機器的邏輯核的數量
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        int parallelism0 = env.getParallelism();

        System.out.println("執行環境默認的並行度:" + parallelism0);

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        //獲取DataStream的並行度
        int parallelism = lines.getParallelism();

        System.out.println("SocketSource的並行度:" + parallelism);

        lines.writeAsText("file:///Users/xing/Desktop/out");

        env.execute();


    }

    public static class MyPrintSink extends RichSinkFunction<String> {

        private int indexOfThisSubtask;
        @Override
        public void open(Configuration parameters) throws Exception {
            RuntimeContext runtimeContext = getRuntimeContext();
            indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {

            System.out.println(indexOfThisSubtask + 1 + "> " + value);
        }
    }
}


1.1.3 writeAsCsv 以csv格式輸出

該方法是將數據以csv格式寫入到指定的目錄中,本質上使用的是CsvOutputFormat格式寫入的。每輸出一個元素,在該內容后面同時追加一個換行符,最終以csv的形式(類似Excel的格式,字段和字段之間用逗號分隔)寫入到文件中,目錄中的文件名稱是該Sink所在subtask的Index + 1。需要說明的是,該Sink並不是將數據實時的寫入到文件中,而是有一個BufferedOutputStream,默認緩存的大小為4096個字節,只有達到這個大小,才會flush到磁盤。另外程序在正常退出,調用Sink的close方法也會flush到磁盤。

DataStream<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);
result.writeAsCsv(path);


1.1.4 writeUsingOutputFormat 以指定的格式輸出

該方法是將數據已指定的格式寫入到指定目錄中,該方法要傳入一個OutputFormat接口的實現類,該接口有很多已經實現好了的實現類,並且可以根據需求自己實現,所以該方法更加靈活。writeAsText和writeAsCsv方法底層都是調用了writeUsingOutputFormat方法。

DataStream<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);
result.writeUsingOutputFormat(new TextOutputFormat<>(new Path(path));


1.1.5 writeToSocket 輸出到網絡端口

該方法是將數據輸出到指定的Socket網絡地址端口。該方法需要傳入三個參數:第一個為ip地址或主機名,第二個為端口號,第三個為數據輸出的序列化格式SerializationSchema。輸出之前,指定的網絡端口服務必須已經啟動。

DataStreamSource<String> lines = env.socketTextStream(“localhost”, 8888);
lines.writeToSocket(“localhost”, 9999, new SimpleStringSchema());


1.1.6 RedisSink

該方法是將數據輸出到Redis數據庫中,Redis是一個基於內存、性能極高的NoSQL數據庫,數據還可以持久化到磁盤,讀寫速度快,適合存儲key-value類型的數據。Redis不僅僅支持簡單的key-value類型的數據,同時還提供list,set,zset,hash等數據結構的存儲。Flink實時計算出的結果,需要快速的輸出存儲起來,要求寫入的存儲系統的速度要快,這個才不會造成數據積壓。Redis就是一個非常不錯的選擇。

首先在maven項目中的pom.xml中添加Redis Sink的依賴。

<!-- redis 依賴-->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>


接下來就是定義一個類(或者靜態內部類)實現RedisMapper即可,需要指定一個泛型,這里是Tuple2<String, Integer>,即寫入到Redis中的數據的類型,並實現三個方法。第一個方法是getCommandDescription方法,返回RedisCommandDescription實例,在該構造方法中可以指定寫入到Redis的方法類型為HSET,和Redis的additionalKey即value為HASH類型外面key的值;第二個方法getKeyFromData是指定value為HASH類型對應key的值;第三個方法geVauleFromData是指定value為HASH類型對應value的值。


在使用之前,先new FlinkJedisPoolConfig,設置Redis的ip地址或主機名、端口號、密碼等。然后new RedisSink將准備好的conf和RedisWordCountMapper實例傳入到其構造方法中,最后調用DataStream的addSink方法,將new好的RedisSink作為參數傳入。

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

/**
 * 從指定的socket讀取數據,對單詞進行計算,將結果寫入到Redis中
 */
public class RedisSinkDemo {

    public static void main(String[] args) throws Exception {

        //創建Flink流計算執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //創建DataStream
        //Source
        DataStreamSource<String> lines = env.socketTextStream("cs-28-86", 8888);

        //調用Transformation開始
        //調用Transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    //new Tuple2<String, Integer>(word, 1)
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        //分組
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });

        //聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

        //Transformation結束

        //調用Sink
        //summed.addSink()
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setDatabase(0).build();

        summed.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
        //啟動執行
        env.execute("StreamingWordCount");

    }

    public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
        }

        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }

}

  image    image


1.1.7 KafkaSink

在實際的生產環境中,經常會有一些場景,需要將Flink處理后的數據快速地寫入到一個分布式、高吞吐、高可用、可用保證Exactly Once的消息中間件中,供其他的應用消費處理后的數據。Kafka就是Flink最好的黃金搭檔,Flink不但可以從Kafka中消費數據,還可以將處理后的數據寫入到Kafka,並且吞吐量高、數據安全、可以保證Exactly Once等。

Flink可以和Kafka多個版本整合,比如0.11.x、1.x、2.x等,從Flink1.9開始,使用的是kafka 2.2的客戶端,所以這里使用kafka的版本是2.2.2,並且使用最新的API。

下面的例子就是將數據寫入到Kafka中,首先要定義一個類實現KafkaSerializationSchema接口,指定一個泛型,String代表要寫入到Kafka的數據為String類型。該類的功能是指定寫入到Kafka中數據的序列化Schema,需要重寫serialize方法,將要寫入的數據轉成二進制數組,並封裝到一個ProducerRecord中返回。

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaSinkDemo {

    public static void main(String[] args) throws Exception {

        //local模式默認的並行度是當前機器的邏輯核的數量
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        int parallelism0 = env.getParallelism();

        System.out.println("執行環境默認的並行度:" + parallelism0);

        DataStreamSource<String> lines = env.socketTextStream("cs-28-86", 8888);

        //獲取DataStream的並行度
        int parallelism = lines.getParallelism();

        System.out.println("SocketSource的並行度:" + parallelism);

        //lines.writeAsText("file:///Users/xing/Desktop/out");

        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                "cs-28-87:9092,cs-28-88:9092,cs-28-89:9092", "wordcount18", new SimpleStringSchema()
        );

        lines.addSink(kafkaProducer);

        env.execute();

    }

}

啟動nc –lk 8888 ,然后啟動上述代碼程序;

在nc窗口中輸入數據,使用kafka可以消費到;

image

kafka消費wordcount18的topic:

[root@cs-28-88 ~]# kafka-console-consumer --zookeeper cs-28-88:2181 --topic wordcount18


然后將Kafka相關的參數設置到Properties中,再new FlinkKafkaProducer,將要寫入的topic名稱、Kafka序列化Schema、Properties和寫入到Kafka的Semantic語義作為FlinkKafkaProducer構造方法參數傳入。最好調用addSink方法將FlinkKafkaProducer的引用傳入到該方法中。雖然下面的代碼指定了EXACTLY_ONCE語義,但是沒有開啟Checkpointing,是沒法實現的。具有怎樣實現Exactly Once,會在后面原理深入的章節進行講解。


1.1.8 StreamFileDataSink

實時處理的數據,有一些場景要輸出到其他分布式文件系統中,比如Hadoop HDFS、Amazon S3 (Simple Storage Service)、Aliyun OSS(Object Storage Service)等。因為這些分布式文件系統都具有高可用、可擴展、多副本、存儲海量數據等特點。存儲到分布式文件系統的數據,就可以做一些離線的數據分析,比如離線的數倉、數據挖掘、機器學習等。

從Flink 1.9開始,原來的Bucketing Sink已經標記為過時,在未來的版本將會被移除。推薦使用StreamFileDataSink,該Sink不但可以將數據寫入到各種文件系統中,可以保證Exacly Once語義,還支持以列式存儲的格式寫入,功能更強大。

下面的例子是將數據寫入到HDFS中,首先在maven項目的pom.xml文件引入HDFS文件系統的依賴:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.12</artifactId>
    <version>1.12-SNAPSHOT</version>
</dependency>


<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
</dependency>


通過DefaultRollingPolicy這個工具類,指定文件滾動生成的策略。這里設置的文件滾動生成策略有兩個,一個是距離上一次生成文件時間超過30秒,另一個是文件大小達到100 mb。這兩個條件只要滿足其中一個即可滾動生成文件。然后StreamingFileSink.forRowFormat方法將文件輸出目錄、文件寫入的編碼傳入,再調用withRollingPolicy關聯上面的文件滾動生成策略,接着調用build方法構建好StreamingFileSink,最后將其作為參數傳入到addSink方法中。


1.1.9 JDBCSink

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 從指定的socket讀取數據,對單詞進行計算,最后將結果寫入到MySQL
 */
public class JDBCSinkDemo {

    public static void main(String[] args) throws Exception {

        //創建Flink流計算執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(5000);

        //創建DataStream
        //Source
        DataStreamSource<String> lines = env.socketTextStream("cs-28-86", 8888);

        //調用Transformation開始
        //調用Transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    //new Tuple2<String, Integer>(word, 1)
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        //分組
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });

        //聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);

        summed.addSink(JdbcSink.sink(
                "INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = ?",
                (ps, t) -> {
                    ps.setString(1, t.f0);
                    ps.setInt(2, t.f1);
                    ps.setInt(3, t.f1);
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/bigdata?characterEncoding=utf-8")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()));

        //啟動執行
        env.execute("StreamingWordCount");

    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM