相關文章鏈接
Flink之TableAPI和SQL(2):表和外部系統的連接方式
Flink之TableAPI和SQL(3):通過TableAPI和SQL表的一些操作(包括查詢,過濾,聚集等)
Flink之TableAPI和SQL(4):表的Sink實現
具體實現如下代碼所示:
// 1、創建執行環境 // 1.1、創建flink流的執行環境(表的環境需要基於此環境,在新的blink planner中,已經批流統一,所以直接使用流環境即可) val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment ///1.2、設置並行度 env.setParallelism(2) ///1.3、創建表的環境(一般都是基於流,在flink中,批是特殊的流,用流環境也可以進行批處理) // 創建表環境時,也可以傳入配置對象,用於選擇老的planner還是blink planner,也可以選擇是基於批的方式 或者 流的方式 // (一般不用設置,直接基於env來創建表的環境即可) val oldStreamSetting: EnvironmentSettings = EnvironmentSettings.newInstance() .useOldPlanner() .inStreamingMode() .build() val blinkStreamSetting: EnvironmentSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) // 2、創建表 // 2.1、從外部系統(文件系統,比如文本的字符串數據)中讀取數據,在環境中注冊表 val filePath: String = "D:\\Project\\IDEA\\bigdata-study\\flink-demo\\src\\main\\resources\\source.txt" tableEnv .connect(new FileSystem().path(filePath)) // 連接文件系統 .withFormat(new Csv()) // 定義讀取數據之后的格式化方法,使用新的Csv(用逗號分隔字段) .withSchema(new Schema() // 定義表結構 .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ) .createTemporaryTable("inputTable") // 注冊一張表(表名為傳入的字符串,可以在sql中直接使用該表名) // 2.2、連接到kafka,並在catalog中注冊表 tableEnv .connect(new Kafka() .version("0.11") .topic("flinkTestTopic") .property("bootstrap.servers", "cdh1:9092") .property("zookeeper.connect", "cdh1:2181") ) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ) .createTemporaryTable("kafkaInputTable") // 2.3、連接到ES,並在catalog中注冊表 tableEnv .connect(new Elasticsearch() .version("7") .host("cdh1", 9200, "http") .index("myUsers") .documentType("user") ) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ) .createTemporaryTable("esInputTable") // 其他外部系統實現方式基本類似 // 可以參考官網:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connect.html // 3、打印表結構 和 表 val inputTable: Table = tableEnv.from("inputTable") inputTable.printSchema() inputTable.toAppendStream[Row].print() // 啟動執行器 env.execute("CreateTableDemo")