JDK(二)JDK1.8源碼分析【排序】timsort


如無特殊說明,文中的代碼均是JDK 1.8版本。

JDK集合框架中描述過,JDK存儲一組Object的集合框架是Collection。而針對Collection框架的一組操作集合體是Collections,里面包含了多種針對Collection的操作,例如:排序、查找、交換、反轉、復制等。

這一節講述Collections的排序操作。

public static <T extends Comparable<? super T>> void sort(List<T> list) {
    list.sort(null);
}

Collections.sort方法調用的是List.sort方法,List.sort方法如下:

    @SuppressWarnings({"unchecked", "rawtypes"})
    default void sort(Comparator<? super E> c) {
        Object[] a = this.toArray();
        Arrays.sort(a, (Comparator) c);  // Arrays的排序方法
        ListIterator<E> i = this.listIterator();
        for (Object e : a) {
            i.next();
            i.set((E) e);
        }
    }

 

看到這里可能會覺得奇怪,List是接口,但為什么會有實現方法,這是JDK 1.8的新特性。具體特性描述請參考:Java 8接口有default method后是不是可以放棄抽象類了?

在List.sort方法實現中,排序使用的是Arrays#sort(T[], java.util.Comparator<? super T>)方法,所以Collections的sort操作最終也是使用Arrays#sort(T[], java.util.Comparator<? super T>)方法。

    public static <T> void sort(T[] a, Comparator<? super T> c) {
        if (c == null) {
            sort(a);
        } else {
            if (LegacyMergeSort.userRequested)
                legacyMergeSort(a, c);
            else
                TimSort.sort(a, 0, a.length, c, null, 0, 0);
        }
    }

Arrays#sort(T[], java.util.Comparator<? super T>)方法使用了3種排序算法:

java.util.Arrays#legacyMergeSort 歸並排序,但可能會在新版本中廢棄
java.util.ComparableTimSort#sort 不使用自定義比較器的TimSort
java.util.TimSort#sort 使用自定義比較器的TimSort

 

 

 

 

Arrays源碼中有這么一段定義:

    /**
     * Old merge sort implementation can be selected (for
     * compatibility with broken comparators) using a system property.
     * Cannot be a static boolean in the enclosing class due to
     * circular dependencies. To be removed in a future release.
     */
    static final class LegacyMergeSort {
        private static final boolean userRequested =
            java.security.AccessController.doPrivileged(
                new sun.security.action.GetBooleanAction(
                    "java.util.Arrays.useLegacyMergeSort")).booleanValue();
    }

該定義描述是否使用LegacyMergeSort,即歷史歸並排序算法,默認為false,即不使用。所以Arrays.sort只會使用java.util.ComparableTimSort#sort或java.util.TimSort#sort,這兩種方法的實現邏輯是一樣的,只是java.util.TimSort#sort可以使用自定義的Comparator,而java.util.ComparableTimSort#sort不使用Comparator而已。

順便補充一下,Comparator是策略模式的一個完美又簡潔的示例。總體來說,策略模式允許在程序執行時選擇不同的算法。比如在排序時,傳入不同的比較器(Comparator),就采用不同的算法。

Timsort算法

Timsort是結合了合並排序(merge sort)和插入排序(insertion sort)而得出的排序算法,它在現實中有很好的效率。Tim Peters在2002年設計了該算法並在Python中使用(TimSort 是 Python 中 list.sort 的默認實現)。該算法找到數據中已經排好序的塊-分區,每一個分區叫一個run,然后按規則合並這些run。Pyhton自從2.3版以來一直采用Timsort算法排序,JDK 1.7開始也采用Timsort算法對數組排序。

Timsort的主要步驟:

判斷數組的大小,小於32使用二分插入排序

    static void sort(Object[] a, int lo, int hi, Object[] work, int workBase, int workLen) {
        // 檢查lo,hi的的准確性
        assert a != null && lo >= 0 && lo <= hi && hi <= a.length;

        int nRemaining  = hi - lo;
        // 當長度為0或1時永遠都是已經排序狀態
        if (nRemaining < 2)
            return;  // Arrays of size 0 and 1 are always sorted
    
        // 數組個數小於32的時候
        // If array is small, do a "mini-TimSort" with no merges
        if (nRemaining < MIN_MERGE) {
            // 找出連續升序的最大個數
            int initRunLen = countRunAndMakeAscending(a, lo, hi);
            // 二分插入排序
            binarySort(a, lo, hi, lo + initRunLen);
            return;
        }

        // 數組個數大於32的時候
       ......

找出最大的遞增或者遞減的個數,如果遞減,則此段數組嚴格反一下方向

    private static int countRunAndMakeAscending(Object[] a, int lo, int hi) {
        assert lo < hi;
        int runHi = lo + 1;
        if (runHi == hi)
            return 1;

        // Find end of run, and reverse range if descending
        if (((Comparable) a[runHi++]).compareTo(a[lo]) < 0) { // Descending 遞減
            while (runHi < hi && ((Comparable) a[runHi]).compareTo(a[runHi - 1]) < 0) 
                runHi++;
            // 調整順序
            reverseRange(a, lo, runHi);
        } else {                              // Ascending 遞增
            while (runHi < hi && ((Comparable) a[runHi]).compareTo(a[runHi - 1]) >= 0)
                runHi++;
        }

        return runHi - lo;
    }

在使用二分查找位置,進行插入排序。start之前為全部遞增數組,從start+1開始進行插入,插入位置使用二分法查找。最后根據移動的個數使用不同的移動方法。

    private static void binarySort(Object[] a, int lo, int hi, int start) {
        assert lo <= start && start <= hi;
        if (start == lo)
            start++;
        for ( ; start < hi; start++) {
            Comparable pivot = (Comparable) a[start];

            // Set left (and right) to the index where a[start] (pivot) belongs
            int left = lo;
            int right = start;
            assert left <= right;
            /*
             * Invariants:
             *   pivot >= all in [lo, left).
             *   pivot <  all in [right, start).
             */
            while (left < right) {
                int mid = (left + right) >>> 1;
                if (pivot.compareTo(a[mid]) < 0)
                    right = mid;
                else
                    left = mid + 1;
            }
            assert left == right;

            /*
             * The invariants still hold: pivot >= all in [lo, left) and
             * pivot < all in [left, start), so pivot belongs at left.  Note
             * that if there are elements equal to pivot, left points to the
             * first slot after them -- that's why this sort is stable.
             * Slide elements over to make room for pivot.
             */
            int n = start - left;  // The number of elements to move 要移動的個數
            // Switch is just an optimization for arraycopy in default case
            // 移動的方法
            switch (n) {
                case 2:  a[left + 2] = a[left + 1];
                case 1:  a[left + 1] = a[left];
                         break;
                // native復制數組方法
                default: System.arraycopy(a, left, a, left + 1, n);
            }
            a[left] = pivot;
        }
    }

 

數組大小大於32時

數組大於32時, 先算出一個合適的大小,在將輸入按其升序和降序特點進行了分區。排序的輸入的單位不是一個個單獨的數字,而是一個個的塊-分區。其中每一個分區叫一個run。針對這些 run 序列,每次拿一個run出來按規則進行合並。每次合並會將兩個run合並成一個 run。合並的結果保存到棧中。合並直到消耗掉所有的run,這時將棧上剩余的 run合並到只剩一個 run 為止。這時這個僅剩的 run 便是排好序的結果。

    static void sort(Object[] a, int lo, int hi, Object[] work, int workBase, int workLen) {
        //數組個數小於32的時候
        ......
        
        // 數組個數大於32的時候
        /**
         * March over the array once, left to right, finding natural runs,
         * extending short natural runs to minRun elements, and merging runs
         * to maintain stack invariant.
         */
        ComparableTimSort ts = new ComparableTimSort(a, work, workBase, workLen);
        // 計算run的長度
        int minRun = minRunLength(nRemaining);
        do {
            // Identify next run
            // 找出連續升序的最大個數
            int runLen = countRunAndMakeAscending(a, lo, hi);

            // If run is short, extend to min(minRun, nRemaining)
            // 如果run長度小於規定的minRun長度,先進行二分插入排序
            if (runLen < minRun) {
                int force = nRemaining <= minRun ? nRemaining : minRun;
                binarySort(a, lo, lo + force, lo + runLen);
                runLen = force;
            }

            // Push run onto pending-run stack, and maybe merge
            ts.pushRun(lo, runLen);
            // 進行歸並
            ts.mergeCollapse();

            // Advance to find next run
            lo += runLen;
            nRemaining -= runLen;
        } while (nRemaining != 0);

        // Merge all remaining runs to complete sort
        assert lo == hi;
        // 歸並所有的run
        ts.mergeForceCollapse();
        assert ts.stackSize == 1;
    }

1. 計算出run的最小的長度minRun

  a)  如果數組大小為2的N次冪,則返回16(MIN_MERGE / 2);

  b)  其他情況下,逐位向右位移(即除以2),直到找到介於16和32間的一個數;

    /**
     * Returns the minimum acceptable run length for an array of the specified
     * length. Natural runs shorter than this will be extended with
     * {@link #binarySort}.
     *
     * Roughly speaking, the computation is:
     *
     *  If n < MIN_MERGE, return n (it's too small to bother with fancy stuff).
     *  Else if n is an exact power of 2, return MIN_MERGE/2.
     *  Else return an int k, MIN_MERGE/2 <= k <= MIN_MERGE, such that n/k
     *   is close to, but strictly less than, an exact power of 2.
     *
     * For the rationale, see listsort.txt.
     *
     * @param n the length of the array to be sorted
     * @return the length of the minimum run to be merged
     */
    private static int minRunLength(int n) {
        assert n >= 0;
        int r = 0;      // Becomes 1 if any 1 bits are shifted off
        while (n >= MIN_MERGE) {
            r |= (n & 1);
            n >>= 1;
        }
        return n + r;
    }

2. 求最小遞增的長度,如果長度小於minRun,使用插入排序補充到minRun的個數,操作和小於32的個數是一樣。 
3. 用stack記錄每個run的長度,當下面的條件其中一個成立時歸並,直到數量不變:

runLen[i - 3] > runLen[i - 2] + runLen[i - 1] 
runLen[i - 2] > runLen[i - 1]
    /**
     * Examines the stack of runs waiting to be merged and merges adjacent runs
     * until the stack invariants are reestablished:
     *
     *     1. runLen[i - 3] > runLen[i - 2] + runLen[i - 1]
     *     2. runLen[i - 2] > runLen[i - 1]
     *
     * This method is called each time a new run is pushed onto the stack,
     * so the invariants are guaranteed to hold for i < stackSize upon
     * entry to the method.
     */
    private void mergeCollapse() {
        while (stackSize > 1) {
            int n = stackSize - 2;
            if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) {
                if (runLen[n - 1] < runLen[n + 1])
                    n--;
                mergeAt(n);
            } else if (runLen[n] <= runLen[n + 1]) {
                mergeAt(n);
            } else {
                break; // Invariant is established
            }
        }
    }

關於歸並方法和對一般的歸並排序做出了簡單的優化。假設兩個 run 是 run1,run2 ,先用 gallopRight在 run1 里使用 binarySearch 查找run2 首元素 的位置k,那么 run1 中 k 前面的元素就是合並后最小的那些元素。然后,在run2 中查找run1 尾元素 的位置 len2,那么run2 中 len2 后面的那些元素就是合並后最大的那些元素。最后,根據len1 與len2 大小,調用mergeLo 或者 mergeHi 將剩余元素合並。

    /**
     * Merges the two runs at stack indices i and i+1.  Run i must be
     * the penultimate or antepenultimate run on the stack.  In other words,
     * i must be equal to stackSize-2 or stackSize-3.
     *
     * @param i stack index of the first of the two runs to merge
     */
    @SuppressWarnings("unchecked")
    private void mergeAt(int i) {
        assert stackSize >= 2;
        assert i >= 0;
        assert i == stackSize - 2 || i == stackSize - 3;

        int base1 = runBase[i];
        int len1 = runLen[i];
        int base2 = runBase[i + 1];
        int len2 = runLen[i + 1];
        assert len1 > 0 && len2 > 0;
        assert base1 + len1 == base2;

        /*
         * Record the length of the combined runs; if i is the 3rd-last
         * run now, also slide over the last run (which isn't involved
         * in this merge).  The current run (i+1) goes away in any case.
         */
        runLen[i] = len1 + len2;
        if (i == stackSize - 3) {
            runBase[i + 1] = runBase[i + 2];
            runLen[i + 1] = runLen[i + 2];
        }
        stackSize--;

        /*
         * Find where the first element of run2 goes in run1. Prior elements
         * in run1 can be ignored (because they're already in place).
         */
        int k = gallopRight((Comparable<Object>) a[base2], a, base1, len1, 0);
        assert k >= 0;
        base1 += k;
        len1 -= k;
        if (len1 == 0)
            return;

        /*
         * Find where the last element of run1 goes in run2. Subsequent elements
         * in run2 can be ignored (because they're already in place).
         */
        len2 = gallopLeft((Comparable<Object>) a[base1 + len1 - 1], a,
                base2, len2, len2 - 1);
        assert len2 >= 0;
        if (len2 == 0)
            return;

        // Merge remaining runs, using tmp array with min(len1, len2) elements
        if (len1 <= len2)
            mergeLo(base1, len1, base2, len2);
        else
            mergeHi(base1, len1, base2, len2);
    }

4. 最后歸並還有沒有歸並的run,知道run的數量為1。

例子

為了演示方便,我將TimSort中的minRun直接設置為2,否則我不能用很小的數組演示。同時把MIN_MERGE也改成2(默認為32),這樣避免直接進入二分插入排序。

 

1.  初始數組為[7,5,1,2,6,8,10,12,4,3,9,11,13,15,16,14]

2.  尋找第一個連續的降序或升序序列:[1,5,7] [2,6,8,10,12,4,3,9,11,13,15,16,14]

3.  stackSize=1,所以不合並,繼續找第二個run

4.  找到一個遞減序列,調整次序:[1,5,7] [2,6,8,10,12] [4,3,9,11,13,15,16,14]

5.  因為runLen[0] <= runLen[1]所以歸並 

  1) gallopRight:尋找run1的第一個元素應當插入run0中哪個位置(”2”應當插入”1”之后),然后就可以忽略之前run0的元素(都比run1的第一個元素小) 

  2) gallopLeft:尋找run0的最后一個元素應當插入run1中哪個位置(”7”應當插入”8”之前),然后就可以忽略之后run1的元素(都比run0的最后一個元素大) 

  這樣需要排序的元素就僅剩下[5,7] [2,6],然后進行mergeLow 完成之后的結果: [1,2,5,6,7,8,10,12] [4,3,9,11,13,15,16,14]

6.  尋找連續的降序或升序序列[1,2,5,6,7,8,10,12] [3,4] [9,11,13,15,16,14]

7.  不進行歸並排序,因為runLen[0] > runLen[1]

8.  尋找連續的降序或升序序列:[1,2,5,6,7,8,10,12] [3,4] [9,11,13,15,16] [14]

9.  因為runLen[1] <= runLen[2],所以需要歸並

10. 使用gallopRight,發現為正常順序。得[1,2,5,6,7,8,10,12] [3,4,9,11,13,15,16] [14]

11. 最后只剩下[14]這個元素:[1,2,5,6,7,8,10,12] [3,4,9,11,13,15,16] [14]

12. 因為runLen[0] <= runLen[1] + runLen[2]所以合並。因為runLen[0] > runLen[2],所以將run1和run2先合並。(否則將run0和run1先合並) 
  完成之后的結果: [1,2,5,6,7,8,10,12] [3,4,9,11,13,14,15,16]

13. 完成之后的結果:[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]

 

參考:

Java源碼之Arrays內部排序實現(timsort的實現)

Timsort WiKi

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM