1.概述
離線數據處理生態系統包含許多關鍵任務,最大限度的提高數據管道基礎設施的穩定性和效率是至關重要的。這邊博客將分享Hive和Spark分區的各種策略,以最大限度的提高數據工程生態系統的穩定性和效率。
2.內容
大多數Spark Job可以通過三個階段來表述,即讀取輸入數據、使用Spark處理、保存輸出數據。這意味着雖然實際數據轉換主要發生在內存中,但是Job通常以大量的I/O開始和結束。使用Spark常用堆棧是使用存儲在HDFS上的Hive表作為輸入和輸出數據存儲。Hive分區有效地表示為分布式文件系統上的文件目錄。理論上,盡可能多的文件寫入是有意義的,但是,這個也是有代價的。HDFS不能很好的支持大量小文件,每個文件在NameNode內存中大概有150字節的開銷,而HDFS的整體IOPS數量有限。文件寫入中的峰值絕對會導致HDFS基礎架構的某些部分產生性能瓶頸。
比如從某個歷史日期到當前日期重新計算表,通常用於修復錯誤或者數據質量問題。在處理包含一年數據的大型數據集(比如1TB以上)時,可能會將數據分成幾千個Spark分區進行處理。雖然從表面上看,這種處理方法並不是最合適的,使用動態分區並將數據結果寫入按照日期分區的Hive表中將產生多大100+萬個文件。
假如有一個包含3個分區的Spark任務,並且想將數據寫入到包含3個分區的Hive中。在這種情況下,希望發送的是將3個文件寫入到HDFS,所有數據都存儲在每個分區鍵的單個文件中。實際發生的是將生成9個文件,並且每個文件都有1個記錄。使用動態分區寫入Hive時,每個Spark分區都由執行程序並行處理。處理Spark分區數據時,每次執行程序在給定Spark分區中遇到新的分區鍵時,它都會打開一個新文件。默認情況下,Spark對數據會使用Hash或者Round Robin分區器。當應用於任意數據時,可以假設這2中方法在整個Spark分區中相對均勻但是隨機分布數據行。如下圖所示:

理想情況下,目標文件大小應該大約是HDFS Block大小的倍數,默認情況下為128MB。在Hive管道中,提供了一些配置來自動將結果收集到合理大小的文件中,從開發人員的角度來看幾乎是透明的,比如hive.merge.smallfiles.avgsize和hive.merge.size.per.task。但是,Spark中不存在此類功能,因此,我們需要自己開發實現,來給定一個數據集,應該寫入多少文件。
2.1 基於Size的計算
理論上,這是最直接的方法,設置目標大小,估計數據的大小,然后進行划分。但是,在很多情況下,文件被寫入磁盤時會進行壓縮,並且其格式與存儲在Java堆中的記錄格式有所不同。這意味着估算寫入磁盤時內存的記錄大小不是一件容易的事情。
雖然可以使用Spark SizeEstimator實用程序通過內存中數據的大小進行估計,然后應用某種估計的壓縮文件格式因此,但是SizeEstimator會考慮數據幀、數據集的內部消耗,以及數據的大小。總體來說,這種方式不太容易准確實現。
2.2 基於行數的計算
這種方法是設置目標行數,計算數據集的大小,然后執行除法以估計目標。我們的目標行數可以通過多種方式確定,或者通過為所有數據集選擇一個靜態數字,或者通過確定磁盤上單個記錄的大小並執行必要的計算。哪種方式是最好取決於你的數據集數量及其復雜性。計數相對來說成本較低,但是需要在計數前緩存以避免重新計算數據集。
2.3 靜態文件計數
最簡單的解決方案是只要求開發人員在每個插入的基礎上告訴Spark總共應該寫入多少個文件,這種方式需要給開發人員一些其他方法來獲得具體的數字,可以通過這種方式來替換昂貴的計算。
3.如何讓Spark以合理的方式分發處理數據?
即使我們知道希望如何將文件寫入磁盤,我們仍然必須讓Spark以符合實際的方式生成這些文件來構建我們的分區。Spark提供了許多工具來確定數據在整個分區中的分布方式。但是,各種功能中隱藏着很多復雜性,在某些情況下,它們的含義並不明顯。下面將介紹Spark提供的一些選項來控制Spark輸出文件的數量。
3.1 合並
Spark Coalesce是一個特殊版本的重新分區,它只允許減少總的分區,但是不需要完全的Shuffle,因此比重新分區要快得多。它通過有效的合並分區來實現這一點。如下圖所示:

Coalesce在某些情況下看起來不錯,但是也有一些問題。首先,Coalesce有一個讓我們難以使用的行為。以一個非常基本的Spark應用程序為例,代碼如下:
load().map(…).filter(…).save()
比如設置的並行度為1000,但是最終只想寫入10個文件,可以設置如下:
load().map(…).filter(…).coalesce(10).save()
但是,Spark會盡可能早的有效的將合並操作下推,因此這將執行為:
load().coalesce(10).map(…).filter(…).save()
有效的解決這種問題的方法是在轉換和合並之間強制執行,代碼如下所示:
val df = load().map(…).filter(…).cache()
df.count()
df.coalesce(10)
緩存是必須的,否則,你將不得不重新計算數據,這可能會重新消耗資源。然后,緩存是需要消費一定資源的,如果你的數據集無法放入內存中,或者無法騰出內存將數據有效的存儲在內存中兩次,那么必須使用磁盤緩存,這有其自身的局限性和顯著的性能損失。
此外,正如我們看到的,通常需要執行Shuffle來獲得我們想要的更復雜的數據集結果。因此,Coalesce僅適用於特定的情況:
- 保證只寫入1個Hive分區;
- 目標文件數少於你用於處理數據的Spark分區數;
- 有充足的緩存資源。
3.2 簡單重新分區
一個簡單的重新分區,它的唯一參數是目標Spark分區計數,即df.repartition(100)。在這種情況下,使用循環分區器,這意味着唯一的保證是輸出數據具有大致相同大小的Spark分區。
這種分區僅適用於以下情況的文件計數問題:
- 保證只需要寫入1個Hive分區;
- 正在寫入的文件數大於你的Spark分區數或者由於某些其他原因你無法使用合並。
3.3 按列重新分區
按列重新分區接收目標Spark分區計數,以及要重新分區的列序列,例如,df.repartition(100,$"date")。這對於強制Spark將具有相同鍵的記錄分發到同一個分區很有用。一般來說,這對許多Spark操作(如JOIN)很有用,但是理論上,它也可以解決我們的問題。
按列重新分區使用HashPartitioner,它將具有相同值的記錄分配給同一個分區,實際上,它將執行以下操作:

但是,這種方法只有在每個分區鍵都可以安全的寫入到一個文件時才有效。這是因為無論有多少值具有特定的Hash值,它們最終都會在同一個分區中。按列重新分區僅在你寫入一個或者多個小的Hive分區時才有效。在任何其他情況下,它都沒有用,因為每個Hive分區總是會得到一個文件,這僅適用於最小的數據集。
3.4 按具有隨機因子的列重新分區
我們可以通過添加約束的隨機因子來按列修改重新分區,代碼如下:
df .withColumn("rand", rand() % filesPerPartitionKey) .repartition(100, $"key", $"rand")
理論上,只要滿足以下條件,這種方法應該會產生排序良好的記錄和大小相當均勻的文件:
- Hive分區的大小大致相同;
- 知道每個Hive分區的目標文件數並且可以在運行時對其進行編碼。
但是,即使我們滿足上述這些條件,還有另外一個問題:散列沖突。假設,現在正在處理一年的數據,日期作為分區的唯一鍵。如果每個分區需要5個文件,可以執行如下操作:
df.withColumn("rand", rand() % 5).repartition(5*365, $"date", $"rand")
在后台,Scala將構造一個包含日期和隨機因素的鍵,例如(<date>,<0-4>)。然后,如果我們查看HashPartitioner代碼,可以發現它將執行以下操作:
class HashPartitioner(partitions: Int) extends Partitioner { def getPartition(key: Any): Int = key match { case null => 0 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) } }
實際上,所做的就是獲取關鍵元組的散列,然后使用目標數量的Spark分區獲取它的mod。我們可以分析一下在這種情況下我們的記錄將如何實現分布,分析代碼如下:
import java.time.LocalDate def hashCodeTuple(one: String, two: Int, mod: Int): Int = { val rawMod = (one, two).hashCode % mod rawMod + (if (rawMod < 0) mod else 0) } def hashCodeSeq(one: String, two: Int, mod: Int): Int = { val rawMod = Seq(one, two).hashCode % mod rawMod + (if (rawMod < 0) mod else 0) } def iteration(numberDS: Int, filesPerPartition: Int): (Double, Double, Double) = { val hashedRandKeys = (0 to numberDS - 1).map(x => LocalDate.of(2019, 1, 1).plusDays(x)).flatMap( x => (0 to filesPerPartition - 1).map(y => hashCodeTuple(x.toString, y, filesPerPartition*numberDS)) ) hashedRandKeys.size // Number of unique keys, with the random factor val groupedHashedKeys = hashedRandKeys.groupBy(identity).view.mapValues(_.size).toSeq groupedHashedKeys.size // number of actual sPartitions used val sortedKeyCollisions = groupedHashedKeys.filter(_._2 != 1).sortBy(_._2).reverse val sortedSevereKeyCollisions = groupedHashedKeys.filter(_._2 > 2).sortBy(_._2).reverse sortedKeyCollisions.size // number of sPartitions with a hashing collision // (collisions, occurences) val collisionCounts = sortedKeyCollisions.map(_._2).groupBy(identity).view.mapValues(_.size).toSeq.sortBy(_._2).reverse ( groupedHashedKeys.size.toDouble / hashedRandKeys.size.toDouble, sortedKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble, sortedSevereKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble ) } val results = Seq( iteration(365, 1), iteration(365, 5), iteration(365, 10), iteration(365, 100), iteration(365 * 2, 100), iteration(365 * 5, 100), iteration(365 * 10, 100) ) val avgEfficiency = results.map(_._1).sum / results.length val avgCollisionRate = results.map(_._2).sum / results.length val avgSevereCollisionRate = results.map(_._3).sum / results.length (avgEfficiency, avgCollisionRate, avgSevereCollisionRate) // 63.2%, 42%, 12.6%
上面的腳本計算了3個數量:
- 效率:非空的Spark分區與輸出文件數量的比率;
- 碰撞率:(date,rand)的Hash值發送沖突的Spark分區的百分比;
- 嚴重沖突率:同上,但是此鍵上的沖突次數為3或者更多。
沖突很重要,因為它們意味着我們的Spark分區包含多個唯一的分區鍵,而我們預計每個Spark分區只有1個。分析的結果可知,我們使用了63%的執行器,並且可能會出現嚴重的偏差,我們將近一半的執行者正在處理比預期多2到3倍或者在某些情況下高達8倍的數據。
現在,有一個解決方法,即分區縮放。在之前示例中,輸出的Spark分區數量等於預期的總文件數。如果將N個對象隨機分配給N個插槽,可以預期會有多個插槽包含多個對象,並且有幾個空插槽。因此,需要解決此問題,必須要降低對象與插槽的比率。
我們通過縮放輸出分區計數來實現這一點,通過將我們的輸出Spar分區計數乘以一個大因子,類似於:
df .withColumn(“rand”, rand() % 5) .repartition(5*365*SCALING_FACTOR, $”date”, $”rand”)
分析代碼如下:
import java.time.LocalDate def hashCodeTuple(one: String, two: Int, mod: Int): Int = { val rawMod = (one, two).hashCode % mod rawMod + (if (rawMod < 0) mod else 0) } def hashCodeSeq(one: String, two: Int, mod: Int): Int = { val rawMod = Seq(one, two).hashCode % mod rawMod + (if (rawMod < 0) mod else 0) } def iteration(numberDS: Int, filesPerPartition: Int, partitionFactor: Int = 1): (Double, Double, Double, Double) = { val partitionCount = filesPerPartition*numberDS * partitionFactor val hashedRandKeys = (0 to numberDS - 1).map(x => LocalDate.of(2019, 1, 1).plusDays(x)).flatMap( x => (0 to filesPerPartition - 1).map(y => hashCodeTuple(x.toString, y, partitionCount)) ) hashedRandKeys.size // Number of unique keys, with the random factor val groupedHashedKeys = hashedRandKeys.groupBy(identity).view.mapValues(_.size).toSeq groupedHashedKeys.size // number of unique hashes - and thus, sPartitions with > 0 records val sortedKeyCollisions = groupedHashedKeys.filter(_._2 != 1).sortBy(_._2).reverse val sortedSevereKeyCollisions = groupedHashedKeys.filter(_._2 > 2).sortBy(_._2).reverse sortedKeyCollisions.size // number of sPartitions with a hashing collision // (collisions, occurences) val collisionCounts = sortedKeyCollisions.map(_._2).groupBy(identity).view.mapValues(_.size).toSeq.sortBy(_._2).reverse ( groupedHashedKeys.size.toDouble / partitionCount, groupedHashedKeys.size.toDouble / hashedRandKeys.size.toDouble, sortedKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble, sortedSevereKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble ) } // With a scale factor of 1 val results = Seq( iteration(365, 1), iteration(365, 5), iteration(365, 10), iteration(365, 100), iteration(365 * 2, 100), iteration(365 * 5, 100), iteration(365 * 10, 100) ) val avgEfficiency = results.map(_._2).sum / results.length // What is the ratio of executors / output files val avgCollisionRate = results.map(_._3).sum / results.length // What is the average collision rate val avgSevereCollisionRate = results.map(_._4).sum / results.length // What is the average collision rate where 3 or more hashes collide (avgEfficiency, avgCollisionRate, avgSevereCollisionRate) // 63.2% Efficiency, 42% collision rate, 12.6% severe collision rate iteration(365, 5, 2) // 37.7% partitions in-use, 77.4% Efficiency, 24.4% collision rate, 4.2% severe collision rate iteration(365, 5, 5) iteration(365, 5, 10) iteration(365, 5, 100)
隨着我們的比例因子接近無窮大,碰撞很快接近於0,效率接近100%。但是,這會產生另外一個問題,即大量的輸出Spark分區將為空。同時這些空的Spark分區也會帶來一些資源開銷,增加驅動程序的內存要求,並使我們更容易受到由於錯誤或者意外復雜性而導致分區鍵空間意外大的問題。
這里的一個常見方法是在使用這種方法時不顯示設置分區技術(默認並行度和縮放),如果不提供分區計數,則依賴Spark默認的spark.default.parallelism值。雖然,通常並行度自然高於總輸出文件數(因此,隱式提供大於1 的縮放因子)。如果滿足以下條件,這種方式依然是一種有效的方法:
- Hive分區的文件數大致相等;
- 可以確定平均分區文件數應該是多少;
- 大致知道唯一分區鍵的總數。
在示例中,我們假設其中的許多事情都很容易知道,主要是輸出Hive分區的總數和每個Hive分區所需要的文件數。無論如何,這種方法都是可行的,並且可能適用於需要用例。
3.5 按范圍重新分區
按范圍重新分區是一個特列,它不使用RoundRobin和Hash Partitioner,而是使用一種特殊的方法,叫做Range Partitioner。
范圍分區器根據某些給定鍵的順序在Spark分區之間進行拆分行,但是,它不只是全局排序,它做出的保證是:
- 具有相同散列的所有記錄將在同一個分區中結束;
- 所有Spark分區都將有一個最小值和最大值與之關聯;
- 最小值和最大值將通過使用采樣來檢測關鍵頻率和范圍來確定,分區邊界將根據這些估計值進行初始設置;
- 分區的大小不能保證完全相等,它們的相等性基於樣本的准確性,因此,預測的每個Spark分區的最小值和最大值,分區將根據需要增長或縮小以保證前2個條件。
總而言之,范圍分區將導致Spark創建與請求的Spark分區數量相等的Bucket數量,然后它將這些Bucket映射到指定分區鍵的范圍。例如,如果你的分區鍵是日期,則范圍可能是(最小值2021-01-01,最大值2022-01-01)。然后,對於每條記錄,將記錄的分區鍵與存儲Bucket的最小值和最大值進行比較,並相應的進行分配。

4.結束語
這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!
另外,博主出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那里點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。
