背景
數據列不固定,每次全量覆蓋數據到Mysql,涉及到數據表結構的變更,需要調整自動創建數據表結構
方案1:DataFrameWriter.jdbc
使用spark原生提供的DataFrameWriter.jdbc,參考代碼如下:
/**
* 數據覆蓋寫入指定mysql表
* 批量讀寫參數設置參考:https://bbs.huaweicloud.com/blogs/210624
*
* @param dataFrame 數據集
* @param tableName mysql數據表名
* @param parallelism 插入並行度
*/
def override2Mysql(dataFrame: DataFrame, tableName: String, parallelism: Int): Unit = {
val property = new Properties()
property.put("user", jdbcUsername)
property.put("password", jdbcPassword)
property.put("driver", mysqlJdbcDriver)
dataFrame.repartition(parallelism).write.option("truncate", value = false).mode(SaveMode.Overwrite).jdbc(jdbcUrl, tableName, property)
}
實驗如下,100萬數據,並行度設置為10,插入需要26分鍾。
疑問?為什么這么慢?難道不是批量插入嗎?
查看DataFrameWriter.jdbc,在connectionProperties中可以設置batchsize,默認已經是1000,實驗調整batchsize,沒什么變化,沒搞明白why?
同時,發現DataFrameWriter.jdbc自動刪除並創建表存在數據類型映射的問題:spark中數據類型分類沒有那么細,String類型映射到Mysql中統一轉化為text類型。而text類型創建索引,必須設置前綴前綴長度,不利於索引創建。可通過在option中設置truncate=true解決,手動創建數據吧,每次寫入時截斷數據表,然后再插入,但不能解決數據插入效率問題。
方案2:自定義批量插入
數據插入前對數據進行分區,每個分區自定義一個數據庫連接,批量插入數據,參考代碼如下:
private def saveXXX(xxxData: DataFrame, nacosConfig: NacosConfig): Unit = {
val mysqlUrl = nacosConfig.getValue("user_label_mysql_url")
val mysqlUser = nacosConfig.getValue("user_label_mysql_username")
val mysqlPsw = nacosConfig.getValue("user_label_mysql_psw")
val mysqlParallelism = nacosConfig.getValue("mysql_parallelism").toInt
val jdbcUtils = JdbcUtils.build(mysqlUrl, mysqlUser, mysqlPsw)
jdbcUtils.runSql(s"truncate table $labelTable", null)
val sql=
s"""
|insert into xxx () values()
|""".stripMargin
xxxData.repartition(mysqlParallelism).foreachPartition(rows => {
jdbcUtils.insert(sql, rows)
})
}
def insert(sql: String, rows: Iterator[Row]): Unit = {
val conn = getMysqlConn(jdbcUrl, jdbcUsername, jdbcPassword)
val pstat: PreparedStatement = conn.prepareStatement(sql)
try {
var size = 0
while (rows.hasNext) {
val row: Row = rows.next()
val len = row.length
for (i <- 0 until len) {
pstat.setObject(i + 1, row.get(i))
}
pstat.addBatch()
size += 1
if (size % maxBatchSize == 0) {
pstat.executeBatch()
lgr.info("=======批量插入數據成功,數量是[{}]=======", size)
size = 0
}
}
if (size > 0) {
pstat.executeBatch()
lgr.info("=======批量插入數據成功,數量是[{}]=======", size)
}
} finally {
pstat.close()
conn.close()
}
}
同樣100萬數據,划分為10個分區並行插入,每次批量10000條數據,插入耗時2分鍾。需要注意的並行度和batchSize不能過大,避免影響數據庫正常使用。
但是,上述方案未解決數據表結構變化的問題。可根據dataFrame所屬的schema.fields自動生成創建表語句,先執行drop table xxx if exists
,然后替換上述語句中truncate未自動生成的創建表語句即可。