FlinkSql指定時間語義


FlinkSql指定時間語義

FlinkSql在建表時指定時間語義,根據建表方式和時間語義的不同進行記錄

1.從DataStream流建表+process time語義

因為是process time所以不需要指定watermark的延遲生成時間,故可以直接在創建table對象時最后一列增加一個字段即可

  • 舉例
  public static void main(String[] args) throws Exception {
        // Single-parallelism streaming environment reading the local sensor file.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);
        DataStreamSource<String> rawLines = streamEnv.readTextFile("D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt");
        // Parse each CSV line into a SensorReading (id, temperature, timestamp).
        DataStream<SensorReading> readings = rawLines.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Double.valueOf(fields[2]), Long.valueOf(fields[1]));
        });
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
        // "pt.proctime" appends a processing-time attribute column; the name is arbitrary.
        Table sensorTable = tableEnvironment.fromDataStream(readings, "id,temperature as temp,timestamp,pt.proctime");
        sensorTable.printSchema();
        tableEnvironment.toAppendStream(sensorTable, Row.class).print("api");
        streamEnv.execute();
    }
  • 此時打印表的Schema可以看到表最后增加了一列
root
 |-- id: STRING
 |-- temp: DOUBLE
 |-- timestamp: BIGINT
 |-- pt: TIMESTAMP(3) *PROCTIME*

2.使用connect+format+schema建表+process time語義

  • 舉例
  public static void main(String[] args) throws Exception {
        // A StreamExecutionEnvironment is mandatory for streaming jobs.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Uses the default planner; which planner is default varies by Flink version.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Connect to the file data source and register it as a table.
        String filePath = "D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt";
        tableEnv.connect(new FileSystem().path(filePath))
                // withFormat tells Flink how to parse each record, e.g. comma-separated CSV.
                .withFormat(new Csv())
                // withSchema declares the table structure; fields must match the parsed column order.
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("time", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE())
                        // Append the processing-time column at the end.
                        .field("pt", DataTypes.TIMESTAMP(3)
                                // Incomplete in older versions: this method does not exist there.
                                //.processTime()
                        )
                )
                .createTemporaryTable("inputTable");
        Table inputTable = tableEnv.from("inputTable");
        inputTable.printSchema();
        // FIX: the converted stream was silently discarded; print it so the job,
        // like the other examples, actually emits the rows on execute().
        tableEnv.toAppendStream(inputTable, Row.class).print();
        env.execute();
    }

3.使用DDL方式建表+process time語義

// Build the same processing-time DDL with a StringBuilder; "pt" is a computed
// column backed by PROCTIME().
StringBuilder ddlBuilder = new StringBuilder()
        .append("create table dataTable (")
        .append(" id varchar(20) not null, ")
        .append(" ts bigint, ")
        .append(" temperature double, ")
        .append(" pt AS PROCTIME() ")
        .append(") with (")
        .append(" 'connector.type' = 'filesystem', ")
        .append(" 'connector.path' = '/sensor.txt', ")
        .append(" 'format.type' = 'csv')");
String sinkDDL = ddlBuilder.toString();
tableEnv.sqlUpdate(sinkDDL);

4.從DataStream流建表+event time語義

事件時間語義 和watermark在生成table之前就定義了,建表時使用.rowtime

  • 舉例
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //指定時間語義
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> dataStreamSource = env.readTextFile("D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt");
        DataStream<SensorReading> mapDataStream = dataStreamSource.map(el -> {
            String[] split = el.split(",");
            return new SensorReading(split[0], Double.valueOf(split[2]), Long.valueOf(split[1]));
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(3)) {
            @Override
            public long extractTimestamp(SensorReading element) {
                return element.getTimestamp() * 1000L;
            }
        });
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //event time  使用 rt.rowtime 聲明 這個時候rt已經不是原來的timestamp的LONG類型的時間戳了 而是TIMESTAMP(3)
       `Table tableApi = tableEnv.fromDataStream(mapDataStream, "id,temperature as tp,rt.rowtime,timestamp as ts");`
        tableApi.printSchema();
        tableEnv.toAppendStream(tableApi, Row.class).print("api");
        env.execute();
    }
  • schema 如下:
root
 |-- id: STRING
 |-- tp: DOUBLE
 |-- rt: TIMESTAMP(3) *ROWTIME*
 |-- timestamp: BIGINT
  • 數據如下:
api> sensor_1,37.9,2021-01-31 11:35:07.0,1612092907
api> sensor_2,50.1,2021-01-31 11:34:15.0,1612092855
api> sensor_3,23.7,2021-01-31 11:34:58.0,1612092898
api> sensor_4,15.3,2021-01-31 11:35:17.0,1612092917

5.使用connect+format+schema建表+eventtime語義

  • 舉例
  public static void main(String[] args) throws Exception {
        // A StreamExecutionEnvironment is mandatory for streaming jobs.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Uses the default planner; which planner is default varies by Flink version.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Connect to the file data source and register it as a table.
        String filePath = "D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt";
        tableEnv.connect(new FileSystem().path(filePath))
                // withFormat tells Flink how to parse each record, e.g. comma-separated CSV.
                .withFormat(new Csv())
                // withSchema declares the table structure; fields must match the parsed column order.
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("ts", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE())
                        // Add an event-time column; the watermark extraction and
                        // generation strategy must be declared as well.
                        .rowtime(new Rowtime()
                                .timestampsFromField("ts") // extract the timestamp from this field
                                .watermarksPeriodicBounded(1000) // watermark delayed by 1 second
                        )
                )
                .createTemporaryTable("inputTable");
        Table inputTable = tableEnv.from("inputTable");
        inputTable.printSchema();
        // FIX: the converted stream was silently discarded; print it so the job,
        // like the other examples, actually emits the rows on execute().
        tableEnv.toAppendStream(inputTable, Row.class).print();
        env.execute();
    }

6.使用DDL方式建表+event time語義

  • 舉例
// Build the same event-time DDL with a StringBuilder: "rt" is computed from the
// BIGINT epoch seconds, and the WATERMARK clause allows 1 second of lateness.
StringBuilder ddlBuilder = new StringBuilder()
        .append("create table dataTable (")
        .append(" id varchar(20) not null, ")
        .append(" ts bigint, ")
        .append(" temperature double, ")
        .append(" rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), ")
        .append(" watermark for rt as rt - interval '1' second")
        .append(") with (")
        .append(" 'connector.type' = 'filesystem', ")
        .append(" 'connector.path' = '/sensor.txt', ")
        .append(" 'format.type' = 'csv')");
String sinkDDL = ddlBuilder.toString();
tableEnv.sqlUpdate(sinkDDL);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM