SequenceFiles(序列文件)
SequenceFile是Hadoop的一種由鍵值對小文件組成的流行的格式。SequenceFIle有同步標記,Spark可以尋找標記點,然后與記錄邊界重新同步。Spark還可以從多個節點並行高效地讀取SequenceFile。SequenceFile也是Hadoop MapReduce中job的常用輸入輸出格式,如果你正使用着Hadoop系統,數據很有可能就是SequenceFile格式的。
SequenceFile是由實現HadoopWritabble
接口的元素組成的,Hadoop使用的是自定義的序列化框架。表5-2列出了一些常用的類型和他們對應的Writable
類。有個小經驗,在你的類名后面加上Wratable
,然后檢查這個類是否屬於org.apache.hadoop.io.Writable
包下的子類。如果你的輸入數據不屬於Writable
子類,你可以繼承org.apache.hadoop.io.Writable
類並重寫readFields
和write
方法就可以了。
Hadoop的RecordReader為每條記錄重用相同的對象,因此直接調用您讀取RDD上的緩存可能會失敗;相反,添加一個
map()
操作並緩存其結果。除此之外,很多Hadoop的Writable
類沒有實現java.io.Serializable
,所以,為了讓他們可以使用我們需要用map()
轉換他們。
在Spark1.0或更早的版本,只有Java和Scala可以使用SequenceFile,但是從Spark1.1開始,添加了Python也能加載保存SequenceFile的功能。注意一點,你在使用Java和Scala時需要自定義Writable
類型。Python Spark API只知道如何將Hadoop中可用的基本Writable
轉換為Python所使用,對於其他類,API基於可用的getter方法盡最大努力使其正常運行。
Loading SequenceFiles(加載SequenceFile)
對於SequenceFile,Spark定制了一套API。我們可以在SparkContext
上調用sequenceFile(path, keyClass, valueClass, minPartitions)
。之前提到過,SequenceFile必須結合Writable
類使用,所以我們鍵和值的類必須都是正確的Writable
類。想象一個從SequenceFile加載人和他們看過熊貓數量的程序,這個例子中,鍵的類是Text
,值的類是IntWritable
或VIntWritable
,為了簡化工作,就使用IntWritable
。示例如下:
Example 5-20. Loading a SequenceFile in Python
val data = sc.sequenceFile(inFile,
"org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
Example 5-21. Loading a SequenceFile in Scala
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).
map{case (x, y) => (x.toString, y.get())}
Example 5-22. Loading a SequenceFile in Java
public static class ConvertToNativeTypes implements
PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
return new Tuple2(record._1.toString(), record._2.get());
}
}
JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(fileName, Text.class,IntWritable.class);
JavaPairRDD<String, Integer> result = input.mapToPair(
new ConvertToNativeTypes());
在Scala中,有一個很方便的函數能把
Writable
轉換成Scala中對應的類型。而不是如樣例那樣標明鍵類型和值類型,我們可以調用SequenceFile[Key,Value](path,minPartitions)
,這回返回一個Scala原生類型的RDD。
Saving SequenceFiles(保存SequenceFile)
把數據寫入一個SequenceFile也很類似。首先,因為SequenceFile是鍵值對類型的,我們需要一個鍵值對RDD便於SequenceFile寫入數據。對於Scala很多本地類型,存在在Scala和Hadoop Writable
類型之間的隱式轉換,所以如果你正寫出一個Scala原生類型數據你可以通過調用saveAsSequenceFile(path)
直接保存你的鍵值對RDD。如果我們使用的鍵和值不能自動轉換成Wratble
,或者我們想使用變長類型(如,VIntWritable
),我們可以在保存之前遍歷數據對其進行映射成Writable
類型。思考一下之前的例子(人和看熊貓數量的例子)。示例如下:
Example 5-23. Saving a SequenceFile in Scala
val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
data.saveAsSequenceFile(outputFile)
在Java中保存SequenceFile更麻煩一點,因為Java的鍵值對RDD沒有saveAsSequenceFile()
方法。我們需要使用Spark的特性去保存定制的Hadoop輸出格式,我們會在84頁“Hadoop輸入輸出格式中”展示如何在Java中保存SequenceFile。
Object Files(對象文件)
對象文件看起來是對SequenceFile簡單地包裝,它允許我們保存只包含值的RDD。和SequenceFile不同,對象文件值的寫出是利用了Java的序列化。
如果你改變了類,例如添加或刪除了字段,原來的對象文件就不可讀了。對象文件使用序列化有一個好處,它可以跨類的版本保持兼容性,但是需要程序員做一些工作。
和SequenceFile不同,同一個對象,對象文件的輸出結果可能和Hadoop的輸出結果不同。和其他輸出格式也不同,對象文件最主要的用處就是用於Spark job之間的通信。而且Java的序列化也可能非常慢。
保存對象文件很簡單,直接調用RDD的saveAsObjectFile
就可以了。讀取對象文件也很簡單:SparkContext有一個函數objectFile()
,它需要接收一個路徑參數,返回一個RDD。
上面講了很多對象文件的弊病,你可能會好奇為什么會有人使用它們。一個主要原因就是它幾乎不用費任何操作就可以保存任意的對象。
對象文件在Python中無法使用,但是Python的RDD和SparkContext支持名為saveAsPickleFile()
和pickleFile()
的方法。這兩個方法是用了Python的pickle
序列化庫。pickle
文件和對象文件的弊病是相同的,pickle庫很慢,並且如果對類做了改動,原來的類可能無法讀取。
Hadoop Input and Output Format(Hadoop輸入輸出格式)
除了Spark包裝的格式,我們還可以與所有Hadoop支持的格式進行交互。Spark支持新版和老版的Hadoop文件API,提供了很高的靈活性。
loading with other Hadoop input formats(加載其他Hadoop格式)
為了使用Hadoop的新版API讀取文件我們需要告訴Spark一些事情。newAPIHadoopFile
需要接收一個路徑,和三個類作為參數。第一個類是“格式”類,用來代表我們輸入的格式。有一個類似的函數hadoopFile()
,用來處理舊版API實現的Hadoop輸入格式。第二個類是鍵的類,第三個類是值的類。如果我們需要指定其他的Hadoop配置文件屬性,我們也可以傳入一個conf對象。
一個最簡單的Hadoop輸入格式就是KeyValueTextInputFormat
,可以用來從文本文件中讀取鍵值對數據(Example5-24中有示例)。每一行都是單獨處理的,鍵和值用tab鍵隔開。這種格式由Hadoop提供,所以我們項目中不必添加額外的依賴。
Example 5-24. Loading KeyValueTextInputFormat() with old-style API in Scala
val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat](inputFile).map{
case (x, y) => (x.toString, y.toString)
}
之前看到的加載JSON是通過加載文本文件然后再對其轉換,但是我們也可以使用Hadoop的輸入格式加載JSON。下面這個例子需要對要文件做一點特別設置,你選擇跳過也沒關系。Twitter的Elephant Bird 庫支持很多數據格式,包括JSON,Lucene,Protocol Buffer相關的格式等等。這個庫可以使用新版和老版API。為了展示如何而在Spark中使用新版API,我們在下面的示例中演示使用jsonInputFormat
加載LZO-compressed JSON數據:
Example 5-25. Loading LZO-compressed JSON with Elephant Bird in Scala
val input = sc.newAPIHadoopFile(inputFile, classOf[LzoJsonInputFormat],
classOf[LongWritable], classOf[MapWritable], conf)
// Each MapWritable in "input" represents a JSON object
//每一個"輸出"中的MapWritable都代表一個JSON對象
你需要下載hadoop-lzo包並且設置Spark的本地庫才能使用LZO包。如果你下載了Debian包,在Spark的submit調用中加入
--driver-library-path /usr/lib/hadoop/lib/native/ --driver-class-path /usr/lib/hadoop/lib/
就能做到這一點。
從使用的角度來看,使用舊版Hadoop的API讀取文件幾乎是沒有區別,除了我們提供的那個舊式inputFormat
類。Spark很多內置便捷的函數(如sequenceFile()
)都是實現的舊版Hadoop API。
Saving with Hadoop output formats(使用Hadoop輸出格式保存)
我們已經在某種程度上檢驗了SequenceFIle,但是在Java中我們沒有同樣方便的函數來保存鍵值對RDD。我們會在Example5-26中演示如何使用老版Hadoop格式API,新版本的(saveAsNewAPIHadoopFile)調用也一樣。
Example 5-26. Saving a SequenceFile in Java
public static class ConvertToWritableTypes implements
PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
return new Tuple2(new Text(record._1), new IntWritable(record._2));
}
}
JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(input);
JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
result.saveAsHadoopFile(fileName, Text.class, IntWritable.class,
SequenceFileOutputFormat.class);
Non-filesystem data sources(非文件系統數據源)
除了hadoopFile()
和saveAsHadoopFile()
一系列函數,你可以使用hadoopDataset/saveAsHadoopDataSet
和newAPIHadoopDataset/saveAsNewAPIHadoopDataset
來訪問Hadoop支持的非文件系統的存儲格式。舉例來講,很多鍵值對存儲系統(如HBase和MongoDB)提供了直接從鍵值對存儲中讀取的Hadoop輸入格式。在Spark中使用也很簡單。
hadoopDataset()
系列函數只需要一個Configuration
對象參數,你可以設置其訪問數據源所需要的Hadoop屬性。你按照執行Hadoop MapReduce job的配置配置即可,所以你可以按照說明訪問MapReduce的數據源,然后將對象傳遞給Spark。在96頁的“HBase”中展示了如何使用newAPIHadoopDataset
加載數據。
Example:Protocol buffers (例子:協議緩沖區)
Protocol buffers是由Google首先開發用來內部的遠程過程調用(RPC)並且此后開源了。Protocol buffers(PBs)是結構化數據,其字段和字段類型是明確定義的。他們被優化地便於編碼和解碼並且占用空間很小。與XML相比,PBs占用空間小3到10倍,編碼解碼速度快20到100倍。盡管PB有一致的編碼,但是有多種方式可以創建一個由很多PB消息組成的文件。
Protocol buffers是由特定領域語言定義的,並且protocol buffer編譯器可以用多種語言來生成訪問器方法(Spark支持的所有語言都可以)。由於PB以最小化占用空間為目標,所以它不是"自我解釋型的",因為對數據描述編碼會占用額外的空間。這意味着解析PB格式的數據,我們需要protocol buffer的定義來理解它。
PB包含optional(可選的),required(必須的)或者 repeated(重復的)字段。當你解析數據的時候如果丟失了可選的字段不會造成解析失敗,但是如果丟失了必須的字段會導致失敗。所以,當你在已有的protocol buffer中添加新字段,把新字段設為可選是一個好習慣。因為不是每個人都會同時升級(即使他們這樣做了,你也可能需要讀取老版的數據)。
PB字段可能是預定義的類型,也可能是其他的PB消息。包括String,int32,enums
等等類型。這只是對protocol buffer做了很簡單的介紹,所以如果你感興趣,最好去瀏覽一下Protocol Buffer的網站。
在Example5-27中,我們展示了從protocol buffer格式中加載一些VenueResponse
對象。例子中VenueResponse
對象是一個具有單個重復字段的簡單格式,而且還包含另一個有required(必須),optional(可選)和enumeration(枚舉)字段的消息。
Example 5-27. Sample protocol buffer definition
message Venue {
required int32 id = 1;
required string name = 2;
required VenueType type = 3;
optional string address = 4;
enum VenueType {
COFFEESHOP = 0;
WORKPLACE = 1;
CLUB = 2;
OMNOMNOM = 3;
OTHER = 4;
}
}
message VenueResponse {
repeated Venue results = 1;
}
我們在之前章節用來加載JSON數據的推特的Elephant Bird
庫,也支持加載保存protocol buffer格式的數據。下面例子演示將Venues寫出。
Example 5-28. Elephant Bird protocol buffer writeout in Scala
val job = new Job()
val conf = job.getConfiguration
LzoProtobufBlockOutputFormat.setClassConf(classOf[Places.Venue], conf);
val dnaLounge = Places.Venue.newBuilder()
dnaLounge.setId(1);
dnaLounge.setName("DNA Lounge")
dnaLounge.setType(Places.Venue.VenueType.CLUB)
val data = sc.parallelize(List(dnaLounge.build()))
val outputData = data.map{ pb =>
val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue]);
protoWritable.set(pb)
(null, protoWritable)
}
outputData.saveAsNewAPIHadoopFile(outputFile, classOf[Text],
classOf[ProtobufWritable[Places.Venue]],
classOf[LzoProtobufBlockOutputFormat[ProtobufWritable[Places.Venue]]], conf)
該例代碼的完整版本在本書的源碼中。(本書 github地址 databricks/learning-spark)
當你構建項目時,確保使用的Protocol buffer庫版本與Spark相同,撰寫本書時,版本是2.5
package com.oreilly.learningsparkexamples.proto;
message Venue {
required int32 id = 1;
required string name = 2;
required VenueType type = 3;
optional string address = 4;
enum VenueType {
COFFEESHOP = 0;
WORKPLACE = 1;
CLUB = 2;
OMNOMNOM = 3;
OTHER = 4;
}
}
message VenueResponse {
repeated Venue results = 1;
}
/**
* Saves a sequence file of people and how many pandas they have seen.
*/
package com.oreilly.learningsparkexamples.scala
import com.oreilly.learningsparkexamples.proto.Places
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.hadoop.io.Text
import com.twitter.elephantbird.mapreduce.io.ProtobufWritable
import com.twitter.elephantbird.mapreduce.output.LzoProtobufBlockOutputFormat
import org.apache.hadoop.conf.Configuration
object BasicSaveProtoBuf {
def main(args: Array[String]) {
val master = args(0)
val outputFile = args(1)
val sc = new SparkContext(master, "BasicSaveProtoBuf", System.getenv("SPARK_HOME"))
val conf = new Configuration()
LzoProtobufBlockOutputFormat.setClassConf(classOf[Places.Venue], conf);
val dnaLounge = Places.Venue.newBuilder()
dnaLounge.setId(1);
dnaLounge.setName("DNA Lounge")
dnaLounge.setType(Places.Venue.VenueType.CLUB)
val data = sc.parallelize(List(dnaLounge.build()))
val outputData = data.map{ pb =>
val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue]);
protoWritable.set(pb)
(null, protoWritable)
}
outputData.saveAsNewAPIHadoopFile(outputFile, classOf[Text], classOf[ProtobufWritable[Places.Venue]],
classOf[LzoProtobufBlockOutputFormat[ProtobufWritable[Places.Venue]]], conf)
}
}
File Compression(文件壓縮)
當我們處理大數據時,我們經常發現需要壓縮數據來節省存儲空間和網絡開銷。對於大多數大數據格式,我們可以制定壓縮編碼器來壓縮數據。我們之前已經見識到,Spark原生的輸入格式可以自動為我們處理壓縮類型。當你讀取壓縮文件時,有些壓縮編碼器可以自動判斷壓縮類型。
這些壓縮選項僅適用於支持壓縮的Hadoop格式,即寫入文件系統的格式。數據庫Hadoop格式通常不支持壓縮,或者它們在數據庫自身配置了對數據的壓縮。
選擇一個輸出壓縮編碼器可能會對以后的數據使用人員造成很大的影響。像Spark這種分布式系統,我們通常會從多個不同的機器中讀取數據。為了滿足分布式的需求,每個工作節點需要找到新數據記錄的開始點。有些壓縮格式讓人無從下手,因為如果需要單個節點讀取數據,就很容易導致性能瓶頸。能夠很容易從多個機器中讀取的數據叫做“可分割性(splitable)”。表5-3羅列了可用的壓縮選擇。
格式 | 可分割 | 平均壓縮速度 | 文本效率 | Hadoop壓縮編碼器 | 純JAVA |
原生 | 評價 |
---|---|---|---|---|---|---|---|
gzip |
N | 快 | 高 | org.apache.hadoop .io.compress.GzipCodec |
是 | 是 | |
lzo |
Y | 非常快 | 中等 | com.hadoop.compression .lzo.LzoCodec |
是 | 是 | 每個工作節點都 需要下載LZO庫 |
bzip2 |
Y | 慢 | 很快 | org.apache.hadoop.io.compress .BZip2Codec |
是 | 是 | 對於java 使用可分割版本 |
zlib |
N | 慢 | 中等 | org.apache.hadoop.io.compress .DefaultCodec |
是 | 是 | hadoop的 默認壓縮編碼器 |
Snappy |
N | 非常快 | 慢 | org.apache.hadoop.io.compress .SnappyCodec |
否 | 是 | Snappy有一個 純Java端口,但在 Spark和Hadoop中尚不可用 |
Spark的
textFile()
方法可以處理壓縮的輸入,即使在輸入被壓縮以致其可以以分割的形式讀取的情況下,它也會自動禁用可分割性。如果你發現你需要讀取一個很大的單個壓縮輸入數據,最好跳過Spark的包裝,使用newAPIHadoopFile
或者hadoopFIle
並且明確指定正確的壓縮編碼器。
有些輸入格式(如SequenceFile)允許我們壓縮鍵值對數據的值,這樣可以在查找時提高效率。其他輸入格式有自己對壓縮控制的方式:例如,推特的Elephant Bird
包中的許多格式使用LZO壓縮數據。
Filesystems(文件系統)
Spark支持很多文件系統的讀取寫出,我們可以使用我們想用的各種文件格式。
Local/"Regular" FS(本地或常規文件系統)
盡管Spark支持從本地文件系統加載文件,但需要文件在集群所有節點的路徑都相同才能夠使用。
一些網絡文件系統,如NFS,AFS和MapR的NFS層,作為常規文件系統向用戶公開。如果你的數據已經在上述某個文件系統中,指定一個路徑file://你就可以使用了;前提是文件系統安裝在每個節點的相同路徑上。見Example5-29
Example 5-29. Loading a compressed text file from the local filesystem in Scala
val rdd = sc.textFile("file:///home/holden/happypandas.gz")
如果文件還沒有在集群的所有節點上,你可以不同通過Spark在驅動程序上本地加載,然后調用parallelize
將內容分發給worker。這種方式速度很慢,所以我們建議把文件放在HDFS,NFS或S3等共享文件系統中。
Amazon S3(亞馬遜的s3)
Amazon S3是一個日漸流行的存儲大數據的選擇。當您的計算節點位於Amazon EC2內時,S3非常快速,但是如果你必須在公網上傳輸,那可能會有很糟糕的表現。
要訪問Spark中的S3,您應該首先將AWS_ACCESS_KEY_ID
和AWS_SECRET_ACCESS_KEY
環境變量設置為您的S3憑證。你可以從Amazon Web Service控制台創建這些憑證。然后傳遞一個以s3n://開頭的路徑給Spark的讀取文件方法,格式為s3n://bucket/pathwithin-bucket。和其他文件系統一樣,Spark支持S3的通配符路徑,如s3n://bucket/my-files/*.txt。
如果你從Amazon獲得S3訪問權限錯誤,請確保為其指定了訪問密鑰的帳戶同時具有“read”和“list”權限。Spark需要能夠列出存儲bucket中的對象以識別您想要讀取的對象。
HDFS
HDFS是非常流行的分布式文件系統,Spark與其對接的效果也很好。HDFS被設計為在商用硬件上工作的系統,並且在提供高吞吐量的情況下還能保證對故障處理的彈性。Spark和HDFS可以布置在同一台機器上,並且Spark可以借此避免網絡通信開銷。
對輸入輸出的數據設置hdfs://master:port/path就可以在Spark上使用HDFS了,非常簡單。
HDFS的協議在Hadoop不同版本之間存在差異,所以如果你運行一個針對不同版本編譯的Spark版本,會導致失敗。默認情況下,Spark是針對
Hadoop 1.0.4構建的。如果從源代碼構建,則可以設置SPARK_HADOOP_VERSION =指定為環境變量,以針對不同的版本構建;或者你可以下載一個不同的Spark預編譯版本。你可以在運行時決定hadoop的版本。
Structured Data with Spark SQL(Spark SQL處理結構化數據)
Spark SQL是Spark1.0加入的組件,很快變成Spark處理結構化和半結構化數據最受歡迎的方式。對於結構化數據,我們意思是固定模式的數據,就是數據記錄之間具有一致的字段集。Spark SQL支持多個結構化數據源作為輸入,並且因其能夠理解他們的格式,所以可以高效地讀取數據源必須的字段。第九章我們會詳細介紹Spark SQL,現在我們會簡要介紹少量常用數據源的使用。
在所有的例子中,我們讓Spark SQL在數據源上運行一個SQL查詢(選擇一些字段或字段的函數),並且會返回給我們Row 對象的RDD,每條記錄一個。在Java和Scala中,允許Row對象基於列號進行訪問。每個Row象都有一個get()
方法,用來返回我們可以轉換的一般類型,並且有對於常見的基本類型特殊的get()
方法(如, getFloat(), getInt(), getLong(), getString(), getShort(), and getBoolean()
)。在Python中我們直接使用row[column_number]
和row.column_name
訪問元素。
Apache Hive(Apache的Hive)
Hadoop中常用的一個結構數據就是Apache的Hive。Hive可以以各種格式存儲表格,在HDFS或這其他存儲系統中,從純文本到列向格式的格式都可以。Spark SQL可以支持任何Hive支持的表格。
要將Spark SQL連接現有的Hive,你需要提供Hive的配置。你可以通過復制你的hive-site.xml到Spark的./conf/directory中進行連接。一旦你這樣做了,你就創建了個HiveContext
對象,它是Spark SQL的入口點,你可以寫Hive的查詢語句(HQL)來查詢數據表,會返回一個行RDD。示例如下:
Example 5-30. Creating a HiveContext and selecting data in Python
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name, age FROM users")
firstRow = rows.first()
print firstRow.name
Example 5-31. Creating a HiveContext and selecting data in Scala
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
val firstRow = rows.first()
println(firstRow.getString(0)) // Field 0 is the name
Example 5-32. Creating a HiveContext and selecting data in Java
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SchemaRDD;
HiveContext hiveCtx = new HiveContext(sc);
SchemaRDD rows = hiveCtx.sql("SELECT name, age FROM users");
Row firstRow = rows.first();
System.out.println(firstRow.getString(0)); // Field 0 is the name
170頁“Apache Hive”會詳細介紹Hive如何加載數據。
JSON
如果你數據記錄的JSON數據結構一致,Spark SQL可以推斷它們的結構並且加載數據為行,使你可以非常容易地獲取需要的字段。為了加載JSON數據,首先在需要使用Hive的時候創建一個HiveContext對象。(在這種情況下,不需要安裝Hive,也就是說,您不需要hivesite.xml文件。)然后使用HiveContext.jsonFIle
方法來獲得整個文件的Row對象RDD。除了使用整個Row對象之外,你也可以將這個RDD注冊為一個表並從中搜索特定字段。舉例來講,假如我們有一個JSON文件格式如下,並且每行一條數據:
Example 5-33. Sample tweets in JSON
{"user": {"name": "Holden", "location": "San Francisco"}, "text": "Nice day out today"}
{"user": {"name": "Matei", "location": "Berkeley"}, "text": "Even nicer here :)"}
我們可以加載這段數據並且查詢username和text字段,示例如下:
Example 5-34. JSON loading with Spark SQL in Python
tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
results = hiveCtx.sql("SELECT user.name, text FROM tweets")
Example 5-35. JSON loading with Spark SQL in Scala
val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name, text FROM tweets")
Example 5-36. JSON loading with Spark SQL in Java
SchemaRDD tweets = hiveCtx.jsonFile(jsonFile);
tweets.registerTempTable("tweets");
SchemaRDD results = hiveCtx.sql("SELECT user.name, text FROM tweets");
我們會在172頁的“JSON”中詳細討論如何使用Spark SQL加載JSON數據和訪問其模式。另外,
Spark SQL支持的數據不僅僅是加載數據,包括查詢數據,以更復雜的方式與RDD結合,以及在其上運行自定義函數,我們將在第9章中介紹。
Databases (數據庫)
Spark可以使用他們的Hadoop連接器或自定義的Spark連接器訪問多種流行的數據庫。 在本節中,我們將展示四個通用連接器。
Java Database Connectivity(Java 數據庫連接)
Spark可以從任何支持Java數據庫連接(JDBC)的關系型數據庫加載數據,包括MySQL, Postgres和其他系統。為了訪問這些數據,我們構建一個org.apache.spark.rdd.JdbcRDD
並為其提供了我們的SparkContext和其他參數。Example5-37展示了使用JdbcRDD連接MySQL數據庫。
Example 5-37. JdbcRDD in Scala
def createConnection() = {
Class.forName("com.mysql.jdbc.Driver").newInstance();
DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}
def extractValues(r: ResultSet) = {
(r.getInt(1), r.getString(2))
}
val data = new JdbcRDD(sc,
createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?",
lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
println(data.collect().toList)
JdbcRDD接收的幾個參數:
- 首先,我們提供一個函數來建立數據庫連接。這使得每個節點在執行任何連接所需的配置后,都可以創建自己的連接來加載數據。
- 然后我們提供一個讀取一定范圍數據的查詢,以及此查詢參數的上限和下限。這些參數允許Spark在不同機器上查詢不同范圍的數據,所以我們不會因在單個節點上加載所有數據而產生性能瓶頸。(如果您不知道有多少記錄,則可以先手動進行計數查詢並使用其結果來確定upperBound和lowerBound。)
- 最后一個參數是一個函數,用來將java.sql.ResultSet的每行輸出轉換為對操作數據有用的格式。在Example5-37中,我們會拿到(Int,String)鍵值對。如果省略此參數,Spark會自動將每行轉換為一個對象數組。
如其他的數據源,當使用JdbcRDD時,確保你的數據庫可以處理從Spark並行讀取的負載。如果你想離線查詢數據而不是實時的數據庫,你可以使用數據庫的導出功能導出文本文件。
Cassandra
隨着從DataStax引入開源Spark Cassandra連接器,Spark對Cassandra的支持已經大大改善。由於連接器目前不是Spark的一部分,所以你需要在項目中引入額外的依賴。Cassandra目前還不能使用Spark SQL,但是可以返回CassandraRow對象,它和Spark SQL的Row對象具有一些相同的方法,Eample5-38和5-39展示了用法。Spark Cassandra連接器目前只能在Java和Scala中使用。
Example 5-38. sbt requirements for Cassandra connector
//sbt配置
"com.datastax.spark" %% "spark-cassandra-connector" % "1.0.0-rc5",
"com.datastax.spark" %% "spark-cassandra-connector-java" % "1.0.0-rc5"
Example 5-39. Maven requirements for Cassandra connector
//maven配置
<dependency> <!-- Cassandra -->
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector</artifactId>
<version>1.0.0-rc5</version>
</dependency>
<dependency> <!-- Cassandra -->
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java</artifactId>
<version>1.0.0-rc5</version>
</dependency>
很像Elasticsearch,Cassandra連接器讀取一個job的屬性然后確定連接哪個集群。通過設置spark.cassandra.connection.host
來連接Cassandra集群並且如果有用戶名和密碼的話可以通過spark.cassandra.auth.username
和spark.cassandra.auth.password
設置。假如你只有一個可供連接的Cassandra集群,我們可以在創建SparkContext的時候設置。示例如下:
Example 5-40. Setting the Cassandra property in Scala
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "hostname")
val sc = new SparkContext(conf)
Example 5-41. Setting the Cassandra property in Java
SparkConf conf = new SparkConf(true)
.set("spark.cassandra.connection.host", cassandraHost);
JavaSparkContext sc = new JavaSparkContext(
sparkMaster, "basicquerycassandra", conf);
Datastax Cassandra連接器使用Scala的隱式轉換在SparkContext和RDD上提供額外的函數,來引入一些隱式轉換並加載一些數據(Example5-42)。
Example 5-42. Loading the entire table as an RDD with key/value data in Scala
// Implicits that add functions to the SparkContext & RDDs.
//導入隱式轉換使用額外函數
import com.datastax.spark.connector._
// Read entire table as an RDD. Assumes your table test was created as
// CREATE TABLE test.kv(key text PRIMARY KEY, value int);
//引入整個表作為RDD。假設你創建的測試表結構是(key text PRIMARY KEY, value int)
val data = sc.cassandraTable("test" , "kv")
// Print some basic stats on the value field.
//打印對值字段的基本統計
data.map(row => row.getInt("value")).stats()
在Java中沒有隱式轉換,所以我們需要顯式地將我們的SparkContext和RDD轉換為這個功能。示例如下:
Example 5-43. Loading the entire table as an RDD with key/value data in Java
import com.datastax.spark.connector.CassandraRow;
import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;
// Read entire table as an RDD. Assumes your table test was created as
// CREATE TABLE test.kv(key text PRIMARY KEY, value int);
JavaRDD<CassandraRow> data = javaFunctions(sc).cassandraTable("test" , "kv");
// Print some basic stats.
System.out.println(data.mapToDouble(new DoubleFunction<CassandraRow>() {
public double call(CassandraRow row) { return row.getInt("value"); }
}).stats());
除了加載整個表,我們還可以查詢數據的子集。我們可以通過在cassandraTable()
調用中添加where子句來限制數據,例如,sc.cassandraTable(…).where("key=?", "panda")
。
Cassandra連接器支持保存不同類型的RDD。我們可以直接保存CassandraRow
對象RDD,這在表之間復制數據很有用。通過指定列映射,我們可以將非行形式的RDD保存為元組和列表。示例如下:
Example 5-44. Saving to Cassandra in Scala
val rdd = sc.parallelize(List(Seq("moremagic", 1)))
rdd.saveToCassandra("test" , "kv", SomeColumns("key", "value"))
這一部分只是很簡要地介紹了Cassandra的連接器。如果想更深入地了解,可以查看連接器的GitHub。
HBase
Spakr可以在實現了org.apache.hadoop.hbase.mapreduce.TableInputFormat
的類中通過Hadoop的輸入格式訪問HBase。這種輸入格式返回鍵值對,鍵是org.apache.hadoop.hbase.io.ImmutableBytesWritable
類型,值是org.apache.hadoop.hbase.client.Result
類型。如API文檔中所述,Result類包含根據列族獲取值的各種方法。
為了在Spark中使用HBase,你可以對正確的輸入格式調用SparkContext.newAPIHadoopRDD
。Scala示例如下:
Example 5-45. Scala example of reading from HBase
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename") // which table to scan
val rdd = sc.newAPIHadoopRDD(
conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
為了優化HBase的讀取,TableInputFormat包含多個設置,例如將掃描限制為只有一組列,並限制掃描的時間范圍。你可以在TableInputFormat
API文檔中找到這些選項,並在將它們傳遞給Spark之前,對HBaseConfiguration設置。
Elasticsearch
Spark可以使用Elasticsearch-Hadoop
從Elasticsearch讀取和寫入數據。Elasticsearch是基於Lucene的開源搜索系統。
Elasticsearch連接器和我們之前介紹的連接器有些不同,因為它不需要提供路徑信息,而是取決於SparkContext設置的配置信息。Elasticsearch輸出格式連接器也不需要使用Spark的包裝,我們使用saveAsHadoopDataSet
,這意味着我們需要手動設置更多參數。讓我們看看在Elasticsearch中讀取寫入簡單數據的例子。
最新的Elasticsearch Spark連接器用起來很簡單,支持返回Spark SQL行。之所以還介紹了這個連接器,因為行轉換還不支持Elasticsearch中所有的原生類型。
Example 5-46. Elasticsearch output in Scala
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.mr.EsOutputFormat")
jobConf.setOutputCommitter(classOf[FileOutputCommitter])
jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets")
jobConf.set(ConfigurationOptions.ES_NODES, "localhost")
FileOutputFormat.setOutputPath(jobConf, new Path("-"))
output.saveAsHadoopDataset(jobConf)
Example 5-47. Elasticsearch input in Scala
def mapWritableToInput(in: MapWritable): Map[String, String] = {
in.map{case (k, v) => (k.toString, v.toString)}.toMap
}
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args(1))
jobConf.set(ConfigurationOptions.ES_NODES, args(2))
val currentTweets = sc.hadoopRDD(jobConf,
classOf[EsInputFormat[Object, MapWritable]], classOf[Object],
classOf[MapWritable])
// Extract only the map
// Convert the MapWritable[Text, Text] to Map[String, String]
val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }
與其他連接器相比,Elasticsearch連接器稍微有些費解,但它對於如何使用這種類型的連接器提供了有用的參考。
在寫出方面,Elasticsearch可以進行映射推斷,但這可能偶爾會錯誤推斷,所以如果要存儲除字符串以外的數據類型,則最好明確設置映射。
Conclusion(總結)
隨着本章的結尾,你現在應該能夠將數據存入Spark中,並以對你有用的格式存儲計算結果。我們分析了可用於數據的多種不同格式,以及壓縮選項及其對使用數據方式的影響。隨后的章節將研究如何編寫更高效,更強大的Spark程序,以便我們可以加載和保存大型數據集。