object JdbcDatasourceTest { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("JdbcDatasourceTest") .master("local") .getOrCreate() //url: // jdbc:mysql://master:3306/test // jdbc:oracle://master:3306/test // jdbc:db2://master:3306/test // jdbc:derby://master:3306/test // jdbc:sqlserver://master:3306/test // jdbc:postgresql://master:3306/test val mysqlUrl = "jdbc:mysql://master:3306/test" //1: 讀取csv文件數據 val optsMap = Map("header" -> "true", "inferSchema" -> "true") val df = spark.read.options(optsMap).csv(s"${BASE_PATH}/jdbc_demo_data.csv") df.show() val properties = new Properties() properties.put("user", "root") properties.put("password", "root") //向Mysql數據庫寫數據 df.write.mode(SaveMode.Overwrite).jdbc(mysqlUrl, "person", properties) //從mysql數據庫讀取數據 val jdbcDFWithNoneOption = spark.read.jdbc(mysqlUrl, "person", properties) jdbcDFWithNoneOption.show() //寫數據的過程: //1 : 建表 //第一次寫的時候,需要創建一張表,建表語句類似如下: //CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8 AUTO_INCREMENT=1 //ENGINE=InnoDB使用innodb引擎 DEFAULT CHARSET=utf8 數據庫默認編碼為utf-8 AUTO_INCREMENT=1 自增鍵的起始序號為1 //.InnoDB,是MySQL的數據庫引擎之一,為MySQL AB發布binary的標准之一 //屬性配置ENGINE=InnoDB DEFAULT CHARSET=utf8 AUTO_INCREMENT=1可以通過參數createTableOptions傳給spark var writeOpts = Map[String, String]("createTableOptions" -> "ENGINE=InnoDB DEFAULT CHARSET=utf8 AUTO_INCREMENT=1") df.write.mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties) //2: 設置表的schema // 一般表的schema是和DataFrame是一致的,字段的類型是從spark sql的DataType翻譯到各個數據庫對應的數據類型 // 如果字段在數據庫中的類型不是你想要的, // 你可以通過參數createTableColumnTypes來設置createTableColumnTypes=age long,name string writeOpts = Map[String, String]("createTableColumnTypes" -> "id long,age long") df.write.mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties) //3: 事務隔離級別的設置,通過參數isolationLevel設置 // NONE 不支持事物1 // READ_UNCOMMITTED 會出現臟讀、不可重復讀以及幻讀 // READ_COMMITTED 不會出現臟讀,但是還是會出現不可重復讀以及幻讀 // REPEATABLE_READ 不會出現臟讀以及不可重復讀,但是還會出現幻讀 // SERIALIZABLE 臟讀、不可重復讀以及幻讀都不會出現了 writeOpts = Map[String, String]("isolationLevel" -> "READ_UNCOMMITTED") df.write.mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties) //4:寫數據 //寫數據的過程中可以采用批量寫數據,每一批寫的數據量的大小可以通過參數batchsize設置,默認是:1000 writeOpts = Map[String, String]("batchsize" -> "100") df.write.mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties) //5:第二次寫數據的時候,這個時候表已經存在了,所以需要區分SaveMode //當SaveMode=Overwrite 的時候,需要先清理表,然后再寫數據。清理表的方法又分兩種: // 第一種是truncate即清空表,如果是這種的話,則先清空表,然后再寫數據 // 第二種是drop掉表,如果是這種的話,則先drop表,然后建表,最后寫數據 //以上兩種方式的選擇,可以通過參數truncate(默認是false)控制。因為truncate清空數據可能會失敗,所以可以使用drop table的方式 //而且不是所有的數據庫都支持truncate table,其中PostgresDialect就不支持 //當SaveMode=Append 的時候,則直接寫數據就行 //當SaveMode=ErrorIfExists 的時候,則直接拋異常 //當SaveMode=Ignore 的時候,則直接不做任何事情 writeOpts = Map[String, String]("truncate" -> "false") df.write.mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties) //按照某個分區字段進行分區讀數據 //partitionColumn 分區的字段,這個字段必須是integral類型的 //lowerBound 用於決定分區步數的partitionColumn的最小值 //upperBound 用於決定分區步數的partitionColumn的最大值 //numPartitions 分區數,和lowerBound以及upperBound一起來為每一個分區生成sql的where字句 //如果upperBound - lowerBound >= numPartitions,那么我們就取numPartitions個分區, // 否則我們取upperBound - lowerBound個分區數 // 8 - 3 = 5 > 3 所以我們取3個分區 // where id < 3 + 1 這個1是通過 8/3 - 3/3 = 1得來的 // where id >= 3 + 1 and id < 3 + 1 + 1 // where id >= 3 + 1 + 1 //配置的方式 val readOpts = Map[String, String]("numPartitions" -> "3", "partitionColumn" -> "id", "lowerBound" -> "3", "upperBound" -> "8", "fetchsize" -> "100") val jdbcDF = spark.read.options(readOpts).jdbc(mysqlUrl, "person", properties) jdbcDF.rdd.partitions.size jdbcDF.rdd.glom().collect() jdbcDF.show() //api的方式 spark.read.jdbc(mysqlUrl, "person", "id", 3, 8, 3, properties).show() //參數predicates: Array[String],用於決定每一個分區對應的where子句,分區數就是數組predicates的大小 val conditionDF = spark.read.jdbc(mysqlUrl, "person", Array("id > 2 and id < 5", "id >= 5 and id < 8"), properties) conditionDF.rdd.partitions.size conditionDF.rdd.glom().collect() conditionDF.show() //每次讀取的時候,可以采用batch的方式讀取數據,batch的數量可以由參數fetchsize來設置。默認為:0,表示jdbc的driver來估計這個batch的大小 //不管是讀還是寫,都有分區數的概念, // 讀的時候是通過用戶設置numPartitions參數設置的, // 而寫的分區數是DataFrame的分區數 //需要注意一點的是不管是讀還是寫,每一個分區都會打開一個jdbc的連接,所以分區不宜太多,要不然的話會搞垮數據庫 //寫的時候,可以通過DataFrame的coalease接口來減少分區數 spark.stop() } }