spark自定義分區器實現


在spark中,框架默認使用的事hashPartitioner分區器進行對rdd分區,但是實際生產中,往往使用spark自帶的分區器會產生數據傾斜等原因,這個時候就需要我們自定義分區,按照我們指定的字段進行分區。具體的流程步驟如下:

1、創建一個自定義的分區類,並繼承Partitioner,注意這個partitioner是spark的partitioner

2、重寫partitioner中的方法

  override def numPartitions: Int = ???
override def getPartition(key: Any): Int = ???

代碼實現:
測試數據集:
cookieid,createtime,pv
cookie1,2015-04-10,1
cookie1,2015-04-11,5
cookie1,2015-04-12,7
cookie1,2015-04-13,3
cookie1,2015-04-14,2
cookie1,2015-04-15,4
cookie1,2015-04-16,4
cookie2,2015-04-10,2
cookie2,2015-04-11,3
cookie2,2015-04-12,5
cookie2,2015-04-13,6
cookie2,2015-04-14,3
cookie2,2015-04-15,9
cookie2,2015-04-16,7

  指定按照第一個字段進行分區

步驟1:
package _core.sourceCodeLearning.partitioner

import org.apache.spark.Partitioner
import scala.collection.mutable.HashMap

/**
  * Author Mr. Guo
  * Create 2019/6/23 - 12:19
  */
class UDFPartitioner(args: Array[String]) extends Partitioner {

  private val partitionMap: HashMap[String, Int] = new HashMap[String, Int]()
  var parId = 0
  for (arg <- args) {
    if (!partitionMap.contains(arg)) {
      partitionMap(arg) = parId
      parId += 1
    }
  }

  override def numPartitions: Int = partitionMap.valuesIterator.length

  override def getPartition(key: Any): Int = {
    val keys: String = key.asInstanceOf[String]
    val sub = keys
    partitionMap(sub)
  }
}

  步驟2:

主類測試:

package _core.sourceCodeLearning.partitioner

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.sql.SparkSession

/**
  * Author Mr. Guo
  * Create 2019/6/23 - 12:21
  */
object UDFPartitionerMain {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val ssc = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
    val sc = ssc.sparkContext
    sc.setLogLevel("WARN")

    val rdd = ssc.sparkContext.textFile("file:///E:\\TestFile\\analyfuncdata.txt")
    val transform = rdd.filter(_.split(",").length == 3).map(x => {
      val arr = x.split(",")
      (arr(0), (arr(1), arr(2)))
    })
    val keys: Array[String] = transform.map(_._1).collect()
    val partiion = transform.partitionBy(new UDFPartitioner(keys))
    partiion.foreachPartition(iter => {
      println(s"**********分區號:${TaskContext.getPartitionId()}***************")
      iter.foreach(r => {
        println(s"分區:${TaskContext.getPartitionId()}###" + r._1 + "\t" + r._2 + "::" + r._2._1)
      })
    })
    ssc.stop()
  }
}

  運行結果:

這樣就是按照第一個字段進行了分區,當然在分區器的中,對於key是可以根據自己的需求隨意的處理,比如添加隨機數等等


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM