相關文章鏈接
Flink之TableAPI和SQL(2):表和外部系統的連接方式
Flink之TableAPI和SQL(3):通過TableAPI和SQL表的一些操作(包括查詢,過濾,聚集等)
Flink之TableAPI和SQL(4):表的Sink實現
flink中表的輸出Sink可以分為3種:
1、追加模式(Append Mode):
在追加模式下,表(動態表)和外部連接器只交換插入(Insert)消息。
2、撤回模式(Retract Mode):
在撤回模式下,表和外部連接器交換的是:添加(Add)和撤回(Retract)消息。
3、Upsert(更新插入)模式:
在Upsert模式下,動態表和外部連接器交換Upsert和Delete消息。
具體實現如下代碼所示:
// 創建執行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) // 獲取輸入流,並轉換為表 val sensorStream: DataStream[SensorReading] = env .readTextFile("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\source.txt") .map(new MyMapToSensorReading) val sensorTable: Table = tableEnv.fromDataStream(sensorStream) // 1、輸出到文件 tableEnv .connect(new FileSystem().path("D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\out.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ) .createTemporaryTable("outputTable") sensorTable.insertInto("outputTable") // 2、輸出到Kafka tableEnv .connect(new Kafka() .version("0.11") .topic("flinkTestTopic") .property("zookeeper.connect", "cdh1:2181") .property("bootstrap.servers", "cdh1:9092") ) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ) .createTemporaryTable("kafkaOutputTable") sensorTable.insertInto("kafkaOutputTable") // 3、輸出到es tableEnv .connect(new Elasticsearch() .version("7") .host("cdh1", 9200, "http") .index("sensor") .documentType("temp") ) .inUpsertMode() // 指定為Upsert模式(es支持更新模式) .withFormat(new Json()) // es只支持json,flink下的json格式 .withSchema( new Schema() .field("id", DataTypes.STRING()) .field("count", DataTypes.BIGINT()) ) .createTemporaryTable("esOutputTable") sensorTable.insertInto("esOutputTable") // 輸出到 Mysql val sinkDDL: String = """ |create table jdbcOutputTable ( | id varchar(20) not null, | cnt bigint not null |) with ( | 'connector.type' = 'jdbc', | 'connector.url' = 'jdbc:mysql://localhost:3306/test', | 'connector.table' = 'sensor_count', | 'connector.driver' = 'com.mysql.jdbc.Driver', | 'connector.username' = 'root', | 'connector.password' = '123456' |""".stripMargin tableEnv.sqlUpdate(sinkDDL) sensorTable.insertInto("jdbcOutputTable") // 啟動執行器,執行任務 env.execute("OutputTableDemo")