Spark: Converting an RDD to a DataFrame in Scala


Maven dependency

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.3</version>
</dependency>
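
Both examples below read a comma-delimited text file with one record per line, in the order id,name,age. The original post does not show the file, so the following contents of D:\person.txt are a hypothetical sample that matches what the parsing code expects:

1,zhangsan,20
2,lisi,29
3,wangwu,25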

RDD to DataFrame: specifying the schema with StructType

package com.zy.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
  * RDD to DataFrame: specify the schema explicitly with StructType
  */
object StructTypeSchema {
  def main(args: Array[String]): Unit = {
    //create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("StructTypeSchema").master("local[2]").getOrCreate()
    //get the SparkContext
    val sc: SparkContext = sparkSession.sparkContext
    //set the log level
    sc.setLogLevel("WARN")

    //read the file
    val textFile: RDD[String] = sc.textFile("D:\\person.txt")
    //split each line on commas
    val lineArrayRDD: RDD[Array[String]] = textFile.map(_.split(","))

    //map each array to a Row
    val rowRDD: RDD[Row] = lineArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
    //build the schema for the RDD: each add call takes (name, type, nullable, comment)
    val schema: StructType = (new StructType)
      .add("id", IntegerType, true, "id")
      .add("name", StringType, false, "name")
      .add("age", IntegerType, true, "age")
    //create the DataFrame from the RDD of Rows and the schema
    val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)

    //DSL-style query
    personDF.show()

    //SQL-style query
    //register the DataFrame as a temporary view first
    personDF.createTempView("person")

    sparkSession.sql("select * from person where id = 3").show()

    sparkSession.stop()
  }
}
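
The chained add calls above use the four-argument overload (name, type, nullable, comment). As a minimal alternative sketch of the same technique, the schema can also be assembled from StructField objects; this assumes one extra import, org.apache.spark.sql.types.StructField:

import org.apache.spark.sql.types.StructField

//equivalent schema built from an array of StructField objects
val schema: StructType = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = true)
))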

 

RDD to DataFrame: inferring the schema via reflection

package com.zy.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


/**
  * RDD to DataFrame: infer the schema via reflection on a case class
  */

//define a case class; its fields become the DataFrame columns
case class Person(id: Int, name: String, age: Int)

object CaseClassSchema {
  def main(args: Array[String]): Unit = {
    //build the SparkSession, specifying the appName and master URL (local for testing)
    val sparkSession: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
    //get the SparkContext
    val sc: SparkContext = sparkSession.sparkContext

    //set the log level
    sc.setLogLevel("WARN")

    //load the data
    val dataRDD: RDD[String] = sc.textFile("D:\\person.txt")
    //split each line on commas
    val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(","))
    //map each array to a Person instance
    val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

    //import implicits so the RDD gains the toDF method
    import sparkSession.implicits._
    val personDF: DataFrame = personRDD.toDF

    //DSL syntax
    personDF.show()
    personDF.printSchema()
    personDF.select("name").show()
    personDF.filter($"age" > 30).show()

    println("---------------------------------------------")

    //SQL syntax
    //a temporary view must be created first
    personDF.createTempView("person")
    sparkSession.sql("select * from person where id > 1").show()

    sparkSession.stop()
  }
}
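
Besides the two approaches above, a common shortcut skips both StructType and the case class: map the RDD to tuples and name the columns in toDF. This is a minimal sketch assuming the same SparkSession, implicits import, and person.txt layout as the examples above:

//convert an RDD of tuples directly, naming the columns in toDF
import sparkSession.implicits._

val tupleDF: DataFrame = sc.textFile("D:\\person.txt")
  .map(_.split(","))
  .map(x => (x(0).toInt, x(1), x(2).toInt))
  .toDF("id", "name", "age")

tupleDF.printSchema()
tupleDF.show()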

 

