SparkSQL讀寫外部數據源-通過jdbc讀寫mysql數據庫


object JdbcDatasourceTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("JdbcDatasourceTest")
      .master("local")
      .getOrCreate()

    //url:
    // jdbc:mysql://master:3306/test
    // jdbc:oracle://master:3306/test
    // jdbc:db2://master:3306/test
    // jdbc:derby://master:3306/test
    // jdbc:sqlserver://master:3306/test
    // jdbc:postgresql://master:3306/test
    val mysqlUrl = "jdbc:mysql://master:3306/test"

    //1: 讀取csv文件數據
    val optsMap = Map("header" -> "true", "inferSchema" -> "true")
    val df = spark.read.options(optsMap).csv(s"${BASE_PATH}/jdbc_demo_data.csv")
    df.show()

    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")
    //向Mysql數據庫寫數據
    df.write.mode(SaveMode.Overwrite).jdbc(mysqlUrl, "person", properties)
    //從mysql數據庫讀取數據
    val jdbcDFWithNoneOption = spark.read.jdbc(mysqlUrl, "person", properties)
    jdbcDFWithNoneOption.show()

    //寫數據的過程:
    //1 : 建表
    //第一次寫的時候,需要創建一張表,建表語句類似如下:
    //CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8 AUTO_INCREMENT=1
    //ENGINE=InnoDB使用innodb引擎 DEFAULT CHARSET=utf8 數據庫默認編碼為utf-8 AUTO_INCREMENT=1 自增鍵的起始序號為1
    //.InnoDB,是MySQL的數據庫引擎之一,為MySQL AB發布binary的標准之一
    //屬性配置ENGINE=InnoDB DEFAULT CHARSET=utf8 AUTO_INCREMENT=1可以通過參數createTableOptions傳給spark
    var writeOpts =
      Map[String, String]("createTableOptions" -> "ENGINE=InnoDB DEFAULT CHARSET=utf8 AUTO_INCREMENT=1")
    df.write.mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties)

    //2: 設置表的schema
    // 一般表的schema是和DataFrame是一致的,字段的類型是從spark sql的DataType翻譯到各個數據庫對應的數據類型
    // 如果字段在數據庫中的類型不是你想要的,
    // 你可以通過參數createTableColumnTypes來設置createTableColumnTypes=age long,name string
    writeOpts = Map[String, String]("createTableColumnTypes" -> "id long,age long")
    df.write.mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties)

    //3: 事務隔離級別的設置,通過參數isolationLevel設置
    //  NONE 不支持事物1
    // READ_UNCOMMITTED 會出現臟讀、不可重復讀以及幻讀
    // READ_COMMITTED 不會出現臟讀,但是還是會出現不可重復讀以及幻讀
    // REPEATABLE_READ  不會出現臟讀以及不可重復讀,但是還會出現幻讀
    // SERIALIZABLE   臟讀、不可重復讀以及幻讀都不會出現了
    writeOpts = Map[String, String]("isolationLevel" -> "READ_UNCOMMITTED")
    df.write.mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties)

    //4:寫數據
    //寫數據的過程中可以采用批量寫數據,每一批寫的數據量的大小可以通過參數batchsize設置,默認是:1000
    writeOpts = Map[String, String]("batchsize" -> "100")
    df.write.mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties)

    //5:第二次寫數據的時候,這個時候表已經存在了,所以需要區分SaveMode
    //當SaveMode=Overwrite 的時候,需要先清理表,然后再寫數據。清理表的方法又分兩種:
    //  第一種是truncate即清空表,如果是這種的話,則先清空表,然后再寫數據
    //  第二種是drop掉表,如果是這種的話,則先drop表,然后建表,最后寫數據
    //以上兩種方式的選擇,可以通過參數truncate(默認是false)控制。因為truncate清空數據可能會失敗,所以可以使用drop table的方式
    //而且不是所有的數據庫都支持truncate table,其中PostgresDialect就不支持
    //當SaveMode=Append 的時候,則直接寫數據就行
    //當SaveMode=ErrorIfExists 的時候,則直接拋異常
    //當SaveMode=Ignore 的時候,則直接不做任何事情
    writeOpts = Map[String, String]("truncate" -> "false")
    df.write.mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties)


    //按照某個分區字段進行分區讀數據
    //partitionColumn 分區的字段,這個字段必須是integral類型的
    //lowerBound  用於決定分區步數的partitionColumn的最小值
    //upperBound  用於決定分區步數的partitionColumn的最大值
    //numPartitions 分區數,和lowerBound以及upperBound一起來為每一個分區生成sql的where字句

    //如果upperBound - lowerBound >= numPartitions,那么我們就取numPartitions個分區,
    // 否則我們取upperBound - lowerBound個分區數
    // 8 - 3 = 5 > 3 所以我們取3個分區
    // where id < 3 + 1 這個1是通過 8/3 - 3/3 = 1得來的
    // where id >= 3 + 1 and id < 3 + 1 + 1
    // where id >= 3 + 1 + 1
    //配置的方式
    val readOpts = Map[String, String]("numPartitions" -> "3", "partitionColumn" -> "id",
      "lowerBound" -> "3", "upperBound" -> "8", "fetchsize" -> "100")
    val jdbcDF = spark.read.options(readOpts).jdbc(mysqlUrl, "person", properties)
    jdbcDF.rdd.partitions.size
    jdbcDF.rdd.glom().collect()
    jdbcDF.show()

    //api的方式
    spark.read.jdbc(mysqlUrl, "person", "id", 3, 8, 3, properties).show()

    //參數predicates: Array[String],用於決定每一個分區對應的where子句,分區數就是數組predicates的大小
    val conditionDF = spark.read.jdbc(mysqlUrl,
      "person", Array("id > 2 and id < 5", "id >= 5 and id < 8"), properties)
    conditionDF.rdd.partitions.size
    conditionDF.rdd.glom().collect()
    conditionDF.show()

    //每次讀取的時候,可以采用batch的方式讀取數據,batch的數量可以由參數fetchsize來設置。默認為:0,表示jdbc的driver來估計這個batch的大小

    //不管是讀還是寫,都有分區數的概念,
    // 讀的時候是通過用戶設置numPartitions參數設置的,
    // 而寫的分區數是DataFrame的分區數
    //需要注意一點的是不管是讀還是寫,每一個分區都會打開一個jdbc的連接,所以分區不宜太多,要不然的話會搞垮數據庫
    //寫的時候,可以通過DataFrame的coalease接口來減少分區數

    spark.stop()
  }
}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM