Java多線程總結之線程安全隊列Queue


  在Java多線程應用中,隊列的使用率很高,多數生產消費模型的首選數據結構就是隊列。Java提供的線程安全的Queue可以分為阻塞隊列和非阻塞隊列,其中阻塞隊列的典型例子是BlockingQueue,非阻塞隊列的典型例子是

ConcurrentLinkedQueue,在實際應用中要根據實際需要選用阻塞隊列或者非阻塞隊列。

注:什么叫線程安全?這個首先要明確。線程安全的類 ,指的是類內共享的全局變量的訪問必須保證是不受多線程形式影響的。如果由於多線程的訪問(比如修改、遍歷、查看)而使這些變量結構被破壞或者針對這些變量操作的原子性被破壞,則這個類就不是線程安全的。

在並發的隊列上jdk提供了兩套實現,一個是以ConcurrentLinkedQueue為代表的高性能隊列,一個是以BlockingQueue接口為代表的阻塞隊列,無論在那種都繼承自Queue。
今天就聊聊這兩種Queue

  • BlockingQueue  阻塞算法
  • ConcurrentLinkedQueue,非阻塞算法

一、首先來看看BlockingQueue: 

Queue是什么就不需要多說了吧,一句話:隊列是先進先出。相對的,棧是后進先出。如果不熟悉的話先找本基礎的數據結構的書看看吧。 

BlockingQueue,顧名思義,“阻塞隊列”:可以提供阻塞功能的隊列。 
首先,看看BlockingQueue提供的常用方法: 

從上表可以很明顯看出每個方法的作用,這個不用多說。我想說的是:

  • add(e) remove() element() 方法不會阻塞線程。當不滿足約束條件時,會拋出IllegalStateException 異常。例如:當隊列被元素填滿后,再調用add(e),則會拋出異常。
  • offer(e) poll() peek() 方法即不會阻塞線程,也不會拋出異常。例如:當隊列被元素填滿后,再調用offer(e),則不會插入元素,函數返回false。
  • 要想要實現阻塞功能,需要調用put(e) take() 方法。當不滿足約束條件時,會阻塞線程。
  • BlockingQueue  阻塞算法

BlockingQueue作為線程容器,可以為線程同步提供有力的保障。

BlockingQueue定義的常用方法:

     拋出異常    特殊值      阻塞       超時

插入   add(e)        offer(e)      put(e)     offer(e, time, unit)
移除   remove()    poll()         take()      poll(time, unit)
檢查   element()   peek()       不可用    不可用

1、ArrayBlockingQueue

  基於數組的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,這是一個常用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。
  ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味着兩者無法真正並行運行,這點尤其不同於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以采用分離鎖,從而實現生產者和消費者操作的完全並行運行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的數據寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象。這在長時間內需要高效並發地處理大批量數據的系統中,其對於GC的影響還是存在一定的區別。而在創建ArrayBlockingQueue時,我們還可以控制對象的內部鎖是否采用公平鎖,默認采用非公平鎖。

2、LinkedBlockingQueue

  基於鏈表的阻塞隊列,同ArrayListBlockingQueue類似,其內部也維持着一個數據緩沖隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區達到最大值緩存容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理並發數據,還因為其對於生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味着在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的並發性能。
作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。

阻塞隊列:線程安全

  按 FIFO(先進先出)排序元素。隊列的頭部 是在隊列中時間最長的元素。隊列的尾部 是在隊列中時間最短的元素。新元素插入到隊列的尾部,並且隊列檢索操作會獲得位於隊列頭部的元素。鏈接隊列的吞吐量通常要高於基於數組的隊列,但是在大多數並發應用程序中,其可預知的性能要低。

注意:

1、必須要使用take()方法在獲取的時候達成阻塞結果
2、使用poll()方法將產生非阻塞效果

3、LinkedBlockingQueue實例

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

 

public class BlockingDeque {
    //阻塞隊列,FIFO
    private static LinkedBlockingQueue<Integer> concurrentLinkedQueue = new LinkedBlockingQueue<Integer>(); 

          
 public static void main(String[] args) {  
     ExecutorService executorService = Executors.newFixedThreadPool(2);  

     executorService.submit(new Producer("producer1"));  
     executorService.submit(new Producer("producer2"));  
     executorService.submit(new Producer("producer3"));  
     executorService.submit(new Consumer("consumer1"));  
     executorService.submit(new Consumer("consumer2"));  
     executorService.submit(new Consumer("consumer3"));  

 }  

 static class Producer implements Runnable {  
     private String name;  

     public Producer(String name) {  
         this.name = name;  
     }  

     public void run() {  
         for (int i = 1; i < 10; ++i) {  
             System.out.println(name+ "  生產: " + i);  
             //concurrentLinkedQueue.add(i);  
             try {
                concurrentLinkedQueue.put(i);
                Thread.sleep(200); //模擬慢速的生產,產生阻塞的效果
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
             
         }  
     }  
 }  

 static class Consumer implements Runnable {  
     private String name;  

     public Consumer(String name) {  
         this.name = name;  
     }  
     public void run() {  
         for (int i = 1; i < 10; ++i) {  
             try {          
                    //必須要使用take()方法在獲取的時候阻塞
                      System.out.println(name+"消費: " +  concurrentLinkedQueue.take());  
                      //使用poll()方法 將產生非阻塞效果
                      //System.out.println(name+"消費: " +  concurrentLinkedQueue.poll());  
                     
                     //還有一個超時的用法,隊列空時,指定阻塞時間后返回,不會一直阻塞
                     //但有一個疑問,既然可以不阻塞,為啥還叫阻塞隊列?
                    //System.out.println(name+" Consumer " +  concurrentLinkedQueue.poll(300, TimeUnit.MILLISECONDS));                    
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }  

         }  
     }  
 }  
}

4、PriorityBlockingQueue

基於優先級的阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定,也就是說傳入隊列的對象必須實現Comparable接口),在實現PriorityBlockingQueue時,內部控制線程同步的鎖采用的是公平鎖,他也是一個無界的隊列。

5、PriorityBlockingQueue 實例

Task.java

public class Task implements Comparable<Task>{ private int id ; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public int compareTo(Task task) { return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0); } public String toString(){ return this.id + "," + this.name; } }
View Code

UsePriorityBlockingQueue.java

public class UsePriorityBlockingQueue {

    public static void main(String[] args) throws Exception{

        PriorityBlockingQueue<Task> q2 = new PriorityBlockingQueue<Task>();

        Task t1 = new Task();
        t1.setId(3);
        t1.setName("id為3");
        Task t2 = new Task();
        t2.setId(4);
        t2.setName("id為4");
        Task t3 = new Task();
        t3.setId(1);
        t3.setName("id為1");
        Task t4 = new Task();
        t4.setId(2);
        t4.setName("id為2");

        //return this.id > task.id ? 1 : 0;
        q2.add(t1); //3
        q2.add(t2); //4
        q2.add(t3);  //1
        q2.add(t4);

        // 1 3 4
        //第一次取值時候是取最小的后面不做排序
        System.out.println("容器:" + q2);  //[1,id為1, 2,id為2, 3,id為3, 4,id為4]
        //拿出一個元素后  又會取一個最小的出來 放在第一個
        System.out.println(q2.take().getId());
        System.out.println("容器:" + q2);    //[2,id為2, 4,id為4, 3,id為3]
        System.out.println(q2.take().getId());
        System.out.println("容器:" + q2);  //[3,id為3, 4,id為4]
    }
}
View Code

打印結果:

容器:[1,id為1, 2,id為2, 3,id為3, 4,id為4]
1
容器:[2,id為2, 4,id為4, 3,id為3]
2
容器:[3,id為3, 4,id為4]
View Code

6、DelayQueue

帶有延遲時間的Queue,其中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue中的元素必須實現Delayed接口,DelayQueue是一個沒有大小限制的隊列,應用場景很多,比如對緩存超時的數據進行移除、任務超時處理、空閑連接的關閉等等。

7、DelayQueue實例

Wangmin.java

public class Wangmin implements Delayed {  

    private String name;  
    //身份證  
    private String id;  
    //截止時間  
    private long endTime;  
    //定義時間工具類
    private TimeUnit timeUnit = TimeUnit.SECONDS;

    public Wangmin(String name,String id,long endTime){  
        this.name=name;  
        this.id=id;  
        this.endTime = endTime;  
    }  

    public String getName(){  
        return this.name;  
    }  

    public String getId(){  
        return this.id;  
    }  

    /** 
     * 用來判斷是否到了截止時間 
     */  
    @Override  
    public long getDelay(TimeUnit unit) { 
        //return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        return endTime - System.currentTimeMillis();
    }  

    /** 
     * 相互批較排序用 
     */  
    @Override  
    public int compareTo(Delayed delayed) {  
        Wangmin w = (Wangmin)delayed;  
        return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;  
    }  

} 
View Code

WangBa.java

public class WangBa implements Runnable {  

    private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>();  

    public boolean yinye =true;  

    public void shangji(String name,String id,int money){  
        Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());  
        System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"交錢"+money+"塊,開始上機...");  
        this.queue.add(man);  
    }  

    public void xiaji(Wangmin man){  
        System.out.println("網名"+man.getName()+" 身份證"+man.getId()+"時間到下機...");  
    }  

    @Override  
    public void run() {  
        while(yinye){  
            try {  
                Wangmin man = queue.take();  
                xiaji(man);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  

    public static void main(String args[]){  
        try{  
            System.out.println("網吧開始營業");  
            WangBa siyu = new WangBa();  
            Thread shangwang = new Thread(siyu);  
            shangwang.start();  

            siyu.shangji("路人甲", "123", 1);  
            siyu.shangji("路人乙", "234", 10);  
            siyu.shangji("路人丙", "345", 5);  
        }  
        catch(Exception e){  
            e.printStackTrace();
        }  

    }  
} 
View Code

打印結果:

網吧開始營業
網名路人甲 身份證123交錢1塊,開始上機...
網名路人乙 身份證234交錢10塊,開始上機...
網名路人丙 身份證345交錢5塊,開始上機...
網名路人甲 身份證123時間到下機...
網名路人丙 身份證345時間到下機...
網名路人乙 身份證234時間到下機...
View Code

8、LinkedBlockingDeque

  LinkedBlockingDeque是一個線程安全的雙端隊列實現,由鏈表結構組成的雙向阻塞隊列,即可以從隊列的兩端插入和移除元素。雙向隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。可以說他是最為復雜的一種隊列,在內部實現維護了前端和后端節點,但是其沒有實現讀寫分離,因此同一時間只能有一個線程對其講行操作。在高並發中性能要遠低於其它BlockingQueue。更要低於ConcurrentLinkedQueue,jdk早期有一個非線程安全的Deque就是ArryDeque了, java6里添加了LinkBlockingDeque來彌補多線程場景下線程安全的問題。

相比於其他阻塞隊列,LinkedBlockingDeque多了addFirst、addLast、peekFirst、peekLast等方法,以first結尾的方法,表示插入、獲取獲移除雙端隊列的第一個元素。以last結尾的方法,表示插入、獲取獲移除雙端隊列的最后一個元素。

此外,LinkedBlockingDeque還是可選容量的,防止過度膨脹,默認等於Integer.MAX_VALUE。

主要方法:

  akeFirst()和takeLast():分別返回類表中第一個和最后一個元素,返回的元素會從類表中移除。如果列表為空,調用的方法的線程將會被阻塞直達列表中有可用元素。

  getFirst()和getLast():分別返回類表中第一個和最后一個元素,返回的元素不會從列表中移除。如果列表為空,則拋出NoSuckElementException異常。

  peek()、peekFirst()和peekLast():分別返回列表中第一個元素和最后一個元素,返回元素不會被移除。如果列表為空返回null。

  poll()、pollFirst()和pollLast():分別返回類表中第一個和最后一個元素,返回的元素會從列表中移除。如果列表為空,返回Null。

public class UseDeque {
    public static void main(String[] args) {
        LinkedBlockingDeque<String> dq = new LinkedBlockingDeque<String>(10);
        dq.addFirst("a");
        dq.addFirst("b");
        dq.addFirst("c");
        dq.addFirst("d");
        dq.addFirst("e");
        dq.addLast("f");
        dq.addLast("g");
        dq.addLast("h");
        dq.addLast("i");
        dq.addLast("j");
        //dq.offerFirst("k");
        System.out.println("查看頭元素:" + dq.peekFirst());
        System.out.println("獲取尾元素:" + dq.pollLast());
        Object [] objs = dq.toArray();
        for (int i = 0; i < objs.length; i++) {
            System.out.print(objs[i] + " -- ");
        }
    }
}

打印結果:

查看頭元素:e
獲取尾元素:j
e -- d -- c -- b -- a -- f -- g -- h -- i -- 
View Code

9、LinkedBlockingDeque方法列表

// 創建一個容量為 Integer.MAX_VALUE 的 LinkedBlockingDeque。
LinkedBlockingDeque()
// 創建一個容量為 Integer.MAX_VALUE 的 LinkedBlockingDeque,最初包含給定 collection 的元素,以該 collection 迭代器的遍歷順序添加。
LinkedBlockingDeque(Collection<? extends E> c)
// 創建一個具有給定(固定)容量的 LinkedBlockingDeque。
LinkedBlockingDeque(int capacity)

// 在不違反容量限制的情況下,將指定的元素插入此雙端隊列的末尾。
boolean add(E e)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列的開頭;如果當前沒有空間可用,則拋出 IllegalStateException。
void addFirst(E e)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列的末尾;如果當前沒有空間可用,則拋出 IllegalStateException。
void addLast(E e)
// 以原子方式 (atomically) 從此雙端隊列移除所有元素。
void clear()
// 如果此雙端隊列包含指定的元素,則返回 true。
boolean contains(Object o)
// 返回在此雙端隊列的元素上以逆向連續順序進行迭代的迭代器。
Iterator<E> descendingIterator()
// 移除此隊列中所有可用的元素,並將它們添加到給定 collection 中。
int drainTo(Collection<? super E> c)
// 最多從此隊列中移除給定數量的可用元素,並將這些元素添加到給定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 獲取但不移除此雙端隊列表示的隊列的頭部。
E element()
// 獲取,但不移除此雙端隊列的第一個元素。
E getFirst()
// 獲取,但不移除此雙端隊列的最后一個元素。
E getLast()
// 返回在此雙端隊列元素上以恰當順序進行迭代的迭代器。
Iterator<E> iterator()
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列表示的隊列中(即此雙端隊列的尾部),並在成功時返回 true;如果當前沒有空間可用,則返回 false。
boolean offer(E e)
// 將指定的元素插入此雙端隊列表示的隊列中(即此雙端隊列的尾部),必要時將在指定的等待時間內一直等待可用空間。
boolean offer(E e, long timeout, TimeUnit unit)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列的開頭,並在成功時返回 true;如果當前沒有空間可用,則返回 false。
boolean offerFirst(E e)
// 將指定的元素插入此雙端隊列的開頭,必要時將在指定的等待時間內等待可用空間。
boolean offerFirst(E e, long timeout, TimeUnit unit)
// 如果立即可行且不違反容量限制,則將指定的元素插入此雙端隊列的末尾,並在成功時返回 true;如果當前沒有空間可用,則返回 false。
boolean offerLast(E e)
// 將指定的元素插入此雙端隊列的末尾,必要時將在指定的等待時間內等待可用空間。
boolean offerLast(E e, long timeout, TimeUnit unit)
// 獲取但不移除此雙端隊列表示的隊列的頭部(即此雙端隊列的第一個元素);如果此雙端隊列為空,則返回 null。
E peek()
// 獲取,但不移除此雙端隊列的第一個元素;如果此雙端隊列為空,則返回 null。
E peekFirst()
// 獲取,但不移除此雙端隊列的最后一個元素;如果此雙端隊列為空,則返回 null。
E peekLast()
// 獲取並移除此雙端隊列表示的隊列的頭部(即此雙端隊列的第一個元素);如果此雙端隊列為空,則返回 null。
E poll()
// 獲取並移除此雙端隊列表示的隊列的頭部(即此雙端隊列的第一個元素),如有必要將在指定的等待時間內等待可用元素。
E poll(long timeout, TimeUnit unit)
// 獲取並移除此雙端隊列的第一個元素;如果此雙端隊列為空,則返回 null。
E pollFirst()
// 獲取並移除此雙端隊列的第一個元素,必要時將在指定的等待時間等待可用元素。
E pollFirst(long timeout, TimeUnit unit)
// 獲取並移除此雙端隊列的最后一個元素;如果此雙端隊列為空,則返回 null。
E pollLast()
// 獲取並移除此雙端隊列的最后一個元素,必要時將在指定的等待時間內等待可用元素。
E pollLast(long timeout, TimeUnit unit)
// 從此雙端隊列所表示的堆棧中彈出一個元素。
E pop()
// 將元素推入此雙端隊列表示的棧。
void push(E e)
// 將指定的元素插入此雙端隊列表示的隊列中(即此雙端隊列的尾部),必要時將一直等待可用空間。
void put(E e)
// 將指定的元素插入此雙端隊列的開頭,必要時將一直等待可用空間。
void putFirst(E e)
// 將指定的元素插入此雙端隊列的末尾,必要時將一直等待可用空間。
void putLast(E e)
// 返回理想情況下(沒有內存和資源約束)此雙端隊列可不受阻塞地接受的額外元素數。
int remainingCapacity()
// 獲取並移除此雙端隊列表示的隊列的頭部。
E remove()
// 從此雙端隊列移除第一次出現的指定元素。
boolean remove(Object o)
// 獲取並移除此雙端隊列第一個元素。
E removeFirst()
// 從此雙端隊列移除第一次出現的指定元素。
boolean removeFirstOccurrence(Object o)
// 獲取並移除此雙端隊列的最后一個元素。
E removeLast()
// 從此雙端隊列移除最后一次出現的指定元素。
boolean removeLastOccurrence(Object o)
// 返回此雙端隊列中的元素數。
int size()
// 獲取並移除此雙端隊列表示的隊列的頭部(即此雙端隊列的第一個元素),必要時將一直等待可用元素。
E take()
// 獲取並移除此雙端隊列的第一個元素,必要時將一直等待可用元素。
E takeFirst()
// 獲取並移除此雙端隊列的最后一個元素,必要時將一直等待可用元素。
E takeLast()
// 返回以恰當順序(從第一個元素到最后一個元素)包含此雙端隊列所有元素的數組。
Object[] toArray()
// 返回以恰當順序包含此雙端隊列所有元素的數組;返回數組的運行時類型是指定數組的運行時類型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()
View Code

10、LinkedBlockingQeque和LinkedBlockingDeque源碼解讀

1)LinkedBlockingQeque

先看它的結構基本字段:

/**
 * 基於鏈表。
 * FIFO
 * 單向
 *最大容量是Integer.MAX_VALUE.
 */
public class LinkedBlockingQueueAnalysis<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /*
     * 兩個方向。
     * putLock
     * takeLock
     * 有些操作會需要同時獲取兩把鎖。
     * 例如remove操作,也需要獲取兩把鎖
     */

    //主要的node節點
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    //容量,一開始就固定了的。
    private final int capacity;

    //用AtomicInteger 來記錄數量。
    private final AtomicInteger count = new AtomicInteger();

    //head節點 head.item == null
    transient Node<E> head;

    //last節點,last.next == null
    private transient Node<E> last;

    //take鎖
    private final ReentrantLock takeLock = new ReentrantLock();

    //等待take的節點序列。
    private final Condition notEmpty = takeLock.newCondition();

    //put的lock。
    private final ReentrantLock putLock = new ReentrantLock();

   //等待puts的隊列。
    private final Condition notFull = putLock.newCondition();
    ...
}
View Code

和LinkedBlockingDeque的區別之一就是,LinkedBlockingQueue采用了兩把鎖來對隊列進行操作,也就是隊尾添加的時候, 

隊頭仍然可以刪除等操作。接下來看典型的操作。

put操作

 public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();   //e不能為null
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;     //獲取put鎖
        final AtomicInteger count = this.count;          //獲取count
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {        //如果滿了,那么就需要使用notFull阻塞
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)                    //如果此時又有空間了,那么notFull喚醒
                notFull.signal();
        } finally {
            putLock.unlock();             //釋放鎖
        }
        if (c == 0)            //當c為0時候,也要根take鎖說一下,並發下
            signalNotEmpty();        //調用notEmpty        
    }
View Code

主要的思想還是比較容易理解的,現在看看enqueue 方法:

private void enqueue(Node<E> node) {        //入對操作。
        last = last.next = node;      //隊尾進
}

再看看signalNotEmpty方法:

private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();        //加鎖
        try {
            notEmpty.signal();    //用於signal,notEmpty
        } finally {
            takeLock.unlock();
        }
}

take操作

take操作,就是從隊列里面彈出一個元素,下面看它的詳細代碼:

public E take() throws InterruptedException {
        E x;
        int c = -1;            //設定一個記錄變量
        final AtomicInteger count = this.count;     //獲得count
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();        //加鎖
        try {
            while (count.get() == 0) {       //如果沒有元素,那么就阻塞性等待
                notEmpty.await();
            }
            x = dequeue();            //一定可以拿到。
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();        //報告還有元素,喚醒隊列
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();           //解鎖
        return x;
}

接下來看dequeue方法:

private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h;        // help GC 指向自己,幫助gc回收
        head = first;
        E x = first.item;       //從隊頭出。
        first.item = null;      //將head.item設為null。
        return x;
}

對於LinkedBlockingQueue來說,有兩個ReentrantLock分別控制隊頭和隊尾,這樣就可以使得添加操作分開來做,一般的操作是獲取一把鎖就可以,但有些操作例如remove操作,則需要同時獲取兩把鎖:

public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();     //獲取鎖
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {     //依次循環遍歷
                if (o.equals(p.item)) {       //找到了
                    unlink(p, trail);       //解除鏈接
                    return true;
                }
            }
            return false;        //沒找到,或者解除失敗
        } finally {
            fullyUnlock();
        }
}

當然,除了上述的remove方法外,在Iterator的next方法,remove方法以及LBQSpliterator分割迭代器中也是需要加全鎖進行操作的。

2)LinkedBlockingDeque

LinkedBlockingDeque類有三個構造方法:

public LinkedBlockingDeque()
public LinkedBlockingDeque(int capacity)
public LinkedBlockingDeque(Collection<? extends E> c)

LinkedBlockingDeque類中的數據都被封裝成了Node對象:

static final class Node<E> {
    E item;
    Node<E> prev;
    Node<E> next;
 
    Node(E x) {
        item = x;
    }
}

LinkedBlockingDeque類中的重要字段如下:

// 隊列雙向鏈表首節點
transient Node<E> first;
// 隊列雙向鏈表尾節點
transient Node<E> last;
// 雙向鏈表元素個數
private transient int count;
// 雙向鏈表最大容量
private final int capacity;
// 全局獨占鎖
final ReentrantLock lock = new ReentrantLock();
// 非空Condition對象
private final Condition notEmpty = lock.newCondition();
// 非滿Condition對象
private final Condition notFull = lock.newCondition();

LinkedBlockingDeque類的底層實現和LinkedBlockingQueue類很相似,都有一個全局獨占鎖,和兩個Condition對象,用來阻塞和喚醒線程。

LinkedBlockingDeque類對元素的操作方法比較多,我們下面以putFirst、putLast、pollFirst、pollLast方法來對元素的入隊、出隊操作進行分析。

入隊

putFirst(E e)方法是將指定的元素插入雙端隊列的開頭,源碼如下:

public void putFirst(E e) throws InterruptedException {
    // 若插入元素為null,則直接拋出NullPointerException異常
    if (e == null) throw new NullPointerException();
    // 將插入節點包裝為Node節點
    Node<E> node = new Node<E>(e);
    // 獲取全局獨占鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        while (!linkFirst(node))
            notFull.await();
    } finally {
        // 釋放全局獨占鎖
        lock.unlock();
    }
}

入隊操作是通過linkFirst(E e)方法來完成的,如下所示:

private boolean linkFirst(Node<E> node) {
    // assert lock.isHeldByCurrentThread();
    // 元素個數超出容量。直接返回false
    if (count >= capacity)
        return false;
    // 獲取雙向鏈表的首節點
    Node<E> f = first;
    // 將node設置為首節點
    node.next = f;
    first = node;
    // 若last為null,設置尾節點為node節點
    if (last == null)
        last = node;
    else
        // 更新原首節點的前驅節點
        f.prev = node;
    ++count;
    // 喚醒阻塞在notEmpty上的線程
    notEmpty.signal();
    return true;
}

若入隊成功,則linkFirst(E e)方法返回true,否則,返回false。若該方法返回false,則當前線程會阻塞在notFull條件上。

putLast(E e)方法是將指定的元素插入到雙端隊列的末尾,源碼如下:

public void putLast(E e) throws InterruptedException {
    // 若插入元素為null,則直接拋出NullPointerException異常
    if (e == null) throw new NullPointerException();
    // 將插入節點包裝為Node節點
    Node<E> node = new Node<E>(e);
    // 獲取全局獨占鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        while (!linkLast(node))
            notFull.await();
    } finally {
        // 釋放全局獨占鎖
        lock.unlock();
    }
}

該方法和putFirst(E e)方法幾乎一樣,不同點在於,putLast(E e)方法通過調用linkLast(E e)方法來插入節點:

private boolean linkLast(Node<E> node) {
    // assert lock.isHeldByCurrentThread();
    // 元素個數超出容量。直接返回false
    if (count >= capacity)
        return false;
    // 獲取雙向鏈表的尾節點
    Node<E> l = last;
    // 將node設置為尾節點
    node.prev = l;
    last = node;
    // 若first為null,設置首節點為node節點
    if (first == null)
        first = node;
    else
        // 更新原尾節點的后繼節點
        l.next = node;
    ++count;
    // 喚醒阻塞在notEmpty上的線程
    notEmpty.signal();
    return true;
}

若入隊成功,則linkLast(E e)方法返回true,否則,返回false。若該方法返回false,則當前線程會阻塞在notFull條件上。

出隊

pollFirst()方法是獲取並移除此雙端隊列的首節點,若不存在,則返回null,源碼如下:

public E pollFirst() {
    // 獲取全局獨占鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return unlinkFirst();
    } finally {
        // 釋放全局獨占鎖
        lock.unlock();
    }
}

移除首節點的操作是通過unlinkFirst()方法來完成的:

private E unlinkFirst() {
    // assert lock.isHeldByCurrentThread();
    // 獲取首節點
    Node<E> f = first;
    // 首節點為null,則返回null
    if (f == null)
        return null;
    // 獲取首節點的后繼節點
    Node<E> n = f.next;
    // 移除first,將首節點更新為n
    E item = f.item;
    f.item = null;
    f.next = f; // help GC
    first = n;
    // 移除首節點后,為空隊列
    if (n == null)
        last = null;
    else
        // 將新的首節點的前驅節點設置為null
        n.prev = null;
    --count;
    // 喚醒阻塞在notFull上的線程
    notFull.signal();
    return item;
}

 

pollLast()方法是獲取並移除此雙端隊列的尾節點,若不存在,則返回null,源碼如下:

public E pollLast() {
    // 獲取全局獨占鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return unlinkLast();
    } finally {
        // 釋放全局獨占鎖
        lock.unlock();
    }
}

移除尾節點的操作是通過unlinkLast()方法來完成的:

private E unlinkLast() {
    // assert lock.isHeldByCurrentThread();
    // 獲取尾節點
    Node<E> l = last;
    // 尾節點為null,則返回null
    if (l == null)
        return null;
    // 獲取尾節點的前驅節點
    Node<E> p = l.prev;
    // 移除尾節點,將尾節點更新為p
    E item = l.item;
    l.item = null;
    l.prev = l; // help GC
    last = p;
    // 移除尾節點后,為空隊列
    if (p == null)
        first = null;
    else
        // 將新的尾節點的后繼節點設置為null
        p.next = null;
    --count;
    // 喚醒阻塞在notFull上的線程
    notFull.signal();
    return item;
}

其實LinkedBlockingDeque類的入隊、出隊操作都是通過linkFirst、linkLast、unlinkFirst、unlinkLast這幾個方法來實現的,源碼讀起來也比較簡單。

二、ConcurrentLinkedQueue 非阻塞算法

1、非阻塞隊列

基於鏈接節點的、無界的、線程安全。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列檢索操作從隊列頭部獲得元素。當許多線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇。此隊列不允許 null 元素。

ConcurrentLinkedQueue是一個適用於高並發場景下的隊列,通過無鎖的方式,實現了高並發狀態下的高性能,通常ConcurrentLinkedQueue性能好於BlockingQueue。

ConcurrentLinkedQueue重要方法:

add()和offer()都是加入元素的方法(在ConcurrentLinkedQueue中,這兩個方法投有任何區別)

poll()和peek()都是取頭元素節點,區別在於前者會刪除元素,后者不會,相當於查看。

public class UseQueue_ConcurrentLinkedQueue {


    public static void main(String[] args) throws Exception {

        //高性能無阻塞無界隊列:ConcurrentLinkedQueue

        ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
        q.offer("a");
        q.offer("b");
        q.offer("c");
        q.offer("d");
        q.add("e");

        System.out.println("從頭部取出元素,並從隊列里刪除 >> "+q.poll());    //a 從頭部取出元素,並從隊列里刪除
        System.out.println("刪除后的長度 >> "+q.size());    //4
        System.out.println("取出頭部元素 >> "+q.peek());    //b
        System.out.println("長度 >> "+q.size());    //4
        }
}

打印結果:

從頭部取出元素,並從隊列里刪除 >> a
刪除后的長度 >> 4
取出頭部元素 >> b
長度 >> 4
View Code

2、實例

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;


public class NoBlockQueue {  
       private static ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue<Integer>();   
          
    public static void main(String[] args) {  
        ExecutorService executorService = Executors.newFixedThreadPool(2);  

        executorService.submit(new Producer("producer1"));  
        executorService.submit(new Producer("producer2"));  
        executorService.submit(new Producer("producer3"));  
        executorService.submit(new Consumer("consumer1"));  
        executorService.submit(new Consumer("consumer2"));  
        executorService.submit(new Consumer("consumer3"));  

    }  
  
    static class Producer implements Runnable {  
        private String name;  
  
        public Producer(String name) {  
            this.name = name;  
        }  
  
        public void run() {  
            for (int i = 1; i < 10; ++i) {  
                System.out.println(name+ " start producer " + i);  
                concurrentLinkedQueue.add(i);  
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                //System.out.println(name+"end producer " + i);  
            }  
        }  
    }  
  
    static class Consumer implements Runnable {  
        private String name;  
  
        public Consumer(String name) {  
            this.name = name;  
        }  
        public void run() {  
            for (int i = 1; i < 10; ++i) {  
                try {
 
                    System.out.println(name+" Consumer " +  concurrentLinkedQueue.poll());

                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }  
//                System.out.println();  
//                System.out.println(name+" end Consumer " + i);  
            }  
        }  
    }  
} 

  在並發編程中,一般推薦使用阻塞隊列,這樣實現可以盡量地避免程序出現意外的錯誤。阻塞隊列使用最經典的場景就是socket客戶端數據的讀取和解析,讀取數據的線程不斷將數據放入隊列,然后解析線程不斷從隊列取數據解析。還有其他類似的場景,只要符合生產者-消費者模型的都可以使用阻塞隊列。

使用非阻塞隊列,雖然能即時返回結果(消費結果),但必須自行編碼解決返回為空的情況處理(以及消費重試等問題)。

另外它們都是線程安全的,不用考慮線程同步問題。

三、多線程模擬隊列

package com.bjsxt.base.conn009;
 
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
 
public class MyQueue {
    
    //1 需要一個承裝元素的集合 
    private LinkedList<Object> list = new LinkedList<Object>();
    
    //2 需要一個計數器
    private AtomicInteger count = new AtomicInteger(0);
    
    //3 需要制定上限和下限
    private final int minSize = 0;
    
    private final int maxSize ;
    
    //4 構造方法
    public MyQueue(int size){
        this.maxSize = size;
    }
    
    //5 初始化一個對象 用於加鎖
    private final Object lock = new Object();
    
    
    //put(anObject): 把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷,直到BlockingQueue里面有空間再繼續.
    public void put(Object obj){
        synchronized (lock) {
            while(count.get() == this.maxSize){
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //1 加入元素
            list.add(obj);
            //2 計數器累加
            count.incrementAndGet();
            //3 通知另外一個線程(喚醒)
            lock.notify();
            System.out.println("新加入的元素為:" + obj);
        }
    }
    
    
    //take: 取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入.
    public Object take(){
        Object ret = null;
        synchronized (lock) {
            while(count.get() == this.minSize){
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //1 做移除元素操作
            ret = list.removeFirst();
            //2 計數器遞減
            count.decrementAndGet();
            //3 喚醒另外一個線程
            lock.notify();
        }
        return ret;
    }
    
    public int getSize(){
        return this.count.get();
    }
    
    
    public static void main(String[] args) {
        
        final MyQueue mq = new MyQueue(5);
        mq.put("a");
        mq.put("b");
        mq.put("c");
        mq.put("d");
        mq.put("e");
        
        System.out.println("當前容器的長度:" + mq.getSize());
        
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                mq.put("f");
                mq.put("g");
            }
        },"t1");
        
        t1.start();
        
        
        
        
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                Object o1 = mq.take();
                System.out.println("移除的元素為:" + o1);
                Object o2 = mq.take();
                System.out.println("移除的元素為:" + o2);
            }
        },"t2");
        
        t2.start();
            
    }
    
}
View Code

 

 

 

文章參考:

https://blog.csdn.net/bieleyang/article/details/78027032

https://blog.csdn.net/u014535678/article/details/60583225

https://blog.csdn.net/qq_33524158/article/details/78578370


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM