1、map是對數據1對1的遍歷,傳輸效率相對比較差,相比起mapPartitions不會出現內存溢出
2、mapPartitions 對一個rdd里所有分區遍歷 效率優於map算子,減少了發送到執行器執行的交互次數,mapPartitions是批量將分區數據一次發送 但是執行器內存不夠的則可能會出現內存溢出(OOM) 假設有N個元素,有M個分區,那么map的函數的將被調用N次,而mapPartitions被調用M次,一個函數一次處理所有分區 func的函數類型必須是Iterator[T] => Iterator[U] map每次拿到的數據是集合中一個元素,mapPartitions每次拿到的是一個分區里的所有元素
參考demo:https://github.com/asker124143222/spark-demo
例:
package com.home.spark import org.apache.spark.{SparkConf, SparkContext} object Ex_operate { def main(args: Array[String]): Unit = { val conf = new SparkConf(true).setMaster("local[*]").setAppName("spark demo : operate") val sc = new SparkContext(conf) //todo map 1對1的遍歷 val listRDD = sc.makeRDD(1 to 10,2) listRDD.map(_*2).map(_.toString+"s").collect().foreach(println) listRDD.map((_,1)).collect().foreach(println) listRDD.map(x=>(x,x*100,List((x,1)))).collect().foreach(println) //todo mapPartitions // 對一個rdd里所有分區遍歷 // 效率優於map算子,減少了發送到執行器執行的交互次數,mapPartitions是批量將分區數據一次發送 // 但是執行器內存不夠的則可能會出現內存溢出(OOM) // 假設有N個元素,有M個分區,那么map的函數的將被調用N次,而mapPartitions被調用M次,一個函數一次處理所有分區 // func的函數類型必須是Iterator[T] => Iterator[U] // map每次拿到的數據是集合中一個元素,mapPartitions每次拿到的是一個分區里的所有元素 // mapPartitions業務邏輯里的(_.map(_*2))的map是scala的map,不是spark的map,一定要區分清楚 listRDD.mapPartitions(_.map(_*2)).mapPartitions(_.map(_.toString+"m")).collect().foreach(println) //todo mapPartitionsWithIndex //類似於mapPartitions,但func帶有一個整數參數表示分片的索引值 listRDD.mapPartitionsWithIndex{ case (num,data) => { data.map((_,"分區號"+num)) } }.collect().foreach(println) val listTuple = sc.parallelize(List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female")),2) listTuple.mapPartitionsWithIndex((num,data)=>{ var women = List[String]() while (data.hasNext){ val next = data.next() next match { case (_,"female") => women="["+num+"]"+next._1::women case _ => } } women.iterator }).collect().foreach(println) //todo driver和executor // 除了計算部分在executor(計算節點),其他部分都在driver里 val step = 10 listRDD.map( _*step //只有這部分在executor,那么step這個變量是driver里,需要通過網絡傳輸到executor, //所以step需要實現序列化 ).collect() sc.stop() } }
