ConcurrentHashMap1.7和1.8對比


ConcurrentHashMap1.7和1.8對比

數據結構

1.7中采用Segment+HashEntry的方式實現

ConcurrentHashMap初始化時,計算出Segment數組的大小ssize和每個SegmentHashEntry數組的大小cap,並初始化Segment數組的第一個元素;

其中ssize大小為2的冪次方默認為16cap大小也是2的冪次方最小值為2,最終結果根據初始化容量initialCapacity進行計算,計算過程如下

if (c * ssize < initialCapacity)
    ++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
    cap <<= 1;

因為Segment繼承了ReentrantLock,所有segment是線程安全的

1.8中放棄了Segment分段鎖的設計,使用的是Node+CAS+Synchronized來保證線程安全性

只有在第一次執行put方法是才會初始化Node數組

 

put操作

1.7 put

當執行put方法插入數據的時候,根據key的hash值,在Segment數組中找到對應的位置

如果當前位置沒有值,則通過CAS進行賦值,接着執行Segmentput方法通過加鎖機制插入數據

假如有線程AB同時執行相同Segmentput方法

線程A 執行tryLock方法成功獲取鎖,然后把HashEntry對象插入到相應位置

線程B 嘗試獲取鎖失敗,則執行scanAndLockForPut()方法,通過重復執行tryLock()方法嘗試獲取鎖

多處理器環境重復64次,單處理器環境重復1次,當執行tryLock()方法的次數超過上限時,則執行lock()方法掛起線程B

當線程A執行完插入操作時,會通過unlock方法施放鎖,接着喚醒線程B繼續執行

1.8 put

當執行put方法插入數據的時候,根據key的hash值在Node數組中找到相應的位置

如果當前位置的Node還沒有初始化,則通過CAS插入數據

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    //如果當前位置的`Node`還沒有初始化,則通過CAS插入數據
    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
        break;                   // no lock when adding to empty bin
}

 

如果當前位置的Node已經有值,則對該節點加synchronized鎖,然后從該節點開始遍歷,直到插入新的節點或者更新新的節點

if (fh >= 0) {
    binCount = 1;
    for (Node<K,V> e = f;; ++binCount) {
        K ek;
        if (e.hash == hash &&
            ((ek = e.key) == key ||
             (ek != null && key.equals(ek)))) {
            oldVal = e.val;
            if (!onlyIfAbsent)
                e.val = value;
            break;
        }
        Node<K,V> pred = e;
        if ((e = e.next) == null) {
            pred.next = new Node<K,V>(hash, key, value, null);
            break;
        }
    }
}

 

如果當前節點是TreeBin類型,說明該節點下的鏈表已經進化成紅黑樹結構,則通過putTreeVal方法向紅黑樹中插入新的節點

else if (f instanceof TreeBin) {
   Node<K,V> p;
   binCount = 2;
   if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
       oldVal = p.val;
       if (!onlyIfAbsent)
           p.val = value;
  }
}

如果binCount不為0,說明put操作對數據產生了影響,如果當前鏈表的節點個數達到了8個,則通過treeifyBin方法將鏈表轉化為紅黑樹

 

size 操作

1.7 size實現

統計每個segment對象中的元素個數,然后進行累加

但是這種方式計算出來的結果不一定准確

因為在計算后面的segment的元素個數時

前面計算過了的segment可能有數據的新增或刪除

計算方式為:

先采用不加鎖的方式,連續計算兩次

如果兩次結果相等,說明計算結果准確

如果兩次結果不相等,說明計算過程中出現了並發新增或者刪除操作

於是給每個segment加鎖,然后再次計算

try {
    for (;;) {
        if (retries++ == RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                ensureSegment(j).lock(); // force creation
        }
        sum = 0L;
        size = 0;
        overflow = false;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                sum += seg.modCount;
                int c = seg.count;
                if (c < 0 || (size += c) < 0)
                    overflow = true;
            }
        }
        if (sum == last)
            break;
        last = sum;
    }
} finally {
    if (retries > RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
            segmentAt(segments, j).unlock();
    }
}

 

1.8 size實現

使用一個volatile類型的變量baseCount記錄元素的個數

當新增或者刪除節點的時候會調用,addCount()更新baseCount

if ((as = counterCells) != null ||
    !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
    CounterCell a; long v; int m;
    boolean uncontended = true;
    if (as == null || (m = as.length - 1) < 0 ||
        (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
        !(uncontended =
          U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
        fullAddCount(x, uncontended);
        return;
    }
    if (check <= 1)
        return;
    s = sumCount();
}

初始化時counterCells為空

在並發量很高時,如果存在兩個線程同時執行CAS修改baseCount

則失敗的線程會繼續執行方法體中的邏輯

使用CounterCell記錄元素個數的變化

 

如果CounterCell數組counterCells為空

調用fullAddCount()方法進行初始化

並插入對應的記錄數,通過CAS設置cellsBusy字段

只有設置成功的線程才能初始化CounterCell數組

else if (cellsBusy == 0 && counterCells == as &&
         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
    boolean init = false;
    try {                           // Initialize table
        if (counterCells == as) {
            CounterCell[] rs = new CounterCell[2];
            rs[h & 1] = new CounterCell(x);
            counterCells = rs;
            init = true;
        }
    } finally {
        cellsBusy = 0;
    }
    if (init)
        break;
}

如果通過CAS設置cellsBusy字段失敗的話

則繼續嘗試通過CAS修改baseCount字段

如果修改baseCount字段成功的話,就退出循環

else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
    break; 

否則繼續循環插入CounterCell對象;

所以在1.8中的size實現比1.7簡單多,因為元素個數保存baseCount中,部分元素的變化個數保存在CounterCell數組中,實現如下:

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}
​
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

通過累加baseCountCounterCell數組中的數量,即可得到元素的總個數;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM