object JdbcDatasourceTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("JdbcDatasourceTest")
.master("local")
.getOrCreate()
//url:
// jdbc:mysql://master:3306/test
// jdbc:oracle://master:3306/test
// jdbc:db2://master:3306/test
// jdbc:derby://master:3306/test
// jdbc:sqlserver://master:3306/test
// jdbc:postgresql://master:3306/test
val mysqlUrl = "jdbc:mysql://master:3306/test"
//1: 讀取csv文件數據
val optsMap = Map("header" -> "true", "inferSchema" -> "true")
val df = spark.read.options(optsMap).csv(s"${BASE_PATH}/jdbc_demo_data.csv")
df.show()
val properties = new Properties()
properties.put("user", "root")
properties.put("password", "root")
//向Mysql數據庫寫數據
df.write.mode(SaveMode.Overwrite).jdbc(mysqlUrl, "person", properties)
//從mysql數據庫讀取數據
val jdbcDFWithNoneOption = spark.read.jdbc(mysqlUrl, "person", properties)
jdbcDFWithNoneOption.show()
//寫數據的過程:
//1 : 建表
//第一次寫的時候,需要創建一張表,建表語句類似如下:
//CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8 AUTO_INCREMENT=1
//ENGINE=InnoDB使用innodb引擎 DEFAULT CHARSET=utf8 數據庫默認編碼為utf-8 AUTO_INCREMENT=1 自增鍵的起始序號為1
//.InnoDB,是MySQL的數據庫引擎之一,為MySQL AB發布binary的標准之一
//屬性配置ENGINE=InnoDB DEFAULT CHARSET=utf8 AUTO_INCREMENT=1可以通過參數createTableOptions傳給spark
var writeOpts =
Map[String, String]("createTableOptions" -> "ENGINE=InnoDB DEFAULT CHARSET=utf8 AUTO_INCREMENT=1")
df.write.mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties)
//2: 設置表的schema
// 一般表的schema是和DataFrame是一致的,字段的類型是從spark sql的DataType翻譯到各個數據庫對應的數據類型
// 如果字段在數據庫中的類型不是你想要的,
// 你可以通過參數createTableColumnTypes來設置createTableColumnTypes=age long,name string
writeOpts = Map[String, String]("createTableColumnTypes" -> "id long,age long")
df.write.mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties)
//3: 事務隔離級別的設置,通過參數isolationLevel設置
// NONE 不支持事物
// READ_UNCOMMITTED 會出現臟讀、不可重復讀以及幻讀
// READ_COMMITTED 不會出現臟讀,但是還是會出現不可重復讀以及幻讀
// REPEATABLE_READ 不會出現臟讀以及不可重復讀,但是還會出現幻讀
// SERIALIZABLE 臟讀、不可重復讀以及幻讀都不會出現了
writeOpts = Map[String, String]("isolationLevel" -> "READ_UNCOMMITTED")
df.write.mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties)
//4:寫數據
//寫數據的過程中可以采用批量寫數據,每一批寫的數據量的大小可以通過參數batchsize設置,默認是:1000
writeOpts = Map[String, String]("batchsize" -> "100")
df.write.mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties)
//5:第二次寫數據的時候,這個時候表已經存在了,所以需要區分SaveMode
//當SaveMode=Overwrite 的時候,需要先清理表,然后再寫數據。清理表的方法又分兩種:
// 第一種是truncate即清空表,如果是這種的話,則先清空表,然后再寫數據
// 第二種是drop掉表,如果是這種的話,則先drop表,然后建表,最后寫數據
//以上兩種方式的選擇,可以通過參數truncate(默認是false)控制。因為truncate清空數據可能會失敗,所以可以使用drop table的方式
//而且不是所有的數據庫都支持truncate table,其中PostgresDialect就不支持
//當SaveMode=Append 的時候,則直接寫數據就行
//當SaveMode=ErrorIfExists 的時候,則直接拋異常
//當SaveMode=Ignore 的時候,則直接不做任何事情
writeOpts = Map[String, String]("truncate" -> "false")
df.write.mode(SaveMode.Overwrite).options(writeOpts).jdbc(mysqlUrl, "person", properties)
//按照某個分區字段進行分區讀數據
//partitionColumn 分區的字段,這個字段必須是integral類型的
//lowerBound 用於決定分區步數的partitionColumn的最小值
//upperBound 用於決定分區步數的partitionColumn的最大值
//numPartitions 分區數,和lowerBound以及upperBound一起來為每一個分區生成sql的where字句
//如果upperBound - lowerBound >= numPartitions,那么我們就取numPartitions個分區,
// 否則我們取upperBound - lowerBound個分區數
// 8 - 3 = 5 > 3 所以我們取3個分區
// where id < 3 + 1 這個1是通過 8/3 - 3/3 = 1得來的
// where id >= 3 + 1 and id < 3 + 1 + 1
// where id >= 3 + 1 + 1
//配置的方式
val readOpts = Map[String, String]("numPartitions" -> "3", "partitionColumn" -> "id",
"lowerBound" -> "3", "upperBound" -> "8", "fetchsize" -> "100")
val jdbcDF = spark.read.options(readOpts).jdbc(mysqlUrl, "person", properties)
jdbcDF.rdd.partitions.size
jdbcDF.rdd.glom().collect()
jdbcDF.show()
//api的方式
spark.read.jdbc(mysqlUrl, "person", "id", 3, 8, 3, properties).show()
//參數predicates: Array[String],用於決定每一個分區對應的where子句,分區數就是數組predicates的大小
val conditionDF = spark.read.jdbc(mysqlUrl,
"person", Array("id > 2 and id < 5", "id >= 5 and id < 8"), properties)
conditionDF.rdd.partitions.size
conditionDF.rdd.glom().collect() ////查看分區及數據
conditionDF.show() ////查看所有數據
//每次讀取的時候,可以采用batch的方式讀取數據,batch的數量可以由參數fetchsize來設置。默認為:0,表示jdbc的driver來估計這個batch的大小
//不管是讀還是寫,都有分區數的概念,
// 讀的時候是通過用戶設置numPartitions參數設置的,
// 而寫的分區數是DataFrame的分區數
//需要注意一點的是不管是讀還是寫,每一個分區都會打開一個jdbc的連接,所以分區不宜太多,要不然的話會搞垮數據庫
//寫的時候,可以通過DataFrame的coalease接口來減少分區數
spark.stop()
}
}