*以下内容由《Spark快速大数据分析》整理所得。
读书笔记的第三部分是讲的是Spark有哪些常见数据源?怎么读取它们的数据并保存。
Spark有三类常见的数据源:
- 文件格式与文件系统:它们是存储在本地文件系统或分布式文件系统(比如 NFS、HDFS、Amazon S3 等)中的 数据,例如:文本文件、JSON、SequenceFile, 以及 protocol buffer。
- Spark SQL中的结构化数据源:它针对包括JSON和Apache Hive在内的结构化数据源。
- 数据库与键值存储:Spark 自带的库和一些第三方库,它们可以用来连接Cassandra、HBase、Elasticsearch以及JDBC源。
一、文件格式与文件系统
1. 文本文件
2. JSON
3. CSV
4. SequenceFile
二、Spark SQL中的结构化数据源
1. Hive
2. JSON
三、数据库与键值存储
一、文件格式与文件系统
1. 文本文件
文本文件读取:
# 方法1:文本文件读取 input = sc.textFile("file://home/holden/repos/sparks/README.md") # 方法2:如果文件足够小,同时读取整个文件,从而返回一个pair RDD,其中键时输入文件的文件名 input = sc.wholeTextFiles("file://home/holden/salesFiles")
文本文件保存:
result.saveAsTextFile(outputFile)
2. JSON
JSON读取:
# JSON读取
import json
data = input.map(lambda x: json.loads(x))
JSON保存:
# JSON保存 - 举例选出喜爱熊猫的人 (data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile)) # 保存文本文件 result.SaveAsTextFile(outputFilePath)
3. CSV
CSV读取:
import csv import StringIO
# CSV读取 - 如果数据字段均没有包括换行符,只能一行行读取 def loadRecord(line): """解析一行CSV记录""" input = StringIO.StringIO(line) reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"]) return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
# CSV读取 - 如果数据字段嵌有换行符,需要完整读入每个文件 def loadRecords(fileNameContents): """读取给定文件中的所有记录""" input = StringIO.StringIO(fileNameContents[1]) reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"]) return reader
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
CSV保存:
# CSV保存 def writeRecords(records): """写出一些CSV记录""" output = StringIO.StringIO() writer = csv.DictWriter(output, fieldnames=["names", "favoriteAnimal"]) for record in records: writer.writerow(record) return [output.getvalue()] pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
4. SequenceFile
SequenceFile读取:
# sc.sequenceFile(path, keyClass, valueClass)
data = sc.sequenceFile(inFile, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
SequenceFile保存(用Scala):
val data = sc.parallelize(List(("Pandas", 3), ("Kay", 6), ("Snail", 2))) data.saveAsSequenceFile(outputFile)
二、Spark SQL中的结构化数据源
用Spark SQL从多种数据源里读取数据:
1. Hive
用Spark SQL连接已有的Hive:
(1.1)需要将hive-site.xml文件复制到 Spark 的 ./conf/ 目录下;
(1.2)再创建出HiveContext对象,也就是 Spark SQL 的入口;
(1.3)使用Hive查询语言(HQL)来对你的表进行查询。
# 例子:用Python创建HiveContext并查询数据 from pyspark.sql import HiveContext hiveCtx = HiveContext(sc) rows = hiveCtx.sql("SELECT name, age FROM users") firstRow = rows.first() print firstRow.name
2. JSON
(2.1)和使用Hive一样创建一个HiveContext。(不过在这种情况下我们不需要安装好Hive,也就是说你也不需要hive-site.xml文件。);
(2.2)使用HiveContext.jsonFile方法来从整个文件中获取由Row对象组成的RDD。
(2.3)除了使用整个Row对象,你也可以将RDD注册为一张表,然后从中选出特定的字段。
# 例子:在Python中使用Spark SQL读取JSON数据 tweets = hiveCtx.jsonFile("tweets.json") tweets.registerTempTable("tweets") results = hiveCtx.sql("SELECT user.name, text FROM tweets")
三、数据库与键值存储
关于Cassandra、HBase、Elasticsearch以及JDBC源的数据库连接,详情请参考书本81-86页内容。