並發讀寫緩存實現機制(二):高並發下數據寫入與過期


 
    在上一章中,我們講解了ConcurrentHashMap的讀取效率很高的原因,一般來說並發的讀取和寫入是一對矛盾體,而緩存的過期移除和持久化則是另一對矛盾體。這一節,我們着重來了解下高並發情況下緩存的寫入、過期控制及周邊相關功能。
 

1.高效的數據寫入(put)

    在研究寫入機制之前,我們先來回顧下上一節的內容。ConcurrentHashMap之所以讀取很快,很大一部分原因歸功於它的數據分割設計,就像是把書的內容划分為很多章,章下面又分了許多小節。同樣的原理,寫入過程也可以按這個規則把數據分為很多獨立的塊,也就是前一節提到的Segment。另一方面為了解決並發問題,加鎖是一個不錯的選擇。再回頭看看Segment類圖(清單1),Segment其實是繼承了ReentrantLock,自己本身就是一個鎖。
清單1:Segment類圖
 
    想要最大限度的支持並發,就是讀寫操作都不加鎖,JDK8 就實現了無鎖的並發HashMap,效率更高的同時復雜度也更大;而在鎖機制中,一個很好的方法就是讀操作不加鎖,寫操作加鎖,對於競爭資源來說就需要定義為volatile類型的。volatile類型能夠保證happens-before法則,所以volatile能夠近似保證正確性的情況下最大程度的降低加鎖帶來的影響,同時還與寫操作的鎖不產生沖突。

鎖分離” 技術

    在競爭激烈的情況下,如果寫入時對緩存中所有數據都加鎖,效率必然低下,HashTable的效率不高就是因為這個原因。為此ConcurrentHashMap默認把數據分為16個Segment,每個Segment就是一把鎖,也就是一個獨立的數據塊,那么多線程讀寫不同Segment的數據時就不會存在鎖競爭,從而可以有效的提高讀寫效率,這就是“鎖分離”技術。緩存中幾乎所有的操作都是基於獨立的segment數據塊,且在修改時必須對segment加鎖
 
    緩存的put()操作與get()操作類似,得到元素修改就行了,更多的參考源碼,這里有幾點要注意:
    a.如果對數據做了新增或移除,需要修改count的數值,這個要放到整個操作的最后,為什么?前面說過count是volatile類型,而讀取操作沒有加鎖,所以只能把元素真正寫回Segment中的時候才能修改count值,所以這個必須要放到整個操作的最后。
    b.因為HashEntry的next屬性是final的,所以后加入的元素都是加在鏈表的頭部
    c.為什么在put操作中首先建立一個臨時變量tab指向Segment的table,而不是直接使用table?這是因為table變量是volatile類型,多次讀取volatile類型的開銷要比非volatile開銷要大,而且編譯器也無法優化,多次讀寫tab的效率要比volatile類型的table要高,JVM也能夠對此進行優化。    

2.巧妙的數據移除(remove)

    上一節中,我們也提到,為了防止在遍歷HashEntry的時候被破壞,HashEntry中除了value之外其他屬性都是final常量,否則不可避免的會得到ConcurrentModificationException,這就意味着,不能把節點添加到鏈表的中間和尾部,也不能在鏈表的中間和尾部刪除節點,只能在頭部添加節點。這個特性可以保證:在訪問某個節點時,這個節點之后的鏈表不會被改變。這樣可以大大降低處理鏈表時的復雜性。既然不能改變鏈表,緩存到底是如何移除對象的呢?我們首先來看下面兩幅圖:
 
清單2. 執行刪除之前的原鏈表1:
清單3. 執行刪除之后的新鏈表2

    假設現在有一元素C需要刪除,根據上一節所講,C必然存在某一鏈表1中,假設這個鏈表結構為A->B->C->D->E,那現在該如何從鏈表1中刪除C元素?
    根據上一節的內容,我們先要經過2次hash定位,即我們先要定位到C所在的Segment,然后再定位到Segment中table的下標(C所在的鏈表)。然后遍歷鏈表找到C元素,找到之后就把C的next節點D作為臨時頭節點構成鏈表2,然后從現有頭節點A開始向后迭代加入到鏈表2的頭部,一直到需要刪除的C節點結束,即A、B依次都作為臨時頭節點加入鏈表2,最后的情形就是A加入到D前面,B又加入到A前面,這樣就構造出來一個新的鏈表B->A->D->E,然后將此鏈表的最新頭節點B設置到Segment的table中。這樣就完成了元素C的刪除操作。
    需要說明的是,盡管舊的鏈表仍然存在(A->B->C->D->E),但是由於沒有引用指向此鏈表,所以此鏈表中無引用的(A->B->C)最終會被GC回收掉。這樣做的一個好處是,如果某個讀操作在刪除時已經定位到了舊的鏈表上,那么此操作仍然將能讀到數據,只不過讀取到的是舊數據而已,這在多線程里面是沒有問題的。
    從上面的流程可以看出,緩存的數據移除不是通過更改節點的next屬性,而是通過重新構造一條新的鏈表來實現的,這樣即保證了鏈條的完整性,同時也保證了並發讀取的正確性。源碼如下:
 
 清單4:緩存數據的移除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
 
void removeEntry(HashEntry entry, int hash) {
    int c = count - 1;
    AtomicReferenceArray<HashEntry> tab = table;
    int index = hash & (tab.length() - 1);
    HashEntry first = tab.get(index);

    for (HashEntry e = first; e != null; e = e.next) {
        if (e == entry) {
            ++modCount;
            // 從鏈表1中刪除元素entry,且返回鏈表2的頭節點
            HashEntry newFirst = removeEntryFromChain(first, entry);

            // 將鏈表2的新的頭節點設置到segment的table中
            tab.set(index, newFirst);

            count = c; // write-volatile,segment內的元素個數-1
            return;

        }
    }
}

HashEntry removeEntryFromChain(HashEntry first, HashEntry entry) {
    HashEntry newFirst = entry.next;
    // 從鏈條1的頭節點first開始迭代到需要刪除的節點entry
    for (HashEntry e = first; e != entry; e = e.next) {

        // 拷貝e的屬性,並作為鏈條2的臨時頭節點
        newFirst = copyEntry(e, newFirst);

    }
    accessQueue.remove(entry);
    return newFirst;
}

HashEntry copyEntry(HashEntry original, HashEntry newNext) {
    // 屬性拷貝
    HashEntry newEntry = new HashEntry(original.getKey(), original.getHash(), newNext, original.value);

    copyAccessEntry(original, newEntry);
    return newEntry;
}

3.按需擴容機制(rehash)

    緩存中構造函數有3個參數與容量相關:initialCapacity代表緩存的初始容量、loadFactor代表負載因子、concurrencyLevel字面上的意思是並發等級,其實就是segment的數量(內部會轉變為2的n次方)。既然有初始容量,則自然有容量不足的情況,這種情況就需要對系統擴容,隨之而來的就是2個問題:何時擴容以及如何擴容?
    第一個問題:何時擴容,緩存就使用到了loadFactor負載因子,在插入元素前會先判斷Segment里的HashEntry數組是否超過閾值threshold = (int) (newTable.length() * loadFactor),如果超過閥值,則調用rehash方法進行擴容。
    那如何擴容呢?擴容的時候首先會創建一個兩倍於原容量的數組,然后將原數組里的元素進行再hash后插入到新的數組里。為了效率緩存不會對整個容器進行擴容,而只對某個segment進行擴容。這樣對於其他segment的讀寫都不影響。擴容的本質就是把數據的key按照新的容量重新hash放到新組建的數組中,但是相較於HashMap的擴容,ConcurrentHashMap有了些許改進。
    我們來看個小例子:假設原有數組長度為16,根據上一節知識,我們知道掩碼為1111,擴容后新的數組長度為16*2=32,掩碼為11111。由下圖可以看出擴容前掩碼15到擴容后掩碼31,也就是粉色那一列的掩碼值由0變為1,這樣子如果hash藍色那一列的值原來是0,則擴容后下標和擴容前一樣,如果原來是1,則擴容后下標=擴容前下標+16,由此我們可以得出結論:擴容前的長度為length的數組下標為n的元素,映射到擴容后數組的下標為n或n+length。
 
01111110 hash二進制 
00001111 掩碼15
-------------‘與’運算-----------
00001110 擴容前數組下標

01111110 hash二進制 
00011111 掩碼31
-------------‘與’運算-----------
00011110 擴容后數組下標,比擴容前下標大10000,轉換為十進制就是16
 
    假設我們有鏈表A[5] -> B[21] -> C[5] -> D[21] -> E[21] -> F[21],中括號內代表的是擴容后的數組下標。基於上面的原理,ConcurrentHashMap擴容是先把鏈表后面的一整段連續相同下標的元素鏈(D[21] -> E[21] -> F[21])找出來,直接復用這個鏈,然后復制這個鏈之前的元素(A[5] -> B[21] -> C[5]),這樣就避免了所有元素的復制。
    事實上,根據JDK的描述:Statistically, at the default threshold, only about one-sixth of them need cloning when a table double,翻譯過來就是:據統計,使用默認的閾值,擴容時僅有1/6的數據需要復制。
 
 清單5:緩存數據的擴容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
 
void rehash() {
    /**
     * .... 部分代碼省略
     */

    for (int oldIndex = 0; oldIndex < oldCapacity; ++oldIndex) {
        HashEntry head = oldTable.get(oldIndex);
        if (head != null) {
            HashEntry next = head.next;
            int headIndex = head.getHash() & newMask;

            // next為空代表這個鏈表就只有一個元素,直接把這個元素設置到新數組中
            if (next == null) {

                newTable.set(headIndex, head);
            } else {
                // 有多個元素時
                HashEntry tail = head;

                int tailIndex = headIndex;
                // 從head開始,一直到鏈條末尾,找到最后一個下標與head下標不一致的元素
                for (HashEntry e = next; e != null; e = e.next) {

                    int newIndex = e.getHash() & newMask;
                    if (newIndex != tailIndex) { // 這里的找到后沒有退出循環,繼續找下一個不一致的下標
                        tailIndex = newIndex;

                        tail = e;
                    }
                }
                // 找到的是最后一個不一致的,所以tail往后的都是一致的下標
                newTable.set(tailIndex, tail);


                // 在這之前的元素下標有可能一樣,也有可能不一樣,所以把前面的元素重新復制一遍放到新數組中
                for (HashEntry e = head; e != tail; e = e.next) {

                    int newIndex = e.getHash() & newMask;
                    HashEntry newNext = newTable.get(newIndex);
                    HashEntry newFirst = copyEntry(e, newNext);
                    if (newFirst != null) {
                        newTable.set(newIndex, newFirst);
                    } else {
                        accessQueue.remove(e);
                        newCount--;
                    }
                }
            }
        }
    }
    table = newTable;
    this.count = newCount;
}
    
4.過期機制(expire)

    既然叫做緩存,則必定存在緩存過期的概念。為了提高性能,讀寫數據時需要自動延長緩存過期時間。又因為我們這里所講的緩存有持久化操作,則要求數據寫入DB之前緩存不能過期。
    數據擁有生命周期,我們可設置緩存的accessTime解決;讀寫數據時自動延長周期,也就是讀寫的時候都需要修改緩存的accessTime;那如何保證數據寫入DB前不能清除緩存呢?
    一種方法就是定期遍歷緩存中所有的元素,檢測緩存中數據是否全部寫入到庫中,如果已寫入且達到過期時間,則可移除此緩存。很明顯這種方式最大的問題在於需要定期檢測所有數據,也就是短期內會有高CPU負載;另一方面,緩存的過期時間不精准,因為緩存的過期是基於定期檢測的,不到定期檢測時間,緩存就會存在於內存中;還有一方面就是如果過期檢測到數據未保存到DB,則需要再延長數據的生命周期。

時間軸

    在平時,我們可能會了解到一個名詞:timeline,翻譯過來就是“時間軸”。請看下面一個簡單的示例
 
清單6:時間軸TimeLine
----進入時間最短-----Enter-->--D-->--C-->--B-->--A-->--進入時間最久-----
 
    我們的緩存數據就存在於這個時間軸上,如上例所示:數據A產生或變化后我們可以把它放到時間軸的Enter點,隨着時間的推移,B、C、D也都會依次放在Enter點,最終就形成了上面的一個時間軸。當有時間軸上的數據發生變更,我們再把它從時間軸上移除,當做新數據重新加入Enter點。
    那我們如何檢測數據有沒有過期?因為在時間軸上的數據都是有序的,問題就很簡單了,緩存的產生和改變都是有先后順序的,我們只要找到第一個沒過期的元素,則比它進入時間短的數據都是沒過期的。整個流程進一步看來就是一個可以刪除指定元素的先進先出隊列,基於這個原理可實現緩存的過期。

刪除指定元素的先進先出隊列AccessQueue

    目前存在一種常見做法-雙向鏈表形式的環狀隊列,在這種隊列中的元素提供了獲取前一個和后一個元素的引用,新的元素插入到鏈表的尾端,過期元素從頭部過期,而雙向鏈表一個很重要的特點就是它可以很方便的從隊列中間移除元素隊列AccessQueue繼承了AbstractQueue,擁有了隊列的基本功能,隊列內的元素都是ReferenceEntry,它有3個子類:Head(隊列頭)、HashEntry(隊列內元素)、NullEntry(主要用於移除隊列元素)。其中Head元素是一直存在的,默認其previousAccess和nextAccess都指向head自身。
 
清單7:接口ReferenceEntry
清單8:隊列AccessQueue的結構
    請看隊列AccessQueue的結構圖,這里我們假設隊列是按照逆時針添加元素的,則元素0、1、2是依次添加到隊列中的。
    數據移除:假設需要移除節點1,需要先把節點1的上個節點0和下個節點2鏈接起來,然后把節點1的previousAccess和nextAccess都鏈接到NullEntry,以確保元素1在JVM中無法再被引用,方便被GC回收。
    數據新增:假設需要增加節點1到tail,有可能節點1已經存在於鏈表中,則需要先把節點1的上個節點0和下個節點2鏈接起來,然后再添加到尾部。
 
 清單9:可以移除元素的先進先出隊列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
 
static final class AccessQueue extends AbstractQueue<ReferenceEntry> {
    // head代碼省略
    final ReferenceEntry head = XXX;

    // 其他部分代碼省略

    @Override
    public boolean offer(ReferenceEntry entry) {
        // 將上一個節點與下一個節點鏈接,也就是把entry從鏈表中移除
        connectAccessOrder(entry.getPreviousInAccessQueue(), entry.getNextInAccessQueue());


        // 添加到鏈表tail
        connectAccessOrder(head.getPreviousInAccessQueue(), entry);

        connectAccessOrder(entry, head);

        return true;
    }

    @Override
    public ReferenceEntry peek() {
        // 從head開始獲取
        ReferenceEntry next = head.getNextInAccessQueue();

        return (next == head) ? null : next;
    }

    @Override
    public boolean remove(Object o) {
        ReferenceEntry e = (ReferenceEntry) o;
        ReferenceEntry previous = e.getPreviousInAccessQueue();
        ReferenceEntry next = e.getNextInAccessQueue();
        // 將上一個節點與下一個節點鏈接
        connectAccessOrder(previous, next);

        // 方便GC回收
        nullifyAccessOrder(e);


        return next != NullEntry.INSTANCE;
    }
}

// 將previous與next鏈接起來
static void connectAccessOrder(ReferenceEntry previous, ReferenceEntry next) {

    previous.setNextInAccessQueue(next);
    next.setPreviousInAccessQueue(previous);
}

// 將nulled的previousAccess和nextAccess都設為nullEntry,方便GC回收nulled
static void nullifyAccessOrder(ReferenceEntry nulled) {

    ReferenceEntry nullEntry = nullEntry();
    nulled.setNextInAccessQueue(nullEntry);
    nulled.setPreviousInAccessQueue(nullEntry);
}

何時進行過期移除?

    在擁有了定制版的先進先出隊列,緩存過期就相對比較簡單了,我們只要把新增和修改的數據放到隊列尾部,然后從隊列首部依次判斷數據是否過期就可以了。那什么時候去執行這個操作呢?google的緩存是放在每次寫入操作或者每64次讀操作執行一次清理操作。一方面,因為緩存是不停在使用的,這就決定了過期的緩存不可能累積太多;另一方面,緩存的過期僅僅是時間點的判斷,速度非常快。所以這樣操作性能並沒有帶來性能的降低,但是卻帶來了緩存過期的准確性。
 
 清單10:讀操作執行清理 
1
2
3
4
5
6
7
 
void postReadCleanup() {
    // 作為位操作的mask(DRAIN_THRESHOLD),必須是(2^n)-1,也就是1111的二進制格式
    if ((readCount.incrementAndGet() & DRAIN_THRESHOLD) == 0) {

        // 代表每2^n執行一次
        cleanUp();

    }
}
 
 清單11:緩存過期移除 
1
2
3
4
5
6
7
8
9
10
11
 
void expireEntries(long now) {
    drainRecencyQueue();

    ReferenceEntry e;
    // 從頭部獲取,過期且已保存db則移除
    while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {

        if (e.getValue().isAllPersist()) {
            removeEntry((HashEntry) e, e.getHash());
        }
    }
}

5.個數統計(size)

    前面提到,緩存中數據是分為多個segment的,如果我們要統計緩存的大小,就要統計所有segment的大小后求和,我們是不是直接把所有Segment的count相加就可以得到整個ConcurrentHashMap大小了呢?答案當然是否定的,雖然相加時可以獲取每個Segment的count的最新值,但是拿到之后可能累加前使用的count發生了變化,那么統計結果就不准了。那該如何解決這個問題?
    一個方法就是把所有segment都鎖定然后求和,很顯然這種方法效率非常低下,不可取。因此ConcurrentHashMap里提供了另一種解決方法,就是先嘗試2次通過不鎖住Segment的方式來統計各個Segment大小,如果統計的過程中,容器的count發生了變化,則再采用加鎖的方式來統計所有Segment的大小。
    那么ConcurrentHashMap是如何判斷在統計的時候segment是否發生了變化呢?答案是使用modCount變量。每個segment中都有一個modCount變量,代表的是對segment中元素的數量造成影響的操作的次數,這個值只增不減。有了這個它就可以判定segment是否變化了。
    所以,size操作本質上就是兩次循環嘗試,失敗了則鎖定獲取,這種類似無鎖的操作方式對性能是有很大的提升,因為大部分情況下兩次循環嘗試就可以得到結果了。源碼相對比較簡單,有興趣的朋友可以自己去了解下,這里就不貼出來了。

總結:

    結合前2節的內容,至此我們就擁有了一個強大的緩存,它可以並發且高效的數據讀寫、數據加載和數據移除,支持數據過期控制和持久化的平衡。在下一節中我們會在此基礎上簡化緩存的使用,以方便日常的調用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM