spark中map與mapPartitions區別


在spark中,map與mapPartitions兩個函數都是比較常用,這里使用代碼來解釋一下兩者區別

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object MapAndPartitions {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("map_mapPartitions_demo").setMaster("local"))
    val arrayRDD =sc.parallelize(Array(1,2,3,4,5,6,7,8,9))

    //map函數每次處理一個/行數據
    arrayRDD.map(element=>{
      element
    }).foreach(println)

    //mapPartitions每次處理一批數據
    //將 arrayRDD分成x批數據進行處理
    //elements是其中一批數據
    //mapPartitions返回一批數據(iterator)
    arrayRDD.mapPartitions(elements=>{
      var result = new ArrayBuffer[Int]()
      elements.foreach(element=>{
        result.+=(element)
      })
      result.iterator
    }).foreach(println)
  }
}

兩個函數最終處理得到的結果是一樣的

mapPartitions比較適合需要分批處理數據的情況,比如將數據插入某個表,每批數據只需要開啟一次數據庫連接,大大減少了連接開支,偽代碼如下:

    arrayRDD.mapPartitions(datas=>{
      dbConnect = getDbConnect() //獲取數據庫連接
      datas.foreach(data=>{
        dbConnect.insert(data) //循環插入數據
      })
      dbConnect.commit() //提交數據庫事務
      dbConnect.close() //關閉數據庫連接
    })

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM