ArrayBlockingQueue 和LinkedBlockQueue


ArrayBlockingQueue

ArrayBlockingQueue是Java多線程常用的線程安全的一個集合,基於數組實現,繼承自AbstractQueue,實現了BlockingQueue和Serializable接口。
//先看看器內部的成員變量:

private static final long serialVersionUID = -817911632652898426L;//實現了序列化接口

/** 基於數組的實現,內部持有一個Object數組 */
final Object[] items;

/** 數據讀取指針 */
int takeIndex;

/** 數據插入指針 */
int putIndex;

/** 當前隊列中元素的總數 */
int count;

/** 采用了ReentrantLock 的實現 */
final ReentrantLock lock;

/** 標識當前隊列中有可讀元素 */
private final Condition notEmpty;

/** 標識當前隊列可寫入 */
private final Condition notFull;

//可以看到,ArrayBlockingQueue內部維護了一個takeIndex指針和一個putIndex指針,分別用於讀取和寫入;一個notEmpty和一個notFull,分別用於保證寫入和讀取的線程安全,喚醒讀取和寫入線程
//再看看構造函數
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];//初始化數組
    lock = new ReentrantLock(fair);//初始化ReentrantLock,並標識是否為公平鎖
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

//然后來看看ArrayBlockingQueue的offer方法

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            //如果隊列滿,則添加失敗。offer方法不會阻塞,put方法會阻塞
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
//首先做空值檢查,如果為空,拋出空值異常。然后使用了ReentrantLock ,來保證offer的線程安全性。下面來看看真正的添加方法enqueue:
private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}
//可以看到,ArrayBlockingQueue內部維護了一個putIndex 指針,該指針指向當前隊列可以插入的位置,直接將當前的Object對象插入到inputIndex位置,然后讓inputIndex自增,如果隊列已滿,則指向第一個元素。最后元素總數加一,並喚醒讀線程
//最后我們來看讀取take方法:
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();//take方法是阻塞的,poll方法不會阻塞,直接返回。
        return dequeue();
    } finally {
        lock.unlock();
    }
}
//可以看到,那么take方法將被阻塞。下面看看出對方法dequeue:
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;//如果取到最后一個元素,takeIndex 指向第一個元素
    count--;//元素總數減一
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();//喚醒寫入線程
    return x;
}
以上便是ArrayBlockingQueue的基本方法,內部鎖的實現是ReentrantLock ,維護了take和put兩個指針;入隊和出對方法也都挺簡單的,需要注意的是,take和put方法是阻塞的,offer、add、poll等方法是非阻塞的

LinkedBlockingQueue

LinkedBlockingQueue基於鏈表實現,繼承了AbstractQueue,實現了序列化接口Serializable和BlockingQueue接口
 //首先看看內部成員變量:

private final int capacity;

/** count用來記錄內部元素的總數 */
private final AtomicInteger count = new AtomicInteger();

/** Node節點的頭指針*/
transient Node<E> head;

/** 尾指針*/
private transient Node<E> last;

/** 讀鎖 */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** 寫鎖 */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
可以看到,與ArrayBlockQueue不同,元素總數使用了原子類AtomicInteger ,內部多維護了兩把鎖,讀鎖和寫鎖。其實現相對更加復雜
//下面看看其構造方法
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();//容量不能小於0
    this.capacity = capacity;
    last = head = new Node<E>(null);//初始化頭尾指針
}
//下面是offer方法
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();//不接受空值
    final AtomicInteger count = this.count;
    if (count.get() == capacity)//如果當前元素總數等於其容量大小,直接返回false
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}
//我們可以看到,LingkedBlockQueue是不接受空值的。offer是非阻塞的。入隊之后,如果隊列沒有滿,喚醒其他入隊線程,並且喚醒出隊線程。
//繼續看入隊方法enqueue
private void enqueue(Node<E> node) {
    last = last.next = node;
}//可以看到入隊方法相當簡單,就是把尾節點的下一個節點直接指向新加入的節點,然后將新加入的節點作為尾節點

//然后看看take方法:
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();//take方法是阻塞的
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}//也挺簡單的,就是先判斷是否可以出隊,不能則等待,否則出隊,然后喚醒其他出隊線程,並喚醒入隊線程
//最后是出隊方法:
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
} // 直接將一個元素取出,然后首位元素置空
總結,從實現來看,相比ArrayBlockQueue,LinkedBlockQueue的加鎖方法相對更加復雜,但是其入隊和出隊方法更加簡單;和ArrayBlockQueue一樣,take、put方法阻塞,offer、add、poll方法不會阻塞


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM