RDD轉換成為DataFrame


方式一: 通過case class創建DataFrames(反射)

TestDataFrame1.scala

package com.bky

// 隱式類的導入
// 定義case class,相當於表結構
case class Dept(var id:Int, var position:String, var location:String)

// 需要導入SparkSession這個包
import org.apache.spark.sql.SparkSession

/**
  * 方式一: 通過case class創建DataFrames(反射)
  */
object TestDataFrame1 {

  def main(args: Array[String]): Unit = {

    /**
      * 直接使用SparkSession進行文件的創建。
      * 封裝了SparkContext,SparkConf,SQLContext,
      * 為了向后兼容,SQLContext和HiveContext也被保存了下來
      */
    val spark = SparkSession
      .builder()  //構建sql
      .appName("TestDataFrame1") // 設置文件名
      .master("local[2]") // 設置executor
      .getOrCreate() //獲取或創建

    import spark.implicits._  // 隱式轉換
    // 將本地的數據讀入RDD,將RDD與case class關聯
    val deptRDD = spark.read.textFile("/Users/hadoop/data/dept.txt")
      .map(line => Dept(line.split("\t")(0).toInt,
        line.split("\t")(1),
        line.split("\t")(2).trim))

    // 將RDD轉換成DataFrames(反射)
    val df = deptRDD.toDF()

    // 將DataFrames創建成一個臨時的視圖
    df.createOrReplaceTempView("dept")

    // 使用SQL語句進行查詢
    spark.sql("select * from dept").show()

  }
}

精簡版 TestDataFrame1.scala

package com.bky

import org.apache.spark.sql.SparkSession

object TestDataFrame1 extends App {
    val spark = SparkSession
      .builder()  //構建sql
      .appName("TestDataFrame1") 
      .master("local[2]") 
      .getOrCreate() 

    import spark.implicits._  
    val deptRDD = spark.read.textFile("/Users/hadoop/data/dept.txt")
      .map(line => Dept(line.split("\t")(0).toInt,
        line.split("\t")(1),
        line.split("\t")(2).trim))
  
    val df = deptRDD.toDF()   
    df.createOrReplaceTempView("dept")
    spark.sql("select * from dept").show()
}

case class Dept(var id:Int, var position:String, var location:String)

方式二:通過創建structType創建DataFrames(編程接口)

TestDataFrame2.scala

package com.bky

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}

/**
  *
  * 方式二:通過創建structType創建DataFrames(編程接口)
  */
object TestDataFrame2 extends App {

  val spark = SparkSession
    .builder()
    .appName("TestDataFrame2")
    .master("local[2]")
    .getOrCreate()

  /**
    * 將RDD數據映射成Row,需要導入import org.apache.spark.sql.Row
    */
  import spark.implicits._
  val path = "/Users/hadoop/data/dept.txt"
  val fileRDD = spark.read.textFile(path)
  val rowRDD= fileRDD.map(line => {
    val fields = line.split("\t")
    Row(fields(0).toInt, fields(1), fields(2).trim)
  })

  // 創建StructType來定義結構
  val innerStruct = StructType(
    // 字段名,字段類型,是否可以為空
      StructField("id", IntegerType, true) ::
        StructField("position", StringType, true) ::
        StructField("location", StringType, true) :: Nil
  )

  val df = spark.createDataFrame(innerStruct)
  df.createOrReplaceTempView("dept")
  spark.sql("select * from dept").show()

}


方式三:通過json文件創建DataFrames

TestDataFrame3.scala

package com.bky

import org.apache.spark.sql.SparkSession

/**
  * 方式三:通過json文件創建DataFrames
  */
object TestDataFrame3 extends App {

  val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName("TestDataFrame3")
    .getOrCreate()

  val path = "/Users/hadoop/data/test.json"
  val fileRDD = spark.read.json(path)
  fileRDD.createOrReplaceTempView("test")
  spark.sql("select * from test").show()
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM