轉載:https://my.oschina.net/u/2828172/blog/4387485
先說結論:
.setBatchSize(1) //將寫入MySQL的buffer大小為1。
Flink 1.10 使用 flink-jdbc 連接器的方式與 MySQL 交互,讀數據和寫數據都能完成,但是在寫數據時,發現 Flink 程序執行完畢之后,才能在 MySQL 中查詢到插入的數據。即,雖然是流計算,但卻不能實時的輸出計算結果?
相關代碼片段:
JDBCAppendTableSink.builder()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost/flink")
.setUsername("root")
.setPassword("123456")
.setParameterTypes(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
.setQuery("insert into batch_size values(?,?)")
.build()
如何解決?
Flink 1.10 這個問題是知道一秒鍾,不知磨洋工的 Case,在初學時候非常容易遇上,那么真的是 Flink 不能實時寫入 MySQL 嗎?當然不是,上面代碼基礎之上簡單的加上一行,就解決問題了:
...
.setBatchSize(1) //將寫入MySQL的buffer大小為1。
..
原因剖析
那么問題雖然解決了,根本原因是個啥呢?也許你看到這里會說,這問題很明顯,就是 Flink 設計 JDBC Sink 的時候出於性能因素考慮,對寫入 buffer 做了默認值設置。
沒錯,這一點你說的很對,在 Flink 1.10 中 JDBC OutputFormat 的基類 AbstractJDBCOutputFormat 里面和這相關的變量 DEFAULT_FLUSH_MAX_SIZE 默認值是 5000,所以在你學習測試時候由於測試數據少(少於 5000),數據一直在 buffer 中,直到數據源數據結束,作業也結束了,才將計算結果刷入 MySQL,所以沒有實時的(每條)寫入 MySQL。如下:
但這里還有個因素需要注意,那就是時間因素,上面 DEFAULT_FLUSH_INTERVAL_MILLS 默認值是 0,這個相當於沒有時間限制,一直等到 buffer 滿了或者作業結束才能觸發寫出動作。
也就是有些初學者,發現問題,即使故意 debug 時候打上斷點,不讓作業結束,但是等到花兒都謝了,數據也沒有寫入到 MySQL。
在 Flink 1.10 中 AbstractJDBCOutputFormat 有兩個實現類:
分別對應了如下兩類 Sink:
所以在 Flink 1.10 中不論是 AppendTableSink 和 UpsertTableSink 都會有同樣的問題。不過 UpsertTableSink 時用戶可以設置時間,而 AppendTableSink 是連時間設置的入口都木有。
那么,是 Flink 的鍋?
就這個問題而言,我個人認為不是用戶的問題,是 Flink 1.10 代碼設計有進一步改進的空間。在 Flink 1.11 中社區的確重構了,對 JDBCOutputFormat 打了 @Deprecated。感興趣可以查閱 FLINK-17537 了解變化過程。但是在這個改進中,並沒有對 DEFAULT_FLUSH_MAX_SIZE 默認值和 DEFAULT_FLUSH_INTERVAL_MILLS 默認值做變化,社區也在積極的討論改進方案,想參與社區貢獻或者了解最終討論結果的可以查閱 FLINK-16497。
舉一反三
當然在你學習過程中使用任何 Sink 的時候,只要沒有實時寫入,都可以找找是否有寫出 buffer 和寫出時間的限制設置。在這一點上,羅鵬程也提到了 Elasticsearch 也有類似問題,需要調用 setBulkFlushMaxActions 進行設置。