Flink Table環境配置、讀取外部數據(File、Kafka)以及查詢轉換


知識點

FlinkTable步驟:
  // 1、創建表的執行環境 
  val tableEnv = ... 
  // 2、創建一張表,用於讀取數據 
  tableEnv.connect(...).createTemporaryTable("inputTable") 
  // 3.1 通過 Table API 查詢算子,得到一張結果表 
  val result = tableEnv.from("inputTable").select(...) 
  // 3.2 通過 SQL 查詢語句,得到一張結果表 
  val sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...") 
  // 4、註冊一張表,用於把計算結果輸出 
  tableEnv.connect(...).createTemporaryTable("outputTable") 
  // 5、將結果表寫入輸出表中 
  result.insertInto("outputTable")

1、Maven 依賴(CSV 格式與 Table planner)

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>1.10.1</version>
    </dependency>

    <!-- old planner flink table-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.12</artifactId>
      <version>1.10.1</version>

    </dependency>

    <!--new planner-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>

2、代碼案例

package table

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table, TableEnvironment}
import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, OldCsv, Schema}
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._
/**
 * Demonstrates the basic Flink Table API workflow on a streaming environment:
 * a CSV file and a Kafka topic are registered as temporary tables, and the
 * file-backed table is then queried with both the Table API and SQL.
 *
 * The first program argument, when present, is used as the path of the sensor
 * CSV file; otherwise the original sample path is used.
 *
 * @author yangwj
 * @date 2021/1/12 21:53
 * @version 1.0
 */
object TableApiTest {

  /** Sample CSV file that is read when no path is passed on the command line. */
  private val DefaultInputFile: String =
    "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"

  /**
   * Builds the (id, timestamp, temperature) schema descriptor used by both sources.
   *
   * A new descriptor is created on every call so the two table registrations
   * never share one builder instance.
   */
  private def sensorSchema: Schema =
    new Schema()
      .field("id", DataTypes.STRING())
      .field("timestamp", DataTypes.BIGINT())
      .field("temperature", DataTypes.DOUBLE())

  /**
   * Registers the sources, prints their contents, runs the two example queries
   * and starts the streaming job.
   *
   * @param args optional; `args(0)` overrides the path of the sensor CSV file
   */
  def main(args: Array[String]): Unit = {
    // 1. Create the table environment on top of a streaming execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    /*
    Alternative ways to build a table environment, kept for reference only.

    // 1.1 Old planner, streaming mode
    val settings = EnvironmentSettings.newInstance()
      .useOldPlanner()
      .inStreamingMode()
      .build()
    val oldStreamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
    // 1.2 Old planner, batch mode
    val batchEnv = ExecutionEnvironment.getExecutionEnvironment
    val oldBatchTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(batchEnv)

    // 1.1 Blink planner, streaming mode
    val blinkStreamSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings)

    // 1.2 Blink planner, batch mode
    val blinkBatchSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()
    val blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings)
    */

    // 2. Connect to external systems, read data and register tables.
    // 2.1 Read from a file; the path may be overridden by the first argument.
    val inputFile: String = args.headOption.getOrElse(DefaultInputFile)
    tableEnv
      .connect(new FileSystem().path(inputFile))
      // Csv is used here; OldCsv is the older, non-standard format descriptor.
      .withFormat(new Csv())
      .withSchema(sensorSchema)
      .createTemporaryTable("inputTable")

    val inputTable: Table = tableEnv.from("inputTable")
    inputTable.toAppendStream[(String, Long, Double)].print("result")

    // 2.2 Read from Kafka.
    val kafkaSource: Kafka = new Kafka()
      .version("0.11")
      .topic("Demo")
      .property("zookeeper.connect", "localhost:2181")
      .property("bootstrap.servers", "localhost:9092")
    tableEnv
      .connect(kafkaSource)
      .withFormat(new Csv())
      .withSchema(sensorSchema)
      .createTemporaryTable("kafkaTable")

    val kafkaTable: Table = tableEnv.from("kafkaTable")
    kafkaTable.toAppendStream[(String, Long, Double)].print("kafkaResult")

    // 3. Query transformations.
    // 3.1 Using the Table API.
    val sensorTable: Table = tableEnv.from("inputTable")
    val apiResult: Table = sensorTable
      .select('id, 'temperature)
      .filter('id === "sensor_1")

    // 3.2 The same query expressed in SQL.
    val sqlResult: Table = tableEnv.sqlQuery(
      """
        |select id ,temperature
        |from inputTable
        |where id = 'sensor_1'
        """.stripMargin)

    apiResult.toAppendStream[(String, Double)].print("apiResult")
    sqlResult.toAppendStream[(String, Double)].print("sqlResult")

    // Nothing runs until the job is submitted.
    env.execute("table api test")
  }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM