package dayo1 import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ArrayBuffer object MapAndPartitions { def main(args: Array[String]): Unit = { val cof = new SparkConf ().setAppName ( this.getClass.getSimpleName ).setMaster ( "local[*]" ) val sc = new SparkContext ( cof ) //創建RDD(並列化方法) val arrayRDD = sc.parallelize ( Array ( 1, 2, 3, 4, 5, 6, 7, 8, 9 ) ) //map數據每次處理一行數據 arrayRDD.map ( elements => elements ).foreach ( println ) arrayRDD.mapPartitions(tp=>{ val result=new ArrayBuffer[Int]() tp.foreach(tp=>{ result+=tp }) result.iterator } ).foreach(println) sc.stop () } /** * 兩個函數最終處理得到的結果是一樣的 * * mapPartitions比較適合需要分批處理數據的情況,比如將數據插入某個表,每批數據只需要開啟一次數據庫連接,大大減少了連接開支,偽代碼如下: * * 復制代碼 * arrayRDD.mapPartitions(datas=>{ * dbConnect = getDbConnect() //獲取數據庫連接 * datas.foreach(data=>{ * dbConnect.insert(data) //循環插入數據 * }) * dbConnect.commit() //提交數據庫事務 * dbConnect.close() //關閉數據庫連接 * }) * 復制代碼 */ }