JAVA並發(5)-並發隊列LinkedBlockingQueue的分析


本文介紹LinkedBlockingQueue,這個隊列在線程池中常用到。(請結合源碼,看本文)

1. 介紹

LinkedBlockingQueue, 不支持null,基於單向鏈表的可選有界阻塞隊列。隊列的順序是FIFO。基於鏈表的隊列通常比基於數組的隊列更高的吞吐量, 但在大多數的並發應用中具有更低的可預測性能較差(這句話,在最后解釋一下)
如果不選擇隊列的容量,默認值是Integer.MAX_VALUE,為了防止隊列的過度擴張.
還實現了Collection接口Iterator接口中所有的可選方法。

1.1 結構

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable

LinkedBlockingQueue類圖
LinkedBlockingQueue類圖

LinkedBlockingQueue的構造器

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // head與last指向哨兵節點
        last = head = new Node<E>(null);
    }

    public LinkedBlockingQueue(Collection<? extends E> c)
    ...

1.2 保證線程安全

LinkedBlockingQueue的底層使用ReetrantLock保證線程安全,其實就是一個"消費-生產"模型,通過本文我們還可以學到ReetrantLock的實際使用場景。


    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

2. 源碼分析

在講LinkedBlockingQueue前,我們先看看需要它實現的接口BlockingQueue

2.1 BlockingQueue

實現了BlockingQueue的類,必須額外支持在查找元素時,等待隊列直到非空為止的操作;在儲存元素的時候,要等待隊列的空間可用為止。
它的方法有四種形式,處理不能立即滿足但是未來可能滿足的操作的方式各有不同。

  1. 直接拋出異常
  2. 返回一個特殊值(null 或者 false)
  3. 一直等待,直到操作成功
  4. 超時設定,超過時間就放棄

Summary of BlockingQueue methods

我們知道了不同方法,不能立即滿足的不同的處理方式,這樣我們下面就更好理解LinkedBlockingQueue的源碼了。

下面我們從

  • offer(e)
  • offer(e, time, unit)
  • put(e)
  • poll()
    去分析一下LinkedBlockingQueue

2.2 offer(e)

添加成功就返回true; 插入值為null,報錯;或隊列已滿,直接返回false,不會等待隊列空閑

   public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // 隊列的容量滿了,就直接返回了
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

代碼較簡單,就不細講了。

2.3 offer(e, time, unit)

若隊列已滿,還沒超過設定的時間,就等待,等待時,會對中斷作出反應;若超過了設定的時間,操作就跟offer(E e)一樣了

 public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                // 隊列已滿,超過了特定的時間才會返回false
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

2.4 put(e)

一直等待直到成功或者被中斷

 public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
           // 防止虛假喚醒
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

我沒有想到,上面發生虛假喚醒的場景(如果知道的同學,請告訴我一下,謝謝了)。

it is recommended that applications programmers always assume that they can occur and so always wait in a loop. --Condition

反正使用Condition在循環里等待就對了

2.5 poll()

隊列為空時,直接返回null,不會await;非阻塞方法

    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

其余的方法其實都是類似的,直接看上面BlockingQueue的四種方式。

最后再講一個方法remove(Object o), 將會提及一個知識點。

2.6 remove

刪除o, 若成功就返回true,反之.

 public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            // 算法題,刪除某一個鏈表的結點,可以看一下源碼,記錄兩個結點,一個在前,一個在后。
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

上面的代碼是簡單的對鏈表的操作,我們主要是看fullyLock()fullyUnlock()的代碼


    /**
     * Locks to prevent both puts and takes.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

我們都知道解鎖順序應該與獲取鎖順序相反,那么是為什么啦

其實我並不覺得,上面的fullyUnlock解鎖順序與獲取鎖的順序如果是相同的會出什么問題,也並不會出現死鎖(如果釋放鎖與獲取鎖,中間還存在其他操作就另當別論了)。那它僅僅是為了代碼的好看?

假如,我們有下面這段代碼(解鎖與獲取鎖中間有其他操作)

A.lock();
B.lock();
Foo();
A.unlock();
Bar();
B.unlock();

假設Bar()是要去重新獲取A鎖的。
時刻一: 線程X運行到了Bar()A鎖沒有被其他線程獲取,此時線程X持有B鎖,要去獲取A鎖
時刻二: 線程Y運行到代碼的最前面,A.lock(),獲取到了A鎖,此時線程Y持有A鎖,要去獲取B鎖,此時才會造成死鎖。

解鎖順序與獲取鎖順序相反,為的是

  1. 避免上面那種情況造成死鎖
  2. 為了美觀

3. 總結

LinkedBlockingQueue線程安全的隊列

  • 使用putLocktakeLock分別對增加刪除操作保證其線程安全性
  • 是一個有界(默認值為Integer.MAX_VALUE)的底層基於單向鏈表的隊列

解釋一下,相對於基於數組的隊列,鏈表隊列的可預測性能較差(less predictable performance in most concurrent applications)這句話

我認為是,數組是在初始化會分配"一塊連續的內存",而鏈表是在隊列添加元素時動態的分配內存地址的,是不連續的;還有就是它將會處理更多的內存結構,每個元素存在一個鏈表的節點。

This means that it has to flush more dirty memory pages between processors when synchronizing.

4. 參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM