ArrayBlockingQueue
ArrayBlockingQueue是Java多線程常用的線程安全的一個集合,基於數組實現,繼承自AbstractQueue,實現了BlockingQueue和Serializable接口。
//先看看器內部的成員變量:
private static final long serialVersionUID = -817911632652898426L;//實現了序列化接口
/** 基於數組的實現,內部持有一個Object數組 */
final Object[] items;
/** 數據讀取指針 */
int takeIndex;
/** 數據插入指針 */
int putIndex;
/** 當前隊列中元素的總數 */
int count;
/** 采用了ReentrantLock 的實現 */
final ReentrantLock lock;
/** 標識當前隊列中有可讀元素 */
private final Condition notEmpty;
/** 標識當前隊列可寫入 */
private final Condition notFull;
//可以看到,ArrayBlockingQueue內部維護了一個takeIndex指針和一個putIndex指針,分別用於讀取和寫入;一個notEmpty和一個notFull,分別用於保證寫入和讀取的線程安全,喚醒讀取和寫入線程
//再看看構造函數
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];//初始化數組
lock = new ReentrantLock(fair);//初始化ReentrantLock,並標識是否為公平鎖
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//然后來看看ArrayBlockingQueue的offer方法
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
//如果隊列滿,則添加失敗。offer方法不會阻塞,put方法會阻塞
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
//首先做空值檢查,如果為空,拋出空值異常。然后使用了ReentrantLock ,來保證offer的線程安全性。下面來看看真正的添加方法enqueue:
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
//可以看到,ArrayBlockingQueue內部維護了一個putIndex 指針,該指針指向當前隊列可以插入的位置,直接將當前的Object對象插入到inputIndex位置,然后讓inputIndex自增,如果隊列已滿,則指向第一個元素。最后元素總數加一,並喚醒讀線程
//最后我們來看讀取take方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();//take方法是阻塞的,poll方法不會阻塞,直接返回。
return dequeue();
} finally {
lock.unlock();
}
}
//可以看到,那么take方法將被阻塞。下面看看出對方法dequeue:
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;//如果取到最后一個元素,takeIndex 指向第一個元素
count--;//元素總數減一
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//喚醒寫入線程
return x;
}
以上便是ArrayBlockingQueue的基本方法,內部鎖的實現是ReentrantLock ,維護了take和put兩個指針;入隊和出對方法也都挺簡單的,需要注意的是,take和put方法是阻塞的,offer、add、poll等方法是非阻塞的
LinkedBlockingQueue
LinkedBlockingQueue基於鏈表實現,繼承了AbstractQueue,實現了序列化接口Serializable和BlockingQueue接口
//首先看看內部成員變量:
private final int capacity;
/** count用來記錄內部元素的總數 */
private final AtomicInteger count = new AtomicInteger();
/** Node節點的頭指針*/
transient Node<E> head;
/** 尾指針*/
private transient Node<E> last;
/** 讀鎖 */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** 寫鎖 */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
可以看到,與ArrayBlockQueue不同,元素總數使用了原子類AtomicInteger ,內部多維護了兩把鎖,讀鎖和寫鎖。其實現相對更加復雜
//下面看看其構造方法
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();//容量不能小於0
this.capacity = capacity;
last = head = new Node<E>(null);//初始化頭尾指針
}
//下面是offer方法
public boolean offer(E e) {
if (e == null) throw new NullPointerException();//不接受空值
final AtomicInteger count = this.count;
if (count.get() == capacity)//如果當前元素總數等於其容量大小,直接返回false
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
//我們可以看到,LingkedBlockQueue是不接受空值的。offer是非阻塞的。入隊之后,如果隊列沒有滿,喚醒其他入隊線程,並且喚醒出隊線程。
//繼續看入隊方法enqueue
private void enqueue(Node<E> node) {
last = last.next = node;
}//可以看到入隊方法相當簡單,就是把尾節點的下一個節點直接指向新加入的節點,然后將新加入的節點作為尾節點
//然后看看take方法:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();//take方法是阻塞的
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}//也挺簡單的,就是先判斷是否可以出隊,不能則等待,否則出隊,然后喚醒其他出隊線程,並喚醒入隊線程
//最后是出隊方法:
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
} // 直接將一個元素取出,然后首位元素置空
總結,從實現來看,相比ArrayBlockQueue,LinkedBlockQueue的加鎖方法相對更加復雜,但是其入隊和出隊方法更加簡單;和ArrayBlockQueue一樣,take、put方法阻塞,offer、add、poll方法不會阻塞