1.先上代碼,存入mysql
val spark = SparkSession.builder() .appName("jdbc") .getOrCreate() import spark.implicits._ val pathcsv = "/user/xxx/private/moviecsv" val csvdf = spark.read.format("csv") .option("sep", ",") .option("inferSchema", "true") .option("header", "true") .load(pathcsv) csvdf.write .format("jdbc") .mode(SaveMode.Overwrite) .option("url", "jdbc:mysql://ip:port/database?useUnicode=true&characterEncoding=utf-8") .option("dbtable", "database.table") .option("user", "root") .option("password", "123456") .option("batchsize", "1000")
.option("createTableOptions", "engine=MyISAM")
.option("createTableColumnTypes", "Ratings varchar(200)")
.option("truncate", "true") .option("numPartitions", "20") .save() spark.stop()
參數解釋
url: 連接mysql 的url
user:登陸mysql的用戶
password :登陸密碼
dbtable: 要訪問的數據庫 點 表
batchsize : 當條數達到batchsize時會往mysql插入一次數據
truncate : 當savemode是 overwrite時,若dataframe 與原mysql 結構相同,則只truncate mysql,不會重新建表
numPartions : 訪問mysql的並發數,注意:當dataframe的分區數小於numPartitions 時,並發數是dataframe的分區數,否則並發數是numPartitions
createTableOptions:建表時的一些額外選項,比如指定engine, "engine=MyISAM",源碼中是這樣處理createTableOptions 的val sql = s"CREATE TABLE $table ($strSchema) $createTableOptions"
createTableColumnTypes:可以用來替換默認的字段類型,比如name 字段默認是text 類型,可以手動指定為 “name varchar(200)”
2.spark 讀取mysql
val jdbccdf = spark.read.format("jdbc") .option("url", "jdbc:mysql://192.168.100.200:1234/data_store?useUnicode=true&characterEncoding=utf-8") .option("dbtable", "data_store.movieaa") .option("user", "root") .option("password", "12345") .option("fetchsize", "1000") .option("truncate", "true") .option("numPartitions", "6") .option("partitionColumn", "UserID") .option("lowerBound", "1000") .option("upperBound", "6000") .load()
dbtable除了1 中的寫法還可以是一個 query :option("dbtable", "(select * from data_store.movieaa where userid between 1000 and 2000) as tt")
partitionColumn:分區列,只支持數值類型,當此參數存在時,lowerBound 和upperBound 必須存在,bound 和numpartition 起到了分區的作用,bound 並不會去過濾數據