自定義實現spark的分區函數


有時自己的業務需要自己實現spark的分區函數

以下代碼是實現一個自定義spark分區的demo

實現的功能是根據key值的最后一位數字,寫到不同的文件

例如:

10寫入到part-00000

11寫入到part-00001

.

.

.

19寫入到part-00009

自定義分區:

import org.apache.spark.{Partitioner, SparkContext, SparkConf}

//自定義分區類,需繼承Partitioner類
class UsridPartitioner(numParts:Int) extends Partitioner{
  //覆蓋分區數
  override def numPartitions: Int = numParts
  
  //覆蓋分區號獲取函數
  override def getPartition(key: Any): Int = {
    key.toString.toInt%10
  }
}

object Test {
  def main(args: Array[String]) {
    val conf=new SparkConf()
    val sc=new SparkContext(conf)

    //模擬5個分區的數據
    val data=sc.parallelize(1 to 10,5)
    
    //根據尾號轉變為10個分區,分寫到10個文件
    data.map((_,1)).partitionBy(new UsridPartitioner(10)).saveAsTextFile("/chenm/partition")
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM