Spark source API:構建DF、RDD與DF之間的相互轉換、show()的三種用法


spark source API:構建DF、RDD與DF之間的相互轉換、show()的三種用法

讀取csv格式的文件,構建DF
讀取json格式的文件,構建DF

1、不需要指定分割方式
2、不需要指定字段名和字段類型(json自帶字段和字段類型)
3、 json格式的文件相對於csv,會占用額外的空間,一般不使用json格式存儲數據

讀取數據庫中的數據(JDBC構建DF)

1、不需要指定分割方式
2、不需要指定字段名和字段類型(spark會繼承數據庫中的表結構)
3、執行之前pom.xml需要有mysql依賴

讀取parquet格式的文件,構建DF

1、不需要指定分割方式
2、不需要指定字段名和字段類型
3、parquet是一種壓縮的格式,同時會自帶列名,可以和hive完全兼容

package com.shujia.sql

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object Demo2SourceAPI {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("Demo2SourceAPI")
      .master("local")
      .config("spark.sql.shuffle.partitions",1)
      .getOrCreate()
    //導包--導入sql所有的函數
    import org.apache.spark.sql.functions._
    //導入隱式轉換
    import spark.implicits._

    /**
     * 讀取csv格式的文件,創建DF
     */
    val studentsDF: DataFrame = spark
      .read
      .format("csv")
      .option("sep",",")
      .schema("id STRING, name STRING, age INT, gender STRING, clazz STRING")
      .load("data/students.txt")
//    studentsDF.show()

    /**
     * 讀取json格式的文件,創建DF
     * 讀取json格式的文件:
     *    1、不需要指定分割方式
     *    2、不需要指定字段名和字段類型(json自帶字段和字段類型)
     * json格式的文件相對於csv,會占用額外的空間,一般不使用json格式存儲數據
     */
    val jsonDF: DataFrame = spark
      .read
      .format("json")
      .load("data/students.json")
//    jsonDF.printSchema() json格式的文件自帶表結構,可以使用printSchema()查看它自帶的表結構
//    jsonDF.show()

    /**
     *  使用JDBC構建DF(讀取數據庫中的數據)
     *  1、不需要指定分割方式
     *  2、不需要指定字段名和字段類型(spark會繼承數據庫中的表結構)
     *  3、執行之前pom.xml需要有mysql依賴
     */
    val jdbcDF: DataFrame = spark
      .read
      .format("jdbc")
      .option("url","jdbc:mysql://master:3306")
      .option("dbtable", "lyw11.student")
      .option("user", "root")
      .option("password", "123456")
      .load()
    jdbcDF.show()

    /**
     * 讀取parquet格式的文件:
     *  1、不需要指定分割方式
     *  2、不需要指定字段名和字段類型
     *
     * parquet是一種壓縮的格式,同時會自帶列名,可以和hive完全兼容
     *
     */
    //如果我們現在手里沒有parquet格式的文件
    //我們可以將csv格式的文件轉成parquet格式的文件(利用保存數據來轉格式)
    studentsDF
      .write
      .mode(SaveMode.Overwrite) //指定保存模式
      .format("parquet")//指定保存格式為parquet
      .save("data/parquet")//指定路徑和輸出目錄

    //讀取parquet格式的文件
    val parquetDF: DataFrame = spark
      .read
      .format("parquet")
      .load("data/parquet")
    parquetDF.show()
  }
}
RDD與DF可以相互轉換
RDD轉換為DF:.toDF
DF轉換成RDD :.rdd
package com.shujia.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object Demo3RDDToDF {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("Demo3RDDToDF")
      .getOrCreate()

    //導入隱式轉換
    import spark.implicits._
    //導入所有的函數
    import org.apache.spark.sql.functions._


    //先獲取sparkContext,通過SparkContext才能調用textFile
    val sc: SparkContext = spark.sparkContext
    //讀取文件構建RDD
    val stdeuntRDD: RDD[String] = sc.textFile("data/students.txt")
    //切分一下數據,轉換成元組的形式
    val stuRDD: RDD[(String, String, Int, String, String)] = stdeuntRDD.map(stu => {
      val split: Array[String] = stu.split(",")
      (split(0), split(1), split(2).toInt, split(3), split(4))
    })
    stuRDD.foreach(println)
    //(1500100001,施笑槐,22,女,文科六班)
    //...
      
    /**
     * 將RDD轉換成DF  .toDF("列名")
     * RDD的類型是一個元組,轉換的時候需要指定列名
     */
    val df: DataFrame = stuRDD.toDF("id", "name", "age", "gender", "clazz")
    df.show()
    //+----------+------+---+------+--------+
    //|        id|  name|age|gender|   clazz|
    //+----------+------+---+------+--------+
    //|1500100001|施笑槐| 22|    女|文科六班|
    //|1500100002|呂金鵬| 24|    男|文科六班|
    //...

    /**
     * DF轉換成RDD  .rdd
     * 類型是ROW,ROW代表一行數據,可以使用列名獲取列值
     */
    val rdd: RDD[Row] = df.rdd

    /**
     * 解析Row,使用列名獲取列值----getAs("列名")
     */
    val tRDD1: RDD[(String, String, Int, String, String)] = rdd.map(row => {
      val id: String = row.getAs[String]("id")
      val name: String = row.getAs[String]("name")
      val age: Int = row.getAs[Int]("age")
      val gender: String = row.getAs[String]("gender")
      val clazz: String = row.getAs[String]("clazz")
      (id, name, age, gender, clazz)
    })
    tRDD1.foreach(println)
    //(1500100001,施笑槐,22,女,文科六班)
    //...

    /**
     * 解析Row,使用模式匹配
     */
    val tRDD2: RDD[(String, String, Int, String, String)] = rdd.map {
      case Row(id: String, name: String, age: Int, gender: String, clazz: String) =>
        (id, name, age, gender, clazz)
    }
    tRDD2.foreach(println)
    //(1500100001,施笑槐,22,女,文科六班)
    //...
    
  }
}
DF中的show()用法
package com.shujia.sql

import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo4Show {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("Demo4Show")
      .getOrCreate()


    val studentsDF: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",") //默認分割方式是逗號
      .schema("id STRING, name STRING , age INT ,gender STRING ,clazz STRING")
      .load("data/students.txt")

    /**
     * 指定打印的行數,show()默認打印20行
     */
    //打印數據(默認打印前20行)
    studentsDF.show()

    //打印數據,指定獲取的行數
    studentsDF.show(100)

    /**
     * 當單行數據字符較長時,打印的時候會出現省略標點
     * 想要查看完整的數據,需要指定參數
     */
    //完全顯示數據
    studentsDF.show(100, false)
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM