優先隊列實現 大小根堆 解決top k 問題


 

 

目錄:[ - ]

1、認識 PriorityQueue

PriorityQueue是從JDK1.5開始提供的新的數據結構接口,它是一種基於優先級堆的極大優先級隊列。優先級隊列是不同於先進先出隊列的另一種隊列。每次從隊列中取出的是具有最高優先權的元素。如果不提供Comparator的話,優先隊列中元素默認按自然順序排列,也就是數字默認是小的在隊列頭,字符串則按字典序排列(參閱 Comparable),也可以根據 Comparator 來指定,這取決於使用哪種構造方法。優先級隊列不允許 null 元素。依靠自然排序的優先級隊列還不允許插入不可比較的對象(這樣做可能導致 ClassCastException)。

比如隊列 1 3 5 10 2 自動會被排列 1 2 3 5 10

 
             
import java.util.Comparator;
import  java.util.PriorityQueue;
import  java.util.Queue;
 
/*
 * 重寫 Comparator<Integer>來決定
 * 優先隊列是小根堆還是大根堆
 * */
public   class  PriorityQueueExample {
 
        public   static  void  main(String[] args) {
              //實現小根堆
            Queue<Integer> qi =  new  PriorityQueue<Integer>();
            qi.add(5);
            qi.add(2);
            qi.add(1);
            qi.add(10);
            qi.add(3);
 
              while  (!qi.isEmpty()) {
                  System.  out  .print(qi.poll() +  ","  );
            }
            System.  out  .println();
            System.  out  .println( "-----------------------------"  );
       
              // 自定義的比較器,可以讓我們自由定義比較的順序  Comparator<Integer> cmp;
              // 生成最大堆使用e2-e1,生成最小堆使用e1-e2,
            Comparator<Integer> cmp =  new Comparator<Integer>() {
                   public int compare(Integer e1, Integer e2) {
                         return e2 - e1; 
                  }
            };
            
              //實現大根堆
            Queue<Integer> q2 =  new  PriorityQueue<Integer>(5, cmp); 
            q2.add(2);
            q2.add(8);
            q2.add(9);
            q2.add(1);
              while  (!q2.isEmpty()) {
                  System.  out  .print(q2.poll() +  ","  );
            }
 
      }
 
}
 
            

 

output  

1,2,3,5,10,  
-----------------------------  
9,8,2,1,

 

隊列的頭是按指定排序方式的最小元素。如果多個元素都是最小值,則頭是其中一個元素——選擇方法是任意的。

隊列檢索操作 poll、remove、peek 和 element 訪問處於隊列頭的元素。
優先級隊列是無界的,但是有一個內部容量,控制着用於存儲隊列元素的數組的大小。
它總是至少與隊列的大小相同。隨着不斷向優先級隊列添加元素,其容量會自動增加。無需指定容量增加策略的細節。
注意1:該隊列是用數組實現,但是數組大小可以動態增加,容量無限。
注意2:此實現不是同步的。不是線程安全的。如果多個線程中的任意線程從結構上修改了列表, 則這些線程不應同時訪問 PriorityQueue 實例,這時請使用線程安全的PriorityBlockingQueue 類。
注意3:不允許使用 null 元素。
注意4:此實現為插入方法(offer、poll、remove() 和 add 方法)提供 O(log(n)) 時間;
為 remove(Object) 和 contains(Object) 方法提供線性時間;
為檢索方法(peek、element 和 size)提供固定時間。
注意5:方法iterator()中提供的迭代器並不保證以有序的方式遍歷優先級隊列中的元素。
至於原因可參考下面關於PriorityQueue的內部實現
如果需要按順序遍歷,請考慮使用 Arrays.sort(pq.toArray())。
注意6:可以在構造函數中指定如何排序。如:
PriorityQueue()
使用默認的初始容量(11)創建一個 PriorityQueue,並根據其自然順序來排序其元素(使用 Comparable)。
PriorityQueue(int initialCapacity)
使用指定的初始容量創建一個 PriorityQueue,並根據其自然順序來排序其元素(使用 Comparable)。
PriorityQueue(int initialCapacity, Comparator comparator)
使用指定的初始容量創建一個 PriorityQueue,並根據指定的比較器comparator來排序其元素。
注意7:此類及其迭代器實現了 Collection 和 Iterator 接口的所有可選 方法。
PriorityQueue的內部實現
PriorityQueue對元素采用的是堆排序,頭是按指定排序方式的最小元素。堆排序只能保證根是最大(最小),整個堆並不是有序的。
方法iterator()中提供的迭代器可能只是對整個數組的依次遍歷。也就只能保證數組的第一個元素是最小的。
實例1的結果也正好與此相符。

2、應用:求 Top K 大/小 的元素

了解了優先隊列之后,我們再來看它的一個應用:

在面試的時候,問到算法,Top k 的問題是經常被問到的,網上已有很多種方法可以解決,今天來看看如何使用 PriorityQueue 構造固定容量的優先隊列,模擬大頂堆,來解決 top K 小的問題。

如果求top k 大的問題,就建立小根堆!!! 不是大根堆!!

 

import java.util.ArrayList;
import  java.util.Collections;
import  java.util.Comparator;
import  java.util.Iterator;
import  java.util.List;
import  java.util.PriorityQueue;
import  java.util.Random;
 
//固定容量的優先隊列,模擬大頂堆,用於解決求topN小或 topk大的問題
public  class  TopKwithPriorityQueue<E  extends  Comparable> {
        private  PriorityQueue<E>  queue  ;
        private  int  K  ;  // 堆的最大容量,即 topk,所以maxsize=k
 
        public  TopKwithPriorityQueue( int  maxSize) {
              if  (maxSize <= 0)
                    throw  new  IllegalArgumentException();
              this . K  = maxSize;
        this.queue = new PriorityQueue(maxSize, new Comparator<E>() {
    public int compare(E o1, E o2) { // 生成最大堆使用o2-o1,生成最小堆使用o1-o2, 並修改 e.compareTo(peek) 比較規則return (o2.compareTo(o1)); } }); }
      }
 
        public  void  add(E e) {
              if  ( queue  .size() <  K ) {  // 未達到最大容量,直接添加
                    queue .add(e);
            }  else  {  // 隊列已滿
                  E peek =  queue .peek();         //取堆頂元素
                    if  (e.compareTo(peek) < 0) {  // 將新元素與當前堆頂元素比較,保留較小的元素
                          queue .poll();
                          queue .add(e);
                  }
            }
      }
 
        public  List<E> sortedList() {
            List<E> list =  new  ArrayList<E>( queue  );           //可以將整個優先隊列傳入 arraylist的構造方法做參數
             Collections.sort(list);  // PriorityQueue本身的遍歷是無序的,最終需要對隊列中的元素進行排序
              return  list;
      }
 
        public  static  void  main(String[] args) {
              final  TopKwithPriorityQueue pq =  new  TopKwithPriorityQueue(10);  //返回前k=10位
            Random random =  new  Random();
              int  rNum = 0;
            System.  out .println( "100 個 0~999 之間的隨機數:-----------------------------------"  );
              for  ( int  i = 1; i <= 100; i++) {
                  rNum = random.nextInt(1000);
                  System.  out .print(rNum+ "\t"  );
                   pq.add(rNum);
            }
            System.  out .println( "\n PriorityQueue 本身的遍歷是無序的:返回的top10 最小堆是:-----------------------------------"  );
            Iterable<Integer> iter =  new  Iterable<Integer>() {
                    public  Iterator<Integer> iterator() {
                          return  pq. queue .iterator() ;
                  }
            };
              for  (Integer item : iter) {
                  System.  out .print(item +  ", "  );
            }
            System.  out .println();
            System.  out .println( "PriorityQueue 排序后的遍歷:返回的top10 最小堆是:-----------------------------------" );
              /*
             * for (Integer item : pq.sortedList()) { System.out.println(item); }
             */
              // 或者直接用內置的 poll() 方法,每次取隊首元素(堆頂的最大值)
              while  (!pq. queue  .isEmpty()) {
                  System.  out .print(pq. queue  .poll() +  ", " );
            }
      }
}  

由於僅僅保存了K個數據,有調整最小堆的時間復雜度為O(lnK),因此TOp K算法(問題)時間復雜度為O(nlnK).

 
         

 

3、PriorityQueue  在 hadoop 中的應用:

最后來聊下 “基於堆實現的優先級隊列(PriorityQueue)” 在hadoop 中的應用:

在 hadoop 中,排序是 MapReduce 的靈魂,MapTask 和 ReduceTask 均會對數據按 Key 排序,這個操作是 MR 框架的默認行為,不管你的業務邏輯上是否需要這一操作。

MapReduce 框架中,用到的排序主要有兩種:快速排序 和 基於堆實現的優先級隊列

Mapper 階段:  

從 map 輸出到環形緩沖區的數據會被排序(這是 MR 框架中改良的快速排序),這個排序涉及 partition 和 key,當緩沖區容量占用 80%,會 spill 數據到磁盤,生成 IFile 文件,Map 結束后,會將 IFile 文件排序合並成一個大文件(基於堆實現的優先級隊列),以供不同的 reduce 來拉取相應的數據。

Reducer 階段:  

從 Mapper 端取回的數據已是部分有序,Reduce Task 只需進行一次歸並排序即可保證數據整體有序。為了提高效率,Hadoop 將 sort 階段和 reduce 階段並行化,在 sort 階段,Reduce Task 為內存和磁盤中的文件建立了小頂堆,保存了指向該小頂堆根節點的迭代器,並不斷的移動迭代器,以將 key 相同的數據順次交給 reduce() 函數處理,期間移動迭代器的過程實際上就是不斷調整小頂堆的過程(建堆→取堆頂元素→重新建堆→取堆頂元素...),這樣,sort 和 reduce 可以並行進行。

了解了這個,你就明白為什么之前有同學提到遍歷一遍 values 之后,值都不存在了,同時你也能更加理解之前提到的 二次排序。

在 hadoop 中,用到了這一數據結構的類主要有如下:(hadoop-0.20.203.0)  

core/org/apache/hadoop/io/SequenceFile.java
hdfs/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java
mapred/org/apache/hadoop/mapred/join/JoinRecordReader.java
mapred/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
mapred/org/apache/hadoop/mapred/join/OverrideRecordReader.java
mapred/org/apache/hadoop/mapred/Merger.java
tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java

可以看到,這一數據結構,在 hadoop 中用的還是比較廣泛的。

需要說明的是,求 Top k,更簡單的方法可以直接用內置的 TreeMap 或者 TreeSet,這兩者是基於紅黑樹的一種數據結構,內部維持 key 的次序,但每次添加新元素,其排序的開銷要大於堆調整的開銷。例如要找最大的10個元素,那么創建的是小根堆。小根堆的特性是根節點是最小元素。不需要對堆進行再排序,當堆的根節點被替換成新的元素時,需要進行堆化,以保持小根堆的特性。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM