生產者和消費者模型


生產者和消費者模型

1. 什么是生產者和消費者模型

生產者消費者模型具體來講,就是在一個系統中,存在生產者和消費者兩種角色,他們通過內存緩沖區進行通信,生產者生產消費者需要的資料,消費者把資料做成產品。

再具體一點:

  1. 生產者生產數據到緩沖區中,消費者從緩沖區中取數據。
  2. 如果緩沖區已經滿了,則生產者線程阻塞。
  3. 如果緩沖區為空,那么消費者線程阻塞。

2. 如何實現

實現生產者消費者模型有兩種方式:

  1. 采用 wait—notify 方式實現生產者消費者模型(注意這里需要加同步鎖 synchronized)
  2. 采用 阻塞隊列 方式實現生產者消費者模式

3. wait-notify 方式

實現過程並不復雜,直接上代碼:

這里設置了生產者生產速度大於消費者消費速度(通過 sleep() 方法實現)。

緩沖區 BufferArea.java

public class BufferArea {

    // 當前資源數量的計數值
    private int currNum = 0;

    // 資源池中允許存放的資源數目
    private int maxSize = 10;

    /**
     * 從資源池中取走資源
     */
    public synchronized void get() {
        if (currNum > 0) {
            currNum--;
            System.out.println("Cosumer_" + Thread.currentThread().getName() + "消耗一件資源," + "當前緩沖區有" + currNum + "個");
            // 通知生產者生產資源
            notifyAll();
        } else {
            try {
                // 如果沒有資源,則 Cosumer_ 進入等待狀態
                System.out.println("Cosumer_" + Thread.currentThread().getName() + ": 當前緩沖區資源不足,進入等待狀態");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 向緩沖區中添加資源
     */
    public synchronized void put() {
        // 若當前緩沖區內的資源計數小於最大 size 數,才加
        if (currNum < maxSize) {
            currNum++;
            System.out.println(Thread.currentThread().getName() + "生產一件資源,當前資源池有" + currNum + "個");

            // 通知等待的消費者
            notifyAll();
        } else {
            // 若當前緩沖區的資源計數大於最大 size 數,則等待
            try {
                System.out.println(Thread.currentThread().getName() + "線程進入等待 << 當前緩沖區的資源計數大於最大 size 數");
                // 生產者進入等待狀態,並釋放鎖
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

生產者 Producer.java

public class Producer extends Thread {

    private BlockQueueBufferArea mBufferArea;

    public Producer(BlockQueueBufferArea bufferArea) {
        this.mBufferArea = bufferArea;
        setName("Producer_" + getName());
    }

    @Override
    public void run() {
        // 不斷的生產資源
        while (true) {
            sleepSomeTime();
            mBufferArea.put();
        }
    }

    private void sleepSomeTime() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

消費者 Consumer

public class Consumer extends Thread {

    private BlockQueueBufferArea mBufferArea;

    public Consumer(BlockQueueBufferArea bufferArea) {
        this.mBufferArea = bufferArea;
        setName("Consumer_" + getName());
    }

    @Override
    public void run() {
        // 不斷的取出資源
        while (true) {
            sleepSomeTime();
            mBufferArea.get();
        }
    }

    private void sleepSomeTime() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

測試 Test.java

public class Test {

    public static void main(String[] args) {
        BlockQueueBufferArea bufferArea = new BlockQueueBufferArea();

        Consumer consumer1 = new Consumer(bufferArea);
        Consumer consumer2 = new Consumer(bufferArea);
        Consumer consumer3 = new Consumer(bufferArea);

        Producer producer1 = new Producer(bufferArea);
        Producer producer2 = new Producer(bufferArea);
        Producer producer3 = new Producer(bufferArea);

        consumer1.start();
        consumer2.start();
        consumer3.start();

        producer1.start();
        producer2.start();
        producer3.start();

    }

}

打印結果如下:

ProducerThread-5生產一件資源,當前資源池有1個
ProducerThread-4生產一件資源,當前資源池有2個
ProducerThread-3生產一件資源,當前資源池有3個
ProducerThread-5生產一件資源,當前資源池有4個
ProducerThread-4生產一件資源,當前資源池有5個
ProducerThread-3生產一件資源,當前資源池有6個
ProducerThread-5生產一件資源,當前資源池有7個
ProducerThread-4生產一件資源,當前資源池有8個
ProducerThread-3生產一件資源,當前資源池有9個
ProducerThread-3生產一件資源,當前資源池有10個
ProducerThread-4線程進入等待 << 當前緩沖區的資源計數大於最大 size 數
ProducerThread-5線程進入等待 << 當前緩沖區的資源計數大於最大 size 數
ProducerThread-3線程進入等待 << 當前緩沖區的資源計數大於最大 size 數

>> 注釋:3個生產者線程生產滿了10個(maxSize)產品,然后就都進入了等待

Cosumer_Consumer_Thread-0消耗一件資源,當前緩沖區有9個
Cosumer_Consumer_Thread-1消耗一件資源,當前緩沖區有8個
Cosumer_Consumer_Thread-2消耗一件資源,當前緩沖區有7個

>> 注釋:3個消費者消費了3個產品

ProducerThread-3生產一件資源,當前資源池有8個
ProducerThread-5生產一件資源,當前資源池有9個
ProducerThread-4生產一件資源,當前資源池有10個

>> 注釋:生產者立馬又生產3個

...

>> 然后一直循環往復這個過程

4. 阻塞隊列方式

阻塞隊列的特點:
  • 當隊列元素已滿的時候,阻塞插入操作
  • 當隊列元素為空的時候,阻塞獲取操作
不同的阻塞隊列:

ArrayBlockingQueue 與 LinkedBlockingQueue 都是支持 FIFO (先進先出),但是 LinkedBlockingQueue 是無界的,而ArrayBlockingQueue 是有界的。

這里我們采用無界阻塞隊列來演示生產者消費者模式。

演示

還是設置生產者生產速度大於消費者消費速度(通過 sleep() 方法實現)

緩沖區 BlockQueueBufferArea.java

public class BlockQueueBufferArea {

    BlockingQueue<Integer> mProductPoll = new LinkedBlockingQueue(10);

    public void  put() {
        try {
            System.out.println(Thread.currentThread().getName() + "產品池被放入了一個資源");
            mProductPoll.put(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void get() {
        try {
            System.out.println(Thread.currentThread().getName() + "產品池被取走了一個資源");
            mProductPoll.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

生產者 Producer.java

public class Producer extends Thread {

    private BlockQueueBufferArea mBufferArea;

    public Producer(BlockQueueBufferArea bufferArea) {
        this.mBufferArea = bufferArea;
        setName("Producer_" + getName());
    }

    @Override
    public void run() {
        // 不斷的生產資源
        while (true) {
            sleepSomeTime();
            mBufferArea.put();
        }
    }

    private void sleepSomeTime() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

消費者 Consumer.java

public class Consumer extends Thread {

    private BlockQueueBufferArea mBufferArea;

    public Consumer(BlockQueueBufferArea bufferArea) {
        this.mBufferArea = bufferArea;
        setName("Consumer_" + getName());
    }

    @Override
    public void run() {
        // 不斷的取出資源
        while (true) {
            sleepSomeTime();
            mBufferArea.get();
        }
    }

    private void sleepSomeTime() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

測試 Test.java

public class Test {

    public static void main(String[] args) {
        BlockQueueBufferArea bufferArea = new BlockQueueBufferArea();

        Consumer consumer1 = new Consumer(bufferArea);
        Consumer consumer2 = new Consumer(bufferArea);
        Consumer consumer3 = new Consumer(bufferArea);

        Producer producer1 = new Producer(bufferArea);
        Producer producer2 = new Producer(bufferArea);
        Producer producer3 = new Producer(bufferArea);

        consumer1.start();
        consumer2.start();
        consumer3.start();

        producer1.start();
        producer2.start();
        producer3.start();

    }

}

打印結果如下:

Producer_Thread-5產品池被放入了一個資源
Producer_Thread-4產品池被放入了一個資源
Producer_Thread-3產品池被放入了一個資源
Producer_Thread-3產品池被放入了一個資源
Producer_Thread-4產品池被放入了一個資源
Producer_Thread-5產品池被放入了一個資源
Producer_Thread-3產品池被放入了一個資源
Producer_Thread-4產品池被放入了一個資源
Producer_Thread-5產品池被放入了一個資源
Producer_Thread-3產品池被放入了一個資源
Producer_Thread-4產品池被放入了一個資源
Producer_Thread-5產品池被放入了一個資源
Producer_Thread-3產品池被放入了一個資源
Consumer_Thread-0產品池被取走了一個資源
Consumer_Thread-1產品池被取走了一個資源
Consumer_Thread-2產品池被取走了一個資源
Producer_Thread-4產品池被放入了一個資源
Producer_Thread-5產品池被放入了一個資源
Producer_Thread-3產品池被放入了一個資源

5. 參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM