spark的map和mapPartitions


1、map是對數據1對1的遍歷,傳輸效率相對比較差,相比起mapPartitions不會出現內存溢出

 

2、mapPartitions  
   對一個rdd里所有分區遍歷
   效率優於map算子,減少了發送到執行器執行的交互次數,mapPartitions是批量將分區數據一次發送
   但是執行器內存不夠的則可能會出現內存溢出(OOM)
   假設有N個元素,有M個分區,那么map的函數的將被調用N次,而mapPartitions被調用M次,一個函數一次處理所有分區
   func的函數類型必須是Iterator[T] => Iterator[U]
   map每次拿到的數據是集合中一個元素,mapPartitions每次拿到的是一個分區里的所有元素    

 參考demo:https://github.com/asker124143222/spark-demo

例:

package com.home.spark

import org.apache.spark.{SparkConf, SparkContext}

object Ex_operate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true).setMaster("local[*]").setAppName("spark demo : operate")

    val sc = new SparkContext(conf)

    //todo map 1對1的遍歷
    val listRDD = sc.makeRDD(1 to 10,2)
    listRDD.map(_*2).map(_.toString+"s").collect().foreach(println)
    listRDD.map((_,1)).collect().foreach(println)
    listRDD.map(x=>(x,x*100,List((x,1)))).collect().foreach(println)

    //todo mapPartitions
    // 對一個rdd里所有分區遍歷
    // 效率優於map算子,減少了發送到執行器執行的交互次數,mapPartitions是批量將分區數據一次發送
    // 但是執行器內存不夠的則可能會出現內存溢出(OOM)
    // 假設有N個元素,有M個分區,那么map的函數的將被調用N次,而mapPartitions被調用M次,一個函數一次處理所有分區
    // func的函數類型必須是Iterator[T] => Iterator[U]
    // map每次拿到的數據是集合中一個元素,mapPartitions每次拿到的是一個分區里的所有元素
    // mapPartitions業務邏輯里的(_.map(_*2))的map是scala的map,不是spark的map,一定要區分清楚
    listRDD.mapPartitions(_.map(_*2)).mapPartitions(_.map(_.toString+"m")).collect().foreach(println)


    //todo mapPartitionsWithIndex
    //類似於mapPartitions,但func帶有一個整數參數表示分片的索引值
    listRDD.mapPartitionsWithIndex{
      case (num,data) => {
        data.map((_,"分區號"+num))
      }
    }.collect().foreach(println)

    val listTuple = sc.parallelize(List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female")),2)
    listTuple.mapPartitionsWithIndex((num,data)=>{
      var women = List[String]()
      while (data.hasNext){
        val next = data.next()
        next match {
          case (_,"female") => women="["+num+"]"+next._1::women
          case _ =>
        }
      }
      women.iterator
    }).collect().foreach(println)

    //todo driver和executor
    // 除了計算部分在executor(計算節點),其他部分都在driver里
    val step = 10
    listRDD.map(
      _*step     //只有這部分在executor,那么step這個變量是driver里,需要通過網絡傳輸到executor,
                 //所以step需要實現序列化
    ).collect()
    sc.stop()
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM