1. Use c3p0
The main reason is that c3p0's data source implements Serializable, so it can be shipped directly to the Workers.
ComboPooledDataSource
This is the class that produces the database connections. Once it has been shipped to a Worker, it can be used there directly.
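Here is why shipping a data source works while shipping a connection would not. Spark serializes the function passed to foreachPartition with Java serialization and sends it to the executors, so everything that function captures must be Serializable. The minimal sketch below uses only the JDK rather than c3p0. A java.util.Properties holding connection settings survives a serialization round trip, while a java.net.Socket is rejected with NotSerializableException. The Socket stands in for an open JDBC connection. Shipping, roundTrip and canShip are hypothetical helpers written for this illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}

object Shipping {
  // Serialize obj to bytes and read it back, roughly what happens to a task closure
  def roundTrip[T](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj)
    finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T]
    finally in.close()
  }

  // Rough check of whether obj could be captured by a closure that Spark ships
  def canShip(obj: AnyRef): Boolean =
    try { roundTrip(obj); true }
    catch { case _: NotSerializableException => false }
}
```

A data source such as ComboPooledDataSource sits on the Properties side of this line. What travels is essentially its configuration, and the connections are opened on the Worker side rather than shipped.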
2. Business code
Getting the DataSource
import com.mchange.v2.c3p0.ComboPooledDataSource

def getC3p0DataSource(filename: String, config: String): ComboPooledDataSource = {
  val dataSource = new ComboPooledDataSource(true)
  // FileUtils.readJsonFile2Prop is the author's own helper (not shown), presumably returning java.util.Properties
  val conf = FileUtils.readJsonFile2Prop(filename, config)
  dataSource.setJdbcUrl(conf.getProperty("url"))
  dataSource.setDriverClass(conf.getProperty("driverClassName"))
  dataSource.setUser(conf.getProperty("username"))
  dataSource.setPassword(conf.getProperty("password"))
  // Pool sizing: the values arrive as strings and are converted to Int
  dataSource.setMaxPoolSize(conf.getProperty("maxPoolSize").toInt)
  dataSource.setMinPoolSize(conf.getProperty("minPoolSize").toInt)
  dataSource.setAcquireIncrement(conf.getProperty("acquireIncrement").toInt)
  dataSource.setInitialPoolSize(conf.getProperty("initialPoolSize").toInt)
  dataSource.setMaxIdleTime(conf.getProperty("maxIdleTime").toInt) // seconds
  dataSource
}
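Note that initialPoolSize should not be set too large. The pool is built on each Worker that uses the data source, not once on the driver. Every one of those pools opens that many connections up front against the same database.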
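One way to keep these sizes in check is to validate them before the job starts. The sketch below illustrates this under stated assumptions and is not part of the original code. PoolSettings and PoolBudget are hypothetical names. The sketch assumes one pool per executor JVM, so if the data source ends up being instantiated more often, the real numbers are higher. The database connection limit is passed in by hand. The ordering check follows the c3p0 guidance that initialPoolSize should lie between minPoolSize and maxPoolSize.

```scala
import java.util.Properties

// Hypothetical typed view of the pool sizes read from the configuration
final case class PoolSettings(minPoolSize: Int, initialPoolSize: Int, maxPoolSize: Int)

object PoolSettings {
  def fromProperties(conf: Properties): PoolSettings = {
    def int(key: String): Int =
      Option(conf.getProperty(key))
        .map(_.trim.toInt)
        .getOrElse(throw new IllegalArgumentException(s"missing property: $key"))
    PoolSettings(int("minPoolSize"), int("initialPoolSize"), int("maxPoolSize"))
  }
}

object PoolBudget {
  // ASSUMPTION: one pool per executor JVM.
  // Returns (connections opened at startup, connections if every pool grows to maxPoolSize)
  def connections(s: PoolSettings, executors: Int): (Int, Int) =
    (s.initialPoolSize * executors, s.maxPoolSize * executors)

  // Lists configuration problems; an empty list means nothing obvious is wrong
  def problems(s: PoolSettings, executors: Int, dbMaxConnections: Int): List[String] = {
    val (atStartup, worstCase) = connections(s, executors)
    List(
      if (s.minPoolSize > s.initialPoolSize) Some("minPoolSize > initialPoolSize") else None,
      if (s.initialPoolSize > s.maxPoolSize) Some("initialPoolSize > maxPoolSize") else None,
      if (worstCase > dbMaxConnections)
        Some(s"$worstCase connections possible ($atStartup at startup), over the limit of $dbMaxConnections")
      else None
    ).flatten
  }
}
```

For example, with 10 executors and initialPoolSize = 5, about 50 connections are opened as soon as every Worker touches the pool, before any rows are written.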
.foreachPartition(it => {
  import java.util.{Calendar, UUID}

  // One connection per partition, borrowed from the c3p0 pool on this Worker
  val conn = comboPooledDataSource.getConnection
  // Disable auto-commit before doing any work so the batch is committed once, at the end
  conn.setAutoCommit(false)
  val statement = conn.prepareStatement(
    "insert into tb_eventclass_min values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
  // Reused for every record; uses the Worker JVM's default time zone
  val calendar = Calendar.getInstance()
  try {
    it.foreach(x => {
      statement.setString(1, UUID.randomUUID().toString)
      statement.setLong(2, x._1._1.toString.toLong)
      statement.setLong(3, x._1._2.toString.toLong)
      statement.setString(4, x._1._3.toString)
      statement.setString(5, x._1._4.toString)
      statement.setString(6, x._1._5.toString)
      statement.setString(7, x._1._6.toString)
      statement.setString(8, x._1._7.toString)
      statement.setString(9, x._1._8.toString)
      statement.setLong(10, x._2)
      statement.setShort(11, x._1._10.toString.toShort)
      // x._1._9 is an epoch-millisecond timestamp; column 12 is the minute it falls in
      statement.setLong(12, x._1._9 / 60000L)
      calendar.setTimeInMillis(x._1._9)
      statement.setInt(13, calendar.get(Calendar.YEAR))
      statement.setInt(14, calendar.get(Calendar.MONTH) + 1) // Calendar.MONTH is 0-based
      statement.setInt(15, calendar.get(Calendar.DAY_OF_MONTH))
      statement.setInt(16, calendar.get(Calendar.HOUR_OF_DAY))
      statement.setInt(17, calendar.get(Calendar.MINUTE))
      statement.addBatch()
    })
    // Send the whole batch, then commit it as one transaction
    statement.executeBatch()
    conn.commit()
  } catch {
    case e: Exception =>
      // Undo the partially applied batch instead of leaving the transaction open
      conn.rollback()
      e.printStackTrace()
  } finally {
    statement.close()
    conn.close() // returns the connection to the pool rather than closing it
  }
})
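Calendar.getInstance() uses the JVM's default time zone. As a result, the year, month, day, hour and minute written to columns 13 to 17 depend on how each Worker is configured. Below is a sketch of the same derivation using java.time with an explicit ZoneId. MinuteColumns is a hypothetical name. The original does not say which zone the table is meant to use, so that choice is an assumption. Column 12 keeps the original's integer division of the millisecond timestamp by 60000.

```scala
import java.time.{Instant, ZoneId}

// Hypothetical holder for columns 12-17 of tb_eventclass_min
final case class MinuteColumns(minuteBucket: Long, year: Int, month: Int, day: Int, hour: Int, minute: Int)

object MinuteColumns {
  // The zone is explicit, unlike Calendar.getInstance(), which uses the JVM default
  def of(epochMillis: Long, zone: ZoneId): MinuteColumns = {
    val t = Instant.ofEpochMilli(epochMillis).atZone(zone)
    MinuteColumns(
      minuteBucket = epochMillis / 60000L, // same integer division as the original code
      year = t.getYear,
      month = t.getMonthValue,             // already 1-12, so no "+1" is needed
      day = t.getDayOfMonth,
      hour = t.getHour,
      minute = t.getMinute
    )
  }
}
```

Inside the partition loop, MinuteColumns.of(x._1._9, zone) would replace the Calendar calls, with zone created once per partition, for example ZoneId.of("UTC").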
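There are four points to note here:
1. Use the foreachPartition operator to reduce the number of database connections
Each partition calls getConnection once, so the number of connections obtained from dataSource.getConnection matches the number of partitions and stays small. Calling getConnection inside foreach would instead open one connection per record.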
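2. Use batch inserts to improve throughput
With MySQL, batch rewriting must be enabled for this to pay off. Append rewriteBatchedStatements=true to the JDBC URL. Without it, Connector/J still sends the batched inserts to the server one statement at a time:
val dbUrl = "jdbc:mysql://localhost:3306/User?rewriteBatchedStatements=true"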
3. Turn off auto-commit to avoid deadlocks and to commit each partition's batch once, as a single transaction
conn.setAutoCommit(false)
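4. Close the statement and the connection when done
Otherwise unclosed statements keep piling up and consuming memory. For a pooled connection, close() does not close the physical connection; it returns the connection to the pool.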
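The loop above adds every row of a partition to one JDBC batch before executeBatch() runs, so a very large partition is buffered in full on the Worker. A common variation, not used in the original, is to execute the batch every N rows. The sketch below isolates only the chunking logic. ChunkedBatch.writeInChunks is a hypothetical helper, the batch size is an assumed tuning value, and the JDBC calls are left to the flush callback.

```scala
object ChunkedBatch {
  // Splits a partition's rows into batches of at most batchSize and hands each to flush,
  // so no more than batchSize rows are buffered at a time. Returns the number of batches.
  def writeInChunks[A](rows: Iterator[A], batchSize: Int)(flush: Seq[A] => Unit): Int = {
    require(batchSize > 0, "batchSize must be positive")
    var batches = 0
    rows.grouped(batchSize).foreach { chunk =>
      flush(chunk)
      batches += 1
    }
    batches
  }
}
```

With JDBC, the callback would call the setXxx methods and addBatch() for each row in the chunk, then executeBatch(). Calling conn.commit() once after the last chunk preserves the one-transaction-per-partition behaviour, at the cost of a longer-running transaction.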