知識點
表的輸出,是通過將數據寫入 TableSink 來實現的。TableSink 是一個通用接口,可以 支持不同的文件格式、存儲數據庫和消息隊列。 具體實現,輸出表最直接的方法,就是通過 Table.insertInto() 方法將一個 Table 寫入 注冊過的 TableSink 中。同時表的輸出跟更新模式有關 更新模式(Update Mode) 對於流式查詢(Streaming Queries),需要聲明如何在(動態)表和外部連接器之間執行 轉換。與外部系統交換的消息類型,由更新模式(update mode)指定。 Flink Table API 中的更新模式有以下三種: 1)追加模式(Append Mode) 在追加模式下,表(動態表)和外部連接器只交換插入(Insert)消息。 2)撤回模式(Retract Mode) 在撤回模式下,表和外部連接器交換的是:添加(Add)和撤回(Retract)消息。 插入(Insert)會被編碼為添加消息; 刪除(Delete)則編碼為撤回消息; 更新(Update)則會編碼為,已更新行(上一行)的撤回消息,和更新行(新行) 的添加消息。 在此模式下,不能定義 key,這一點跟 upsert 模式完全不同。 3)Upsert(更新插入)模式 在 Upsert 模式下,動態表和外部連接器交換 Upsert 和 Delete 消息。 這個模式需要一個唯一的 key,通過這個 key 可以傳遞更新消息。為了正確應用消息外部連接器需要知道這個唯一 key 的屬性。 插入(Insert)和更新(Update)都被編碼為 Upsert 消息; 刪除(Delete)編碼為 Delete 信息。 這種模式和 Retract 模式的主要區別在於,Update 操作是用單個消息編碼的,所以效率 會更高。
1、文件代碼案例
package guigu.table.sink import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.{DataTypes, Table} import org.apache.flink.table.api.scala.StreamTableEnvironment import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} /** * @program: demo * @description: ${description} * @author: yang * @create: 2021-01-14 18:48 */ object FileSink { def main(args: Array[String]): Unit = { //1、環境准備 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) //2、讀取數據,創建表視圖 val inputFile = "E:\\java\\demo\\src\\main\\resources\\file\\data5.csv" tableEnv.connect(new FileSystem().path(inputFile)) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("temperature",DataTypes.DOUBLE()) .field("timestamp",DataTypes.BIGINT()) ) .createTemporaryTable("inputTable") //3、table api轉換 val tableApi: Table = tableEnv.from("inputTable") val apiResult: Table = tableApi.select("id,temperature").where("id = 'sensor_1'") val sqlResult: Table = tableEnv.sqlQuery("select id,temperature from inputTable where id = 'sensor_1'") //字符串模板 val sqlModelResult: Table = tableEnv.sqlQuery( """ |select id,temperature |from inputTable |where id = 'sensor_1' """.stripMargin) //4、創建輸出表視圖 val outputFile = "E:\\java\\demo\\src\\main\\resources\\file\\outputFile.csv" tableEnv.connect(new FileSystem().path(outputFile)) .withFormat(new Csv()) .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("temperature",DataTypes.DOUBLE()) ) .createTemporaryTable("outputTable") //5、執行 sqlModelResult.insertInto("outputTable") tableEnv.execute("Flink Sink Flie Test") } }
2、Es代碼案例
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>1.10.1</version> </dependency>
package table.tableSink import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.scala.StreamTableEnvironment import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{DataTypes, Table} import org.apache.flink.table.descriptors.{Csv, Elasticsearch, FileSystem, Json, Kafka, Schema} /** * @author yangwj * @date 2021/1/13 22:19 * @version 1.0 */ object EsSink { def main(args: Array[String]): Unit = { //1、環境准備 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val streamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) //2、讀取數據並轉為表視圖 val filePath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt" val schema: Schema = new Schema().field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) streamTableEnv.connect(new FileSystem().path(filePath)) .withFormat(new Csv()) .withSchema(schema) .createTemporaryTable("inputTable") //3、表的轉換 val inputTable: Table = streamTableEnv.from("inputTable") val resultTable: Table = inputTable.select("id,temperature").where("id = 'sensor_1'") val aggTable: Table = inputTable.groupBy('id).select('id, 'id.count as 'count) //4、注冊表輸出視圖,輸出到es streamTableEnv.connect( new Elasticsearch() .version("6") .host("localhost", 9200, "http") .index("sensor") .documentType("_doc")
.bulkFlushMaxActions(1) //一定要加呀,否則數據都在內存中,沒有輸出到es
) .inUpsertMode() .withFormat(new Json()) .withSchema(new Schema().field("id",DataTypes.STRING()) .field("temperature",DataTypes.DOUBLE())) .createTemporaryTable("outputEsTable") //5、執行 aggTable.insertInto("outputEsTable") env.execute() } }
3、Kafka代碼案例
package table.tableSink import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.scala.StreamTableEnvironment import org.apache.flink.table.api.{DataTypes, Table} import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, Schema} /** * @author yangwj * @date 2021/1/13 22:19 * @version 1.0 */ object KafkaSink { def main(args: Array[String]): Unit = { //1、表的環境准備 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val streamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) //2、讀取數據並轉為表視圖 val filePath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt" val outputPath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\output.txt" val schema: Schema = new Schema().field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) streamTableEnv.connect(new FileSystem().path(filePath)) .withFormat(new Csv()) .withSchema(schema) .createTemporaryTable("inputTable") //3、表的基本轉換 val inputTable: Table = streamTableEnv.from("inputTable") val resultTable: Table = inputTable.select("id,temperature").where("id = 'sensor_1'") //4、注冊輸出表視圖,輸出至kafka streamTableEnv.connect( new Kafka() .version("0.11") .topic("sinkTest") .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") ) .withFormat(new Csv()) .withSchema(new Schema().field("id",DataTypes.STRING()) .field("temperature",DataTypes.DOUBLE())) .createTemporaryTable("outputKafkaTable") //5、執行 resultTable.insertInto("outputKafkaTable") env.execute() } }
4、mysql代碼案例
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_2.12</artifactId> <version>1.10.1</version> </dependency>
package table.tableSink import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} import org.apache.flink.table.api.{DataTypes, Table} import org.apache.flink.table.descriptors._ /** * @author yangwj * @date 2021/1/13 22:19 * @version 1.0 */ object MysqlSink { def main(args: Array[String]): Unit = { //1、環境准備 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val streamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) //2、讀取數據並轉為表視圖 val filePath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt" val schema: Schema = new Schema().field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) streamTableEnv.connect(new FileSystem().path(filePath)) .withFormat(new Csv()) .withSchema(schema) .createTemporaryTable("inputTable") //3、表的轉換 val inputTable: Table = streamTableEnv.from("inputTable") val resultTable: Table = inputTable.select("id,temperature").where("id = 'sensor_1'") val aggTable: Table = inputTable.groupBy('id).select('id, 'id.count as 'cnt) //4、創建mysql DDL,並在環境中執行 with表示連接器 val sinkDDL: String = """ |create table jdbcOutputTable ( | id varchar(20) not null, | cnt bigint not null |) with ( | 'connector.type' = 'jdbc', | 'connector.url' = 'jdbc:mysql://localhost:3306/test', | 'connector.table' = 'sensor_count', | 'connector.driver' = 'com.mysql.jdbc.Driver', | 'connector.username' = 'root', | 'connector.password' = '123456' |) """.stripMargin streamTableEnv.sqlUpdate(sinkDDL) //5、執行 aggTable.insertInto("jdbcOutputTable") env.execute() } }