Java設計模式—生產者消費者模式(阻塞隊列實現)


生產者消費者模式是並發、多線程編程中經典的設計模式,生產者和消費者通過分離的執行工作解耦,簡化了開發模式,生產者和消費者可以以不同的速度生產和消費數據。這篇文章我們來看看什么是生產者消費者模式,這個問題也是多線程面試題中經常被提及的。如何使用阻塞隊列(Blocking Queue)解決生產者消費者模式,以及使用生產者消費者模式的好處。

真實世界中的生產者消費者模式

生產者和消費者模式在生活當中隨處可見,它描述的是協調與協作的關系。比如一個人正在准備食物(生產者),而另一個人正在吃(消費者),他們使用一個共用的桌子用於放置盤子和取走盤子,生產者准備食物,如果桌子上已經滿了就等待,消費者(那個吃的)等待如果桌子空了的話。這里桌子就是一個共享的對象。在Java Executor框架自身實現了生產者消費者模式它們分別負責添加和執行任務。

生產者消費者模式的好處

它的確是一種實用的設計模式,常用於編寫多線程或並發代碼。下面是它的一些優點:

  1.  它簡化的開發,你可以獨立地或並發的編寫消費者和生產者,它僅僅只需知道共享對象是誰
  2. 生產者不需要知道誰是消費者或者有多少消費者,對消費者來說也是一樣
  3.  生產者和消費者可以以不同的速度執行
  4.  分離的消費者和生產者在功能上能寫出更簡潔、可讀、易維護的代碼

多線程中的生產者消費者問題

生產者消費者問題是一個流行的面試題,面試官會要求你實現生產者消費者設計模式,以至於能讓生產者應等待如果隊列或籃子滿了的話,消費者等待如果隊列或者籃子是空的。這個問題可以用不同的方式來現實,經典的方法是使用wait和notify方法在生產者和消費者線程中合作,在隊列滿了或者隊列是空的條件下阻塞,Java5的阻塞隊列(BlockingQueue)數據結構更簡單,因為它隱含的提供了這些控制,現在你不需要使用wait和nofity在生產者和消費者之間通信了,阻塞隊列的put()方法將阻塞如果隊列滿了,隊列take()方法將阻塞如果隊列是空的。在下部分我們可以看到代碼例子。

使用阻塞隊列實現生產者消費者模式

阻塞隊列實現生產者消費者模式超級簡單,它提供開箱即用支持阻塞的方法put()和take(),開發者不需要寫困惑的wait-nofity代碼去實現通信。BlockingQueue 一個接口,Java5提供了不同的現實,如ArrayBlockingQueue和LinkedBlockingQueue,兩者都是先進先出(FIFO)順序。而ArrayLinkedQueue是自然有界的,LinkedBlockingQueue可選的邊界。下面這是一個完整的生產者消費者代碼例子,對比傳統的wait、nofity代碼,它更易於理解。

注意:

使用take()函數,如果隊列中沒有數據,則線程wait釋放CPU,而poll()則不會等待,直接返回null;同樣,空間耗盡時offer()函數不會等待,直接返回false,而put()則會wait,因此如果你使用while(true)來獲得隊列元素,千萬別用poll(),CPU會100%的. 

 

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
 
public class ProducerConsumerPattern {
 
    public static void main(String args[]){
 
     //Creating shared object
     BlockingQueue sharedQueue = new LinkedBlockingQueue();
 
     //Creating Producer and Consumer Thread
     Thread prodThread = new Thread(new Producer(sharedQueue));
     Thread consThread = new Thread(new Consumer(sharedQueue));
 
     //Starting producer and Consumer thread
     prodThread.start();
     consThread.start();
    }
 
}
 
//Producer Class in java
class Producer implements Runnable {
 
    private final BlockingQueue sharedQueue;
 
    public Producer(BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }
 
    @Override
    public void run() {
        for(int i=0; i<10; i++){
            try {
                System.out.println("Produced: " + i);
                sharedQueue.put(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
 
}
 
//Consumer Class in Java
class Consumer implements Runnable{
 
    private final BlockingQueue sharedQueue;
 
    public Consumer (BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }
 
    @Override
    public void run() {
        while(true){
            try {
                System.out.println("Consumed: "+ sharedQueue.take());
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
 
}
 
Output:
Produced: 0
Produced: 1
Consumed: 0
Produced: 2
Consumed: 1
Produced: 3
Consumed: 2
Produced: 4
Consumed: 3
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Consumed: 6
Produced: 8
Consumed: 7
Produced: 9
Consumed: 8
Consumed: 9

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM