官網示例:
-- use the existing TIMESTAMP(3) field in schema as the rowtime attribute CREATE TABLE MyTable ( ts_field TIMESTAMP(3), WATERMARK FOR ts_field AS ... ) WITH ( ... ) -- use system functions or UDFs or expressions to extract the expected TIMESTAMP(3) rowtime field CREATE TABLE MyTable ( log_ts STRING, ts_field AS TO_TIMESTAMP(log_ts), WATERMARK FOR ts_field AS ... ) WITH ( ... )
使用內置函數進行轉換
TO_TIMESTAMP(log_ts) :此處的log_ts格式為:'yyyy-MM-dd HH:mm:ss' ,如果是秒級時間戳bigint格式則需要 t as TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss')) 進行轉換為 TIMESTAMP(3) 類型
參考:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/
Flink 1.10.0 SQL DDL中如何定義watermark和計算列