一、partition的划分問題
如何划分partition對block數據的收集有很大影響。如果需要根據block來加速task的執行,partition應該滿足什么條件?
參考思路1:range partition
1、出處:
IBM DB2 BLU;Google PowerDrill;Shark on HDFS
2、規則:
range partition遵循三個原則:1、針對每一列進行細粒度的范圍細分,防止數據傾斜和工作量傾斜;2、每一個partition分配的列是不同的;3、需要針對數據相關性和過濾關聯性來考慮partition的划分。
實現方法可以參考思路3中的Spark的實現。
3、簡單思考:
這樣划分partition需要很多額外的工作,如果針對我的設計是不需要這么多的,唯一需要考慮的就是第一點:避免數據傾斜和工作量傾斜。
參考思路2:Fine-grained Partitioning ( one of horizontal partitioning )
1、出處:
Fine-grained Partitioning for Aggressive Data Skipping (ACMSIGMOD14)
2、規則:
fine-grained partitioning的划分目的很明確:划分成細粒度的、大小平衡的block,而划分的依據是讓查詢操作(該partition方式針對 shark query制定)更大程度上的跳過對該block的掃描。具體的方法是:
(1)從之前頻繁使用的過濾集(文章已證明一部分典型的過濾能夠用來決策)中抽取過濾的判斷條件作為特征;
(2)根據抽取的特征對數據進行重計算,產生特征向量,從而將問題修改成最優解的問題;
示例如下:每個partition對應一個Feature為0,即不滿足該特征,如果利用該Feature掃描時可以直接跳過改partition。
3、簡單思考:
首先,fine-grained具有以下特點:(1)由一個額外的進程守護,工作於數據加載時或者某個最新的任務(這個任務必然是對partition重新提出了請求,諸如顯示的用戶partition操作)執行時;(2)該方法針對partition的形成、block的形成同時適用;(3)從filter中抽取典型的特征來划分數據。
然后分析該方法的思想,該方法利用所需要的信息,即filter特性與skip block的特性,來將划分block和partition,具有很高的參考價值。而我所需要的數據特性應該包括數據的重要性、啟動task所需要的數據的完整性、啟動task所需要的block塊的完整性,更確切的說應該是block數據完成了多少就可以啟動task了,如何判斷這個量?
參考思路3:Spark實現的Hash Partition
1、出處:
Spark1.3.1源碼解析,Spark默認的自帶Partitioner。實際上,Spark1.3.1在這一塊也實現了RangePartitioner,而HashPartitioner利用的還是Range Partition。
2、規則:
首先,為了粗略達到輸出Partition之間的平衡,需要定義一些參數,這些參數輔助決策一個RDD結果分配到每個Partition的樣本數:
sampleSize,限定取樣的大小,min(20*partitions,1M);
sampleSizePerPartition,Partition取樣樣本數,ceil(3*sampleSize/partitions),通過向上取整表示允許超過取樣數目少量;
sketched,用來描述一個rdd的結果,包括了(partition id, items number, sample),sample是根據前面的參數決定的;
(key,weight)和imbalancedPartitions,分別是一個buffer數組和mutable類型值,分別存放平衡的Partition的權重和非平衡的Partition,平衡與否根據Partition大小與平均Partition大小的關系判斷,權重=Partition大小/取樣大小。
具體看看如何實現平衡的判斷:
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
val balance_or_not = if(fraction * numItems > sampleSizePerPartition) true else false
針對不平衡的Partition重新取樣,取樣后權重為1/fraction。
其次,需要注意的是,Partition中利用了一個implicitly方法,該方法獲取RangPartition中隱藏的參數值:Ordering[ClassTag]。改參數值用來寫入和讀取Spark框架中的數據流。通過writeObject和readObject可以控制寫入和讀取Partition中的數據。
最后,決策Partition的bounds時利用的是Object RangePartitioner中的determineBounds方法,該方法利用weight的值來平衡block的大小,然后放入Partition中,進而平衡Partition的大小。
3、簡單思考:
Spark自帶的Partition策略是利用hashcode獲取Partition id的RangePartition,RangePartition采用取樣權重的方法來平衡各個Partition的大小,但是並未考慮Partition內部數據的關聯度,也就是Block層面的決策沒有體現在這里,需要進一步考慮如何按block優化。
二、如何利用Partition和Block的划分策略——重點論文:The Power of Choice in Data-Aware Cluster Scheduling,OSDI14
前文講了三個如何划分Partition和Block的方法,但是划分之后如何應用其優化,除了上述提到的相應文章,與開題更加對應的是OSDI14年的文章The Power of Choice in Data-Aware Cluster Scheduling。該論文設計實現的系統為KMN,因此下文以KMN代替該論文。
1、概要
原始的Spark中的task在需要資源(該資源為上游stage的output)時,由調度器拉取task需要的資源數目,然后交付給task;而KMN的策略是,調度器拉取的是全部的資源中的數學組合,數量上仍舊是task所需的資源數,大體的關系如下圖。
因此,KMN的特色之處在於choice,如何組合最優的block給Scheduler,然后調度給task,達到最高的效率。進而將問題轉化為NP問題。需要注意的是,KMN選擇choice時是根據全部的block來決策,那么必須等全部的block產生,即上游stage運行完成后才能決策。這樣KMN就需要考慮上游Straggler的影響了,很遺憾的是,KMN針對的是近似解問題,從而導致它決定將Straggler丟棄來加快速度。
2、詳細實現
KMN核心在於數據感知的choice,其決策分Input Stage和Intermediate Stage兩種基本場景,決策Memory Locality和NetWork Blance兩個方面。
(1)Input Stage
Input Stage中通過組合block的決策可以在各種集群利用率下保證很高的數據本地性,論文以N中采樣K個block為例說明自然采樣和用戶自定義采樣條件下的數據本地性概率。
(2)Intermediate Stage
Intermediate Stage需要考慮其上下游的Stage。KMN為上游Stage設置了額外的task,需要確認額外的task對Block決策調度的影響,文章以M個task和K個block的模型,分析M/K下上游額外task對cross-rack skew,即傾斜度造成的干擾。然后,根據上游Stage的輸出選擇最優的Block,此時問題轉化為一個NP困難問題。最后,需要處理上游Stage的Straggler問題,因為Straggler的出現會導致Intermediate Stage的block的決策受到影響。文章對比Straggler的出現和額外的choice決策的時間,發現Straggler的影響占20%-40%,因此文章采用如下方法解決該問題:當M個上游task中的K個task執行完成后就啟動下游task。實際上就是通過加速Stage的執行來加快下游stage的啟動時間。