Java阻塞隊列(BlockingQueue)實現 生產者/消費者 示例
本文由 TonySpark 翻譯自 Javarevisited。轉載請參見文章末尾的要求。
Java.util.concurrent.BlockingQueue 是一個隊列實現類,支持這樣的操作:當從隊列中獲取或者移除元素時,如果隊列為空,需要等待,直到隊列不為空;同時如果向隊列中添加元素時,此時如果隊列無可用空間,也需要等待。
BlockingQueue 類不接收Null值,如果你試圖向隊列中存入Null值將拋出NullPointerException.
BlockingQueue的實現是線程安全的。所有隊列方法本身都是原子操作,使用並發控制的內部鎖或者其它形式。
BlockingQueue這個接口是Java集合架構的一部分,它主要用於解決生產者/消費者問題。在BlockingQueue中,我們不用擔心生產者操作時是否有可用空間或者消費者操作時是否有可用的對像而等待這樣的問題,這些都會在它的實現類中進行處理。
Java中提供了幾個對BlockingQueue的實現類,如: ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue 等
在處理生產者/消費者問題上 我們將會使用ArrayBlockingQueue來實現,如下是我們需知道的重要方法:
- put(E e): 這個方法用於向隊列中插入元素,如果隊列已滿,需要等待可用的這間。
- E take(): 這個方法用於從隊列頭部獲取或者移除元素,如果隊列為空則需要等待可用的元素。
現在咱們看看用BlockingQueue來解決生產者/消費者問題。
Message
Producer產生的普通Java對象,並添加到隊列中。
Message.java
1 package com.journaldev.concurrency; 2 3 public class Message { 4 private String msg; 5 6 public Message(String str){ 7 this.msg=str; 8 } 9 10 public String getMsg() { 11 return msg; 12 } 13 14 }
Producer
Producer這個類會產生消息並將其放入隊列中。
Producer.java
package com.journaldev.concurrency; import java.util.concurrent.BlockingQueue; public class Producer implements Runnable { private BlockingQueue<Message> queue; public Producer(BlockingQueue<Message> q){ this.queue=q; } @Override public void run() { //生產消息 for(int i=0; i<100; i++){ Message msg = new Message(""+i); try { Thread.sleep(i); queue.put(msg); System.out.println("Produced "+msg.getMsg()); } catch (InterruptedException e) { e.printStackTrace(); } } //添加退出消息 Message msg = new Message("exit"); try { queue.put(msg); } catch (InterruptedException e) { e.printStackTrace(); } } }
Consumer
Consumer類會從隊列獲取消息進行處理。如果獲取的是退出消息則結束。
Consumer.java
package com.journaldev.concurrency; import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable{ private BlockingQueue<Message> queue; public Consumer(BlockingQueue<Message> q){ this.queue=q; } @Override public void run() { try{ Message msg; //獲取並處理消息直到接收到“exit”消息 while((msg = queue.take()).getMsg() !="exit"){ Thread.sleep(10); System.out.println("Consumed "+msg.getMsg()); } }catch(InterruptedException e) { e.printStackTrace(); } } }
ProducerConsumerService
生產者/消費者的服務類將會產生固定大小的BlockingQueue,生產者和消費者同時共享該BlockingQueue,該服務類會起啟動生產者和消費者線程。
ProducerConsumerService.java
1 package com.journaldev.concurrency; 2 3 4 import java.util.concurrent.ArrayBlockingQueue; 5 import java.util.concurrent.BlockingQueue; 6 7 8 public class ProducerConsumerService { 9 10 public static void main(String[] args) { 11 //創建大小為10的 BlockingQueue 12 BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10); 13 Producer producer = new Producer(queue); 14 Consumer consumer = new Consumer(queue); 15 //開啟 producer線程向隊列中生產消息 16 new Thread(producer).start(); 17 //開啟 consumer線程 中隊列中消費消息 18 new Thread(consumer).start(); 19 System.out.println("Producer and Consumer has been started"); 20 } 21 22 }
上面程序的運行結果:
1 Producer and Consumer has been started 2 Produced 0 3 Produced 1 4 Produced 2 5 Produced 3 6 Produced 4 7 Consumed 0 8 Produced 5 9 Consumed 1 10 Produced 6 11 Produced 7 12 Consumed 2 13 Produced 8 14 ...
Thread sleep 使得生產者/消費者 生產、消費這此消息有一定的延遲。
原文鏈接: Javarevisited 翻譯: TonySpark
譯文鏈接: http://www.cnblogs.com/tonyspark/p/3722013.html
[ 轉載請保留原文出處、譯者和譯文鏈接。]