JAVA並發(4)-並發隊列ConcurrentLinkedQueue


本文開始介紹並發隊列,為后面介紹線程池打下基礎。並發隊列莫非也是出隊入隊操作,還有一個比較重要的點就是如何保證其線程安全性,有些並發隊列保證線程安全是通過lock,有些是通過CAS
我們從ConcurrentLinkedQueue開始吧。

1. 介紹

ConcurrentLinkedQueue集合框架的一員,是一個無界限且線程安全,基於單向鏈表的隊列。該隊列的順序是FIFO。當多線程訪問公共集合時,使用這個類是一個不錯的選擇。不允許null元素。是一個非阻塞的隊列。

它的迭代器是弱一致性的,不會拋出java.util.ConcurrentModificationException,也可能在迭代期間,其他操作也正在進行。size()方法,不能保證是正確的,因為在迭代時,其他線程也可以操作該隊列。

1.1 類圖

ConcurrentLinkedQueue類圖
(顯示的方法都是公有方法)

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>

繼承至AbstractQueue,他提供了隊列操作的一個框架,有基本的方法,addremoveelement等等,這些方法基於offerpollpeek(最主要看這幾個方法)。

2. 源碼分析

2.1 類的整體結構

隊列中的元素Node

private static class Node<E> {
        // 保證兩個字段的可見性
        volatile E item;
        volatile Node<E> next;

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            // putOrderedXXX是putXXXVolatile的延遲版本,設置某個值不會被其他線程立即看到(可見性)
            // putOrderedXXX設置的值的修飾應該是volatile,這樣該方法才有用

            // 關於為什么使用這個方法,主要目的肯定是提高效率,但是具體原理,我只能告訴大家跟內存屏障有關(我也不太清楚這一塊,待我研究后,再寫一篇文章)
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe類中的東西,可以去了解一下

        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

構造器1:

    // private transient volatile Node<E> head;
    // private transient volatile Node<E> tail;
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

構造器2:

public ConcurrentLinkedQueue(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        for (E e : c) {
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
            if (h == null)
                h = t = newNode;
            else {
                t.lazySetNext(newNode);
                t = newNode;
            }
        }
        if (h == null)
            h = t = new Node<E>(null);
        head = h;
        tail = t;
    }

下面開始講方法,從offerpollpeek從這幾個方法入手

2.2 offer

添加元素到隊尾。因為隊列是無界的,這個方法永遠不會返回false

分為三種情況進行分析(一定自己跟着代碼debug,一步步的走)

  1. 單線程時(使用IDEA debug一直進入的是 else if把我搞迷茫了,我會寫一個博客來解釋原因
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("A");
        queue.offer("B");

以上面的代碼,分析每一個步驟。
執行構造函數后:
單線程初始化

此時鏈表的head與tail指向哨兵節點

插入"A", 此時沒有設置tail('兩跳機制',這里的原因后面詳見)

單線程插入'A'

插入"B",
單線程插入'B'

單線程情況比較簡單

  1. 多線程offer時
 public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                // 只有一個線程能夠CAS成功,其余的都重試
                if (p.casNext(null, newNode)) {

                    // 延遲設置tail,第一個node入隊不會設置tail,第二個node入隊才會設置tail
                    //以此類推, '兩跳機制'
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            // 這里是有其他線程正在poll操作才會進入,此時只考慮多線程offer的情況,暫不分析
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                // 存在tail被更改前,和更改后的兩種情況
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

結合上面的代碼,看圖

  • 步驟一線程A線程B都執行到
   if (p.casNext(null, newNode))

多線程初始化

  • 步驟二,只有一個線程執行成功,假設線程A成功,線程B失敗
    多線程Offer2
    因為p(a) == t(a), 此時不執行casTailtail不變。q = p.next, 所以此時q(b) = Node2 ,那么 p(b) != q(b), 線程B執行p = (p != t && t != (t = tail)) ? t : q;

線程B即將執行

   p = (p != t && t != (t = tail)) ? t : q;
  • 步驟三 此時線程C進入。
    此時,p(c) != q(c), 線程C執行
   p = (p != t && t != (t = tail)) ? t : q;

執行完后,q(c)賦值給p(c). 再次循環,此時,q(c) == null, 設置p(c)的next,線程C將值入隊
線程C Offer1.PNG

  • 步驟四 p(c) != t(c), 線程C執行casTail(t, newNode), 線程C設置尾結點
    線程C 設置tail后
  • 此時線程B執行
   p = (p != t && t != (t = tail)) ? t : q;

因為p(b) == t(b),所以 q(b) 賦值給 p(b)。繼續循環,最后得到
多線程offer,B插入

  1. 多線程的另一種情況,回到步驟三,此時線程C把值入隊了,但是還沒有設置tail
    線程C Offer1.PNG
  • 線程B,將值入隊成功
    步驟三的基礎上,線程B入隊成功后,目前的狀況如下:
    線程C 設置tail前

此時,線程C執行casTail(t, newNode),但是現在的tail != t(c), CAS失敗, 直接返回。

2.2.1 小結

上面不管是多線程還是單線程,都是努力的去尋找next為null的節點,若為next節點為null,再判斷是否滿足設置tail的條件。

多線程offer的第一種情況存在設置tail滯后的問題,我把它稱之為"兩跳機制",后面講使用這種機制的原因。
我們看到上面的情況一直沒有進入else if (p == q)分支,進入else if分支只會發生在有其他線程在poll時,我們先講講poll,再講講何時進入else if分支。

2.3 poll

刪除並返回頭結點的值

簡單提一下單線程多線程poll,着重分析一下polloffer共存的情況

  1. 單線程時
    單線程poll
    單線程比較簡單,就不畫圖了,按照上面的queue,進行一步一步的debug就行了

  2. 多線程,只有poll

 public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                // casItem這里只有一個線程能夠成功,其余的繼續下面的代碼
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            // 將之前的頭節點,自己指向自己,等待被GC
            h.lazySetNext(h);
    }

從上面代碼可以看出,修改itemhead都會使用CAS,這些變量都是被volatile修飾,所以保證了這些變量的線程安全性。不管是單線程還是多線程的poll,它們都是去尋找一個有效的頭節點,刪除並返回該值,若不是有效的就繼續找,若隊列為空了,就返回null

最后分析一下,offerpoll共存的情況

  • 線程Aoffer操作,線程Bpoll操作,初始的狀態如下:
    多線程offer與poll初始時

  • 線程A進入。
    多線程offer與poll,offer1

  • 線程A將要執行

Node<E> q = p.next;

線程B進入,進行poll操作
此時,線程B執行了一次內循環,將q(b)賦值給了p(b);
多線程offer與poll,poll1

  • 線程B再次執行內循環,此時將p(b).item置空,將p(b)賦值給head,之前的h(b)next指向自己,線程B退出
    多線程offer與poll,poll2

  • 線程A執行

  Node<E> q = p.next;

多線程offer與poll,offer2

此時,p(a).next 指向自己(等待被GC), 進入else if (p == q)分支,線程A退出,經過一番執行后,最后得到的狀態,如下:
多線程offer與poll,offer3

進入else if (p == q)分支的情況,只會發生在polloffer共存的情況下。

2.4 peek

獲取首個有效的節點,並返回

public E peek() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                if (item != null || (q = p.next) == null) {
                    updateHead(h, p);
                    return item;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

peekpoll的操作類似,這里就貼一下代碼就是了。

3. 總結

ConcurrentLinkedQueue是使用非阻塞的方式保證線程的安全性,在設置關系到整個Queue結構的變量時(這些變量都被volatile修飾),都使用CAS的方式對它們進行賦值。

  • size方法是線程不安全的,返回的結果可能不准確

關於“兩跳機制”(自己取得名字),

Both head and tail are permitted to lag. In fact, failing to update them every time one could is a significant optimization (fewer CASes). As with LinkedTransferQueue (see the internal documentation for that class), we use a slack threshold of two; that is, we update head/tail when the current pointer appears to be two or more steps away from the first/last node.

Since head and tail are updated concurrently and independently, it is possible for tail to lag behind head (why not)? -- ConcurrentLinkedQueue

大致意思,headtail允許被延遲設置。不是每次更新它們是一個重大的優化,這樣做就可以更少的CAS(這樣在很多線程使用時,積少成多,效率更高)。它的延遲閾值是2,設置head/tail時,當前的結點離first/last有兩步或更多的距離。 這就是“兩跳機制

我們想不通的地方,可能是這個類或方法的一個優化的地方。向着大佬看齊~

4. 引用


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM