topk算法 -- heap,Quicksort


topk:即求大量數據中的前k大。
本文首先參照STL源碼。提出了用heapQuicksort兩套求topk的方案。然后對他們進行了詳細的分析與比較。

一、heap概述

堆是一種經過排序的樹形數據結構,通常我們所說的堆,是指binary heap(二叉堆)。所謂binary heap,就是一種complete binary tree(完全二叉樹),也就是說,整棵binary tree除了最底層的葉節點之外,是填滿的,且最頂層的葉節點由左至右又不得有空隙。
complete binary tree整棵樹內沒有任何節點漏洞,這帶來一個極大的好處:我們可以利用array來儲存所有節點。即某個節點位於i處時,其左子節點必位於array的2i處,其右子節點比位於array的2i+1處,其父節點必位於"i/2"處。這么一來,我們需要的工具就簡單了:一個array和一組heap算法。array的缺點是無法動態改變大小,而heap卻需要這項功能,因此,以vector代替array是更好的選擇。
根據元素的排序方式,heap分為max-heap和min-heap。max-heap每個節點的key都大於或等於其子節點,min-heap每個節點的key都小於或等於其子節點。因此,max-heap的最大值在根節點,並總是位於vector的其頭處。min-heap同理。

二、heap算法

注:以下源碼最后一個參數接收一個functor(仿函數),用來決定是max-heap或min-heap:less -> max-heap,greater -> min-heap。

push_heap算法

為了滿足complete binary tree條件,新元素必插入在底層vector的end()處。那么新元素是否適合於現有位置呢?為滿足max-heap條件,我們執行一個所謂的percolate up(上溯)程序:如果鍵值比父節點大,就父子對換位置。如此一直上溯,直到不需對換或到根節點位置。這是一個復雜度為O(logn)的操作。

push_heap
//------------------------------------------------------------------------------
// push_heap

template <class RandomAccessIterator,class Compare>
inline void push_heap(RandomAccessIterator first,
RandomAccessIterator last,
Compare Comp)
// 注意,此函數被調用時,新元素應已置於底部容器的最尾端
{

__push_heap_aux(first,
last,
distance_type(first),
value_type(first),
Comp);

}

template <class RandomAccessIterator, class Distance, class T,class Compare>
inline void __push_heap_aux(RandomAccessIterator first,
RandomAccessIterator last,
Distance*, T*, Compare Comp)
{
__push_heap(first,
Distance((last - first) - 1), // holeIndex
Distance(0), // topIndex
T(*(last - 1)),

Comp); // value
// 以上系根據implicit representation heap 的結構特性
// 即:新值必置於底部容器的最尾端,此即第一個洞號:(last-first)-1
}


template <class RandomAccessIterator, class Distance, class T,class Compare>
void __push_heap(RandomAccessIterator first,
Distance holeIndex,
Distance topIndex,
T value,Compare Comp)
{
Distance parent = (holeIndex - 1) / 2; // 找出父節點
while(holeIndex > topIndex && Comp(*(first+parent),value)/**(first + parent) < value*/)

{
// 當尚未到達頂端,且父節點小於新值(於是不符合heap的次序特性)
// 由於以上使用operator <,可知stl heap是max-heap
*(first + holeIndex) = *(first + parent); // 即把父節點往下"拉"
holeIndex = parent; // percolate up: 調整洞號,向上提升至父節點
parent = (holeIndex - 1) / 2; // 新洞的父節點

} // 持續至頂端,或滿足heap的次序特性為止
*(first + holeIndex) = value; // 令洞值為新值
}

pop_heap算法

既然身為max-heap,最大值必在根節點vector[0]。pop操作取走根節點(注意,此處STL算法並沒有移除他,只是把他移到vector的尾端),然后為了滿足complete binary tree條件,必須割舍最下層最右邊的葉節點vector[last-1],並將其重新安插置max-heap。這種操作的復雜度也為O(logn)。

pop_heap
//-----------------------------------------------------------------
// pop_heap

template <class RandomAccessIterator,class Compare>
inline void pop_heap(RandomAccessIterator first,
RandomAccessIterator last,
Compare Comp)
{
__pop_heap_aux(first, last, value_type(first),Comp);

}

template <class RandomAccessIterator, class T,class Compare>
inline void __pop_heap_aux(RandomAccessIterator first,
RandomAccessIterator last,
T*,Compare Comp)
{
__pop_heap(first, // first
last-1, // last
last-1, // resule
T(*(last-1)), // value,即把最后一個元素“挑”出來
distance_type(first),

Comp);
// 以上,根據implicit representation heap 的次序特性,pop操作的結果
// 應為底部容器的第一個元素。因此,首先設定欲調整值為尾指,然后將首值調制
// 尾節點(所以以上將迭代器result設為last-1)。然后重整[first,last-1),
// 使之重新成一個合格的heap
}


template <class RandomAccessIterator, class T, class Distance,class Compare>
inline void __pop_heap(RandomAccessIterator first,
RandomAccessIterator last,
RandomAccessIterator result,
T value,
Distance*,
Compare Comp)
{
*result = *first; // 設定尾值為首值,於是尾值即為欲求結果
// 可由客戶端稍后再以底層容器之pop_back()取出尾值

// 重新調整heap,洞號為0(亦即樹根處),欲調整值為value(原尾值)
__adjust_heap(first, // first
Distance(0), // holeIndex
Distance(last-first), // len
value,

Comp); // value
}



template <class RandomAccessIterator,class T,class Distance,class Compare>
void __adjust_heap(RandomAccessIterator first,
Distance holeIndex,
Distance len,
T value,
Compare Comp)
{
Distance topIndex = holeIndex;
Distance secondChild = 2 * holeIndex + 2; // 洞節點之右子節點

while (secondChild < len){
// 比較洞節點之左右兩個子值,然后以sencondChild代表較大子節點
if (Comp(*(first + secondChild),*(first + (secondChild - 1)))

/**(first + secondChild) < *(first + (secondChild - 1))*/)
secondChild--;

// Percolate down:令較大子值為洞值,再令洞號下移至較大子節點處
*(first+holeIndex) = *(first+secondChild);

holeIndex = secondChild;
// 找出新洞節點的右子節點
secondChild = 2 * secondChild + 2;

}
if (secondChild == len) { // 沒有右子節點,只有左子節點
// Percolate down: 令左子值為洞值,再令洞號下移至左子結點處。
*(first + holeIndex) = *(first + (secondChild - 1));

holeIndex = secondChild - 1;
}


// 此時(可能)尚未滿足次序特性,執行一次percolate up操作

// 有右子節點,即把右子節點再push進堆中即可
__push_heap(first,holeIndex,topIndex,value,Comp);


// 直接復制到hole中也可啊
// 錯啦!因為該值可能大於其父節點啦~~
// "wrong" : *(first+holeIndex) = value;
}

 

make_heap算法

make_heap
template <class RandomAccessIterator,class T,class Distance,class Compare>
inline void __make_heap(RandomAccessIterator first,
RandomAccessIterator last,T*,
Distance*,
Compare Comp)
{
if (last - first < 2)
return; // 如果長度為0或1,不必重新排列
Distance len = last - first;


Distance parent = (len - 2)/2;

while(true){
__adjust_heap(first,
parent,
len,
T(*(first + parent)),
Comp);
if (parent == 0) return;
parent--;
}
}

template <class RandomAccessIterator,class Compare>
inline void make_heap(RandomAccessIterator first,
RandomAccessIterator last,
Compare Comp)
{
__make_heap(first, last, value_type(first), distance_type(first),Comp);
}

 

這個算法用來將一段現有數據轉化為一個heap。

// for example 
  int ia[9] = {0,1,2,3,4,8,9,3,5};
  vector<int> ivec(ia,ia+9);
  make_heap(ivec.begin(), ivec.end());  // 9 5 8 3 4 0 2 3 1 

sort_heap算法

持續對整個heap做pop_heap操作,每次將操作范圍從后向前縮減一個元素(因為pop_heap會把鍵值最大元素放在底部容器最尾端),所以最后我們會得到一個遞增序列。

sort_heap
//-----------------------------------------------------------------
// sort_heap
template <class RandomAccessIterator,class Compare>

void sort_heap(RandomAccessIterator first,
RandomAccessIterator last,
Compare Comp)
{
while (last - first > 1)
mySTL::pop_heap(first, last--,Comp);
}

注意,排序過后,原來的heap就不是一個合法的heap了。

三、partial_sort算法

現在,就來看看怎樣利用heap算法來計算大量數據中的topk。
舉例來說,比如求前k小,那么首先取前k個元素做成max-heap,然后遍歷剩下的所有元素,分別於堆中的最大元素(root節點)比較,如果它小於最大元素,則替換掉那個元素。這樣一來,當把序列掃描一遍之后,該max-heap中的k個元素就是最小的k個元素,效率很高。

partial_sort
//------------------------------------------------------------------------------
// partial_sort

template <class RandomAccessIterator,class T,class Compare>
inline void __partial_sort(RandomAccessIterator first,
RandomAccessIterator middle,
RandomAccessIterator last, T*,Compare Comp)
// less --- max-heap
// greater --- min-heap
{

mySTL::make_heap(first, middle,Comp);
for (RandomAccessIterator i = middle; i < last; ++i)
if ( Comp(*i,*first) )
mySTL::__pop_heap(first, middle, i, T(*i), distance_type(first),Comp);
mySTL::sort_heap(first,middle,Comp);
}


// paitial_sort的任務是找出middle - first個最小元素。
template <class RandomAccessIterator,class Compare>

inline void partial_sort(RandomAccessIterator first,
RandomAccessIterator middle,
RandomAccessIterator last,
Compare Comp)
{
__partial_sort(first, middle, last, value_type(first),Comp);
}

 

四、用Quicksort求topk

Quicksort相關原理就不多說了,STL中Quicksort算法的實現我在<stl sort源碼剖析>中一文也詳細說明過。所以這里就只說說怎么樣用Quicksort來求topk。
其實很簡單,大家都知道Quicksort每次分割后即把比軸小的值和比軸大的值給cut開了。那么,假如是想求前k小(以下源碼即是),我就只關心比軸小的一邊,繼續分割這一邊,直到如果分割后的元素沒有k個了,就結束循環。我們用源碼說話:

topk_qsort
// 利用快速排序思想求topk小
template <class Iter,class T>

void __topk_qsort(Iter first,
Iter last,
int k,
T*)
{
if (last - first < k) throw runtime_error("too little elements!");
Iter cut = last;
do{

// 只考慮元素小的一邊,以為是求前k小。
last = cut;

cut = mySTL::__unguarded_partition
(first, last, T(mySTL::__median(*first,
*(first + (last - first)/2),
*(last - 1))));
// [first - cut] -> 小
// [cut - last] -> 大

}while(cut - first > k); // 當[cut-first]已經不足k個元素時停止qsort
mySTL::sort(first,last);

}


 /--------------------------------------華麗的分割線--------------------------------------/

五、分析分析

 ............  

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM