相關文章鏈接
Flink之TableAPI和SQL(2):表和外部系統的連接方式
Flink之TableAPI和SQL(3):通過TableAPI和SQL表的一些操作(包括查詢,過濾,聚集等)
Flink之TableAPI和SQL(4):表的Sink實現
具體實現如下代碼所示:
// 1、創建執行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(2) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) // 2、從流中獲取表 val sensorStream: DataStream[SensorReading] = env .readTextFile("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\source.txt") .map(new MyMapToSensorReading) val sensorTable: Table = tableEnv.fromDataStream(sensorStream) // 將流轉換成表時可以指定獲取字段,或給字段修改別名 val sensorTable_tmp: Table = tableEnv.fromDataStream(sensorStream, 'id, 'temperature as 'temp) // 使用流創建臨時視圖時,可以指定獲取字段,或給字段修改別名 tableEnv.createTemporaryView("sensorTable_tmp", sensorStream, 'id, 'temperature as 'temp) // 3、使用TableAPI對表進行操作 // 查詢過濾操作 val resultApiTable_1: Table = sensorTable .select('id, 'temperature) .filter('id === "sensor_1") // 分組聚合操作 val resultApiTable_2: Table = sensorTable .groupBy('id) .select('id, 'id.count as 'cnt) // 4、使用sql對表進行操作(需先使用臨時視圖將表注冊到catalog中) tableEnv.createTemporaryView("sensorTable", sensorTable) // 查詢過濾操作 val resultSqlTable_1: Table = tableEnv.sqlQuery( """ |select id, temperature |from sensorTable |where id = 'sensor_1' |""".stripMargin) // 分組聚合操作 val resultSqlTable_2: Table = tableEnv.sqlQuery( """ |select id, count(id) as cnt |from sensorTable |group by id |""".stripMargin) // 5、打印表 // resultSqlTable_1.printSchema() // resultSqlTable_1.toAppendStream[Row].print() resultSqlTable_2.printSchema() resultSqlTable_2.toRetractStream[Row].print() // 啟動執行器,執行任務 env.execute("OperationTableDemo")
