高並發編程-隊列-BlockingQueue-LinkedBlockingQueue


高並發編程-隊列-BlockingQueue-LinkedBlockingQueue

一、LinkedBlockingQueue簡介

  LinkedBlockingQueue是一個基於鏈表的阻塞隊列,該隊列在創建時候,默認大小為Integer.MAX_VALUE,這個數值很大的,所以可以說LinkedBlockingQueue的大小沒有限制的,業界有個比較專業的詞匯,把它叫做無界隊列。但是這也帶來了一些問題,比如當JVM內存比較小的時候,可能就會出現OOM的情況。所以建議大家在使用的時候根據實際情況設置一個大小。

  LinkedBlockingQueue的內部是單向鏈表,在鏈表的內部只能next,也就是說查找元素只能從head查起,從tail插入。

  LinkedBlockingQueue的采用兩把鎖的分離技術,實現了出隊和入隊的鎖分離,也就是說LinkedBlockingQueue的讀寫是分離的,提高了它得並發處理能力

二、LinkedBlockingQueue的特點

隊列特點:無界阻塞隊列,可以指定容量,默認為Integer.MAX_VALUE,先進先出
數據結構: 鏈表結構
鎖特點:讀寫鎖分離,操作各自的node對象。takeLock 取node節點保證了順序不會亂,putlock存數據時保證了有序
條件阻塞:出隊時候,如果隊列的count=0,證明隊列里面沒有元素的,notEmpty條件隊列會被阻塞。入隊時候,如果隊列的count=capacity,證明隊列已滿,這個時候入隊線程需要阻塞,notFull需要阻塞。
入隊:從tail入隊。出隊:從head出隊

三、LinkedBlockingQueue的簡單實用

//創建一個有界隊列,這個需要指定大寫
BlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>(100)
//默認指定的Queue為無界隊列
BlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>();

//建議大家在使用的時候指定隊列大小,防止出現OOM

四、LinkedBlockingQueue的源碼

數據參數

//指定隊列容量大小,不指定默認 Integer.MAX_VALUE
private final int capacity;
//統計內部元素數量,這里使用了CAS的原子性操作
private final AtomicInteger count = new AtomicInteger();
//定義鏈表的頭部,初始化時 head=null
transient Node<E> head;
//定義鏈表的尾部
private transient Node<E> last;
//定義take poll使用的鎖,也就是消費者使用的鎖
private final ReentrantLock takeLock = new ReentrantLock();
//等待隊列的條件隊列,當隊列無元素時,take鎖會阻塞在notEmpty條件上,等待其它線程喚醒
private final Condition notEmpty = takeLock.newCondition();
//定義 put, offer,使用的鎖,也就是生產者使用的鎖
private final ReentrantLock putLock = new ReentrantLock();
//等待入隊的條件隊列,當隊列滿了時,put鎖會會阻塞在notFull上,等待其它線程喚醒
private final Condition notFull = putLock.newCondition();


//單鏈表結構
 static class Node<E> {
        E item;//存儲元素

        Node<E> next;//后續節點,單鏈表結構

        Node(E x) { item = x; }
    }

構造器

//默認的構造器 
public LinkedBlockingQueue() {
// 如果沒傳容量,就使用最大int值初始化其容量
        this(Integer.MAX_VALUE);
    }

//指定容量大小的構造器
 public LinkedBlockingQueue(int capacity) {
    //如果指定容量大小小於等於0拋出異常
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
//定義初始的head和last節點,初始為空節點
        last = head = new Node<E>(null);
    }

 入隊put方法

//入隊
public void put(E e) throws InterruptedException {
        //如果入隊元素為空,則拋出異常 
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        //定義一個新節點
        Node<E> node = new Node<E>(e);
        //加鎖操作 count原子性操作
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
              *如果元素總數和空間總數相等,生產者阻塞
              */
            while (count.get() == capacity) {
                notFull.await();
            }
           //入隊
            enqueue(node);
            //統計數據加1,返回原數據
            c = count.getAndIncrement();
            //如果數據總數小於空間總數,喚醒生產者
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            // 如果原隊列長度為0,現在加了一個元素后立即喚醒阻塞在notEmpty上的線程

            signalNotEmpty();
    }

//直接在尾部入隊
 private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }
//如果隊列中加入元素,喚醒消費者
 private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

出隊take方法

 public E take() throws InterruptedException {
        E x;
        int c = -1;
        //獲取隊列中數據總數
        final AtomicInteger count = this.count;
        加鎖
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
           //如果隊列中數據為空,阻塞
            while (count.get() == 0) {
                notEmpty.await();
            }
           //獲取隊列元素
            x = dequeue();
           //隊列總數減一
            c = count.getAndDecrement();
            if (c > 1)
            //如果隊列總數大於1,喚醒消費線程
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
         //如果取數據前數據總數和空間總數相等,可以喚醒生產者線程
            signalNotFull();
        return x;
    }
//出隊操作
  private E dequeue() {
     //得到頭結點
        Node<E> h = head;
      //獲取第一個元素
        Node<E> first = h.next;
     //第一個元素變成頭結點
        h.next = h; // help GC
     //頭結點放入第一個元素
        head = first;
        //拿到第一個元素
        E x = first.item;
       //第一個元素制空
        first.item = null;
        return x;
    }

//喚醒生產者
 private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
        //喚醒生產者
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

五、LinkedBlockingQueue與ArrayBlockingQueue對比

     LinkedBlockingQueue是一個無界的隊列,默認大小為Integer.MAX_VALUE,LinkedBlockingQueue也是一個阻塞隊列,它有兩個鎖,一個生產者的putLock和一個消費者的takeLock。它和ArrayBlockingQueue的不同點如下

  • 有界和無界之分:ArrayBlockingQueue 在定時時候必須指定隊列大小,如果不指定會報錯。LinkedBlockingQueue可以指定大小也可以不指定大小,默認為(Integer.MAX_VALUE),LinkedBlockingQueue在生產者速度大於消費者的情況下會有OOM的風險
  • 存儲結構不同: ArrayBlockingQueue采用的是數組的存儲結構,所以在查詢上速度較快,刪除等操作上速度較慢,但是不會產生額外的對象。LinkedBlockingQueue 采用的是鏈表的結構,刪除操作上速度較快。在高並發下回產生大量的游離對象,GC壓力大
  • 加鎖方式不同:ArrayBlockingQueue只有一個lock鎖,入隊和出隊都使用這個鎖,所以並發能力有限。LinkedBlockingQueue采用兩個鎖,入隊和出隊各用各的鎖,並發處理能力高

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM