數據源-基本操作load和save
object BasicTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("BasicTest")
.master("local")
.getOrCreate()
//最基本的讀取(load)和保存(write)操作,操作的文件的數據格式默認是parquet
val sessionDF = spark.read.load(s"${BASE_PATH}/trackerSession")
sessionDF.show()
sessionDF.select("ip", "cookie").write.save(s"${BASE_PATH}/trackerSession_ip_cookie")
//可以讀取多個文件目錄下的數據文件
val multiSessionDF = spark.read.load(s"${BASE_PATH}/trackerSession",
s"${BASE_PATH}/trackerSession_ip_cookie")
multiSessionDF.show()
//讀取的時候指定schema
val schema = StructType(StructField("ip", StringType) :: Nil)
val specSessionDF = spark.read.schema(schema).load(s"${BASE_PATH}/trackerSession")
specSessionDF.show()
//指定數據源數據格式
//讀取json文件, 且將讀取出來的數據保存為parquet文件
val deviceInfoDF = spark.read.format("json").load(s"${BASE_PATH}/IoT_device_info.json")
spark.read.json(s"${BASE_PATH}/IoT_device_info.json").show()
deviceInfoDF.write.format("orc").save(s"${BASE_PATH}/iot")
deviceInfoDF.write.orc(s"${BASE_PATH}/iot2")
//option傳遞參數,改變讀寫數據源的行為
spark.read.option("mergeSchema", "true").parquet(s"${BASE_PATH}/trackerSession")
deviceInfoDF.write.option("compression", "snappy").parquet(s"${BASE_PATH}/iot2_parquet")
val optsMap = Map("mergeSchema" -> "mergeSchema")
spark.read.options(optsMap).parquet("")
//SaveMode
//SaveMode.ErrorIfExists(對應着字符串"error"):表示如果目標文件目錄中數據已經存在了,則拋異常(這個是默認的配置)
//SaveMode.Append(對應着字符串"append"):表示如果目標文件目錄中數據已經存在了,則將數據追加到目標文件中
//SaveMode.Overwrite(對應着字符串"overwrite"):表示如果目標文件目錄中數據已經存在了,則用需要保存的數據覆蓋掉已經存在的數據
//SaveMode.Ignore(對應着字符串為:"ignore"):表示如果目標文件目錄中數據已經存在了,則不做任何操作
deviceInfoDF.write.option("compression", "snappy").mode(SaveMode.Ignore).parquet(s"${BASE_PATH}/iot/iot2_parquet")
spark.read.parquet(s"${BASE_PATH}/iot/iot2_parquet").show()
deviceInfoDF.write.option("compression", "snappy").mode("ignore").parquet(s"${BASE_PATH}/iot/iot2_parquet")
spark.stop()
}
}
