JAVA線程隊列BlockingQueue
介紹
BlockingQueue阻塞隊列,顧名思義,首先它是一個隊列,通過一個共享的隊列,可以使得數據由隊列的一端輸入,從另外一端輸出.
常用的隊列主要有以下兩種:(當然通過不同的實現方式,還可以延伸出很多不同類型的隊列,DelayQueue就是其中的一種)
- 先進先出(FIFO):先插入的隊列的元素也最先出隊列,類似於排隊的功能。從某種程度上來說這種隊列也體現了一種公平性。
- 后進先出(LIFO):后插入隊列的元素最先出隊列,這種隊列優先處理最近發生的事件。
多線程環境中,通過隊列可以很容易實現數據共享,比如經典的“生產者”和“消費者”模型中,通過隊列可以很便利地實現兩者之間的數據共享。假設我們有若干生產者線程,另外又有若干個消費者線程。如果生產者線程需要把准備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就可以很方便地解決他們之間的數據共享問題。但如果生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的情況呢?理想情況下,如果生產者產出數據的速度大於消費者消費的速度,並且當生產出來的數據累積到一定程度的時候,那么生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。然而,在concurrent包發布以前,在多線程環境下,我們每個程序員都必須去自己控制這些細節,尤其還要兼顧效率和線程安全,而這會給我們的程序帶來不小的復雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多線程領域:所謂阻塞,在某些情況下會掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會自動被喚醒)
常用方法
作為BlockingQueue的使用者,我們再也不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大,讓我們一起來見識下它的常用方法:
放入數據:
-
offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,
則返回true,否則返回false.(本方法不阻塞當前執行方法的線程) -
offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往隊列中
加入BlockingQueue,則返回失敗。 -
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷
直到BlockingQueue里面有空間再繼續. -
add(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則拋出一個IIIegaISlabEepeplian異常
獲取數據:
- poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,
取不到時返回null; - poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內,
隊列一旦有數據可取,則立即返回隊列中的數據。否則知道時間超時還沒有數據可取,返回失敗。 - take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到
BlockingQueue有新的數據被加入; - remove(): 從BlockingQueue取出一個隊首的對象,如果隊列為空,則拋出一個NoSuchElementException異常
- peek(): 返回隊列頭部的元素.如果隊列為空,則返回null
- drainTo():一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),
通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。
BlockingQueue成員詳細介紹
ArrayBlockingQueue
基於數組的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,這是一個常用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。
ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味着兩者無法真正並行運行,這點尤其不同於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以采用分離鎖,從而實現生產者和消費者操作的完全並行運行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的數據寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象。這在長時間內需要高效並發地處理大批量數據的系統中,其對於GC的影響還是存在一定的區別。而在創建ArrayBlockingQueue時,我們還可以控制對象的內部鎖是否采用公平鎖,默認采用非公平鎖。
LinkedBlockingQueue
基於鏈表的阻塞隊列,同ArrayListBlockingQueue類似,其內部也維持着一個數據緩沖隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區達到最大值緩存容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理並發數據,還因為其對於生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味着在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的並發性能。
作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。
ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞隊列,一般情況下,在處理多線程間的生產者消費者問題,使用這兩個類足以。
下面使用生產者消費者舉例說明:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
// 聲明一個容量為10的緩存隊列
BlockingQueue queue = new LinkedBlockingQueue(2);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue);
// 借助Executors
ExecutorService service = Executors.newCachedThreadPool();
// 啟動線程
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer);
// 執行10s
Thread.sleep(10 * 1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(2000);
// 退出Executor
service.shutdown();
}
}
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class Consumer implements Runnable {
private BlockingQueue queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
System.out.println("啟動消費者線程!");
Random r = new Random();
boolean isRunning = true;
try {
while (isRunning) {
System.out.println("正從隊列獲取數據...");
String data = (String) queue.poll(2, TimeUnit.SECONDS);
if (null != data) {
System.out.println("拿到數據:" + data);
System.out.println("正在消費數據:" + data);
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
} else {
// 超過2s還沒數據,認為所有生產線程都已經退出,自動退出消費線程。
isRunning = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出消費者線程!");
}
}
}
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue queue;
private static AtomicInteger count = new AtomicInteger();
private static final int DEFAULT_RANGE_FOR_SLEEP = 1;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
String data = null;
Random r = new Random();
System.out.println("啟動生產者線程!");
try {
while (isRunning) {
System.out.println("正在生產數據...");
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
data = "data:" + count.incrementAndGet();
System.out.println("將數據:" + data + "放入隊列...");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.out.println("放入數據失敗:" + data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出生產者線程!");
}
}
public void stop() {
isRunning = false;
}
}
上面這個例子中使用了offer
和poll
,這兩個是非阻塞的函數。
PriorityBlockingQueue
PriorityBlockingQueue里面存儲的對象必須是實現Comparable接口。隊列通過這個接口的compare方法確定對象的priority。
規則是:當前和其他對象比較,如果compare方法返回負數,那么在隊列里面的優先級就比較搞。
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
public class PriorityQueueTest {
static Random r=new Random(47);
public static void main(String args[]) throws InterruptedException{
final PriorityBlockingQueue q=new PriorityBlockingQueue();
ExecutorService se=Executors.newCachedThreadPool();
//execute producer
se.execute(new Runnable(){
public void run() {
int i=0;
while(true){
q.put(new PriorityEntity(r.nextInt(10),i++));
try {
TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
//execute consumer
se.execute(new Runnable(){
public void run() {
while(true){
try {
System.out.println("take-- "+q.take()+" left:-- ["+q.toString()+"]");
try {
TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("shutdown");
}
}
class PriorityEntity implements Comparable<PriorityEntity> {
private static int count=0;
private int id=count++;
private int priority;
private int index=0;
public PriorityEntity(int _priority,int _index) {
this.priority = _priority;
this.index=_index;
}
public String toString(){
return id+"# [index="+index+" priority="+priority+"]";
}
//數字小,優先級高
public int compareTo(PriorityEntity o) {
return this.priority > o.priority ? 1
: this.priority < o.priority ? -1 : 0;
}
//數字大,優先級高
// public int compareTo(PriorityTask o) {
// return this.priority < o.priority ? 1
// : this.priority > o.priority ? -1 : 0;
// }
}
http://blog.sina.com.cn/s/blog_6145ed8101010q1y.html
http://m635674608.iteye.com/blog/1739860