一、簡介
一個基於鏈接節點的無界線程安全隊列。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。
新的元素插入到隊列的尾部,隊列獲取操作從隊列頭部獲得元素。當多個線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇。此隊列不允許使用 null 元素。
offer和poll
poll()
獲取並移除此隊列的頭,如果此隊列為空,則返回 null。
public static void main(String[] args) { ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); queue.offer("哈哈哈"); System.out.println("offer后,隊列是否空?" + queue.isEmpty()); System.out.println("從隊列中poll:" + queue.poll()); System.out.println("pool后,隊列是否空?" + queue.isEmpty()); }
offer是往隊列添加元素,poll是從隊列取出元素並且刪除該元素
執行結果
offer后,隊列是否空?false 從隊列中poll:哈哈哈 pool后,隊列是否空?true
ConcurrentLinkedQueue中的add() 和 offer() 完全一樣,都是往隊列尾部添加元素
還有個取元素方法peek
peek()
獲取但不移除此隊列的頭;如果此隊列為空,則返回 null
public static void main(String[] args) { ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); queue.offer("哈哈哈"); System.out.println("offer后,隊列是否空?" + queue.isEmpty()); System.out.println("從隊列中peek:" + queue.peek()); System.out.println("從隊列中peek:" + queue.peek()); System.out.println("從隊列中peek:" + queue.peek()); System.out.println("pool后,隊列是否空?" + queue.isEmpty()); }
執行結果:
offer后,隊列是否空?false 從隊列中peek:哈哈哈 從隊列中peek:哈哈哈 從隊列中peek:哈哈哈 pool后,隊列是否空?false
remove
remove(Object o)
從隊列中移除指定元素的單個實例(如果存在)
public static void main(String[] args) { ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); queue.offer("哈哈哈"); System.out.println("offer后,隊列是否空?" + queue.isEmpty()); System.out.println("從隊列中remove已存在元素 :" + queue.remove("哈哈哈")); System.out.println("從隊列中remove不存在元素:" + queue.remove("123")); System.out.println("remove后,隊列是否空?" + queue.isEmpty()); }
remove一個已存在元素,會返回true,remove不存在元素,返回false
執行結果:
offer后,隊列是否空?false 從隊列中remove已存在元素 :true 從隊列中remove不存在元素:false remove后,隊列是否空?true
size or isEmpty
size()
返回此隊列中的元素數量
注意:
如果此隊列包含的元素數大於 Integer.MAX_VALUE,則返回 Integer.MAX_VALUE。 需要小心的是,與大多數 collection 不同,此方法不是 一個固定時間操作。由於這些隊列的異步特性,確定當前的元素數需要進行一次花費 O(n) 時間的遍歷。
所以在需要判斷隊列是否為空時,盡量不要用 queue.size()>0,而是用 !queue.isEmpty()
比較size()和isEmpty() 效率的示例:
場景:10000個人去飯店吃飯,10張桌子供飯,分別比較size() 和 isEmpty() 的耗時
public class Test01ConcurrentLinkedQueue { public static void main(String[] args) throws InterruptedException { int peopleNum = 10000;//吃飯人數 int tableNum = 10;//飯桌數量 ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); CountDownLatch count = new CountDownLatch(tableNum);//計數器 //將吃飯人數放入隊列(吃飯的人進行排隊) for(int i=1;i<=peopleNum;i++){ queue.offer("消費者_" + i); } //執行10個線程從隊列取出元素(10個桌子開始供飯) System.out.println("-----------------------------------開飯了-----------------------------------"); long start = System.currentTimeMillis(); ExecutorService executorService = Executors.newFixedThreadPool(tableNum); for(int i=0;i<tableNum;i++) { executorService.submit(new Dinner("00" + (i+1), queue, count)); } //計數器等待,知道隊列為空(所有人吃完) count.await(); long time = System.currentTimeMillis() - start; System.out.println("-----------------------------------所有人已經吃完-----------------------------------"); System.out.println("共耗時:" + time); //停止線程池 executorService.shutdown(); } private static class Dinner implements Runnable{ private String name; private ConcurrentLinkedQueue<String> queue; private CountDownLatch count; public Dinner(String name, ConcurrentLinkedQueue<String> queue, CountDownLatch count) { this.name = name; this.queue = queue; this.count = count; } @Override public void run() { //while (queue.size() > 0){ while (!queue.isEmpty()){ //從隊列取出一個元素 排隊的人少一個 System.out.println("【" +queue.poll() + "】----已吃完..., 飯桌編號:" + name); } count.countDown();//計數器-1 } } }
執行結果:
使用size耗時:757ms
使用isEmpty耗時:210
當數據量越大,這種耗時差距越明顯。所以這種判斷用isEmpty 更加合理
contains
contains(Object o)
如果此隊列包含指定元素,則返回 true
public static void main(String[] args) throws InterruptedException { ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); queue.offer("123"); System.out.println(queue.contains("123")); System.out.println(queue.contains("234")); }
執行結果:
toArray
toArray()
返回以恰當順序包含此隊列所有元素的數組
toArray(T[] a)
返回以恰當順序包含此隊列所有元素的數組;返回數組的運行時類型是指定數組的運行時類型
public static void main(String[] args) throws InterruptedException { ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>(); queue.offer("123"); queue.offer("234"); Object[] objects = queue.toArray(); System.out.println(objects[0] + ", " + objects[1]); //將數據存儲到指定數組 String[] strs = new String[2]; queue.toArray(strs); System.out.println(strs[0] + ", " + strs[1]); }
執行結果:
iterator
iterator()
返回在此隊列元素上以恰當順序進行迭代的迭代器
public static void main(String[] args) throws InterruptedException { ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>(); queue.offer("123"); queue.offer("234"); Iterator<String> iterator = queue.iterator(); while (iterator.hasNext()){ System.out.println(iterator.next()); } }
ConcurrentLinkedQueue文檔說明:
構造方法摘要 | |
---|---|
ConcurrentLinkedQueue() 創建一個最初為空的 ConcurrentLinkedQueue。 |
|
ConcurrentLinkedQueue(Collection<? extends E> c) 創建一個最初包含給定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍歷順序來添加元素。 |
方法摘要 | ||
---|---|---|
boolean |
add(E e) 將指定元素插入此隊列的尾部。 |
|
boolean |
contains(Object o) 如果此隊列包含指定元素,則返回 true。 |
|
boolean |
isEmpty() 如果此隊列不包含任何元素,則返回 true。 |
|
Iterator<E> |
iterator() 返回在此隊列元素上以恰當順序進行迭代的迭代器。 |
|
boolean |
offer(E e) 將指定元素插入此隊列的尾部。 |
|
E |
peek() 獲取但不移除此隊列的頭;如果此隊列為空,則返回 null。 |
|
E |
poll() 獲取並移除此隊列的頭,如果此隊列為空,則返回 null。 |
|
boolean |
remove(Object o) 從隊列中移除指定元素的單個實例(如果存在)。 |
|
int |
size() 返回此隊列中的元素數量。 |
|
Object[] |
toArray() 返回以恰當順序包含此隊列所有元素的數組。 |
|
|
toArray(T[] a) 返回以恰當順序包含此隊列所有元素的數組;返回數組的運行時類型是指定數組的運行時類型。 |
二、源代碼解析
offer操作是在鏈表末尾添加一個元素,下面看看實現原理。
public boolean offer(E e) { //e為null則拋出空指針異常 checkNotNull(e); //構造Node節點構造函數內部調用unsafe.putObject,后面統一講 final Node<E> newNode = new Node<E>(e); //從尾節點插入 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; //如果q=null說明p是尾節點則插入 if (q == null) { //cas插入(1) if (p.casNext(null, newNode)) { //cas成功說明新增節點已經被放入鏈表,然后設置當前尾節點(包含head,1,3,5.。。個節點為尾節點) if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q)//(2) //多線程操作時候,由於poll時候會把老的head變為自引用,然后head的next變為新head,所以這里需要 //重新找新的head,因為新的head后面的節點才是激活的節點 p = (t != (t = tail)) ? t : head; else // 尋找尾節點(3) p = (p != t && t != (t = tail)) ? t : q; } }
從構造函數知道一開始有個item為null的哨兵節點,並且head和tail都是指向這個節點,然后當一個線程調用offer時候首先
如圖首先查找尾節點,q==null,p就是尾節點,所以執行p.casNext通過cas設置p的next為新增節點,這時候p==t所以不重新設置尾節點為當前新節點。由於多線程可以調用offer方法,所以可能兩個線程同時執行到了(1)進行cas,那么只有一個會成功(假如線程1成功了),成功后的鏈表為:
失敗的線程會循環一次這時候指針為:
這時候會執行(3)所以p=q,然后在循環后指針位置為:
所以沒有其他線程干擾的情況下會執行(1)執行cas把新增節點插入到尾部,沒有干擾的情況下線程2 cas會成功,然后去更新尾節點tail,由於p!=t所以更新。這時候鏈表和指針為:
假如線程2cas時候線程3也在執行,那么線程3會失敗,循環一次后,線程3的節點狀態為:
這時候p!=t ;並且t的原始值為told,t的新值為tnew ,所以told!=tnew,所以 p=tnew=tail;
然后在循環一下后節點狀態:
q==null所以執行(1)。
現在就差p==q這個分支還沒有走,這個要在執行poll操作后才會出現這個情況。poll后會存在下面的狀態
這個時候添加元素時候指針分布為:
所以會執行(2)分支 結果 p=head
然后循環,循環后指針分布:
所以執行(1),然后p!=t所以設置tail節點。現在分布圖:
自引用的節點會被垃圾回收掉。
本節引自:http://www.importnew.com/25668.html ,可以參考此文。
三、concurrentLinkedQueue的特性
1、應用場景
按照適用的並發強度從低到高排列如下:
LinkedList/ArrayList 非線程安全,不能用於並發場景(List的方法支持棧和隊列的操作,因此可以用List封裝成stack和queue)
Collections.synchronizedList 使用wrapper class封裝,每個方法都用synchronized(mutex:Object)做了同步
LinkedBlockingQueue 采用了鎖分離的設計,避免了讀/寫操作沖突,且自動負載均衡,可以有界。BlockingQueue在生產-消費模式下首選【Iterator安全,不保證數據一致性】
ConcurrentLinkedQueue 適用於高並發讀寫操作,理論上有最高的吞吐量,無界,不保證數據訪問實時一致性,Iterator不拋出並發修改異常,采用CAS機制實現無鎖訪問。
綜上:
在並發的場景下,如果並發強度較小,性能要求不苛刻,且鎖可控的場景下,可使用Collections.synchronizedList,既保證了數據一致又保證了線程安全,性能夠用;
在大部分高並發場景下,建議使用 LinkedBlockingQueue ,性能與 ConcurrentLinkedQueue 接近,且能保證數據一致性;
ConcurrentLinkedQueue 適用於超高並發的場景,但是需要針對數據不一致采取一些措施。
2、特點
2.1 訪問操作采用了無鎖設計
2.2 Iterator的弱一致性,即不保證Iteartor訪問數據的實時一致性(與current組的成員與COW成員類似)
2.3 並發poll
2.4 並發add
2.5 poll/add並發
3、注意事項
3.1 size操作不是一個固定時長的操作(not a constant-time operation)
因為size需要遍歷整個queue,如果此時queue正在被修改,size可能返回不准確的數值(仍然是無法保證數據一致性),就像concurrentHashMap一樣,
要獲取size,需要取得所有的bucket的鎖,這是一個非常耗時的操作。因此如果需要保證數據一致性,頻繁獲取集合對象的size,最好不使用concurrent
族的成員。
3.2 批量操作(bulk operations like addAll,removeAll,equals)無法保證原子性,因為不保證實時性,且沒有使用獨占鎖的設計。
例如,在執行addAll的同時,有另外一個線程通過Iterator在遍歷,則遍歷的線程可能只看到一部分新增的數據。
3.3 ConcurrentLinkedQueue 沒有實現BlockingQueue接口
當隊列為空時,take方法返回null,此時consumer會需要處理這個情況,consumer會循環調用take來保證及時獲取數據,此為busy waiting,會持續消耗CPU資源。
4、與 LinkedBlockingQueue 的對比
LinkedBlockingQueue 采用了鎖分離的設計,put、get鎖分離,保證兩種操作的並發,但同一種操作,然后是鎖控制的。並且當隊列為空/滿時,某種操作
會被掛起。
4.1 並發性能
4.1.1 高並發put操作
可支持高並發場景下,多線程無鎖put操作
4.1.2 高並發的put/poll操作
多線程場景,同時put,遍歷,以及poll,均可無鎖操作。但不保證遍歷的實時一致性。
4.2 數據的實時一致性
兩者的Iterator都不不保證數據一致性,Iterator遍歷的是Iterator創建時已存在的節點,創建后的修改不保證能反應出來。
參考 LinkedBlockingQueue 的java doc關於Iterator的解釋:
The returned iterator is a "weakly consistent" iterator that will never throw ConcurrentModificationException, and guarantees to traverse elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any modifications subsequent to construction.
4.3 遍歷操作(Iterator的遍歷操作的差異)
目前看來,沒有差異
4.4 size操作
LinkedBlockingQueue 的size是在內部用一個AtomicInteger保存,執行size操作直接獲取此原子量的當前值,時間復雜度O(1)。
ConcurrentLinkedQueue 的size操作需要遍歷(traverse the queue),因此比較耗時,時間復雜度至少為O(n),建議使用isEmpty()。
The java doc says the size() method is typically not very useful in concurrent applications.
5.LinkedBlockingQueue和ConcurrentLinkedQueue適用場景
當許多線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇。
如果CLQ,那么我需要收到處理sleep
阻塞隊列:線程安全
按 FIFO(先進先出)排序元素。隊列的頭部 是在隊列中時間最長的元素。隊列的尾部 是在隊列中時間最短的元素。新元素插入到隊列的尾部,並且隊列檢索操作會獲得位於隊列頭部的元素。鏈接隊列的吞吐量通常要高於基於數組的隊列,但是在大多數並發應用程序中,其可預知的性能要低。
注意:
1、必須要使用take()方法在獲取的時候達成阻塞結果
2、使用poll()方法將產生非阻塞效果
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class BlockingDeque { //阻塞隊列,FIFO private static LinkedBlockingQueue<Integer> concurrentLinkedQueue = new LinkedBlockingQueue<Integer>(); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new Producer("producer1")); executorService.submit(new Producer("producer2")); executorService.submit(new Producer("producer3")); executorService.submit(new Consumer("consumer1")); executorService.submit(new Consumer("consumer2")); executorService.submit(new Consumer("consumer3")); } static class Producer implements Runnable { private String name; public Producer(String name) { this.name = name; } public void run() { for (int i = 1; i < 10; ++i) { System.out.println(name+ " 生產: " + i); //concurrentLinkedQueue.add(i); try { concurrentLinkedQueue.put(i); Thread.sleep(200); //模擬慢速的生產,產生阻塞的效果 } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } } } static class Consumer implements Runnable { private String name; public Consumer(String name) { this.name = name; } public void run() { for (int i = 1; i < 10; ++i) { try { //必須要使用take()方法在獲取的時候阻塞 System.out.println(name+"消費: " + concurrentLinkedQueue.take()); //使用poll()方法 將產生非阻塞效果 //System.out.println(name+"消費: " + concurrentLinkedQueue.poll()); //還有一個超時的用法,隊列空時,指定阻塞時間后返回,不會一直阻塞 //但有一個疑問,既然可以不阻塞,為啥還叫阻塞隊列? //System.out.println(name+" Consumer " + concurrentLinkedQueue.poll(300, TimeUnit.MILLISECONDS)); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
非阻塞隊列
基於鏈接節點的、無界的、線程安全。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列檢索操作從隊列頭部獲得元素。當許多線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇。此隊列不允許 null 元素。
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; public class NoBlockQueue { private static ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue<Integer>(); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new Producer("producer1")); executorService.submit(new Producer("producer2")); executorService.submit(new Producer("producer3")); executorService.submit(new Consumer("consumer1")); executorService.submit(new Consumer("consumer2")); executorService.submit(new Consumer("consumer3")); } static class Producer implements Runnable { private String name; public Producer(String name) { this.name = name; } public void run() { for (int i = 1; i < 10; ++i) { System.out.println(name+ " start producer " + i); concurrentLinkedQueue.add(i); try { Thread.sleep(20); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } //System.out.println(name+"end producer " + i); } } } static class Consumer implements Runnable { private String name; public Consumer(String name) { this.name = name; } public void run() { for (int i = 1; i < 10; ++i) { try { System.out.println(name+" Consumer " + concurrentLinkedQueue.poll()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } // System.out.println(); // System.out.println(name+" end Consumer " + i); } } } }
在並發編程中,一般推薦使用阻塞隊列,這樣實現可以盡量地避免程序出現意外的錯誤。阻塞隊列使用最經典的場景就是socket客戶端數據的讀取和解析,讀取數據的線程不斷將數據放入隊列,然后解析線程不斷從隊列取數據解析。還有其他類似的場景,只要符合生產者-消費者模型的都可以使用阻塞隊列。
使用非阻塞隊列,雖然能即時返回結果(消費結果),但必須自行編碼解決返回為空的情況處理(以及消費重試等問題)。
另外他們都是線程安全的,不用考慮線程同步問題。