2019-04-20
關鍵字: Spark 的 agrregate 作用、Scala 的 aggregate 是什么
Spark 編程中的 aggregate 方法還是比較常用的。本篇文章站在初學者的角度以大白話的形式來講解一下 aggregate 方法。
aggregate 方法是一個聚合函數,接受多個輸入,並按照一定的規則運算以后輸出一個結果值。
aggregate 在哪
aggregate 方法是 Spark 編程模型 RDD 類( org.apache.spark.RDD ) 中定義的一個公有方法。它的方法聲明如下
1 def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope { 2 3 // ... 4 5 }
aggregate 的參數是什么意思
然后我們一塊一塊來學習這個方法的聲明。其實這小節講的,都是 Scala 的語法知識。
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
首先看到的是 “泛型” 聲明。懂 Java 的同學直接把這個 " [U: ClassTag] " 理解成是一個泛型聲明就好了。如果您不是很熟悉 Java 語言,那我們只需要知道這個 U 表示我們的 aggregate 方法只能接受某一種類型的輸入值,至於到底是哪種類型,要看您在具體調用的時候給了什么類型。
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
然后我們來看看 aggregate 的參數列表。明顯這個 aggregate 方法是一個柯里化函數。柯里化的知識不在本篇文章討論的范圍之內。如果您還不了解柯里化的概念,那在這里簡單地理解為是通過多個圓括號來接受多個輸入參數就可以了。
然后我們來看看第 1 部分,即上面藍色加粗的 " (zeroValue: U) " 。這個表示它接受一個任意類型的輸入參數,變量名為 zeroValue 。這個值就是初值,至於這個初值的作用,姑且不用理會,等到下一小節通過實例來講解會更明了,在這里只需要記住它是一個 “只使用一次” 的值就好了。
第 2 部分,我們還可以再把它拆分一下,因為它里面其實有兩個參數。筆者認為 Scala 語法在定義多個參數時,辨識度比較弱,不睜大眼睛仔細看,很難確定它到底有幾個參數。
首先是第 1 個參數 " seqOp: (U, T) => U " 它是一個函數類型,以一個輸入為任意兩個類型 U, T 而輸出為 U 類型的函數作為參數。這個函數會先被執行。這個參數函數的作用是為每一個分片( slice )中的數據遍歷應用一次函數。換句話說就是假設我們的輸入數據集( RDD )有 1 個分片,則只有一個 seqOp 函數在運行,假設有 3 個分片,則有三個 seqOp 函數在運行。可能有點難以理解,不過沒關系,到后面結合實例就很容易理解了。
另一個參數 " combOp: (U, U) => U " 接受的也是一個函數類型,以輸入為任意類型的兩個輸入參數而輸出為一個與輸入同類型的值的函數作為參數。這個函數會在上面那個函數執行以后再執行。這個參數函數的輸入數據來自於第一個參數函數的輸出結果,這個函數僅會執行 1 次,它是用來最終聚合結果用的。同樣這里搞不懂沒關系,下一小節的實例部分保證讓您明白。
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
最后是上面這個紅色加粗的 " : U " 它是 aggregate 方法的返回值類型,也是泛型表示。
對了,最后還有一個 " withScope ",這個就不介紹了,因為筆者也不知道它是干嘛的,哈哈哈哈。反正對我們理解這個方法也沒什么影響。
aggregate 正確的使用姿勢
我們直接在 spark-shell 中來演示實例了。這里以兩個小例子來演示,一個是不帶分片的 RDD ,另一個則是帶 3 個分片的 RDD 。
首先我們來創建一個 RDD
scala> val rdd1 = sc.parallelize(1 to 5) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> rdd1 collect
warning: there was one feature warning; re-run with -feature for details
res24: Array[Int] = Array(1, 2, 3, 4, 5)
這個 RDD 僅有 1 個分片,包含 5 個數據: 1, 2, 3, 4, 5 。
然后我們來應用一下 aggregate 方法。
哦,不對,在使用 aggregate 之前,我們還是先定義兩個要給 aggregate 當作輸入參數的函數吧。
scala> :paste // Entering paste mode (ctrl-D to finish) def pfun1(p1: Int, p2: Int): Int = { p1 * p2 } // Exiting paste mode, now interpreting. pfun1: (p1: Int, p2: Int)Int scala>
首先來定義第 1 個函數,即等下要被當成 seqOp 的形參使用的函數。在上一小節我們知道 seqOp 函數是一個輸入類型為 U, T 類型而輸出為 U 類型的函數。但是在這里,因為我們的 RDD 只包含一個 Int 類型數據,所以這里的 seqOp 的兩個輸入參數都是 Int 類型的,這是沒毛病的哦!然后這個函數的返回類型也為 Int 。我們這個函數的作用就是將輸入的參數 p1 , p2 求積以后返回。
scala> :paste // Entering paste mode (ctrl-D to finish) def pfun2(p3: Int, p4: Int): Int = { p3 + p4 } // Exiting paste mode, now interpreting. pfun2: (p3: Int, p4: Int)Int scala>
接着是第 2 個函數。就不再解釋什么了。
然后終於可以開始應用我們的 aggregate 方法了。
scala> rdd1.aggregate(3)(pfun1, pfun2) res25: Int = 363 scala>
輸出結果是 363 !這個結果是怎么算出來的呢?
首先我們的 zeroValue 即初值是 3 。然后通過上面小節的介紹,我們知道首先會應用 pfun1 函數,因為我們這個 RDD 只有 1 個分片,所以整個運算過程只會有一次 pfun1 函數調用。它的計算過程如下:
首先用初值 3 作為 pfun1 的參數 p1 ,然后再用 RDD 中的第 1 個值,即 1 作為 pfun1 的參數 p2 。由此我們可以得到第一個計算值為 3 * 1 = 3 。接着這個結果 3 被當成 p1 參數傳入,RDD 中的第 2 個值即 2 被當成 p2 傳入,由此得到第二個計算結果為 3 * 2 = 6 。以此類推,整個 pfun1 函數執行完成以后,得到的結果是 3 * 1 * 2 * 3 * 4 * 5 = 360 。這個 pfun1 的應用過程有點像是 “在 RDD 中滑動計算” 。
在 aggregate 方法的第 1 個參數函數 pfun1 執行完畢以后,我們得到了結果值 360 。於是,這個時候就要開始執行第 2 個參數函數 pfun2 了。
pfun2 的執行過程與 pfun1 是差不多的,同樣會將 zeroValue 作為第一次運算的參數傳入,在這里即是將 zeroValue 即 3 當成 p3 參數傳入,然后是將 pfun1 的結果 360 當成 p4 參數傳入,由此得到計算結果為 363 。因為 pfun1 僅有一個結果值,所以整個 aggregate 過程就計算完畢了,最終的結果值就是 363 。
怎么樣?相信您已經完全明白 aggregate 方法的的作用與用法了吧。下面再貼一個有多個分片的 RDD 的示例。
scala> val rdd2 = sc.makeRDD(1 to 10, 3) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:24 scala> rdd2.getNumPartitions res26: Int = 3 scala> rdd2.foreachPartition(myprint) 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 ,
這里定義了一個擁有 3 個分片的 RDD 。然后 aggregate 的兩個函數參數仍然是使用上面定義的 pfun1 與 pfun2 。
scala> rdd2.aggregate(2)(pfun1, pfun2) res29: Int = 10334
結果是 10334 。怎么來的呢?
因為前面小節有提到 seqOp 函數,即這里的 pfun1 函數會分別在 RDD 的每個分片中應用一次,所以這里 pfun1 的計算過程為
2 * 1 * 2 * 3 = 12 2 * 4 * 5 * 6 = 240 2 * 7 * 8 * 9 * 10 = 10080
標橙的為 zeroValue 。
在這里 pfun1 的輸出結果有 3 個值。然后就來應用 combOp 即這里的 pfun2
2 + 12 + 240 + 10080 = 10334
所以,結果就是 10334 咯!