一、隊列的定義
我們都知道隊列(Queue)是一種先進先出(FIFO)的數據結構,Java中定義了java.util.Queue
接口用來表示隊列。Java中的Queue
與List
、Set
屬於同一個級別接口,它們都是繼承於Collection
接口。
Java中還定義了一種雙端隊列java.util.Deque
,我們常用的LinkedList
就是實現了Deque
接口。
下面我們看一下類的定義:
Queue & Deque
public interface Queue<E> extends Collection<E> {
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
}
public interface Deque<E> extends Queue<E> {
void addFirst(E e);
void addLast(E e);
boolean offerFirst(E e);
boolean offerLast(E e);
E removeFirst();
E removeLast();
E pollFirst();
E pollLast();
E getFirst();
E getLast();
E peekFirst();
E peekLast();
boolean removeFirstOccurrence(Object o);
boolean removeLastOccurrence(Object o);
// *** Queue methods ***
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
// *** Stack methods ***
void push(E e);
E pop();
// *** Collection methods ***
boolean remove(Object o);
boolean contains(Object o);
public int size();
Iterator<E> iterator();
Iterator<E> descendingIterator();
}
二、隊列的實現
Java中對於隊列的實現分為非阻塞和阻塞兩種。
$ 非阻塞隊列分為如下:
-
LinkedList
LinkedList
是雙相鏈表結構,在添加和刪除元素時具有比ArrayList
更好的性能。但在 Get 與 Set 方面弱於ArrayList
。當然,這些對比都是指數據量很大或者操作很頻繁的情況下的對比。 -
PriorityQueue
PriorityQueue維護了一個有序列表,存儲到隊列中的元素會按照自然順序排列。當然,我們也可以給它指定一個實現了
java.util.Comparator
接口的排序類來指定元素排列的順序。 -
ConcurrentLinkedQueue
ConcurrentLinkedQueue
是基於鏈接節點的並且線程安全的隊列。因為它在隊列的尾部添加元素並從頭部刪除它們,所以只要不需要知道隊列的大小ConcurrentLinkedQueue
對公共集合的共享訪問就可以工作得很好。收集關於隊列大小的信息會很慢,需要遍歷隊列。
$ 阻塞隊列分為如下:
阻塞隊列定義在了java.util.concurrent
包中,java.util.concurrent.BlockingQueue
繼承了Queue
接口,它有 5 個實現類,分別是:
-
ArrayBlockingQueue
一個內部由數組支持的有界隊列。初始化時必須指定隊列的容量,還可以設置內部的ReentrantLock是否使用公平鎖。但是公平性會使你在性能上付出代價,只有在的確非常需要的時候再使用它。它是基於數組的阻塞循環隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
它的思想就是如果BlockQueue是空的,那么從BlockingQueue取東西的操作將會被阻斷進入等待狀態,直到BlockingQueue進了東西才會被喚醒。同樣,如果BlockingQueue是滿的,任何試圖往里存東西的操作也會被阻斷進入等待狀態,直到BlockingQueue里有空間才會被喚醒繼續操作。
-
**LinkedBlockingQueue **
一個內部由鏈接節點支持的可選有界隊列。初始化時不需要指定隊列的容量,默認是
Integer.MAX_VALUE
,也可以看成容量無限大。此隊列按 FIFO(先進先出)排序元素 。 -
PriorityBlockingQueue
一個內部由優先級堆支持的無界優先級隊列。PriorityBlockingQueue是對 PriorityQueue的再次包裝,隊列中的元素按優先級順序被移除。
-
DelayQueue
一個內部由優先級堆支持的、基於時間的調度隊列。隊列中存放Delayed元素,只有在延遲期滿后才能從隊列中提取元素。當一個元素的getDelay()方法返回值小於等於0時才能從隊列中poll中元素,否則poll()方法會返回null。
-
SynchronousQueue
一個利用 BlockingQueue 接口的簡單聚集(rendezvous)機制。
下面簡單介紹一下其中常用的方法:
- add 增加一個元索 如果隊列已滿,則拋出一個IIIegaISlabEepeplian異常
- remove 移除並返回隊列頭部的元素 如果隊列為空,則拋出一個NoSuchElementException異常
- element 返回隊列頭部的元素 如果隊列為空,則拋出一個NoSuchElementException異常
- offer 添加一個元素並返回true 如果隊列已滿,則返回false
- poll 移除並返問隊列頭部的元素 如果隊列為空,則返回null
- peek 返回隊列頭部的元素 如果隊列為空,則返回null
- put 添加一個元素 如果隊列滿,則阻塞
- take 移除並返回隊列頭部的元素 如果隊列為空,則阻塞
三、示例
package com.ysc.thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueTest {
public static void main(String[] args) {
final BlockingQueue queue = new ArrayBlockingQueue(3);
for(int i=0;i<2;i++){
new Thread(){
public void run(){
while(true){
try {
Thread.sleep((long)(Math.random()*1000));
System.out.println(Thread.currentThread().getName() + "准備放數據!");
queue.put(1);
System.out.println(Thread.currentThread().getName() + "已經放了數據," +
"隊列目前有" + queue.size() + "個數據");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
new Thread(){
public void run(){
while(true){
try {
//將此處的睡眠時間分別改為100和1000,觀察運行結果
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "准備取數據!");
queue.take();
System.out.println(Thread.currentThread().getName() + "已經取走數據," +
"隊列目前有" + queue.size() + "個數據");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
}
package com.ysc.thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingQueueCondition {
public static void main(String[] args) {
ExecutorService service = Executors.newSingleThreadExecutor();
final Business business = new Business();
service.execute(new Runnable(){
@Override
public void run() {
for(int i=0;i<50;i++){
business.sub();
}
}
});
for(int i=0;i<50;i++){
business.main();
}
}
}
class Business {
BlockingQueue subQueue = new ArrayBlockingQueue(1);
BlockingQueue mainQueue = new ArrayBlockingQueue(1);
//這里是匿名構造方法,只要new一個對象都會調用這個匿名構造方法,它與靜態塊不同,靜態塊只會執行一次,
//在類第一次加載到JVM的時候執行
//這里主要是讓main線程首先put一個,就有東西可以取,如果不加這個匿名構造方法put一個的話程序就死鎖了
{
try {
mainQueue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void sub(){
try {
mainQueue.take();
for(int i=0;i<10;i++){
System.out.println(Thread.currentThread().getName() + " : " + i);
}
subQueue.put(1);
} catch (Exception e){
}
}
public void main() {
try {
subQueue.take();
for(int i=0;i<5;i++){
System.out.println(Thread.currentThread().getName() + " : " + i);
}
mainQueue.put(1);
} catch (Exception e){
}
}
}