前記
最近在看Redis,之間就嘗試用sortedSet用在實現排行榜的項目,那么sortedSet底層是什么結構呢? "Redis sorted set的內部使用HashMap和跳躍表(SkipList)來保證數據的存儲和有序,HashMap里放的是成員到score的映射,而跳躍表里存放的是所有的成員,排序依據是HashMap里存的score,使用跳躍表的結構可以獲得比較高的查找效率,並且在實現上比較簡單。” 那么什么是SkipList跳表呢?下面我們從理解它的思想到實現及應用去做一個大致的了解。
一.跳表的原理及思想
跳表的背景
Skip list是一個用於有序元素序列快速搜索的數據結構,由美國計算機科學家William Pugh發明於1989年。他在論文《Skip lists: a probabilistic alternative to balanced trees》中詳細介紹了跳表的數據結構和插入刪除等操作。論文是這么介紹跳表的:
Skip lists are a data structure that can be used in place of balanced trees.
Skip lists use probabilistic balancing rather than strictly enforced balancing and as a result the algorithms for insertion and deletion in skip lists are much simpler and significantly faster than equivalent algorithms for balanced trees.
也就是說,
Skip list是一個“概率型”的數據結構,可以在很多應用場景中替代平衡樹。Skip list算法與平衡樹相比,有相似的漸進期望時間邊界,但是它更簡單,更快,使用更少的空間。
Skip list是一個分層結構多級鏈表,最下層是原始的鏈表,每個層級都是下一個層級的“高速跑道”。
為什么選擇跳表
目前經常使用的平衡數據結構有:B樹,紅黑樹,AVL樹,Splay Tree, Treep等。
想象一下,給你一張草稿紙,一只筆,一個編輯器,你能立即實現一顆紅黑樹,或者AVL樹
出來嗎? 很難吧,這需要時間,要考慮很多細節,要參考一堆算法與數據結構之類的樹,
還要參考網上的代碼,相當麻煩。
用跳表吧,跳表是一種隨機化的數據結構,目前開源軟件 Redis 和 LevelDB 都有用到它,
它的效率和紅黑樹以及 AVL 樹不相上下,但跳表的原理相當簡單,只要你能熟練操作鏈表,就能輕松實現一個 SkipList。
有序表的搜索
考慮一個有序表:
從該有序表中搜索元素 < 23, 43, 59 > ,需要比較的次數分別為 < 2, 4, 6 >,總共比較的次數
為 2 + 4 + 6 = 12 次。有沒有優化的算法嗎? 鏈表是有序的,但不能使用二分查找。類似二叉
搜索樹,我們把一些節點提取出來,作為索引。得到如下結構:
這里我們把 < 14, 34, 50, 72 > 提取出來作為一級索引,這樣搜索的時候就可以減少比較次數了。
我們還可以再從一級索引提取一些元素出來,作為二級索引,變成如下結構:
這里元素不多,體現不出優勢,如果元素足夠多,這種索引結構就能體現出優勢來了。
這基本上就是跳表的核心思想,其實也是一種通過“空間來換取時間”的一個算法,通過在每個節點中增加了向前的指針,從而提升查找的效率。
跳表
下面的結構是就是跳表:
其中 -1 表示 INT_MIN, 鏈表的最小值,1 表示 INT_MAX,鏈表的最大值。
跳表具有如下性質:
(1) 由很多層結構組成
(2) 每一層都是一個有序的鏈表
(3) 最底層(Level 1)的鏈表包含所有元素
(4) 如果一個元素出現在 Level i 的鏈表中,則它在 Level i 之下的鏈表也都會出現。
(5) 每個節點包含兩個指針,一個指向同一鏈表中的下一個元素,一個指向下面一層的元素。
跳表的搜索
例子:查找元素 117
(1) 比較 21, 比 21 大,往后面找
(2) 比較 37, 比 37大,比鏈表最大值小,從 37 的下面一層開始找
(3) 比較 71, 比 71 大,比鏈表最大值小,從 71 的下面一層開始找
(4) 比較 85, 比 85 大,從后面找
(5) 比較 117, 等於 117, 找到了節點。
二. 自己動手用JAVA實現SkipList跳表
單純的用鏈表來實現一個SkipList。
基本Node結構
package com.shoshana.skiplist;
public class SkipListNode<T> {
public int key;
public T value;
public SkipListNode<T> pre, next, up, down; //上下左右四個節點,pre和up存在的意義在於 "升層"的時候需要查找相鄰節點
public static final int HEAD_KEY = Integer.MIN_VALUE; // 負無窮
public static final int TAIL_KEY = Integer.MAX_VALUE; // 正無窮
public SkipListNode(int k, T v) {
key = k;
value = v;
}
public int getKey() {
return key;
}
public void setKey(int key) {
this.key = key;
}
public T getValue() {
return value;
}
public void setValue(T value) {
this.value = value;
}
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null) {
return false;
}
if (!(o instanceof SkipListNode<?>)) {
return false;
}
SkipListNode<T> ent;
try {
ent = (SkipListNode<T>) o; //檢測類型
} catch (ClassCastException ex) {
return false;
}
return (ent.getKey() == key) && (ent.getValue() == value);
}
@Override
public String toString() {
return "key-value:" + key + "," + value;
}
}
跳表實現
package com.shoshana.skiplist;
import java.util.Random;
public class SkipList<T> {
private SkipListNode<T> head, tail;
private int size;
private int listLevel;
private Random random;
private static final double PROBABILITY = 0.5;
public SkipList() {
head = new SkipListNode<T>(SkipListNode.HEAD_KEY, null);
tail = new SkipListNode<>(SkipListNode.TAIL_KEY, null);
head.next = tail;
tail.pre = head;
size = 0;
listLevel = 0;
random = new Random();
}
public SkipListNode<T> get(int key) {
SkipListNode<T> p = findNode(key);
if (p.key == key) {
return p;
}
return null;
}
//首先查找到包含key值的節點,將節點從鏈表中移除,接着如果有更高level的節點,則repeat這個操作即可。
public T remove(int k) {
SkipListNode<T> p = get(k);
if (p == null) {
return null;
}
T oldV = p.value;
SkipListNode<T> q;
while (p != null) {
q = p.next;
q.pre = p.pre;
p.pre.next = q;
p = p.up;
}
return oldV;
}
/**
* put方法有一些需要注意的步驟:
* 1.如果put的key值在跳躍表中存在,則進行修改操作;
* 2.如果put的key值在跳躍表中不存在,則需要進行新增節點的操作,並且需要由random隨機數決定新加入的節點的高度(最大level);
* 3.當新添加的節點高度達到跳躍表的最大level,需要添加一個空白層(除了-oo和+oo沒有別的節點)
*
* @param k
* @param v
*/
public void put(int k, T v) {
System.out.println("添加key:" + k);
SkipListNode<T> p = findNode(k);//這里不用get是因為下面可能用到這個節點
System.out.println("找到P:" + p);
if (p.key == k) {
p.value = v;
return;
}
SkipListNode<T> q = new SkipListNode<>(k, v);
insertNode(p, q);
int currentLevel = 0;
while (random.nextDouble() > PROBABILITY) {
if (currentLevel >= listLevel) {
addEmptyLevel();
System.out.println("升層");
}
while (p.up == null) {
System.out.println(p);
p = p.pre;
System.out.println("找到第一個有上層結點的值" + p);
}
p = p.up;
//創建 q的鏡像變量(只存儲k,不存儲v,因為查找的時候會自動找最底層數據)
SkipListNode<T> z = new SkipListNode<>(k, null);
insertNode(p, z);
z.down = q;
q.up = z;
//別忘了把指針移到上一層。
q = z;
currentLevel++;
System.out.println("添加后" + this);
}
size++;
}
/**
* 如果傳入的key值在跳躍表中不存在,則findNode返回跳躍表中key值小於key,並且key值相差最小的底層節點;
* 所以不能用此方法來代替get
*
* @param key
* @return
*/
public SkipListNode<T> findNode(int key) {
SkipListNode<T> p = head;
while (true) {
System.out.println("p.next.key:" + p.next.key);
if (p.next != null && p.next.key <= key) {
p = p.next;
}
System.out.println("找到node:" + p);
if (p.down != null) {
System.out.println("node.down :" + p);
p = p.down;
} else if (p.next != null && p.next.key > key) {
break;
}
}
return p;
}
public boolean isEmpty() {
return size == 0;
}
public int size() {
return size;
}
public void addEmptyLevel() {
SkipListNode<T> p1 = new SkipListNode<T>(SkipListNode.HEAD_KEY, null);
SkipListNode<T> p2 = new SkipListNode<T>(SkipListNode.TAIL_KEY, null);
p1.next = p2;
p1.down = head;
p2.pre = p1;
p2.down = tail;
head.up = p1;
tail.up = p2;
head = p1;
tail = p2;
listLevel++;
}
private void insertNode(SkipListNode<T> p, SkipListNode<T> q) {
q.next = p.next;
q.pre = p;
p.next.pre = q;
p.next = q;
}
public int getLevel() {
return listLevel;
}
}
Demo及運行
package com.shoshana.skiplist;
public class SkipListDemo {
public static void main(String[] args) {
SkipList<String> list = new SkipList<String>();
list.put(10, "sho");
list.put(1, "sha");
list.put(9, "na");
list.put(2, "bing");
list.put(8, "ling");
list.put(7, "xiao");
list.put(100, "你好,skiplist");
list.put(5, "冰");
list.put(6, "靈");
System.out.println("列表元素:\n" + list);
System.out.println("刪除100:" + list.remove(100));
System.out.println("列表元素:\n" + list);
System.out.println("5對於的value:\n" + list.get(5).value);
System.out.println("鏈表大小:" + list.size() + ",深度:" + list.getLevel());
}
}
運行結果:
classpath "C:\Program com.shoshana.skiplist.SkipListDemo 添加key:10 p.next.key:2147483647 找到node:key-value:-2147483648,null 找到P:key-value:-2147483648,null 升層 添加后com.shoshana.skiplist.SkipList@74a14482 添加key:1 p.next.key:10 找到node:key-value:-2147483648,null node.down :key-value:-2147483648,null p.next.key:10 找到node:key-value:-2147483648,null 找到P:key-value:-2147483648,null 添加key:9 p.next.key:10 找到node:key-value:-2147483648,null node.down :key-value:-2147483648,null p.next.key:1 找到node:key-value:1,sha 找到P:key-value:1,sha 添加key:2 p.next.key:10 找到node:key-value:-2147483648,null node.down :key-value:-2147483648,null p.next.key:1 找到node:key-value:1,sha 找到P:key-value:1,sha key-value:1,sha 找到第一個有上層結點的值key-value:-2147483648,null 添加后com.shoshana.skiplist.SkipList@74a14482 添加key:8 p.next.key:2 找到node:key-value:2,null node.down :key-value:2,null p.next.key:9 找到node:key-value:2,bing 找到P:key-value:2,bing 添加key:7 p.next.key:2 找到node:key-value:2,null node.down :key-value:2,null p.next.key:8 找到node:key-value:2,bing 找到P:key-value:2,bing 添加后com.shoshana.skiplist.SkipList@74a14482 升層 key-value:2,null 找到第一個有上層結點的值key-value:-2147483648,null 添加后com.shoshana.skiplist.SkipList@74a14482 升層 添加后com.shoshana.skiplist.SkipList@74a14482 添加key:100 p.next.key:7 找到node:key-value:7,null node.down :key-value:7,null p.next.key:2147483647 找到node:key-value:7,null node.down :key-value:7,null p.next.key:10 找到node:key-value:10,null node.down :key-value:10,null p.next.key:2147483647 找到node:key-value:10,sho 找到P:key-value:10,sho 添加后com.shoshana.skiplist.SkipList@74a14482 key-value:10,null 找到第一個有上層結點的值key-value:7,null 添加后com.shoshana.skiplist.SkipList@74a14482 添加key:5 p.next.key:7 找到node:key-value:-2147483648,null node.down :key-value:-2147483648,null p.next.key:7 找到node:key-value:-2147483648,null node.down :key-value:-2147483648,null p.next.key:2 找到node:key-value:2,null node.down :key-value:2,null p.next.key:7 找到node:key-value:2,bing 找到P:key-value:2,bing 添加key:6 p.next.key:7 找到node:key-value:-2147483648,null node.down :key-value:-2147483648,null p.next.key:7 找到node:key-value:-2147483648,null node.down :key-value:-2147483648,null p.next.key:2 找到node:key-value:2,null node.down :key-value:2,null p.next.key:5 找到node:key-value:5,冰 找到P:key-value:5,冰 key-value:5,冰 找到第一個有上層結點的值key-value:2,bing 添加后com.shoshana.skiplist.SkipList@74a14482 key-value:2,null 找到第一個有上層結點的值key-value:-2147483648,null 添加后com.shoshana.skiplist.SkipList@74a14482 添加后com.shoshana.skiplist.SkipList@74a14482 列表元素: com.shoshana.skiplist.SkipList@74a14482 p.next.key:6 找到node:key-value:6,null node.down :key-value:6,null p.next.key:7 找到node:key-value:7,null node.down :key-value:7,null p.next.key:10 找到node:key-value:10,null node.down :key-value:10,null p.next.key:100 找到node:key-value:100,你好,skiplist 刪除100:你好,skiplist 列表元素: com.shoshana.skiplist.SkipList@74a14482 p.next.key:6 找到node:key-value:-2147483648,null node.down :key-value:-2147483648,null p.next.key:6 找到node:key-value:-2147483648,null node.down :key-value:-2147483648,null p.next.key:2 找到node:key-value:2,null node.down :key-value:2,null p.next.key:5 找到node:key-value:5,冰 5對於的value: 冰 鏈表大小:9,深度:3 Process finished with exit code 0
三. 分析JDK實現的跳表ConcurrentSkipListMap
在JDK內部,也使用了該數據結構,比如ConcurrentSkipListMap,ConcurrentSkipListSet等。下面我們主要介紹ConcurrentSkipListMap。說到ConcurrentSkipListMap,我們就應該比較HashMap,ConcurrentHashMap,ConcurrentSkipListMap這三個類來講解。它們都是以鍵值對的方式來存儲數據的。HashMap是線程不安全的,而ConcurrentHashMap和ConcurrentSkipListMap是線程安全的,它們內部都使用無鎖CAS算法實現了同步。ConcurrentHashMap中的元素是無序的,ConcurrentSkipListMap中的元素是有序的。它們三者的具體區別可以參考具體的資料,下面主要講解ConcurrentSkipListMap的實現原理。
ConcurrentSkipListMap提供了一種線程安全的並發訪問的排序映射表。內部是SkipList(跳表)結構實現,在理論上能夠在O(log(n))時間內完成查找、插入、刪除操作。注意,調用ConcurrentSkipListMap的size時,由於多個線程可以同時對映射表進行操作,所以映射表需要遍歷整個鏈表才能返回元素個數,這個操作是個O(log(n))的操作。
doPut()
private V doPut(K kkey, V value, boolean onlyIfAbsent) {
Comparable<? super K> key = comparable(kkey);
for (;;) {
// 找到key的前繼節點
Node<K,V> b = findPredecessor(key);
// 設置n為“key的前繼節點的后繼節點”,即n應該是“插入節點”的“后繼節點”
Node<K,V> n = b.next;
for (;;) {
if (n != null) {
Node<K,V> f = n.next;
// 如果兩次獲得的b.next不是相同的Node,就跳轉到”外層for循環“,重新獲得b和n后再遍歷。
if (n != b.next)
break;
// v是“n的值”
Object v = n.value;
// 當n的值為null(意味着其它線程刪除了n);此時刪除b的下一個節點,然后跳轉到”外層for循環“,重新獲得b和n后再遍歷。
if (v == null) { // n is deleted
n.helpDelete(b, f);
break;
}
// 如果其它線程刪除了b;則跳轉到”外層for循環“,重新獲得b和n后再遍歷。
if (v == n || b.value == null) // b is deleted
break;
// 比較key和n.key
int c = key.compareTo(n.key);
if (c > 0) {
b = n;
n = f;
continue;
}
if (c == 0) {
if (onlyIfAbsent || n.casValue(v, value))
return (V)v;
else
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
// 新建節點(對應是“要插入的鍵值對”)
Node<K,V> z = new Node<K,V>(kkey, value, n);
// 設置“b的后繼節點”為z
if (!b.casNext(n, z))
break; // 多線程情況下,break才可能發生(其它線程對b進行了操作)
// 隨機獲取一個level
// 然后在“第1層”到“第level層”的鏈表中都插入新建節點
int level = randomLevel();
if (level > 0)
insertIndex(z, level);
return null;
}
}
}
doRemove
final V doRemove(Object okey, Object value) {
Comparable<? super K> key = comparable(okey);
for (;;) {
// 找到“key的前繼節點”
Node<K,V> b = findPredecessor(key);
// 設置n為“b的后繼節點”(即若key存在於“跳表中”,n就是key對應的節點)
Node<K,V> n = b.next;
for (;;) {
if (n == null)
return null;
// f是“當前節點n的后繼節點”
Node<K,V> f = n.next;
// 如果兩次讀取到的“b的后繼節點”不同(其它線程操作了該跳表),則返回到“外層for循環”重新遍歷。
if (n != b.next) // inconsistent read
break;
// 如果“當前節點n的值”變為null(其它線程操作了該跳表),則返回到“外層for循環”重新遍歷。
Object v = n.value;
if (v == null) { // n is deleted
n.helpDelete(b, f);
break;
}
// 如果“前繼節點b”被刪除(其它線程操作了該跳表),則返回到“外層for循環”重新遍歷。
if (v == n || b.value == null) // b is deleted
break;
int c = key.compareTo(n.key);
if (c < 0)
return null;
if (c > 0) {
b = n;
n = f;
continue;
}
// 以下是c=0的情況
if (value != null && !value.equals(v))
return null;
// 設置“當前節點n”的值為null
if (!n.casValue(v, null))
break;
// 設置“b的后繼節點”為f
if (!n.appendMarker(f) || !b.casNext(n, f))
findNode(key); // Retry via findNode
else {
// 清除“跳表”中每一層的key節點
findPredecessor(key); // Clean index
// 如果“表頭的右索引為空”,則將“跳表的層次”-1。
if (head.right == null)
tryReduceLevel();
}
return (V)v;
}
}
}
findNode
private Node<K,V> findNode(Comparable<? super K> key) {
for (;;) {
// 找到key的前繼節點
Node<K,V> b = findPredecessor(key);
// 設置n為“b的后繼節點”(即若key存在於“跳表中”,n就是key對應的節點)
Node<K,V> n = b.next;
for (;;) {
// 如果“n為null”,則跳轉中不存在key對應的節點,直接返回null。
if (n == null)
return null;
Node<K,V> f = n.next;
// 如果兩次讀取到的“b的后繼節點”不同(其它線程操作了該跳表),則返回到“外層for循環”重新遍歷。
if (n != b.next) // inconsistent read
break;
Object v = n.value;
// 如果“當前節點n的值”變為null(其它線程操作了該跳表),則返回到“外層for循環”重新遍歷。
if (v == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (v == n || b.value == null) // b is deleted
break;
// 若n是當前節點,則返回n。
int c = key.compareTo(n.key);
if (c == 0)
return n;
// 若“節點n的key”小於“key”,則說明跳表中不存在key對應的節點,返回null
if (c < 0)
return null;
// 若“節點n的key”大於“key”,則更新b和n,繼續查找。
b = n;
n = f;
}
}
}
四. 跳表的應用場景
Java API中提供了支持並發操作的跳躍表ConcurrentSkipListSet和ConcurrentSkipListMap。
有序的情況下:
在非多線程的情況下,應當盡量使用TreeMap(紅黑樹實現)。
對於並發性相對較低的並行程序可以使用Collections.synchronizedSortedMap將TreeMap進行包裝,也可以提供較好的效率。
但是對於高並發程序,應當使用ConcurrentSkipListMap。
無序情況下:
並發程度低,數據量大時,ConcurrentHashMap 存取遠大於ConcurrentSkipListMap。
數據量一定,並發程度高時,ConcurrentSkipListMap比ConcurrentHashMap效率更高。





