在1.7和1.8版本中,計算size()方法有寫不同。先介紹1.7版本的實現。
1.7版本
在1.7版本中,有一個重要的類Segment
,利用它來實現分段鎖
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
// 最大嘗試獲取鎖次數,tryLock可能會阻塞,准備鎖住segment操作獲取鎖。
在多處理器中,用一個有界的嘗試次數,保證在定位node的時候,可以從緩存直接獲取。
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
//segment內部的Hash table,訪問HashEntry,通過具有volatile的entryAt/setEntryAt方法
transient volatile HashEntry<K,V>[] table;
//segment的table中HashEntry的數量,只有在lock或其他保證可見性的volatile reads中,才可以訪問count
transient int count;
//在segment上所有的修改操作數。盡管可能會溢出,但它為isEmpty和size方法,
提供了有效准確穩定的檢查或校驗。只有在lock或其他保證可見性的volatile reads
中,才可以訪問
transient int modCount;
transient int threshold;
final float loadFactor;
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
}
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
}
剛一開始不加鎖,前后計算兩次所有segment里面的數量大小和,兩次結果相等,表明沒有新的元素加入,計算的結果是正確的。如果不相等,就對每個segment加鎖,再進行計算,返回結果並釋放鎖。
public int size() {
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
1.8版本
先利用sumCount()
計算,然后如果值超過int的最大值,就返回int的最大值。但是有時size就會超過最大值,這時最好用mappingCount
方法
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
sumCount有兩個重要的屬性baseCount
和counterCells
,如果counterCells
不為空,那么總共的大小就是baseCount與遍歷counterCells
的value值累加獲得的。
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
baseCount是從哪里來的?
//當沒有線程爭用時,使用這個變量計數
private transient volatile long baseCount;
一個volatile變量,在addCount方法會使用它,而addCount方法在put結束后會調用
addCount(1L, binCount);
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x))
從上可知,在put操作結束后,會調用addCount,更新計數。
在並發情況下,如果CAS修改baseCount失敗后,就會使用到CounterCell類,會創建一個對象,通常對象的volatile的value屬性是1。
// 一種用於分配計數的填充單元。改編自LongAdder和Striped64。請查看他們的內部文檔進行解釋。
@sun.misc.Contended
static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
並發時,利用CAS修改baseCount失敗后,會利用CAS操作修改CountCell的值,
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
如果上面CAS操作也失敗了,在fullAddCount方法中,會繼續死循環操作,知道成功。
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}