Spark大數據量寫入Mysql效率問題


背景

數據列不固定,每次全量覆蓋數據到Mysql,涉及到數據表結構的變更,需要調整自動創建數據表結構

方案1:DataFrameWriter.jdbc

使用spark原生提供的DataFrameWriter.jdbc,參考代碼如下:

/**
   * 數據覆蓋寫入指定mysql表
   * 批量讀寫參數設置參考:https://bbs.huaweicloud.com/blogs/210624
   *
   * @param dataFrame   數據集
   * @param tableName   mysql數據表名
   * @param parallelism 插入並行度
   */
  def override2Mysql(dataFrame: DataFrame, tableName: String, parallelism: Int): Unit = {
    val property = new Properties()
    property.put("user", jdbcUsername)
    property.put("password", jdbcPassword)
    property.put("driver", mysqlJdbcDriver)
    dataFrame.repartition(parallelism).write.option("truncate", value = false).mode(SaveMode.Overwrite).jdbc(jdbcUrl, tableName, property)
  }

實驗如下,100萬數據,並行度設置為10,插入需要26分鍾。
疑問?為什么這么慢?難道不是批量插入嗎?
查看DataFrameWriter.jdbc,在connectionProperties中可以設置batchsize,默認已經是1000,實驗調整batchsize,沒什么變化,沒搞明白why?

同時,發現DataFrameWriter.jdbc自動刪除並創建表存在數據類型映射的問題:spark中數據類型分類沒有那么細,String類型映射到Mysql中統一轉化為text類型。而text類型創建索引,必須設置前綴前綴長度,不利於索引創建。可通過在option中設置truncate=true解決,手動創建數據吧,每次寫入時截斷數據表,然后再插入,但不能解決數據插入效率問題。

方案2:自定義批量插入

數據插入前對數據進行分區,每個分區自定義一個數據庫連接,批量插入數據,參考代碼如下:

private def saveXXX(xxxData: DataFrame, nacosConfig: NacosConfig): Unit = {
      val mysqlUrl = nacosConfig.getValue("user_label_mysql_url")
      val mysqlUser = nacosConfig.getValue("user_label_mysql_username")
      val mysqlPsw = nacosConfig.getValue("user_label_mysql_psw")
      val mysqlParallelism = nacosConfig.getValue("mysql_parallelism").toInt
      val jdbcUtils = JdbcUtils.build(mysqlUrl, mysqlUser, mysqlPsw)
      jdbcUtils.runSql(s"truncate table $labelTable", null)
      val sql=
        s"""
          |insert into xxx () values()
          |""".stripMargin
      xxxData.repartition(mysqlParallelism).foreachPartition(rows => {
        jdbcUtils.insert(sql, rows)
      })
  }
  
  def insert(sql: String, rows: Iterator[Row]): Unit = {
    val conn = getMysqlConn(jdbcUrl, jdbcUsername, jdbcPassword)
    val pstat: PreparedStatement = conn.prepareStatement(sql)
    try {
      var size = 0
      while (rows.hasNext) {
        val row: Row = rows.next()
        val len = row.length
        for (i <- 0 until len) {
          pstat.setObject(i + 1, row.get(i))
        }
        pstat.addBatch()
        size += 1
        if (size % maxBatchSize == 0) {
          pstat.executeBatch()
          lgr.info("=======批量插入數據成功,數量是[{}]=======", size)
          size = 0
        }
      }
      if (size > 0) {
        pstat.executeBatch()
        lgr.info("=======批量插入數據成功,數量是[{}]=======", size)
      }
    } finally {
      pstat.close()
      conn.close()
    }
  }

同樣100萬數據,划分為10個分區並行插入,每次批量10000條數據,插入耗時2分鍾。需要注意的並行度和batchSize不能過大,避免影響數據庫正常使用。

但是,上述方案未解決數據表結構變化的問題。可根據dataFrame所屬的schema.fields自動生成創建表語句,先執行drop table xxx if exists,然后替換上述語句中truncate未自動生成的創建表語句即可。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM