本篇來介紹一下通過Spark來讀取和HDFS上的數據,主要包含四方面的內容:將RDD寫入HDFS、讀取HDFS上的文件、將HDFS上的文件添加到Driver、判斷HDFS上文件路徑是否存在。
1、啟動Hadoop
首先啟動咱們的Hadoop,在hadoop的目錄下執行下面的命令:
rm -rf tmp
mkdir tmp
cd sbin
hadoop namenode -format
start-dfs.sh
start-yarn.sh
查看是否啟動成功:

2、將RDD寫入HDFS
先創建一個SparkSession:
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.enableHiveSupport()
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
將RDD寫入HDFS使用的函數是saveAsTextFile:
val modelNames = Array("FM","FFM","DEEPFM","NFM","DIN","DIEN")
val modelNamesRdd = spark.sparkContext.parallelize(modelNames,1)
modelNamesRdd.saveAsTextFile("hdfs://localhost:9000/user/root/modelNames")
接下來,我們查看一下是否保存成功:

可以看到RDD在HDFS上是分塊存儲的,由於我們只有一個分區,所以只有part-0000。假設我們存儲一個包含兩個分區的RDD:
val modelNames3 = Array("FM","FFM","DEEPFM","NFM","DIN","DIEN")
val modelNames3Rdd = spark.sparkContext.parallelize(modelNames3,2)
modelNames3Rdd.saveAsTextFile("hdfs://localhost:9000/user/root/modelNames3")
再次查看,可以看到有part-00000和part-00001:

3、讀取HDFS上的文件
讀取HDFS上的文件,使用textFile方法:
val modelNames2 = spark.sparkContext.textFile("hdfs://localhost:9000/user/root/modelNames/part-00000")
val modelNames4 = spark.sparkContext.textFile("hdfs://localhost:9000/user/root/modelNames3/")
讀取時是否加最后的part-00000都是可以的,當只想讀取某個part,則必須加上。
4、將HDFS上的文件添加到Driver
有時候,我們並不想直接讀取HDFS上的文件,而是想對應的文件添加到Driver上,然后使用java或者Scala的I/O方法進行讀取,此時使用addFile和get方法來實現:
val files = "hdfs://localhost:9000/user/root/modelNames/part-00000"
spark.sparkContext.addFile(files)
val path = SparkFiles.get("part-00000")
println(path)
打印的路徑十分奇怪,沒有截取完全:

然后有了path之后,就可以使用scala的I/O進行讀取:
val source = Source.fromFile(path)
val lineIterator = source.getLines
val lines =lineIterator.toArray
println(lines.mkString(","))
輸出為:
FM,FFM,DEEPFM,NFM,DIN,DIEN
5、判斷HDFS上文件路徑是否存在
在讀取HDFS地址或者將文件傳輸到Driver上的時候,首先需要判斷文件是否存在。單機環境下,代碼如下:
val conf = spark.sparkContext.hadoopConfiguration
val path = new org.apache.hadoop.fs.Path("hdfs://localhost:9000/user/root/modelNames/part-00000")
val fs = path.getFileSystem(conf) //得hdfs文件系統中的路徑信息
val modelNamesExists = fs.exists(path)
val modelNames1Exists = fs.exists(new org.apache.hadoop.fs.Path("hdfs://localhost:9000/user/root/modelNames1/part-00000"))
println(modelNamesExists)
println(modelNames1Exists)
輸出結果為:
true
false
而在公司中的大規模集群環境下,通常的代碼如下:
val conf = spark.sparkContext.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)
val modelNamesExists = fs.exists(new org.apache.hadoop.fs.Path("hdfs://localhost:9000/user/root/modelNames/part-00000"))
val modelNames1Exists = fs.exists(new org.apache.hadoop.fs.Path("hdfs://localhost:9000/user/root/modelNames1/part-00000"))
println(modelNamesExists)
println(modelNames1Exists)
如果在本地單機環境下仍然使用上面的代碼,會報如下的錯誤:
Wrong FS: hdfs://localhost:9000/user/root/modelNames/part-00000, expected: file:///
所以對比兩份代碼你可以發現,在本地環境中,我們首先使用getFileSystem獲取了hdfs文件系統中的路徑信息,從而避免了上面的錯誤。
好了,今天的知識就分享到這里,小伙伴們都掌握了么?