並發隊列之ConcurrentLinkedQueue


  本來想着直接說線程池的,不過在說線程池之前,我們必須要知道並發安全隊列;因為一般情況下線程池中的線程數量是一定的,肯定不會超過某個閾值,那么當任務太多了的時候,我們必須把多余的任務保存到並發安全隊列中,當線程池中的線程空閑下來了,就會到並發安全隊列中拿任務;

  那么什么是並發安全隊列呢?其實可以簡單看作是一個鏈表,然后我們先辦法去存取節點;總的來說,並發安全隊列分為兩種,一種是阻塞的,一種是非阻塞的,前者是用鎖來實現的,后者用CAS實現的;

 

一.簡單介紹ConcurrentLinkedQueue

  這個隊列用法沒什么好說的,就類似LinkedList的用法,怎么對一個鏈表繼續增刪改查,不多說,我們就說一下其中幾個關鍵的方法;

  首先,這個隊列是一個線程安全的無界非阻塞隊列,其實就是一個單向鏈表,無界的意思就是沒有限制最大長度,非阻塞表示用CAS實現入隊和出隊操作,我們打開這個類就可以知道,有一個內部類Node,其中重要的屬性如下所示:

//用於存放節點的值
volatile E item;
//指向下一個節點
volatile Node<E> next;
//這里也是用的是UNSAFE類,前面說過了,這個類直接提供CAS操作
private static final sun.misc.Unsafe UNSAFE;
//item字段的偏移量
private static final long itemOffset;
//next的偏移量
private static final long nextOffset;

 

 

  然后ConcurrentLinkedQueue中幾個重要的屬性,好像也沒什么重要的,就保存了頭節點和尾節點,注意,默認情況下頭節點和尾節點都是哨兵節點,也就是一個存null的Node節點

//存放鏈表的頭節點
private transient volatile Node<E> head;
//存放鏈表的尾節點
private transient volatile Node<E> tail;
//UNSAFE對象
private static final sun.misc.Unsafe UNSAFE;
//head字段的偏移量
private static final long headOffset;
//tail字段偏移量
private static final long tailOffset;

 

 

 

 

  下面我們直接看一些重要方法吧!慢慢分析其中的算法才是關鍵的

 

二.offer方法

  這個方法的作用就是在隊列末端添加一個節點,如果傳遞的參數是null,就拋出空指針異常,否則由於該隊列是無界隊列,該方法會一直返回true,而且該方法使用CAS算法實現的,所以不會阻塞線程;

//隊列末端添加一個節點
public boolean offer(E e) {
    //如果e為空,那么拋出空指針異常
    checkNotNull(e);
    //將傳進來的元素封裝成一個節點,Node的構造器中調用UNSAFE.putObject(this, itemOffset, item)把e賦值給節點中的item
    final Node<E> newNode = new Node<E>(e);

    //[1] //這里的for循環從最后的節點開始
    for (Node<E> t = tail, p = t;;) {
      Node<E> q = p.next;
      //[2]如果q為null,說明p就是最后的節點了
        if (q == null) {
            //[3]CAS更新:如果p節點的下一個節點是null,就把寫個節點更新為newNode
            if (p.casNext(null, newNode)) {
                //[4]CAS成功,但是這時p==t,所以不會進入到這里的if里面,直接返回true
                //那么什么時候會走到這里面來呢?其實是要有另外一個線程也在調用offer方法的時候,會進入到這里面來
                if (p != t) 
                    casTail(t, newNode);  
                return true;
            }
        }
        else if (p == q) //[5]
            
            p = (t != (t = tail)) ? t : head;
        else //[6]
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

 

  

  上面執行到[3]的時候,由於頭節點和尾節點默認都是指向哨兵節點的,由於這個時候p的下一個節點為null,所以當前線程A執行CAS會成功,下圖所示;

 

 

  如果此時還有一個線程B也來嘗試[3]中CAS,由於此時p節點的下一個節點不是null了,於是線程B會跳到[1]出進行第二次循環,然后會到[6]中,由於p和t此時是相等的,所以這里是false,即p=q,下圖所示:

 

 

  然后線程B又會跳到[1]處進行第三次循環,由於執行了Node<E> q = p.next,所以此時q指向最后的null,就到了[3]處進行CAS,這次是可以成功的,成功之后如下圖所示:

 

 

   

  這個時候因為p!=t,所以可以進入到[4],這里又會進行一個CAS:如果tail和t指向的節點一樣,那么就將tail指向新添加的節點,如圖所示,這個時候線程B也就執行完了;

 

   

  其實還有[5]沒有走到,這個是在poll操作之后才執行的,我們先跳過,等說完poll方法之后再回頭看看;另外說一下,add方法其實就是調用的是offer方法,就不多說了;

 

 

三.poll方法

  這個方法是獲取頭部的這個節點,如果隊列為空則返回null;

public E poll() {
    //這里其實就是一個goto的標記,用於跳出for循環
    restartFromHead:
    //[1]
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            //[2]如果當前節點中存的值不為空,則CAS設置為null
            if (item != null && p.casItem(item, null)) {
                //[3]CAS成功就更新頭節點的位置
                if (p != h) 
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            //[4]當前隊列為空,就返回null
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            //[5]當前節點和下一個節點一樣,說明節點自引用,則重新找頭節點
            else if (p == q)
                continue restartFromHead;
            //[6]
            else
                p = q;
        }
    }
}

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

  

  分為幾種情況,第一種情況是線程A調用poll方法的時候,發現隊列是空的,即頭節點和尾節點都指向哨兵節點,就會直接到[4],返回null

  第二種情況,線程A執行到了[4],此時有一個線程卻調用offer方法添加了一個節點,下圖所示,那么此時線程A就不會走[4]了,[5]也不滿足,於是會到[6]這里來,然后線程A又會跳到[1]處進行循環,此時p指向的節點中item不為null,所以會到[2]中;

 

   

  到了[2]中將p指向的節點中item用CAS設置為null,然后就到了[3],下面第一個圖,由於p!=h,q=null,所以最后調用的是updateHead(h,p),這方法:如果頭節點和h指向的是一樣的,就將頭節點指向p,我們還能看到updateHead方法中h.lazySetNext(h)表示h的下一個節點指向自己,下面圖二

 

   到了這里還沒完,還記不記得offer方法中有一個地方的代碼沒有執行的啊!就是這種情況,尾節點自己引用自己,我們再調用offer會怎么樣呢?

  回到offer方法,先會到[1],然后q指向自己這個哨兵節點(注意,此時雖然p指向的節點中存的是null,但是p!=null},於是再到[5],此時的圖如下左圖所示;此時由於t==tail,所以p=head;

 

   再在offer方法循環一次,此時q指向null,下面左圖所示,然后就可以進入[2]中進行CAS,CAS成功,因為此時p!=t,所以還要進行CAS將tail指向新節點,下面右圖所示,可以讓GC回收那個垃圾!

媽耶,這里比較繞!哈哈哈哈哈哈哈哈哈哈哈

 

 

 

四.peek方法

  這個方法的作用就是獲取隊列頭部的元素,只獲取不移除,注意這個方法和上面的poll方法的區別啊!

public E peek() {
    //[1]goto標志
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            //[2]
            E item = p.item;
            //[3]
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            //[4]
            else if (p == q)
                continue restartFromHead;
            else//[5]
                p = q;
        }
    }
}

  final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
  }

 

 

  

  如果隊列中為空的時候,走到[3]的時候,就會如下圖所示,由於h==p,所以updateHead方法啥也不做,然后返回就返回item為null

 

 

  如果隊列不為空,那么如下左圖所示,此時進入循環內不滿足條件,會到[5]這里,將p指向q,然后再進行一次循環到[3],將q指向p的后一個節點,下面右圖所示;

 

 

  然后調用updateHead方法,用CAS將頭節點指向p這里,然后將h自己指向自己,下圖所示,最后返回item

 

 

五.總結

  其實還有幾個方法沒說,但是感覺比較容易就不浪費篇幅了,有興趣的可以看看:size方法用於計算隊列中節點的數量,可是由於沒有加鎖,在並發的條件下不准確;remove方法刪除某個節點,其實就是遍歷然后用equals方法比較item是不是一樣,只不過如果存在多個符合條件的節點只刪除第一個,然后返回true,否則返回false;contains方法判斷隊列中是否包含指定item的節點,也就是遍歷,很容易;

  最麻煩的就是offer方法和poll方法,offer方法是在隊列的最后面添加節點,而poll是獲取頭節點,並且刪除第一個真正的隊列節點(注意,節點分為兩種,一種是哨兵節點,一種是真正的存了數據的節點啊),還簡單的說了一下poll方法和peek方法的區別,后者只是獲取,而不刪除啊!用下面這個圖幫助記憶一下;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM