一 Spark SQL概述
1.1 什么是Spark SQL
Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫做DataFrame並且作為分布式SQL查詢引擎的作用。
Hive是將Hive SQL轉換成MapReduce然后提交到集群上執行,大大簡化了編寫MapReduce的程序的復雜性,由於MapReduce這種計算模型執行效率比較慢。所有Spark SQL的應運而生,它是將Spark SQL轉換成RDD,然后提交到集群執行,執行效率非常快!
1.易整合
2.統一的數據訪問方式
3.兼容Hive
4.標准的數據連接
SparkSQL可以看做是一個轉換層,向下對接各種不同的結構化數據源,向上提供不同的數據訪問方式。
1.2 RDD vs DataFrames vs DataSet
在SparkSQL中Spark為我們提供了兩個新的抽象,分別是DataFrame和DataSet。他們和RDD的區別,首先從版本的產生上來看:
RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)
如果同樣的數據都給到這三個數據結構,他們分別計算之后,都會給出相同的結果。不同是的他們的執行效率和執行方式。
在后期的Spark版本中,DataSet會逐步取代RDD和DataFrame成為唯一的API接口。
1.2.1 RDD
- RDD是一個懶執行的不可變的可以支持Lambda表達式的並行數據集合。
- RDD的最大好處就是簡單,API的人性化程度很高。
- RDD的劣勢是性能限制,它是一個JVM駐內存對象,這也就決定了存在GC(垃圾收集器)的限制和數據增加時Java序列化成本的升高。
1.2.2 Dataframe
與RDD類似,DataFrame也是一個分布式數據容器。然而DataFrame更像傳統數據庫的二維表格,除了數據以外,還記錄數據的結構信息,即schema。同時,與Hive類似,DataFrame也支持嵌套數據類型(struct、array和map)。從API易用性的角度上看,DataFrame API提供的是一套高層的關系操作,比函數式的RDD API要更加友好,門檻更低。由於與R和Pandas的DataFrame類似,Spark DataFrame很好地繼承了傳統單機數據分析的開發體驗。
上圖直觀地體現了DataFrame和RDD的區別。左側的RDD[Person]雖然以Person為類型參數,但Spark框架本身不了解Person類的內部結構。而右側的DataFrame卻提供了詳細的結構信息,使得Spark SQL可以清楚地知道該數據集中包含哪些列,每列的名稱和類型各是什么。DataFrame多了數據的結構信息,即schema。RDD是分布式的Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執行效率、減少數據讀取以及執行計划的優化,比如filter下推、裁剪等。
DataFrame是為數據提供了Schema的視圖。可以把它當做數據庫中的一張表來對待
DataFrame也是懶執行的。
性能上比RDD要高,主要有兩方面原因:
定制化內存管理
數據以二進制的方式存在於非堆內存,節省了大量空間之外,還擺脫了GC(垃圾收集器)的限制。
查詢優化器的意義在於,即便是經驗並不豐富的程序員寫出的次優的查詢,也可以被盡量轉換為高效的形式予以執行。
Dataframe的劣勢在於在編譯期缺少類型安全檢查,導致運行時出錯.
1.2.3 Dataset
1) 是Dataframe API的一個擴展,是Spark最新的數據抽象
2) 用戶友好的API風格,既具有類型安全檢查也具有Dataframe的查詢優化特性。
3) Dataset支持編解碼器,當需要訪問非堆上的數據時可以避免反序列化整個對象,提高了效率。
4) 樣例類被用來在Dataset中定義數據的結構信息,樣例類中每個屬性的名稱直接映射到DataSet中的字段名稱。
5) Dataframe是Dataset的特列,DataFrame=Dataset[Row] ,所以可以通過as方法將Dataframe轉換為Dataset。Row是一個類型,跟Car、Person這些的類型一樣,所有的表結構信息都用Row來表示。
6) DataSet是強類型的。比如可以有Dataset[Car],Dataset[Person].
DataFrame只是知道字段,但是不知道字段的類型,所以在執行這些操作的時候是沒辦法在編譯的時候檢查是否類型失敗的,比如你可以對一個String進行減法操作,在執行的時候才報錯,而DataSet不僅僅知道字段,而且知道字段類型,所以有更嚴格的錯誤檢查。就跟JSON對象和類對象之間的類比。
RDD讓我們能夠決定怎么做,而DataFrame和DataSet讓我們決定做什么,控制的粒度不一樣。
1.2.4 三者的共性
1、RDD、DataFrame、Dataset全都是spark平台下的分布式彈性數據集,為處理超大型數據提供便利
2、三者都有惰性機制,在進行創建、轉換,如map方法時,不會立即執行,只有在遇到Action如foreach時,三者才會開始遍歷運算,極端情況下,如果代碼里面有創建、轉換,但是后面沒有在Action中使用對應的結果,在執行時會被直接跳過.
val sparkconf = new SparkConf().setMaster("local").setAppName("test").set("spark.port.maxRetries","1000") val spark = SparkSession.builder().config(sparkconf).getOrCreate() val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1))) // map不運行 rdd.map{line=> println("運行") line._1 }
3、三者都會根據spark的內存情況自動緩存運算,這樣即使數據量很大,也不用擔心會內存溢出
4、三者都有partition的概念
5、三者有許多共同的函數,如filter,排序等
6、在對DataFrame和Dataset進行操作許多操作都需要這個包進行支持
import spark.implicits._
7、DataFrame和Dataset均可使用模式匹配獲取各個字段的值和類型
DataFrame:
testDF.map{ case Row(col1:String,col2:Int)=> println(col1);println(col2) col1 case _=> "" }
Dataset:
case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型 testDS.map{ case Coltest(col1:String,col2:Int)=> println(col1);println(col2) col1 case _=> "" }
1.2.5 三者的區別
RDD:
1、RDD一般和spark mlib同時使用
2、RDD不支持sparksql操作
DataFrame:
1、與RDD和Dataset不同,DataFrame每一行的類型固定為Row,只有通過解析才能獲取各個字段的值,如
testDF.foreach{ line => val col1=line.getAs[String]("col1") val col2=line.getAs[String]("col2") }
每一列的值沒法直接訪問
2、DataFrame與Dataset一般與spark ml同時使用
3、DataFrame與Dataset均支持sparksql的操作,比如select,groupby之類,還能注冊臨時表/視窗,進行sql語句操作,如
dataDF.createOrReplaceTempView("tmp") spark.sql("select ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)
4、DataFrame與Dataset支持一些特別方便的保存方式,比如保存成csv,可以帶上表頭,這樣每一列的字段名一目了然
//保存 val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://master01:9000/test") datawDF.write.format("com.atguigu.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save() //讀取 val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://master01:9000/test") val datarDF= spark.read.options(options).format("com.atguigu.spark.csv").load()
利用這樣的保存方式,可以方便的獲得字段名和列的對應,而且分隔符(delimiter)可以自由指定。
Dataset:
Dataset和DataFrame擁有完全相同的成員函數,區別只是每一行的數據類型不同。
DataFrame也可以叫Dataset[Row],每一行的類型是Row,不解析,每一行究竟有哪些字段,各個字段又是什么類型都無從得知,只能用上面提到的getAS方法或者共性中的第七條提到的模式匹配拿出特定字段
而Dataset中,每一行是什么類型是不一定的,在自定義了case class之后可以很自由的獲得每一行的信息
case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型 /** rdd ("a", 1) ("b", 1) ("a", 1) **/ val test: Dataset[Coltest]=rdd.map{line=> Coltest(line._1,line._2) }.toDS test.map{ line=> println(line.col1) println(line.col2) }
可以看出,Dataset在需要訪問列中的某個字段時是非常方便的,然而,如果要寫一些適配性很強的函數時,如果使用Dataset,行的類型又不確定,可能是各種case class,無法實現適配,這時候用DataFrame即Dataset[Row]就能比較好的解決問題
二。執行SparkSQL查詢
2.1 命令行查詢流程
打開Spark shell
例子:查詢大於30歲的用戶
創建如下JSON文件,注意JSON的格式:
{"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}
2.2 IDEA創建SparkSQL程序
IDEA中程序的打包和運行方式都和SparkCore類似,Maven依賴中需要添加新的依賴項:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency>
程序如下
package com.atguigu.sparksql import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} import org.slf4j.LoggerFactory /** * Created by wuyufei on 22/06/2019. */ object HelloWorld { val logger = LoggerFactory.getLogger(HelloWorld.getClass) def main(args: Array[String]) { //創建SparkConf()並設置App名稱 val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // RDDs 轉 DataFrames 需要的包 import spark.implicits._ val df = spark.read.json("examples/src/main/resources/people.json") df.show() df.filter($"age" > 21).show() df.createOrReplaceTempView("persons") spark.sql("SELECT * FROM persons where age > 21").show() spark.stop() } }
三。SparkSQL解析
3.1 新的起始點SparkSession
在老的版本中,SparkSQL提供兩種SQL查詢起始點,一個叫SQLContext,用於Spark自己提供的SQL查詢,一個叫HiveContext,用於連接Hive的查詢,SparkSession是Spark最新的SQL查詢起始點,實質上是SQLContext和HiveContext的組合,所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了sparkContext,所以計算實際上是由sparkContext完成的。
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // RDDs 轉 DataFrames 需要的包
import spark.implicits._
SparkSession.builder 用於創建一個SparkSession。
import spark.implicits._的引入是用於將DataFrames隱式轉換成RDD,使df能夠使用RDD中的方法。
如果需要Hive支持,則需要以下創建語句:
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .enableHiveSupport() .getOrCreate()
// RDDs 轉 DataFrames 需要的包
import spark.implicits._
3.2 創建DataFrames
在Spark SQL中SparkSession是創建DataFrames和執行SQL的入口,創建DataFrames有三種方式,一種是可以從一個存在的RDD進行轉換,還可以從Hive Table進行查詢返回,或者通過Spark的數據源進行創建。
val df = spark.read.json("examples/src/main/resources/people.json") df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
從RDD進行轉換:
scala> val peopleRdd = sc.textFile("examples/src/main/resources/people.txt") peopleRdd: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[18] at textFile at <console>:24 //把每一行的數據用,隔開 然后通過第二個map轉換成一個Array 再通過toDF 映射給name age scala> val peopleDF3 = peopleRdd.map(_.split(",")).map(paras => (paras(0),paras(1).trim().toInt)).toDF("name","age") peopleDF3: org.apache.spark.sql.DataFrame = [name: string, age: int] scala> peopleDF.show() +-------+---+ | name|age| +-------+---+ |Michael| 29| | Andy| 30| | Justin| 19| +-------+---+
Hive在數據源章節介紹
3.3 DataFrame常用操作
3.3.1 DSL風格語法
import spark.implicits._ df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ df.select($"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ df.filter($"age" > 21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+
3.3.2 SQL風格語法
// Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // Register the DataFrame as a global temporary view df.createGlobalTempView("people") // Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
臨時表是Session范圍內的,Session退出后,表就失效了。如果想應用范圍內有效,可以使用全局表。注意使用全局表時需要全路徑訪問,如:global_temp.people
3.4 創建DataSet
Dataset是具有強類型的數據集合,需要提供對應的類型信息。
case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
3.5 Dataset和RDD互操作
Spark SQL支持通過兩種方式將存在的RDD轉換為Dataset,轉換的過程中需要讓Dataset獲取RDD中的Schema信息,主要有兩種方式,一種是通過反射來獲取RDD中的Schema信息。這種方式適合於列名已知的情況下。第二種是通過編程接口的方式將Schema信息應用於RDD,這種方式可以處理那種在運行時才能知道列的方式。
3.5.1 通過反射獲取Scheam
SparkSQL能夠自動將包含有case類的RDD轉換成DataFrame,case類定義了table的結構,case類屬性通過反射變成了表的列名。Case類可以包含諸如Seqs或者Array等復雜的結構。
// For implicit conversions from RDDs to DataFrames import spark.implicits._ // Create an RDD of Person objects from a text file, convert it to a Dataframe val peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF() // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by Spark val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // The columns of a row in the result can be accessed by field index teenagersDF.map(teenager => "Name: " + teenager(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // or by field name teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // No pre-defined encoders for Dataset[Map[K,V]], define explicitly implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // Primitive types and case classes can be also defined as // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder() // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect() // Array(Map("name" -> "Justin", "age" -> 19))
3.5.2 通過編程設置Schema(StructType)
如果case類不能夠提前定義,可以通過下面三個步驟定義一個DataFrame
創建一個多行結構的RDD;
創建用StructType來表示的行結構信息。
通過SparkSession提供的createDataFrame方法來應用Schema .
import org.apache.spark.sql.types._ // Create an RDD val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Generate the schema based on the string of schema val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) // Convert records of the RDD (people) to Rows import org.apache.spark.sql._ val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // Apply the schema to the RDD val peopleDF = spark.createDataFrame(rowRDD, schema) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL can be run over a temporary view created using DataFrames val results = spark.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name results.map(attributes => "Name: " + attributes().show() // +-------------+ // | value| // +-------------+ // |Name: Michael| // | Name: Andy| // | Name: Justin| // +-------------+
3.6 類型之間的轉換總結
RDD、DataFrame、Dataset三者有許多共性,有各自適用的場景常常需要在三者之間轉換
DataFrame/Dataset轉RDD:
val rdd1=testDF.rdd
val rdd2=testDS.rdd
RDD轉DataFrame:
import spark.implicits._ val testDF = rdd.map {line=> (line._1,line._2) }.toDF("col1","col2")
一般用元組把一行的數據寫在一起,然后在toDF中指定字段名
RDD轉Dataset:
import spark.implicits._ case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型 val testDS = rdd.map {line=> Coltest(line._1,line._2) }.toDS
可以看到,定義每一行的類型(case class)時,已經給出了字段名和類型,后面只要往case class里面添加值即可
Dataset轉DataFrame:
把case class封裝成Row
import spark.implicits._ val testDF = testDS.toDF
DataFrame轉Dataset:
import spark.implicits._ case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型 val testDS = testDF.as[Coltest]
這種方法就是在給出每一列的類型后,使用as方法,轉成Dataset,這在數據類型是DataFrame又需要針對各個字段處理時極為方便。
在使用一些特殊的操作時,一定要加上 import spark.implicits._ 不然toDF、toDS無法使用
3.7 用戶自定義函數
通過spark.udf功能用戶可以自定義函數。
3.7.1 用戶自定義UDF函數
scala> val df = spark.read.json("examples/src/main/resources/people.json") df: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> df.show() +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+ scala> spark.udf.register("addName", (x:String)=> "Name:"+x) res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType))) scala> df.createOrReplaceTempView("people") scala> spark.sql("Select addName(name), age from people").show() +-----------------+----+ |UDF:addName(name)| age| +-----------------+----+ | Name:Michael|null| | Name:Andy| 30| | Name:Justin| 19| +-----------------+----+
3.7.2 用戶自定義聚合函數
強類型的Dataset和弱類型的DataFrame都提供了相關的聚合函數, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用戶可以設定自己的自定義聚合函數。
3.7.2.1 弱類型用戶自定義聚合函數
通過繼承UserDefinedAggregateFunction來實現用戶自定義聚合函數。下面展示一個求平均工資的自定義聚合函數。
import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession object MyAverage extends UserDefinedAggregateFunction { // 聚合函數輸入參數的數據類型 def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // 聚合緩沖區中值得數據類型 def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // 返回值的數據類型 def dataType: DataType = DoubleType // 對於相同的輸入是否一直返回相同的輸出。 def deterministic: Boolean = true // 初始化 def initialize(buffer: MutableAggregationBuffer): Unit = { // 存工資的總額 buffer(0) = 0L // 存工資的個數 buffer(1) = 0L } // 相同Execute間的數據合並。 def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } // 不同Execute間的數據合並 def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // 計算最終結果 def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1) } // 注冊函數 spark.udf.register("myAverage", MyAverage) val df = spark.read.json("examples/src/main/resources/employees.json") df.createOrReplaceTempView("employees") df.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees") result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+
3.7.2.2 強類型用戶自定義聚合函數
通過繼承Aggregator來實現強類型自定義聚合函數,同樣是求平均工資
import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.Encoder import org.apache.spark.sql.Encoders import org.apache.spark.sql.SparkSession // 既然是強類型,可能有case類 case class Employee(name: String, salary: Long) case class Average(var sum: Long, var count: Long) object MyAverage extends Aggregator[Employee, Average, Double] { // 定義一個數據結構,保存工資總數和工資總個數,初始都為0 def zero: Average = Average(0L, 0L) // Combine two values to produce a new value. For performance, the function may modify `buffer` // and return it instead of constructing a new object def reduce(buffer: Average, employee: Employee): Average = { buffer.sum += employee.salary buffer.count += 1 buffer } // 聚合不同execute的結果 def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum b1.count += b2.count b1 } // 計算輸出 def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count // 設定之間值類型的編碼器,要轉換成case類 // Encoders.product是進行scala元組和case類轉換的編碼器 def bufferEncoder: Encoder[Average] = Encoders.product // 設定最終輸出值的編碼器 def outputEncoder: Encoder[Double] = Encoders.scalaDouble } val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee] ds.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ // Convert the function to a `TypedColumn` and give it a name val averageSalary = MyAverage.toColumn.name("average_salary") val result = ds.select(averageSalary) result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+