概述
LinkedBlockingQueue內部由單鏈表實現,只能從head取元素,從tail添加元素。添加元素和獲取元素都有獨立的鎖,也就是說LinkedBlockingQueue是讀寫分離的,讀寫操作可以並行執行。LinkedBlockingQueue采用可重入鎖(ReentrantLock)來保證在並發情況下的線程安全。
構造器
LinkedBlockingQueue一共有三個構造器,分別是無參構造器、可以指定容量的構造器、可以穿入一個容器的構造器。如果在創建實例的時候調用的是無參構造器,LinkedBlockingQueue的默認容量是Integer.MAX_VALUE,這樣做很可能會導致隊列還沒有滿,但是內存卻已經滿了的情況(內存溢出)。
1 public LinkedBlockingQueue(); //設置容量為Integer.MAX 2 3 public LinkedBlockingQueue(int capacity); //設置指定容量 4 5 public LinkedBlockingQueue(Collection<? extends E> c); //穿入一個容器,如果調用該構造器,容量默認也是Integer.MAX_VALUE
LinkedBlockingQueue常用操作
取數據
take():首選。當隊列為空時阻塞
poll():彈出隊頂元素,隊列為空時,返回空
peek():和poll烈性,返回隊隊頂元素,但頂元素不彈出。隊列為空時返回null
remove(Object o):移除某個元素,隊列為空時拋出異常。成功移除返回true
添加數據
put():首選。隊滿是阻塞
offer():隊滿時返回false
判斷隊列是否為空
size()方法會遍歷整個隊列,時間復雜度為O(n),所以最好選用isEmtpy
put元素原理
基本過程:
1.判斷元素是否為null,為null拋出異常
2.加鎖(可中斷鎖)
3.判斷隊列長度是否到達容量,如果到達一直等待
4.如果沒有隊滿,enqueue()在隊尾加入元素
5.隊列長度加1,此時如果隊列還沒有滿,調用signal喚醒其他堵塞隊列
1 if (e == null) throw new NullPointerException(); 2 3 int c = -1; 4 Node<E> node = new Node<E>(e); 5 final ReentrantLock putLock = this.putLock; 6 final AtomicInteger count = this.count; 7 putLock.lockInterruptibly(); 8 try { 9 while (count.get() == capacity) { 10 notFull.await(); 11 } 12 enqueue(node); 13 c = count.getAndIncrement(); 14 if (c + 1 < capacity) 15 notFull.signal(); 16 } finally { 17 putLock.unlock(); 18 }
take元素原理
基本過程:
1.加鎖(依舊是ReentrantLock),注意這里的鎖和寫入是不同的兩把鎖
2.判斷隊列是否為空,如果為空就一直等待
3.通過dequeue方法取得數據
3.取走元素后隊列是否為空,如果不為空喚醒其他等待中的隊列
1 public E take() throws InterruptedException { 2 E x; 3 int c = -1; 4 final AtomicInteger count = this.count; 5 final ReentrantLock takeLock = this.takeLock; 6 takeLock.lockInterruptibly(); 7 try { 8 while (count.get() == 0) { 9 notEmpty.await(); 10 } 11 x = dequeue(); 12 c = count.getAndDecrement(); 13 if (c > 1) 14 notEmpty.signal(); 15 } finally { 16 takeLock.unlock(); 17 } 18 if (c == capacity) 19 signalNotFull(); 20 return x; 21 }
enqueue()和dequeue()方法實現都比較簡單,無非就是將元素添加到隊尾,從隊頂取走元素,感興趣的朋友可以自己去看一下,這里就不粘貼了。
LinkedBlockingQueue與LinkedBlockingDeque比較
LinkedBlockingDeque和LinkedBlockingQueue的相同點在於:
1. 基於鏈表
2. 容量可選,不設置的話,就是Int的最大值
和LinkedBlockingQueue的不同點在於:
1. 雙端鏈表和單鏈表
2. 不存在哨兵節點
3. 一把鎖+兩個條件
實例:
小記:AtomicInteger的getAndIncrment和getAndDcrement()等方法,這些方法分為兩步,get和increment(decrement),在get和increment中間可能有其他線程進入,導致多個線程get到的數值是相同的,也會導致多個線程累加后的值其實累加1.在這種情況下,使用volatile也是沒有效果的,因為get之后沒有對值進行修改,不能觸發volatile的效果。
1 public class ProducerAndConsumer { 2 public static void main(String[] args){ 3 4 try{ 5 BlockingQueue queue = new LinkedBlockingQueue(5); 6 7 ExecutorService executor = Executors.newFixedThreadPool(5); 8 Produer producer = new Produer(queue); 9 for(int i=0;i<3;i++){ 10 executor.execute(producer); 11 } 12 executor.execute(new Consumer(queue)); 13 14 executor.shutdown(); 15 }catch (Exception e){ 16 e.printStackTrace(); 17 } 18 19 } 20 } 21 22 class Produer implements Runnable{ 23 24 private BlockingQueue queue; 25 private int nums = 20; //循環次數 26 27 //標記數據編號 28 private static volatile AtomicInteger count = new AtomicInteger(); 29 private boolean isRunning = true; 30 public Produer(){} 31 32 public Produer(BlockingQueue queue){ 33 this.queue = queue; 34 } 35 36 public void run() { 37 String data = null; 38 try{ 39 System.out.println("開始生產數據"); 40 System.out.println("-----------------------"); 41 42 while(nums>0){ 43 nums--; 44 count.decrementAndGet(); 45 46 Thread.sleep(500); 47 System.out.println(Thread.currentThread().getId()+ " :生產者生產了一個數據"); 48 queue.put(count.getAndIncrement()); 49 } 50 }catch(Exception e){ 51 e.printStackTrace(); 52 Thread.currentThread().interrupt(); 53 }finally{ 54 System.out.println("生產者線程退出!"); 55 } 56 } 57 } 58 59 class Consumer implements Runnable{ 60 61 private BlockingQueue queue; 62 private int nums = 20; 63 private boolean isRunning = true; 64 65 public Consumer(){} 66 67 public Consumer(BlockingQueue queue){ 68 this.queue = queue; 69 } 70 71 public void run() { 72 73 System.out.println("消費者開始消費"); 74 System.out.println("-------------------------"); 75 76 while(nums>0){ 77 nums--; 78 try{ 79 while(isRunning){ 80 int data = (Integer)queue.take(); 81 Thread.sleep(500); 82 System.out.println("消費者消費的數據是" + data); 83 } 84 85 }catch(Exception e){ 86 e.printStackTrace(); 87 Thread.currentThread().interrupt(); 88 }finally { 89 System.out.println("消費者線程退出!"); 90 } 91 92 } 93 } 94 }
效果:
1 12 :生產者生產了一個數據 2 11 :生產者生產了一個數據 3 13 :生產者生產了一個數據 4 12 :生產者生產了一個數據 5 消費者消費的數據是-3 6 11 :生產者生產了一個數據 7 13 :生產者生產了一個數據 8 12 :生產者生產了一個數據 9 消費者消費的數據是-3 10 13 :生產者生產了一個數據 11 11 :生產者生產了一個數據 12 12 :生產者生產了一個數據 13 消費者消費的數據是-3 14 13 :生產者生產了一個數據 15 11 :生產者生產了一個數據 16 消費者消費的數據是-3 17 消費者消費的數據是-3
可以看到,有多個producer在生產數據的時候get到的是相同的值。