spark源代碼action系列-foreach與foreachPartition


RDD.foreachPartition/foreach的操作

在這個action的操作中:

這兩個action主要用於對每一個partition中的iterator時行迭代的處理.通過用戶傳入的functioniterator進行內容的處理.

首先我們先看看foreach的操作:

fureach,傳入一個function,這個函數的傳入參數就是每一個partition,每次的foreach得到的一個rddkv實例,也就是詳細的內容,這樣的處理你並不知道這個iteratorforeach什么時候結果,僅僅能是foreach的過程中,你得到一條數據,就處理一條數據.

由以下的紅色部分能夠看出,foreach操作是直接調用了partition中數據的foreach操作.

def foreach(f: => Unit): Unit withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this(iter: Iterator[T]) => iter.foreach(cleanF))
}

演示樣例說明:

val list = new ArrayBuffer()

Rdd.foreach(record => {

  list += record

  If (list.size >= 10000) {

    list.flush....

  }

})

上面這段演示樣例代碼中,假設這么使用就會存在一個問題,

迭代的最后,list的結果可能還沒有達到10000條,這個時候,你在內部的處理的flush部分就不會運行,也就是迭代的最后假設沒有達到10000的數據就會丟失.

所以在foreach中,一般就是拿到一條數據進行下處理Rdd.foreach(record => {record._1 == a return})

 

然后接下來看看foreachPartition:

這個函數也是依據傳入的function進行處理,但不同處在於,這里function的傳入參數是一個partition相應數據的iterator.而不是直接使用iteratorforeach,

這樣的情況下,假設是上面foreach的演示樣例代碼中list這個片段在這個action中就行正常的去處理.

def foreachPartition(f: Iterator[T] => Unit): Unit withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}

 

演示樣例代碼:

Val list = new ArrayBuffer

rdd.foreachPartition(it => {

  It.foreach(r => {

List += r

If (list.size > 10000) flush

  })

  If (list.size > 0) flush

})

 

最后說下這兩個action的差別:

ForeachforeachPartition都是在每一個partition中對iterator進行操作,

不同的是,foreach是直接在每一個partition中直接對iterator運行foreach操作,而傳入的function僅僅是在foreach內部使用,

foreachPartition是在每一個partition中把iterator給傳入的function,function自己對iterator進行處理.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM