大數據基礎---Flink_Data_Sink


一、Data Sinks

在使用 Flink 進行數據處理時,數據經 Data Source 流入,然后通過系列 Transformations 的轉化,最終可以通過 Sink 將計算結果進行輸出,Flink Data Sinks 就是用於定義數據流最終的輸出位置。Flink 提供了幾個較為簡單的 Sink API 用於日常的開發,具體如下:

1.1 writeAsText

writeAsText 用於將計算結果以文本的方式並行地寫入到指定文件夾下,除了路徑參數是必選外,該方法還可以通過指定第二個參數來定義輸出模式,它有以下兩個可選值:

  • WriteMode.NO_OVERWRITE:當指定路徑上不存在任何文件時,才執行寫出操作;
  • WriteMode.OVERWRITE:不論指定路徑上是否存在文件,都執行寫出操作;如果原來已有文件,則進行覆蓋。

使用示例如下:

 streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE);

以上寫出是以並行的方式寫出到多個文件,如果想要將輸出結果全部寫出到一個文件,需要設置其並行度為 1:

streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

1.2 writeAsCsv

writeAsCsv 用於將計算結果以 CSV 的文件格式寫出到指定目錄,除了路徑參數是必選外,該方法還支持傳入輸出模式,行分隔符,和字段分隔符三個額外的參數,其方法定義如下:

writeAsCsv(String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter) 

1.3 print \ printToErr

print \ printToErr 是測試當中最常用的方式,用於將計算結果以標准輸出流或錯誤輸出流的方式打印到控制台上。

1.4 writeUsingOutputFormat

采用自定義的輸出格式將計算結果寫出,上面介紹的 writeAsTextwriteAsCsv 其底層調用的都是該方法,源碼如下:

public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
    TextOutputFormat<T> tof = new TextOutputFormat<>(new Path(path));
    tof.setWriteMode(writeMode);
    return writeUsingOutputFormat(tof);
}

1.5 writeToSocket

writeToSocket 用於將計算結果以指定的格式寫出到 Socket 中,使用示例如下:

streamSource.writeToSocket("192.168.0.226", 9999, new SimpleStringSchema());

二、Streaming Connectors

除了上述 API 外,Flink 中還內置了系列的 Connectors 連接器,用於將計算結果輸入到常用的存儲系統或者消息中間件中,具體如下:

  • Apache Kafka (支持 source 和 sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Google PubSub (source/sink)

除了內置的連接器外,你還可以通過 Apache Bahir 的連接器擴展 Flink。Apache Bahir 旨在為分布式數據分析系統 (如 Spark,Flink) 等提供功能上的擴展,當前其支持的與 Flink Sink 相關的連接器如下:

  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)

這里接着在 Data Sources 章節介紹的整合 Kafka Source 的基礎上,將 Kafka Sink 也一並進行整合,具體步驟如下。

三、整合 Kafka Sink

3.1 addSink

Flink 提供了 addSink 方法用來調用自定義的 Sink 或者第三方的連接器,想要將計算結果寫出到 Kafka,需要使用該方法來調用 Kafka 的生產者 FlinkKafkaProducer,具體代碼如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 1.指定Kafka的相關配置屬性
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.200.0:9092");

// 2.接收Kafka上的數據
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties));

// 3.定義計算結果到 Kafka ProducerRecord 的轉換
KafkaSerializationSchema<String> kafkaSerializationSchema = new KafkaSerializationSchema<String>() {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        return new ProducerRecord<>("flink-stream-out-topic", element.getBytes());
    }
};
// 4. 定義Flink Kafka生產者
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("flink-stream-out-topic",
                                                                    kafkaSerializationSchema,
                                                                    properties,
                                               FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, 5);
// 5. 將接收到輸入元素*2后寫出到Kafka
stream.map((MapFunction<String, String>) value -> value + value).addSink(kafkaProducer);
env.execute("Flink Streaming");

3.2 創建輸出主題

創建用於輸出測試的主題:

bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                    --replication-factor 1 \
                    --partitions 1  \
                    --topic flink-stream-out-topic

# 查看所有主題
 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3.3 啟動消費者

啟動一個 Kafka 消費者,用於查看 Flink 程序的輸出情況:

bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flink-stream-out-topic

3.4 測試結果

在 Kafka 生產者上發送消息到 Flink 程序,觀察 Flink 程序轉換后的輸出情況,具體如下:

可以看到 Kafka 生成者發出的數據已經被 Flink 程序正常接收到,並經過轉換后又輸出到 Kafka 對應的 Topic 上。

四、自定義 Sink

除了使用內置的第三方連接器外,Flink 還支持使用自定義的 Sink 來滿足多樣化的輸出需求。想要實現自定義的 Sink ,需要直接或者間接實現 SinkFunction 接口。通常情況下,我們都是實現其抽象類 RichSinkFunction,相比於 SinkFunction ,其提供了更多的與生命周期相關的方法。兩者間的關系如下:

這里我們以自定義一個 FlinkToMySQLSink 為例,將計算結果寫出到 MySQL 數據庫中,具體步驟如下:

4.1 導入依賴

首先需要導入 MySQL 相關的依賴:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.16</version>
</dependency>

4.2 自定義 Sink

繼承自 RichSinkFunction,實現自定義的 Sink :

public class FlinkToMySQLSink extends RichSinkFunction<Employee> {

    private PreparedStatement stmt;
    private Connection conn;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.cj.jdbc.Driver");
        conn = DriverManager.getConnection("jdbc:mysql://192.168.0.229:3306/employees" +
                                           "?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false", 
                                           "root", 
                                           "123456");
        String sql = "insert into emp(name, age, birthday) values(?, ?, ?)";
        stmt = conn.prepareStatement(sql);
    }

    @Override
    public void invoke(Employee value, Context context) throws Exception {
        stmt.setString(1, value.getName());
        stmt.setInt(2, value.getAge());
        stmt.setDate(3, value.getBirthday());
        stmt.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (stmt != null) {
            stmt.close();
        }
        if (conn != null) {
            conn.close();
        }
    }

}

4.3 使用自定義 Sink

想要使用自定義的 Sink,同樣是需要調用 addSink 方法,具體如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Date date = new Date(System.currentTimeMillis());
DataStreamSource<Employee> streamSource = env.fromElements(
    new Employee("hei", 10, date),
    new Employee("bai", 20, date),
    new Employee("ying", 30, date));
streamSource.addSink(new FlinkToMySQLSink());
env.execute();

4.4 測試結果

啟動程序,觀察數據庫寫入情況:

數據庫成功寫入,代表自定義 Sink 整合成功。

以上所有用例的源碼見本倉庫:flink-kafka-integration

參考資料

  1. data-sinks: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html#data-sinks
  2. Streaming Connectors:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/index.html
  3. Apache Kafka Connector: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html

系列傳送門


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM