Spark數據存儲和分區操作


Spark數據讀取

  • 對於存儲在本地文件系統或分布式文件系統(HDFS、Amazon S3)中的數據,Spark可以訪問很多種不同的文件格式,比如文本文件、JSON、SequenceFile
  • Spark SQL中的結構化數據源,包括JSON和Hive的結構化數據源
  • 數據庫和鍵值存儲,自帶的庫,聯結HBase或其他JDBC源
格式名稱 結構化 備注
文本文件 普通的文本文件,每行一條記錄
JSON 半結構化 每行一條記錄
CSV 非常常見的基於文本的格式
SequenceFiles 用於鍵值對的常見Hadoop文件格式

textFile()和saveAsTextFile(),讀取文本文件和保存為文本文件。

讀取JSON數據的方式是將數據作為文本文件讀取,然后使用JSON解析器對RDD中的值進行映射操作。

import json
data = input.map(lambda x: json.loads(x))

//保存JSON
(data.filter(lambda x: x["lovesPands"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile))

Spark有專門用來讀取SequenceFile的接口,可以調用sequenceFile(path,keyClass,valueClass,minparttions)來讀取。

val data = sc.sequenceFile(inFile,"org.apache.hadoop.io.Text","org.apache.hadoop.io.IntWritable")

文件壓縮

對數據進行壓縮以節省存儲空間和網絡傳輸開銷。Spark原生的輸入方式(texeFile和sequenceFile)可以自動處理一類型的壓縮。

文件系統

  • 本地文件,file:///home/path
  • Amazon S3,s3n://bucket/path
  • HDFS,hdfs://master/path

數據庫

Java數據庫連接,需要構建一個org.apache.spark.rdd.jdbcRDD,將SparkContext和其他參數一起傳給它

//Scala
def createConnect() = {
    Class.forName("com.mysql.jdbc.Driver").newInstance();
    DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}
def extractValues(r: ResultSet) = {
    (r.getInt(1),r.getString(2))
}

val data = new JdbcRDD(sc,createConnection,"SELECT * FROM panda WHERE ",lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
println(data.collect().toList)
  • 提供一個用於數據庫連接的函數
  • 提供一個可以讀取一定范圍內數據的查詢,以及查詢參數中lowerBound和upperBound的值。
  • 最后一個參數可以將輸出結果從java.sql.ResultSet轉為對操作數據有用的格式的函數。

Spark可以用org.apache.hadoop.hbase.mapreduce.TableInputFormat類通過Hadoop輸入格式訪問HBase。鍵的類型為org.apache.hadoop.hbase.io.ImmutableBytesWritable,值的類型為org.apache.hadoop.hbase.client.Result。

//Scala
import org.apache.hadoop.hbase.HBaseConfigration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE,"tablename")

val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])

分區操作

分區的作用:

  • 可以增加並行度,在多個節點上同時計算。
  • 減少通信開銷。join()時,減少shuffle。

分區原則:分區個數等於本地機器CPU數目

rdd.sc.parallelize(list,2)//設置兩個分區
rdd.repartition(1)//重新分區,分1個

分區方法有三種:

  1. 哈希分區,HashPartitioner
  2. 區域分區,RangePartitioner
  3. 自定義分區
def Mypartition(key):
    return key % 10;

可以在每個分區共享一個數據庫連接池,避免建立太多連接

def processCallsigns(signs):
    http = urllib3.PoolManager()//建立連接池
    urls = map()//操作
    ···


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM