使用flink SQL對接kafka 時,日志不報錯,也沒有數據輸出,原因有哪幾個方面


本地調試么?確認一下下面的問題

一般先調試sink表的 connector換print 打印一下:

1. 是earliest還是latest

2. auto.commit 是true還是false

3. source改成 socket輸入試試邏輯有沒問題

示例代碼參考:

CREATE TABLE t_stock_match_p_1(
  id VARCHAR, 
  stkcode INT,
  volume INT,
  matchtime BIGINT,
  ts as TO_TIMESTAMP(FROM_UNIXTIME(matchtime/1000,'yyyy-MM-dd HH:mm:ss')),
  WATERMARK  FOR ts AS ts - INTERVAL '1' SECOND
 ) WITH (
  'connector' = 'kafka-0.10',
  'topic' = 'stock_match_p_zyh',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'stock_match_p_zyh',
  'properties.bootstrap.servers' = 'sdp-10-88-100-101:6668',
  'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer',
  'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
  'format' = 'csv',
  'csv.field-delimiter' = ','
);

CREATE TABLE t_stock_match_1 (
  stkcode int,
  pd TIMESTAMP,
  volume  INT 
) WITH (
 'connector' = 'print'
);

INSERT INTO t_stock_match_1 SELECT stkcode,TUMBLE_END(ts, INTERVAL '1' MINUTE) as pd, sum(volume) FROM t_stock_match_p_1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),stkcode;

 

 

 

解決:

當時kafka只有一個分區,但是並行度設置大於了分區數,這樣有的任務中沒有數據,這樣水印一直是最小值,

在網上看到這樣一個案例后,將我的任務的並行度改成和分區數一致,Flink WebUI上水印值出來了,數據也能正常寫入目的地。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM