- 文本文件
將一個文本文件讀取為RDD時,輸入的每一行都會成為RDD的一個元素。也可以將多個完整的文本文件一次性讀取為一個pairRDD, 其中鍵是文件名,值是文件內容。
// 讀取文本文件
val input = sc.textFile("filePath")
// 保存為文本文件
result.savaAsTextFile(outputFile)
- JSON
讀取Json最簡單的方法是將數據作為文本文件讀取,然后使用Json解析器來對RDD中的值進行映射操作。Json的使用需要依賴第三方類庫,scala中可以使用Jackson。
import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.DeserializationFeature
... case class Person(name: String, lovesPandas: Boolean) //Must be a top-level class
... // Parse it into a specific case class. We use flatMap to handle errors // by returning an empty list (None) if we encounter an issue and a // list with one element if everything is ok (Some(_)). val result = input.flatMap(record => { try { Some(mapper.readValue(record, classOf[Person])) } catch { case e: Exception => None }
})
保存成Json。
result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_))
.saveAsTextFile(outputFile)
也可以使用Spark SQL讀取Json。
val input = hiveContext.jsonFile(inputFile)
- CSV
讀取CSV/TSV數據與Json類似,都需要先作為普通文本來讀取數據,再對數據進行處理。與Json一樣,CSV也有很多庫,scala中可以使用opencsv。
import Java.io.StringReader import au.com.bytecode.opencsv.CSVReader ... val input = sc.textFile(inputFile) val result = input.map{line => val reader = new CSVReader(new StringReader(line)); reader.readNext(); }
- SequenceFile
// 讀取SequenceFile
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]). map{case (x,y) => (x.toString, y.get())}
// 保存成SequenceFile
val data = sc.parallelize(List(("a",1),("b",2),("c",3)))
data.saveAsSequenceFile(outputFile)
- DataFrame
// DataFrame保存成csv文件 dataFrame.write.format("csv").save("path")
