項目實戰從0到1之Spark(2)Spark讀取和存儲HDFS上的數據


本篇來介紹一下通過Spark來讀取和HDFS上的數據,主要包含四方面的內容:將RDD寫入HDFS、讀取HDFS上的文件、將HDFS上的文件添加到Driver、判斷HDFS上文件路徑是否存在。

1、啟動Hadoop

首先啟動咱們的Hadoop,在hadoop的目錄下執行下面的命令:

rm -rf tmp 
mkdir tmp
cd sbin
hadoop namenode -format
start-dfs.sh
start-yarn.sh

查看是否啟動成功:

 
 

2、將RDD寫入HDFS

先創建一個SparkSession:

val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .enableHiveSupport()
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

將RDD寫入HDFS使用的函數是saveAsTextFile:

val modelNames = Array("FM","FFM","DEEPFM","NFM","DIN","DIEN")
val modelNamesRdd = spark.sparkContext.parallelize(modelNames,1)
modelNamesRdd.saveAsTextFile("hdfs://localhost:9000/user/root/modelNames")

接下來,我們查看一下是否保存成功:

 
 

可以看到RDD在HDFS上是分塊存儲的,由於我們只有一個分區,所以只有part-0000。假設我們存儲一個包含兩個分區的RDD:

val modelNames3 = Array("FM","FFM","DEEPFM","NFM","DIN","DIEN")
val modelNames3Rdd = spark.sparkContext.parallelize(modelNames3,2)

modelNames3Rdd.saveAsTextFile("hdfs://localhost:9000/user/root/modelNames3")

再次查看,可以看到有part-00000和part-00001:

 
 

3、讀取HDFS上的文件

讀取HDFS上的文件,使用textFile方法:

 val modelNames2 = spark.sparkContext.textFile("hdfs://localhost:9000/user/root/modelNames/part-00000")

val modelNames4 = spark.sparkContext.textFile("hdfs://localhost:9000/user/root/modelNames3/")

讀取時是否加最后的part-00000都是可以的,當只想讀取某個part,則必須加上。

4、將HDFS上的文件添加到Driver

有時候,我們並不想直接讀取HDFS上的文件,而是想對應的文件添加到Driver上,然后使用java或者Scala的I/O方法進行讀取,此時使用addFile和get方法來實現:

val files = "hdfs://localhost:9000/user/root/modelNames/part-00000"
spark.sparkContext.addFile(files)
val path = SparkFiles.get("part-00000")
println(path)

打印的路徑十分奇怪,沒有截取完全:

 
 

然后有了path之后,就可以使用scala的I/O進行讀取:

val source = Source.fromFile(path)
val lineIterator = source.getLines
val lines =lineIterator.toArray
println(lines.mkString(","))

輸出為:

FM,FFM,DEEPFM,NFM,DIN,DIEN

5、判斷HDFS上文件路徑是否存在

在讀取HDFS地址或者將文件傳輸到Driver上的時候,首先需要判斷文件是否存在。單機環境下,代碼如下:

val conf = spark.sparkContext.hadoopConfiguration

val path = new org.apache.hadoop.fs.Path("hdfs://localhost:9000/user/root/modelNames/part-00000")
val fs = path.getFileSystem(conf) //得hdfs文件系統中的路徑信息

val modelNamesExists = fs.exists(path)
val modelNames1Exists = fs.exists(new org.apache.hadoop.fs.Path("hdfs://localhost:9000/user/root/modelNames1/part-00000"))

println(modelNamesExists)
println(modelNames1Exists)

輸出結果為:

true
false

而在公司中的大規模集群環境下,通常的代碼如下:

val conf = spark.sparkContext.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)

val modelNamesExists = fs.exists(new org.apache.hadoop.fs.Path("hdfs://localhost:9000/user/root/modelNames/part-00000"))
val modelNames1Exists = fs.exists(new org.apache.hadoop.fs.Path("hdfs://localhost:9000/user/root/modelNames1/part-00000"))

println(modelNamesExists)
println(modelNames1Exists)

如果在本地單機環境下仍然使用上面的代碼,會報如下的錯誤:

Wrong FS: hdfs://localhost:9000/user/root/modelNames/part-00000, expected: file:///

所以對比兩份代碼你可以發現,在本地環境中,我們首先使用getFileSystem獲取了hdfs文件系統中的路徑信息,從而避免了上面的錯誤。

好了,今天的知識就分享到這里,小伙伴們都掌握了么?


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM