Java之集合(十六)ArrayBlockingQueue


  轉載請注明源出處:http://www.cnblogs.com/lighten/p/7427763.html

1.前言

  JDK5是一個重要的更新版本,其提供了大量的並發類。之前的介紹都是一些util下早期的集合類,本章開始介紹JDK5提供的並發包中所給出的在多線程下,線程安全的集合類。首先介紹的是Queue隊列,之前就介紹了2個:ArrayDeque和PriorityQueue。隊列算是天然需要線程安全的,因為其特性適用場景大多數都是並發操作,進行排隊。所以JDK5並發包中提供了大量不同特性的線程安全的隊列。本章從ArrayBlockingQueue開始介紹。

2.ArrayBlockingQueue

  ArrayBlockingQueue的數據結構和ArrayDeque十分相似,但是多了一些並發所需要的字段,比如鎖。默認使用的所是非公平鎖。

  初始化了容量和鎖的相關設置。

  offer方法可以隊列是不允許出現空的對象。使用的方法也是加鎖來確保線程安全,如果隊列滿了就會返回false,入隊列失敗。

  入隊列也是放入目前要放的位置,再找到下一個要放的位置,整個都是通過count計數來判斷隊列是否滿了,也不用擔心覆蓋了數據,因為入隊列的時候判斷了當前數量是否等於容量。最后發出隊列非空信號。

  poll方法會取出隊列的數據,如果count是0,自然是null。否則進行出隊列操作。

  取出當前下標的內容,並置為null,找到下一個要取出的位置。數量減一,迭代器存在也會處理取出迭代器。然后發出一個隊列非滿信號。

  之前所講的非空和非滿信號還是有所作用的。

  放入一個對象時,如果隊列滿了但是還是想放進去,就需要等待隊列有空位。隊列在移除一個對象時就會發出一個非滿信號。上面的放入方法就是用到了這個信號。如果隊列滿了其會等待非滿信號,直到超時。如果不希望超時,就是想放進去,可以使用put方法,不過其可能會拋出中斷異常。

  poll方法一樣,如果隊列為空自然取不到,就一直等待一個非空的信號。不希望超時可以使用take方法。

  值得一提的是該類的迭代器,所有的迭代器共享數據,隊列改變會影響所有的迭代器。為了保證正確,增加了許多復雜的操作,但是由於循環數組和一些內部移除會導致迭代器丟失它們的位置,或顯示一些它們不應該顯示的元素。為了避免這個情況,當隊列有一個或多個迭代器的時候,其通過以下手段保持狀態:

  1.跟蹤循環的次數。即 takeIndex為0的次數。

  2.每當刪除一個內部元素時,通過回調通知所有迭代器(因此其他元素也可以移動)。

  以上JDK注解難以理解,看代碼。queue中有一個字段是itrs,其存放了目前所創建的所有迭代器。

  每個迭代器都是一個節點,節點是虛引用。

  迭代器創建的時候會加入到這個itrs中。doSomeSweeping是用來清理無用的迭代器。在迭代器創建和detach的時候會觸發。sweeper字段就是記錄上次掃描到的位置。如果為null,就從鏈表頭開始掃描,有就從其下一個開始掃描。如果找到了一個被回收了或者是耗盡的迭代器,就清理掉它,重新建立鏈表的連接,繼續找下一個。這就完成了對無效迭代器的清理了。

  之前我們都看到,在出隊列的時候,調用了itrs的remove方法,下面是該方法做的具體事情:

  隊列中數量為0的時候,隊列就是空的,會將所有迭代器進行清理並移除。否者如果takeIndex的下標是0,意味着隊列從尾中取完了,又回到頭部獲取,此時的操作就是上面JDK注解1條件,實質上跟蹤的是隊列的循環次數,計數器cycles自增1,然后對所有的迭代器進行清理,清除耗盡了的迭代器。

  下面仔細研究一下ArrayBlockingQueue的迭代器實現,看看其是共享隊列數據的含義是什么。還是從迭代器的構造函數看起:

  count等於0的時候,創建的這個迭代器是個無用的迭代器,可以直接移除,進入detach模式。否則就把當前隊列的讀取位置給迭代器當做下一個元素,cursor存儲下個元素的位置。

  hashNext()先判斷nextItem有沒有值了,沒有值的時候觸發noNext函數,這個函數做的就是調整修正隊列。這里先提一下並發中迭代器會發生什么問題。ArrayBlockingQueue的實現是一個循環數組,使用takeIndex和putIndex來控制元素的出入隊列。這樣就產生了一個問題,迭代器在創建的時候,其位置已經確定,但是隊列可能在不斷的出入隊列,這樣迭代器會受到嚴重影響,可能造成隊列實際上入出循環了數組一圈,而迭代器記錄的是上一圈的情況,只有下標,這樣遍歷就會造成很大的問題。所以才需要上面所說的2點來保證,第一個就是每個迭代器記錄當前其循環的圈數,第二個就是隊列元素出隊列時影響所有的迭代器。這個修正隊列就是在當前迭代器遍歷完成之后,比較一下圈數來判斷具體情況,圈數和隊列讀取下標和迭代器讀取下標來判斷是否要廢棄該迭代器。

  next方法也會在迭代器沒有被廢棄的時候比較一下當前隊列的情況。之后就是更新上次的下標和游標已經下一個值。

  其它的方法就不再描述,現在給一些測試用例,來印證上面所描述的內容,更為直觀的表現其迭代器的使用。由於有鎖實際上也是順序操作,所以例子在單線程中模擬。

    @Test
	public void test() {
		ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
		queue.offer(1);
		queue.offer(2);	// 放入2個元素
		Iterator<Integer> it1 = queue.iterator();
		Iterator<Integer> it2 = queue.iterator();
		Iterator<Integer> it3 = queue.iterator();
		Iterator<Integer> it4 = queue.iterator();
		Iterator<Integer> it5 = queue.iterator();
		while(it1.hasNext()) {
			System.out.print(it1.next()+",");
		}
		System.out.println();
		queue.poll();				// 清除第一個元素,再放入數據
		queue.offer(3);				// 填充一個元素,測試可以追上的情況,隊列中有2,3
		while(it2.hasNext()) {
			System.out.print(it2.next()+",");
		}
		System.out.println();
		queue.poll();				// 再移除一個元素,此時1,2元素都移除了
		// 迭代器無法獲取元素2,但是隊列只相差一圈
		while(it3.hasNext()){
			System.out.print(it3.next()+",");
		}
		System.out.println();
		queue.offer(4);		// 隊列中有3,4
		queue.poll();
		while(it4.hasNext()) {
			System.out.print(it4.next()+",");
		}
		System.out.println();
		queue.poll();
		queue.offer(5);
		while(it5.hasNext()) {
			System.out.print(it5.next()+",");
		}
	}

  這5個迭代器都是在隊列滿的時候生成的,之后第一個迭代器正常遍歷,第2個迭代器是在隊列出了一個元素,又入了一個元素的時候,此時還是出現了所有的元素,隊列依舊是滿的。第3個迭代器是又出了一個元素,隊列中只剩下3,迭代器依舊沒影響,還是輸出了1,3。第4個迭代器放入一個元素,又移除一個元素,此時應該還剩4,輸出的也是1,4。問題在第5個迭代器,移除了一個元素,再添加一個元素,此時應該是從新回到位置2次了,所以5只輸出了一個1。這個就判斷迭代器失效了。再放入一個元素測試是一樣的。

  可能會有疑問,迭代器5先是12,然后34,最后填到5的時候才廢棄,不是超了兩圈?實際上,如果隊列中有1,2 全部取出隊列,再填3,這樣一圈迭代器就只能輸出1了就被廢棄了。

  上面只是個人的研究,實際上隊列使用迭代器的需求也不算大。使用到的時候就需要好好研究一下了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM