Spark SQL Learning: load and save Operations


The load operation is mainly used to load data and create a DataFrame.

The save operation is mainly used to save the data in a DataFrame to a file.

Code example (the default data source type is Parquet):


package wujiadong_sparkSQL

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/2/3.
  */
object GenericLoadSave {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GenericLoadSave")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // load reads Parquet-format files by default
    val usersDF = sqlContext.read.load("hdfs://master:9000/student/2016113012/spark/users.parquet")
    usersDF.write.save("hdfs://master:9000/student/2016113012/parquet_out1")

  }

}


Submit to the cluster and run:

hadoop@master:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.GenericLoadSave  --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar

After the job finishes, check whether the save succeeded:

hadoop@slave01:~$ hadoop fs -ls /student/2016113012/parquet_out1
17/02/03 12:06:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 4 items
-rw-r--r--   3 hadoop supergroup          0 2017-02-03 12:05 /student/2016113012/parquet_out1/_SUCCESS
-rw-r--r--   3 hadoop supergroup        476 2017-02-03 12:05 /student/2016113012/parquet_out1/_common_metadata
-rw-r--r--   3 hadoop supergroup        841 2017-02-03 12:05 /student/2016113012/parquet_out1/_metadata
-rw-r--r--   3 hadoop supergroup        864 2017-02-03 12:05 /student/2016113012/parquet_out1/part-r-00000-8025e2a8-ab06-4558-9d76-bb2cad0042cf.gz.parquet
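To verify further, the Parquet output can be read back with the default loader; a minimal sketch, reusing the sqlContext from the example above:

// read the saved Parquet directory back and inspect it
val savedDF = sqlContext.read.load("hdfs://master:9000/student/2016113012/parquet_out1")
savedDF.printSchema()
savedDF.show()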

Manually specifying the data source type (this makes format conversion very convenient)
By default, if no data source type is specified, Parquet is assumed.
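Built-in sources can be referenced by short names such as json, parquet, or jdbc, while external sources are referenced by their full class name. A hedged sketch, assuming the third-party spark-csv package is on the classpath and a hypothetical people.csv file exists:

// built-in source, referenced by its short name
val jsonDF = sqlContext.read.format("json").load("hdfs://master:9000/student/2016113012/people.json")
// external source, referenced by its full class name
// (requires the spark-csv dependency; the CSV path is hypothetical)
val csvDF = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("hdfs://master:9000/student/2016113012/people.csv")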

Code example (manually specifying the data source type):

package wujiadong_sparkSQL

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/2/3.
  */
object ManuallySpecifyOptions {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ManuallySpecifyOptions")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // to load a file in another format, such as JSON, first specify the format with format()
    val peopleDF = sqlContext.read.format("json").load("hdfs://master:9000/student/2016113012/people.json")
    peopleDF.select("name").write.format("parquet").save("hdfs://master:9000/sudent/2016113012/people_out1")
    

  }

}

Submit to the cluster and run:

hadoop@master:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.ManuallySpecifyOptions  --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar

Check whether it ran successfully:

hadoop@master:~/wujiadong$ hadoop fs -ls hdfs://master:9000/sudent/2016113012/people_out1
17/02/03 12:24:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 4 items
-rw-r--r--   3 hadoop supergroup          0 2017-02-03 12:22 hdfs://master:9000/sudent/2016113012/people_out1/_SUCCESS
-rw-r--r--   3 hadoop supergroup        207 2017-02-03 12:22 hdfs://master:9000/sudent/2016113012/people_out1/_common_metadata
-rw-r--r--   3 hadoop supergroup        327 2017-02-03 12:22 hdfs://master:9000/sudent/2016113012/people_out1/_metadata
-rw-r--r--   3 hadoop supergroup        352 2017-02-03 12:22 hdfs://master:9000/sudent/2016113012/people_out1/part-r-00000-4d1a62a4-f550-4bde-899f-35e9aabfdc0c.gz.parquet


Save Mode

SaveMode.ErrorIfExists (default): if data already exists at the target location, throw an exception
SaveMode.Append: if data already exists at the target location, append the new data to it
SaveMode.Overwrite: if data already exists at the target location, delete the existing data and overwrite it with the new data
SaveMode.Ignore: if data already exists at the target location, ignore the operation and do nothing
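With the DataFrameWriter API, the mode is set through mode(); a minimal sketch of the four modes (the output paths are hypothetical):

// mode() controls what happens when the target path already exists
peopleDF.write.mode(SaveMode.Append).save("hdfs://master:9000/student/2016113012/people_out_append")
peopleDF.write.mode(SaveMode.Overwrite).save("hdfs://master:9000/student/2016113012/people_out_overwrite")
// mode() also accepts the strings "append", "overwrite", "ignore", and "error"
peopleDF.write.mode("ignore").save("hdfs://master:9000/student/2016113012/people_out_ignore")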

Code example 1:

package wujiadong_sparkSQL

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/2/3.
  */
object SaveModelTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SaveModelTest")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val peopleDF = sqlContext.read.format("json").load("hdfs://master:9000/student/2016113012/people.json")
    // the target path already exists, so ErrorIfExists will throw
    peopleDF.write.mode(SaveMode.ErrorIfExists).save("hdfs://master:9000/student/2016113012/people.json")
  }

}

Because the target path already exists, this save mode makes the job fail with an exception (in Spark 1.x this typically surfaces as an AnalysisException reporting that the path already exists).

Code example 2:

package wujiadong_sparkSQL

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/2/3.
  */
object SaveModelTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SaveModelTest")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val peopleDF = sqlContext.read.format("json").load("hdfs://master:9000/student/2016113012/people.json")
    // Overwrite deletes any existing data at the target path before writing
    peopleDF.write.mode(SaveMode.Overwrite).save("hdfs://master:9000/student/2016113012/people.json")
  }

}

This mode simply overwrites the existing data.
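For contrast, SaveMode.Ignore leaves existing data untouched and silently skips the write; a hedged sketch with a hypothetical output path:

// the first write creates the directory
peopleDF.write.mode(SaveMode.Ignore).save("hdfs://master:9000/student/2016113012/people_ignore_out")
// the second write is a silent no-op because the path already exists
peopleDF.write.mode(SaveMode.Ignore).save("hdfs://master:9000/student/2016113012/people_ignore_out")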

