Flink: Integrating the Table API with DataStream and DataSet


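Converting a DataStream or DataSet into a Table

In the examples above, tables were registered directly with registerTableSource.

Flink also offers a more flexible approach: registering a DataStream or DataSet directly as a table.

The DataStream or DataSet then behaves like a table, so you can keep using SQL to process streaming or batch data.

Syntax: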

// get the TableEnvironment; env is a StreamExecutionEnvironment
// (registration of a DataSet is equivalent, using an ExecutionEnvironment)
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// register the DataStream as table "myTable"; a Scala tuple yields the default fields "_1", "_2"
tableEnv.registerDataStream("myTable", stream)
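The comment above notes that registering a DataSet works the same way. The sketch below shows that batch variant, and also passes field expressions to rename the default tuple fields. It is a minimal sketch that assumes the Flink 1.x Scala Table API (where TableEnvironment.getTableEnvironment returns a BatchTableEnvironment for an ExecutionEnvironment) with flink-table on the classpath; the object name RegisterDataSetSketch, the table name and the sample rows are hypothetical.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

// Hypothetical example: register a DataSet as a table and query it with SQL.
object RegisterDataSetSketch {
  def run(): Seq[(Long, String)] = {
    // batch environment; getTableEnvironment returns a BatchTableEnvironment here
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // hypothetical sample data
    val data: DataSet[(Long, String)] = env.fromElements((1L, "a"), (2L, "b"), (3L, "c"))

    // rename the default tuple fields _1, _2 to myLong, myString
    tableEnv.registerDataSet("myTable", data, 'myLong, 'myString)

    val result = tableEnv.sqlQuery("SELECT myLong, myString FROM myTable WHERE myLong > 1")

    // convert the Table back to a DataSet and collect it to the client
    tableEnv.toDataSet[(Long, String)](result).collect()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

The collected rows are (2,b) and (3,c); their order is not guaranteed, so compare them as a set.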
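
Full example:

import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

object SQLToDataSetAndStreamSet {
  def main(args: Array[String]): Unit = {

    // set up the streaming execution environment and its table environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // build the sample data
    val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 3),
      Order(1L, "diaper", 4),
      Order(3L, "rubber", 2)))
    val orderB: DataStream[Order] = env.fromCollection(Seq(
      Order(2L, "pen", 3),
      Order(2L, "rubber", 3),
      Order(4L, "beer", 1)))
    // register each stream as a table; the case-class fields become the columns user, product, amount
    tEnv.registerDataStream("OrderA", orderA)
    tEnv.registerDataStream("OrderB", orderB)
    // union the two filtered tables (UNION ALL keeps duplicate rows)
    val result = tEnv.sqlQuery(
      "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
        "SELECT * FROM OrderB WHERE amount < 2")
    // write the result as CSV to the path "ccc": "," as delimiter, 1 output file, overwrite existing output
    result.writeToSink(new CsvTableSink("ccc", ",", 1, FileSystem.WriteMode.OVERWRITE))
    env.execute()
  }
}
case class Order(user: Long, product: String, amount: Int)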
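To make the query's result concrete, the sketch below re-implements the two WHERE filters and UNION ALL with plain Scala collections on the same sample data. No Flink is involved; UnionAllModel and query are hypothetical names, and this models what the SQL computes, not how Flink executes it. In the streaming job, rows from the two sources may reach the CSV sink in a different order.

```scala
// Same row type as in the example above.
case class Order(user: Long, product: String, amount: Int)

// Hypothetical plain-Scala model of the SQL query's semantics.
object UnionAllModel {
  val orderA = Seq(Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2))
  val orderB = Seq(Order(2L, "pen", 3), Order(2L, "rubber", 3), Order(4L, "beer", 1))

  // WHERE amount > 2 on OrderA, WHERE amount < 2 on OrderB;
  // UNION ALL is a plain concatenation that keeps duplicates (no DISTINCT step).
  def query(a: Seq[Order], b: Seq[Order]): Seq[Order] =
    a.filter(_.amount > 2) ++ b.filter(_.amount < 2)

  def main(args: Array[String]): Unit =
    query(orderA, orderB).foreach(println)
}
```

Running main prints Order(1,beer,3), Order(1,diaper,4) and Order(4,beer,1): Order(3, "rubber", 2) fails amount > 2, and in OrderB only Order(4, "beer", 1) satisfies amount < 2.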
