SynchronousQueue是無界的,是一種無緩沖的等待隊列,但是由於該Queue本身的特性,在某次添加元素后必須等待其他線程取走后才能繼續添加;可以認為SynchronousQueue是一個緩存值為1的阻塞隊列,但是 isEmpty()方法永遠返回是true,remainingCapacity() 方法永遠返回是0,remove()和removeAll() 方法永遠返回是false,iterator()方法永遠返回空,peek()方法永遠返回null。
聲明一個SynchronousQueue有兩種不同的方式,它們之間有着不太一樣的行為。
公平模式和非公平模式的區別:如果采用公平模式:SynchronousQueue會采用公平鎖,並配合一個FIFO隊列來阻塞多余的生產者和消費者,從而體系整體的公平策略;
但如果是非公平模式(SynchronousQueue默認):SynchronousQueue采用非公平鎖,同時配合一個LIFO隊列來管理多余的生產者和消費者,而后一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的數據永遠都得不到處理。
=======================================================================================
SynchronousQueue是這樣 一種阻塞隊列,其中每個 put 必須等待一個 take,反之亦然。同步隊列沒有任何內部容量,甚至連一個隊列的容量都沒有。
不能在同步隊列上進行 peek,因為僅在試圖要取得元素時,該元素才存在;
除非另一個線程試圖移除某個元素,否則也不能(使用任何方法)添加元素;
也不能迭代隊列,因為其中沒有元素可用於迭代。隊列的頭是嘗試添加到隊列中的首個已排隊線程元素; 如果沒有已排隊線程,則不添加元素並且頭為 null。
注意1:它一種阻塞隊列,其中每個 put 必須等待一個 take,反之亦然。 同步隊列沒有任何內部容量,甚至連一個隊列的容量都沒有。 注意2:它是線程安全的,是阻塞的。 注意3:不允許使用 null 元素。 注意4:公平排序策略是指調用put的線程之間,或take的線程之間。 公平排序策略可以查考ArrayBlockingQueue中的公平策略。 注意5:SynchronousQueue的以下方法很有趣: * iterator() 永遠返回空,因為里面沒東西。 * peek() 永遠返回null。 * put() 往queue放進去一個element以后就一直wait直到有其他thread進來把這個element取走。 * offer() 往queue里放一個element后立即返回,如果碰巧這個element被另一個thread取走了,offer方法返回true,認為offer成功;否則返回false。 * offer(2000, TimeUnit.SECONDS) 往queue里放一個element但是等待指定的時間后才返回,返回的邏輯和offer()方法一樣。 * take() 取出並且remove掉queue里的element(認為是在queue里的。。。),取不到東西他會一直等。 * poll() 取出並且remove掉queue里的element(認為是在queue里的。。。),只有到碰巧另外一個線程正在往queue里offer數據或者put數據的時候,該方法才會取到東西。否則立即返回null。 * poll(2000, TimeUnit.SECONDS) 等待指定的時間然后取出並且remove掉queue里的element,其實就是再等其他的thread來往里塞。 * isEmpty()永遠是true。 * remainingCapacity() 永遠是0。 * remove()和removeAll() 永遠是false。
Demo:
簡化版:
import java.util.Random; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) throws InterruptedException { SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>(); new Customer(queue).start(); new Product(queue).start(); } static class Product extends Thread{ SynchronousQueue<Integer> queue; public Product(SynchronousQueue<Integer> queue){ this.queue = queue; } @Override public void run(){ while(true){ int rand = new Random().nextInt(1000); System.out.println("生產了一個產品:"+rand); System.out.println("等待三秒后運送出去..."); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } queue.offer(rand); System.out.println("產品生成完成:"+rand); } } } static class Customer extends Thread{ SynchronousQueue<Integer> queue; public Customer(SynchronousQueue<Integer> queue){ this.queue = queue; } @Override public void run(){ while(true){ try { System.out.println("消費了一個產品:"+queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------------------------------------------"); } } } }

生產了一個產品:326 等待三秒后運送出去... 產品生成完成:326 生產了一個產品:291 等待三秒后運送出去... 消費了一個產品:326 ------------------------------------------ 產品生成完成:291 消費了一個產品:291 ------------------------------------------ 生產了一個產品:913 等待三秒后運送出去... 產品生成完成:913 消費了一個產品:913 ------------------------------------------ 生產了一個產品:993 等待三秒后運送出去... 產品生成完成:993 消費了一個產品:993 ------------------------------------------ 生產了一個產品:295 等待三秒后運送出去... 產品生成完成:295 消費了一個產品:295 ------------------------------------------ 生產了一個產品:772 等待三秒后運送出去... 產品生成完成:772 消費了一個產品:772 ------------------------------------------ 生產了一個產品:977 等待三秒后運送出去... 產品生成完成:977 消費了一個產品:977 ------------------------------------------ 生產了一個產品:182 等待三秒后運送出去... 產品生成完成:182 消費了一個產品:182 ------------------------------------------ 生產了一個產品:606 等待三秒后運送出去... 產品生成完成:606 消費了一個產品:606 ------------------------------------------ 生產了一個產品:704 等待三秒后運送出去... 產品生成完成:704 消費了一個產品:704 ------------------------------------------ 生產了一個產品:194 等待三秒后運送出去... 產品生成完成:194 生產了一個產品:355 等待三秒后運送出去... 消費了一個產品:194 ------------------------------------------ 產品生成完成:355 消費了一個產品:355 ------------------------------------------ 生產了一個產品:991 等待三秒后運送出去... 產品生成完成:991 消費了一個產品:991 ------------------------------------------ 生產了一個產品:958 等待三秒后運送出去... 產品生成完成:958 消費了一個產品:958 ------------------------------------------ 生產了一個產品:388 等待三秒后運送出去..
從結果中可以看出如果已經生產但是還未消費的,那么會阻塞在生產一直等到消費才能生成下一個。
多線程版本:
import java.util.Random; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) throws InterruptedException { SynchronousQueue<String> queue=new SynchronousQueue(); // TODO Auto-generated method stub for(int i=0;i<5;i++) new Thread(new ThreadProducer(queue)).start(); for(int i=0;i<5;i++) new Thread(new ThreadConsumer(queue)).start(); } } class ThreadProducer implements Runnable { ThreadProducer(SynchronousQueue<String> queue) { this.queue=queue; } SynchronousQueue<String> queue; static int cnt=0; public void run() { String name=""; int val=0; Random random =new Random(System.currentTimeMillis()); for(int i=0;i<2;i++){ cnt=(cnt+1)&0xFFFFFFFF; try{ val=random.nextInt()%15; if(val<5) { name="offer name:"+cnt; queue.offer(name); } else if(val<10) { name="put name:"+cnt; queue.put(name); } else { name="offer wait time and name:"+cnt; queue.offer(name, 1000, TimeUnit.MILLISECONDS); } Thread.sleep(1); }catch(InterruptedException e) { e.printStackTrace(); } } } } class ThreadConsumer implements Runnable { ThreadConsumer(SynchronousQueue<String> queue) { this.queue = queue; } SynchronousQueue<String> queue; public void run() { String name; for(int i=0;i<2;i++){ try { name = queue.take(); System.out.println("take " + name); Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }

take offer wait time and name:4 take offer wait time and name:4 take offer wait time and name:5 take offer wait time and name:4 take offer wait time and name:4 take offer name:9
結果有很多種可能性,要自己嘗試運行。
http://blog.csdn.net/hudashi/article/details/7076814