參考《分布式java應用》一書,簡單過一遍並發包(java.util.concurrent)
ConcurrentHashMap
ConcurrentHashMap是線程安全的HashMap的實現。
1)添加
put(Object key , Object value)
ConcurrentHashMap並沒有采用synchronized進行控制,而是使用了ReentrantLock。
public V put(K key, V value) { if (value == null) throw new NullPointerException(); int hash = hash(key.hashCode()); return segmentFor(hash).put(key, hash, value, false); }
這里計算出key的hash值,根據hash值獲取對應的數組中的segment對象。接下來的工作都交由segment完成。
segment可以看成是HashMap的一個部分,(ConcurrentHashMap基於concurrencyLevel划分出了多個segment來對key-value進行存儲)每次操作都只對當前segment進行鎖定,從而避免每次put操作鎖住整個map。
V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); try { int c = count; if (c++ > threshold) // ensure capacity rehash(); HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { oldValue = e.value; if (!onlyIfAbsent) e.value = value; } else { oldValue = null; ++modCount; tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // write-volatile } return oldValue; } finally { unlock(); } }
這個方法進來就上鎖(lock),並在finally中確保釋放鎖(unlock)。
添加key-value的過程中,先判斷當前存儲對象個數加1后是否大於threshold,如果大於則進行擴容(對象數組擴大兩倍,進行重新hash,轉移到新數組)。
如果不大於,則進行后續操作。通過對hash值和對象數組大小減1的值進行按位與操作(取余),得到當前key需要放入數組的位置,接着尋找對應位置上的hashEntry對象鏈表,並進行遍歷。
如果找到相同key值的Entry,則替換該Entry對象的value。
如果沒有找到就創建一個Entry對象,賦值給對應位置的數組對象,並構成鏈表。
注意:采用segment這種方式,在並發操作過程中,可以在很多程度上減少阻塞現象。
2)刪除
remove(Object key)
public V remove(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).remove(key, hash, null); }
和put類似,刪除也要根據hash先獲得segment,然后在segment上執行remove操作。
V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay // in list, but all preceding ones need to be // cloned. ++modCount; HashEntry<K,V> newFirst = e.next; for (HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } }
segment的remove操作,首先加鎖,然后對hash值與數組大小減1的值按位與操作,得到數組對應位置上的HashEntry對象,接下來遍歷此鏈表,查找hash值相等並且key相等(equals)的對象。
如果沒有找到,返回null,釋放鎖。
如果找到了,則重新創建位於刪除元素之前的所有HashEntry,位於其后的不用處理。釋放鎖!
3)獲取
get(Object key)
直接看看segment中的get操作,如下:
V get(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; }
可以看出並沒有加鎖操作,只有v==null時,進入readValueUnderLock才有加鎖操作。
這里假設一種情況,例如兩條線程a、b,a執行get操作,b執行put操作。
當a執行到getFirst,與當前數組長度減1按位與操作后得到指定位置index,此時cpu將執行權交給b,b線程put一對key-value,導致擴容並重新hash排列,然后cpu又將執行權還給a,a然后根據之前的index去獲取HashEntry就會發生問題。
當然這種情況發生的概率很小。
4)遍歷
其實這個過程和讀取過程類似,讀取所有分段中的數據即可。
ConcurrentHashMap默認情況下采用將數據分為16個段進行存儲,並且每個段各自擁有自己的鎖,鎖僅用於put和remove等改變集合對象的操作,基於voliate及hashEntry鏈表的不變性實現讀取的不加鎖。
這些方式使得ConcurrentHashMap能夠保持極好的並發操作,尤其是對於讀遠比插入和刪除頻繁的map而言,而它采用的這些方法也可謂是對於java內存模型、並發機制深刻掌握的體現,是一個設計得非常不錯的支持高並發的集合對象。
——摘自《分布式java應用》
CopyOnWriteArrayList
CopyOnWriteArrayList是一個線程安全、並且在讀操作時無鎖的ArrayList。
1)添加
add(E e)
public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1);//復制數組 newElements[len] = e;//添加到末尾 setArray(newElements); return true; } finally { lock.unlock(); } }
這里同樣沒有使用synchronized關鍵字,而是使用ReentrantLock。
和ArrayList不同的是,這里每次都會創建一個新的object數組,大小比之前數組大1。將之前的數組復制到新數組,並將新加入的元素加到數組末尾。
2)刪除
remove(Object o)
public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; if (len != 0) { // Copy while searching for element to remove // This wins in the normal case of element being present int newlen = len - 1; Object[] newElements = new Object[newlen];//新建數組 for (int i = 0; i < newlen; ++i) { if (eq(o, elements[i])) { // found one; copy remaining and exit for (int k = i + 1; k < len; ++k) newElements[k-1] = elements[k]; setArray(newElements); return true; } else newElements[i] = elements[i]; } // special handling for last cell if (eq(o, elements[newlen])) { setArray(newElements); return true; } } return false; } finally { lock.unlock(); } }
此方法為什么這么直接進行數組的復制呢?為何不適用system的arrayCopy來完成?
3)獲取
get(int index)
public E get(int index) { return (E)(getArray()[index]); }
這里有可能臟讀。但是銷量非常高。
//通過看集合包和並發包可以看出一些不同的編程思路。這里為什么就不事先做范圍的檢查?
從上可見,CopyOnWriteArrayList基於ReentrantLock保證了增加元素和刪除元素動作的互斥。在讀操作上沒有任何鎖,這樣就保證了讀的性能,帶來的副作用是有時候可能會讀取到臟數據。
CopyOnWriteArraySet
CopyOnWriteArraySet是基於CopyOnWriteArrayList的,可以知道set是不容許重復數據的,因此add操作和CopyOnWriteArrayList有所區別,他是調用CopyOnWriteArrayList的addIfAbsent方法。
public boolean addIfAbsent(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // Copy while checking if already present. // This wins in the most common case where it is not present Object[] elements = getArray(); int len = elements.length; Object[] newElements = new Object[len + 1]; for (int i = 0; i < len; ++i) { if (eq(e, elements[i])) //如果存在,直接返回! return false; // exit, throwing away copy else newElements[i] = elements[i]; } newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } }
由此可見,addIfAbsent需要每次都遍歷,在add方面,CopyOnWriteArraySet效率要比CopyOnWriteArrayList低一點。
ArrayBlockingQueue
ArrayBlockingQueue是一個基於數組、先進先出、線程安全的集合類,其特點是實現指定時間的阻塞讀寫,並且容量是可以限制的。
1)創建
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = (E[]) new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
初始化鎖和兩個鎖上的Condition,一個為notEmpty,一個為notFull。
2)添加
offer(E e , long timeout , TimeUtil unit)
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { if (count != items.length) { insert(e); return true; } if (nanos <= 0) return false; try { nanos = notFull.awaitNanos(nanos); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } }
這個方法將元素插入數組的末尾,如果數組滿,則進入等待,只到以下三種情況發生才繼續:
被喚醒、達到指定的時間、當前線程被中斷。
該方法首先將等待時間轉換成納秒。然后加鎖,如果數組未滿,則在末尾插入數據,如果數組已滿,則調用notFull.awaitNanos進行等待。如果被喚醒或超時,重新判斷是否滿。如果線程被interrupt,則直接拋出異常。
另外一個不帶時間的offer方法在數組滿的情況下不進去等待,而是直接返回false。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } }
同時還可以選擇put方法,此方法在數組已滿的情況下會一直等待,知道數組不為空或線程被interrupt。
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } }
3)獲取
poll(long timeout, TimeUnit unit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { if (count != 0) { E x = extract(); return x; } if (nanos <= 0) return null; try { nanos = notEmpty.awaitNanos(nanos); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } }
poll獲取隊列中的第一個元素,如果隊列中沒有元素,則進入等待。
poll首先將制定timeout轉換成納秒,然后加鎖,如果數組個數不為0,則從當前對象數組中獲取最后一個元素,在獲取后將位置上的元素置為null。
如果數組中的元素個數為0,首先判斷timeout是否小於等於0,若小於0則直接返回null。若大於則進行等待,如果被喚醒或者超時,重新判斷數據元素個數是否大於0。
如果線程被interrupt,則直接拋出InterruptedException。
和offer一樣,不帶時間的poll方法在數組元素個數為0直接返回null,不進行等待。
take方法在數據為空的情況下會一直等待,只到數組不為空或者interrupt。