作為一個隊列,這個隊列還是蠻特殊的,今天第一次遇見,好像很有用,我決定晚上回家之后研究研究。
一:概述
LinkedBlockingQueue內部由單鏈表實現,只能從head取元素,從tail添加元素。實現了先進先出等特性,是作為生產者消費者的首選。
添加元素和獲取元素都有獨立的鎖,也就是說LinkedBlockingQueue是讀寫分離的,讀寫操作可以並行執行。
LinkedBlockingQueue采用可重入鎖(ReentrantLock)來保證在並發情況下的線程安全。
其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。
二:構造器
LinkedBlockingQueue一共有三個構造器,分別是無參構造器、可以指定容量的構造器、可以穿入一個容器的構造器。
如果在創建實例的時候調用的是無參構造器,LinkedBlockingQueue的默認容量是Integer.MAX_VALUE,這樣做很可能會導致隊列還沒有滿,但是內存卻已經滿了的情況(內存溢出)。
1 public LinkedBlockingQueue(); //設置容量為Integer.MAX 2 3 public LinkedBlockingQueue(int capacity); //設置指定容量 4 5 public LinkedBlockingQueue(Collection<? extends E> c); //穿入一個容器,如果調用該構造器,容量默認也是Integer.MAX_VALUE
三:常用操作
取數據
take():首選。當隊列為空時阻塞
poll():彈出隊頂元素,隊列為空時,返回空
peek():和poll烈性,返回隊隊頂元素,但頂元素不彈出。隊列為空時返回null
remove(Object o):移除某個元素,隊列為空時拋出異常。成功移除返回true
添加數據
put():首選。隊滿是阻塞
offer():隊滿時返回false
判斷隊列是否為空
size()方法會遍歷整個隊列,時間復雜度為O(n),所以最好選用isEmtpy
四:詳細說明
LinkedBlockingQueue的添加方法
put(e)
該方法沒有返回值,當隊列已滿時,會阻塞當前線程
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException();//先檢查添加值是否為null // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; // 必須使用局部變量來表示隊列元素數量,負數表示操作失敗 Node<E> node = new Node(e); //先創建新的節點 final ReentrantLock putLock = this.putLock; //使用putLock來保證線程安全 final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) {//當隊列已滿,添加線程阻塞 notFull.await(); } enqueue(node); // 調用enqueue方法添加到隊尾 c = count.getAndIncrement(); //調用AtomicInteger的getAndIncrement()是數量加1 if (c + 1 < capacity)//添加成功后判斷是否可以繼續添加,隊列未滿 notFull.signal(); //喚醒添加線程 } finally { putLock.unlock(); } if (c == 0) // 添加后如果隊列中只有一個元素,喚醒一個取出線程,使用取出鎖 signalNotEmpty(); }
offer(e,timeout,unit)
該方法返回true或false,當隊列已滿時,會阻塞給定時間,添加操作成功返回true,否則返回false
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; //使用putLock final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { //當隊列已滿阻塞給定時間 if (nanos <= 0) //當時間消耗完全,操作未成功 返回false return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); // 調用enqueue方法添加一個新的節點 c = count.getAndIncrement(); //同樣調用AtomicInteger的方法 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; // 操作成功返回true }
offer(e)
該方法返回true或false,不會阻塞,直接返回
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; //當隊列已滿,直接返回false int c = -1; Node<E> node = new Node(e); // 先創建新的節點 final ReentrantLock putLock = this.putLock;//使用putLock putLock.lock(); try { if (count.get() < capacity) { // 加鎖后再次判斷隊列是否已滿 enqueue(node); //調用enqueue方法將節點添加到隊尾 c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; // 比較c的大小,判斷是否成功,當c大於-1時則添加操作成功 }
LinkedBlockingQueue的取出方法
take()
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock;//使用takeLock保證線程安全 takeLock.lockInterruptibly(); try { while (count.get() == 0) {//當隊列為空,取出線程阻塞 notEmpty.await(); } x = dequeue(); //掉用dequeue方法從隊頭取出元素 c = count.getAndDecrement(); //調用AtomicInteger的getAndDecrement()將count值減1 if (c > 1)//判斷如果當前隊列之前元素的數量大於1,喚醒取出線程 notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity)//之前隊列元素數量為容量值,取出一個,只能喚醒一個添加線程 signalNotFull(); return x; }
poll(timeout,unit)
該方法取出元素時,如果隊列為空,則阻塞給定的時間
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock;//使用takeLock保證線程安全 takeLock.lockInterruptibly(); try { while (count.get() == 0) {//當隊列為空則阻塞給定時間 if (nanos <= 0)//時間消耗完全后,如果操作未成功則返回null return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue();//調用dequeue方法返回節點值 c = count.getAndDecrement();//將count值減1 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
poll()
該方法取出元素時,如果隊列為空,則直接返回null
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0)// 如果隊列為空,直接返回null return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock;//使用takeLock保證線程安全 takeLock.lock(); try { if (count.get() > 0) { x = dequeue();//調用dequeue方法取出隊頭節點元素的值 c = count.getAndDecrement();//count減1 if (c > 1)//如果取出元素不是唯一的,喚醒取出線程 notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity)//如果從已滿隊列取出的,則喚醒一個添加線程 signalNotFull(); return x; }
peek()
該方法只返回隊頭元素的值,並不能將節點從隊列中刪除
public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null)//如果隊列為空,則直接返回null return null; else return first.item; } finally { takeLock.unlock(); } }
remove(o)
從隊列中刪除指定元素值的節點
public boolean remove(Object o) { if (o == null) return false; fullyLock(); //此時將入隊鎖和出隊鎖全部鎖住來保證線程安全 try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {// 循環遍歷查找值相等的元素 if (o.equals(p.item)) { unlink(p, trail);//調用unlink刪除此節點 return true;//操作成功返回true } } return false; } finally { fullyUnlock(); } }
獲取隊列當前大小及剩余容量
size()
public int size() { return count.get(); }
remainingCapacity()
public int remainingCapacity() { return capacity - count.get(); }
這兩個方法都沒有使用鎖來保證線程安全,是因為count自身為AtomicInteger對象,保證了操作的原子性