Flink的sink實戰之四:自定義


歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

本篇概覽

Flink官方提供的sink服務可能滿足不了我們的需要,此時可以開發自定義的sink,文本就來一起實戰;

全系列鏈接

  1. 《Flink的sink實戰之一:初探》
  2. 《Flink的sink實戰之二:kafka》
  3. 《Flink的sink實戰之三:cassandra3》
  4. 《Flink的sink實戰之四:自定義》

繼承關系

  1. 在正式編碼前,要先弄清楚對sink能力是如何實現的,前面我們實戰過的print、kafka、cassandra等sink操作,核心類的繼承關系如下圖所示:
    在這里插入圖片描述
  2. 可見實現sink能力的關鍵,是實現RichFunction和SinkFunction接口,前者用於資源控制(如open、close等操作),后者負責sink的具體操作,來看看最簡單的PrintSinkFunction類是如何實現SinkFunction接口的invoke方法:
@Override
public void invoke(IN record) {
  writer.write(record);
}
  1. 現在對sink的基本邏輯已經清楚了,可以開始編碼實戰了;

內容和版本

本次實戰很簡單:自定義sink,用於將數據寫入MySQL,涉及的版本信息如下:

  1. jdk:1.8.0_191
  2. flink:1.9.2
  3. maven:3.6.0
  4. flink所在操作系統:CentOS Linux release 7.7.1908
  5. MySQL:5.7.29
  6. IDEA:2018.3.5 (Ultimate Edition)

源碼下載

如果您不想寫代碼,整個系列的源碼可在GitHub下載到,地址和鏈接信息如下表所示(https://github.com/zq2599/blog_demos):

名稱 鏈接 備注
項目主頁 https://github.com/zq2599/blog_demos 該項目在GitHub上的主頁
git倉庫地址(https) https://github.com/zq2599/blog_demos.git 該項目源碼的倉庫地址,https協議
git倉庫地址(ssh) git@github.com:zq2599/blog_demos.git 該項目源碼的倉庫地址,ssh協議

這個git項目中有多個文件夾,本章的應用在flinksinkdemo文件夾下,如下圖紅框所示:
在這里插入圖片描述

數據庫准備

請您將MySQL准備好,並執行以下sql,用於創建數據庫flinkdemo和表student:

create database if not exists flinkdemo;
USE flinkdemo;
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

編碼

  1. 使用《Flink的sink實戰之二:kafka》中創建的flinksinkdemo工程;
  2. 在pom.xml中增加mysql的依賴:
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.11</version>
</dependency>
  1. 創建和數據庫的student表對應的實體類Student.java:
package com.bolingcavalry.customize;

public class Student {
    private int id;
    private String name;
    private int age;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Student(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
  1. 創建自定義sink類MySQLSinkFunction.java,這是本文的核心,有關數據庫的連接、斷開、寫入數據都集中在此:
package com.bolingcavalry.customize;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class MySQLSinkFunction extends RichSinkFunction<Student> {

    PreparedStatement preparedStatement;

    private Connection connection;

    private ReentrantLock reentrantLock = new ReentrantLock();

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        //准備數據庫相關實例
        buildPreparedStatement();
    }

    @Override
    public void close() throws Exception {
        super.close();

        try{
            if(null!=preparedStatement) {
                preparedStatement.close();
                preparedStatement = null;
            }
        } catch(Exception e) {
            e.printStackTrace();
        }

        try{
            if(null!=connection) {
                connection.close();
                connection = null;
            }
        } catch(Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void invoke(Student value, Context context) throws Exception {
        preparedStatement.setString(1, value.getName());
        preparedStatement.setInt(2, value.getAge());
        preparedStatement.executeUpdate();
    }

    /**
     * 准備好connection和preparedStatement
     * 獲取mysql連接實例,考慮多線程同步,
     * 不用synchronize是因為獲取數據庫連接是遠程操作,耗時不確定
     * @return
     */
    private void buildPreparedStatement() {
        if(null==connection) {
            boolean hasLock = false;
            try {
                hasLock = reentrantLock.tryLock(10, TimeUnit.SECONDS);

                if(hasLock) {
                    Class.forName("com.mysql.cj.jdbc.Driver");
                    connection = DriverManager.getConnection("jdbc:mysql://192.168.50.43:3306/flinkdemo?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC", "root", "123456");
                }

                if(null!=connection) {
                    preparedStatement = connection.prepareStatement("insert into student (name, age) values (?, ?)");
                }
            } catch (Exception e) {
                //生產環境慎用
                e.printStackTrace();
            } finally {
                if(hasLock) {
                    reentrantLock.unlock();
                }
            }
        }
    }
}
  1. 上述代碼很簡單,只需要注意在創建連接的時候用到了鎖來控制多線程同步,以及高版本mysql驅動對應的driver和uri的寫法與以前5.x版本的區別;
  2. 創建任務類StudentSink.java,用來創建一個flink任務,里面通過ArrayList創建了一個數據集,然后直接addSink,為了看清DAG,調用disableChaining方法取消了operator chain:
package com.bolingcavalry.customize;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;

public class StudentSink {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //並行度為1
        env.setParallelism(1);

        List<Student> list = new ArrayList<>();
        list.add(new Student("aaa", 11));
        list.add(new Student("bbb", 12));
        list.add(new Student("ccc", 13));
        list.add(new Student("ddd", 14));
        list.add(new Student("eee", 15));
        list.add(new Student("fff", 16));

        env.fromCollection(list)
            .addSink(new MySQLSinkFunction())
            .disableChaining();

        env.execute("sink demo : customize mysql obj");
    }
}
  1. 在flink web頁面提交任務,並設置任務類:
    在這里插入圖片描述
  2. 任務完成后,DAG圖顯示任務和記錄數都符合預期:
    在這里插入圖片描述
  3. 去檢查數據庫,發現數據已寫入:

在這里插入圖片描述

至此,自定義sink的實戰已經完成,希望本文能給您一些參考;

歡迎關注公眾號:程序員欣宸

微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM