來源於:https://my.oschina.net/yangboxu/blog/3064184
剛接觸flink沒多久,做的一個flink流處理任務,狀況百出,下面聊一聊關於數據庫操作出的狀況。
需求:需要從數據庫取一些判斷條件,流數據根據判斷條件做一些變換(map),所以決定直接在map里操作數據庫
1.最初版(調試前):第一反應,操作數據庫,上連接池,所以在main里面直接建了一個連接池(druid),然后再map函數里用。結果直接無法運行,原因,無法序列化。看druid源碼,druid的Connection實例是沒有實現序列化接口的。。。
2.本着先實現再優化的原則:直接在map函數里建立jdbc連接,操作數據庫(但是總感覺一定會有問題,頻繁連接,斷開,本來也影響性能)。調試OK,測試環境跑了一段時間,被告知數據庫的連接數爆掉了。。。(可能跟代碼不嚴謹有關系,漏了一個地方的連接關閉操作)無論如何,准備優化。
3.各種查資料,得出結論,用RichMapFunction來實現,在open()方法中建立連接,在close()方法中關閉連接,map方法中應用。調試OK...高高興興回家,突然晚上覺得會有問題,連接長時間不用,會不會被mysql服務器主動斷掉?第二天早上到公司(一個晚上沒有數據處理,但是任務一直啟動着),果然,數據庫連接丟失,所有數據處理都出問題了。
4.既然猜到是數據庫連接被服務器端主動關閉的問題,那么還是上連接池吧,druid走起。仍然是RichMapFunction,open方法中建立druid的datasource,close方法中關閉druidDatasource。仍然擱一個晚上,第二天早上,新數據正常處理,happy....代碼片段如下:
public abstract class MapFuctionWithDataBase<I, O> extends RichMapFunction<I, O> { private static final long serialVersionUID = 20000L; private String driver; private String url; private String username; private String password; private DruidDataSource dataSource; public MapFuctionWithDataBase(String driver, String url, String username, String password) { this.driver = driver; this.url = url; this.username = username; this.password = password; } @Override public void open(Configuration parameters) throws Exception { dataSource = new DruidDataSource(); dataSource.setDriverClassName(driver); dataSource.setUrl(url); dataSource.setUsername(username); dataSource.setPassword(password); dataSource.setValidationQuery("select 1"); } @Override public void close() throws Exception { dataSource.close(); } public DataSource getDataSource(){ return dataSource; } }
PS:map方法沒有做實現,實際業務中根據業務需求去實現,map函數中調用getDataSource方法獲取dataSource,獲取Connection(其實例實際上是DruidPooledConnection,可以放心的執行close操作)