DataFrame basic operations


package com.jason.example

import org.apache.spark.sql.functions.broadcast
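// Extra imports used only by the illustrative sketches added throughout ff() below.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, max, row_number}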

class DFTest extends SparkInstance {

  import spark.implicits._

  val df = Seq(
    ("jason", 1, "理想",0),
    (null, 2, "理想",1),
    ("mac", 3, "理想",2),
    ("mac", 4, "理想",2)
  ).toDF("name", "depid", "company","groupid").repartition(3)
  val df3 = Seq(
    ("jason", 1, "理想",0),
    ("dong", 2, "理想",1),
    ("mac", 3, "理想",2)
  ).toDF("name", "depid", "company","groupid").repartition(3)
  val df2 = Seq(
    (3,"周浦",2),
    (4,"孫橋",0),
    (5,"金橋",1)
  ).toDF("depid","addr","gid").repartition(3)
  def ff(): Unit = {
    println(df.toString())//[name: string, depid: int ... 2 more fields]
    println(df.schema)
    df.printSchema()
    df.explain(true)//Prints the plans (logical and physical) to the console for debugging purposes.
    println(df.dtypes.mkString(","))//(name,StringType),(depid,IntegerType),(company,StringType),(groupid,IntegerType)
    println(df.columns.mkString(","))//name,depid,company,groupid
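    // The schema can also be walked programmatically, e.g. to inspect each field's type:
    df.schema.fields.foreach(f => println(s"${f.name}: ${f.dataType}"))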
    //df.withWatermark() defines an event-time watermark and applies only to streaming Datasets
    df.show(30,false)
    df.na.drop("any"/*"all"*/).show(false) //drop rows containing null or NaN: "any" drops a row
    //if any column is null or NaN; "all" drops it only when every column is null or NaN
    df.na.fill("xxx",Seq("name")).show()//fill missing values: replace null or NaN in the listed columns with the given value
    df.na.replace("name",Map("jason"->"abc","dong"->"def")).show()//rewrite values in column "name" according to the map
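    // na.fill also accepts a Map to fill several columns with different values at once (a sketch):
    df.na.fill(Map("name" -> "unknown", "depid" -> -1)).show()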
    //df.stat exposes DataFrameStatFunctions such as freqItems, crosstab and approxQuantile:
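    // A few DataFrameStatFunctions sketches (the support and relative-error values are illustrative):
    df.stat.freqItems(Seq("name"), 0.4).show()
    df.stat.crosstab("name", "depid").show()
    println(df.stat.approxQuantile("depid", Array(0.5), 0.01).mkString(","))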
    df.join(df2,(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"right").show()
    df.join(df2,(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"left").show()

    println("="*40)
    df.join(df2.hint("broadcast"),(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"left").show()
    df.join(broadcast(df2),(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"left").show()//by default Spark broadcasts tables under spark.sql.autoBroadcastJoinThreshold (10MB)
    //df2.hint("broadcast") and broadcast(df2) are equivalent
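    // To confirm the hint takes effect, inspect the physical plan for a BroadcastHashJoin node
    // (the exact plan text varies by Spark version):
    df.join(broadcast(df2), df("depid") === df2("depid"), "left").explain()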
    df.crossJoin(df2).show()//Cartesian product
    df.sort($"name".desc,$"depid".asc).show()
    df.select("name","depid").show()
    df.selectExpr("name as nm","depid as id").show()
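    // selectExpr accepts arbitrary SQL expressions, not just renames (a sketch):
    df.selectExpr("upper(name) as name_uc", "depid + 1 as depid_plus_one").show()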
    df.filter(s"""name='jason'""").show()
    df.where(s"""name='jason'""").select("name","depid").show
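    // The same predicates can be written against Column objects instead of SQL strings:
    df.filter($"name" === "jason" && $"depid" > 0).show()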
    df.rollup("name","depid").count().show()
    df.cube("name","depid").count().show()
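    // rollup produces hierarchical subtotals: (name,depid), (name), and the grand total;
    // cube produces every combination, additionally including (depid) on its own.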
    df.groupBy("name","depid").count().show()
    df.agg("name"->"max","depid"->"avg").show()
    df.groupBy("name","depid").agg("name"->"max","depid"->"avg").show()
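    // Equivalent aggregation written with column functions (max/avg imported above):
    df.groupBy("name","depid").agg(max("name"), avg("depid")).show()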
    df.limit(2).show()
    df.union(df3).show()
    df.unionByName(df3).show()
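    // unionByName matches columns by name, so the two frames may order their columns differently:
    df.unionByName(df3.select("groupid","company","depid","name")).show()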
    df.intersect(df3).show()//intersection
    df.except(df3).show() //set difference
    df.sample(0.5).show()
    df.randomSplit(Array(0.4,0.6)).apply(0).show()
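    // Passing an explicit seed makes the sample reproducible across runs:
    df.sample(withReplacement = false, fraction = 0.5, seed = 42L).show()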
    df.withColumn("depid", $"depid" <= 2).show() //replaces or adds a column; the column expression must refer only to attributes of this df
    df.withColumnRenamed("name","姓名").show()
    df.drop("name","depid")//drop the given columns
    df.distinct()
    df.dropDuplicates("name").show() //deduplicate by the given columns; which duplicate survives is not guaranteed
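    // For a deterministic choice of survivor, rank rows within each key and keep the first
    // (a sketch; here the row with the largest depid wins):
    df.withColumn("rn", row_number().over(Window.partitionBy("name").orderBy($"depid".desc)))
      .filter($"rn" === 1).drop("rn").show()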
    df.describe().show() //count, mean, stddev, min, max
    df.summary().show()//count, mean, stddev, min, 25%, 50%, 75%, max
    df.head() //fetches only the first row to the driver, not the whole Dataset
    df.toLocalIterator() //see the usage sketch below
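    // toLocalIterator pulls one partition at a time to the driver instead of a full collect:
    val it = df.toLocalIterator()
    while (it.hasNext) println(it.next())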

    spark.stop()
  }
}

object DFTest {
  def main(args: Array[String]): Unit = {
    val dt = new DFTest
    dt.ff()
  }
}

 

