Flink SQL: Specifying Time Semantics
Flink SQL lets you declare time semantics when a table is created. These notes are organized by table-creation method and time semantics.
1. Creating a table from a DataStream + processing-time semantics
Since processing time needs no watermark delay, it is enough to append one extra field as the last column when creating the Table object.
- Example
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    DataStreamSource<String> dataStreamSource = env.readTextFile("D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt");
    DataStream<SensorReading> mapDataStream = dataStreamSource.map(el -> {
        String[] split = el.split(",");
        return new SensorReading(split[0], Double.valueOf(split[2]), Long.valueOf(split[1]));
    });
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    //pt is the processing-time field being appended; it can be given any name
    Table tableApi = tableEnv.fromDataStream(mapDataStream, "id,temperature as temp,timestamp,pt.proctime");
    tableApi.printSchema();
    tableEnv.toAppendStream(tableApi, Row.class).print("api");
    env.execute();
}
- Printing the table's schema at this point shows one extra column appended at the end:
root
|-- id: STRING
|-- temp: DOUBLE
|-- timestamp: BIGINT
|-- pt: TIMESTAMP(3) *PROCTIME*
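- The appended pt column can drive processing-time windows like any other attribute. Below is a minimal sketch (my addition, not part of the original notes), assuming the tableApi table above and the legacy string-based expression syntax of Flink 1.10.x; it counts readings per sensor in 10-second tumbling windows:
//requires: import org.apache.flink.table.api.Tumble;
Table windowed = tableApi
        .window(Tumble.over("10.seconds").on("pt").as("tw"))
        .groupBy("id, tw")
        .select("id, id.count as cnt, tw.end as wEnd");
//group-window aggregates are append-only, so toAppendStream works here
tableEnv.toAppendStream(windowed, Row.class).print("window");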
2. Creating a table via connect + format + schema + processing-time semantics
- Example
public static void main(String[] args) throws Exception {
    //a StreamExecutionEnvironment is required for any streaming job
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //the shortcut create() uses the default planner, which differs across versions
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    //2. connect to the data source and register it as a table
    String filePath = "D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt";
    tableEnv.connect(new FileSystem().path(filePath))
            //withFormat tells Flink how to parse each record from the source, e.g. CSV split on commas
            .withFormat(new Csv())
            //withSchema declares the table's structure; fields must follow the order of the parsed columns
            .withSchema(new Schema()
                    .field("id", DataTypes.STRING())
                    .field("time", DataTypes.BIGINT())
                    .field("temp", DataTypes.DOUBLE())
                    //just append one extra column at the end
                    .field("pt", DataTypes.TIMESTAMP(3))
                    //descriptor support here is still incomplete; older versions lack this method
                    //.proctime()
            )
            .createTemporaryTable("inputTable");
    Table inputTable = tableEnv.from("inputTable");
    inputTable.printSchema();
    tableEnv.toAppendStream(inputTable, Row.class).print("connect");
    env.execute();
}
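- Once registered, the temporary table can be queried through the Table API or with plain SQL. A minimal sketch (my addition), assuming the inputTable registered above:
//filter via SQL on the registered temporary table
Table sqlResult = tableEnv.sqlQuery("select id, temp from inputTable where temp > 30");
tableEnv.toAppendStream(sqlResult, Row.class).print("sql");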
3. Creating a table via DDL + processing-time semantics
String sinkDDL =
"create table dataTable (" +
" id varchar(20) not null, " +
" ts bigint, " +
" temperature double, " +
" pt AS PROCTIME() " +
") with (" +
" 'connector.type' = 'filesystem', " +
" 'connector.path' = '/sensor.txt', " +
" 'format.type' = 'csv')";
tableEnv.sqlUpdate(sinkDDL);
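With pt declared via PROCTIME(), the table supports processing-time group windows directly in SQL. A minimal follow-up sketch (my addition), assuming the dataTable registered above:
Table result = tableEnv.sqlQuery(
        "select id, count(id) as cnt, TUMBLE_END(pt, interval '10' second) as wEnd " +
        "from dataTable " +
        "group by id, TUMBLE(pt, interval '10' second)");
tableEnv.toAppendStream(result, Row.class).print("win");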
4. Creating a table from a DataStream + event-time semantics
The event-time semantics and the watermark are defined before the Table is generated; when creating the table, declare the column with .rowtime.
- Example
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    //specify the time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStreamSource<String> dataStreamSource = env.readTextFile("D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt");
    DataStream<SensorReading> mapDataStream = dataStreamSource.map(el -> {
        String[] split = el.split(",");
        return new SensorReading(split[0], Double.valueOf(split[2]), Long.valueOf(split[1]));
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(3)) {
        @Override
        public long extractTimestamp(SensorReading element) {
            return element.getTimestamp() * 1000L;
        }
    });
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    //declare event time with rt.rowtime; at this point rt is no longer the original
    //LONG epoch timestamp but a TIMESTAMP(3) column
    Table tableApi = tableEnv.fromDataStream(mapDataStream, "id,temperature as tp,rt.rowtime,timestamp as ts");
    tableApi.printSchema();
    tableEnv.toAppendStream(tableApi, Row.class).print("api");
    env.execute();
}
The schema is as follows:
root
|-- id: STRING
|-- tp: DOUBLE
|-- rt: TIMESTAMP(3) *ROWTIME*
|-- ts: BIGINT
- The output data:
api> sensor_1,37.9,2021-01-31 11:35:07.0,1612092907
api> sensor_2,50.1,2021-01-31 11:34:15.0,1612092855
api> sensor_3,23.7,2021-01-31 11:34:58.0,1612092898
api> sensor_4,15.3,2021-01-31 11:35:17.0,1612092917
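- With rt as the rowtime attribute, event-time windows work the same way as in the processing-time example earlier. A minimal sketch (my addition), assuming the tableApi table above and the legacy string-based expression syntax:
//requires: import org.apache.flink.table.api.Tumble;
Table windowed = tableApi
        .window(Tumble.over("10.seconds").on("rt").as("tw"))
        .groupBy("id, tw")
        .select("id, tp.avg as avgTemp, tw.end as wEnd");
tableEnv.toAppendStream(windowed, Row.class).print("win");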
5. Creating a table via connect + format + schema + event-time semantics
- Example
public static void main(String[] args) throws Exception {
    //a StreamExecutionEnvironment is required for any streaming job
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    //the shortcut create() uses the default planner, which differs across versions
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    //2. connect to the data source and register it as a table
    String filePath = "D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt";
    tableEnv.connect(new FileSystem().path(filePath))
            //withFormat tells Flink how to parse each record from the source, e.g. CSV split on commas
            .withFormat(new Csv())
            //withSchema declares the table's structure; fields must follow the order of the parsed columns
            .withSchema(new Schema()
                    .field("id", DataTypes.STRING())
                    .field("ts", DataTypes.BIGINT())
                    .field("temp", DataTypes.DOUBLE())
                    //again append an event-time column, but this time declare how the
                    //timestamp is extracted and how watermarks are generated;
                    //rowtime() marks the field declared on the previous line
                    .field("rt", DataTypes.TIMESTAMP(3))
                    .rowtime(new Rowtime()
                            .timestampsFromField("ts") // extract the timestamp from the ts field
                            .watermarksPeriodicBounded(1000) // watermark delayed by 1 second
                    )
            )
            .createTemporaryTable("inputTable");
    Table inputTable = tableEnv.from("inputTable");
    inputTable.printSchema();
    tableEnv.toAppendStream(inputTable, Row.class).print("connect");
    env.execute();
}
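- The registered rt column can then drive event-time windows in SQL as well; a minimal sketch (my addition), assuming the inputTable registered above:
Table result = tableEnv.sqlQuery(
        "select id, max(temp) as maxTemp, TUMBLE_START(rt, interval '10' second) as wStart " +
        "from inputTable " +
        "group by id, TUMBLE(rt, interval '10' second)");
tableEnv.toAppendStream(result, Row.class).print("win");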
6. Creating a table via DDL + event-time semantics
- Example
String sinkDDL=
"create table dataTable (" +
" id varchar(20) not null, " +
" ts bigint, " +
" temperature double, " +
" rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), " +
" watermark for rt as rt - interval '1' second" +
") with (" +
" 'connector.type' = 'filesystem', " +
" 'connector.path' = '/sensor.txt', " +
" 'format.type' = 'csv')";
tableEnv.sqlUpdate(sinkDDL);
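To verify that rt was registered as the event-time attribute, the table's schema can be printed after executing the DDL; a minimal sketch (my addition):
Table dataTable = tableEnv.from("dataTable");
//rt should appear as TIMESTAMP(3) *ROWTIME* in the printed schema
dataTable.printSchema();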
