Flink SQL CDC中如何定義watermark和計算列


官網示例:

-- use the existing TIMESTAMP(3) field in schema as the rowtime attribute
CREATE TABLE MyTable (
  ts_field TIMESTAMP(3),
  WATERMARK FOR ts_field AS ...
) WITH (
  ...
)

-- use system functions or UDFs or expressions to extract the expected TIMESTAMP(3) rowtime field
CREATE TABLE MyTable (
  log_ts STRING,
  ts_field AS TO_TIMESTAMP(log_ts),
  WATERMARK FOR ts_field AS ...
) WITH (
  ...
)

  

使用內置函數進行轉換

TO_TIMESTAMP(log_ts) :此處的log_ts格式為:'yyyy-MM-dd HH:mm:ss' ,如果是秒級時間戳bigint格式則需要  t as TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss')) 進行轉換為 TIMESTAMP(3) 類型

   

參考:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/

Flink 1.10.0 SQL DDL中如何定義watermark和計算列

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM