Hudi-Flink SQL實時讀取Hudi表數據


代碼如下(hudi表實時寫入參考上一篇[Hudi-Flink消費kafka將增量數據實時寫入Hudi])

package com.zhen.hudi;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * @Author FengZhen
 * @Date 3/10/22 8:33 PM
 * @Description 基於Flink SQL Connector實現:從hudi表中加載數據,編寫SQL查詢
 */
public class FlinkSQLReadDemo {

    public static void main(String[] args) {

        //1.獲取表的執行環境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        //2.創建輸入表,TODO:加載hudi表數據
        tableEnv.executeSql(
                "CREATE TABLE order_hudi(\n" +
                        "    `orderId` STRING PRIMARY KEY NOT ENFORCED,\n" +
                        "    `userId` STRING,\n" +
                        "    `orderTime` STRING,\n" +
                        "    `ip` STRING,\n" +
                        "    `orderMoney` DOUBLE,\n" +
                        "    `orderStatus` INT,\n" +
                        "    `ts` STRING,\n" +
                        "    `partition_day` STRING\n" +
                        ")\n" +
                        "PARTITIONED BY (partition_day)\n" +
                        "WITH(\n" +
                        "    'connector' = 'hudi',\n" +
                        "    'path'='hdfs://localhost:9000/hudi-warehouse/flink_hudi_order',\n" +
                        "    'table.type' = 'MERGE_ON_READ',\n" +
                        "    'read.streaming.enabled' = 'true',\n" +
                        "    'read.streaming.check-interval' = '4'\n" +
                        ")"
        );

        //3.執行查詢語句,流式讀取hudi表數據
        tableEnv.executeSql(
                "SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day FROM order_hudi"
        ).print();


    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM