User-Defined Functions (UDF)
package com.ruozedata.SparkProjectJob

import org.apache.spark.sql.SparkSession

object FunctionApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("AnalyzerTrain")
      .getOrCreate()
    import spark.implicits._

    val likeDF = spark.sparkContext.parallelize(List(
        "17er\truoze,j哥,星星,小海",
        "老二\tzwr,17er",
        "小海\t蒼老師,波老師"))
      .map(x => {
        val fields = x.split("\t")
        Stu(fields(0).trim, fields(1).trim)
      }).toDF()

    // Register the function like_count: counts the comma-separated likes
    spark.udf.register("like_count", (like: String) => like.split(",").size)

    // Method 1: register the DF as the temp view info via createOrReplaceTempView,
    // then call the UDF from SQL
    likeDF.createOrReplaceTempView("info")
    spark.sql("select name, like, like_count(like) num from info").show()

    spark.stop()
  }

  case class Stu(name: String, like: String)
}
Output:
+----+-------------------+---+
|name|               like|num|
+----+-------------------+---+
|17er|ruoze,j哥,星星,小海|  4|
|老二|           zwr,17er|  2|
|小海|      蒼老師,波老師|  2|
+----+-------------------+---+
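As an aside, this particular count does not strictly require a UDF at all; a minimal sketch of the same query using Spark's built-in split and size functions (not part of the original example) would be:

import org.apache.spark.sql.functions.{size, split}

// Built-in equivalent: split the string on commas, then take the array size
likeDF.select($"name", $"like", size(split($"like", ",")).alias("num")).show()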
This function computes how many people each person likes. Above, the function was registered and then invoked through SQL; how can it be used through the API instead? See the code below.
package com.ruozedata.SparkProjectJob

import org.apache.spark.sql.SparkSession

object FunctionApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("AnalyzerTrain")
      .getOrCreate()
    import spark.implicits._

    val likeDF = spark.sparkContext.parallelize(List(
        "17er\truoze,j哥,星星,小海",
        "老二\tzwr,17er",
        "小海\t蒼老師,波老師"))
      .map(x => {
        val fields = x.split("\t")
        Stu(fields(0).trim, fields(1).trim)
      }).toDF()

    spark.udf.register("like_count", (like: String) => like.split(",").size)

    // Method 2: once registered, the function can be used just like a
    // built-in function inside selectExpr, no temp view needed
    likeDF.selectExpr("name", "like", "like_count(like) as cnt").show()

    spark.stop()
  }

  case class Stu(name: String, like: String)
}
Output:
+----+-------------------+---+
|name|               like|cnt|
+----+-------------------+---+
|17er|ruoze,j哥,星星,小海|  4|
|老二|           zwr,17er|  2|
|小海|      蒼老師,波老師|  2|
+----+-------------------+---+
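Note that selectExpr simply parses SQL expression strings; an equivalent sketch using functions.expr (assuming like_count is still registered) makes this explicit:

import org.apache.spark.sql.functions.expr

// selectExpr("like_count(like) as cnt") is shorthand for select with expr(...)
likeDF.select($"name", $"like", expr("like_count(like)").as("cnt")).show()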
In fact, method 2 is only half an API approach, since the expression passed to selectExpr is still a SQL string; the pure API version is shown below.
package com.ruozedata.SparkProjectJob

import org.apache.spark.sql.{SparkSession, functions}

object FunctionApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("AnalyzerTrain")
      .getOrCreate()
    import spark.implicits._

    val likeDF = spark.sparkContext.parallelize(List(
        "17er\truoze,j哥,星星,小海",
        "老二\tzwr,17er",
        "小海\t蒼老師,波老師"))
      .map(x => {
        val fields = x.split("\t")
        Stu(fields(0).trim, fields(1).trim)
      }).toDF()

    // Method 3: wrap the function with functions.udf instead of registering it.
    // like_count must be bound with val so the IDE can resolve it when it is
    // used inside select
    val like_count = functions.udf((like: String) => like.split(",").size)
    likeDF.select($"name", $"like", like_count($"like").alias("cnt")).show()

    spark.stop()
  }

  case class Stu(name: String, like: String)
}
Output:
+----+-------------------+---+
|name|               like|cnt|
+----+-------------------+---+
|17er|ruoze,j哥,星星,小海|  4|
|老二|           zwr,17er|  2|
|小海|      蒼老師,波老師|  2|
+----+-------------------+---+
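As a closing note, the same thing is more commonly written by importing udf directly from org.apache.spark.sql.functions; a minimal sketch equivalent to method 3:

import org.apache.spark.sql.functions.udf

// Identical to functions.udf above, just with the direct import
val like_count = udf((like: String) => like.split(",").size)
likeDF.select($"name", $"like", like_count($"like").alias("cnt")).show()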