上一篇我們說了並發隊列中的LinkedBlockingQueue隊列,這次我們看看ArrayBlockingQueue,看看名字,我們想象一下LinkedList和ArrayList的區別,我們可以知道ArrayBlockingQueue底層肯定是基於數組實現的,這是一個有界數組;
ArrayBlockingQueue其中的組成部分和LinkedBlockingQueue及其相似,也是有兩個條件變量,維護阻塞隊列,實現了生產消費者模式;
一.簡單認識ArrayBlockingQueue
先看看幾個常用屬性:
//數組用於存放隊列元素 final Object[] items; //出隊索引 int takeIndex; //入隊索引 int putIndex; //隊列中元素數量 int count; //獨占鎖 final ReentrantLock lock; //如果數組中為空,還有線程取數據,就丟到這個條件變量中來阻塞 private final Condition notEmpty; //隊列滿了,還有線程往數組中添加數據,就把線程丟到這里來阻塞 private final Condition notFull;
由於這是一個有界的數組,我們再看看構造器:
//指定容量,默認是非公平策略 public ArrayBlockingQueue(int capacity) { this(capacity, false); } //指定容量和獨占鎖的策略 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } //可以指定容量,鎖的策略,還有初始化數據 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
二.offer方法
向隊列尾部添加一個元素,添加成功就返回true,隊列滿了就丟掉當前元素直接返回false,方法不阻塞;
public boolean offer(E e) { //非空檢驗 checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { //如果數組中實際數量和最大容量相等,添加失敗,返回false if (count == items.length) return false; else { //添加成功,方法實現在下面 enqueue(e); return true; } } finally { //釋放鎖 lock.unlock(); } } private void enqueue(E x) { //拿到數組 final Object[] items = this.items; //在putIndex這個位置放入數據x,然后把putIndex加一,說明這個參數表示的是下一個數據要放入的位置的索引 items[putIndex] = x; //這里putIndex是先加一然后再比較是否相等,比如這里數組的最大容量是5,那么索引的最大值應該是4,而如果putIndex等於5了,說明數組 //越界了,加把這個索引重置為0 if (++putIndex == items.length) putIndex = 0; count++; //添加完成之后,說明了數組中有數據了,這里會喚醒之前因為去數組中取數據而阻塞的線程 notEmpty.signal(); }
三.put方法
向隊列尾部插入一個元素,隊列有空閑就插入成功返回true,隊列滿了就阻塞當前線程到notFull的條件隊列中,等有空閑之后就會被喚醒;阻塞過程中對中斷會有響應的;
public void put(E e) throws InterruptedException { //非空檢查 checkNotNull(e); final ReentrantLock lock = this.lock; //注意該鎖的獲取方式 lock.lockInterruptibly(); try { //如果線程滿了,就把當前線程放到notFull條件變量的阻塞隊列中 while (count == items.length) notFull.await(); //沒有滿,就添加數據 enqueue(e); } finally { //釋放鎖 lock.unlock(); } }
四.poll方法
頭部獲取並移除一個元素,如果隊列為空,就返回null,方法不阻塞;
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //如果隊列為空,就返回null //如果隊列不為空,就調用dequeue方法獲取並刪除隊列頭部的元素 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } private E dequeue() { //獲取數組 final Object[] items = this.items; @SuppressWarnings("unchecked") //獲取takeIndex位置的元素,最后會將這個返回 E x = (E) items[takeIndex]; //然后將takeInde位置置為空 items[takeIndex] = null; //如果takeIndex已經是數組的最后一個位置了,就將takeIndex重置為0 if (++takeIndex == items.length) takeIndex = 0; //實際數量減一 count--; if (itrs != null) itrs.elementDequeued(); //喚醒notFull中線程 notFull.signal(); return x; }
五.take方法
獲取並刪除當前隊列頭部的元素,如果隊列為空當前線程阻塞直到被喚醒,對中斷有響應;
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //可中斷的方式獲取鎖 lock.lockInterruptibly(); try { //如果數組為空,此時就喚醒notEmpty中條件隊列里的線程 while (count == 0) notEmpty.await(); //獲取並刪除頭節點 return dequeue(); } finally { lock.unlock(); } }
六.peek方法
只是獲取頭部元素,不刪除,如果隊列為空就返回null,這個方法是線程不阻塞的
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } } //獲取到數組中索引為takeIndex中的數據 @SuppressWarnings("unchecked") final E itemAt(int i) { return (E) items[i]; }
七.總結
理解了上一篇博客中說的LinkedBlockingQueue,那么再看這一篇其實太容易了,就是操作數組嘛!用下面這個圖表示: