並發隊列之ArrayBlockingQueue


  上一篇我們說了並發隊列中的LinkedBlockingQueue隊列,這次我們看看ArrayBlockingQueue,看看名字,我們想象一下LinkedList和ArrayList的區別,我們可以知道ArrayBlockingQueue底層肯定是基於數組實現的,這是一個有界數組;

  ArrayBlockingQueue其中的組成部分和LinkedBlockingQueue及其相似,也是有兩個條件變量,維護阻塞隊列,實現了生產消費者模式;

 

一.簡單認識ArrayBlockingQueue

  先看看幾個常用屬性:

//數組用於存放隊列元素
final Object[] items;
//出隊索引
int takeIndex;
//入隊索引
int putIndex;
//隊列中元素數量
int count;
//獨占鎖
final ReentrantLock lock;
//如果數組中為空,還有線程取數據,就丟到這個條件變量中來阻塞
private final Condition notEmpty;
//隊列滿了,還有線程往數組中添加數據,就把線程丟到這里來阻塞
private final Condition notFull;

 

 

  由於這是一個有界的數組,我們再看看構造器:

//指定容量,默認是非公平策略
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
//指定容量和獨占鎖的策略
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
//可以指定容量,鎖的策略,還有初始化數據
public ArrayBlockingQueue(int capacity, boolean fair,
                            Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

 

 

二.offer方法

  向隊列尾部添加一個元素,添加成功就返回true,隊列滿了就丟掉當前元素直接返回false,方法不阻塞;

public boolean offer(E e) {
    //非空檢驗
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //如果數組中實際數量和最大容量相等,添加失敗,返回false
        if (count == items.length)
            return false;
        else {
            //添加成功,方法實現在下面
            enqueue(e);
            return true;
        }
    } finally {
        //釋放鎖
        lock.unlock();
    }
}
private void enqueue(E x) {
   //拿到數組
    final Object[] items = this.items;
    //在putIndex這個位置放入數據x,然后把putIndex加一,說明這個參數表示的是下一個數據要放入的位置的索引
    items[putIndex] = x;
    //這里putIndex是先加一然后再比較是否相等,比如這里數組的最大容量是5,那么索引的最大值應該是4,而如果putIndex等於5了,說明數組
    //越界了,加把這個索引重置為0
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    //添加完成之后,說明了數組中有數據了,這里會喚醒之前因為去數組中取數據而阻塞的線程
    notEmpty.signal();
}

 

 

三.put方法

  向隊列尾部插入一個元素,隊列有空閑就插入成功返回true,隊列滿了就阻塞當前線程到notFull的條件隊列中,等有空閑之后就會被喚醒;阻塞過程中對中斷會有響應的;

public void put(E e) throws InterruptedException {
    //非空檢查
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    //注意該鎖的獲取方式
    lock.lockInterruptibly();
    try {
        //如果線程滿了,就把當前線程放到notFull條件變量的阻塞隊列中
        while (count == items.length)
            notFull.await();
        //沒有滿,就添加數據
        enqueue(e);
    } finally {
        //釋放鎖
        lock.unlock();
    }
}

 

 

四.poll方法

  頭部獲取並移除一個元素,如果隊列為空,就返回null,方法不阻塞;

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //如果隊列為空,就返回null
        //如果隊列不為空,就調用dequeue方法獲取並刪除隊列頭部的元素
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    //獲取數組
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    //獲取takeIndex位置的元素,最后會將這個返回
    E x = (E) items[takeIndex];
    //然后將takeInde位置置為空
    items[takeIndex] = null;
    //如果takeIndex已經是數組的最后一個位置了,就將takeIndex重置為0
    if (++takeIndex == items.length)
        takeIndex = 0;
    //實際數量減一
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //喚醒notFull中線程
    notFull.signal();
    return x;
}

 

 

五.take方法

  獲取並刪除當前隊列頭部的元素,如果隊列為空當前線程阻塞直到被喚醒,對中斷有響應;

 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //可中斷的方式獲取鎖
        lock.lockInterruptibly();
        try {
            //如果數組為空,此時就喚醒notEmpty中條件隊列里的線程
            while (count == 0)
                notEmpty.await();
            //獲取並刪除頭節點
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

 

 

六.peek方法

  只是獲取頭部元素,不刪除,如果隊列為空就返回null,這個方法是線程不阻塞的

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

//獲取到數組中索引為takeIndex中的數據
@SuppressWarnings("unchecked")
final E itemAt(int i) {
    return (E) items[i];
}

 

 

七.總結

  理解了上一篇博客中說的LinkedBlockingQueue,那么再看這一篇其實太容易了,就是操作數組嘛!用下面這個圖表示:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM