高並發編程-隊列-BlockingQueue-LinkedBlockingQueue
一、LinkedBlockingQueue簡介
LinkedBlockingQueue是一個基於鏈表的阻塞隊列,該隊列在創建時候,默認大小為Integer.MAX_VALUE,這個數值很大的,所以可以說LinkedBlockingQueue的大小沒有限制的,業界有個比較專業的詞匯,把它叫做無界隊列。但是這也帶來了一些問題,比如當JVM內存比較小的時候,可能就會出現OOM的情況。所以建議大家在使用的時候根據實際情況設置一個大小。
LinkedBlockingQueue的內部是單向鏈表,在鏈表的內部只能next,也就是說查找元素只能從head查起,從tail插入。
LinkedBlockingQueue的采用兩把鎖的分離技術,實現了出隊和入隊的鎖分離,也就是說LinkedBlockingQueue的讀寫是分離的,提高了它得並發處理能力
二、LinkedBlockingQueue的特點
隊列特點:無界阻塞隊列,可以指定容量,默認為Integer.MAX_VALUE,先進先出 |
數據結構: 鏈表結構 |
鎖特點:讀寫鎖分離,操作各自的node對象。takeLock 取node節點保證了順序不會亂,putlock存數據時保證了有序 |
條件阻塞:出隊時候,如果隊列的count=0,證明隊列里面沒有元素的,notEmpty條件隊列會被阻塞。入隊時候,如果隊列的count=capacity,證明隊列已滿,這個時候入隊線程需要阻塞,notFull需要阻塞。 |
入隊:從tail入隊。出隊:從head出隊 |
三、LinkedBlockingQueue的簡單實用
//創建一個有界隊列,這個需要指定大寫 BlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>(100) //默認指定的Queue為無界隊列 BlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>(); //建議大家在使用的時候指定隊列大小,防止出現OOM
四、LinkedBlockingQueue的源碼
數據參數
//指定隊列容量大小,不指定默認 Integer.MAX_VALUE private final int capacity; //統計內部元素數量,這里使用了CAS的原子性操作 private final AtomicInteger count = new AtomicInteger(); //定義鏈表的頭部,初始化時 head=null transient Node<E> head; //定義鏈表的尾部 private transient Node<E> last; //定義take poll使用的鎖,也就是消費者使用的鎖 private final ReentrantLock takeLock = new ReentrantLock(); //等待隊列的條件隊列,當隊列無元素時,take鎖會阻塞在notEmpty條件上,等待其它線程喚醒 private final Condition notEmpty = takeLock.newCondition(); //定義 put, offer,使用的鎖,也就是生產者使用的鎖 private final ReentrantLock putLock = new ReentrantLock(); //等待入隊的條件隊列,當隊列滿了時,put鎖會會阻塞在notFull上,等待其它線程喚醒 private final Condition notFull = putLock.newCondition(); //單鏈表結構 static class Node<E> { E item;//存儲元素 Node<E> next;//后續節點,單鏈表結構 Node(E x) { item = x; } }
構造器
//默認的構造器 public LinkedBlockingQueue() { // 如果沒傳容量,就使用最大int值初始化其容量 this(Integer.MAX_VALUE); } //指定容量大小的構造器 public LinkedBlockingQueue(int capacity) { //如果指定容量大小小於等於0拋出異常 if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //定義初始的head和last節點,初始為空節點 last = head = new Node<E>(null); }
入隊put方法
//入隊 public void put(E e) throws InterruptedException { //如果入隊元素為空,則拋出異常 if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; //定義一個新節點 Node<E> node = new Node<E>(e); //加鎖操作 count原子性操作 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* *如果元素總數和空間總數相等,生產者阻塞 */ while (count.get() == capacity) { notFull.await(); } //入隊 enqueue(node); //統計數據加1,返回原數據 c = count.getAndIncrement(); //如果數據總數小於空間總數,喚醒生產者 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) // 如果原隊列長度為0,現在加了一個元素后立即喚醒阻塞在notEmpty上的線程 signalNotEmpty(); } //直接在尾部入隊 private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } //如果隊列中加入元素,喚醒消費者 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
出隊take方法
public E take() throws InterruptedException { E x; int c = -1; //獲取隊列中數據總數 final AtomicInteger count = this.count; 加鎖 final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //如果隊列中數據為空,阻塞 while (count.get() == 0) { notEmpty.await(); } //獲取隊列元素 x = dequeue(); //隊列總數減一 c = count.getAndDecrement(); if (c > 1) //如果隊列總數大於1,喚醒消費線程 notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) //如果取數據前數據總數和空間總數相等,可以喚醒生產者線程 signalNotFull(); return x; } //出隊操作 private E dequeue() { //得到頭結點 Node<E> h = head; //獲取第一個元素 Node<E> first = h.next; //第一個元素變成頭結點 h.next = h; // help GC //頭結點放入第一個元素 head = first; //拿到第一個元素 E x = first.item; //第一個元素制空 first.item = null; return x; } //喚醒生產者 private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { //喚醒生產者 notFull.signal(); } finally { putLock.unlock(); } }
五、LinkedBlockingQueue與ArrayBlockingQueue對比
LinkedBlockingQueue是一個無界的隊列,默認大小為Integer.MAX_VALUE,LinkedBlockingQueue也是一個阻塞隊列,它有兩個鎖,一個生產者的putLock和一個消費者的takeLock。它和ArrayBlockingQueue的不同點如下
- 有界和無界之分:ArrayBlockingQueue 在定時時候必須指定隊列大小,如果不指定會報錯。LinkedBlockingQueue可以指定大小也可以不指定大小,默認為(Integer.MAX_VALUE),LinkedBlockingQueue在生產者速度大於消費者的情況下會有OOM的風險
- 存儲結構不同: ArrayBlockingQueue采用的是數組的存儲結構,所以在查詢上速度較快,刪除等操作上速度較慢,但是不會產生額外的對象。LinkedBlockingQueue 采用的是鏈表的結構,刪除操作上速度較快。在高並發下回產生大量的游離對象,GC壓力大
- 加鎖方式不同:ArrayBlockingQueue只有一個lock鎖,入隊和出隊都使用這個鎖,所以並發能力有限。LinkedBlockingQueue采用兩個鎖,入隊和出隊各用各的鎖,並發處理能力高