package com.jason.example import org.apache.spark.sql.functions.broadcast class DFTest extends SparkInstance { import spark.implicits._ val df = Seq( ("jason", 1, "理想",0), (null, 2, "理想",1), ("mac", 3, "理想",2), ("mac", 4, "理想",2) ).toDF("name", "depid", "company","groupid").repartition(3) val df3 = Seq( ("jason", 1, "理想",0), ("dong", 2, "理想",1), ("mac", 3, "理想",2) ).toDF("name", "depid", "company","groupid").repartition(3) val df2 = Seq( (3,"周浦",2), (4,"孫橋",0), (5,"金橋",1) ).toDF("depid","addr","gid").repartition(3) def ff(): Unit = { println(df.toString())//[name: string, depid: int ... 1 more field] println(df.schema) df.printSchema() df.explain(true)//Prints the plans (logical and physical) to the console for debugging purposes. println(df.dtypes.mkString(","))//(name,StringType),(depid,IntegerType),(company,StringType) println(df.columns.mkString(","))// //df.withWatermark() ??? df.show(30,false) df.na.drop("any"/*"all"*/).show(false) //刪除df中包含null 或NaN 的記錄,如果為any 則只要有有一列為 //null 或NaN 則刪除整行,如果是all 則所有列是null ho NaN 時才刪除整行 df.na.fill("xxx",Seq("name")).show()//缺失值填充,把null 或 NaN 替換為所需要的值 df.na.replace("name",Map("jason"->"abc","dong"->"def")).show()//將字段name 中 的值按照map 內容進行更改 //df.stat.xxx ??? df.join(df2,(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"right").show() df.join(df2,(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"left").show() df.join(df2,(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"left").show() println("="*40) df.join(df2.hint("broadcast"),(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"left").show() df.join(broadcast(df2),(df("depid")===df2("depid")).and(df("groupid")===df2("gid")),"left").show()//spark 默認廣播10MB的小表 //df2.hint("broadcast") 和 broadcast(df2) 是等同的 df.crossJoin(df2).show()//笛卡爾積 df.sort($"name".desc,$"depid".asc).show() df.select("name","depid").show() df.selectExpr("name as nm","depid as id").show() df.filter(s"""name='jason'""").show() df.where(s"""name='jason'""").select("name","depid").show df.rollup("name","depid").count().show() df.cube("name","depid").count().show() df.groupBy("name","depid").count().show() df.agg("name"->"max","depid"->"avg").show() df.groupBy("name","depid").agg("name"->"max","depid"->"avg").show() df.limit(2).show() df.union(df3).show() df.unionByName(df3).show() df.intersect(df3).show()//交集 df.except(df3).show() //差集 df.sample(0.5).show() df.randomSplit(Array(0.4,0.6)).apply(0).show() df.withColumn("depid",$"depid".<=(2)).show() // 該方法可以替換或增加一列到原df, 第二個參數中的col必須時df中的元素 df.withColumnRenamed("name","姓名").show() df.drop("name","depid")//舍棄某幾列 df.distinct() df.dropDuplicates("name").show() //根據某幾列去重,會保留最后一條數據 df.describe().show() //count,mean,min,max df.summary().show()//count,min,25%,50%,max df.head() //所有的數據會被collect到driver df.toLocalIterator() spark.stop() } } object DFTest { def main(args: Array[String]): Unit = { val dt = new DFTest dt.ff() } }