Java阻塞隊列


一、阻塞隊列(BlockingQueue):用於保存等待執行的任務。在阻塞隊列中,線程阻塞的兩種情況:

1、當隊列中沒有數據的情況下,消費者端的所有線程都會被自動阻塞(掛起),直到有數據放入隊列。

  

2、當隊列中填滿數據的情況下, 生產者端的所有線程都會被自動阻塞,直到隊列中有空位置,線程被自動喚醒。

  

二、阻塞隊列的主要方法

       

  拋出異常:拋出一個異常;

  特殊值:返回一個特殊值(null或false,視情況而定)

  阻塞:在成功操作之前,一直阻塞線程

  超時:放棄前只在最大的時間內阻塞

插入操作

  1)public abstract boolean add(E paramE):將指定元素插入此隊列中(如果立即可行且不會違反容量限制),成功是返回true,如果當前沒有可用空間,則拋出異常。如果鈣元素是null,則會拋出NullPointerException異常。

  2)public abstract boolean offer(E paramE):將指定元素插入此隊列中(如果立即可行且不會違反容量限制),成功時返回true,如果當前沒有可用的空間,則返回false。

  3)public abstract void put(E paramE)throws InterruptedExcaption:將指定元素插入隊列中,將等待可用的空間(如果有必要)。

 1 public void put(E paramE) throws InterruptedException {  2  checkNotNull(paramE);  3      ReentrantLock  localReentrantLock = this.lock;  4  localReentrantLock.lockInterruptibly();  5   try {  6          while (this.count == this.items.length)  7          this.notFull.await();//如果隊列滿了,則線程阻塞等待
 8  enqueue(paramE);  9  localReentrantLock.unlock(); 10      } finally { 11  localReentrantLock.unlock(); 12  } 13  }

  4)offer(E o,long timeout, TimeUnit unit):可以設定等待的時間,如果指定的時間內,還不能往隊列中加入BlockingQueue,則返回失敗。

獲取數據操作

  1)poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null;

  2)poll(long timeout,TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定的時間內,隊列一旦有數據可取,則立即返回隊列中的數據。否則直到時間超時還沒有數據可取,返回失敗;

  3)take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入;

  4)drainTo():一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),通過該方法,可以提升獲取數據效率,不需要多次分批加鎖或釋放鎖。

三、Java中的阻塞隊列

  1. ArrayBlockingQueue:由數組結構實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認情況下不保證訪問者公平的訪問隊列,所謂的公平訪問隊列是指阻塞的所有生產者線程或消費者線程,當隊列可用時,可以按照阻塞的先后順序訪問隊列,即先阻塞的生產者線程,可以先往隊列里插入元素,先阻塞的消費者線程,可以先從隊列里獲取元素。通常情況下為了保證公平性會降低吞吐量。我們可以使用以下代碼創建一個公平的阻塞隊列:

    ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,ture);

  在讀寫操作上都需要鎖住整個容器,適合於實現“生產者消費者”模式。ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。這個類是線程安全的。生產者和消費者共用一把鎖。

  源碼:

final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; public ArrayBlockintQueue(int capacity,boolean fair){ if(capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void enqueue(E x){ final Object[] items = this.items; items[putIndex] = x; if(++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } private E dequeue(){ final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E)items[takeIndex]; items[takeIndex] = null; if(++takeIndex == items.length) takeIndex = 0; count--; if(itrs!=null) itrs.elementDequeued(); notFull.signal(); return x; } public void put(E e) throws InterruptedExcepion{ checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try{ while(count == items.length) notFull.await(); enqueue(e); }finally{ lock.unlock(); } } public E take() throws InterruptedException{ final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try{ while(count ==0) notEmpty.await()l return dequeue(); }finally{ lock.unlock(); } } public boolean offer(E e){ checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try{ if(count == items.length) return false; else{ enqueue(e); return true; } }finally{ lock.unlock(); } } public E poll(){ final ReentrantLock lock = this.lock; lock.lock(); try{ return (count ==0) ? null : dequeue(); }finally{ lock.unlock(); } }

  2. LinkedBlockingQueue:由鏈表結構組成的有界阻塞隊列,內部維持着一個數據緩沖隊列(該隊列由鏈表構成)。同ArrayBlockingQueue類似,此隊列按照先進先出(FIFO)的原則對元素進行排序。

  只有當隊列緩沖區達到最大值緩存容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生產者線程,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於同樣的原理。

  而LinkedBlockingQueue之所以能夠高效地處理並發數據,還因為其對於生產者端和消費者端分別采用了獨立的鎖來控制數據同步,在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的並發性能。吞吐量通常要高於ArrayBlockingQueue。

    LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE)。

  源碼:

private final int capacity; private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; private final ReentrantLock tackLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondtion(); private void enqueue(Node<E> node){ last = last.next = node; } private E dequeue(){ Node<E> h =head; Node<E> first = h.next; h.next=h; head=first; E x =first.item; first.item=null; return x; } public void put(E e) throws InterruptedException{ if(e == null) throw new NullPointerExcepion(); int c =-1; Node<E>node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try{ //當隊列滿時,調用notFull.await()方法釋放鎖,陷入等待狀態。 //有兩種情況會激活該線程 //第一,某個put線程添加元素后,發現隊列有空余,就調用notFull.signal()方法激活阻塞線程 //第二,take線程取元素時,發現隊列已滿。則其取出元素后,也會調用 notFull.signal()f方法激活阻塞線程
        while(count.get() == capacity){ notFull.await(); } enqueue(node); c =count.getAndIncrement(); //發現隊列未滿,調用notFull.signal()激活阻塞的put線程(可能存在)
        if(c+1<capacity) notFull.signal(); }finally{ putLock.unlock(); } if(c == 0) signalNotEmpty(); } public E take() throws InterruptedExcepion{ E x; int c=-1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try{ while(count.get()==0){ notEmpty.await(); } x = dequeue(); c = count.getAndDectement(); if(c>1) notEmpty.signal(); }finally{ takeLock.unlock(); } if(c == capacity) signalNotFull(); return x; } public boolean offer(E e){ if(e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if(count.get() == capacity) return false; int c=-1; Node<E>node = new Node<E>(e); final ReentrantLock putLock = this.putLock(); putLock.lock(); try{ if(count.get() < capacity){ enqueue(node); c = count.getAndInrement(); if(c+1 < capacity) notFull.signal; } }finally{ putLock.unlock(); } if(c==0) signalNotEmpty(); return c>=0; } public E poll(){ final AtomicInteger count = this.count; if(count.get() == 0) return null; E x = null; int c= -1; final ReentrantLock takeLock = this.takeLock(); takeLock.lock(); try{ if(count.get()>0){ x = dequeue(); c = count.getAndDecrement(); if(c>1) notEmpty.signal(); } }finally{ takeLock.unlock(); } if(c == capacity) signalNotFull(); return x; }

  ArrayBlockingQueue和LinkedBlockingQueue的區別:

  1)隊列大小的初始化方式不同

    ArrayBlockingQueue是有界的,必須指定隊列的大小;

    LinkedBlockingQueue是分情況的,指定隊列的大小時,就是有界的;不指定隊列的大小時,默認是Integer.MAX_VALUE,看成無界隊列,但當生產速度大於消費速度時候,有可能會內存溢出。

  2)隊列中鎖的實現不同

    ArrayBlockingQueue實現的隊列中的鎖是沒有分離的,即生產和消費用的是同一個鎖;進行put和take操作,共用同一個鎖對象。也就是說,put和take無法並行執行!

    LinkedBlockingQueue實現的隊列中的鎖是分離的,即生產用的是putLock,消費是takeLock。也就是說,生成端和消費端各自獨立擁有一把鎖,避免了讀(take)和寫(put)時互相競爭鎖的情況,可並行執行。

  3)在生產或消費時操作不同

    ArrayBlockingQueue基於數組,在插入或刪除元素時,是直接將枚舉對象插入或移除的,不會產生或銷毀任何額外的對象實例;

    LinkedBlockingQueue基於鏈表,在插入或刪除元素時,需要把枚舉對象轉換為Node<E>進行插入或刪除,會生成一個額外的Node對象,這在長時間內需要高效並發地處理大批量數據的系統中,其對於GC的影響還是存在一定的區別,會影響性能。

  Put()和take()方法

    都可以實現阻塞的功能。

    Put()方法:把元素加入到阻塞隊列中,如果阻塞隊列沒有空間,則調用此方法的線程被阻塞,直到有空間的時候再繼續。

    take()方法:取出排在阻塞隊列首位的對象,若阻塞隊列為空,則調用此方法的線程被阻塞,知道有新的對象被加入的時候再繼續。

  offer()和poll()方法

    不具有阻塞的功能。

    offer()方法:把元素加入到阻塞隊列中,如果可以容納,則返回true。如果不可以容納,則返回false。

    poll()方法:取出排在阻塞隊列首位的對象,若阻塞隊列為空,則返回null,如果不為空,則返回取出來的那個元素。

  3. PriorityBlockingQueue:基於數組的且支持優先級排序的無界阻塞隊列。默認情況下元素采取升序排列。可以自定義實現compareTo() 方法來指定元素進行排序規則,或初始化PriorityBlockingQueue時,制定構造函數Comparator來對元素進行排序。需要注意的是不能保證同優先級元素的順序。

  它會按照元素的優先級對元素進行排序,按照優先級順序出隊,每次出隊的元素都是優先級最高的元素。注意,不會阻塞生產者,但會阻塞消費者。PriorityBlockingQueue里面存儲的對象必須是實現Compareable接口,隊列通過這個接口的Compare方法確定對象的priority。

  隊列的元素並不是全部按優先級排序的,但是對頭的優先級肯定是最高的。每取一個頭元素時候,都會對剩余的元素做一次調整,這樣就能保證每次隊頭的元素都是優先級最高的元素。

  4. DelayQueue:使用優先級隊列實現的支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在創建元素是可以指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。

  該隊列的頭部是延遲期滿后保存時間最長的Delayed元素。這個對列里面所存儲的對象都帶有一個時間參數,采用take獲取數據的時候,如果時間沒有到,取不出來任何數據。而加入數據的時候,是不會阻塞的(不會阻塞生產者,但會阻塞消費者)。DelayQueue內部使用PriorityQueue實現的。DelayQueue是一個使用PriorityQueue實現的BlockingQueue,優先隊列的比較基准是時間。本質上即:DelayQueue = BolckingQueue + PriorityQueue + Delayed。

  優勢:

  如果不使用DelayQueue,那么常規的解決方法就是:使用一個后台線程,遍歷所有對象,挨個檢查。這種笨的辦法簡單好用,但是對象數量過多時,可能存在性能問題,檢查間隔時間不好設置,間隔時間過大,影響精確度,國小則存在效率問題。而且做不到按超時的時間順序處理。 

  我們可以將DelayQueue運用在以下場景中:

    1)緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。

    2)定時任務調度:使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。

class Wanging implements Delayed{ private String name; private String id; private long endTime; public Wanging(String name,String id,long endTime){ this.name=name; this.id=id; this.endTime=endTime; } pubic String getName(){ return this.name; } public String getId(){ return this.id; } //用來判斷是否到了截止時間
    public long getDelay(TimeUnit unit){ return endTime - System.currentTimeMillis(); } //相互比較排序用
    public int compareTo(Delayed o){ Wanging jia = (Wanging) o; return endTime - jia.endTime > 0? 1:0; } } public class WangBa implements Runnable{ private DelayQueue<Wanging> queue = new DelayQueue<Wanging>(); public boolean yinye = true; public void shangji(String name,String id,int money){ Wanging man = new Wanging(name,id,1000*60*money+System.currentTimeMillis()); System.out.println("網名"+man.getName()+"身份證"+man.getId()+"交錢"+money+"塊,開始上機..."); this.queue.add(man); } public void xiaji(Wanging man){ System.out.println("網名"+man.getName()+"身份證"+man.getId()+"時間到下機..."); } public void run(){ while(yinye){ try{ System.out.println("檢查ing"); Wanging man = queue.take(); xiaji(man); }catch(InterruptedException e){ e.printStackTrace(); } } } public static void main(String[] args){ try{ System.out.println("網吧開始營業"); WangBa siyu = new WangBa(); Thread shangwang = new Thread(siyu); shangwang.start(); siyu.shangji("路人甲","123",1); siyu.shangji("路人乙","234",2); siyu.shangji("路人丙","345",3); }catch(Exception e){ } } }

  5. SynchronousQueue:是一個不存儲元素的阻塞隊列,它的size()方法總是返回0。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態。可以認為SynchronousQueue是一個緩存值為1的阻塞隊列。SynchronousQueue負責把生產者線程處理的數據直接傳遞給消費者線程。隊列本身並不存儲任何元素,非常適合於傳遞性場景,比如在一個線程中使用的數據,傳遞給另外一個線程使用,SynchronousQueue的吞吐量高於LinkedBlockingQueue和ArrayBlockingQueue。

  6. LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列。相對於其他阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。

    1)transfer方法:如果當前有消費者正在等待接收元素(消費者使用take方法或帶時間限制的poll方法時),transfer方法可以把生產者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等待接收元素,transfer方法會將元素存放在隊列的tail節點,並等待該元素被消費者消費了才返回。

    2)tryTransfer方法:是用來試探下生產者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則返回false。tryTransfer方法與transfer方法的區別是tryTransfer方法無論消費者是否接收,方法立即返回,而transfer方法是必須等到消費者消費了才返回。對於帶有時間限制的tryTransfer(E o,long timeout,TimeUnit unit)方法,則是試圖把生產者傳入的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間再返回,如果超時還沒消費元素,則返回false,如果在超市時間內消費了元素,則返回true。

  7.LinkedBlockingDeque:由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指可以從隊列的兩端插入和移除元素。雙端隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。相比其他的阻塞隊列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First單詞結尾的方法,表示插入、獲取(peek)或移除雙端隊列第一個元素。以Last單詞結尾的方法,表示插入、獲取或移除雙端隊列的最后一個元素。另外插入方法add等同於addLast,移除方法remove等同於removeFirst。但是take方法卻等同於takeFirst,使用時還是用帶有First和Last后綴的方法更清楚。在初始化LinkedBlockingQueue時可以設置容量防止其過度膨脹。另外雙向阻塞隊列可以運用在“工作竊取”模式中。

 

編程實現一個最大元素為100的阻塞隊列。

Lock lock = new ReentrantLock();
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();

Object[] items = new Object[100];
int putptr,takeptr,count;

public void put(Object x) throws InterruptedException{
    lock.lock();
    try{
        while(count == items.length)
            notFull.await();
        items[putptr]=x;
        if(++putptr == items.length)
            putptr = 0;
        ++count;
        notEmpty.signal(); 
    }finally{
        lock.unlock();
    }
}

public Object take() throws InterruptedException{
    lock.lock();
    try{
        while(count == 0)
            notEmpty.await();
        Object x =items[takeptr];
        if(++takeptr == items.length)
            takeptr = 0;
        --count;
        notFull.signal();
        return x;
    }finally{
        lock.unlock();
    }
}

設計一個雙緩沖阻塞隊列

  在服務器開發中,通常的做法是把邏輯處理線程和I/O處理線程分離。

  邏輯處理線程:對接收的包進行邏輯處理。

  I/O處理線程:網絡數據的發送和接收,連接的建立和維護。

  通常邏輯處理線程和I/O處理線程是通過數據隊列來交換數據,就是生產者--消費者模型。

  這個數據隊列是多個線程在共享,每次訪問都需要加鎖,因此如何減少互斥/同步的開銷就顯得尤為重要。解決方案:雙緩沖隊列。

  兩個隊列,將讀寫分離,一個給邏輯線程讀,一個給IO線程用來寫,當邏輯線程讀完隊列后會將自己的隊列與IO線程的隊列相調換。這里需要加鎖的地方有兩個,一個是IO線程每次寫隊列時都要加鎖,另一個是邏輯線程在調換隊列時也需要加鎖,但邏輯線程在讀隊列時是不需要加鎖的。如果是一塊緩沖區,讀、寫操作是不分離的,雙緩沖區起碼節省了單緩沖區時讀部分操作互斥/同步的開銷。本質是采用空間換時間的優化思路。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM