FlinkSQL实践记录2


1. 背景

昨天《FlinkSQL实践记录1》对FlinkSql做了简单的使用insert into .. select ..,今天对聚合运算做一些实践。

2. 代码实践

        String mysql_sql = "CREATE TABLE mysql_sink (" +
                "               name STRING," +
                "               cnt BIGINT," +
                "               PRIMARY KEY (name) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://101.71.102.255:8081/kafka?serverTimezone=UTC'," +
                " 'table-name' = 'count_info'," +
                " 'username' = 'kafka'," +
                " 'password' = 'Bonc@123'" +
                ")";

        tableEnv.executeSql(mysql_sql);

        // 插入数据
        TableResult tableResult = tableEnv.executeSql(
                "INSERT INTO mysql_sink " +
                        "SELECT name, count(*) as cnt " +
                        "FROM sensor_source " +
                        "where id > 3 " +
                        "group by name "
                       // "order by name "
        );
        System.out.println(tableResult.getJobClient().get().getJobStatus());

摘自官网

Flink uses the primary key that defined in DDL when writing data to external databases. The connector operate in upsert mode if the primary key was defined, otherwise, the connector operate in append mode.
Flink 在将数据写入外部数据库时使用 DDL 中定义的主键。 如果定义了主键,则连接器以 upsert 模式运行,否则,连接器以附加模式运行。

In upsert mode, Flink will insert a new row or update the existing row according to the primary key, Flink can ensure the idempotence in this way. To guarantee the output result is as expected, it’s recommended to define primary key for the table and make sure the primary key is one of the unique key sets or primary key of the underlying database table. In append mode, Flink will interpret all records as INSERT messages, the INSERT operation may fail if a primary key or unique constraint violation happens in the underlying database.
在 upsert 模式下,Flink 会根据主键插入新行或更新现有行,Flink 可以通过这种方式保证幂等性。 为保证输出结果符合预期,建议为表定义主键,并确保主键是底层数据库表的唯一键集或主键之一。 在 append 模式下,Flink 会将所有记录解释为 INSERT 消息,如果底层数据库发生主键或唯一约束违规,INSERT 操作可能会失败。

2.1 mysql表不加primary主键

# 注意需要使用bigint, int类型会报错
create table count_info (
name varchar(100),
cnt bigint ) ;

当上游数据不断产生时,会将实时产生的新结果插入mysql

2.2 mysql表添加primary主键

create table count_info (
name varchar(100),
cnt bigint,
primary key(NAME)
) ;

当上游数据不断产生时,会将实时产生的新结果更新至mysql

新生产一批数据后

3. 遇到的问题及解决办法

3.1 sink table缺失主键。FlinkSQL的select distinct ..语句也需要sink table有主键。

Exception in thread "main" java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.

解决办法: mysql_sink增加PRIMARY KEY (name) NOT ENFORCED

        String mysql_sql = "CREATE TABLE mysql_sink (" +
                "               name STRING," +
                "               cnt BIGINT," +
                "               PRIMARY KEY (name) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://101.71.102.255:8081/kafka?serverTimezone=UTC'," +
                " 'table-name' = 'count_info'," +
                " 'username' = 'kafka'," +
                " 'password' = 'Bonc@123'" +
                ")";

3.2 不能排序

Exception in thread "main" org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.

解决办法:去掉order by

        TableResult tableResult = tableEnv.executeSql(
                "INSERT INTO mysql_sink " +
                        "SELECT name, count(*) as cnt " +
                        "FROM sensor_source " +
                        "where id > 3 " +
                        "group by name "
                      //  "order by name "
        );

4. 不过瘾

接下来对join关联做些实践


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM