3. Spark常見數據源


*以下內容由《Spark快速大數據分析》整理所得。

讀書筆記的第三部分是講的是Spark有哪些常見數據源?怎么讀取它們的數據並保存。

Spark有三類常見的數據源:

  • 文件格式與文件系統:它們是存儲在本地文件系統或分布式文件系統(比如 NFS、HDFS、Amazon S3 等)中的 數據,例如:文本文件、JSON、SequenceFile, 以及 protocol buffer
  • Spark SQL中的結構化數據源:它針對包括JSON和Apache Hive在內的結構化數據源。
  • 數據庫與鍵值存儲:Spark 自帶的庫和一些第三方庫,它們可以用來連接Cassandra、HBase、Elasticsearch以及JDBC源。

 

一、文件格式與文件系統

1. 文本文件

2. JSON

3. CSV

4. SequenceFile

二、Spark SQL中的結構化數據源

  1. Hive

  2. JSON

三、數據庫與鍵值存儲

 

 


一、文件格式與文件系統

1. 文本文件
文本文件讀取:

# 方法1:文本文件讀取
input = sc.textFile("file://home/holden/repos/sparks/README.md")
# 方法2:如果文件足夠小,同時讀取整個文件,從而返回一個pair RDD,其中鍵時輸入文件的文件名
input = sc.wholeTextFiles("file://home/holden/salesFiles")

文本文件保存:

result.saveAsTextFile(outputFile)

2. JSON
JSON讀取:

# JSON讀取
import json
data = input.map(lambda x: json.loads(x))

JSON保存:

# JSON保存 - 舉例選出喜愛熊貓的人
(data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile))
# 保存文本文件
result.SaveAsTextFile(outputFilePath)

3. CSV
CSV讀取:

import csv
import StringIO

# CSV讀取 - 如果數據字段均沒有包括換行符,只能一行行讀取 def loadRecord(line):
"""解析一行CSV記錄""" input = StringIO.StringIO(line) reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"]) return reader.next()
input
= sc.textFile(inputFile).map(loadRecord)
# CSV讀取
- 如果數據字段嵌有換行符,需要完整讀入每個文件 def loadRecords(fileNameContents): """讀取給定文件中的所有記錄""" input = StringIO.StringIO(fileNameContents[1]) reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"]) return reader
fullFileData
= sc.wholeTextFiles(inputFile).flatMap(loadRecords)

CSV保存:

# CSV保存
def writeRecords(records):
    """寫出一些CSV記錄"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["names", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]

pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)

4. SequenceFile

SequenceFile讀取:

# sc.sequenceFile(path, keyClass, valueClass)
data = sc.sequenceFile(inFile, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")

SequenceFile保存(用Scala):

val data = sc.parallelize(List(("Pandas", 3), ("Kay", 6), ("Snail", 2)))
data.saveAsSequenceFile(outputFile)

 


 二、Spark SQL中的結構化數據源

 用Spark SQL從多種數據源里讀取數據:

1. Hive

用Spark SQL連接已有的Hive:
(1.1)需要將hive-site.xml文件復制到 Spark 的 ./conf/ 目錄下;
(1.2)再創建出HiveContext對象,也就是 Spark SQL 的入口;
(1.3)使用Hive查詢語言(HQL)來對你的表進行查詢。

# 例子:用Python創建HiveContext並查詢數據
from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name, age FROM users")
firstRow = rows.first()
print firstRow.name

2. JSON
(2.1)和使用Hive一樣創建一個HiveContext。(不過在這種情況下我們不需要安裝好Hive,也就是說你也不需要hive-site.xml文件。);
(2.2)使用HiveContext.jsonFile方法來從整個文件中獲取由Row對象組成的RDD。
(2.3)除了使用整個Row對象,你也可以將RDD注冊為一張表,然后從中選出特定的字段。

# 例子:在Python中使用Spark SQL讀取JSON數據
tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
results = hiveCtx.sql("SELECT user.name, text FROM tweets")

 


 三、數據庫與鍵值存儲

關於Cassandra、HBase、Elasticsearch以及JDBC源的數據庫連接,詳情請參考書本81-86頁內容。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM