spark map和mapPartitions的區別


package dayo1

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object MapAndPartitions {
  def main(args: Array[String]): Unit = {
    val cof = new SparkConf ().setAppName ( this.getClass.getSimpleName ).setMaster ( "local[*]" )
    val sc = new SparkContext ( cof )

    //創建RDD(並列化方法)
    val arrayRDD = sc.parallelize ( Array ( 1, 2, 3, 4, 5, 6, 7, 8, 9 ) )

    //map數據每次處理一行數據
    arrayRDD.map ( elements => elements ).foreach ( println )

    arrayRDD.mapPartitions(tp=>{
      val result=new ArrayBuffer[Int]()
      tp.foreach(tp=>{
        result+=tp
      })
      result.iterator
    }
    ).foreach(println)

    sc.stop ()
  }

  /**
    * 兩個函數最終處理得到的結果是一樣的
    *
    * mapPartitions比較適合需要分批處理數據的情況,比如將數據插入某個表,每批數據只需要開啟一次數據庫連接,大大減少了連接開支,偽代碼如下:
    *
    * 復制代碼
    *     arrayRDD.mapPartitions(datas=>{
    * dbConnect = getDbConnect() //獲取數據庫連接
    *       datas.foreach(data=>{
    *         dbConnect.insert(data) //循環插入數據
    * })
    *       dbConnect.commit() //提交數據庫事務
    *       dbConnect.close() //關閉數據庫連接
    * })
    * 復制代碼
    */
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM