flink table&sql canal使用案例


本文項目源碼見github:https://github.com/felixzh2020/felixzh-learning-flink/tree/master/canal

版本信息

產品 版本
Flink 1.11.1
flink-cdc-connectors 1.1.0
Java 1.8.0_231
MySQL 5.7.16

Maven依賴

  • pom.xml 依賴部分
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single version property referenced by every org.apache.flink dependency below. -->
        <flink.version>1.11.1</flink.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- The _2.11 suffix on the following artifact ids is the Scala binary version they are built for. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Same planner artifact again, this time as its test-jar. Presumably this is what supplies
             org.apache.flink.table.planner.factories.TestValuesTableFactory, which the example class
             imports; verify before removing. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <type>test-jar</type>
        </dependency>
        <!-- Flink-CDC: MySQL CDC connector; its version is pinned literally, not via flink.version. -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.1.0</version>
        </dependency>
    
    </dependencies>

    主從同步配置、數據準備

    • 關閉MySQL服務
    • 在需要被同步的MySQL節點的配置文件中,添加如下配置(可參考MySQL官方文檔):
      [mysqld]
      # Other, pre-existing settings come before this point
      # The part added for this walkthrough
      server-id = 12345
      log-bin = mysql-bin
      # Must be ROW
      binlog_format = ROW
      # Must be FULL. The original note says this parameter only exists from MySQL 5.7 on;
      # NOTE(review): confirm the exact version against the MySQL reference manual.
      binlog_row_image  = FULL
      expire_logs_days  = 10
    • 啟動MySQL服務
    • 使用如下命令,可查看binlog相關變量配置
      -- List every server variable whose name contains "binlog".
      SHOW VARIABLES LIKE '%binlog%';
    • 創建待測試的庫、表、數據
      -- Create the database that the Flink source table refers to as 'database-name'.
      CREATE DATABASE db_inventory_cdc;
      -- Switch to it so the table below is created inside db_inventory_cdc
      -- rather than in whatever database happens to be selected.
      USE db_inventory_cdc;
      -- Table captured by the CDC source ('table-name' in the Flink DDL).
      CREATE TABLE tb_products_cdc(
          id INT PRIMARY KEY AUTO_INCREMENT,
          name VARCHAR(64),
          description VARCHAR(128)
      );
      -- Three seed rows; DEFAULT lets AUTO_INCREMENT assign the ids.
      INSERT INTO tb_products_cdc VALUES
          (DEFAULT, 'zhangsan', 'aaa'),
          (DEFAULT, 'lisi', 'bbb'),
          (DEFAULT, 'wangwu', 'ccc');
    • 創建用於同步的用戶,並給予權限(可參考MySQL官方文檔):
      -- Create the user that will be used for synchronization
      CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpassword';
      -- Grant the privileges needed for synchronization
      GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser';
    • 創建用戶並賦予權限成功後,使用該用戶登錄MySQL,可以使用以下命令查看主從同步相關信息:
      -- Each command is a separate statement and needs its own terminator.
      SHOW MASTER STATUS;
      SHOW SLAVE STATUS;
      SHOW BINARY LOGS;

    使用Flink-CDC

      • sql-cli點擊查看
      • 編碼方式,方便提交jar包,示例如下
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironimport org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.planner.factories.TestValuesTableFactory; public class FlinkCDCSQLTest { public static void main(String[] args) throws Exception { EnvironmentSettings fsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings); // 數據源表 String sourceDDL = "CREATE TABLE mysql_binlog (\n" + " id INT NOT NULL,\n" + " name STRING,\n" + " description STRING\n" + ") WITH (\n" + " 'connector' = 'mysql-cdc',\n" + " 'hostname' = 'localhost',\n" + " 'port' = '3306',\n" + " 'username' = 'flinkuser',\n" + " 'password' = 'flinkpassword',\n" + " 'database-name' = 'db_inventory_cdc',\n" + " 'table-name' = 'tb_products_cdc'\n" + ")"; // 輸出目標表 String sinkDDL = "CREATE TABLE tb_sink (\n" + " name STRING,\n" + " countSum BIGINT,\n" + " PRIMARY KEY (name) NOT ENFORCED\n" + ") WITH (\n" + " 'connector' = 'print'\n" + ")"; // 簡單的聚合處理 String transformSQL = "INSERT INTO tb_sink " + "SELECT name, COUNT(1) " + "FROM mysql_binlog " + "GROUP BY name"; tableEnv.executeSql(sourceDDL); tableEnv.executeSql(sinkDDL); TableResult result = tableEnv.executeSql(transformSQL); // 等待flink-cdc完成快照 waitForSnapshotStarted("tb_sink"); result.print(); result.getJobClient().get().cancel().get(); } private static void waitForSnapshotStarted(String sinkName) throws InterruptedException { while (sinkSize(sinkName) == 0) { Thread.sleep(100); } } private static int sinkSize(String sinkName) { synchronized (TestValuesTableFactory.class) { try { return TestValuesTableFactory.getRawResults(sinkName).size(); } catch (IllegalArgumentException e) { // job is not started yet return 0; } } } }

    簡單的測試

      • 進行簡單測試,開始修改MySQL表的數據
        -- SQL test data; compare each change against the Flink application's output
        INSERT INTO tb_products_cdc VALUE(DEFAULT, 'lisi', 'ddd');
        DELETE FROM tb_products_cdc WHERE id=4;
        UPDATE tb_products_cdc SET name='wangwu' WHERE id=2;
      • 執行一條SQL,查看一下Flink的結果變化


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM