spark jdbc讀取並發度優化


很多人在spark中使用默認提供的jdbc方法時,在數據庫數據較大時經常發現任務 hang 住,其實是單線程任務過重導致,這時候需要提高讀取的並發度。 
下文以 mysql 為例進行說明。

在spark中使用jdbc

在 spark-env.sh 文件中加入:

export SPARK_CLASSPATH=/path/mysql-connector-java-5.1.34.jar

任務提交時加入:

--jars /path/mysql-connector-java-5.1.34.jar

1. 單partition(無並發)

調用函數

def jdbc(url: String, table: String, properties: Properties): DataFrame

使用:

val url = "jdbc:mysql://mysqlHost:3306/database" val tableName = "table"

// 設置連接用戶&密碼
val prop = new java.util.Properties prop.setProperty("user","username") prop.setProperty("password","pwd") // 取得該表數據
val jdbcDF = sqlContext.read.jdbc(url,tableName,prop) // 一些操作
....

查看並發度

jdbcDF.rdd.partitions.size # 結果返回 1

該操作的並發度為1,你所有的數據都會在一個partition中進行操作,意味着無論你給的資源有多少,只有一個task會執行任務,執行效率可想而之,並且在稍微大點的表中進行操作分分鍾就會OOM。

更直觀的說法是,達到千萬級別的表就不要使用該操作,count操作就要等一萬年,no zuo no die ,don’t to try !

WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 56, spark047219): java.lang.OutOfMemoryError: GC overhead limit exceeded at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3380)

2. 根據Long類型字段分區

調用函數

  def jdbc(
  url: String,
  table: String,
  columnName: String,    # 根據該字段分區,需要為整形,比如id等
  lowerBound: Long,      # 分區的下界
  upperBound: Long,      # 分區的上界
  numPartitions: Int,    # 分區的個數
  connectionProperties: Properties): DataFrame

使用:

val url = "jdbc:mysql://mysqlHost:3306/database" val tableName = "table" val columnName = "colName" val lowerBound = 1, val upperBound = 10000000, val numPartitions = 10, // 設置連接用戶&密碼
val prop = new java.util.Properties prop.setProperty("user","username") prop.setProperty("password","pwd") // 取得該表數據
val jdbcDF = sqlContext.read.jdbc(url,tableName,columnName,lowerBound,upperBound,numPartitions,prop) // 一些操作
....

查看並發度

jdbcDF.rdd.partitions.size # 結果返回 10

該操作將字段 colName 中1-10000000條數據分到10個partition中,使用很方便,缺點也很明顯,只能使用整形數據字段作為分區關鍵字。

3000w數據的表 count 跨集群操作只要2s。

3. 根據任意類型字段分區

調用函數

jdbc(
  url: String,
  table: String,
  predicates: Array[String],
  connectionProperties: Properties): DataFrame

下面以使用最多的時間字段分區為例:

val url = "jdbc:mysql://mysqlHost:3306/database" val tableName = "table"

// 設置連接用戶&密碼
val prop = new java.util.Properties prop.setProperty("user","username") prop.setProperty("password","pwd") /** * 將9月16-12月15三個月的數據取出,按時間分為6個partition * 為了減少事例代碼,這里的時間都是寫死的 * modified_time 為時間字段 */ val predicates = Array( "2015-09-16" -> "2015-09-30", "2015-10-01" -> "2015-10-15", "2015-10-16" -> "2015-10-31", "2015-11-01" -> "2015-11-14", "2015-11-15" -> "2015-11-30", "2015-12-01" -> "2015-12-15" ).map { case (start, end) => s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time as date) <= date '$end'" } // 取得該表數據
val jdbcDF = sqlContext.read.jdbc(url,tableName,predicates,prop) // 一些操作
....

查看並發度

jdbcDF.rdd.partitions.size # 結果返回 6

該操作的每個分區數據都由該段時間的分區組成,這種方式適合各種場景,較為推薦。

結語
以 mysql 3000W 數據量表為例,單分區count,僵死若干分鍾報OOM。

分成5-20個分區后,count 操作只需要 2s

高並發度可以大幅度提高讀取以及處理數據的速度,但是如果設置過高(大量的partition同時讀取)也可能會將數據源數據庫弄掛。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM