FlinkSql的三種建表方式


FLinkSql三種建表的方式

1.基於DataSteam流來創建

注:此時先創建的一個Table對象,如果使用Table APi操作的話就可以直接操作了,如果要使用Sql的方式則需要先注冊成一個view然后再操作

  • 關鍵語句
Table tableApi = tableEnv.fromDataStream(mapDataStream);
  • 舉例
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> dataStreamSource = env.readTextFile("D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt");
DataStream<SensorReading> mapDataStream = dataStreamSource.map(el -> {
String[] split = el.split(",");
return new SensorReading(split[0], Double.valueOf(split[2]), Long.valueOf(split[1]));
});
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table tableApi = tableEnv.fromDataStream(mapDataStream);
//tableApi 的方式查詢過濾只有sensor_2的溫度數據
Table resultTableApi = tableApi.select("id,temperature")
.where("id='sensor_2'");
//SQL的方式 需要先將dataStream注冊成一張表
tableEnv.createTemporaryView("sensor_table", mapDataStream);
Table resultTableSql = tableEnv.sqlQuery("select id,temperature from sensor_table where id='sensor_2'");
//打印
tableEnv.toAppendStream(resultTableApi, Row.class).print("api");
tableEnv.toAppendStream(resultTableSql, Row.class).print("sql");
env.execute();
}

2. 基於connect+withFormat+withSchema方式

注:此時是先注冊成一個view,如果使用SQL操作的話就可以直接操作了,如果要使用Api的方式則需要使用from語句獲得Table對象

  • 關鍵語句
tableEnv.connect(new FileSystem().path(filePath))
//withFormat 是用來告訴flink我應該怎么處理來源用的每一條數據 比如按csv的格式,號分割
.withFormat(new Csv())
//withSchema 是聲明創建表的表結構 要按照解析得到的數據的先后順序對應
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("time", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE()))
.createTemporaryTable("inputTable");
  • 舉例
String filePath = "D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt";
tableEnv.connect(new FileSystem().path(filePath))
//withFormat 是用來告訴flink我應該怎么處理來源用的每一條數據 比如按csv的格式,號分割
.withFormat(new Csv())
//withSchema 是聲明創建表的表結構 要按照解析得到的數據的先后順序對應
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("time", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE()))
.createTemporaryTable("inputTable");
//2.1 使用table api 完成查詢過濾和聚合操作
Table inputTable = tableEnv.from("inputTable");
Table tableApiSelect = inputTable.select("id,temp")
.filter("id='sensor_2'");
Table tableApiAgg = inputTable.groupBy("id")
.select("id,id.count as cnt,temp.avg as avgTemp");
//2.2 使用table sql 實現上面的兩個查詢
Table tableSqlSelect = tableEnv.sqlQuery("select id,temp from inputTable where id='sensor_2'");
Table tableSqlAgg = tableEnv.sqlQuery("select id,count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");

3.基於DDL建表語句,就是Create Table方式

注:此時和方式2一樣,是先注冊成一個view,如果使用SQL操作的話就可以直接操作了,如果要使用Api的方式則需要使用from語句獲得Table對象

  • 關鍵
tableEnv.sqlUpdate(sinkDDL)
  • 舉例
String sinkDDL=
"create table jdbcOutputTable (" +
" id varchar(20) not null, " +
" cnt bigint not null " +
") with (" +
" 'connector.type' = 'jdbc', " +
" 'connector.url' = 'jdbc:mysql://localhost:3306/test', " +
" 'connector.table' = 'sensor_count', " +
" 'connector.driver' = 'com.mysql.jdbc.Driver', " +
" 'connector.username' = 'root', " +
" 'connector.password' = '123456' )";
tableEnv.sqlUpdate(sinkDDL)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM