Spark數據讀取
- 對於存儲在本地文件系統或分布式文件系統(HDFS、Amazon S3)中的數據,Spark可以訪問很多種不同的文件格式,比如文本文件、JSON、SequenceFile
- Spark SQL中的結構化數據源,包括JSON和Hive的結構化數據源
- 數據庫和鍵值存儲,自帶的庫,聯結HBase或其他JDBC源
格式名稱 | 結構化 | 備注 |
---|---|---|
文本文件 | 否 | 普通的文本文件,每行一條記錄 |
JSON | 半結構化 | 每行一條記錄 |
CSV | 是 | 非常常見的基於文本的格式 |
SequenceFiles | 是 | 用於鍵值對的常見Hadoop文件格式 |
textFile()和saveAsTextFile(),讀取文本文件和保存為文本文件。
讀取JSON數據的方式是將數據作為文本文件讀取,然后使用JSON解析器對RDD中的值進行映射操作。
import json
data = input.map(lambda x: json.loads(x))
//保存JSON
(data.filter(lambda x: x["lovesPands"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile))
Spark有專門用來讀取SequenceFile的接口,可以調用sequenceFile(path,keyClass,valueClass,minparttions)來讀取。
val data = sc.sequenceFile(inFile,"org.apache.hadoop.io.Text","org.apache.hadoop.io.IntWritable")
文件壓縮
對數據進行壓縮以節省存儲空間和網絡傳輸開銷。Spark原生的輸入方式(texeFile和sequenceFile)可以自動處理一類型的壓縮。
文件系統
- 本地文件,file:///home/path
- Amazon S3,s3n://bucket/path
- HDFS,hdfs://master/path
數據庫
Java數據庫連接,需要構建一個org.apache.spark.rdd.jdbcRDD,將SparkContext和其他參數一起傳給它
//Scala
def createConnect() = {
Class.forName("com.mysql.jdbc.Driver").newInstance();
DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}
def extractValues(r: ResultSet) = {
(r.getInt(1),r.getString(2))
}
val data = new JdbcRDD(sc,createConnection,"SELECT * FROM panda WHERE ",lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
println(data.collect().toList)
- 提供一個用於數據庫連接的函數
- 提供一個可以讀取一定范圍內數據的查詢,以及查詢參數中lowerBound和upperBound的值。
- 最后一個參數可以將輸出結果從java.sql.ResultSet轉為對操作數據有用的格式的函數。
Spark可以用org.apache.hadoop.hbase.mapreduce.TableInputFormat類通過Hadoop輸入格式訪問HBase。鍵的類型為org.apache.hadoop.hbase.io.ImmutableBytesWritable,值的類型為org.apache.hadoop.hbase.client.Result。
//Scala
import org.apache.hadoop.hbase.HBaseConfigration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE,"tablename")
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
分區操作
分區的作用:
- 可以增加並行度,在多個節點上同時計算。
- 減少通信開銷。join()時,減少shuffle。
分區原則:分區個數等於本地機器CPU數目
rdd.sc.parallelize(list,2)//設置兩個分區
rdd.repartition(1)//重新分區,分1個
分區方法有三種:
- 哈希分區,HashPartitioner
- 區域分區,RangePartitioner
- 自定義分區
def Mypartition(key):
return key % 10;
可以在每個分區共享一個數據庫連接池,避免建立太多連接
def processCallsigns(signs):
http = urllib3.PoolManager()//建立連接池
urls = map()//操作
···