Parallel database reads in Spark through JDBC


The code is as follows:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    val conf = new SparkConf().setAppName("testMysqlToHiveJdbc")
                              .setMaster("local")
    val spark = SparkSession.builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()
    // Properties holding the MySQL connection parameters
    val mysqlProperties = new java.util.Properties()
    // MySQL JDBC connection URL
    val mysqlConnectionUrl = "jdbc:mysql://localhost:3306/rest"
    // Derived table used as the query; adds a synthetic partition key "par"
    val mysqlTableName = """(select t.*,
      case when id >= 0        and id < 4000000  then 1
           when id >= 4000000  and id < 8000000  then 2
           when id >= 8000000  and id < 12000000 then 3
           when id >= 12000000 and id < 16000000 then 4
           when id >= 16000000 and id < 20000000 then 5
           else 6 end par
      from usppa_twitter_data t) tt"""
    //  val mysqlTableName = "usppa_twitter_data"
    mysqlProperties.put("driver", "com.mysql.jdbc.Driver") // JDBC driver class
    mysqlProperties.put("user", "root")                    // user name
    mysqlProperties.put("password", "1234")                // password
    mysqlProperties.put("fetchsize", "10000")              // rows fetched per round trip
    mysqlProperties.put("lowerBound", "1")                 // lower bound of the partition range
    mysqlProperties.put("upperBound", "7")                 // upper bound of the partition range
    mysqlProperties.put("numPartitions", "6")              // number of partitions
    mysqlProperties.put("partitionColumn", "par")          // partition column

    // Read the data
    val re = spark.read.jdbc(mysqlConnectionUrl,
                             mysqlTableName, mysqlProperties)
    // Write into a Hive table
    re.write.mode("overwrite").saveAsTable("testwarehouse.testtt")
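Instead of packing the partitioning options into `Properties` as strings, the same read can be written with the explicit `DataFrameReader.jdbc` overload, which takes the partition column and bounds as typed arguments. A sketch only, reusing the connection details and the derived table `mysqlTableName` from the example above (it will not run without that MySQL instance):

```scala
import java.util.Properties

// Sketch: assumes the same local MySQL instance, credentials, and the
// mysqlTableName derived table defined in the example above.
val props = new Properties()
props.put("driver", "com.mysql.jdbc.Driver")
props.put("user", "root")
props.put("password", "1234")
props.put("fetchsize", "10000")

val re = spark.read.jdbc(
  "jdbc:mysql://localhost:3306/rest", // connection URL
  mysqlTableName,                     // the derived table defined above
  "par",                              // partitionColumn
  1L,                                 // lowerBound
  7L,                                 // upperBound
  6,                                  // numPartitions
  props)
```

This overload makes it obvious at the call site which four values drive the parallelism, rather than hiding them among the connection properties.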


In the code above, there are two cases to consider for lowerBound and upperBound.

1) The partition column's values can be enumerated, such as a year.

  External reference: https://www.percona.com/blog/2016/08/17/apache-spark-makes-slow-mysql-queries-10x-faster/

  As shown below, lowerBound and upperBound split the data into partitions by year; each partition is then read in parallel as its own task on the executors.

val jdbcDF = spark.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql",
    "dbtable" -> "ontime.ontime_sm",
    "fetchSize" -> "10000",
    "partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2015", "numPartitions" -> "48"
  )).load()

  After partitioning, the single SQL query is split into multiple SQL queries, one per partition:
  
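As a sketch of how those per-partition queries are derived: the function below mirrors (in simplified form) the even-stride arithmetic Spark applies internally, using the bounds from the first example (lowerBound = 1, upperBound = 7, numPartitions = 6 on column `par`). The function name is mine, not Spark's API:

```scala
// Sketch of how Spark turns (partitionColumn, lowerBound, upperBound,
// numPartitions) into one WHERE predicate per partition (simplified).
def partitionPredicates(column: String,
                        lowerBound: Long,
                        upperBound: Long,
                        numPartitions: Int): Seq[String] = {
  // Equal-width stride across the bound range
  val stride = upperBound / numPartitions - lowerBound / numPartitions
  var currentValue = lowerBound
  (0 until numPartitions).map { i =>
    val lower = if (i != 0) Some(s"$column >= $currentValue") else None
    currentValue += stride
    val upper = if (i != numPartitions - 1) Some(s"$column < $currentValue") else None
    (lower, upper) match {
      case (Some(l), Some(u)) => s"$l AND $u"
      case (None, Some(u))    => s"$u OR $column IS NULL" // first partition also collects NULLs
      case (Some(l), None)    => l                        // last partition is open-ended
      case _                  => "1=1"
    }
  }
}

partitionPredicates("par", 1L, 7L, 6).foreach(println)
// Each predicate is appended to the base query, so one scan becomes
// six concurrent queries:
//   ... WHERE par < 2 OR par IS NULL
//   ... WHERE par >= 2 AND par < 3
//   ...
//   ... WHERE par >= 6
```

With bounds 1..7 over six partitions the stride is 1, so the six predicates line up exactly with the six `par` values produced by the CASE expression.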
2) The partition column's values are not fixed, such as an auto-increment id. In this case lowerBound and upperBound sit somewhere within the id range, and the partition boundaries are only estimates. This easily causes problems: the data can end up unevenly distributed across executors, leading to OOM (the relevant source code is still to be examined). Usage is as follows:
CREATE OR REPLACE TEMPORARY VIEW ontime
USING org.apache.spark.sql.jdbc
OPTIONS (
  url  "jdbc:mysql://127.0.0.1:3306/ontime?user=root&password=",
  dbtable "ontime.ontime",
  fetchSize "1000",
  partitionColumn "id", lowerBound "1", upperBound "162668934", numPartitions "128"
);
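To see why those estimated splits can skew, here is a small sketch of the even-stride arithmetic applied to the numeric bounds in the view definition above:

```scala
// Sketch: with numeric bounds, Spark slices [lowerBound, upperBound]
// into equal-width strides regardless of how the ids are actually
// distributed. Values mirror the temporary-view example above.
val lowerBound = 1L
val upperBound = 162668934L
val numPartitions = 128
val stride = upperBound / numPartitions - lowerBound / numPartitions
println(s"each task scans an id window about $stride wide") // ~1.27 million ids
// If deletions or bulk loads left some id windows far denser than others,
// the tasks covering those windows fetch far more rows -- that uneven
// per-executor load is the source of the OOM risk mentioned above.
```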