【JUC】JDK1.8源碼分析之ConcurrentLinkedQueue(五)


一、前言

  接着前面的分析,接下來分析ConcurrentLinkedQueue,ConcurerntLinkedQueue一個基於鏈接節點的無界線程安全隊列。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列獲取操作從隊列頭部獲得元素。當多個線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue是一個恰當的選擇。此隊列不允許使用null元素。

二、ConcurrentLinkedQueue數據結構

  通過源碼分析可知,ConcurrentLinkedQueue的數據結構與LinkedBlockingQueue的數據結構相同,都是使用的鏈表結構。ConcurrentLinkedQueue的數據結構如下

  說明:ConcurrentLinkedQueue采用的鏈表結構,並且包含有一個頭結點和一個尾結點。

三、ConcurrentLinkedQueue源碼分析

  3.1 類的繼承關系

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {}

  說明:ConcurrentLinkedQueue繼承了抽象類AbstractQueue,AbstractQueue定義了對隊列的基本操作;同時實現了Queue接口,Queue定義了對隊列的基本操作,同時,還實現了Serializable接口,表示可以被序列化。

  3.2 類的內部類 

    private static class Node<E> {
        // 元素
        volatile E item;
        // next域
        volatile Node<E> next;

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        // 構造函數
        Node(E item) {
            // 設置item的值
            UNSAFE.putObject(this, itemOffset, item);
        }
        // 比較並替換item值
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
        
        void lazySetNext(Node<E> val) {
            // 設置next域的值,並不會保證修改對其他線程立即可見
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
        // 比較並替換next域的值
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics
        // 反射機制
        private static final sun.misc.Unsafe UNSAFE;
        // item域的偏移量
        private static final long itemOffset;
        // next域的偏移量
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
View Code

  說明:Node類表示鏈表結點,用於存放元素,包含item域和next域,item域表示元素,next域表示下一個結點,其利用反射機制和CAS機制來更新item域和next域,保證原子性。

  3.3 類的屬性  

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    // 版本序列號        
    private static final long serialVersionUID = 196745693267521676L;
    // 反射機制
    private static final sun.misc.Unsafe UNSAFE;
    // head域的偏移量
    private static final long headOffset;
    // tail域的偏移量
    private static final long tailOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedQueue.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    
    // 頭結點
    private transient volatile Node<E> head;
    // 尾結點
    private transient volatile Node<E> tail;
}
View Code

  說明:屬性中包含了head域和tail域,表示鏈表的頭結點和尾結點,同時,ConcurrentLinkedQueue也使用了反射機制和CAS機制來更新頭結點和尾結點,保證原子性。

  3.4 類的構造函數

  1. ConcurrentLinkedQueue()型構造函數  

    public ConcurrentLinkedQueue() {
        // 初始化頭結點與尾結點
        head = tail = new Node<E>(null);
    }
View Code

  說明:該構造函數用於創建一個最初為空的 ConcurrentLinkedQueue,頭結點與尾結點指向同一個結點,該結點的item域為null,next域也為null。

  2. ConcurrentLinkedQueue(Collection<? extends E>)型構造函數  

    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        for (E e : c) { // 遍歷c集合
            // 保證元素不為空
            checkNotNull(e);
            // 新生一個結點
            Node<E> newNode = new Node<E>(e);
            if (h == null) // 頭結點為null
                // 賦值頭結點與尾結點
                h = t = newNode;
            else {
                // 直接頭結點的next域
                t.lazySetNext(newNode);
                // 重新賦值頭結點
                t = newNode;
            }
        }
        if (h == null) // 頭結點為null
            // 新生頭結點與尾結點
            h = t = new Node<E>(null);
        // 賦值頭結點
        head = h;
        // 賦值尾結點
        tail = t;
    }
View Code

  說明:該構造函數用於創建一個最初包含給定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍歷順序來添加元素。

  3.5 核心函數分析

  1. offer函數  

    public boolean offer(E e) {
        // 元素不為null
        checkNotNull(e);
        // 新生一個結點
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) { // 無限循環
            // q為p結點的下一個結點
            Node<E> q = p.next;
            if (q == null) { // q結點為null
                // p is last node
                if (p.casNext(null, newNode)) { // 比較並進行替換p結點的next域
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // p不等於t結點,不一致    // hop two nodes at a time
                        // 比較並替換尾結點
                        casTail(t, newNode);  // Failure is OK.
                    // 返回
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q) // p結點等於q結點
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                // 原來的尾結點與現在的尾結點是否相等,若相等,則p賦值為head,否則,賦值為現在的尾結點
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                // 重新賦值p結點
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }
View Code

  說明:offer函數用於將指定元素插入此隊列的尾部。下面模擬offer函數的操作,隊列狀態的變化(假設單線程添加元素,連續添加10、20兩個元素)。

  ① 若ConcurrentLinkedQueue的初始狀態如上圖所示,即隊列為空。單線程添加元素,此時,添加元素10,則狀態如下所示

  ② 如上圖所示,添加元素10后,tail沒有變化,還是指向之前的結點,繼續添加元素20,則狀態如下所示

  ③ 如上圖所示,添加元素20后,tail指向了最新添加的結點。

  2. poll函數  

    public E poll() {
        restartFromHead:
        for (;;) { // 無限循環
            for (Node<E> h = head, p = h, q;;) { // 保存頭結點
                // item項
                E item = p.item;
    
                if (item != null && p.casItem(item, null)) { // item不為null並且比較並替換item成功
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // p不等於h    // hop two nodes at a time
                        // 更新頭結點
                        updateHead(h, ((q = p.next) != null) ? q : p); 
                    // 返回item
                    return item;
                }
                else if ((q = p.next) == null) { // q結點為null
                    // 更新頭結點
                    updateHead(h, p);
                    return null;
                }
                else if (p == q) // p等於q
                    // 繼續循環
                    continue restartFromHead;
                else
                    // p賦值為q
                    p = q;
            }
        }
    }
View Code

  說明:此函數用於獲取並移除此隊列的頭,如果此隊列為空,則返回null。下面模擬poll函數的操作,隊列狀態的變化(假設單線程操作,狀態為之前offer10、20后的狀態,poll兩次)。

  ① 隊列初始狀態如上圖所示,在poll操作后,隊列的狀態如下圖所示

  ② 如上圖可知,poll操作后,head改變了,並且head所指向的結點的item變為了null。再進行一次poll操作,隊列的狀態如下圖所示。

  ③ 如上圖可知,poll操作后,head結點沒有變化,只是指示的結點的item域變成了null。

  3. remove函數  

    public boolean remove(Object o) {
        // 元素為null,返回
        if (o == null) return false;
        Node<E> pred = null;
        for (Node<E> p = first(); p != null; p = succ(p)) { // 獲取第一個存活的結點
            // 第一個存活結點的item值
            E item = p.item;
            if (item != null &&
                o.equals(item) &&
                p.casItem(item, null)) { // 找到item相等的結點,並且將該結點的item設置為null
                // p的后繼結點
                Node<E> next = succ(p);
                if (pred != null && next != null) // pred不為null並且next不為null
                    // 比較並替換next域
                    pred.casNext(p, next);
                return true;
            }
            // pred賦值為p
            pred = p;
        }
        return false;
    }
View Code

  說明:此函數用於從隊列中移除指定元素的單個實例(如果存在)。其中,會調用到first函數和succ函數,first函數的源碼如下  

    Node<E> first() {
        restartFromHead:
        for (;;) { // 無限循環,確保成功
            for (Node<E> h = head, p = h, q;;) {
                // p結點的item域是否為null
                boolean hasItem = (p.item != null);
                if (hasItem || (q = p.next) == null) { // item不為null或者next域為null
                    // 更新頭結點
                    updateHead(h, p);
                    // 返回結點
                    return hasItem ? p : null;
                }
                else if (p == q) // p等於q
                    // 繼續從頭結點開始
                    continue restartFromHead;
                else
                    // p賦值為q
                    p = q;
            }
        }
    }
View Code

  說明:first函數用於找到鏈表中第一個存活的結點。succ函數源碼如下  

    final Node<E> succ(Node<E> p) {
        // p結點的next域
        Node<E> next = p.next;
        // 如果next域為自身,則返回頭結點,否則,返回next
        return (p == next) ? head : next;
    }
View Code

  說明:succ用於獲取結點的下一個結點。如果結點的next域指向自身,則返回head頭結點,否則,返回next結點。下面模擬remove函數的操作,隊列狀態的變化(假設單線程操作,狀態為之前offer10、20后的狀態,執行remove(10)、remove(20)操作)。

  ① 如上圖所示,為ConcurrentLinkedQueue的初始狀態,remove(10)后的狀態如下圖所示

  ② 如上圖所示,當執行remove(10)后,head指向了head結點之前指向的結點的下一個結點,並且head結點的item域置為null。繼續執行remove(20),狀態如下圖所示

  ③ 如上圖所示,執行remove(20)后,head與tail指向同一個結點,item域為null。

  4. size函數  

    public int size() {
        // 計數
        int count = 0;
        for (Node<E> p = first(); p != null; p = succ(p)) // 從第一個存活的結點開始往后遍歷
            if (p.item != null) // 結點的item域不為null
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE) // 增加計數,若達到最大值,則跳出循環
                    break;
        // 返回大小
        return count;
    }
View Code

  說明:此函數用於返回ConcurrenLinkedQueue的大小,從第一個存活的結點(first)開始,往后遍歷鏈表,當結點的item域不為null時,增加計數,之后返回大小。

五、示例

  下面通過一個示例來了解ConcurrentLinkedQueue的使用  

package com.hust.grid.leesf.collections;

import java.util.concurrent.ConcurrentLinkedQueue;

class PutThread extends Thread {
    private ConcurrentLinkedQueue<Integer> clq;
    public PutThread(ConcurrentLinkedQueue<Integer> clq) {
        this.clq = clq;
    }
    
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("add " + i);
                clq.add(i);
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class GetThread extends Thread {
    private ConcurrentLinkedQueue<Integer> clq;
    public GetThread(ConcurrentLinkedQueue<Integer> clq) {
        this.clq = clq;
    }
    
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("poll " + clq.poll());
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class ConcurrentLinkedQueueDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<Integer>();
        PutThread p1 = new PutThread(clq);
        GetThread g1 = new GetThread(clq);
        
        p1.start();
        g1.start();
        
    }
}
View Code

  運行結果(某一次):  

add 0
poll null
add 1
poll 0
add 2
poll 1
add 3
poll 2
add 4
poll 3
add 5
poll 4
poll 5
add 6
add 7
poll 6
poll 7
add 8
add 9
poll 8
View Code

  說明:GetThread線程不會因為ConcurrentLinkedQueue隊列為空而等待,而是直接返回null,所以當實現隊列不空時,等待時,則需要用戶自己實現等待邏輯。

六、總結

  ConcurrentLinkedQueue的源碼也相對簡單,其實對於並發集合而言,分析源碼時首先理解單線程情況,然后再考慮在多線程並發時的情況,這樣會使得分析源碼容易得多,ConcurrentLinkedQueue和LinkedBlockingQueue的區別還是很明顯的(前者在取元素時,若隊列為空,則返回null;后者會進行等待)。謝謝各位園友的觀看~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM