阻塞隊列之五:LinkedBlockingQueue


一、LinkedBlockingQueue簡介

  LinkedBlockingQueue是一個使用鏈表完成隊列操作的阻塞隊列。鏈表是單向鏈表,而不是雙向鏈表。采用對於的next構成鏈表的方式來存儲對象。由於讀只操作隊頭,而寫只操作隊尾,這里巧妙地采用了兩把鎖,對put和offer采用putLock,對take和poll采用takeLock,即為寫鎖和讀鎖,這兩個鎖實現阻塞(“two lock queue” algorithm)。避免了讀寫時相互競爭鎖的現象,因為其對於生產者端和消費者端分別采用了獨立的鎖來控制數據同步,因此LinkedBlockingQueue在高並發讀寫操作都多的情況下,性能會比ArrayBlockingQueue好很多在遍歷以及刪除元素時則要把兩把鎖都鎖住

  可選的容量范圍構造方法參數作為防止隊列過度擴展的一種方法。如果未指定容量,則它等於 Integer.MAX_VALUE。除非插入節點會使隊列超出容量,否則每次插入后會動態地創建鏈接節點。

 注意1:容量范圍可以在構造方法參數中指定作為防止隊列過度擴展。如果未指定容量,則它等於 Integer.MAX_VALUE 
 注意2:它是線程安全的,是線程阻塞的。 
 注意3:不接受 null 元素 
 注意4:它實現了BlockingQueue接口。關於BlockingQueue,請參考《BlockingQueue》 
 注意5:它沒有線程ArrayBlockingQueue那樣的公平性設置。為什么這樣設計呢?puzzle. 
 注意6:此類及其迭代器實現了 Collection 和 Iterator 接口的所有可選 方法

 

二、LinkedBlockingQueue源碼分析

2.1、LinkedBlockingQueue的lock

  在LinkedBlockingQueue中有2個lock,分別是讀鎖和寫鎖,讀寫的的鎖都是全局的,而且是final的。

ArrayBlockingQueue只有1個鎖,添加數據和刪除數據的時候只能有1個被執行,不允許並行執行。而LinkedBlockingQueue有2個鎖,寫鎖和讀鎖,添加數據和刪除數據是可以並行進行的,當然添加數據和刪除數據的時候只能有1個線程各自執行。

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

LinkedBlockingQueue使用ReentrantLock來實現的添加元素原子操作,具體可以看poll和offer中的阻塞awaitNanos(nanos)是使用了Condition中的AQS中的一個方法。

2.2、成員變量

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

如圖LinkedBlockingQueue中也有兩個Node分別用來存放首尾節點,並且里面有個初始值為0的原子變量count用來記錄隊列元素個數。

2.3、入隊

  inkedBlockingQueue有不同的幾個數據添加方法,add、offer、put方法,add方法內部調用offer方法。

public boolean offer(E e) {
    if (e == null) throw new NullPointerException(); // 不允許空元素
    final AtomicInteger count = this.count;
    if (count.get() == capacity) // 如果容量滿了,返回false
        return false;
    int c = -1;
    Node<E> node = new Node(e); // 容量沒滿,以新元素構造節點
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // 放鎖加鎖,保證調用offer方法的時候只有1個線程
    try {
        if (count.get() < capacity) { // 再次判斷容量是否已滿,因為可能拿鎖在進行消費數據,沒滿的話繼續執行
            enqueue(node); // 節點添加到鏈表尾部
            c = count.getAndIncrement(); // 元素個數+1
            if (c + 1 < capacity) // 如果容量還沒滿
                notFull.signal(); // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示可以再次往隊列里面加數據了,隊列還沒滿
        }
    } finally {
        putLock.unlock(); // 釋放放鎖,讓其他線程可以調用offer方法
    }
    if (c == 0) // 由於存在放鎖和拿鎖,這里可能拿鎖一直在消費數據,count會變化。這里的if條件表示如果隊列中還有1條數據
        signalNotEmpty(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列里還有1條數據,可以進行消費
    return c >= 0; // 添加成功返回true,否則返回false
}

put方法:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException(); // 不允許空元素
    int c = -1;
    Node<E> node = new Node(e); // 以新元素構造節點
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly(); // 放鎖加鎖,保證調用put方法的時候只有1個線程
    try {
        while (count.get() == capacity) { // 如果容量滿了
            notFull.await(); // 阻塞並掛起當前線程
        }
        enqueue(node); // 節點添加到鏈表尾部
        c = count.getAndIncrement(); // 元素個數+1
        if (c + 1 < capacity) // 如果容量還沒滿
            notFull.signal(); // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示可以再次往隊列里面加數據了,隊列還沒滿
    } finally {
        putLock.unlock(); // 釋放放鎖,讓其他線程可以調用put方法
    }
    if (c == 0) // 由於存在放鎖和拿鎖,這里可能拿鎖一直在消費數據,count會變化。這里的if條件表示如果隊列中還有1條數據
        signalNotEmpty(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列里還有1條數據,可以進行消費
}

LinkedBlockingQueue的添加數據方法add,put,offer跟ArrayBlockingQueue一樣,不同的是它們的底層實現不一樣。

ArrayBlockingQueue中放入數據阻塞的時候,需要消費數據才能喚醒。

而LinkedBlockingQueue中放入數據阻塞的時候,因為它內部有2個鎖,可以並行執行放入數據和消費數據,不僅在消費數據的時候進行喚醒插入阻塞的線程,同時在插入的時候如果容量還沒滿,也會喚醒插入阻塞的線程。

enqueue()真正往單鏈表中增加元素的方法:

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

2.4、出隊

LinkedBlockingQueue有不同的幾個數據刪除方法,poll、take、remove方法。

poll方法:

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0) // 如果元素個數為0
        return null; // 返回null
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock(); // 拿鎖加鎖,保證調用poll方法的時候只有1個線程
    try {
        if (count.get() > 0) { // 判斷隊列里是否還有數據
            x = dequeue(); // 刪除頭結點
            c = count.getAndDecrement(); // 元素個數-1
            if (c > 1) // 如果隊列里還有元素
                notEmpty.signal(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊列里還有數據,可以再次消費
        }
    } finally {
        takeLock.unlock(); // 釋放拿鎖,讓其他線程可以調用poll方法
    }
    if (c == capacity) // 由於存在放鎖和拿鎖,這里可能放鎖一直在添加數據,count會變化。這里的if條件表示如果隊列中還可以再插入數據
        signalNotFull(); // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊列里還能再次添加數據
                return x;
}

take方法:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly(); // 拿鎖加鎖,保證調用take方法的時候只有1個線程
    try {
        while (count.get() == 0) { // 如果隊列里已經沒有元素了
            notEmpty.await(); // 阻塞並掛起當前線程
        }
        x = dequeue(); // 刪除頭結點
        c = count.getAndDecrement(); // 元素個數-1
        if (c > 1) // 如果隊列里還有元素
            notEmpty.signal(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊列里還有數據,可以再次消費
    } finally {
        takeLock.unlock(); // 釋放拿鎖,讓其他線程可以調用take方法
    }
    if (c == capacity) // 由於存在放鎖和拿鎖,這里可能放鎖一直在添加數據,count會變化。這里的if條件表示如果隊列中還可以再插入數據
        signalNotFull(); // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊列里還能再次添加數據
    return x;
}

remove方法:

public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock(); // remove操作要移動的位置不固定,2個鎖都需要加鎖
    try {
        for (Node<E> trail = head, p = trail.next; // 從鏈表頭結點開始遍歷
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) { // 判斷是否找到對象
                unlink(p, trail); // 修改節點的鏈接信息,同時調用notFull的signal方法
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock(); // 2個鎖解鎖
    }
}

LinkedBlockingQueue的take方法對於沒數據的情況下會阻塞,poll方法刪除鏈表頭結點,remove方法刪除指定的對象。

需要注意的是remove方法由於要刪除的數據的位置不確定,需要2個鎖同時加鎖

    /**
     * Locks to prevent both puts and takes.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

 

真正出隊的方法:

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

2.5、peek操作

獲取但是不移除當前隊列的頭元素,沒有則返回null

    public E peek() {
        //隊列空,則返回null
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

2.6、size方法

public int size() {
    return count.get();
}

2.7、完整源碼

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {  
    /** 
     * 鏈表節點node類結構 
     */  
    static class Node<E> {  
        volatile E item;//volatile使得所有的write happen-befor read,保證了數據的可見性   
        Node<E> next;  
        Node(E x) { item = x; }  
    }  
    /** 隊列容量,默認為Integer.MAX_VALUE*/  
    private final int capacity;  
    /** 用原子變量 表示當前元素的個數 */  
    private final AtomicInteger count = new AtomicInteger(0);  
    /** 表頭節點 */  
    private transient Node<E> head;  
    /** 表尾節點 */  
    private transient Node<E> last;  
    /** 獲取元素或刪除元素時 要加的takeLock鎖 */  
    private final ReentrantLock takeLock = new ReentrantLock();  
    /** 獲取元素 notEmpty條件 */  
    private final Condition notEmpty = takeLock.newCondition();  
    /** 插入元素時 要加putLock鎖 */  
    private final ReentrantLock putLock = new ReentrantLock();  
    /** 插入時,要判滿 */  
    private final Condition notFull = putLock.newCondition();  
    /** 
     * 喚醒等待的take操作,在put/offer中調用(因為這些操作中不會用到takeLock鎖) 
     */  
    private void signalNotEmpty() {  
        final ReentrantLock takeLock = this.takeLock;  
        takeLock.lock();  
        try {  
            notEmpty.signal();  
        } finally {  
            takeLock.unlock();  
        }  
    }  
    /** 
     * 喚醒等待插入操作,在take/poll中調用. 
     */  
    private void signalNotFull() {  
        final ReentrantLock putLock = this.putLock;  
        putLock.lock();  
        try {  
            notFull.signal();  
        } finally {  
            putLock.unlock();  
        }  
    }  
    /** 
     * 插入到尾部 
     */  
    private void insert(E x) {  
        last = last.next = new Node<E>(x);  
    }  
    /** 
     * 獲取並移除頭元素 
     */  
    private E extract() {  
        Node<E> first = head.next;  
        head = first;  
        E x = first.item;  
        first.item = null;  
        return x;  
    }  
    /** 
     * 鎖住兩把鎖,在remove,clear等方法中調用 
     */  
    private void fullyLock() {  
        putLock.lock();  
        takeLock.lock();  
    }  
    /** 
     * 和fullyLock成對使用 
     */  
    private void fullyUnlock() {  
        takeLock.unlock();  
        putLock.unlock();  
    }  
    /** 
     * 默認構造,容量為 Integer.MAX_VALUE  
     */  
    public LinkedBlockingQueue() {  
        this(Integer.MAX_VALUE);  
    }  
    /** 
     *指定容量的構造 
     */  
    public LinkedBlockingQueue(int capacity) {  
        if (capacity <= 0) throw new IllegalArgumentException();  
        this.capacity = capacity;  
        last = head = new Node<E>(null);  
    }  
    /** 
     * 指定初始化集合的構造 
     */  
    public LinkedBlockingQueue(Collection<? extends E> c) {  
        this(Integer.MAX_VALUE);  
        for (E e : c)  
            add(e);  
    }  
    /** 
     * 通過原子變量,直接獲得大小 
     */  
    public int size() {  
        return count.get();  
    }  
    /** 
     *返回理想情況下(沒有內存和資源約束)此隊列可接受並且不會被阻塞的附加元素數量。 
     */  
    public int remainingCapacity() {  
        return capacity - count.get();  
    }  
    /** 
     * 將指定元素插入到此隊列的尾部,如有必要,則等待空間變得可用。 
     */  
    public void put(E e) throws InterruptedException {  
        if (e == null) throw new NullPointerException();  
        int c = -1;  
        final ReentrantLock putLock = this.putLock;  
        final AtomicInteger count = this.count;  
        putLock.lockInterruptibly();  
        try {  
            try {  
                while (count.get() == capacity)  
                    notFull.await();  
            } catch (InterruptedException ie) {  
                notFull.signal(); // propagate to a non-interrupted thread  
                throw ie;  
            }  
            insert(e);  
            c = count.getAndIncrement();  
            if (c + 1 < capacity)  
                notFull.signal();  
        } finally {  
            putLock.unlock();  
        }  
        if (c == 0)  
            signalNotEmpty();  
    }  
    /** 
     * 將指定元素插入到此隊列的尾部,如有必要,則等待指定的時間以使空間變得可用。 
     */  
    public boolean offer(E e, long timeout, TimeUnit unit)  
        throws InterruptedException {  
        if (e == null) throw new NullPointerException();  
        long nanos = unit.toNanos(timeout);  
        int c = -1;  
        final ReentrantLock putLock = this.putLock;  
        final AtomicInteger count = this.count;  
        putLock.lockInterruptibly();  
        try {  
            for (;;) {  
                if (count.get() < capacity) {  
                    insert(e);  
                    c = count.getAndIncrement();  
                    if (c + 1 < capacity)  
                        notFull.signal();  
                    break;  
                }  
                if (nanos <= 0)  
                    return false;  
                try {  
                    nanos = notFull.awaitNanos(nanos);  
                } catch (InterruptedException ie) {  
                    notFull.signal(); // propagate to a non-interrupted thread  
                    throw ie;  
                }  
            }  
        } finally {  
            putLock.unlock();  
        }  
        if (c == 0)  
            signalNotEmpty();  
        return true;  
    }  
    /** 
     *將指定元素插入到此隊列的尾部(如果立即可行且不會超出此隊列的容量), 
     *在成功時返回 true,如果此隊列已滿,則返回 false。 
     */  
    public boolean offer(E e) {  
        if (e == null) throw new NullPointerException();  
        final AtomicInteger count = this.count;  
        if (count.get() == capacity)  
            return false;  
        int c = -1;  
        final ReentrantLock putLock = this.putLock;  
        putLock.lock();  
        try {  
            if (count.get() < capacity) {  
                insert(e);  
                c = count.getAndIncrement();  
                if (c + 1 < capacity)  
                    notFull.signal();  
            }  
        } finally {  
            putLock.unlock();  
        }  
        if (c == 0)  
            signalNotEmpty();  
        return c >= 0;  
    }  
    //獲取並移除此隊列的頭部,在元素變得可用之前一直等待(如果有必要)。  
    public E take() throws InterruptedException {  
        E x;  
        int c = -1;  
        final AtomicInteger count = this.count;  
        final ReentrantLock takeLock = this.takeLock;  
        takeLock.lockInterruptibly();  
        try {  
            try {  
                while (count.get() == 0)  
                    notEmpty.await();  
            } catch (InterruptedException ie) {  
                notEmpty.signal(); // propagate to a non-interrupted thread  
                throw ie;  
            }  
            x = extract();  
            c = count.getAndDecrement();  
            if (c > 1)  
                notEmpty.signal();  
        } finally {  
            takeLock.unlock();  
        }  
        if (c == capacity)  
            signalNotFull();  
        return x;  
    }  
      
    //獲取並移除此隊列的頭部,在指定的等待時間前等待可用的元素(如果有必要  
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {  
        E x = null;  
        int c = -1;  
        long nanos = unit.toNanos(timeout);  
        final AtomicInteger count = this.count;  
        final ReentrantLock takeLock = this.takeLock;  
        takeLock.lockInterruptibly();  
        try {  
            for (;;) {  
                if (count.get() > 0) {  
                    x = extract();  
                    c = count.getAndDecrement();  
                    if (c > 1)  
                        notEmpty.signal();  
                    break;  
                }  
                if (nanos <= 0)  
                    return null;  
                try {  
                    nanos = notEmpty.awaitNanos(nanos);  
                } catch (InterruptedException ie) {  
                    notEmpty.signal(); // propagate to a non-interrupted thread  
                    throw ie;  
                }  
            }  
        } finally {  
            takeLock.unlock();  
        }  
        if (c == capacity)  
            signalNotFull();  
        return x;  
    }  
      
    //獲取並移除此隊列的頭,如果此隊列為空,則返回 null。  
    public E poll() {  
        final AtomicInteger count = this.count;  
        if (count.get() == 0)  
            return null;  
        E x = null;  
        int c = -1;  
        final ReentrantLock takeLock = this.takeLock;  
        takeLock.lock();  
        try {  
            if (count.get() > 0) {  
                x = extract();  
                c = count.getAndDecrement();  
                if (c > 1)  
                    notEmpty.signal();  
            }  
        } finally {  
            takeLock.unlock();  
        }  
        if (c == capacity)  
            signalNotFull();  
        return x;  
    }  
    //獲取但不移除此隊列的頭;如果此隊列為空,則返回 null。  
    public E peek() {  
        if (count.get() == 0)  
            return null;  
        final ReentrantLock takeLock = this.takeLock;  
        takeLock.lock();  
        try {  
            Node<E> first = head.next;  
            if (first == null)  
                return null;  
            else  
                return first.item;  
        } finally {  
            takeLock.unlock();  
        }  
    }  
    /** 
     * 從此隊列移除指定元素的單個實例(如果存在)。 
     */  
    public boolean remove(Object o) {  
        if (o == null) return false;  
        boolean removed = false;  
        fullyLock();  
        try {  
            Node<E> trail = head;  
            Node<E> p = head.next;  
            while (p != null) {  
                if (o.equals(p.item)) {  
                    removed = true;  
                    break;  
                }  
                trail = p;  
                p = p.next;  
            }  
            if (removed) {  
                p.item = null;  
                trail.next = p.next;  
                if (last == p)  
                    last = trail;  
                if (count.getAndDecrement() == capacity)  
                    notFull.signalAll();  
            }  
        } finally {  
            fullyUnlock();  
        }  
        return removed;  
    }  
    ……  
}  
View Code

三、使用示例


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM