知識點
FlinkTable步驟: // 1、創建表的執行環境 val tableEnv = ... // 2、創建一張表,用於讀取數據 tableEnv.connect(...).createTemporaryTable("inputTable") // 3.1、通過 Table API 查詢算子,得到一張結果表 val result = tableEnv.from("inputTable").select(...) // 3.2、通過 SQL 查詢語句,得到一張結果表 val sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...") // 4、注冊一張表,用於把計算結果輸出 tableEnv.connect(...).createTemporaryTable("outputTable") // 5、將結果表寫入輸出表中 result.insertInto("outputTable")
1、CSV文件依賴
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.1</version>
</dependency>
<!-- Old planner for the Flink Table API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<!-- New (Blink) planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.10.1</version>
</dependency>
2、代碼案例
package table

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table, TableEnvironment}
import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, OldCsv, Schema}
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._

/**
 * Demonstrates the Flink Table API: registering a file source and a Kafka
 * source as tables, then querying them via both the Table API and SQL.
 *
 * @author yangwj
 * @date 2021/1/12 21:53
 * @version 1.0
 */
object TableApiTest {

  def main(args: Array[String]): Unit = {
    // 1. Create the table execution environment on top of a streaming environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    /**
     * Alternative environment setups, kept for reference:
     *
     * // 1.1 Old-planner streaming environment
     * val setttings = EnvironmentSettings.newInstance()
     *   .useOldPlanner()
     *   .inStreamingMode()
     *   .build()
     * val oldStreamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, setttings)
     *
     * // 1.2 Old-planner batch environment
     * val batchEnv = ExecutionEnvironment.getExecutionEnvironment
     * val oldBatchTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(batchEnv)
     *
     * // 1.1 New Blink-planner streaming environment
     * val blinkStreamSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
     *   .useBlinkPlanner()
     *   .inStreamingMode()
     *   .build()
     * val blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings)
     *
     * // 1.2 New Blink-planner batch environment
     * val blinkBatchSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
     *   .useBlinkPlanner()
     *   .inBatchMode()
     *   .build()
     * val blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings)
     */

    // 2. Connect to external systems, read data, and register tables.

    // 2.1 Register a CSV file as a table.
    val inputFile: String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    tableEnv
      .connect(new FileSystem().path(inputFile))
      // new OldCsv() would be the non-standard legacy format descriptor; Csv is the standard one.
      .withFormat(new Csv())
      .withSchema(
        new Schema()
          .field("id", DataTypes.STRING())
          .field("timestamp", DataTypes.BIGINT())
          .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("inputTable")

    val fileTable: Table = tableEnv.from("inputTable")
    fileTable.toAppendStream[(String, Long, Double)].print("result")

    // 2.2 Register a Kafka topic as a table.
    tableEnv
      .connect(
        new Kafka()
          .version("0.11")
          .topic("Demo")
          .property("zookeeper.connect", "localhost:2181")
          .property("bootstrap.servers", "localhost:9092")
      )
      .withFormat(new Csv())
      .withSchema(
        new Schema()
          .field("id", DataTypes.STRING())
          .field("timestamp", DataTypes.BIGINT())
          .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("kafkaTable")

    val kafkaTable: Table = tableEnv.from("kafkaTable")
    kafkaTable.toAppendStream[(String, Long, Double)].print("kafkaResult")

    // 3. Query / transform the registered tables.

    // 3.1 Table API version of the query.
    val sensorTable: Table = tableEnv.from("inputTable")
    val apiResult: Table = sensorTable
      .select('id, 'temperature)
      .filter('id === "sensor_1")

    // 3.2 Equivalent SQL version of the query.
    val sqlResult: Table = tableEnv.sqlQuery(
      """
        |select id ,temperature
        |from inputTable
        |where id = 'sensor_1'
      """.stripMargin)

    apiResult.toAppendStream[(String, Double)].print("apiResult")
    sqlResult.toAppendStream[(String, Double)].print("sqlResult")

    env.execute("table api test")
  }
}