測試通過flink實時寫入mysql的時候發現mysql里遲遲沒有數據


轉載:https://my.oschina.net/u/2828172/blog/4387485

先說結論:

.setBatchSize(1) //將寫入MySQL的buffer大小為1。

 

 

Flink 1.10 使用 flink-jdbc 連接器的方式與 MySQL 交互,讀數據和寫數據都能完成,但是在寫數據時,發現 Flink 程序執行完畢之后,才能在 MySQL 中查詢到插入的數據。即,雖然是流計算,但卻不能實時的輸出計算結果?

 


相關代碼片段:

JDBCAppendTableSink.builder() .setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://localhost/flink") .setUsername("root") .setPassword("123456") .setParameterTypes( BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) .setQuery("insert into batch_size values(?,?)") .build()

 

如何解決?

 

Flink 1.10 這個問題是知道一秒鍾,不知磨洋工的 Case,在初學時候非常容易遇上,那么真的是 Flink 不能實時寫入 MySQL 嗎?當然不是,上面代碼基礎之上簡單的加上一行,就解決問題了:

....setBatchSize(1) //將寫入MySQL的buffer大小為1。..

 

原因剖析

 

那么問題雖然解決了,根本原因是個啥呢?也許你看到這里會說,這問題很明顯,就是 Flink 設計 JDBC Sink 的時候出於性能因素考慮,對寫入 buffer 做了默認值設置。
沒錯,這一點你說的很對,在 Flink 1.10 中 JDBC OutputFormat 的基類  AbstractJDBCOutputFormat 里面和這相關的變量 DEFAULT_FLUSH_MAX_SIZE 默認值是 5000,所以在你學習測試時候由於測試數據少(少於 5000),數據一直在 buffer 中,直到數據源數據結束,作業也結束了,才將計算結果刷入 MySQL,所以沒有實時的(每條)寫入 MySQL。如下:

但這里還有個因素需要注意,那就是時間因素,上面 DEFAULT_FLUSH_INTERVAL_MILLS 默認值是 0,這個相當於沒有時間限制,一直等到 buffer 滿了或者作業結束才能觸發寫出動作。
也就是有些初學者,發現問題,即使故意 debug 時候打上斷點,不讓作業結束,但是等到花兒都謝了,數據也沒有寫入到 MySQL。
在 Flink 1.10 中 AbstractJDBCOutputFormat 有兩個實現類:

分別對應了如下兩類 Sink:

所以在 Flink 1.10 中不論是 AppendTableSink 和 UpsertTableSink 都會有同樣的問題。不過 UpsertTableSink 時用戶可以設置時間,而 AppendTableSink 是連時間設置的入口都木有。
那么,是 Flink 的鍋?
就這個問題而言,我個人認為不是用戶的問題,是 Flink 1.10 代碼設計有進一步改進的空間。在 Flink 1.11 中社區的確重構了,對 JDBCOutputFormat 打了  @Deprecated。感興趣可以查閱 FLINK-17537 了解變化過程。但是在這個改進中,並沒有對 DEFAULT_FLUSH_MAX_SIZE 默認值和 DEFAULT_FLUSH_INTERVAL_MILLS 默認值做變化,社區也在積極的討論改進方案,想參與社區貢獻或者了解最終討論結果的可以查閱 FLINK-16497。

舉一反三


當然在你學習過程中使用任何 Sink 的時候,只要沒有實時寫入,都可以找找是否有寫出 buffer 和寫出時間的限制設置。在這一點上,羅鵬程也提到了 Elasticsearch 也有類似問題,需要調用 setBulkFlushMaxActions 進行設置。



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM