FlinkSQL Practice Notes 2


1. Background

Yesterday's post, "FlinkSQL Practice Notes 1", covered a simple `INSERT INTO .. SELECT ..` with FlinkSQL. Today we put aggregation queries into practice.

2. Code Practice

        String mysql_sql = "CREATE TABLE mysql_sink (" +
                "               name STRING," +
                "               cnt BIGINT," +
                "               PRIMARY KEY (name) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://101.71.102.255:8081/kafka?serverTimezone=UTC'," +
                " 'table-name' = 'count_info'," +
                " 'username' = 'kafka'," +
                " 'password' = 'Bonc@123'" +
                ")";

        tableEnv.executeSql(mysql_sql);

        // Insert data
        TableResult tableResult = tableEnv.executeSql(
                "INSERT INTO mysql_sink " +
                        "SELECT name, count(*) as cnt " +
                        "FROM sensor_source " +
                        "where id > 3 " +
                        "group by name "
                       // "order by name "
        );
        // getJobStatus() returns a CompletableFuture<JobStatus>; call get() on it
        // to print the actual job status rather than the future object itself
        System.out.println(tableResult.getJobClient().get().getJobStatus().get());

Quoted from the official documentation:

Flink uses the primary key defined in the DDL when writing data to external databases. The connector operates in upsert mode if a primary key is defined; otherwise, the connector operates in append mode.

In upsert mode, Flink will insert a new row or update the existing row according to the primary key; Flink can ensure idempotence this way. To guarantee the output result is as expected, it is recommended to define a primary key for the table and make sure the primary key is one of the unique keys or the primary key of the underlying database table. In append mode, Flink will interpret all records as INSERT messages, and the INSERT operation may fail if a primary key or unique constraint violation happens in the underlying database.
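Concretely, for the MySQL dialect the JDBC connector implements upsert with an `INSERT ... ON DUPLICATE KEY UPDATE` statement. A rough sketch of what gets issued against the `count_info` table above (the exact generated SQL may vary between Flink versions):

```sql
-- Sketch of the upsert statement the JDBC connector issues for MySQL;
-- the placeholders are bound to each row of the changelog stream.
INSERT INTO count_info (name, cnt)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE cnt = VALUES(cnt);
```

This is why the primary key matters: without it, the connector falls back to plain `INSERT` statements, and every retraction/update from the aggregation becomes a brand-new row.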

2.1 MySQL table without a primary key

# Note: the cnt column must be BIGINT; COUNT(*) in Flink returns BIGINT,
# so declaring it as INT causes a type error
create table count_info (
    name varchar(100),
    cnt bigint
);

As the upstream keeps producing data, each newly computed result is inserted into MySQL as a new row.

2.2 MySQL table with a primary key

create table count_info (
    name varchar(100),
    cnt bigint,
    primary key (name)
);

As the upstream keeps producing data, the newly computed results are updated in MySQL in place (rows with the same key are overwritten).

After producing another batch of data:

3. Problems Encountered and Solutions

3.1 The sink table is missing a primary key. FlinkSQL's `SELECT DISTINCT ..` statements also require the sink table to have a primary key.

Exception in thread "main" java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.

Solution: add `PRIMARY KEY (name) NOT ENFORCED` to mysql_sink:

        String mysql_sql = "CREATE TABLE mysql_sink (" +
                "               name STRING," +
                "               cnt BIGINT," +
                "               PRIMARY KEY (name) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://101.71.102.255:8081/kafka?serverTimezone=UTC'," +
                " 'table-name' = 'count_info'," +
                " 'username' = 'kafka'," +
                " 'password' = 'Bonc@123'" +
                ")";

3.2 Sorting is not supported

Exception in thread "main" org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.

Solution: remove the `order by` clause. In streaming mode, Flink only supports sorting on a time attribute, because an unbounded, continuously updating result cannot be fully re-sorted on an arbitrary field every time a row changes.

        TableResult tableResult = tableEnv.executeSql(
                "INSERT INTO mysql_sink " +
                        "SELECT name, count(*) as cnt " +
                        "FROM sensor_source " +
                        "where id > 3 " +
                        "group by name "
                      //  "order by name "
        );
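For the record, streaming mode does allow `ORDER BY` when the leading sort field is a time attribute in ascending order. A sketch, assuming `sensor_source` declares an event-time attribute column named `rowtime` (that column does not appear in this post's DDL, so this is only illustrative):

```sql
-- Allowed in streaming mode: ascending sort on a time attribute.
-- `rowtime` is an assumed event-time column, not part of this post's source table.
SELECT name, id
FROM sensor_source
ORDER BY rowtime ASC;
```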

4. Not Done Yet

Next up: some practice with JOINs.

