skiplist(跳表)的原理及JAVA實現


 

 

 

前記

最近在看Redis,之間就嘗試用sortedSet用在實現排行榜的項目,那么sortedSet底層是什么結構呢? "Redis sorted set的內部使用HashMap和跳躍表(SkipList)來保證數據的存儲和有序,HashMap里放的是成員到score的映射,而跳躍表里存放的是所有的成員,排序依據是HashMap里存的score,使用跳躍表的結構可以獲得比較高的查找效率,並且在實現上比較簡單。”   那么什么是SkipList跳表呢?下面我們從理解它的思想到實現及應用去做一個大致的了解。

 

一.跳表的原理及思想

跳表的背景

 

Skip list是一個用於有序元素序列快速搜索的數據結構,由美國計算機科學家William Pugh發明於1989年。他在論文《Skip lists: a probabilistic alternative to balanced trees》中詳細介紹了跳表的數據結構和插入刪除等操作。論文是這么介紹跳表的:

 

Skip lists are a data structure that can be used in place of balanced trees.
Skip lists use probabilistic balancing rather than strictly enforced balancing and as a result the algorithms for insertion and deletion in skip lists are much simpler and significantly faster than equivalent algorithms for balanced trees.

 

也就是說,

Skip list是一個“概率型”的數據結構,可以在很多應用場景中替代平衡樹。Skip list算法與平衡樹相比,有相似的漸進期望時間邊界,但是它更簡單,更快,使用更少的空間。 
Skip list是一個分層結構多級鏈表,最下層是原始的鏈表,每個層級都是下一個層級的“高速跑道”。 

 

為什么選擇跳表

目前經常使用的平衡數據結構有:B樹,紅黑樹,AVL樹,Splay Tree, Treep等。

想象一下,給你一張草稿紙,一只筆,一個編輯器,你能立即實現一顆紅黑樹,或者AVL樹

出來嗎? 很難吧,這需要時間,要考慮很多細節,要參考一堆算法與數據結構之類的樹,

還要參考網上的代碼,相當麻煩。

用跳表吧,跳表是一種隨機化的數據結構,目前開源軟件 Redis 和 LevelDB 都有用到它,

它的效率和紅黑樹以及 AVL 樹不相上下,但跳表的原理相當簡單,只要你能熟練操作鏈表,就能輕松實現一個 SkipList。

有序表的搜索

考慮一個有序表:

clip_image001

從該有序表中搜索元素 < 23, 43, 59 > ,需要比較的次數分別為 < 2, 4, 6 >,總共比較的次數

為 2 + 4 + 6 = 12 次。有沒有優化的算法嗎?  鏈表是有序的,但不能使用二分查找。類似二叉

搜索樹,我們把一些節點提取出來,作為索引。得到如下結構:

clip_image002

這里我們把 < 14, 34, 50, 72 > 提取出來作為一級索引,這樣搜索的時候就可以減少比較次數了。

我們還可以再從一級索引提取一些元素出來,作為二級索引,變成如下結構:

clip_image003

這里元素不多,體現不出優勢,如果元素足夠多,這種索引結構就能體現出優勢來了。

這基本上就是跳表的核心思想,其實也是一種通過“空間來換取時間”的一個算法,通過在每個節點中增加了向前的指針,從而提升查找的效率。

跳表

下面的結構是就是跳表:

其中 -1 表示 INT_MIN, 鏈表的最小值,1 表示 INT_MAX,鏈表的最大值。

clip_image005

跳表具有如下性質:

(1) 由很多層結構組成

(2) 每一層都是一個有序的鏈表

(3) 最底層(Level 1)的鏈表包含所有元素

(4) 如果一個元素出現在 Level i 的鏈表中,則它在 Level i 之下的鏈表也都會出現。

(5) 每個節點包含兩個指針,一個指向同一鏈表中的下一個元素,一個指向下面一層的元素。

跳表的搜索

clip_image007

例子:查找元素 117

(1) 比較 21, 比 21 大,往后面找

(2) 比較 37,   比 37大,比鏈表最大值小,從 37 的下面一層開始找

(3) 比較 71,  比 71 大,比鏈表最大值小,從 71 的下面一層開始找

(4) 比較 85, 比 85 大,從后面找

(5) 比較 117, 等於 117, 找到了節點。

 

 

二. 自己動手用JAVA實現SkipList跳表

單純的用鏈表來實現一個SkipList。

 

基本Node結構

package com.shoshana.skiplist;

public class SkipListNode<T> {
    public int key;
    public T value;
    public SkipListNode<T> pre, next, up, down; //上下左右四個節點,pre和up存在的意義在於 "升層"的時候需要查找相鄰節點

    public static final int HEAD_KEY = Integer.MIN_VALUE; // 負無窮
    public static final int TAIL_KEY = Integer.MAX_VALUE; // 正無窮

    public SkipListNode(int k, T v) {
        key = k;
        value = v;
    }

    public int getKey() {
        return key;
    }

    public void setKey(int key) {
        this.key = key;
    }

    public T getValue() {
        return value;
    }

    public void setValue(T value) {
        this.value = value;
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (!(o instanceof SkipListNode<?>)) {
            return false;
        }
        SkipListNode<T> ent;
        try {
            ent = (SkipListNode<T>) o; //檢測類型
        } catch (ClassCastException ex) {
            return false;
        }
        return (ent.getKey() == key) && (ent.getValue() == value);
    }

    @Override
    public String toString() {
        return "key-value:" + key + "," + value;
    }
}

跳表實現

package com.shoshana.skiplist;

import java.util.Random;

public class SkipList<T> {
    private SkipListNode<T> head, tail;
    private int size;
    private int listLevel;
    private Random random;
    private static final double PROBABILITY = 0.5;

    public SkipList() {
        head = new SkipListNode<T>(SkipListNode.HEAD_KEY, null);
        tail = new SkipListNode<>(SkipListNode.TAIL_KEY, null);
        head.next = tail;
        tail.pre = head;
        size = 0;
        listLevel = 0;
        random = new Random();
    }

    public SkipListNode<T> get(int key) {
        SkipListNode<T> p = findNode(key);
        if (p.key == key) {
            return p;
        }
        return null;
    }

    //首先查找到包含key值的節點,將節點從鏈表中移除,接着如果有更高level的節點,則repeat這個操作即可。
    public T remove(int k) {
        SkipListNode<T> p = get(k);
        if (p == null) {
            return null;
        }
        T oldV = p.value;
        SkipListNode<T> q;
        while (p != null) {
            q = p.next;
            q.pre = p.pre;
            p.pre.next = q;
            p = p.up;
        }
        return oldV;
    }

    /**
     * put方法有一些需要注意的步驟:
     * 1.如果put的key值在跳躍表中存在,則進行修改操作;
     * 2.如果put的key值在跳躍表中不存在,則需要進行新增節點的操作,並且需要由random隨機數決定新加入的節點的高度(最大level);
     * 3.當新添加的節點高度達到跳躍表的最大level,需要添加一個空白層(除了-oo和+oo沒有別的節點)
     *
     * @param k
     * @param v
     */
    public void put(int k, T v) {
        System.out.println("添加key:" + k);
        SkipListNode<T> p = findNode(k);//這里不用get是因為下面可能用到這個節點
        System.out.println("找到P:" + p);
        if (p.key == k) {
            p.value = v;
            return;
        }

        SkipListNode<T> q = new SkipListNode<>(k, v);
        insertNode(p, q);

        int currentLevel = 0;
        while (random.nextDouble() > PROBABILITY) {
            if (currentLevel >= listLevel) {
                addEmptyLevel();
                System.out.println("升層");
            }
            while (p.up == null) {
                System.out.println(p);
                p = p.pre;
                System.out.println("找到第一個有上層結點的值" + p);
            }
            p = p.up;
            //創建 q的鏡像變量(只存儲k,不存儲v,因為查找的時候會自動找最底層數據)
            SkipListNode<T> z = new SkipListNode<>(k, null);
            insertNode(p, z);
            z.down = q;
            q.up = z;
            //別忘了把指針移到上一層。
            q = z;
            currentLevel++;
            System.out.println("添加后" + this);

        }
        size++;


    }

    /**
     * 如果傳入的key值在跳躍表中不存在,則findNode返回跳躍表中key值小於key,並且key值相差最小的底層節點;
     * 所以不能用此方法來代替get
     *
     * @param key
     * @return
     */
    public SkipListNode<T> findNode(int key) {
        SkipListNode<T> p = head;
        while (true) {
            System.out.println("p.next.key:" + p.next.key);
            if (p.next != null && p.next.key <= key) {
                p = p.next;
            }
            System.out.println("找到node:" + p);
            if (p.down != null) {
                System.out.println("node.down :" + p);
                p = p.down;
            } else if (p.next != null && p.next.key > key) {
                break;
            }
        }
        return p;
    }
    public boolean isEmpty() {
        return size == 0;
    }
    public int size() {
        return size;
    }

    public void addEmptyLevel() {
        SkipListNode<T> p1 = new SkipListNode<T>(SkipListNode.HEAD_KEY, null);
        SkipListNode<T> p2 = new SkipListNode<T>(SkipListNode.TAIL_KEY, null);
        p1.next = p2;
        p1.down = head;
        p2.pre = p1;
        p2.down = tail;
        head.up = p1;
        tail.up = p2;
        head = p1;
        tail = p2;
        listLevel++;
    }

    private void insertNode(SkipListNode<T> p, SkipListNode<T> q) {
        q.next = p.next;
        q.pre = p;
        p.next.pre = q;
        p.next = q;
    }

    public int getLevel() {
        return listLevel;
    }

}

Demo及運行

package com.shoshana.skiplist;

public class SkipListDemo {
    public static void main(String[] args) {
        SkipList<String> list = new SkipList<String>();
        list.put(10, "sho");
        list.put(1, "sha");
        list.put(9, "na");
        list.put(2, "bing");
        list.put(8, "ling");
        list.put(7, "xiao");
        list.put(100, "你好,skiplist");
        list.put(5, "冰");
        list.put(6, "靈");
        System.out.println("列表元素:\n" + list);
        System.out.println("刪除100:" + list.remove(100));
        System.out.println("列表元素:\n" + list);
        System.out.println("5對於的value:\n" + list.get(5).value);
        System.out.println("鏈表大小:" + list.size() + ",深度:" + list.getLevel());
    }
}

  運行結果:

classpath "C:\Program com.shoshana.skiplist.SkipListDemo
添加key:10
p.next.key:2147483647
找到node:key-value:-2147483648,null
找到P:key-value:-2147483648,null
升層
添加后com.shoshana.skiplist.SkipList@74a14482
添加key:1
p.next.key:10
找到node:key-value:-2147483648,null
node.down :key-value:-2147483648,null
p.next.key:10
找到node:key-value:-2147483648,null
找到P:key-value:-2147483648,null
添加key:9
p.next.key:10
找到node:key-value:-2147483648,null
node.down :key-value:-2147483648,null
p.next.key:1
找到node:key-value:1,sha
找到P:key-value:1,sha
添加key:2
p.next.key:10
找到node:key-value:-2147483648,null
node.down :key-value:-2147483648,null
p.next.key:1
找到node:key-value:1,sha
找到P:key-value:1,sha
key-value:1,sha
找到第一個有上層結點的值key-value:-2147483648,null
添加后com.shoshana.skiplist.SkipList@74a14482
添加key:8
p.next.key:2
找到node:key-value:2,null
node.down :key-value:2,null
p.next.key:9
找到node:key-value:2,bing
找到P:key-value:2,bing
添加key:7
p.next.key:2
找到node:key-value:2,null
node.down :key-value:2,null
p.next.key:8
找到node:key-value:2,bing
找到P:key-value:2,bing
添加后com.shoshana.skiplist.SkipList@74a14482
升層
key-value:2,null
找到第一個有上層結點的值key-value:-2147483648,null
添加后com.shoshana.skiplist.SkipList@74a14482
升層
添加后com.shoshana.skiplist.SkipList@74a14482
添加key:100
p.next.key:7
找到node:key-value:7,null
node.down :key-value:7,null
p.next.key:2147483647
找到node:key-value:7,null
node.down :key-value:7,null
p.next.key:10
找到node:key-value:10,null
node.down :key-value:10,null
p.next.key:2147483647
找到node:key-value:10,sho
找到P:key-value:10,sho
添加后com.shoshana.skiplist.SkipList@74a14482
key-value:10,null
找到第一個有上層結點的值key-value:7,null
添加后com.shoshana.skiplist.SkipList@74a14482
添加key:5
p.next.key:7
找到node:key-value:-2147483648,null
node.down :key-value:-2147483648,null
p.next.key:7
找到node:key-value:-2147483648,null
node.down :key-value:-2147483648,null
p.next.key:2
找到node:key-value:2,null
node.down :key-value:2,null
p.next.key:7
找到node:key-value:2,bing
找到P:key-value:2,bing
添加key:6
p.next.key:7
找到node:key-value:-2147483648,null
node.down :key-value:-2147483648,null
p.next.key:7
找到node:key-value:-2147483648,null
node.down :key-value:-2147483648,null
p.next.key:2
找到node:key-value:2,null
node.down :key-value:2,null
p.next.key:5
找到node:key-value:5,冰
找到P:key-value:5,冰
key-value:5,冰
找到第一個有上層結點的值key-value:2,bing
添加后com.shoshana.skiplist.SkipList@74a14482
key-value:2,null
找到第一個有上層結點的值key-value:-2147483648,null
添加后com.shoshana.skiplist.SkipList@74a14482
添加后com.shoshana.skiplist.SkipList@74a14482
列表元素:
com.shoshana.skiplist.SkipList@74a14482
p.next.key:6
找到node:key-value:6,null
node.down :key-value:6,null
p.next.key:7
找到node:key-value:7,null
node.down :key-value:7,null
p.next.key:10
找到node:key-value:10,null
node.down :key-value:10,null
p.next.key:100
找到node:key-value:100,你好,skiplist
刪除100:你好,skiplist
列表元素:
com.shoshana.skiplist.SkipList@74a14482
p.next.key:6
找到node:key-value:-2147483648,null
node.down :key-value:-2147483648,null
p.next.key:6
找到node:key-value:-2147483648,null
node.down :key-value:-2147483648,null
p.next.key:2
找到node:key-value:2,null
node.down :key-value:2,null
p.next.key:5
找到node:key-value:5,冰
5對於的value:
冰
鏈表大小:9,深度:3

Process finished with exit code 0

  

 

三. 分析JDK實現的跳表ConcurrentSkipListMap 

 

在JDK內部,也使用了該數據結構,比如ConcurrentSkipListMap,ConcurrentSkipListSet等。下面我們主要介紹ConcurrentSkipListMap。說到ConcurrentSkipListMap,我們就應該比較HashMap,ConcurrentHashMap,ConcurrentSkipListMap這三個類來講解。它們都是以鍵值對的方式來存儲數據的。HashMap是線程不安全的,而ConcurrentHashMap和ConcurrentSkipListMap是線程安全的,它們內部都使用無鎖CAS算法實現了同步。ConcurrentHashMap中的元素是無序的,ConcurrentSkipListMap中的元素是有序的。它們三者的具體區別可以參考具體的資料,下面主要講解ConcurrentSkipListMap的實現原理。

ConcurrentSkipListMap提供了一種線程安全的並發訪問的排序映射表。內部是SkipList(跳表)結構實現,在理論上能夠在O(log(n))時間內完成查找、插入、刪除操作。注意,調用ConcurrentSkipListMap的size時,由於多個線程可以同時對映射表進行操作,所以映射表需要遍歷整個鏈表才能返回元素個數,這個操作是個O(log(n))的操作。

 

doPut()

private V doPut(K kkey, V value, boolean onlyIfAbsent) {
    Comparable<? super K> key = comparable(kkey);
    for (;;) {
        // 找到key的前繼節點
        Node<K,V> b = findPredecessor(key);
        // 設置n為“key的前繼節點的后繼節點”,即n應該是“插入節點”的“后繼節點”
        Node<K,V> n = b.next;
        for (;;) {
            if (n != null) {
                Node<K,V> f = n.next;
                // 如果兩次獲得的b.next不是相同的Node,就跳轉到”外層for循環“,重新獲得b和n后再遍歷。
                if (n != b.next)
                    break;
                // v是“n的值”
                Object v = n.value;
                // 當n的值為null(意味着其它線程刪除了n);此時刪除b的下一個節點,然后跳轉到”外層for循環“,重新獲得b和n后再遍歷。
                if (v == null) {               // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                // 如果其它線程刪除了b;則跳轉到”外層for循環“,重新獲得b和n后再遍歷。
                if (v == n || b.value == null) // b is deleted
                    break;
                // 比較key和n.key
                int c = key.compareTo(n.key);
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                if (c == 0) {
                    if (onlyIfAbsent || n.casValue(v, value))
                        return (V)v;
                    else
                        break; // restart if lost race to replace value
                }
                // else c < 0; fall through
            }

            // 新建節點(對應是“要插入的鍵值對”)
            Node<K,V> z = new Node<K,V>(kkey, value, n);
            // 設置“b的后繼節點”為z
            if (!b.casNext(n, z))
                break;         // 多線程情況下,break才可能發生(其它線程對b進行了操作)
            // 隨機獲取一個level
            // 然后在“第1層”到“第level層”的鏈表中都插入新建節點
            int level = randomLevel();
            if (level > 0)
                insertIndex(z, level);
            return null;
        }
    }
}

doRemove

final V doRemove(Object okey, Object value) {
    Comparable<? super K> key = comparable(okey);
    for (;;) {
        // 找到“key的前繼節點”
        Node<K,V> b = findPredecessor(key);
        // 設置n為“b的后繼節點”(即若key存在於“跳表中”,n就是key對應的節點)
        Node<K,V> n = b.next;
        for (;;) {
            if (n == null)
                return null;
            // f是“當前節點n的后繼節點”
            Node<K,V> f = n.next;
            // 如果兩次讀取到的“b的后繼節點”不同(其它線程操作了該跳表),則返回到“外層for循環”重新遍歷。
            if (n != b.next)                    // inconsistent read
                break;
            // 如果“當前節點n的值”變為null(其它線程操作了該跳表),則返回到“外層for循環”重新遍歷。
            Object v = n.value;
            if (v == null) {                    // n is deleted
                n.helpDelete(b, f);
                break;
            }
            // 如果“前繼節點b”被刪除(其它線程操作了該跳表),則返回到“外層for循環”重新遍歷。
            if (v == n || b.value == null)      // b is deleted
                break;
            int c = key.compareTo(n.key);
            if (c < 0)
                return null;
            if (c > 0) {
                b = n;
                n = f;
                continue;
            }

            // 以下是c=0的情況
            if (value != null && !value.equals(v))
                return null;
            // 設置“當前節點n”的值為null
            if (!n.casValue(v, null))
                break;
            // 設置“b的后繼節點”為f
            if (!n.appendMarker(f) || !b.casNext(n, f))
                findNode(key);                  // Retry via findNode
            else {
                // 清除“跳表”中每一層的key節點
                findPredecessor(key);           // Clean index
                // 如果“表頭的右索引為空”,則將“跳表的層次”-1。
                if (head.right == null)
                    tryReduceLevel();
            }
            return (V)v;
        }
    }
}

findNode

private Node<K,V> findNode(Comparable<? super K> key) {
    for (;;) {
        // 找到key的前繼節點
        Node<K,V> b = findPredecessor(key);
        // 設置n為“b的后繼節點”(即若key存在於“跳表中”,n就是key對應的節點)
        Node<K,V> n = b.next;
        for (;;) {
            // 如果“n為null”,則跳轉中不存在key對應的節點,直接返回null。
            if (n == null)
                return null;
            Node<K,V> f = n.next;
            // 如果兩次讀取到的“b的后繼節點”不同(其它線程操作了該跳表),則返回到“外層for循環”重新遍歷。
            if (n != b.next)                // inconsistent read
                break;
            Object v = n.value;
            // 如果“當前節點n的值”變為null(其它線程操作了該跳表),則返回到“外層for循環”重新遍歷。
            if (v == null) {                // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (v == n || b.value == null)  // b is deleted
                break;
            // 若n是當前節點,則返回n。
            int c = key.compareTo(n.key);
            if (c == 0)
                return n;
            // 若“節點n的key”小於“key”,則說明跳表中不存在key對應的節點,返回null
            if (c < 0)
                return null;
            // 若“節點n的key”大於“key”,則更新b和n,繼續查找。
            b = n;
            n = f;
        }
    }
}

 

四. 跳表的應用場景

Java API中提供了支持並發操作的跳躍表ConcurrentSkipListSetConcurrentSkipListMap
有序的情況下: 
 在非多線程的情況下,應當盡量使用TreeMap(紅黑樹實現)。 
 對於並發性相對較低的並行程序可以使用Collections.synchronizedSortedMap將TreeMap進行包裝,也可以提供較好的效率。

但是對於高並發程序,應當使用ConcurrentSkipListMap

 
無序情況下: 
並發程度低,數據量大時,ConcurrentHashMap 存取遠大於ConcurrentSkipListMap。 
數據量一定,並發程度高時,ConcurrentSkipListMap比ConcurrentHashMap效率更高。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM