面試:手寫代碼生產者和消費者實現


生產者消費者問題是線程模型中的經典問題:生產者和消費者在同一時間段內共用同一存儲空間,生產者向空間里生產數據,而消費者取走數據。

 

1. 使用阻塞隊列實現生產者消費者模式

生產者:

 1 public class Producer implements Runable{
 2     private final BlockingQueue sharedQueue;
 3     public Producer(BlockingQueue sharedQueue){
 4     this.sharedQueue = sharedQueue;
 5     }
 6     
 7     publlic void run(){
 8     for(i=0;i<10;i++){
 9         try{
10             System.out.println("Producer:"+i);
11             sharedQueue.put(i);
12             }catch(InterruptedException ex){
13                 System.out.println("exception:"+ex);
14               }
15             }
16     }
17 }          

消費者:

public class Consumer implements Runable{
        private final BlockingQueue sharedQueue;
        public Consumer(BlockingQueue sharedQueue){
            this.sharedQueue = sharedQueue 
        }
        
        public void run(){
             while(true){
                try{
                        System.out.println("Consumed:"+sharedQueue.take());
                }catch(InterruptedException ex){
                        System.out.println("Exception:"+ex);
                }
             }
        }
}

生產者消費者模式:

public class ProducerConsumerPattern {
    private static final Logger logger = 
    public static void main(String[] args) {
        //阻塞隊列
        BlockingQueue sharedQueue = new LinkedBlockingDeque();

        //創建生產者和消費者,共享隊列
        Thread prodThread = new Thread(new Producer(sharedQueue));
        Thread consThread = new Thread(new Consumer(sharedQueue));

        //開啟生產者和消費者進程
        prodThread.start();
        consThread.start();
    }
}

BlockingQueue是一個阻塞隊列,它的存取可以保證只有一個線程在進行,所以根據邏輯,生產者在內存滿的時候進行等待,並且喚醒消費者隊列,反過來消費者在飢餓狀態下等待並喚醒生產者進行生產。

2. wait/notify方法實現

/**
 * 生產者消費者模式:使用Object.wait() / notify()方法實現
 */
public class ProducerConsumer {
    private static final int CAPACITY = 5;

    public static void main(String args[]){
        Queue<Integer> queue = new LinkedList<Integer>();

        Thread producer1 = new Producer("P-1", queue, CAPACITY);
        Thread producer2 = new Producer("P-2", queue, CAPACITY);
        Thread consumer1 = new Consumer("C1", queue, CAPACITY);
        Thread consumer2 = new Consumer("C2", queue, CAPACITY);
        Thread consumer3 = new Consumer("C3", queue, CAPACITY);

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
    }

    /**
     * 生產者
     */
    public static class Producer extends Thread{
        private Queue<Integer> queue;
        String name;
        int maxSize;
        int i = 0;

        public Producer(String name, Queue<Integer> queue, int maxSize){
            super(name);
            this.name = name;
            this.queue = queue;
            this.maxSize = maxSize;
        }

        @Override
        public void run(){
            while(true){
                synchronized(queue){
                    while(queue.size() == maxSize){
                        try {
                            System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
                            queue.wait();
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        }
                    }
                    System.out.println("[" + name + "] Producing value : +" + i);
                    queue.offer(i++);
                    queue.notifyAll();

                    try {
Thread.sleep(
new Random().nextInt(1000));//模擬隨機生產 } catch (InterruptedException e) { e.printStackTrace(); } } } } } /** * 消費者 */ public static class Consumer extends Thread{ private Queue<Integer> queue; String name; int maxSize; public Consumer(String name, Queue<Integer> queue, int maxSize){ super(name); this.name = name; this.queue = queue; this.maxSize = maxSize; } @Override public void run(){ while(true){ synchronized(queue){ while(queue.isEmpty()){ try { System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer"); queue.wait(); } catch (Exception ex) { ex.printStackTrace(); } } int x = queue.poll(); System.out.println("[" + name + "] Consuming value : " + x); queue.notifyAll(); try { Thread.sleep(new Random().nextInt(1000));//模擬隨機消費 } catch (InterruptedException e) { e.printStackTrace(); } } } } } }
  • wait():當緩沖區已滿/空時,生產者/消費者線程停止自己的執行,放棄鎖,使自己處於等待狀態,讓其他線程執行。
  • notify():當生產者/消費者向緩沖區放入/取出一個產品時,向其他等待的線程發出可執行的通知,同時放棄鎖,使自己處於等待狀態。

3. 使用Lock和Condition的await() / signal()方法

/**
 * 生產者消費者模式:使用Lock和Condition實現
 * {@link java.util.concurrent.locks.Lock}
 * {@link java.util.concurrent.locks.Condition}
 */
public class ProducerConsumerByLock {
    private static final int CAPACITY = 5;
    private static final Lock lock = new ReentrantLock();
    private static final Condition fullCondition = lock.newCondition();     //隊列滿的條件
    private static final Condition emptyCondition = lock.newCondition();        //隊列空的條件


    public static void main(String args[]){
        Queue<Integer> queue = new LinkedList<Integer>();

        Thread producer1 = new Producer("P-1", queue, CAPACITY);
        Thread producer2 = new Producer("P-2", queue, CAPACITY);
        Thread consumer1 = new Consumer("C1", queue, CAPACITY);
        Thread consumer2 = new Consumer("C2", queue, CAPACITY);
        Thread consumer3 = new Consumer("C3", queue, CAPACITY);

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
    }

    /**
     * 生產者
     */
    public static class Producer extends Thread{
        private Queue<Integer> queue;
        String name;
        int maxSize;
        int i = 0;

        public Producer(String name, Queue<Integer> queue, int maxSize){
            super(name);
            this.name = name;
            this.queue = queue;
            this.maxSize = maxSize;
        }

        @Override
        public void run(){
            while(true){

                //獲得鎖
                lock.lock();
                while(queue.size() == maxSize){
                    try {
                        System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");
                        //條件不滿足,生產阻塞
                        fullCondition.await();
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
                System.out.println("[" + name + "] Producing value : +" + i);
                queue.offer(i++);

                //喚醒其他所有生產者、消費者
                fullCondition.signalAll();
                emptyCondition.signalAll();

                //釋放鎖
                lock.unlock();

                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }

    /**
     * 消費者
     */
    public static class Consumer extends Thread{
        private Queue<Integer> queue;
        String name;
        int maxSize;

        public Consumer(String name, Queue<Integer> queue, int maxSize){
            super(name);
            this.name = name;
            this.queue = queue;
            this.maxSize = maxSize;
        }

        @Override
        public void run(){
            while(true){
                //獲得鎖
                lock.lock();

                while(queue.isEmpty()){
                    try {
                        System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");
                        //條件不滿足,消費阻塞
                        emptyCondition.await();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                int x = queue.poll();
                System.out.println("[" + name + "] Consuming value : " + x);

                //喚醒其他所有生產者、消費者
                fullCondition.signalAll();
                emptyCondition.signalAll();

                //釋放鎖
                lock.unlock();

                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

4. 總結

實現生產者消費者模式有3點:

1. 拿什么作為緩沖區,給生產者和消費者解耦,平衡了生產者和消費者的處理能力。一般使用隊列

2. 構建生產者,隊列滿使得生產者線程阻塞

3. 構建消費者 ,隊列空使得消費者現成阻塞

參考:https://blog.csdn.net/u010983881/article/details/78554671


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM