數據源-基本操作load和save
object BasicTest { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("BasicTest") .master("local") .getOrCreate() //最基本的讀取(load)和保存(write)操作,操作的文件的數據格式默認是parquet val sessionDF = spark.read.load(s"${BASE_PATH}/trackerSession") sessionDF.show() sessionDF.select("ip", "cookie").write.save(s"${BASE_PATH}/trackerSession_ip_cookie") //可以讀取多個文件目錄下的數據文件 val multiSessionDF = spark.read.load(s"${BASE_PATH}/trackerSession", s"${BASE_PATH}/trackerSession_ip_cookie") multiSessionDF.show() //讀取的時候指定schema val schema = StructType(StructField("ip", StringType) :: Nil) val specSessionDF = spark.read.schema(schema).load(s"${BASE_PATH}/trackerSession") specSessionDF.show() //指定數據源數據格式 //讀取json文件, 且將讀取出來的數據保存為parquet文件 val deviceInfoDF = spark.read.format("json").load(s"${BASE_PATH}/IoT_device_info.json") spark.read.json(s"${BASE_PATH}/IoT_device_info.json").show() deviceInfoDF.write.format("orc").save(s"${BASE_PATH}/iot") deviceInfoDF.write.orc(s"${BASE_PATH}/iot2") //option傳遞參數,改變讀寫數據源的行為 spark.read.option("mergeSchema", "true").parquet(s"${BASE_PATH}/trackerSession") deviceInfoDF.write.option("compression", "snappy").parquet(s"${BASE_PATH}/iot2_parquet") val optsMap = Map("mergeSchema" -> "mergeSchema") spark.read.options(optsMap).parquet("") //SaveMode //SaveMode.ErrorIfExists(對應着字符串"error"):表示如果目標文件目錄中數據已經存在了,則拋異常(這個是默認的配置) //SaveMode.Append(對應着字符串"append"):表示如果目標文件目錄中數據已經存在了,則將數據追加到目標文件中 //SaveMode.Overwrite(對應着字符串"overwrite"):表示如果目標文件目錄中數據已經存在了,則用需要保存的數據覆蓋掉已經存在的數據 //SaveMode.Ignore(對應着字符串為:"ignore"):表示如果目標文件目錄中數據已經存在了,則不做任何操作 deviceInfoDF.write.option("compression", "snappy").mode(SaveMode.Ignore).parquet(s"${BASE_PATH}/iot/iot2_parquet") spark.read.parquet(s"${BASE_PATH}/iot/iot2_parquet").show() deviceInfoDF.write.option("compression", "snappy").mode("ignore").parquet(s"${BASE_PATH}/iot/iot2_parquet") spark.stop() } }