A Table可以轉換成a DataStream或DataSet。通過這種方式,可以在Table API或SQL查詢的結果上運行自定義的DataStream或DataSet程序
將表轉換為DataStream
有兩種模式可以將 Table轉換為DataStream:
1:Append Mode
將一個表附加到流上
2:Retract Mode
將表轉換為流
語法格式:
// get TableEnvironment. // registration of a DataSet is equivalent // ge val tableEnv = TableEnvironment.getTableEnvironment(env) // Table with two fields (String name, Integer age) val table: Table = ... // convert the Table into an append DataStream of Row val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table) // convert the Table into an append DataStream of Tuple2[String, Int] val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream[(String, Int)](table) // convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream[(Boolean, X)]. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
例子:
object TableTODataSet_DataStream { def main(args: Array[String]): Unit = { //構造數據,轉換為table val data = List( Peoject(1L, 1, "Hello"), Peoject(2L, 2, "Hello"), Peoject(3L, 3, "Hello"), Peoject(4L, 4, "Hello"), Peoject(5L, 5, "Hello"), Peoject(6L, 6, "Hello"), Peoject(7L, 7, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(20L, 20, "Hello World")) val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tEnv = TableEnvironment.getTableEnvironment(env) val stream = env.fromCollection(data) val table: Table = tEnv.fromDataStream(stream) //TODO 將table轉換為DataStream----將一個表附加到流上Append Mode val appendStream: DataStream[Peoject] = tEnv.toAppendStream[Peoject](table) //TODO 將表轉換為流Retract Mode true代表添加消息,false代表撤銷消息 val retractStream: DataStream[(Boolean, Peoject)] = tEnv.toRetractStream[Peoject](table) retractStream.print() env.execute() } } case class Peoject(user: Long, index: Int, content: String)
將表轉換為DataSet
語法格式:
// get TableEnvironment // registration of a DataSet is equivalent val tableEnv = TableEnvironment.getTableEnvironment(env) // Table with two fields (String name, Integer age) val table: Table = ... // convert the Table into a DataSet of Row val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table) // convert the Table into a DataSet of Tuple2[String, Int] val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
例子:
case class Peoject(user: Long, index: Int, content: String) object TableTODataSet{ def main(args: Array[String]): Unit = { //構造數據,轉換為table val data = List( Peoject(1L, 1, "Hello"), Peoject(2L, 2, "Hello"), Peoject(3L, 3, "Hello"), Peoject(4L, 4, "Hello"), Peoject(5L, 5, "Hello"), Peoject(6L, 6, "Hello"), Peoject(7L, 7, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(20L, 20, "Hello World")) //初始化環境,加載table數據 val env = ExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnvironment = TableEnvironment.getTableEnvironment(env) val collection: DataSet[Peoject] = env.fromCollection(data) val table: Table = tableEnvironment.fromDataSet(collection) //TODO 將table轉換為dataSet val toDataSet: DataSet[Peoject] = tableEnvironment.toDataSet[Peoject](table) toDataSet.print() // env.execute() } }