內存模型的相關概念
算計機在執行程序時,每條指令都是在CPU中執行的,而執行指令過程中勢必涉及到數據的讀取和寫入。當程序在運行時,會將運算需要的數據從主存復制一份到CPU的高速緩存,CPU進行計算時就可以直接從他的高速緩存讀取數據和向其中寫入數據,當運算結束后,在將高速緩存中的數據刷新到主存當中。
每個線程會有自己的高速緩存,當我們調用兩個線程執行: i=i+1;
我們希望執行過后 i=2,而事實並不一定如此,初始時兩個線程分別讀取 i 的值到各自的高速緩存,線程1加1然后把最新值 1 寫入到內存,此時線程2中的 i 還是 0 ,加一操作后 i 值 為 1,然后把線程 2 中的 i 寫入到 內存。
這就是著名的緩存一致性問題。通常這種被多線程訪問的變量成為共享變量。
並發編程中的三大概念
1、原子性
銀行賬戶轉賬問題
從A中取出錢,向B中存入錢,必須保證兩個操作要么全部執行,要么全不執行。
2、可見性(volatile可以保證可見性)
當多個線程訪問同一個變量時,一個線程修改了這變量的值,其他線程能夠及時看到修改的值。
3、有序性
即程序執行的順序按照代碼先后順序執行。
Java內存模型中,允許編譯器和處理器對指令進行重排序,但是只會重排數據無關的語句,保證單線程內的運行結果一致。
另外Java內存模型具有一些先天的有序性, happens-before 原則。
1)程序次序規則,一個線程內,按照代碼順序,書寫在前面的操作現行發生於書寫在后面的操作
2)鎖定規則:一個unLock操作先行發生於同一個鎖的lock操作
3)volatile變量規則:對於一個的寫操作先行發生於后面對這個變量的讀操作。
4)傳遞規則:A先於B,B先於C,則A先於C。
5)線程啟動規則:Thread對象的start()方法先行發生於此線程的每一個動作。
6)線程中斷規則:線程 interrupt() 方法的調用先行發生於被中斷線程的代碼檢測到中斷事件的發生。
7)線程終結規則:線程中所有的操作都先行發生與線程的終止檢測,我們可以通過Thread.join()方法時結束、Thread.isAlive() 的返回值手段檢測到線程已經終止執行。
8)對象終結規則,一個對象的初始化完成先行發生於他的 finalize() 方法的開始。
1、Synchronized 修飾
2、volatile實現同步 (只能保證可見性,不能保證原子性)
1)使用volatile關鍵字會強制將修改的緩存值立即寫入主存。
2)使用volatile關鍵字,當線程2進行修改時,會導致線程1的工作內存中變量緩存無效,然后線程1讀取時發現自己的緩存無效他會等待緩存行對應的主存地址被更新之后,然后去主存讀取最新信息。
3)禁止指令重排序
(1)當程序執行到volatile變量的讀操作或者寫操作時,其前面的操作的更改肯定全部已經進 行,且結果已經對后面的操作可見,在其后面的操作還沒有進行。
(2)在進行指令優化時,不能將在對volatile變量訪問的語句放在其后面執行,也不能把 volatile變量后面的語句放在其前面執行。
在下列代碼中: 語句3不會在 1、2之前執行,也不會在 4、5 之后執行,但 1和2的順序, 4和5的順序是無法保證的。 volatile 保證,執行到 3 時 1、2必定執行,且對 4、5 可見。
// x y 非volatile // z 為 volatile修飾 x = 2; //語句1 y = 0;//語句2 z = true;//語句3 x = 1;//語句4 y = -1;//語句5
使用場景:1)對變量的寫操作不依賴當前值。2)該變量沒有包含在具有其他變量的不變式中
也就是變量取值時值的內容獨立於程序其他部分(賦值語句為原子性操作)。
3、使用局部變量ThreadLocal
(轉載自:鏈接:https://www.zhihu.com/question/23089780/answer/62097840) ThreadLocal類用來提供線程內部的局部變量。這種變量在多線程環境下訪問(通過get或set方法訪問)時能保證各個線程里的變量相對獨立於其他線程內的變量。ThreadLocal實例通常來說都是private static類型的,用於關聯線程和線程的上下文。 可以總結為一句話:ThreadLocal的作用是提供線程內的局部變量,這種變量在線程的生命周期內起作用,減少同一個線程內多個函數或者組件之間一些公共變量的傳遞的復雜度。 舉個例子,我出門需要先坐公交再做地鐵,這里的坐公交和坐地鐵就好比是同一個線程內的兩個函數,我就是一個線程,我要完成這兩個函數都需要同一個東西:公交卡(北京公交和地鐵都使用公交卡),那么我為了不向這兩個函數都傳遞公交卡這個變量(相當於不是一直帶着公交卡上路),我可以這么做:將公交卡事先交給一個機構,當我需要刷卡的時候再向這個機構要公交卡(當然每次拿的都是同一張公交卡)。這樣就能達到只要是我(同一個線程)需要公交卡,何時何地都能向這個機構要的目的。 有人要說了:你可以將公交卡設置為全局變量啊,這樣不是也能何時何地都能取公交卡嗎?但是如果有很多個人(很多個線程)呢?大家可不能都使用同一張公交卡吧(我們假設公交卡是實名認證的),這樣不就亂套了嘛。現在明白了吧?這就是ThreadLocal設計的初衷:提供線程內部的局部變量,在本線程內隨時隨地可取,隔離其他線程。
主要方法:
1 public class ThreadLocalTest { 2 public static void main(String[] args) { 3 /** 4 * 5 * 內部屬性 6 private final int threadLocalHashCode = nextHashCode(); 7 8 private static AtomicInteger nextHashCode = new AtomicInteger(); 9 10 private static final int HASH_INCREMENT = 0x61c88647; 11 12 */ 13 14 /** 15 * 構造函數為空 ,不作處理 16 * public ThreadLocal() { } 17 */ 18 ThreadLocal<Integer> threadLocal = new ThreadLocal(); 19 /** 20 * 創建一個線程局部變量ThreadLocal 21 */ 22 Integer i = 1; 23 threadLocal.set(i); 24 /** 25 * 取出線程局部變量ThreadLocal 26 */ 27 Integer j = null; 28 j =threadLocal.get(); 29 /** 30 * 解除當前線程綁定的ThreadLocal 31 */ 32 threadLocal.remove(); 33 }}
測試:
1 public class ThreadLocalTest { 2 private static final ThreadLocal<Integer> value = new ThreadLocal() { 3 @Override 4 protected Object initialValue() { 5 return 0; 6 } 7 8 }; 9 10 static class MyThread implements Runnable { 11 private int index; 12 13 public void MyThread(int index) { 14 this.index = index; 15 } 16 17 @Override 18 public void run() { 19 System.out.println("線程" + index + "的初始value:" + value.get()); 20 for (int i = 0; i < 10; i++) { 21 value.set(value.get() + 1); 22 } 23 System.out.println("線程" + index + "的累加value:" + value.get()); 24 } 25 26 } 27 28 public static void main(String[] args) { 29 for (int i = 0; i < 5; i++) { 30 new Thread(new MyThread()).start(); 31 32 } 33 } 34 }
1 結果: 2 3 線程0的初始value:0 4 線程0的初始value:0 5 線程0的累加value:10 6 線程0的累加value:10 7 線程0的初始value:0 8 線程0的初始value:0 9 線程0的初始value:0 10 線程0的累加value:10 11 線程0的累加value:10 12 線程0的累加value:10
4.原子類(AtomicInteger、AtomicBoolean……)
原子類是基本類的原子化版本,通過線程安全的方式操作,等同於自動加synchronized。
如: Integer 與AtomicInteger:
1 import java.util.concurrent.atomic.AtomicInteger; 2 public class IntegerTest { 3 static Integer i = 0; 4 static public void increase() { 5 synchronized(i){ 6 ++i; 7 i++; 8 } 9 System.out.println(i); 10 } 11 public static void main(String[] args) { 12 increase(); 13 } 14 }
1 import java.util.concurrent.atomic.AtomicInteger; 2 public class AtomicIntegerTest { 3 public static void main(String[] args) { 4 /** 5 * AtomicIntegerTest 線程安全 6 */ 7 int i = 0; 8 AtomicInteger atomicInteger = new AtomicInteger(i); 9 //相當於 ++i 10 i = atomicInteger.incrementAndGet(); 11 //相當於 i++ 12 i = atomicInteger.getAndIncrement(); 13 System.out.println(i); 14 } 15 }
AtomicInteger.compareAndSet(int expect, int update)
如果當前值 == 預期值,則以原子方式將該值設置為給定的更新值。
AtomicReference
賦值操作不是線程安全的。若想不用鎖來實現,可以用AtomicReference<V>這個類,實現對象引用的原子更新。
常用方法
構造函數。
java.util.concurrent.atomic.AtomicReference.AtomicReference(V initialValue)
返回當前的引用。
java.util.concurrent.atomic.AtomicReference.get()
如果當前值與給定的expect相等,(注意是引用相等而不是equals()相等),更新為指定的update值。
boolean java.util.concurrent.atomic.AtomicReference.compareAndSet(V expect, V update)
原子地設為給定值並返回舊值。
java.util.concurrent.atomic.AtomicReference.getAndSet(V newValue)
注意此方法不是原子的。不明白為什么要提供這個方法,很容易誤用。
void java.util.concurrent.atomic.AtomicReference.set(V newValue)
5、使用Lock
lock: 在java.util.concurrent包內。共有三個實現:
ReentrantLock
ReentrantReadWriteLock.ReadLock
ReentrantReadWriteLock.WriteLock
主要目的是和synchronized一樣, 兩者都是為了解決同步問題,處理資源爭端而產生的技術。功能類似但有一些區別。
區別如下:
lock更靈活,可以自由定義多把鎖的枷鎖解鎖順序(synchronized要按照先加的后解順序)
提供多種加鎖方案,lock 阻塞式, trylock 無阻塞式, lockInterruptily 可打斷式, 還有trylock的帶超時時間版本。
本質上和監視器鎖(即synchronized是一樣的)
能力越大,責任越大,必須控制好加鎖和解鎖,否則會導致災難。
和Condition類的結合。
線程越多,性能相對更高,
ReentrantLock
可重入的意義在於持有鎖的線程可以繼續持有,並且要釋放對等的次數后才真正釋放該鎖。
使用方法是:
1.先new一個實例
static ReentrantLock r=new ReentrantLock();
2.加鎖
1 //加鎖 2 //后者可被打斷。 3 //當a線程lock后,b線程阻塞,此時如果是lockInterruptibly,那么在調用b.interrupt()之、后,b線程退出阻塞,並放棄對資源的爭搶,進入catch塊。 4 //(如果使用后者,必須throw interruptable exception 或catch) 5 r.lock()或r.lockInterruptibly()
3.釋放鎖
r.unlock()
必須做!何為必須做呢,要放在finally里面。以防止異常跳出了正常流程,導致災難。這里補充一個小知識點,finally是可以信任的:經過測試,哪怕是發生了OutofMemoryError,finally塊中的語句執行也能夠得到保證。
ReentrantReadWriteLock
可重入讀寫鎖(讀寫鎖的一個實現)
ReentrantReadWriteLock lock = new ReentrantReadWriteLock() ReadLock r = lock.readLock(); WriteLock w = lock.writeLock();
兩者都有lock,unlock方法。寫寫,寫讀互斥;讀讀不互斥。可以實現並發讀的高效線程安全代碼
6、容器類(BlockingQueue、ConcurrentHashMap)
這里就討論比較常用的兩個:
BlockingQueue ConcurrentHashMap
BlockingQueue
阻塞隊列。該類是java.util.concurrent包下的重要類,通過對Queue的學習可以得知,這個queue是單向隊列,可以在隊列頭添加元素和在隊尾刪除或取出元素。類似於一個管道,特別適用於先進先出策略的一些應用場景。普通的queue接口主要實現有PriorityQueue(優先隊列),有興趣可以研究
BlockingQueue在隊列的基礎上添加了多線程協作的功能:
放入數據:
offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,
則返回true,否則返回false.(本方法不阻塞當前執行方法的線程)
則返回true,否則返回false.(本方法不阻塞當前執行方法的線程)
offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往隊列中加入BlockingQueue,則返回失敗。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷,直到BlockingQueue里面有空間再繼續.
獲取數據:
poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,
取不到時返回null;
poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內,隊列一旦有數據可取,則立即返回隊列中的數據。否則知道時間超時還沒有數據可取,返回失敗。
take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入;
drainTo():一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),
通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。
BlockingQueue成員詳細介紹
1. ArrayBlockingQueue
基於數組的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,這是一個常用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。
ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味着兩者無法真正並行運行,這點尤其不同於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以采用分離鎖,從而實現生產者和消費者操作的完全並行運行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的數據寫入和獲取操作已經足夠輕巧,以至於引入獨立的鎖機制,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在於,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象。這在長時間內需要高效並發地處理大批量數據的系統中,其對於GC的影響還是存在一定的區別。而在創建ArrayBlockingQueue時,我們還可以控制對象的內部鎖是否采用公平鎖,默認采用非公平鎖。
2.LinkedBlockingQueue
基於鏈表的阻塞隊列,同ArrayListBlockingQueue類似,其內部也維持着一個數據緩沖隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區達到最大值緩存容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理並發數據,還因為其對於生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味着在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的並發性能。
作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。
ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞隊列,一般情況下,在處理多線程間的生產者消費者問題,使用這兩個類足以。
1 import java.util.Random; 2 import java.util.concurrent.BlockingQueue; 3 import java.util.concurrent.TimeUnit; 4 import java.util.concurrent.atomic.AtomicInteger; 5 public class Producer implements Runnable{ 6 private BlockingQueue queue; 7 private volatile boolean isRunning = true; 8 private static AtomicInteger count = new AtomicInteger(); 9 private static final int DEFAULT_FOR_SLEEP = 1000; 10 public Producer(BlockingQueue queue) { 11 this.queue = queue; 12 } 13 @Override 14 public void run() { 15 String data = null; 16 System.out.println("啟動生產者線程!"); 17 Random random = new Random(); 18 try { 19 while(isRunning) { 20 System.out.println("正在生產數據!"); 21 Thread.sleep(random.nextInt(DEFAULT_FOR_SLEEP)); 22 data = "data:"+count.incrementAndGet(); 23 System.out.println("將隊列 "+data+"放入隊列"); 24 if (!queue.offer(data,2,TimeUnit.SECONDS)) { 25 System.out.println("放入數據失敗"+data); 26 } 27 } 28 } catch (InterruptedException e) { 29 // TODO Auto-generated catch block 30 e.printStackTrace(); 31 }finally { 32 System.out.println("退出生產者線程!"); 33 } 34 } 35 public void stop() { 36 isRunning = false; 37 } 38 }
1 import java.util.Random; 2 import java.util.concurrent.BlockingQueue; 3 import java.util.concurrent.TimeUnit; 4 5 public class Consumer implements Runnable { 6 private BlockingQueue queue; 7 private boolean isRunning = true; 8 private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; 9 10 public Consumer(BlockingQueue queue) { 11 this.queue = queue; 12 } 13 @Override 14 public void run() { 15 System.out.println("啟動消費者線程!"); 16 Random random = new Random(); 17 isRunning = true; 18 try { 19 while (isRunning) { 20 System.out.println("正在從隊列里獲取數據"); 21 String data = (String) queue.poll(2, TimeUnit.SECONDS); 22 if (null != data) { 23 System.out.println("拿到數據 :" + data); 24 System.out.println("正在消費數據:" + data); 25 Thread.sleep(random.nextInt(DEFAULT_RANGE_FOR_SLEEP)); 26 } else { 27 // 超過2s還沒數據,認為所有生產線都已經退出,自動退出消費線程 28 isRunning = false; 29 } 30 } 31 } catch (InterruptedException e) { 32 e.printStackTrace(); 33 Thread.currentThread().interrupt(); 34 } finally { 35 System.out.println("退出消費者線程!"); 36 } 37 } 38 }
1 package org.clockQueue; 2 3 import java.util.concurrent.BlockingQueue; 4 import java.util.concurrent.Executor; 5 import java.util.concurrent.ExecutorService; 6 import java.util.concurrent.Executors; 7 import java.util.concurrent.LinkedBlockingQueue; 8 9 public class BlocakingQueueTest { 10 public static void main(String[] args) throws InterruptedException { 11 BlockingQueue<String> queue =new LinkedBlockingQueue(10); 12 Producer p1 = new Producer(queue); 13 Producer p2 = new Producer(queue); 14 Producer p3 = new Producer(queue); 15 Consumer c = new Consumer(queue); 16 17 ExecutorService service = Executors.newCachedThreadPool(); 18 //啟動線程 19 service.execute(p1); 20 service.execute(p2); 21 service.execute(p3); 22 service.execute(c); 23 //運行10s 24 Thread.sleep(10*1000); 25 p1.stop(); 26 p2.stop(); 27 p3.stop(); 28 29 Thread.sleep(2000); 30 31 //退出executor 32 service.shutdown(); 33 34 35 } 36 }
1 啟動生產者線程! 2 啟動消費者線程! 3 啟動生產者線程! 4 啟動生產者線程! 5 正在生產數據! 6 正在生產數據! 7 正在生產數據! 8 正在從隊列里獲取數據 9 將隊列 data:1放入隊列 10 正在生產數據! 11 拿到數據 :data:1 12 正在消費數據:data:1 13 正在從隊列里獲取數據 14 將隊列 data:2放入隊列 15 正在生產數據! 16 拿到數據 :data:2 17 正在消費數據:data:2 18 將隊列 data:3放入隊列 19 正在生產數據! 20 將隊列 data:4放入隊列 21 正在生產數據! 22 將隊列 data:5放入隊列 23 正在生產數據! 24 將隊列 data:6放入隊列 25 正在生產數據! 26 正在從隊列里獲取數據 27 拿到數據 :data:3 28 正在消費數據:data:3 29 將隊列 data:7放入隊列 30 正在生產數據! 31 將隊列 data:8放入隊列 32 正在生產數據! 33 正在從隊列里獲取數據 34 拿到數據 :data:4 35 正在消費數據:data:4 36 將隊列 data:9放入隊列 37 正在生產數據! 38 正在從隊列里獲取數據 39 拿到數據 :data:5 40 正在消費數據:data:5 41 將隊列 data:10放入隊列 42 正在生產數據! 43 將隊列 data:11放入隊列 44 正在生產數據! 45 將隊列 data:12放入隊列 46 正在生產數據! 47 將隊列 data:13放入隊列 48 正在生產數據! 49 將隊列 data:14放入隊列 50 正在生產數據! 51 正在從隊列里獲取數據 52 拿到數據 :data:6 53 正在消費數據:data:6 54 將隊列 data:15放入隊列 55 正在生產數據! 56 將隊列 data:16放入隊列 57 正在生產數據! 58 將隊列 data:17放入隊列 59 將隊列 data:18放入隊列 60 正在從隊列里獲取數據 61 拿到數據 :data:7 62 正在消費數據:data:7 63 正在生產數據! 64 將隊列 data:19放入隊列 65 正在從隊列里獲取數據 66 拿到數據 :data:8 67 正在生產數據! 68 正在消費數據:data:8 69 將隊列 data:20放入隊列 70 將隊列 data:21放入隊列 71 正在從隊列里獲取數據 72 拿到數據 :data:9 73 正在生產數據! 74 正在消費數據:data:9 75 正在從隊列里獲取數據 76 拿到數據 :data:10 77 正在生產數據! 78 正在消費數據:data:10 79 正在從隊列里獲取數據 80 拿到數據 :data:11 81 正在生產數據! 82 正在消費數據:data:11 83 將隊列 data:22放入隊列 84 將隊列 data:23放入隊列 85 將隊列 data:24放入隊列 86 正在從隊列里獲取數據 87 拿到數據 :data:12 88 正在生產數據! 89 正在消費數據:data:12 90 正在從隊列里獲取數據 91 拿到數據 :data:13 92 正在生產數據! 93 正在消費數據:data:13 94 正在從隊列里獲取數據 95 拿到數據 :data:14 96 正在生產數據! 97 正在消費數據:data:14 98 正在從隊列里獲取數據 99 拿到數據 :data:15 100 正在消費數據:data:15 101 正在從隊列里獲取數據 102 拿到數據 :data:16 103 正在消費數據:data:16 104 正在從隊列里獲取數據 105 拿到數據 :data:17 106 正在消費數據:data:17 107 將隊列 data:25放入隊列 108 正在生產數據! 109 將隊列 data:26放入隊列 110 正在生產數據! 111 正在從隊列里獲取數據 112 拿到數據 :data:18 113 正在消費數據:data:18 114 將隊列 data:27放入隊列 115 正在生產數據! 116 將隊列 data:28放入隊列 117 正在生產數據! 118 將隊列 data:29放入隊列 119 正在從隊列里獲取數據 120 拿到數據 :data:19 121 正在生產數據! 122 正在消費數據:data:19 123 將隊列 data:30放入隊列 124 將隊列 data:31放入隊列 125 將隊列 data:32放入隊列 126 正在從隊列里獲取數據 127 拿到數據 :data:20 128 正在生產數據! 129 正在消費數據:data:20 130 正在從隊列里獲取數據 131 拿到數據 :data:21 132 正在生產數據! 133 正在消費數據:data:21 134 將隊列 data:33放入隊列 135 正在從隊列里獲取數據 136 拿到數據 :data:22 137 正在生產數據! 138 正在消費數據:data:22 139 將隊列 data:34放入隊列 140 正在從隊列里獲取數據 141 拿到數據 :data:23 142 正在生產數據! 143 正在消費數據:data:23 144 將隊列 data:35放入隊列 145 正在從隊列里獲取數據 146 拿到數據 :data:24 147 退出生產者線程! 148 正在消費數據:data:24 149 正在從隊列里獲取數據 150 拿到數據 :data:25 151 正在消費數據:data:25 152 退出生產者線程! 153 將隊列 data:36放入隊列 154 正在從隊列里獲取數據 155 拿到數據 :data:26 156 退出生產者線程! 157 正在消費數據:data:26 158 正在從隊列里獲取數據 159 拿到數據 :data:27 160 正在消費數據:data:27 161 正在從隊列里獲取數據 162 拿到數據 :data:28 163 正在消費數據:data:28 164 正在從隊列里獲取數據 165 拿到數據 :data:29 166 正在消費數據:data:29 167 正在從隊列里獲取數據 168 拿到數據 :data:30 169 正在消費數據:data:30 170 正在從隊列里獲取數據 171 拿到數據 :data:31 172 正在消費數據:data:31 173 正在從隊列里獲取數據 174 拿到數據 :data:32 175 正在消費數據:data:32 176 正在從隊列里獲取數據 177 拿到數據 :data:33 178 正在消費數據:data:33 179 正在從隊列里獲取數據 180 拿到數據 :data:34 181 正在消費數據:data:34 182 正在從隊列里獲取數據 183 拿到數據 :data:35 184 正在消費數據:data:35 185 正在從隊列里獲取數據 186 拿到數據 :data:36 187 正在消費數據:data:36 188 正在從隊列里獲取數據 189 退出消費者線程!
3. DelayQueue
DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue是一個沒有大小限制的隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。
使用場景:
DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連接隊列。
4. PriorityBlockingQueue
基於優先級的阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定),但需要注意的是PriorityBlockingQueue並不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者。因此使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。在實現PriorityBlockingQueue時,內部控制線程同步的鎖采用的是公平鎖。
5. SynchronousQueue
一種無緩沖的等待隊列,類似於無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿着產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那么對不起,大家都在集市等待。相對於有緩沖的BlockingQueue來說,少了一個中間經銷商的環節(緩沖區),如果有經銷商,生產者直接把產品批發給經銷商,而無需在意經銷商最終會將這些產品賣給那些消費者,由於經銷商可以庫存一部分商品,因此相對於直接交易模式,總體來說采用中間經銷商的模式會吞吐量高一些(可以批量買賣);但另一方面,又因為經銷商的引入,使得產品從生產者到消費者中間增加了額外的交易環節,單個產品的及時響應性能可能會降低。
聲明一個SynchronousQueue有兩種不同的方式,它們之間有着不太一樣的行為。公平模式和非公平模式的區別:
如果采用公平模式:SynchronousQueue會采用公平鎖,並配合一個FIFO隊列來阻塞多余的生產者和消費者,從而體系整體的公平策略;
但如果是非公平模式(SynchronousQueue默認):SynchronousQueue采用非公平鎖,同時配合一個LIFO隊列來管理多余的生產者和消費者,而后一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的數據永遠都得不到處理。
小結
BlockingQueue不光實現了一個完整隊列所具有的基本功能,同時在多線程環境下,他還自動管理了多線間的自動等待於喚醒功能,從而使得程序員可以忽略這些細節,關注更高級的功能。
ConcurrentHashMap
高效的線程安全哈希map。請對比hashTable , concurrentHashMap, HashMap