spark文件讀取與保存(scala實現)


  • 文本文件

將一個文本文件讀取為RDD時,輸入的每一行都會成為RDD的一個元素。也可以將多個完整的文本文件一次性讀取為一個pairRDD, 其中鍵是文件名,值是文件內容。

// 讀取文本文件
val input = sc.textFile("filePath")
// 保存為文本文件
result.savaAsTextFile(outputFile)
  • JSON

讀取Json最簡單的方法是將數據作為文本文件讀取,然后使用Json解析器來對RDD中的值進行映射操作。Json的使用需要依賴第三方類庫,scala中可以使用Jackson。

import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
...
case class Person(name: String, lovesPandas: Boolean) //Must be a top-level class
...
// Parse it into a specific case class. We use flatMap to handle errors // by returning an empty list (None) if we encounter an issue and a // list with one element if everything is ok (Some(_)). val result = input.flatMap(record => {   try {     Some(mapper.readValue(record, classOf[Person]))   } catch {     case e: Exception => None   }
})

保存成Json。

result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_))
.saveAsTextFile(outputFile)

也可以使用Spark SQL讀取Json。

val input = hiveContext.jsonFile(inputFile)
  • CSV

讀取CSV/TSV數據與Json類似,都需要先作為普通文本來讀取數據,再對數據進行處理。與Json一樣,CSV也有很多庫,scala中可以使用opencsv。

import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
val input = sc.textFile(inputFile)
val result = input.map{line =>
  val reader = new CSVReader(new StringReader(line));
  reader.readNext();
}
  • SequenceFile
// 讀取SequenceFile
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).   map{case (x,y) => (x.toString, y.get())}
// 保存成SequenceFile
val data = sc.parallelize(List(("a",1),("b",2),("c",3)))
data.saveAsSequenceFile(outputFile)
  • DataFrame
// DataFrame保存成csv文件
dataFrame.write.format("csv").save("path")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM