Spark使用jdbc時的並行度


Spark SQL支持數據源使用JDBC從其他數據庫讀取數據。 與使用JdbcRDD相比,應優先使用此功能。 這是因為結果以DataFrame的形式返回,並且可以輕松地在Spark SQL中進行處理或與其他數據源合並。 JDBC數據源也更易於從Java或Python使用,因為它不需要用戶提供ClassTag。 (請注意,這與Spark SQL JDBC服務器不同,后者允許其他應用程序使用Spark SQL運行查詢)。

首先,您需要在spark類路徑上包含特定數據庫的JDBC驅動程序。

例如,要從Spark Shell連接到postgres,您可以運行以下命令:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
  • Spark讀取關系型數據庫,官方有API接口,如下:
    ①、SparkSession.read.jdbc(url, table, properties)
    ②、SparkSession.read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, connectionProperties)
    ③、SparkSession.read.jdbc(url, table, predicates, connectionProperties)
  1. 單partition方式:使用如下函數
def jdbc(url: String, table: String, properties: Properties): DataFrame

例子:

val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"

// 設置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")

// 取得該表數據
val jdbcDF = spark.read.jdbc(url,tableName,prop)

// 一些操作
jdbcDF.write.mode..

查看並發度

jdbcDF.rdd.partitions.size # 結果返回 1

該操作的並發度為1,你所有的數據都會在一個partition中進行操作,意味着無論你給的資源有多少,只有一個task會執行任務,執行效率可想而之,並且在稍微大點的表中進行操作分分鍾就會OOM

更直觀的說法是,達到千萬級別的表就不要使用該操作,count操作就要等一萬年,親測4個小時 !

  1. 根據Long類型字段分區
    調用函數為
 def jdbc(
  url: String,
  table: String,
  columnName: String,    # 根據該字段分區,需要為整形,比如id等
  lowerBound: Long,      # 分區的下界
  upperBound: Long,      # 分區的上界
  numPartitions: Int,    # 分區的個數
  connectionProperties: Properties): DataFrame

例子:

val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"

val columnName = "colName"
val lowerBound = 1,
val upperBound = 10000000,
val numPartitions = 10,

// 設置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")

// 取得該表數據
val jdbcDF = spark.read.jdbc(url,tableName,columnName,lowerBound,upperBound,numPartitions,prop)

// 一些操作
....

查看並發度

jdbcDF.rdd.partitions.size # 結果返回 10
該操作將字段 colName 中1-10000000條數據分到10個partition中,使用很方便,缺點也很明顯,只能使用整形數據字段作為分區關鍵字。
  1. 根據任意類型字段分區
    調用函數為
jdbc(
  url: String,
  table: String,
  predicates: Array[String],
  connectionProperties: Properties): DataFrame

例子:

val url = "jdbc:mysql://localhost:3306/db"
val tableName = "tablename"

// 設置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","mysql")
prop.setProperty("password","123456")


val predicates =
  Array(
    "2018-10-01" -> "2018-11-01",
    "2018-11-02" -> "2018-12-01",
    "2018-12-02" -> "2019-01-01",
    "2019-02-02" -> "2019-03-01",
    "2019-03-02" -> "2019-04-01",
    "2019-04-02" -> "2019-05-01",
    "2019-05-02" -> "2019-06-01",
    "2019-06-02" -> "2019-07-01",
    "2019-07-02" -> "2019-08-01",
    "2019-08-02" -> "2019-09-01",
    "2019-09-02" -> "2019-10-01",
    "2019-10-02" -> "2019-11-01"
  ).map {
    case (start, end) =>
      s"cast(txntime as date) >= date '$start' " + s"AND cast(txntime as date) <= date '$end'"
  }

// 取得該表數據
val jdbcDF = spark.read.jdbc(url, tableName, predicates, prop)
// 寫入到hive表
jdbcDF.write.partitionBy().mode("overwrite").format("orc")
  .saveAsTable("db.tableName")

一千萬級別數據實測2.4min左右導入完成。

  1. limit分頁分區

    依舊采用上述函數,但是partitions做了修改,例子:

val url = "jdbc:mysql://localhost:3306/db"
val tableName = "tablename"

// 設置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","mysql")
prop.setProperty("password","123456")

def getPartition(count:Int) = {
  val step = count / 10
  Range(0, count, step).map(x =>{
    (x, step)
  }).toArray
}
val partitions = getPartition(10000000)
  .map {
    case (start,end) => s"1=1 limit ${start},${end}"
  }

// 取得該表數據
val jdbcDF = spark.read.jdbc(url, tableName, partitions, prop)
// 寫入到hive表
jdbcDF.write.partitionBy().mode("overwrite").format("orc")
  .saveAsTable("db.tableName")

實際測試效果和上面的差不多,區別是這里不需要字段有特殊的要求,對行數做處理就行啦。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM