一、SynchronousQueue簡介
Java 6的並發編程包中的SynchronousQueue是一個沒有數據緩沖的BlockingQueue,生產者線程對其的插入操作put必須等待消費者的移除操作take,反過來也一樣。
不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue內部並沒有數據緩存空間,你不能調用peek()方法來看隊列中是否有數據元素,因為數據元素只有當你試着取走的時候才可能存在,不取走而只想偷窺一下是不行的,當然遍歷這個隊列的操作也是不允許的。隊列頭元素是第一個排隊要插入數據的線程,而不是要交換的數據。數據是在配對的生產者和消費者線程之間直接傳遞的,並不會將數據緩沖數據到隊列中。可以這樣來理解:生產者和消費者互相等待對方,握手,然后一起離開。
特點:
1、不能在同步隊列上進行 peek,因為僅在試圖要取得元素時,該元素才存在;
2、除非另一個線程試圖移除某個元素,否則也不能(使用任何方法)添加元素;也不能迭代隊列,因為其中沒有元素可用於迭代。隊列的頭是嘗試添加到隊列中的首個已排隊線程元素; 如果沒有已排隊線程,則不添加元素並且頭為 null。
3、對於其他 Collection 方法(例如 contains),SynchronousQueue 作為一個空集合。此隊列不允許 null 元素。
4、它非常適合於傳遞性設計,在這種設計中,在一個線程中運行的對象要將某些信息、事件或任務傳遞給在另一個線程中運行的對象,它就必須與該對象同步。
5、對於正在等待的生產者和使用者線程而言,此類支持可選的公平排序策略。默認情況下不保證這種排序。 但是,使用公平設置為 true 所構造的隊列可保證線程以 FIFO 的順序進行訪問。 公平通常會降低吞吐量,但是可以減小可變性並避免得不到服務。
6、SynchronousQueue的以下方法:
* iterator() 永遠返回空,因為里面沒東西。
* peek() 永遠返回null。
* put() 往queue放進去一個element以后就一直wait直到有其他thread進來把這個element取走。
* offer() 往queue里放一個element后立即返回,如果碰巧這個element被另一個thread取走了,offer方法返回true,認為offer成功;否則返回false。
* offer(2000, TimeUnit.SECONDS) 往queue里放一個element但是等待指定的時間后才返回,返回的邏輯和offer()方法一樣。
* take() 取出並且remove掉queue里的element(認為是在queue里的。。。),取不到東西他會一直等。
* poll() 取出並且remove掉queue里的element(認為是在queue里的。。。),只有到碰巧另外一個線程正在往queue里offer數據或者put數據的時候,該方法才會取到東西。否則立即返回null。
* poll(2000, TimeUnit.SECONDS) 等待指定的時間然后取出並且remove掉queue里的element,其實就是再等其他的thread來往里塞。
* isEmpty()永遠是true。
* remainingCapacity() 永遠是0。
* remove()和removeAll() 永遠是false。
SynchronousQueue 內部沒有容量,但是由於一個插入操作總是對應一個移除操作,反過來同樣需要滿足。那么一個元素就不會再SynchronousQueue 里面長時間停留,一旦有了插入線程和移除線程,元素很快就從插入線程移交給移除線程。也就是說這更像是一種信道(管道),資源從一個方向快速傳遞到另一方 向。顯然這是一種快速傳遞元素的方式,也就是說在這種情況下元素總是以最快的方式從插入着(生產者)傳遞給移除着(消費者),這在多任務隊列中是最快處理任務的方式。在線程池里的一個典型應用是Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要(新任務到來時)創建新的線程,如果有空閑線程則會重復使用,線程空閑了60秒后會被回收。
二、 使用示例
package com.dxz.queue.block; import java.util.concurrent.SynchronousQueue; public class SynchronousQueueDemo { public static void main(String[] args) throws InterruptedException { final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>(); Thread putThread = new Thread(new Runnable() { @Override public void run() { System.out.println("put thread start"); try { queue.put(1); } catch (InterruptedException e) { } System.out.println("put thread end"); } }); Thread takeThread = new Thread(new Runnable() { @Override public void run() { System.out.println("take thread start"); try { System.out.println("take from putThread: " + queue.take()); } catch (InterruptedException e) { } System.out.println("take thread end"); } }); putThread.start(); Thread.sleep(1000); takeThread.start(); } }
結果:
put thread start take thread start take from putThread: 1 take thread end put thread end
三、實現原理
3.1、阻塞算法實現
3.1.1、使用wait和notify實現
阻塞算法實現通常在內部采用一個鎖來保證多個線程中的put()和take()方法是串行執行的。采用鎖的開銷是比較大的,還會存在一種情況是線程A持有線程B需要的鎖,B必須一直等待A釋放鎖,即使A可能一段時間內因為B的優先級比較高而得不到時間片運行。所以在高性能的應用中我們常常希望規避鎖的使用。
package com.dxz.queue.block; public class NativeSynchronousQueue<E> { boolean putting = false; E item = null; public synchronized E take() throws InterruptedException { while (item == null) wait(); E e = item; item = null; notifyAll(); return e; } public synchronized void put(E e) throws InterruptedException { if (e == null) return; while (putting) wait(); putting = true; item = e; notifyAll(); while (item != null) wait(); putting = false; notifyAll(); } } package com.dxz.queue.block; public class NativeSynchronousQueueTest { public static void main(String[] args) throws InterruptedException { final NativeSynchronousQueue<String> queue = new NativeSynchronousQueue<String>(); Thread putThread = new Thread(new Runnable() { @Override public void run() { System.out.println("put thread start"); try { queue.put("1"); } catch (InterruptedException e) { } System.out.println("put thread end"); } }); Thread takeThread = new Thread(new Runnable() { @Override public void run() { System.out.println("take thread start"); try { System.out.println("take from putThread: " + queue.take()); } catch (InterruptedException e) { } System.out.println("take thread end"); } }); putThread.start(); Thread.sleep(1000); takeThread.start(); } }
結果:
put thread start
take thread start
put thread end
take from putThread: 1
take thread end
3.1.2、信號量實現
經典同步隊列實現采用了三個信號量,代碼很簡單,比較容易理解:
package com.dxz.queue.block; import java.util.concurrent.Semaphore; public class SemaphoreSynchronousQueue<E> { E item = null; Semaphore sync = new Semaphore(0); Semaphore send = new Semaphore(1); Semaphore recv = new Semaphore(0); public E take() throws InterruptedException { recv.acquire(); E x = item; sync.release(); send.release(); return x; } public void put (E x) throws InterruptedException{ send.acquire(); item = x; recv.release(); sync.acquire(); } } package com.dxz.queue.block; public class SemaphoreSynchronousQueueTest { public static void main(String[] args) throws InterruptedException { final SemaphoreSynchronousQueue<String> queue = new SemaphoreSynchronousQueue<String>(); Thread putThread = new Thread(new Runnable() { @Override public void run() { System.out.println("put thread start"); try { queue.put("1"); } catch (InterruptedException e) { } System.out.println("put thread end"); } }); Thread takeThread = new Thread(new Runnable() { @Override public void run() { System.out.println("take thread start"); try { System.out.println("take from putThread: " + queue.take()); } catch (InterruptedException e) { } System.out.println("take thread end"); } }); putThread.start(); Thread.sleep(1000); takeThread.start(); } } 結果: put thread start take thread start take from putThread: 1 take thread end put thread end
在多核機器上,上面方法的同步代價仍然較高,操作系統調度器需要上千個時間片來阻塞或喚醒線程,而上面的實現即使在生產者put()時已經有一個消費者在等待的情況下,阻塞和喚醒的調用仍然需要。
3.1.3、Java 5實現
package com.dxz.queue.block; import java.util.Queue; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.ReentrantLock; public class Java5SynchronousQueue<E> { ReentrantLock qlock = new ReentrantLock(); Queue waitingProducers = new Queue(); Queue waitingConsumers = new Queue(); static class Node extends AbstractQueuedSynchronizer { E item; Node next; Node(Object x) { item = x; } void waitForTake() { /* (uses AQS) */ } E waitForPut() { /* (uses AQS) */ } } public E take() { Node node; boolean mustWait; qlock.lock(); node = waitingProducers.pop(); if(mustWait = (node == null)) node = waitingConsumers.push(null); qlock.unlock(); if (mustWait) return node.waitForPut(); else return node.item; } public void put(E e) { Node node; boolean mustWait; qlock.lock(); node = waitingConsumers.pop(); if (mustWait = (node == null)) node = waitingProducers.push(e); qlock.unlock(); if (mustWait) node.waitForTake(); else node.item = e; } }
Java 5的實現相對來說做了一些優化,只使用了一個鎖,使用隊列代替信號量也可以允許發布者直接發布數據,而不是要首先從阻塞在信號量處被喚醒。
3.1.4、Java6實現
Java 6的SynchronousQueue的實現采用了一種性能更好的無鎖算法 — 擴展的“Dual stack and Dual queue”算法。性能比Java5的實現有較大提升。競爭機制支持公平和非公平兩種:非公平競爭模式使用的數據結構是后進先出棧(Lifo Stack);公平競爭模式則使用先進先出隊列(Fifo Queue),性能上兩者是相當的,一般情況下,Fifo通常可以支持更大的吞吐量,但Lifo可以更大程度的保持線程的本地化。
代碼實現里的Dual Queue或Stack內部是用鏈表(LinkedList)來實現的,其節點狀態為以下三種情況:
- 持有數據 – put()方法的元素
- 持有請求 – take()方法
- 空
這個算法的特點就是任何操作都可以根據節點的狀態判斷執行,而不需要用到鎖。
其核心接口是Transfer,生產者的put或消費者的take都使用這個接口,根據第一個參數來區別是入列(棧)還是出列(棧)。
/** * Shared internal API for dual stacks and queues. */ static abstract class Transferer { /** * Performs a put or take. * * @param e if non-null, the item to be handed to a consumer; * if null, requests that transfer return an item * offered by producer. * @param timed if this operation should timeout * @param nanos the timeout, in nanoseconds * @return if non-null, the item provided or received; if null, * the operation failed due to timeout or interrupt -- * the caller can distinguish which of these occurred * by checking Thread.interrupted. */ abstract Object transfer(Object e, boolean timed, long nanos); }
TransferQueue實現如下(摘自Java 6源代碼),入列和出列都基於Spin和CAS方法:
/** * Puts or takes an item. */ Object transfer(Object e, boolean timed, long nanos) { /* Basic algorithm is to loop trying to take either of * two actions: * * 1. If queue apparently empty or holding same-mode nodes, * try to add node to queue of waiters, wait to be * fulfilled (or cancelled) and return matching item. * * 2. If queue apparently contains waiting items, and this * call is of complementary mode, try to fulfill by CAS'ing * item field of waiting node and dequeuing it, and then * returning matching item. * * In each case, along the way, check for and try to help * advance head and tail on behalf of other stalled/slow * threads. * * The loop starts off with a null check guarding against * seeing uninitialized head or tail values. This never * happens in current SynchronousQueue, but could if * callers held non-volatile/final ref to the * transferer. The check is here anyway because it places * null checks at top of loop, which is usually faster * than having them implicitly interspersed. */ QNode s = null; // constructed/reused as needed boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; if (t != tail) // inconsistent read continue; if (tn != null) { // lagging tail advanceTail(t, tn); continue; } if (timed && nanos <= 0) // can't wait return null; if (s == null) s = new QNode(e, isData); if (!t.casNext(null, s)) // failed to link in continue; advanceTail(t, s); // swing tail and wait Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null)? x : e; } else { // complementary-mode QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); return (x != null)? x : e; } } }
3.2、SynchronousQueue實現原理
不像ArrayBlockingQueue、LinkedBlockingDeque之類的阻塞隊列依賴AQS實現並發操作,SynchronousQueue直接使用CAS實現線程的安全訪問。由於源碼中充斥着大量的CAS代碼,不易於理解,所以按照筆者的風格,接下來會使用簡單的示例來描述背后的實現模型。
隊列的實現策略通常分為公平模式和非公平模式,接下來將分別進行說明。
3.2.1、公平模式下的模型:
公平模式下,底層實現使用的是TransferQueue這個內部隊列,它有一個head和tail指針,用於指向當前正在等待匹配的線程節點。
初始化時,TransferQueue的狀態如下:
接着我們進行一些操作:
1、線程put1執行 put(1)操作,由於當前沒有配對的消費線程,所以put1線程入隊列,自旋一小會后睡眠等待,這時隊列狀態如下:
2、接着,線程put2執行了put(2)操作,跟前面一樣,put2線程入隊列,自旋一小會后睡眠等待,這時隊列狀態如下:
3、這時候,來了一個線程take1,執行了 take操作,由於tail指向put2線程,put2線程跟take1線程配對了(一put一take),這時take1線程不需要入隊,但是請注意了,這時候,要喚醒的線程並不是put2,而是put1。為何? 大家應該知道我們現在講的是公平策略,所謂公平就是誰先入隊了,誰就優先被喚醒,我們的例子明顯是put1應該優先被喚醒。至於讀者可能會有一個疑問,明明是take1線程跟put2線程匹配上了,結果是put1線程被喚醒消費,怎么確保take1線程一定可以和次首節點(head.next)也是匹配的呢?其實大家可以拿個紙畫一畫,就會發現真的就是這樣的。
公平策略總結下來就是:隊尾匹配隊頭出隊。
執行后put1線程被喚醒,take1線程的 take()方法返回了1(put1線程的數據),這樣就實現了線程間的一對一通信,這時候內部狀態如下:
4、最后,再來一個線程take2,執行take操作,這時候只有put2線程在等候,而且兩個線程匹配上了,線程put2被喚醒,
take2線程take操作返回了2(線程put2的數據),這時候隊列又回到了起點,如下所示:
以上便是公平模式下,SynchronousQueue的實現模型。總結下來就是:隊尾匹配隊頭出隊,先進先出,體現公平原則。
非公平模式下的模型:
我們還是使用跟公平模式下一樣的操作流程,對比兩種策略下有何不同。非公平模式底層的實現使用的是TransferStack,
一個棧,實現中用head指針指向棧頂,接着我們看看它的實現模型:
1、線程put1執行 put(1)操作,由於當前沒有配對的消費線程,所以put1線程入棧,自旋一小會后睡眠等待,這時棧狀態如下:
2、接着,線程put2再次執行了put(2)操作,跟前面一樣,put2線程入棧,自旋一小會后睡眠等待,這時棧狀態如下:
3、這時候,來了一個線程take1,執行了take操作,這時候發現棧頂為put2線程,匹配成功,但是實現會先把take1線程入棧,然后take1線程循環執行匹配put2線程邏輯,一旦發現沒有並發沖突,就會把棧頂指針直接指向 put1線程
4、最后,再來一個線程take2,執行take操作,這跟步驟3的邏輯基本是一致的,take2線程入棧,然后在循環中匹配put1線程,最終全部匹配完畢,棧變為空,恢復初始狀態,如下圖所示:
可以從上面流程看出,雖然put1線程先入棧了,但是卻是后匹配,這就是非公平的由來。
總結
SynchronousQueue由於其獨有的線程一一配對通信機制,在大部分平常開發中,可能都不太會用到,但線程池技術中會有所使用,由於內部沒有使用AQS,而是直接使用CAS,所以代碼理解起來會比較困難,但這並不妨礙我們理解底層的實現模型,在理解了模型的基礎上,有興趣的話再查閱源碼,就會有方向感,看起來也會比較容易,希望本文有所借鑒意義。
轉自:Java並發包中的同步隊列SynchronousQueue實現原理