java面試-阻塞隊列


一、阻塞隊列

當阻塞隊列是空,從隊列中獲取元素的操作會被阻塞

當阻塞隊列是滿,往隊列中添加元素的操作會被阻塞

二、為什么用,有什么好處?

我們不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切阻塞隊列都包辦了。

三、常見的阻塞隊列

ArrayBlockingQueue由數組構成的有界阻塞隊列.

LinkedBlockingQueue由鏈表構成的有界阻塞隊列(默認值為Integer.MAX_VALUE)

public class BlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
        /**
         * 1、拋出異常 add()/remove()
         */
//        System.out.println(blockingQueue.add("a"));
//        System.out.println(blockingQueue.add("b"));
//        System.out.println(blockingQueue.add("c"));
//        System.out.println(blockingQueue.add("d"));

//        System.out.println(blockingQueue.element()); //檢查隊首元素

//        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());


        /**
         * 2、返回布爾類型 offer()/pull()
         */
//        System.out.println(blockingQueue.offer("a"));
//        System.out.println(blockingQueue.offer("b"));
//        System.out.println(blockingQueue.offer("c"));
//        System.out.println(blockingQueue.offer("d"));
//
//        System.out.println(blockingQueue.peek()); //檢查隊首元素
//
//        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());



        /**
         * 3、阻塞 put()/take()
         */
//        blockingQueue.put("a");
//        blockingQueue.put("b");
//        blockingQueue.put("c");
//        System.out.println("############");
//        blockingQueue.put("d");
//
//        System.out.println(blockingQueue.take());
//        System.out.println(blockingQueue.take());
//        System.out.println(blockingQueue.take());
//        System.out.println(blockingQueue.take());


        /**
         *4、超時
         */

        System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("b",2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("c",2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("d",2L, TimeUnit.SECONDS));
    }
}

SynchronousQueue是一個不存儲元素的阻塞隊列,也即單個元素的隊列

public class SynchronousQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " put 1");
                blockingQueue.put("1");

                System.out.println(Thread.currentThread().getName() + " put 2");
                blockingQueue.put("2");

                System.out.println(Thread.currentThread().getName() + " put 3");
                blockingQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "AAA").start();


        new Thread(() -> {
            try {

                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());

                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());

                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take());

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "BBB").start();
    }
}  

PriorityBlockingQueue:支持優先級排序的無界阻塞隊列

DelayQueue:使用優先級隊列實現的延遲無界阻塞隊列

LinkedTransferQueue:由鏈表構成的無界阻塞隊列

LinkedBlockingDeque:由鏈表構成的雙向阻塞隊列

四、BlockQueue的核心方法

add()/remove()/element():拋出異常

offer()/pull():返回布爾類型/支持超時  

put()/take():阻塞

peek() 檢查隊列首元素

五、使用場景

1、生產者-消費者模式

    問題: 一個初始值為零的變量,兩個線程對其交替操作,一個加1一個減1,來5輪

      1)傳統版的生產者-消費者模式

/**
 * Created by wujuhong on 2019/7/3.
 * 傳統版的生產者消費者模式
 */
public class ProductConsumer_TraditionDemo {

    public static void main(String[] args) {
        ShareData shareData = new ShareData();
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "AA").start();


        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "BB").start();
    }
}

class ShareData {
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() throws InterruptedException {
        lock.lock();
        try {
            //1、判斷
            while (number != 0) {
                //等待,不能生產
                condition.await();
            }
            //2、干活
            number++;
            System.out.println(Thread.currentThread().getName() + " " + number);
            //3、通知喚醒
            condition.signalAll();

        } finally {
            lock.unlock();
        }
    }


    public void decrement() throws InterruptedException {
        lock.lock();
        try {
            //1、判斷
            while (number == 0) {
                //等待,不能生產
                condition.await();
            }
            //2、干活
            number--;
            System.out.println(Thread.currentThread().getName() + " " + number);
            //3、通知喚醒
            condition.signalAll();

        } finally {
            lock.unlock();
        }
    }
}

  2)阻塞隊列的生產者-消費者模式

class MyResource {
    private volatile boolean FLAG = true; //默認生產,進行生產+消費
    private AtomicInteger atomicInteger = new AtomicInteger();
    BlockingQueue<String> blockingQueue = null;

    public MyResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName());
    }


    public void myProduct() throws InterruptedException {
        String data = "";
        boolean returnValue;
        while (FLAG) {
            data = atomicInteger.incrementAndGet() + "";
            returnValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
            if (returnValue) {
                System.out.println(Thread.currentThread().getName() + " 插入隊列" + data + "成功");
            } else {
                System.out.println(Thread.currentThread().getName() + " 插入隊列" + data + "失敗");

            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + " 生產動作結束,FLAG = false");
    }


    public void myConsume() throws InterruptedException {
        String result = "";
        while (FLAG) {
            result = blockingQueue.poll(2, TimeUnit.SECONDS);
            if (null == result || result.equalsIgnoreCase("")) {
                FLAG = false;
                System.out.println(Thread.currentThread().getName() + " 超過2s鍾沒有取到蛋糕,消費隊列退出");
                return;

            }
            System.out.println(Thread.currentThread().getName() + " 消費隊列" + result + "成功");

            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + " 消費動作結束,FLAG = false");
    }


    public void stop() {
        this.FLAG = false;
    }
}

public class ProductConsumer_BlockQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " 生產線程啟動");
            try {
                myResource.myProduct();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "product").start();


        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " 消費線程啟動");
            try {
                myResource.myConsume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "consume").start();

        TimeUnit.SECONDS.sleep(5);

        System.out.println("5秒鍾時間到,main線程叫停,活動結束");

        myResource.stop();
    }
}  

2、線程池

3、消息中間件

 

 

阻塞隊列有沒有好的一面?

不得不阻塞,你如何管理?

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM