package code.parquet

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * Created by zhen on 2018/12/11.
  */
object ParquetIO {
  // HDFS root (namenode address)
  private val hdfsRoot = "hdfs://172.20.32.163:8020"

  // Resolve a path string to an HDFS Path, prefixing the HDFS root when missing
  def getPath(path: String): Path = {
    if (path.toLowerCase().startsWith("hdfs://")) {
      new Path(path)
    } else {
      new Path(hdfsRoot + path)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("parquet").master("local[2]").getOrCreate()
    spark.sparkContext.setLogLevel("WARN") // set the log level to WARN
    val fsUri = new URI(hdfsRoot)
    val fs = FileSystem.get(fsUri, new Configuration())
    val path = hdfsRoot + "/YXFK/compute/KH_JLD"
    if (fs.exists(getPath(path))) {
      // read parquet data from the HDFS file system
      val dataFrame = spark.read.parquet(path)
      dataFrame.show(10)
      // select and filter the data
      val result = dataFrame.select("JLDBH", "JLDDZ", "JLDMC", "JLFSDM", "CJSJ")
        .filter("JLDDZ is not null AND JLFSDM = 3")
        .sort("JLDBH")
      result.show(10)
      // write the filtered subset to the local file system
      result.write.mode(SaveMode.Overwrite).parquet("E:\\result")
    }
    // read local parquet data
    val localDataFrame = spark.read.parquet("E:\\jld.parquet")
    localDataFrame.show(10)
    // read the written data back to verify it
    val resultSpace = spark.read.parquet("E:\\result")
    resultSpace.show(10)
    spark.stop()
  }
}
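As an aside, the string-based filter above can also be written with the typed Column API, which catches column-name mistakes at analysis time instead of in the SQL parser. A minimal sketch, reusing the `spark` session and `dataFrame` from the code above:

import org.apache.spark.sql.functions.col

// Equivalent to filter("JLDDZ is not null AND JLFSDM = 3")
val resultTyped = dataFrame
  .select("JLDBH", "JLDDZ", "JLDMC", "JLFSDM", "CJSJ")
  .filter(col("JLDDZ").isNotNull && col("JLFSDM") === 3)
  .sort("JLDBH")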
Result: [screenshot of the show(10) console output not preserved]
Analysis: Spark treats a parquet path as a directory by default, so for reads it is enough to point at the parent directory of the data you want (local mode excepted); when saving as parquet, the output is automatically split into multiple part files, so the write path can likewise only name the parent directory.
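The splitting behavior is easy to observe and, if needed, control. A minimal sketch, reusing `spark` and `result` from the code above; the output paths are illustrative only:

// Writing a DataFrame as parquet always produces a directory, not a single
// file: E:\result\part-00000-*.snappy.parquet, part-00001-*, plus a _SUCCESS
// marker. One part file is written per partition of the DataFrame.
result.write.mode(SaveMode.Overwrite).parquet("E:\\result")

// coalesce(1) collapses the DataFrame to one partition, so only a single
// part file is written (at the cost of single-threaded output); the target
// is still a directory.
result.coalesce(1).write.mode(SaveMode.Overwrite).parquet("E:\\result_single")

// Reads take the directory; Spark discovers every part file underneath it.
val back = spark.read.parquet("E:\\result_single")
back.show(10)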