Flink之Table初探


知识点

Table API 和 SQL 的程序结构,与流式处理的程序结构类似;也可以近似地认为有这么 几步:首先创建执行环境,然后定义 source、transform 和 sink。

1、依赖:Table API 和 SQL 需要引入的依赖

 <!-- old planner flink table-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>
    <!--new planner-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.12</artifactId>
      <version>1.10.1</version>
    </dependency>

2、代码案例

package table

import com.yangwj.api.SensorReading
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
/**
 * @author yangwj
 * @date 2021/1/12 21:17
 * @version 1.0
 */
object TableExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    //1、基于流创建表
    val table: Table = tableEnv.fromDataStream(dataStream)

    //2、调用table api进行转换
    val result: Table = table.select("id,temperature").filter("id == 'sensor_1'")
    result.toAppendStream[(String, Double)].print("result")

    //2、sql实现
    tableEnv.createTemporaryView("tabel",table)
    val sql = "select id, temperature from tabel where id = 'sensor_1'"

    val sqlResult: Table = tableEnv.sqlQuery(sql)
    sqlResult.toAppendStream[(String, Double)].print("sqlResult")
    env.execute("table api")
  }
}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM