【sparkSQL】創建DataFrame及保存


首先我們要創建SparkSession

val spark = SparkSession.builder()
                        .appName("test")
                        .master("local")
                        .getOrCreate()
import spark.implicits._ //將RDD轉化成為DataFrame並支持SQL操作        

然后我們通過SparkSession來創建DataFrame

1.使用toDF函數創建DataFrame

 通過導入(importing)spark.implicits, 就可以將本地序列(seq), 數組或者RDD轉為DataFrame。

 只要這些數據的內容能指定數據類型即可。

import spark.implicits._
val df = Seq(
  (1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")),
  (2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))
).toDF("id", "name", "created_time")

注意:如果直接用toDF()而不指定列名字,那么默認列名為"_1", "_2"

可以通過df.withColumnRenamed("_1", "newName1").withColumnRenamed("_2", "newName2")進行修改列名

2.使用createDataFrame函數創建DataFrame

通過schema + row 來創建

我們可以通俗的理解為schema為表的表頭,row為表的數據記錄

import org.apache.spark.sql.types._
//定義dataframe的結構的schema
val schema = StructType(List(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = true),
    StructField("create_time", DateType, nullable = true)
))
//定義dataframe內容的rdd
val rdd = sc.parallelize(Seq(
  Row(1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")),
  Row(2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))
))
//創建dataframe
val df = spark.createDataFrame(rdd, schema)

不過,我們可以把文件結構當做參數來使用,通過rdd自動產生schema和row,不用自己手動生成。

import org.apache.spark.sql.types._

//傳入屬性參數
val schemaString = " id name create_time" 
//解析參數變成StructField
val fields = schemaString.split(" ")
                         .map(fieldName => StructField(fieldname, StringType, nullable = true))
//定義dataframe的結構的schema
val schema = StructType(fields)

//定義dataframe內容的rdd
val lines = sc.textFile("file:///people.txt")
val rdd = lines.spilt(_.split(","))
               .map(attributes=>ROW(attributes(0),attributes(1).trim) )

//創建dataframe
val df = spark.createDataFrame(rdd, schema)       

3.通過反射機制創建DataFrame

首先要定義一個case class,因為只有case class才能被Spark隱式轉化為DataFrame

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._
//創建匹配類
case class Person(id:Int,name:String,age:Long)
//讀取文件生成rdd
val rdd = sc.textFile("file:///")
//通過匹配類把rdd轉化成dataframe
val df = rdd.map(_.split(","))
            .map(attributes => Person(attributes(0),attributes(1),attributes(2).trim.toInt)) .toDF()  

4.通過文件直接創建DataFrame

 (1)使用parquet文件read創建  

val df = spark.read.parquet("hdfs:/path/to/file")

 (2)使用json文件read創建

val df = spark.read.json("examples/src/main/resources/people.json")

 (3)使用csv文件load創建

val df = spark.read
        .format("com.databricks.spark.csv")
        .option("header", "true") //reading the headers
        .option("mode", "DROPMALFORMED")
        .load("csv/file/path")

 (4)使用Hive表創建

spark.table("test.person") // 庫名.表名 的格式
     .registerTempTable("person")  // 注冊成臨時表
spark.sql(
      """
        | select *
        | from person
        | limit 10
      """.stripMargin).show()

記得,最后我們要調用spark.stop()來關閉SparkSession。  

5.保存

 (1)通過df.write.format().save("file:///")保存

  write.format()支持輸出的格式有 JSON、parquet、JDBC、orc、csv、text等文件格式

  ,save()定義保存的位置

  當我們保存成功后可以在保存位置的目錄下看到文件,但是這個文件並不是一個文件而是一個目錄。

  里面的內容一般為

  

  不用擔心,這是沒錯的。

  我們讀取的時候,並不需要使用文件夾里面的part-xxxx文件,直接讀取目錄即可。

 (2)通過df.rdd.saveAsTextFile("file:///")轉化成rdd再保存

我們對於不同格式的文件讀寫來說,我們一般使用兩套對應方式

val df = spark.read.格式("file:///")//讀取文件
df.write.格式("file:///")//保存文件
val df = spark.read.format("").load("file:///")//讀取文件
df.write.save("file:///")//保存文件

具體read和load方法有什么不同,我還不是很清楚,弄明白了回來補充。

6.通過JDBC創建DataFrame 

我們在啟動Spark-shell或者提交任務的時候需要添加相應的jar包

spark-shell(spark-submit)

--jars /usr/local/spark/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \

--driver-class-path /usr/local/spark/mysql-connector-java-5.1.40-bin.jar

val jdbcDf = spark.read.format("jdbc")
    .option("driver", "com.mysql.jdbc.Driver")   //驅動
    .option("url", "jdbc:mysql://ip:3306")  //數據庫地址
    .option("dbtable", "db.user_test") //表名:數據庫名.表名
    .option("user", "test") //用戶名
    .option("password", "123456")  //密碼
    .load()
jdbcDf.show()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM