【Redis】跳躍表原理分析與基本代碼實現(java)


最近開始看Redis設計原理,碰到一個從未遇見的數據結構:跳躍表(skiplist)。於是花時間學習了跳表的原理,並用java對其實現。

介紹

跳躍表是一種有序數據結構,它通過每個結點中維持多個指向其它結點的指針,從而達到快速訪問結點的目的。

我們平時熟知的鏈表,查找效率為O(N)。跳表在鏈表的基礎上,每個結點中維護了很多指向其它結點的指針,大大縮短時間復雜度。可以實現時間復雜度平均O(logN),最壞O(N)。后文會有具體的分析和計算。

一個跳躍表示意圖:
image
由左至右依次是,跳躍表結構結點(存儲跳表信息)、頭結點、連續的跳表結點。

最外層的跳表字段結構如下所示:

public class SkipList<T extends Comparable<? super T>> {

    //首尾結點的指針
    private SkipListNode<T> header;
    private SkipListNode<T> tail;

    //記錄跳表中結點數量
    private long length;

    //最大結點的層數
    private int level;
    
    //...
}

跳表節點

跳表節點記為SkipListNode,內部字段結構如下:

class SkipListNode <T> {

    //索引層
    private SkipListLevel[] level;

    //后退指針
    private SkipListNode<T> backword;

    //分值
    private double score;

    //成員對象
    private T obj;
    
    //......
}
  • 索引層數組:多個索引層組成的數組,每個元素包含一個指向其它節點的指針。通過這些指針的訪問來加快查找速度。
  • 后退指針:指向前一個節點;
  • 分值:是一個浮點數,跳表中所有節點都按照分值從小到大來排序;
  • 成員對象:即指向具體的數據對象。

索引層

索引層SkipListLevel的結構如下:

class SkipListLevel{

    //前進指針
    private SkipListNode forward;

    //跨度
    private int span;
    
    //......
}
  • 前進指針:指向后續節點;
  • 跨度:與指向的節點之間的距離。譬如,相鄰節點距離就是1。

到這里,我們對跳表的基本結構有了一個清晰的認識。

理想的跳表

這里想先講講理想狀態的跳表,不然無法理解實際跳表為什么可以縮減時間復雜度。

跳表節點間的關聯方式:(索引層中的前向指針)第一層逐個鏈接,第二層每隔t個節點進行鏈接,第三層每隔2*t個節點進行鏈接,不斷迭代。這里取t=2,畫出每個節點的索引層之間的關聯關系,得到如下圖形式的鏈式結構:

image

有點像完全二叉樹的結構。因此很容易理解:節點總數為N時,層最大高度為1+logN。例如圖中有8個節點,最大層高為4。

搜索規則:從頭結點的索引層的末端開始向下遍歷。如果第K層的下一節點小於target,則移到該節點;若不小於,則下移到第K-1層。

按照此搜索規則,假設需要查找的target為7a,則搜索路徑為0d--8d--0c--4c--4b--6b--6a--7a,如下圖所示:

image

上述過程中,分別在8d、4c、6b、7a處進行比較。可見每一層都比較了一次,所以比較次數等於層數,為logN+1。所以時間復雜度為O(logN)。

如果實際的跳表按照這種形式進行設計,每次插入節點時,需要對很多結點的索引層進行調整,節點的插入刪除將成為極其復雜的工作。因此,實際的跳表使用一種基於概率統計的算法,簡化插入刪除帶來的調整工作,同時也能得到O(logN)的時間復雜度。

實際的跳表

每當需要新增一個節點時,需要考慮如何確定該節點的索引層層數,即SkipListLevel[]數組的長度。

如何確定“層”的高度?

在redis中,每次創建一個節點,都會根據冪次定律隨機生成一個介於1和32之間的值作為索引層的高度。問題是,這個隨機的過程如何設計?

我們觀察理想狀態跳表,可以發現,不算頭節點總共8個節點,其中4個節點擁有2層索引,2個節點擁有3層索引,1個節點擁有4層索引。

可以近似看作滿足這樣的規律:節點索引層高度為 j 的概率為 1/2^j。因此每次生成新節點時,通過這樣的概率計算可以得到索引層層數。代碼如下所示:

/**
 * 獲取隨機的層高度
 * @return
 */
private int getRandomHeight() {
    Random random = new Random();
    int i = 1;
    for (; i < 32; ++i) {
        if (random.nextInt(2) == 0) {
            break;
        }
    }
    return i;
}

注意:在redis中最大索引高度不超過32

為什么時間復雜度平均O(logN),最壞O(N)?

當節點數量足夠多時,這種方式得到的跳躍表形態可以逼近理想的跳表的。很慚愧我不知道怎么證明,學過概率統計的同學一定很容易理解。它的時間復雜度就是近似為 O(logN) 。當然也有不理想的情況,當跳表中每一個節點隨機得到的層高度都是 1 時,跳表就是一個普通雙向鏈表,時間復雜度為 O(N) 。因此,時間復雜度平均O(logN)、最壞O(N),這種說法是比較嚴謹的。

節點的分值

這個分值 score 很容易與節點的“跨度”混淆。跨度其實就是節點在跳表中的排位,或者說序號。而分值是一個節點屬性。節點按照分值大小由小到大排列,不同節點的分值可以相等。如果分值相等,對象較大的會排在后面(靠近表尾方向)。

在實際API應用中,需要以分值和obj成員對象作為target進行查詢、插入等操作。

功能實現

跳躍表的初始化-代碼實現

class SkipList:

//構造方法初始化SkipList
public SkipList() {
    SkipListNode<T> node = new SkipListNode<>(null);
    this.header = node;
    this.tail = node;
    this.length = 0;
    this.maxLevelHeight = 0;
}

class SkipListNode:

//初始化頭結點
SkipListNode(T obj){
    this.obj = obj;
    this.level = new SkipListLevel[32];
    initLevel(this.level,32);
    this.score = 0;
}
//根據"層高"和"分值",新建一個節點
SkipListNode(T obj, int levelHeight,double score){
    this.obj = obj;
    this.level = new SkipListLevel[levelHeight];
    initLevel(this.level,levelHeight);
    this.score = score;
}
private void initLevel(SkipListLevel[] level, int height){
    for(int i=0;i<height;++i){
        level[i] = new SkipListLevel();
    }
}

跳躍表的插入-代碼實現

流程如下:

  • 按照冪次定律獲取隨機數,作為索引層的高度levelHeight,實例化新節點target;
  • 設置一個SkipListNode類型的數組,update[](記錄所有需要進行調整的前置位節點,包括需要調整forword、或者只需要修改span值的節點),update[]的大小為max(levelHeight,maxLevelHeight);
  • 設置int數組rank[],記錄update[]數組中各個對應節點的排位
  • 遍歷 update[] 進行插入和更新操作;根據update[]獲取插入位置節點,進行插入;根據rank[]來輔助更新跨度值span。

實際代碼比上述流程要復雜很多,levelHeight與maxLevelHeight的大小關系不能確定,根據不同的情況要對update[]進行不同的處理。

跳躍表插入的代碼如下所示:

注意:是依據score大小和obj的大小來決定插入順序

public SkipListNode slInsert(double score, T obj) {
    int levelHeight = getRandomHeight();
    SkipListNode<T> target = new SkipListNode<>(obj, levelHeight, score);
    // update[i] 記錄所有需要進行調整的前置位節點
    SkipListNode[] update = new SkipListNode[Math.max(levelHeight, maxLevel)];
    int[] rank = new int[update.length];//記錄每一個update節點的排位
    int i = update.length - 1;
    if (levelHeight > maxLevel) {
        for (; i >= maxLevel; --i) {
            update[i] = header;
            rank[i] = 0;
        }
        maxLevel = levelHeight;
    }
    for (; i >= 0; --i) {

        SkipListNode<T> node = header;
        SkipListNode<T> next = node.getLevel()[i].getForward();
        rank[i] = 0;
        //遍歷得到與target最接近的節點(左側)
        while (next != null && (score > next.getScore() || score == next.getScore() && next.getObj().compareTo(obj) < 0)) {
            rank[i] += node.getLevel()[i].getSpan();
            node = next;
            next = node.getLevel()[i].getForward();

        }
        update[i] = node;
    }

    //當maxLevel>levelHeight,前面部分節點的span值加1,因為該節點與forword指向節點之間將要 多出來一個新節點
    for (i = update.length - 1; i >= levelHeight; --i) {
        int span = update[i].getLevel()[i].getSpan();
        update[i].getLevel()[i].setSpan(++span);
    }
    //遍歷 update[] 進行插入和更新操作
    for (; i >= 0; --i) {

        SkipListLevel pre = update[i].getLevel()[i];
        //將target節點插入update[i]和temp之間
        SkipListNode<T> temp = pre.getForward();
        int span = pre.getSpan();

        pre.setForward(target);
        pre.setSpan(rank[0] + 1 - rank[i]);

        target.getLevel()[i].setSpan(span > 0 ? (span - rank[0] + rank[i]) : 0);
        target.getLevel()[i].setForward(temp);
        //設置后退指針
        if (temp == null) {
            target.setBackword(header);
        } else {
            target.setBackword(temp.getBackword());
            temp.setBackword(target);
        }

    }

    if (tail.getLevel()[0].getForward() != null) {
        tail = target;
    }
    length++;
    return target;

}

跳躍表的節點刪除-代碼實現

根據分值和成員對象來刪除跳表中對應節點

/**
 * 刪除節點
 * @param obj
 * @return 刪除的節點(若節點不存在則返回null)
 */
public SkipListNode zslDelete(double score, T obj) {
    SkipListNode[] update = new SkipListNode[maxLevelHeight];
    SkipListNode<T> node = header;
    for (int i = maxLevelHeight - 1; i >= 0; --i) {
        SkipListNode<T> next = node.getLevel()[i].getForward();
        //遍歷得到與target最接近的節點
        while (next != null && (score > next.getScore() || score == next.getScore() && next.getObj().compareTo(obj) < 0)) {
            node = next;
            next = node.getLevel()[i].getForward();
        }
        update[i] = node;
    }
    //待刪除的目標節點
    SkipListNode<T> target = update[0].getLevel()[0].getForward();
    if(target==null) return null;

    for (int i = maxLevelHeight - 1; i >= 0; --i) {
        SkipListLevel current = update[i].getLevel()[i];
        SkipListNode<T> next = current.getForward();
        if (next == null) continue;
        if (next != target) {
            current.modifySpan(-1);
            continue;
        }
        current.setForward(target.getLevel()[i].getForward());
        if(current.getForward()!=null)
            current.modifySpan(target.getLevel()[i].getSpan() - 1);
        else
            current.setSpan(0);
    }
    length--;
    while(header.getLevel()[maxLevelHeight-1].getSpan()==0){
        maxLevelHeight--;
    }
    return target;
}

跳躍表的節點查詢-代碼實現

  1. 根據分值范圍 fromScore~toScore,返回第一個符合范圍的節點
  • 參數 node 是開始查詢的位置,調用時傳入header , 遞歸過程會發生變化;
  • k 是當前層數,從最高層開始遞歸遍歷;
public SkipListNode<T> zslFirstInRange(double fromScore, double toScore, SkipListNode<T> node, int k) {
    if (!zslIsInRange(fromScore, toScore)) {
        return null;
    }

    SkipListNode<T> next = node.getLevel()[k].getForward();

    if (next == null || next.getScore() >= fromScore) {
        if (k == 0) return next != null && next.getScore() > toScore ? null : next;
        return zslFirstInRange(fromScore, toScore, node, k - 1);
    }
    return zslFirstInRange(fromScore, toScore, next, k);
}
  1. 根據分值范圍,返回最后一個符合范圍的節點
public SkipListNode<T> zslLastInRange(double fromScore, double toScore, SkipListNode<T> node, int k) {
    if (!zslIsInRange(fromScore, toScore)) {
        return null;
    }

    SkipListNode<T> next = node.getLevel()[k].getForward();

    if (next == null || next.getScore() > toScore) {
        if (k == 0) return next != null && next.getScore() < fromScore ? null : node;
        return zslLastInRange(fromScore, toScore, node, k - 1);
    }
    return zslLastInRange(fromScore, toScore, next, k);
}

本篇博客介紹了跳躍表基本原理,並使用java完成了基本數據結構的封裝,實現了節點“插入”、“刪除”、“搜索”等核心功能的代碼實現。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM