spark foreachPartition foreach


1.foreach

    val list = new ArrayBuffer()
    myRdd.foreach(record => {
      list += record
    })

2.foreachPartition

    val list = new ArrayBuffer
    rdd.foreachPartition(it => {
      it.foreach(r => {
        list += r
      })
    })

說明:

foreachPartition屬於算子操作,可以提高模型效率。比如在使用foreach時,將RDD中所有數據寫Mongo中,就會一條數據一條數據地寫,每次函數調用可能就會創建一個數據庫連接,此時就勢必會頻繁地創建和銷毀數據庫連接,性能是非常低下;但是如果用foreachPartitions算子一次性處理一個partition的數據,那么對於每個partition,只要創建一個數據庫連接即可,然后執行批量插入操作,此時性能是比較高的。

參考官網的說明:

https://spark.apache.org/docs/latest/streaming-programming-guide.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM