Spark學習筆記——數據讀取和保存


spark所支持的文件格式

 

1.文本文件

在 Spark 中讀寫文本文件很容易。

當我們將一個文本文件讀取為 RDD 時,輸入的每一行 都會成為 RDD 的 一個元素

也可以將多個完整的文本文件一次性讀取為一個 pair RDD, 其中鍵是文件名,值是文件內容

 在 Scala 中讀取一個文本文件

val inputFile = "file:///home/common/coding/coding/Scala/word-count/test.segmented"
val textFile = sc.textFile(inputFile)

 在 Scala 中讀取給定目錄中的所有文件

val input = sc.wholeTextFiles("file:///home/common/coding/coding/Scala/word-count")

 保存文本文件,Spark 將傳入的路徑作為目錄對待,會在那個目錄下輸出多個文件

textFile.saveAsTextFile("file:///home/common/coding/coding/Scala/word-count/writeback")
//textFile.repartition(1).saveAsTextFile 就能保存成一個文件

對於dataFrame文件,先使用.toJavaRDD 轉換成RDD,然后再使用  coalesce(1).saveAsTextFile

 

2.JSON

JSON 是一種使用較廣的半結構化數據格式。

讀取JSON,書中代碼有問題所以找了另外的一段讀取JSON的代碼

 build.sbt

"org.json4s" %% "json4s-jackson" % "3.2.11"

 代碼

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}

/**
  * Created by common on 17-4-3.
  */

case class Person(firstName: String, lastName: String, address: List[Address]) {
  override def toString = s"Person(firstName=$firstName, lastName=$lastName, address=$address)"
}

case class Address(line1: String, city: String, state: String, zip: String) {
  override def toString = s"Address(line1=$line1, city=$city, state=$state, zip=$zip)"
}

object WordCount {
  def main(args: Array[String]) {
    val inputJsonFile = "file:///home/common/coding/coding/Scala/word-count/test.json"
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    val input5 = sc.textFile(inputJsonFile)
    val dataObjsRDD = input5.map { myrecord =>
      implicit val formats = DefaultFormats
      // Workaround as      DefaultFormats is not serializable
      val jsonObj = parse(myrecord)
      //val addresses = jsonObj \ "address"
      //println((addresses(0) \ "city").extract[String])
      jsonObj.extract[Person]
    }
    dataObjsRDD.saveAsTextFile("file:///home/common/coding/coding/Scala/word-count/test1.json")

  }


}

 讀取的JSON文件

{"firstName":"John","lastName":"Smith","address":[{"line1":"1 main street","city":"San Francisco","state":"CA","zip":"94101"},{"line1":"1 main street","city":"sunnyvale","state":"CA","zip":"94000"}]}
{"firstName":"Tim","lastName":"Williams","address":[{"line1":"1 main street","city":"Mountain View","state":"CA","zip":"94300"},{"line1":"1 main street","city":"San Jose","state":"CA","zip":"92000"}]}

 輸出的文件

Person(firstName=John, lastName=Smith, address=List(Address(line1=1 main street, city=San Francisco, state=CA, zip=94101), Address(line1=1 main street, city=sunnyvale, state=CA, zip=94000)))
Person(firstName=Tim, lastName=Williams, address=List(Address(line1=1 main street, city=Mountain View, state=CA, zip=94300), Address(line1=1 main street, city=San Jose, state=CA, zip=92000)))

 

3.逗號分割值與制表符分隔值

逗號分隔值(CSV)文件每行都有固定數目的字段,字段間用逗號隔開(在制表符分隔值文件,即 TSV 文 件中用制表符隔開)。

如果恰好CSV 的所有數據字段均沒有包含換行符,你也可以使用 textFile() 讀取並解析數據,

build.sbt

"au.com.bytecode" % "opencsv" % "2.4"

3.1 讀取CSV文件

import java.io.StringReader

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}
import au.com.bytecode.opencsv.CSVReader

/**
  * Created by common on 17-4-3.
  */

object WordCount {
  def main(args: Array[String]) {

    val input = sc.textFile("/home/common/coding/coding/Scala/word-count/sample_map.csv")
    val result6 = input.map{ line =>
      val reader = new CSVReader(new StringReader(line));
      reader.readNext();
    }
    for(result <- result6){
      for(re <- result){
        println(re)
      }
    }

  }

}

 CSV文件內容

輸出

0
Front Left
/usr/share/alsa/samples/Front_Left.wav
1
Front Right
/usr/share/alsa/samples/Front_Right.wav

 

如果在字段中嵌有換行符,就需要完整讀入每個文件,然后解析各段。如果每個文件都很大,讀取和解析的過程可能會很不幸地成為性能瓶頸。

 代碼

import java.io.StringReader

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}
import scala.collection.JavaConversions._
import au.com.bytecode.opencsv.CSVReader

/**
  * Created by common on 17-4-3.
  */

case class Data(index: String, title: String, content: String)

object WordCount {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    val input = sc.wholeTextFiles("/home/common/coding/coding/Scala/word-count/sample_map.csv")
    val result = input.flatMap { case (_, txt) =>
      val reader = new CSVReader(new StringReader(txt));
      reader.readAll().map(x => Data(x(0), x(1), x(2)))
    }
    for(res <- result){
      println(res)
    }
  }

}

 輸出

Data(0,Front Left,/usr/share/alsa/samples/Front_Left.wav)
Data(1,Front Right,/usr/share/alsa/samples/Front_Right.wav)

 或者

import java.io.StringReader

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}
import scala.collection.JavaConversions._
import au.com.bytecode.opencsv.CSVReader

/**
  * Created by common on 17-4-3.
  */

case class Data(index: String, title: String, content: String)

object WordCount {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    val input = sc.wholeTextFiles("/home/common/coding/coding/Scala/word-count/sample_map.csv")
  //wholeTextFiles讀出來是一個RDD(String,String) val result = input.flatMap { case (_, txt) => val reader = new CSVReader(new StringReader(txt)); //reader.readAll().map(x => Data(x(0), x(1), x(2))) reader.readAll() } result.collect().foreach(x => { x.foreach(println); println("======") }) } }

 輸出

0
Front Left
/usr/share/alsa/samples/Front_Left.wav
======
1
Front Right
/usr/share/alsa/samples/Front_Right.wav
======

 

3.2 保存CSV

import java.io.{StringReader, StringWriter}

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}

import scala.collection.JavaConversions._
import au.com.bytecode.opencsv.{CSVReader, CSVWriter}

/**
  * Created by common on 17-4-3.
  */

case class Data(index: String, title: String, content: String)

object WordCount {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    val inputRDD = sc.parallelize(List(Data("index", "title", "content")))
    inputRDD.map(data => List(data.index, data.title, data.content).toArray)
      .mapPartitions { data =>
        val stringWriter = new StringWriter();
        val csvWriter = new CSVWriter(stringWriter);
        csvWriter.writeAll(data.toList)
        Iterator(stringWriter.toString)
      }.saveAsTextFile("/home/common/coding/coding/Scala/word-count/sample_map_out")
  }
}

 輸出

"index","title","content"

 

4.SequenceFile 是由沒有相對關系結構的鍵值對文件組成的常用 Hadoop 格式。

SequenceFile 文件有同步標記, Spark 可 以用它來定位到文件中的某個點,然后再與記錄的邊界對齊。這可以讓 Spark 使 用多個節點高效地並行讀取 SequenceFile 文件。SequenceFile 也是Hadoop MapReduce 作 業中常用的輸入輸出格式,所以如果你在使用一個已有的 Hadoop 系統,數據很有可能是以 S equenceFile 的格式供你使用的。

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

/**
  * Created by common on 17-4-6.
  */
object SparkRDD {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)

    //寫sequenceFile,
    val rdd = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
    rdd.saveAsSequenceFile("output")

    //讀sequenceFile
    val output = sc.sequenceFile("output", classOf[Text], classOf[IntWritable]).
      map{case (x, y) => (x.toString, y.get())}
    output.foreach(println)

  }
}

 

 

5.對象文件

對象文件看起來就像是對 SequenceFile 的簡單封裝,它允許存儲只包含值的 RDD。和 SequenceFile 不一樣的是,對象文件是使用 Java 序列化寫出的。

如果你修改了你的類——比如增減了幾個字段——已經生成的對象文件就不再可讀了。

讀取文件——用 SparkContext 中的 objectFile() 函數接收一個路徑,返回對應的 RDD。

寫入文件——要 保存對象文件, 只需在 RDD 上調用 saveAsObjectFile

 

6.Hadoop輸入輸出格式

除了 Spark 封裝的格式之外,也可以與任何 Hadoop 支持的格式交互。Spark 支持新舊兩套Hadoop 文件 API,提供了很大的靈活性。

舊的API:hadoopFile,使用舊的 API 實現的 Hadoop 輸入格式

新的API:newAPIHadoopFile
接收一個路徑以及三個類。第一個類是“格式”類,代表輸入格式。第二個類是鍵的類,最后一個類是值的類。如果需要設定額外的 H adoop 配置屬性,也可以傳入一個 conf 對象。

KeyValueTextInputFormat 是最簡單的 Hadoop 輸入格式之一,可以用於從文本文件中讀取鍵值對數據。每一行都會被獨立處理,鍵和值之間用制表符隔開。

import org.apache.hadoop.io.{IntWritable, LongWritable, MapWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.rdd._

/**
  * Created by common on 17-4-6.
  */
object SparkRDD {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)

    //使用老式 API 讀取 KeyValueTextInputFormat(),以JSON文件為例子
    //注意引用的包是org.apache.hadoop.mapred.KeyValueTextInputFormat
//    val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat]("input/test.json").map {
//      case (x, y) => (x.toString, y.toString)
//    }
//    input.foreach(println)

    // 讀取文件,使用新的API,注意引用的包是org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
    val job = new Job()
    val data = sc.newAPIHadoopFile("input/test.json" ,
      classOf[KeyValueTextInputFormat],
      classOf[Text],
      classOf[Text],
      job.getConfiguration)
    data.foreach(println)

    //保存文件,注意引用的包是org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
    data.saveAsNewAPIHadoopFile(
      "input/test1.json",
      classOf[Text],
      classOf[Text],
      classOf[TextOutputFormat[Text,Text]],
      job.getConfiguration)

  }
}

 

 

Hadoop 的非文件系統數據源

除 了 hadoopFile() 和 saveAsHadoopFile() 這 一 大 類 函 數, 還 可 以 使 用 hadoopDataset/saveAsHadoopDataSet 和 newAPIHadoopDataset/ saveAsNewAPIHadoopDataset 來訪問 Hadoop 所支持的非文件系統的存儲格式。例如,許多像 HBase 和 MongoDB 這樣的鍵值對存儲都提供了用來直接讀取 Hadoop 輸入格式的接口。我們可以在 Spark 中很方便地使用這些格式。

 

7.文件壓縮

Spark 原生的輸入方式( textFile 和 sequenceFile)可以自動處理一些類型的壓縮。在讀取壓縮后的數據時,一些壓縮編解碼器可以推測壓縮類型。
這些壓縮選項只適用於支持壓縮的 Hadoop 格式,也就是那些寫出到文件系統的格式。寫入數據庫的 Hadoop 格式一般沒有實現壓縮支持。如果數據庫中有壓縮過的記錄,那應該是數據庫自己配置的。

8.讀取har文件

val df = spark.read.text("har:///har/xxx/test/2019-06-30.har")

 查看

scala> df.show()
+--------------------+----------+----+
|               value|        dt|hour|
+--------------------+----------+----+
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
|/home/logs/sc...|2019-06-30|  17|
+--------------------+----------+----+
only showing top 20 rows

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM