Spark SQL內置函數官網API:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions%24
平常在使用mysql的時候,我們在寫SQL的時候會使用到MySQL為我們提供的一些內置函數,如數值函數:求絕對值abs()、平方根sqrt()等,還有其它的字符函數、日期函數、聚合函數等等。使我們利用這些內置函數能夠快速實現我們的業務邏輯。在SparkSQL里其實也為我們提供了近兩百多種內置函數,我們通過
import org.apache.spark.sql.functions._
導入內置函數包,來使用。也可以在SQL語句中直接使用。SparkSQL內置函數分類:聚合函數、集合函數、日期函數、數學函數、混雜函數、非聚合函數、排序函數、字符串函數、UDF函數和窗口函數這10類函數。
1 內置函數的使用
使用內置函數的方式有兩種,一種是通過編程的方式的使用,另一種是通過SQL的方式使用。
例如:我們有如下數據,想要使用SparkSQL內置函數lower()來將名字全部轉為小寫
+----+---+-----------+
|name|age| phone|
+----+---+-----------+
|Ming| 20|15552211521|
|hong| 19|13287994007|
| zhi| 21|15552211523|
+----+---+-----------+
以編程的方式使用內置函數
import org.apache.spark.sql.functions._ df.select(lower(col("name")).as("name"), col("age"), col("phone")).show()
以SQL的方式使用
df.createOrReplaceTempView("people") spark.sql("select lower(name) as name,age,phone from people").show()
2 UDF函數的使用
有的時候,SparkSQL提供的內置函數無法滿足我們的業務的時候,我們可以使用過UDF函數來自定義我們的實現邏輯。例如:需要對上面的數據添加一列id,要求id的生成是name+隨機生成的uuid+phone。這時候我們可以使用UDF自定義函數實現。如下所示:
//根據name和phone生成組合,並加上一段uud生成唯一表示id
def idGenerator(name: String, phone: Long): String = { name + "-" + UUID.randomUUID().toString + "-" + phone.toString } //生成udf函數
val idGeneratorUDF = udf(idGenerator _) //加入隱式轉換
import spark.implicits._ df.withColumn("id", idGeneratorUDF($"name", $"phone")).show()
也可以這樣寫:
//加入隱式轉換
import spark.implicits._ //根據name和phone生成組合,並加上一段uud生成唯一表示id
def idGenerator(name: String, phone: Long): String = { name + "-" + UUID.randomUUID().toString + "-" + phone.toString } //注冊udf函數
spark.udf.register("idGenerator",idGenerator _) //使用idGenerator
df.withColumn("id",callUDF("idGenerator",$"name",$"phone")).show()
結果都是一樣的:
+----+---+-----------+--------------------+
|name|age| phone| id|
+----+---+-----------+--------------------+
|Ming| 20|15552211521|Ming-9b87d4d5-91d...|
|hong| 19|13287994007|hong-7a91f7d8-66a...|
| zhi| 21|15552211523|zhi-f005859c-4516...|
+----+---+-----------+--------------------+
同樣,我們可以將我們自定義的UDF函數注冊到SparkSQL里,然后用SQL實現
//將自定義函數注冊到SparkSQL里
spark.udf.register("idGeneratorUDF",idGeneratorUDF) //創建臨時表
df.createOrReplaceTempView("people") //使用sql查詢
spark.sql("select idGeneratorUDF(name,phone) as id,name,age,phone from people").show()
注意:上面加入import spark.implicits._隱式轉換是為了方便使用$”列名”來代替col(“列名”)
完整代碼:
import java.util.UUID import org.apache.spark.sql.SparkSession /** * spark sql 內置函數 */
object SparkSQLFunctionApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local").getOrCreate() import org.apache.spark.sql.functions._ //加入隱式轉換: 本例子里可以使用toDF方法和$"列名"代替col("列名")
import spark.implicits._ val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L)).toDF("name", "age", "phone") df.show() /** * +----+---+-----------+ * |name|age| phone| * +----+---+-----------+ * |Ming| 20|15552211521| * |hong| 19|13287994007| * | zhi| 21|15552211523| * +----+---+-----------+ */
//1 使用內置函數將所有名字都轉為小寫 //1.1 編程的方式:
df.select(lower($"name").as("name"), $"age", $"phone").show() /** * +----+---+-----------+ * |name|age| phone| * +----+---+-----------+ * |ming| 20|15552211521| * |hong| 19|13287994007| * | zhi| 21|15552211523| * +----+---+-----------+ */
//1.2 SQL的方式 //注冊表
df.createOrReplaceTempView("people") spark.sql("select lower(name) as name,age,phone from people").show() /** * +----+---+-----------+ * |name|age| phone| * +----+---+-----------+ * |ming| 20|15552211521| * |hong| 19|13287994007| * | zhi| 21|15552211523| * +----+---+-----------+ */
//2 UDF函數的使用 //2.1 直接使用 //根據name和phone生成組合,並加上一段uud生成唯一表示id
def idGenerator(name: String, phone: Long): String = { name + "-" + UUID.randomUUID().toString + "-" + phone.toString } //生成udf函數
val idGeneratorUDF = udf(idGenerator _) df.withColumn("id", idGeneratorUDF($"name", $"phone")).show() /** * +----+---+-----------+--------------------+ * |name|age| phone| id| * +----+---+-----------+--------------------+ * |Ming| 20|15552211521|Ming-74338e40-548...| * |hong| 19|13287994007|hong-4f058f2b-9d3...| * | zhi| 21|15552211523|zhi-f42bea86-a9cf...| * +----+---+-----------+--------------------+ */
//將自定義函數注冊到SparkSQL里
spark.udf.register("idGeneratorUDF", idGeneratorUDF) //創建臨時表
df.createOrReplaceTempView("people") //使用sql查詢
spark.sql("select idGeneratorUDF(name,phone) as id,name,age,phone from people").show() /** * +----+---+-----------+--------------------+ * |name|age| phone| id| * +----+---+-----------+--------------------+ * |Ming| 20|15552211521|Ming-74338e40-548...| * |hong| 19|13287994007|hong-4f058f2b-9d3...| * | zhi| 21|15552211523|zhi-f42bea86-a9cf...| * +----+---+-----------+--------------------+ */
//2.2 通過callUDF使用 //注冊udf函數
spark.udf.register("idGenerator", idGenerator _) //使用idGenerator
df.withColumn("id", callUDF("idGenerator", $"name", $"phone")).show() /** * +----+---+-----------+--------------------+ * |name|age| phone| id| * +----+---+-----------+--------------------+ * |Ming| 20|15552211521|Ming-74338e40-548...| * |hong| 19|13287994007|hong-4f058f2b-9d3...| * | zhi| 21|15552211523|zhi-f42bea86-a9cf...| * +----+---+-----------+--------------------+ */
//創建臨時表
df.createOrReplaceTempView("people") //使用sql查詢
spark.sql("select idGenerator(name,phone) as id,name,age,phone from people").show() /** * +--------------------+----+---+-----------+ * | id|name|age| phone| * +--------------------+----+---+-----------+ * |Ming-d4236bac-e21...|Ming| 20|15552211521| * |hong-bff84c0d-67d...|hong| 19|13287994007| * |zhi-aa0174b0-c8b3...| zhi| 21|15552211523| * +--------------------+----+---+-----------+ */ } }