寫這篇文章源於我經歷過的一次生產事故,在某家公司的時候,有個服務會收集業務系統的日志,此服務的開發人員在給業務系統的sdk中就因為使用了LinkedList,又沒有做並發控制,就造成了此服務經常不能正常收集到業務系統的日志(丟日志以及日志上報的線程停止運行)。看一下add()方法的源碼,我們就可以知道原因了:
public boolean add(E e) { linkLast(e);//調用linkLast,在隊列尾部添加元素 return true; } void linkLast(E e) { final Node<E> l = last; final Node<E> newNode = new Node<>(l, e, null); last = newNode; if (l == null) first = newNode; else l.next = newNode; size++;//多線程情況下,如果業務系統沒做並發控制,size的數量會遠遠大於實際元素的數量 modCount++; }
demo Lesson2LinkedListThreads 展示了在多線程且沒有做並發控制的環境下,size的值遠遠大於了隊列的實際值,100個線程,每個添加1000個元素,最后實際只加進去2030個元素:
List的變量size值為:88371
第2031個元素取出為null
解決方案,使用鎖或者使用ConcurrentLinkedQueue、LinkedBlockingQueue等支持添加元素為原子操作的隊列。
上一節我們已經分析過LinkedBlockingQueue的put等方法的源碼,是使用ReentrantLock來實現的添加元素原子操作。我們再簡單看一下高並發queue的add和offer()方法,方法中使用了CAS來實現的無鎖的原子操作:
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // p is last node if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } }
接下來,我們再利用高並發queue對上面的demo進行改造,大家只要改變demo中的內容,講下面兩行的注釋內容顛倒,即可發現沒有丟失任何的元素:
public static LinkedList list = new LinkedList();
//public static ConcurrentLinkedQueue list = new ConcurrentLinkedQueue();
再看一下高性能queue的poll()方法,才覺得NB,取元素的方法也用CAS實現了原子操作,因此在實際使用的過程中,當我們在不那么在意元素處理順序的情況下,隊列元素的消費者,完全可以是多個,不會丟任何數據:
public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } }
關於ConcurrentLinkedQueue和LinkedBlockingQueue:
1.LinkedBlockingQueue是使用鎖機制,ConcurrentLinkedQueue是使用CAS算法,雖然LinkedBlockingQueue的底層獲取鎖也是使用的CAS算法
2.關於取元素,ConcurrentLinkedQueue不支持阻塞去取元素,LinkedBlockingQueue支持阻塞的take()方法,如若大家需要ConcurrentLinkedQueue的消費者產生阻塞效果,需要自行實現
3.關於插入元素的性能,從字面上和代碼簡單的分析來看ConcurrentLinkedQueue肯定是最快的,但是這個也要看具體的測試場景,我做了兩個簡單的demo做測試,測試的結果如下,兩個的性能差不多,但在實際的使用過程中,尤其在多cpu的服務器上,有鎖和無鎖的差距便體現出來了,ConcurrentLinkedQueue會比LinkedBlockingQueue快很多:
demo Lesson2ConcurrentLinkedQueuePerform:在使用ConcurrentLinkedQueue的情況下100個線程循環增加的元素數為:33828193
demo Lesson2LinkedBlockingQueuePerform:在使用LinkedBlockingQueue的情況下100個線程循環增加的元素數為:33827382