来源于:https://my.oschina.net/yangboxu/blog/3064184
刚接触flink没多久,做的一个flink流处理任务,状况百出,下面聊一聊关于数据库操作出的状况。
需求:需要从数据库取一些判断条件,流数据根据判断条件做一些变换(map),所以决定直接在map里操作数据库
1.最初版(调试前):第一反应,操作数据库,上连接池,所以在main里面直接建了一个连接池(druid),然后再map函数里用。结果直接无法运行,原因,无法序列化。看druid源码,druid的Connection实例是没有实现序列化接口的。。。
2.本着先实现再优化的原则:直接在map函数里建立jdbc连接,操作数据库(但是总感觉一定会有问题,频繁连接,断开,本来也影响性能)。调试OK,测试环境跑了一段时间,被告知数据库的连接数爆掉了。。。(可能跟代码不严谨有关系,漏了一个地方的连接关闭操作)无论如何,准备优化。
3.各种查资料,得出结论,用RichMapFunction来实现,在open()方法中建立连接,在close()方法中关闭连接,map方法中应用。调试OK...高高兴兴回家,突然晚上觉得会有问题,连接长时间不用,会不会被mysql服务器主动断掉?第二天早上到公司(一个晚上没有数据处理,但是任务一直启动着),果然,数据库连接丢失,所有数据处理都出问题了。
4.既然猜到是数据库连接被服务器端主动关闭的问题,那么还是上连接池吧,druid走起。仍然是RichMapFunction,open方法中建立druid的datasource,close方法中关闭druidDatasource。仍然搁一个晚上,第二天早上,新数据正常处理,happy....代码片段如下:
public abstract class MapFuctionWithDataBase<I, O> extends RichMapFunction<I, O> { private static final long serialVersionUID = 20000L; private String driver; private String url; private String username; private String password; private DruidDataSource dataSource; public MapFuctionWithDataBase(String driver, String url, String username, String password) { this.driver = driver; this.url = url; this.username = username; this.password = password; } @Override public void open(Configuration parameters) throws Exception { dataSource = new DruidDataSource(); dataSource.setDriverClassName(driver); dataSource.setUrl(url); dataSource.setUsername(username); dataSource.setPassword(password); dataSource.setValidationQuery("select 1"); } @Override public void close() throws Exception { dataSource.close(); } public DataSource getDataSource(){ return dataSource; } }
PS:map方法没有做实现,实际业务中根据业务需求去实现,map函数中调用getDataSource方法获取dataSource,获取Connection(其实例实际上是DruidPooledConnection,可以放心的执行close操作)