Flink之TableAPI和SQL(2):表和外部系統的連接方式


相關文章鏈接

Flink之TableAPI和SQL(1):基本功能描述

Flink之TableAPI和SQL(2):表和外部系統的連接方式

Flink之TableAPI和SQL(3):通過TableAPI和SQL表的一些操作(包括查詢,過濾,聚集等)

Flink之TableAPI和SQL(4):表的Sink實現

Flink之TableAPI和SQL(5):表的時間特性

具體實現如下代碼所示:

// 1、創建執行環境
// 1.1、創建flink流的執行環境(表的環境需要基於此環境,在新的blink planner中,已經批流統一,所以直接使用流環境即可)
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
///1.2、設置並行度
env.setParallelism(2)
///1.3、創建表的環境(一般都是基於流,在flink中,批是特殊的流,用流環境也可以進行批處理)
// 創建表環境時,也可以傳入配置對象,用於選擇老的planner還是blink planner,也可以選擇是基於批的方式 或者 流的方式
// (一般不用設置,直接基於env來創建表的環境即可)
val oldStreamSetting: EnvironmentSettings = EnvironmentSettings.newInstance()
    .useOldPlanner()
    .inStreamingMode()
    .build()
val blinkStreamSetting: EnvironmentSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

// 2、創建表
// 2.1、從外部系統(文件系統,比如文本的字符串數據)中讀取數據,在環境中注冊表
val filePath: String = "D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\source.txt"
tableEnv
    .connect(new FileSystem().path(filePath))               // 連接文件系統
    .withFormat(new Csv())                                  // 定義讀取數據之后的格式化方法,使用新的Csv(用逗號分隔字段)
    .withSchema(new Schema()                                // 定義表結構
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
    )
    .createTemporaryTable("inputTable")   // 注冊一張表(表名為傳入的字符串,可以在sql中直接使用該表名)
// 2.2、連接到kafka,並在catalog中注冊表
tableEnv
    .connect(new Kafka()
        .version("0.11")
        .topic("flinkTestTopic")
        .property("bootstrap.servers", "cdh1:9092")
        .property("zookeeper.connect", "cdh1:2181")
    )
    .withFormat(new Csv())
    .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
    )
    .createTemporaryTable("kafkaInputTable")
// 2.3、連接到ES,並在catalog中注冊表
tableEnv
    .connect(new Elasticsearch()
        .version("7")
        .host("cdh1", 9200, "http")
        .index("myUsers")
        .documentType("user")
    )
    .withFormat(new Csv())
    .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
    )
    .createTemporaryTable("esInputTable")
// 其他外部系統實現方式基本類似
// 可以參考官網:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connect.html

// 3、打印表結構 和 表
val inputTable: Table = tableEnv.from("inputTable")
inputTable.printSchema()
inputTable.toAppendStream[Row].print()

// 啟動執行器
env.execute("CreateTableDemo")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM