SynchronousQueue應用


SynchronousQueue是無界的,是一種無緩沖的等待隊列,但是由於該Queue本身的特性,在某次添加元素后必須等待其他線程取走后才能繼續添加;可以認為SynchronousQueue是一個緩存值為1的阻塞隊列,但是 isEmpty()方法永遠返回是true,remainingCapacity() 方法永遠返回是0,remove()和removeAll() 方法永遠返回是false,iterator()方法永遠返回空,peek()方法永遠返回null。

聲明一個SynchronousQueue有兩種不同的方式,它們之間有着不太一樣的行為。

公平模式和非公平模式的區別:如果采用公平模式:SynchronousQueue會采用公平鎖,並配合一個FIFO隊列來阻塞多余的生產者和消費者,從而體系整體的公平策略;

但如果是非公平模式(SynchronousQueue默認):SynchronousQueue采用非公平鎖,同時配合一個LIFO隊列來管理多余的生產者和消費者,而后一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的數據永遠都得不到處理。

=======================================================================================

SynchronousQueue是這樣 一種阻塞隊列,其中每個 put 必須等待一個 take,反之亦然。同步隊列沒有任何內部容量,甚至連一個隊列的容量都沒有。
 不能在同步隊列上進行 peek,因為僅在試圖要取得元素時,該元素才存在;
 除非另一個線程試圖移除某個元素,否則也不能(使用任何方法)添加元素;
 也不能迭代隊列,因為其中沒有元素可用於迭代。隊列的頭是嘗試添加到隊列中的首個已排隊線程元素; 如果沒有已排隊線程,則不添加元素並且頭為 null。

 注意1:它一種阻塞隊列,其中每個 put 必須等待一個 take,反之亦然。
       同步隊列沒有任何內部容量,甚至連一個隊列的容量都沒有。
 注意2:它是線程安全的,是阻塞的。
 注意3:不允許使用 null 元素。
 注意4:公平排序策略是指調用put的線程之間,或take的線程之間。
 公平排序策略可以查考ArrayBlockingQueue中的公平策略。
 注意5:SynchronousQueue的以下方法很有趣:
    * iterator() 永遠返回空,因為里面沒東西。
    * peek() 永遠返回null。
    * put() 往queue放進去一個element以后就一直wait直到有其他thread進來把這個element取走。
    * offer() 往queue里放一個element后立即返回,如果碰巧這個element被另一個thread取走了,offer方法返回true,認為offer成功;否則返回false。
    * offer(2000, TimeUnit.SECONDS) 往queue里放一個element但是等待指定的時間后才返回,返回的邏輯和offer()方法一樣。
    * take() 取出並且remove掉queue里的element(認為是在queue里的。。。),取不到東西他會一直等。
    * poll() 取出並且remove掉queue里的element(認為是在queue里的。。。),只有到碰巧另外一個線程正在往queue里offer數據或者put數據的時候,該方法才會取到東西。否則立即返回null。
    * poll(2000, TimeUnit.SECONDS) 等待指定的時間然后取出並且remove掉queue里的element,其實就是再等其他的thread來往里塞。
    * isEmpty()永遠是true。
    * remainingCapacity() 永遠是0。
    * remove()和removeAll() 永遠是false。

Demo:

簡化版:

import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
        new Customer(queue).start();
        new Product(queue).start();
    }

    static class Product extends Thread{
        SynchronousQueue<Integer> queue;
        public Product(SynchronousQueue<Integer> queue){
            this.queue = queue;
        }
        @Override
        public void run(){
            while(true){
                int rand = new Random().nextInt(1000);
                System.out.println("生產了一個產品:"+rand);
                System.out.println("等待三秒后運送出去...");
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                queue.offer(rand);
                System.out.println("產品生成完成:"+rand);
            }
        }
    }
    static class Customer extends Thread{
        SynchronousQueue<Integer> queue;
        public Customer(SynchronousQueue<Integer> queue){
            this.queue = queue;
        }
        @Override
        public void run(){
            while(true){
                try {
                    System.out.println("消費了一個產品:"+queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("------------------------------------------");
            }
        }
    }
}
生產了一個產品:326
等待三秒后運送出去...
產品生成完成:326
生產了一個產品:291
等待三秒后運送出去...
消費了一個產品:326
------------------------------------------
產品生成完成:291
消費了一個產品:291
------------------------------------------
生產了一個產品:913
等待三秒后運送出去...
產品生成完成:913
消費了一個產品:913
------------------------------------------
生產了一個產品:993
等待三秒后運送出去...
產品生成完成:993
消費了一個產品:993
------------------------------------------
生產了一個產品:295
等待三秒后運送出去...
產品生成完成:295
消費了一個產品:295
------------------------------------------
生產了一個產品:772
等待三秒后運送出去...
產品生成完成:772
消費了一個產品:772
------------------------------------------
生產了一個產品:977
等待三秒后運送出去...
產品生成完成:977
消費了一個產品:977
------------------------------------------
生產了一個產品:182
等待三秒后運送出去...
產品生成完成:182
消費了一個產品:182
------------------------------------------
生產了一個產品:606
等待三秒后運送出去...
產品生成完成:606
消費了一個產品:606
------------------------------------------
生產了一個產品:704
等待三秒后運送出去...
產品生成完成:704
消費了一個產品:704
------------------------------------------
生產了一個產品:194
等待三秒后運送出去...
產品生成完成:194
生產了一個產品:355
等待三秒后運送出去...
消費了一個產品:194
------------------------------------------
產品生成完成:355
消費了一個產品:355
------------------------------------------
生產了一個產品:991
等待三秒后運送出去...
產品生成完成:991
消費了一個產品:991
------------------------------------------
生產了一個產品:958
等待三秒后運送出去...
產品生成完成:958
消費了一個產品:958
------------------------------------------
生產了一個產品:388
等待三秒后運送出去..
View Code

從結果中可以看出如果已經生產但是還未消費的,那么會阻塞在生產一直等到消費才能生成下一個。

多線程版本:

import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> queue=new SynchronousQueue();
        // TODO Auto-generated method stub
        for(int i=0;i<5;i++)
            new Thread(new ThreadProducer(queue)).start();
        for(int i=0;i<5;i++)
            new Thread(new ThreadConsumer(queue)).start();
    }
}
class ThreadProducer implements Runnable {
    ThreadProducer(SynchronousQueue<String> queue)
    {
        this.queue=queue;
    }
    SynchronousQueue<String> queue;
    static int cnt=0;
    public void run()
    {
        String name="";
        int val=0;
        Random random =new Random(System.currentTimeMillis());
        for(int i=0;i<2;i++){

            cnt=(cnt+1)&0xFFFFFFFF;

            try{
                val=random.nextInt()%15;
                if(val<5)
                {
                    name="offer name:"+cnt;
                    queue.offer(name);
                }
                else if(val<10)
                {
                    name="put name:"+cnt;
                    queue.put(name);
                }
                else
                {
                    name="offer wait time and name:"+cnt;
                    queue.offer(name, 1000, TimeUnit.MILLISECONDS);
                }
                Thread.sleep(1);
            }catch(InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }
}

class ThreadConsumer implements Runnable {
    ThreadConsumer(SynchronousQueue<String> queue) {
        this.queue = queue;
    }
    SynchronousQueue<String> queue;

    public void run() {
        String name;
         for(int i=0;i<2;i++){
            try {
                name = queue.take();
                System.out.println("take " + name);
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
take offer wait time and name:4
take offer wait time and name:4
take offer wait time and name:5
take offer wait time and name:4
take offer wait time and name:4
take offer name:9
View Code

結果有很多種可能性,要自己嘗試運行。

http://blog.csdn.net/hudashi/article/details/7076814

http://ifeve.com/java-synchronousqueue/

http://blog.csdn.net/liu88010988/article/details/50789179


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM