阻塞隊列之LinkedBlockingQueue


  作為一個隊列,這個隊列還是蠻特殊的,今天第一次遇見,好像很有用,我決定晚上回家之后研究研究。

一:概述

  LinkedBlockingQueue內部由單鏈表實現,只能從head取元素,從tail添加元素。實現了先進先出等特性,是作為生產者消費者的首選。

  添加元素和獲取元素都有獨立的鎖,也就是說LinkedBlockingQueue是讀寫分離的,讀寫操作可以並行執行。

  LinkedBlockingQueue采用可重入鎖(ReentrantLock)來保證在並發情況下的線程安全。

  其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。

 

二:構造器

  LinkedBlockingQueue一共有三個構造器,分別是無參構造器、可以指定容量的構造器、可以穿入一個容器的構造器。

  如果在創建實例的時候調用的是無參構造器,LinkedBlockingQueue的默認容量是Integer.MAX_VALUE,這樣做很可能會導致隊列還沒有滿,但是內存卻已經滿了的情況(內存溢出)。

1 public LinkedBlockingQueue();   //設置容量為Integer.MAX
2 
3 public LinkedBlockingQueue(int capacity);  //設置指定容量
4 
5 public LinkedBlockingQueue(Collection<? extends E> c);  //穿入一個容器,如果調用該構造器,容量默認也是Integer.MAX_VALUE

  

三:常用操作

取數據

take():首選。當隊列為空時阻塞

poll():彈出隊頂元素,隊列為空時,返回空

peek():和poll烈性,返回隊隊頂元素,但頂元素不彈出。隊列為空時返回null

remove(Object o):移除某個元素,隊列為空時拋出異常。成功移除返回true

 

添加數據

put():首選。隊滿是阻塞

offer():隊滿時返回false

 

判斷隊列是否為空

size()方法會遍歷整個隊列,時間復雜度為O(n),所以最好選用isEmtpy

 

四:詳細說明

LinkedBlockingQueue的添加方法

put(e)

  該方法沒有返回值,當隊列已滿時,會阻塞當前線程

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();//先檢查添加值是否為null
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1; // 必須使用局部變量來表示隊列元素數量,負數表示操作失敗
        Node<E> node = new Node(e); //先創建新的節點
        final ReentrantLock putLock = this.putLock; //使用putLock來保證線程安全
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
 
            while (count.get() == capacity) {//當隊列已滿,添加線程阻塞
                notFull.await();
            }
            enqueue(node); // 調用enqueue方法添加到隊尾
            c = count.getAndIncrement(); //調用AtomicInteger的getAndIncrement()是數量加1
            if (c + 1 < capacity)//添加成功后判斷是否可以繼續添加,隊列未滿
                notFull.signal(); //喚醒添加線程
        } finally {
            putLock.unlock();
        }
        if (c == 0) // 添加后如果隊列中只有一個元素,喚醒一個取出線程,使用取出鎖
            signalNotEmpty();
    }

  

offer(e,timeout,unit)

  該方法返回true或false,當隊列已滿時,會阻塞給定時間,添加操作成功返回true,否則返回false

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
 
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock; //使用putLock
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) { //當隊列已滿阻塞給定時間
                if (nanos <= 0) //當時間消耗完全,操作未成功 返回false
                    return false; 
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e)); // 調用enqueue方法添加一個新的節點
            c = count.getAndIncrement(); //同樣調用AtomicInteger的方法
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true; // 操作成功返回true
    }

  

offer(e)

  該方法返回true或false,不會阻塞,直接返回

public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false; //當隊列已滿,直接返回false
        int c = -1;
        Node<E> node = new Node(e); // 先創建新的節點
        final ReentrantLock putLock = this.putLock;//使用putLock
        putLock.lock();
        try {
            if (count.get() < capacity) { // 加鎖后再次判斷隊列是否已滿
                enqueue(node); //調用enqueue方法將節點添加到隊尾
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0; // 比較c的大小,判斷是否成功,當c大於-1時則添加操作成功
    }

  

 

LinkedBlockingQueue的取出方法

take()

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;//使用takeLock保證線程安全
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {//當隊列為空,取出線程阻塞
                notEmpty.await();
            }
            x = dequeue(); //掉用dequeue方法從隊頭取出元素
            c = count.getAndDecrement(); //調用AtomicInteger的getAndDecrement()將count值減1
            if (c > 1)//判斷如果當前隊列之前元素的數量大於1,喚醒取出線程
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)//之前隊列元素數量為容量值,取出一個,只能喚醒一個添加線程
            signalNotFull();
        return x;
    }

  

poll(timeout,unit)

  該方法取出元素時,如果隊列為空,則阻塞給定的時間

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;//使用takeLock保證線程安全        
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {//當隊列為空則阻塞給定時間
                if (nanos <= 0)//時間消耗完全后,如果操作未成功則返回null
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();//調用dequeue方法返回節點值
            c = count.getAndDecrement();//將count值減1
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

  

poll()

  該方法取出元素時,如果隊列為空,則直接返回null

public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)// 如果隊列為空,直接返回null
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;//使用takeLock保證線程安全
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();//調用dequeue方法取出隊頭節點元素的值
                c = count.getAndDecrement();//count減1
                if (c > 1)//如果取出元素不是唯一的,喚醒取出線程
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)//如果從已滿隊列取出的,則喚醒一個添加線程
            signalNotFull();
        return x;
    }

  

peek()

  該方法只返回隊頭元素的值,並不能將節點從隊列中刪除

public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)//如果隊列為空,則直接返回null
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

  

remove(o)

  從隊列中刪除指定元素值的節點

public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock(); //此時將入隊鎖和出隊鎖全部鎖住來保證線程安全
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {// 循環遍歷查找值相等的元素
                if (o.equals(p.item)) {
                    unlink(p, trail);//調用unlink刪除此節點
                    return true;//操作成功返回true
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

  

獲取隊列當前大小及剩余容量

size()

 public int size() {
        return count.get();
    }

  

remainingCapacity()

public int remainingCapacity() {
        return capacity - count.get();
    }

  

這兩個方法都沒有使用鎖來保證線程安全,是因為count自身為AtomicInteger對象,保證了操作的原子性

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM