在spark中,map與mapPartitions兩個函數都是比較常用,這里使用代碼來解釋一下兩者區別
import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ArrayBuffer object MapAndPartitions { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("map_mapPartitions_demo").setMaster("local")) val arrayRDD =sc.parallelize(Array(1,2,3,4,5,6,7,8,9)) //map函數每次處理一個/行數據 arrayRDD.map(element=>{ element }).foreach(println) //mapPartitions每次處理一批數據 //將 arrayRDD分成x批數據進行處理 //elements是其中一批數據 //mapPartitions返回一批數據(iterator) arrayRDD.mapPartitions(elements=>{ var result = new ArrayBuffer[Int]() elements.foreach(element=>{ result.+=(element) }) result.iterator }).foreach(println) } }
兩個函數最終處理得到的結果是一樣的
mapPartitions比較適合需要分批處理數據的情況,比如將數據插入某個表,每批數據只需要開啟一次數據庫連接,大大減少了連接開支,偽代碼如下:
arrayRDD.mapPartitions(datas=>{ dbConnect = getDbConnect() //獲取數據庫連接 datas.foreach(data=>{ dbConnect.insert(data) //循環插入數據 }) dbConnect.commit() //提交數據庫事務 dbConnect.close() //關閉數據庫連接 })