Flink之TableAPI和SQL(3):通過TableAPI和SQL表的一些操作(包括查詢,過濾,聚集等)


相關文章鏈接

Flink之TableAPI和SQL(1):基本功能描述

Flink之TableAPI和SQL(2):表和外部系統的連接方式

Flink之TableAPI和SQL(3):通過TableAPI和SQL表的一些操作(包括查詢,過濾,聚集等)

Flink之TableAPI和SQL(4):表的Sink實現

Flink之TableAPI和SQL(5):表的時間特性

具體實現如下代碼所示:

// 1、創建執行環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

// 2、從流中獲取表
val sensorStream: DataStream[SensorReading] = env
    .readTextFile("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\source.txt")
    .map(new MyMapToSensorReading)
val sensorTable: Table = tableEnv.fromDataStream(sensorStream)
// 將流轉換成表時可以指定獲取字段,或給字段修改別名
val sensorTable_tmp: Table = tableEnv.fromDataStream(sensorStream, 'id, 'temperature as 'temp)
// 使用流創建臨時視圖時,可以指定獲取字段,或給字段修改別名
tableEnv.createTemporaryView("sensorTable_tmp", sensorStream, 'id, 'temperature as 'temp)

// 3、使用TableAPI對表進行操作
// 查詢過濾操作
val resultApiTable_1: Table = sensorTable
    .select('id, 'temperature)
    .filter('id === "sensor_1")
// 分組聚合操作
val resultApiTable_2: Table = sensorTable
    .groupBy('id)
    .select('id, 'id.count as 'cnt)

// 4、使用sql對表進行操作(需先使用臨時視圖將表注冊到catalog中)
tableEnv.createTemporaryView("sensorTable", sensorTable)
// 查詢過濾操作
val resultSqlTable_1: Table = tableEnv.sqlQuery(
    """
      |select id, temperature
      |from sensorTable
      |where id = 'sensor_1'
      |""".stripMargin)
// 分組聚合操作
val resultSqlTable_2: Table = tableEnv.sqlQuery(
    """
      |select id, count(id) as cnt
      |from sensorTable
      |group by id
      |""".stripMargin)

// 5、打印表
//        resultSqlTable_1.printSchema()
//        resultSqlTable_1.toAppendStream[Row].print()
resultSqlTable_2.printSchema()
resultSqlTable_2.toRetractStream[Row].print()

// 啟動執行器,執行任務
env.execute("OperationTableDemo")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM