4.數據讀取與保存
Spark 的數據讀取及數據保存可以從兩個維度來作區分:文件格式以及文件系統。
文件格式分為:
Text 文件、
Json 文件、Csv 文件、Sequence 文件以及 Object 文件;
文件系統分為:本地文件系統、
HDFS、
HBASE 以及數據庫。

1)數據讀取:textFile(String)
scala> val hdfsFile = sc.textFile("hdfs://hadoop102:9000/fruit.txt") hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at <console>:24
2)數據保存: saveAsTextFile(String)
scala> hdfsFile.saveAsTextFile("/fruitOut")
4.1.2 Json 文件
如果 JSON 文件中每一行就是一個 JSON 記錄,那么可以通過將 JSON 文件當做文本
文件來讀取,然后利用相關的 JSON 庫對每一條數據進行 JSON 解析。
注意:使用 RDD 讀取 JSON 文件處理很復雜,同時 SparkSQL 集成了很好的處理
JSON 文件的方式,所以應用中多是采用 SparkSQL 處理 JSON 文件。
(1)導入解析 json 所需的包
scala> import scala.util.parsing.json.JSON
(2)上傳 json 文件到 HDFS
[lxl@hadoop102 spark]$ hadoop fs -put ./examples/src/main/resources/people.json /
(3)讀取文件
scala> val json = sc.textFile("/people.json") json: org.apache.spark.rdd.RDD[String] = /people.json MapPartitionsRDD[8] at textFile at <console>:24
(4)解析 json 數據
scala> val result = json.map(JSON.parseFull) result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[10] at map at <console>:27
(5)打印
scala> result.collect res11: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))
4.1.3 Sequence 文件
SequenceFile 文件是 Hadoop 用來存儲二進制形式的 key-value 對而設計的一種平面
文件(Flat File)。Spark 有專門用來讀取 SequenceFile 的接口。在 SparkContext 中,可以
調用 sequenceFile[ keyClass, valueClass](path)。
注意:SequenceFile 文件只針對 PairRDD
(1)創建一個 RDD
scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6))) rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24
(2)將 RDD 保存為 Sequence 文件
scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")
(3)查看該文件
[lxl@hadoop102 seqFile]$ pwd /opt/module/spark/seqFile
[lxl@hadoop102 seqFile]$ ll 總用量 8 -rw-r--r-- 1 atguigu atguigu 108 10 月 9 10:29 part-00000 -rw-r--r-- 1 atguigu atguigu 124 10 月 9 10:29 part-00001 -rw-r--r-- 1 atguigu atguigu 0 10 月 9 10:29 _SUCCESS
[lxl@hadoop102 seqFile]$ cat part-00000 SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritableط
(4)讀取 Sequence 文件
scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile") seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at sequenceFile at <console>:24
(5)打印讀取后的 Sequence 文件
scala> seq.collect res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))
4.1.4 對象文件 (objectFile)
對象文件是將對象序列化后保存的文件,采用 Java 的序列化機制。可以通過
objectFile[k,v](path) 函數接收一個路徑,讀取對象文件,返回對應的 RDD,也可以通過調
用 saveAsObjectFile() 實現對對象文件的輸出。因為是序列化所以要指定類型。
(1)創建一個 RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24
(2)將 RDD 保存為 Object 文件
scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")
(3)查看該文件
[lxl@hadoop102 object]$ pwd /opt/module/spark/object
[lxl@hadoop102 object]$ ll
總用量 16
-rw-r--r-- 1 lxl lxl 138 7月 8 03:12 part-00000
-rw-r--r-- 1 lxl lxl 138 7月 8 03:12 part-00001
-rw-r--r-- 1 lxl lxl 138 7月 8 03:12 part-00002
-rw-r--r-- 1 lxl lxl 142 7月 8 03:12 part-00003
-rw-r--r-- 1 lxl lxl 0 7月 8 03:12 _SUCCESS
[lxl@hadoop102 object]$ cat part-00000
SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritabley.)a¬촲[IMº`&v겥xp
(4)讀取 Object 文件
scala> val objFile = sc.objectFile[(Int)]("file:///opt/module/spark/objectFile") objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at objectFile at <console>:24
(5)打印讀取后的 Sequence 文件
scala> objFile.collect res19: Array[Int] = Array(1, 2, 3, 4)
4.2 文件系統類數據讀取與保存
4.2.1 HDFS
Spark 的整個生態系統與 Hadoop 是完全兼容的,所以對於 Hadoop 所支持的文件類型
或者數據庫類型,Spark 也同樣支持.另外,由於 Hadoop 的 API 有新舊兩個版本,所以 Spark 為
了能夠兼容 Hadoop 所有的版本,也提供了兩套創建操作接口.對於外部存儲創建操作而
言,hadoopRDD 和 newHadoopRDD 是最為抽象的兩個函數接口,主要包含以下四個參數.
1)輸入格式(InputFormat): 制定數據輸入的類型,如 TextInputFormat 等,新舊兩個版本
所引用的版本分別是 org.apache.hadoop.mapred.InputFormat 和
org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
2)鍵類型: 指定[K,V]鍵值對中 K 的類型
3)值類型: 指定[K,V]鍵值對中 V 的類型
4)分區值: 指定由外部存儲生成的 RDD 的 partition 數量的最小值,如果沒有指定,系
統會使用默認值 defaultMinSplits
注意:其他創建操作的 API 接口都是為了方便最終的 Spark 程序開發者而設置的,是這兩個
接口的高效實現版本.例如,對於 textFile 而言,只有 path 這個指定文件路徑的參數,其他參數
在系統內部指定了默認值。
1.在 Hadoop 中以壓縮形式存儲的數據,不需要指定解壓方式就能夠進行讀取,因為
Hadoop 本身有一個解壓器會根據壓縮文件的后綴推斷解壓算法進行解壓.
2.如果用 Spark 從 Hadoop 中讀取某種類型的數據不知道怎么讀取的時候,上網查找一個
使用 map-reduce 的時候是怎么讀取這種這種數據的,然后再將對應的讀取方式改寫成上面的
hadoopRDD 和 newAPIHadoopRDD 兩個類就行了

4.2.2 MySQL 數據庫連接
支持通過 Java JDBC 訪問關系型數據庫。需要通過 JdbcRDD 進行,示例如下:
(1)添加依賴
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.27</version> </dependency>
(2)Mysql 讀取:
package com.lxl
import java.sql.DriverManager import org.apache.spark.rdd.JdbcRDD import org.apache.spark.{SparkConf, SparkContext}
object MysqlRDD { def main(args: Array[String]): Unit = {
//1.創建 spark 配置信息 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
//2.創建 SparkContext val sc = new SparkContext(sparkConf)
//3.定義連接 mysql 的參數 val driver = "com.mysql.jdbc.Driver" val url = "jdbc:mysql://hadoop102:3306/rdd" val userName = "root" val passWd = "000000"
//創建 JdbcRDD val rdd = new JdbcRDD(sc, () => { Class.forName(driver) DriverManager.getConnection(url, userName, passWd) }, "select * from `rddtable` where `id` >= ? and id <= ?;", 1, 10, 1, r => (r.getInt(1), r.getString(2)) )
//打印最后結果 println(rdd.count()) rdd.foreach(println) sc.stop() } }
Mysql 寫入:
def main(args: Array[String]) { val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp") val sc = new SparkContext(sparkConf) val data = sc.parallelize(List("Female", "Male","Female")) data.foreachPartition(insertData) }
def insertData(iterator: Iterator[String]): Unit = { Class.forName ("com.mysql.jdbc.Driver").newInstance() val conn = java.sql.DriverManager.getConnection("jdbc:mysql://master01:3306/rdd", "root","hive")
iterator.foreach(data => { val ps = conn.prepareStatement("insert into rddtable(name) values (?)") ps.setString(1, data) ps.executeUpdate() }) }
spark-shell 中使用 JDBC 連接 Mysql:
[lxl@hadoop102 spark]$ cp /opt/module/hive/lib/mysql-connector-java-5.1.27-bin.jar ./jars/
scala> val rdd = new org.apache.spark.rdd.JdbcRDD(sc, () => { | Class.forName("com.mysql.jdbc.Driver") | java.sql.DriverManager.getConnection("jdbc:mysql://hadoop102:3306/rdd", "root", "000000") | }, | "select * from `rddtable` where id >= ? and id <= ?;", | 1, | 10, | 1, | r => (r.getInt(1), r.getString(2)) | ) rdd: org.apache.spark.rdd.JdbcRDD[(Int, String)] = JdbcRDD[1] at JdbcRDD at <console>:24 scala> println(rdd.count()) 3 scala> rdd.foreach(println) (1,zhangsan) (2,lisi) (3,wangwu)
4.2.3 HBase 數據庫
由於 org.apache.hadoop.hbase.mapreduce.TableInputFormat 類的實現,Spark 可以通過
Hadoop 輸入格式訪問 HBase。這個輸入格式會返回鍵值對數據,其中鍵的類型為 org.
apache.hadoop.hbase.io.ImmutableBytesWritable,而值的類型為 org.apache.hadoop.hbase.client.
Result。
(1)添加依賴
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.3.1</version> </dependency>
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.3.1</version> </dependency>
(2)從 HBase 讀取數據
package com.lxl
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import org.apache.hadoop.hbase.util.Bytes
object HBaseSpark { def main(args: Array[String]): Unit = {
//創建 spark 配置信息 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
//創建 SparkContext val sc = new SparkContext(sparkConf)
//構建 HBase 配置信息 val conf: Configuration = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104") conf.set(TableInputFormat.INPUT_TABLE, "rddtable")
//從 HBase 讀取數據形成 RDD val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD( conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
val count: Long = hbaseRDD.count() println(count)
//對 hbaseRDD 進行處理 hbaseRDD.foreach { case (_, result) => val key: String = Bytes.toString(result.getRow) val name: String = Bytes.toString(result.getValue(Bytes.toBytes("info"),Bytes.toBytes("name"))) val color: String = Bytes.toString(result.getValue(Bytes.toBytes("info"),Bytes.toBytes("color"))) println("RowKey:" + key + ",Name:" + name + ",Color:" + color) }
//關閉連接 sc.stop() } }
3)往 HBase 寫入
def main(args: Array[String]) {
//獲取 Spark 配置信息並創建與 spark 的連接 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseApp") val sc = new SparkContext(sparkConf)
//創建 HBaseConf val conf = HBaseConfiguration.create() val jobConf = new JobConf(conf) jobConf.setOutputFormat(classOf[TableOutputFormat[ImmutableBytesWritable]]) jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")
//構建 Hbase 表描述器 val fruitTable = TableName.valueOf("fruit_spark") val tableDescr = new HTableDescriptor(fruitTable) tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
//創建 Hbase 表 val admin = new HBaseAdmin(conf) if (admin.tableExists(fruitTable)) { admin.disableTable(fruitTable) admin.deleteTable(fruitTable) } admin.createTable(tableDescr)
//定義往 Hbase 插入數據的方法 def convert(triple: (Int, String, Int)) = { val put = new Put(Bytes.toBytes(triple._1)) put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2)) put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3)) (new ImmutableBytesWritable, put) }
//創建一個 RDD val initialRDD = sc.parallelize(List((1,"apple",11), (2,"banana",12), (3,"pear",13)))
//將 RDD 內容寫到 HBase val localData = initialRDD.map(convert) localData.saveAsHadoopDataset(jobConf) }