生產者消費者模式詳細解析


程序的基本實現

在多線程的開發過程之中最為著名的案例就是生產者與消費者操作,該操作的主要流程如下:
——生產者負責信息內容的生產;
——每當生產者生產完成一項完整的信息之后消費者要從這里面取走信息;
——如果生產者沒有生產完則消費者要等待它生產完成,如果消費者還沒有對信息進行消費,則生產者應該等消費者處理完成后再繼續生產。

真實世界中的生產者消費者模式生產者和消費者模式在生活當中隨處可見,它描述的是協調與協作的關系。比如一個人正在准備食物(生產者),而另一個人正在吃(消費者),他們使用一個共用的桌子用於放置盤子和取走盤子,生產者准備食物,如果桌子上已經滿了就等待,消費者(那個吃的)等待如果桌子空了的話。這里桌子就是一個共享的對象。在Java Executor框架自身實現了生產者消費者模式它們分別負責添加和執行任務。

生產者消費者模式的好處它的確是一種實用的設計模式,常用於編寫多線程或並發代碼。下面是它的一些優點:
1.它簡化的開發,你可以獨立地或並發的編寫消費者和生產者,它僅僅只需知道共享對象是誰
2.生產者不需要知道誰是消費者或者有多少消費者,對消費者來說也是一樣
3.生產者和消費者可以以不同的速度執行
4.分離的消費者和生產者在功能上能寫出更簡潔、可讀、易維護的代碼

可以將生產者與消費者定義為兩個獨立的線程類對象,但是對於現在生產的數據,可以使用如下的組成:
——數據一:title=王建、content=宇宙大帥哥;
——數據二:title=小高、content=猥瑣第一人;
既然消費者與生產者是兩個獨立的線程,那么這兩個獨立的線程之間就需要有一個數據的保存集中點,那么可以單獨定義一個Message類實現數據的保存。

范例:實現程序基本結構

package cn.mldn.demo;
public class ThreadDemo {
	public static void main(String[] args) throws Exception { 
		Message msg = new Message();
		new Thread(new Producer(msg)).start();  //啟動生產者線程
		new Thread(new Consumer(msg)).start();  //啟動消費者線程
	}	
}

class Producer implements Runnable {
	private Message msg;
	public Producer(Message msg) {
		this.msg = msg;
	}
	@Override
	public void run() {
		for (int x = 0; x < 100; x ++) {
			if(x % 2 == 0) {
				this.msg.setTitle("王建");
				try {
					Thread.sleep(100);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				this.msg.setContent("宇宙大帥哥");
			} else {
				this.msg.setTitle("小高");
				try {
					Thread.sleep(100);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				this.msg.setContent("猥瑣第一人");
			}
		}
	}
}
class Consumer implements Runnable {
	private Message msg;
	public Consumer(Message msg) {
		this.msg = msg;
	}
	@Override
	public void run() {
		for (int x = 0; x < 100; x ++) {
			try {
				Thread.sleep(10);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			System.out.println(this.msg.getTitle() + " - " + this.msg.getContent());
		}
	}
}

class Message {
	private String title;
	private String content;
	public String getTitle() {
		return title;
	}
	public void setTitle(String title) {
		this.title = title;
	}
	public String getContent() {
		return content;
	}
	public void setContent(String content) {
		this.content = content;
	}
	
}

通過整個代碼的執行你會發現此時有兩個主要問題:
——問題一:數據不同步了;
——問題二:生產一個取走一個,但是發現了有重復生產和重復取出問題;

解決數據同步問題

如果要解決問題,首先解決的就是數據同步的處理問題,如果要想解決數據同步最簡單的做法是使用synchronized關鍵字定義同步代碼塊或同步方法,於是這個時候對於同步的處理就可以直接在Message類中完成。
范例:解決同步操作

package cn.mldn.demo;
class Message {
	private String title;
	private String content;
	public synchronized void set(String title,String content) {
		this.title = title;
		try {
			Thread.sleep(100);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		this.content = content;
	} 
	public synchronized String get() {
		try {
			Thread.sleep(100);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return this.title + " - " + this.content;
	}
}
public class ThreadDemo {
	public static void main(String[] args) throws Exception { 
		Message msg = new Message();
		new Thread(new Producer(msg)).start();  //啟動生產者線程
		new Thread(new Consumer(msg)).start();  //啟動消費者線程
	}	
}

class Producer implements Runnable {
	private Message msg;
	public Producer(Message msg) {
		this.msg = msg;
	}
	@Override
	public void run() {
		for (int x = 0; x < 100; x ++) {
			if(x % 2 == 0) {
				this.msg.set("王建","宇宙大帥哥");
			} else {
				this.msg.set("小高","猥瑣第一人");
			}
		}
	}
}
class Consumer implements Runnable {
	private Message msg;
	public Consumer(Message msg) {
		this.msg = msg;
	}
	@Override
	public void run() {
		for (int x = 0; x < 100; x ++) {
			System.out.println(this.msg.get());
		}
	}
}

在進行同步處理的時候肯定需要有一個同步處理的對象,那么此時肯定要將同步操作交由Message類處理是最合適的。這個時候發現數據已經可以正常的保持一致了,但是對於重復操作的問題依然存在。

線程等待與喚醒

如果說現在想要解決生產者與消費者的問題,那么最好的解決方案就是使用等待與喚醒機制。而對於等待與喚醒機制,主要依靠的是Object類中提供的方法處理的:
——等待機制:
——死等:public final void wait() throws InterruptedException
——設置等待時間:public final void wait(long timeout)throws InterruptedException
——設置等待時間:public final void wait​(long timeout,int nanos)
throws InterruptedException
——喚醒第一個等待線程:public final void notify()
——喚醒全部等待線程:public final void notifyAll()
如果此時有若干個等待線程的話,那么notify()表示的是喚醒第一個等待的,而其它的線程繼續等待,而notifyAll()表示會喚醒所有等待的線程,那個線程的優先級高就有可能先執行。
對於當前問題主要的解決應該通過Message類完成處理。
范例:修改Message類

package cn.mldn.demo;
class Message {
	private String title;
	private String content;
	private boolean flag = true;  //表示生產或消費的形式
	//flag = true:允許生產,但是不允許消費
	//flag = false :允許消費,不允許生產
	public synchronized void set(String title,String content) {
		if(this.flag == false) {
			try {
				super.wait();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		this.title = title;
		try {
			Thread.sleep(100);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		this.content = content;
		this.flag = false;  //已經生產過了
		super.notify();
	} 
	public synchronized String get() {
		if (this.flag == true) {  //還未生產,需要等待
			try {
				super.wait();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		try {
			Thread.sleep(100);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		try {
		return this.title + " - " + this.content;
		} finally {  //不管如何都要執行
			this.flag = true;  //繼續生產
			super.notify();  //喚醒等待線程
		}
	}
}
public class ThreadDemo {
	public static void main(String[] args) throws Exception { 
		Message msg = new Message();
		new Thread(new Producer(msg)).start();  //啟動生產者線程
		new Thread(new Consumer(msg)).start();  //啟動消費者線程
	}	
}

class Producer implements Runnable {
	private Message msg;
	public Producer(Message msg) {
		this.msg = msg;
	}
	@Override
	public void run() {
		for (int x = 0; x < 100; x ++) {
			if(x % 2 == 0) {
				this.msg.set("王建","宇宙大帥哥");
			} else {
				this.msg.set("小高","猥瑣第一人");
			}
		}
	}
}
class Consumer implements Runnable {
	private Message msg;
	public Consumer(Message msg) {
		this.msg = msg;
	}
	@Override
	public void run() {
		for (int x = 0; x < 100; x ++) {
			System.out.println(this.msg.get());
		}
	}
}

這種處理形式就是在進行多線程開發過程之中最原始的處理方案,整個的等待、同步、喚醒機制都由開發者自行通過原生代碼實現控制。

使用阻塞隊列實現生產者消費者模式

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。

這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。

阻塞隊列常用於生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。

另外,阻塞隊列也可以用於線程池中。

生產者消費者模式是並發、多線程編程中經典的設計模式,生產者和消費者通過分離的執行工作解耦,簡化了開發模式,生產者和消費者可以以不同的速度生產和消費數據。這篇文章我們來看看什么是生產者消費者模式,這個問題也是多線程面試題中經常被提及的。如何使用阻塞隊列(Blocking Queue)解決生產者消費者模式,以及使用生產者消費者模式的好處。

多線程中的生產者消費者問題

生產者消費者問題是一個流行的面試題,面試官會要求你實現生產者消費者設計模式,以至於能讓生產者應等待如果隊列或籃子滿了的話,消費者等待如果隊列或者籃子是空的。這個問題可以用不同的方式來現實,經典的方法是使用wait和notify方法在生產者和消費者線程中合作,在隊列滿了或者隊列是空的條件下阻塞,Java5的阻塞隊列(BlockingQueue)數據結構更簡單,因為它隱含的提供了這些控制,現在你不需要使用wait和nofity在生產者和消費者之間通信了,阻塞隊列的put()方法將阻塞如果隊列滿了,隊列take()方法將阻塞如果隊列是空的。在下部分我們可以看到代碼例子。

使用阻塞隊列實現生產者消費者模式

阻塞隊列實現生產者消費者模式超級簡單,它提供開箱即用支持阻塞的方法put()和take(),開發者不需要寫困惑的wait-nofity代碼去實現通信。
BlockingQueue 一個接口,Java5提供了不同的現實,如ArrayBlockingQueue和LinkedBlockingQueue,兩者都是先進先出(FIFO)順序。而ArrayLinkedQueue是自然有界的,LinkedBlockingQueue可選的邊界。

下面這是一個完整的生產者消費者代碼例子,對比傳統的wait、nofity代碼,它更易於理解。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadDemo {
    public static void main(String[] args) {
        BlockingQueue shareQueue = new ArrayBlockingQueue();
        new Thread(new Producer(shareQueue)).start();
        new Thread(new Consumer(shareQueue)).start();    
    }
}

//消費者
class Consumer implements  Runnable {

    BlockingQueue shareQueue;
    public Consumer(BlockingQueue shareQueue) {
        this.shareQueue = shareQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("Consumed:" + shareQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

//生產者
class Producer implements Runnable {

    private final BlockingQueue shareQueue;
    public Producer(BlockingQueue shareQueue) {
        this.shareQueue = shareQueue;
    }

    @Override
    public void run() {
       for (int i = 0; i < 10; i++) {
          try {
            shareQueue.put(i);
            System.out.println("Produced:" + i);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
    }
}


}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM