一、動機
我們已經學了很多在 Spark 中對已分發的數據執行的操作。到目前為止,所展示的示例都是從本地集合或者普通文件中進行數據讀取和保存的。但有時候,數據量可能大到無法放在一台機器中,這時就需要探索別的數據讀取和保存的方法了。
Spark 及其生態系統提供了很多可選方案。本章會介紹以下三類常見的數據源。
• 文件格式與文件系統:對於存儲在本地文件系統或分布式文件系統(比如 NFS、HDFS、Amazon S3 等)中的數據,Spark 可以訪問很多種不同的文件格式,包括文本文件、JSON、SequenceFile,以及 protocol buffer。我們會展示幾種常見格式的用法,以及 Spark 針對不同文件系統的配置和壓縮選項。
• Spark SQL中的結構化數據源:后面會學習 Spark SQL 模塊,它針對包括 JSON 和 Apache Hive 在內的結構化數據源,為我們提供了一套更加簡潔高效的 API。此處會粗略地介紹一下如何使用 SparkSQL。
• 數據庫與鍵值存儲:概述 Spark 自帶的庫和一些第三方庫,它們可以用來連接 Cassandra、HBase、Elasticsearch 以及 JDBC 源。
二、文件格式
Spark 對很多種文件格式的讀取和保存方式都很簡單。從諸如文本文件的非結構化的文件,到諸如 JSON 格式的半結構化的文件,再到諸如 SequenceFile 這樣的結構化的文件,Spark都可以支持(見表)。Spark 會根據文件擴展名選擇對應的處理方式。這一過程是封裝好的,對用戶透明。

1、文本文件
在 Spark 中讀寫文本文件很容易。當我們將一個文本文件讀取為 RDD 時,輸入的每一行都會成為 RDD 的一個元素。( SparkContext.wholeTextFiles() 方法)也可以將多個完整的文本文件一次性讀取為一個 pair RDD,其中鍵是文件名,值是文件內容。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object Test {
def main(args: Array[String]): Unit = {
// Scala 中讀取一個文本文件
val conf = new SparkConf().setAppName("wordcount").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN") // 設置日志顯示級別
val input = sc.textFile("words.txt")
input.foreach(println)
}
}

2、保存文本文件
輸出文本文件也相當簡單。saveAsTextFile(outputFile) 方法接收一個路徑,並將RDD 中的內容都輸入到路徑對應的文件中。Spark 將傳入的路徑作為目錄對待,會在那個目錄下輸出多個文件。這樣,Spark 就可以從多個節點上並行輸出了。在這個方法中,我們不能控制數據的哪一部分輸出到哪個文件中,不過有些輸出格式支持控制。
3、讀取JSON
JSON 是一種使用較廣的半結構化數據格式。這里有兩種方式解析JSON數據,一種是通過Scala自帶的JSON包(import scala.util.parsing.json.JSON)。后面還會展示使用Spark SQL讀取JSON數據。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import scala.util.parsing.json.JSON
object Test {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("JSONTest").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN") // 設置日志顯示級別
val inputFile = "pandainfo.json"//讀取json文件
val jsonStr = sc.textFile(inputFile);
val result = jsonStr.map(s => JSON.parseFull(s));//逐個JSON字符串解析
result.foreach(
{
r => r match {
case Some(map:Map[String,Any]) => println(map)
case None => println("parsing failed!")
case other => println("unknown data structure" + other)
}
}
);
}
}


第二種方法是通過json4s來解析JSON文件。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.json4s.jackson.Serialization
import org.json4s.ShortTypeHints
import org.json4s.jackson.JsonMethods._
import org.json4s.DefaultFormats
object Test {
def main(args: Array[String]): Unit = {
// 第二種方法解析json文件
val conf = new SparkConf().setAppName("wordcount").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN") // 設置日志顯示級別
implicit val formats = Serialization.formats(ShortTypeHints(List()))
val input = sc.textFile("pandainfo.json")
input.collect().foreach(x=>{
var c = parse(x).extract[Panda]
println(c.name+","+c.lovesPandas)
})
case class Panda(name:String,lovesPandas:Boolean)
}
}

4、保存JSON
寫出 JSON 文件比讀取它要簡單得多。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.json4s.jackson.Serialization
import org.json4s.ShortTypeHints
import org.json4s.jackson.JsonMethods._
import org.json4s.DefaultFormats
object Test {
def main(args: Array[String]): Unit = {
// 第二種方法解析json文件
val conf = new SparkConf().setAppName("wordcount").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN") // 設置日志顯示級別
implicit val formats = Serialization.formats(ShortTypeHints(List()))
val input = sc.textFile("pandainfo.json")
input.collect().foreach(x=>{
var c = parse(x).extract[Panda]
println(c.name+","+c.lovesPandas)
})
case class Panda(name:String,lovesPandas:Boolean)
// 保存json
val datasave = input.map{ myrecord =>
implicit val formats = DefaultFormats
val jsonObj = parse(myrecord)
jsonObj.extract[Panda]
}
datasave.saveAsTextFile("savejson")
}
}

5、逗號分隔值與制表符分隔值
逗號分隔值(CSV)文件每行都有固定數目的字段,字段間用逗號隔開(在制表符分隔值文件,即 TSV 文件中用制表符隔開)。記錄通常是一行一條,不過也不總是這樣,有時也可以跨行。讀取 CSV/TSV 數據和讀取 JSON 數據相似,都需要先把文件當作普通文本文件來讀取數據,再對數據進行處理。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import au.com.bytecode.opencsv.CSVReader
import java.io.StringReader
import java.io.StringWriter
import au.com.bytecode.opencsv.CSVWriter
object Test {
def main(args: Array[String]): Unit = {
// 在Scala中使用textFile()讀取CSV
val conf = new SparkConf().setAppName("wordcount").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN") // 設置日志顯示級別
val inputFile = "favourite_animals.csv"//讀取csv文件
val input = sc.textFile(inputFile)
val result = input.map{
line => val reader = new CSVReader(new StringReader(line))
reader.readNext()
}
// result.foreach(println)
for(res <- result)
for(r <- res)
println(r)
}
}


6、SequenceFile
SequenceFile 是由沒有相對關系結構的鍵值對文件組成的常用 Hadoop 格式。SequenceFile文件有同步標記,Spark 可以用它來定位到文件中的某個點,然后再與記錄的邊界對齊。這可以讓 Spark 使用多個節點高效地並行讀取 SequenceFile 文件。SequenceFile 也是Hadoop MapReduce 作業中常用的輸入輸出格式,所以如果你在使用一個已有的 Hadoop 系統,數據很有可能是以 SequenceFile 的格式供你使用的。
由於 Hadoop 使用了一套自定義的序列化框架,因此 SequenceFile 是由實現 Hadoop 的 Writable接口的元素組成。下表 列出了一些常見的數據類型以及它們對應的 Writable 類。標准的經驗法則是嘗試在類名的后面加上 Writable 這個詞,然后檢查它是否是 org.apache.hadoop.io.Writable 已知的子類。如果你無法為要寫出的數據找到對應的 Writable 類型(比如自定義的 case class),你可以通過重載 org.apache.hadoop.io.Writable 中的 readfields 和 write 來實現自己的 Writable 類。

讀取SequenceFile:Spark 有專門用來讀取 SequenceFile 的接口。在 SparkContext 中,可以調用 sequenceFile(path,keyClass, valueClass, minPartitions) 。前面提到過,SequenceFile 使用 Writable 類,因此 keyClass 和 valueClass 參數都必須使用正確的 Writable 類。
保存SequenceFile:在 Scala 中將數據寫出到 SequenceFile 的做法也很類似。可以直接調用 saveSequenceFile(path) 保存你的 PairRDD ,它會幫你寫出數據。
7、對象文件
對象文件看起來就像是對 SequenceFile 的簡單封裝,它允許存儲只包含值的 RDD。和SequenceFile 不一樣的是,對象文件是使用 Java 序列化寫出的。要保存對象文件,只需在 RDD 上調用 saveAsObjectFile 就行了。讀回對象文件也相當簡單:用 SparkContext 中的 objectFile() 函數接收一個路徑,返回對應的 RDD。
