Spark常見編程問題解決辦法及優化


1.數據傾斜

來源:讀取數據之后,包括從數據源讀取和shuffle后讀取

后果:大部分task和小部分task完成時間相差很大、OOM(也有可能時異常數據的問題,需要完善代碼)。

分析:用sample + countBykey -> 除以count判斷key的分布情況。

解決方法:

  1. 采用map-side聚合的算子
  2. 提高並行度repartition
  3. 先估計分布,確定哪些key導致傾斜,如果單個key數據不是太大,可以自定義partition為其分區;如果單個key數據很大,就多key進行改造。
  4. join類傾斜:
    • 過濾掉業務無關null再join
    • 其中一個表小時,廣播join
    • 傾斜數據分離:分離出傾斜部分的表,這個表通常不大,此時再廣播join
    • 如果單個key過大,那只能對該key進行改造了,即為key添加一個隨機后序,如0、1、2中的一個,而另一個表則要擴大3倍,每條數據的key分別加上0、1、2的后綴(為保證所有key都被分配0后綴,從而另一個表沒有足夠的數據join)。這里可以自定義一些UDF來實現對數據分布的估計和改造key中n,即打散程度,的選擇。
  5. 數據源:盡量用可分割文件保存數據、repartition

2.TopN

TopN問題可分為4種

  • 已有可比較數據的總體TopN。例如從一個班的語文成績表中找前5名的學生,只是這個表的數據在不同的節點上。
    • DF轉pairRDD后用takeOrdered()。優點:不需要全排;缺點:結果為聚合到Driver的Array,所以不適合N較大的情況。
    • DF -> sort -> rdd -> zipWithIndex -> filter(index < n) 。優點:適合N較大的情況,結果仍然是分布式的;缺點:全排,N較小時比上面慢
    • DF的sort后take。優點:簡單;缺點:全排,N較大時很慢,甚至會OOM(take會將結果都shuffle到一個partition中)
  • 已有可比較數據分組TopN。例如從兩個班的語文成績表中找各班前5名的學生。
    • Aggragator。優點:快;缺點:較復雜
    • window function。優點:簡單,適合N較大或者數據量較小;缺點:數據量大時稍慢(window function並不進行map-side聚合,所以shuffle量較大)
  • 未有可比較數據,需要分組聚合后才能比較的總體TopN。例如一個班每名學生各科成績都在一個表上,求總分前5名的學生。
    • DF.groupBy().agg()然后接上面的“總體TopN”
  • 未有可比較數據,需要分組聚合后才能比較的分組TopN。例如兩個班每名學生各科成績都在一個表上,求各班總分前5名的學生。
    • DF.groupBy().agg()然后使用windowFunction。因為經過前面的groupBy的shuffle后,數據已經有了partitioner。所以此處的windowfunc操作並不會shuffle
// Aggragator例子
class TopNAggregator[K1: TypeTag, K2: TypeTag, V: TypeTag](num: Int, ord: Ordering[(K2, V)])
  extends Aggregator[(K1, K2, V), mutable.PriorityQueue[(K2, V)], Array[(K2, V)]] {

  override def zero: mutable.PriorityQueue[(K2, V)] = new mutable.PriorityQueue[(K2, V)]()(ord)

  override def reduce(q: mutable.PriorityQueue[(K2, V)],
                       a: (K1, K2, V)): mutable.PriorityQueue[(K2, V)] = {
    if (q.size < num) {
      q += ((a._2, a._3))
    } else {
      q += ord.min((a._2, a._3), q.dequeue)
    }
  }

  override def merge(q1: mutable.PriorityQueue[(K2, V)],
                      q2: mutable.PriorityQueue[(K2, V)]): mutable.PriorityQueue[(K2, V)] = {
    q1 ++= q2
    while (q1.length > num) {
      q1.dequeue()
    }
    q1
  }

  override def finish(r: mutable.PriorityQueue[(K2, V)]): Array[(K2, V)] = {
    r.toArray.sorted(ord.reverse)
  }

  override def bufferEncoder: Encoder[mutable.PriorityQueue[(K2, V)]] = {
    Encoders.kryo[mutable.PriorityQueue[(K2, V)]]
  }

  override def outputEncoder: Encoder[Array[(K2, V)]] = ExpressionEncoder[Array[(K2, V)]]()
}

// 使用
val topNAggregator = new TopNAggregator[Int, Int, Float](10, Ordering.by(-_._2))
df.groupByKey()
  .agg(topNAggregator.toColumn)

3.Join優化

預排序的join

針對SortMergeJoinExec,在mapper端提前sort。原代碼在Reducer端進行排序,但reducer端的數據不及mapper端均勻,所以排序工作量不一,會導致尾部延遲放大。Map階段會按照key的哈希值對數據進行重分區並按key排序。Reducer只需對來自不同Mapper的數據進行歸並排序。這種機制相當於把Reducer排序的任務分流給Mapper。而由於Mapper的數據量往往是比較均勻的,所以排序的性能會優於Reducer。

待考證:如果直接處理RDD,對兩個需要join的RDD調用 repartitionAndSortWithinPartitions 然后join

cross join

當每條數據都需要和其余的每條數據進行計算時,例如計算相似度矩陣,下面的方法進行crossjoin能夠大大減小其中間結果。實驗時直接crossjoin能產生3G以上的數據,應用此方法則只有幾十M。

val ready2Crossjoin = movieFeatures.as[(Int, Array[Float])]
  .mapPartitions(_.grouped(4096))

    implicit val ordering = new Ordering[(Int, Float)] {
      def compare(x: (Int, Float), y: (Int, Float)): Int = {
        val compare2 = x._2.compareTo(y._2)
        if (compare2 != 0) return -compare2
        0
      }
    }

val ratings = ready2Crossjoin.crossJoin(ready2Crossjoin)
  .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
  .flatMap {
    case (mf1Iter, mf2Iter) =>
      val m1 = mf1Iter.size
      val m2 = math.min(mf2Iter.size, 100)
      var i = 0
      val output = new Array[(Int, Int, Float)](m1 * m2)
      val pq = mutable.PriorityQueue[(Int, Float)]()
      val vectorOp = new F2jBLAS
      mf1Iter.foreach { case (m1Id, mf1Factor) =>
        mf2Iter.foreach { case (m2Id, mf2Factor) =>
          if (m1Id == m2Id) {
            // do nothing
          } else {
            val simScore = consinSim(ALSRank, vectorOp, mf1Factor, mf2Factor)
            if (pq.length < m2) {
              pq.enqueue((m2Id, simScore))
            } else {
              val temp = pq.dequeue()
              pq += (if (temp._2 > simScore) temp else (m2Id, simScore))
            }
          }
        }
        pq.foreach { case (mf2Id, score) =>
          output(i) = (m1Id, mf2Id, score)
          i += 1
        }
        pq.clear()
      }
      output.toSeq
  }

private def consinSim(rank: Int, operator: F2jBLAS, movie1: Array[Float], movie2: Array[Float]): Float = {
    operator.sdot(rank, movie1,1, movie2, 1) / operator.snrm2(rank, movie1,1) * operator.snrm2(rank, movie2,1)
}

考慮Join順序

Spark SQL的CBO尚未成熟,不能對SQL中的join的順序做智能調整。順序的確定需要對數據表的分布有所了解,從而推斷某些順序能夠產生更少的中間數據,進而提高效率。

4.根據HashMap、DF等數據集進行filter

在HashMap、DF等數據集較小的情況下:

  • HashMap:廣播map,然后根據contain來filter。適合數據集較小的情況。

  • DF:提取相應的列后,然后用left_anti。適合比上面數據集稍大的情況。

當數據集很大時,同樣利用上面DF的方法,但去掉broadcast,然Spark自行決定如何join。

// HashMap filter
val BCMap = sc.broadcast(mapForFilter)
val filteredDF = df.filter($"col_name" isin (BCMap.value: _*))

// DF filter
val DFForFilter = df1.select("id")
val filteredDF = df0.join(broadcast(filteredDF), Seq("id"), "left_anti"))

5.Join去掉重復的列

val df = left.join(right, Seq("name"))

6.展開NestedDF

+---+-----------+
| _1|         _2|
+---+-----------+
|  1|[2, [3, 4]]|
+---+-----------+

+---+-----+--------+--------+
| _1|_2._1|_2._2._1|_2._2._2|
+---+-----+--------+--------+
|  1|    2|       3|       4|
+---+-----+--------+--------+

implicit class DataFrameFlattener(df: DataFrame) {
  def flattenSchema: DataFrame = {
    df.select(flatten(Nil, df.schema): _*)
  }

  protected def flatten(path: Seq[String], schema: DataType): Seq[Column] = schema match {
    case s: StructType => s.fields.flatMap(f => flatten(path :+ f.name, f.dataType))
    case other => col(path.map(n => s"`$n`").mkString(".")).as(path.mkString(".")) :: Nil
  }
}

veryNestedDF.flattenSchema.show()

7.計算session/組內時間差

val timeFmt = "yyyy-MM-dd HH:mm:ss"
val sessionid2ActionsRDD2 = UserVisitActionDF
  .withColumn("action_time", unix_timestamp($"action_time", timeFmt))
  .groupBy("session_id")
  .agg(min("action_time") as "start",
    max("action_time") as "end",
  .withColumn("visitLength", $"start" - $"end")

8.用flatMap替代map + filter

df.flatMap(if (filter_condition) Some(result) else None)

9.分層抽樣

// 各種類型抽取10%
val fractions = HashMap(
  TYPE1 -> 0.1,
  TYPE2 -> 0.1,
  TYPE3 -> 0.1
)
val randomSeed = 2L
df.stat.sampleBy("col_name", fractions, randomSeed)

// 如果col_name的數據種類未知,用下面方式得出fractions
df.select("time_period")
  .distinct
  .map(x=> (x, 0.1))
  .collectAsMap

10.SQL與DF API

SQL作為聲明式語言,即只需要指定所需數據的模式就能得到結果。這種語言的編程思路容易讓人忽略代碼的執行順序,從而寫出一些執行效率低的代碼。盡管Spark有Optimizer優化,但尚未完全成熟,部分SQL語句無法實現filter、aggregation等下推。

DF API是一種函數式的語言,能讓編程者注意到執行順序,減小寫出低效代碼的可能。

11.Shuffle后的分區

使用DF時,開啟自動分區。

如果適用RDD,則有些shuffle是可以輸入partitioner參數的,這就可以控制shuffle后的分區數,一些情況還能避免shuffle。如下面代碼,rdd2執行reduceByKey的shuffle時使用rdd1的partitioner,那么之后的rdd3和rdd1的join就不需要shuffle了。

val rdd1Partitioner = rdd1.partitioner match {
  case Some(p) => p
  case None => new HashPartitioner(rdd1.partitions.length)
}
val rdd3 = rdd2.reduceByKey(rdd1Partitioner, (x, y) => if (x > y) x else y)
rdd3.join(rdd1)

12.多維分析的優化

多維分析,如rollup、cube等的算子,在Spark內置的是Expand方式,根據選用的算子一次性開辟足夠的內存。如果實現Union方式的二次開發,即讀取一次計算一個維度的結果,然后不斷union這些結果,能在某些情況提升效率。

總體來說,Expand方式適合維度小的多維分析,Union方式適合維度大的多維分析。這是因為Expand方式讀取數據的次數只有一次,但數據會膨脹2n倍,而Union方式會讀取數據2n次。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM