本地調試么?確認一下下面的問題
一般先調試sink表的 connector換print 打印一下:
1. 是earliest還是latest
2. auto.commit 是true還是false
3. source改成 socket輸入試試邏輯有沒問題
示例代碼參考:
CREATE TABLE t_stock_match_p_1( id VARCHAR, stkcode INT, volume INT, matchtime BIGINT, ts as TO_TIMESTAMP(FROM_UNIXTIME(matchtime/1000,'yyyy-MM-dd HH:mm:ss')), WATERMARK FOR ts AS ts - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka-0.10', 'topic' = 'stock_match_p_zyh', 'scan.startup.mode' = 'latest-offset', 'properties.group.id' = 'stock_match_p_zyh', 'properties.bootstrap.servers' = 'sdp-10-88-100-101:6668', 'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer', 'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer', 'format' = 'csv', 'csv.field-delimiter' = ',' ); CREATE TABLE t_stock_match_1 ( stkcode int, pd TIMESTAMP, volume INT ) WITH ( 'connector' = 'print' ); INSERT INTO t_stock_match_1 SELECT stkcode,TUMBLE_END(ts, INTERVAL '1' MINUTE) as pd, sum(volume) FROM t_stock_match_p_1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),stkcode;
解決:
當時kafka只有一個分區,但是並行度設置大於了分區數,這樣有的任務中沒有數據,這樣水印一直是最小值,
在網上看到這樣一個案例后,將我的任務的並行度改成和分區數一致,Flink WebUI上水印值出來了,數據也能正常寫入目的地。