線程池 BlockingQueue synchronized volatile
前段時間看了一篇關於"一名3年工作經驗的程序員應該具備的技能"文章,倍受打擊。很多熟悉而又陌生的知識讓我懷疑自己是一個假的程序員。本章從線程池,阻塞隊列,synchronized 和 volatile關鍵字,wait,notify方法實現線程之間的通訊,死鎖,常考面試題。將這些零碎的知識整合在一起。如下圖所示。
學習流程圖:
技術:Executors,BlockingQueue,synchronized,volatile,wait,notify
說明:文章學習思路:線程池---->隊列---->關鍵字---->死鎖---->線程池實戰
源碼:https://github.com/ITDragonBlog/daydayup/tree/master/ThreadBase
線程池
線程池,顧名思義存放線程的池子,可以類比數據庫的連接池。因為頻繁地創建和銷毀線程會給服務器帶來很大的壓力。若能將創建的線程不再銷毀而是存放在池中等待下一個任務使用,可以不僅減少了創建和銷毀線程所用的時間,提高了性能,同時還減輕了服務器的壓力。
線程池的使用
初始化線程池有五個核心參數,分別是 corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue。還有兩個默認參數 threadFactory, handler
corePoolSize:線程池初始核心線程數。初始化線程池的時候,池內是沒有線程,只有在執行任務的時會創建線程。
maximumPoolSize:線程池允許存在的最大線程數。若超過該數字,默認提示RejectedExecutionException
異常
keepAliveTime:當前線程數大於核心線程時,該參數生效,其目的是終止多余的空閑線程等待新任務的最長時間。即指定時間內將還未接收任務的線程銷毀。
unit:keepAliveTime 的時間單位
workQueue:緩存任務的的隊列,一般采用LinkedBlockingQueue。
threadFactory:執行程序創建新線程時使用的工廠,一般采用默認值。
handler:超出線程范圍和隊列容量而使執行被阻塞時所使用的處理程序,一般采用默認值。
線程池工作流程
開始,游泳館來了一名學員,於是館主安排一個教練負責培訓這名學員;
然后,游泳館來了六名學員,可館主只招了五名教練,於是有一名學員被安排到休息室等待;
后來,游泳館來了十六名學員,休息室已經滿了,館主核算了開支,預計最多可招十名教練;
最后,游泳館只來了十名學員,館主對教練說,如果半天內接不到學員的教練就可以走了;
結果,游泳館沒有學員,關閉了。
在接收任務前,線程池內是沒有線程。只有當任務來了才開始新建線程。當任務數大於核心線程數時,任務進入隊列中等待。若隊列滿了,則線程池新增線程直到最大線程數。再超過則會執行拒絕策略。
線程池的三種關閉
shutdown: 線程池不再接收任務,等待線程池中所有任務完成后,關閉線程池。常用
shutdownNow: 線程池不再接收任務,忽略隊列中的任務,嘗試中斷正在執行的任務,返回未執行任務列表,關閉線程池。慎用
awaitTermination: 線程池可以繼續接收任務,當任務都完成后,或者超過設置的時間后,關閉線程池。方法是阻塞的,考慮使用
線程池的種類
1 newSingleThreadExecutor() 單線程線程池
初始線程數和允許最大線程數都是一,keepAliveTime 也就失效了,隊列是無界阻塞隊列。該線程池的主要作用是負責緩存任務。
2 newFixedThreadPool(n) 固定大小線程池
初始線程數和允許最大線程數相同,且大小自定義,keepAliveTime 也就失效了,隊列是無界阻塞隊列。符合大部分業務要求,常用。
3 newCachedThreadPool() 緩存無界線程池
初始線程數為零,最大線程數為無窮大,keepAliveTime 60秒類終止空閑線程,隊列是無緩沖無界隊列。適合任務數不多的場景,慎用。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 線程池
* 優勢,類比數據庫的連接池
* 1. 頻繁的創建和銷毀線程會給服務器帶來很大的壓力
* 2. 若創建的線程不銷毀而是留在線程池中等待下次使用,則會很大地提高效率也減輕了服務器的壓力
*
* 三種workQueue策略
* 直接提交 SynchronousQueue
* 無界隊列 LinkedBlockingQueue
* 有界隊列 ArrayBlockingQueue
*
* 四種拒絕策略
* AbortPolicy : JDK默認,超出 MAXIMUM_POOL_SIZE 放棄任務拋異常 RejectedExecutionException
* CallerRunsPolicy : 嘗試直接調用被拒絕的任務,若線程池被關閉,則丟棄任務
* DiscardOldestPolicy : 放棄隊列最前面的任務,然后重新嘗試執被拒絕的任務。若線程池被關閉,則丟棄任務
* DiscardPolicy : 放棄不能執行的任務但不拋異常
*/
public class ThreadPoolExecutorStu {
// 線程池中初始線程個數
private final static Integer CORE_POOL_SIZE = 3;
// 線程池中允許的最大線程數
private final static Integer MAXIMUM_POOL_SIZE = 8;
// 當線程數大於初始線程時。終止多余的空閑線程等待新任務的最長時間
private final static Long KEEP_ALIVE_TIME = 10L;
// 任務緩存隊列 ,即線程數大於初始線程數時先進入隊列中等待,此數字可以稍微設置大點,避免線程數超過最大線程數時報錯。或者直接用無界隊列
private final static ArrayBlockingQueue<Runnable> WORK_QUEUE = new ArrayBlockingQueue<Runnable>(5);
public static void main(String[] args) {
Long start = System.currentTimeMillis();
/**
* ITDragonThreadPoolExecutor 耗時 1503
* ITDragonFixedThreadPool 耗時 505
* ITDragonSingleThreadExecutor 語法問題報錯,
* ITDragonCachedThreadPool 耗時506
* 推薦使用自定義線程池,或newFixedThreadPool(n)
*/
ThreadPoolExecutor threadPoolExecutor = ITDragonThreadPoolExecutor();
for (int i = 0; i < 8; i++) { // 執行8個任務,若超過MAXIMUM_POOL_SIZE則會報錯 RejectedExecutionException
MyRunnableTest myRunnable = new MyRunnableTest(i);
threadPoolExecutor.execute(myRunnable);
System.out.println("線程池中現在的線程數目是:"+threadPoolExecutor.getPoolSize()+", 隊列中正在等待執行的任務數量為:"+
threadPoolExecutor.getQueue().size());
}
// 關掉線程池 ,並不會立即停止(停止接收外部的submit任務,等待內部任務完成后才停止),推薦使用。 與之對應的是shutdownNow,不推薦使用
threadPoolExecutor.shutdown();
try {
// 阻塞等待30秒關掉線程池,返回true表示已經關閉。和shutdown不同,它可以接收外部任務,並且還阻塞。這里為了方便統計時間,所以選擇阻塞等待關閉。
threadPoolExecutor.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("耗時 : " + (System.currentTimeMillis() - start));
}
// 自定義線程池,開發推薦使用
public static ThreadPoolExecutor ITDragonThreadPoolExecutor() {
// 構建一個,初始線程數量為3,最大線程數據為8,等待時間10分鍾 ,隊列長度為5 的線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MINUTES, WORK_QUEUE);
return threadPoolExecutor;
}
/**
* 固定大小線程池
* corePoolSize初始線程數和maximumPoolSize最大線程數一樣,keepAliveTime參數不起作用,workQueue用的是無界阻塞隊列
*/
public static ThreadPoolExecutor ITDragonFixedThreadPool() {
ExecutorService executor = Executors.newFixedThreadPool(8);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
return threadPoolExecutor;
}
/**
* 單線程線程池
* 等價與Executors.newFixedThreadPool(1);
*/
public static ThreadPoolExecutor ITDragonSingleThreadExecutor() {
ExecutorService executor = Executors.newSingleThreadExecutor();
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
return threadPoolExecutor;
}
/**
* 無界線程池
* corePoolSize 初始線程數為零
* maximumPoolSize 最大線程數無窮大
* keepAliveTime 60秒類將沒有被用到的線程終止
* workQueue SynchronousQueue 隊列,無容量,來任務就直接新增線程
* 不推薦使用
*/
public static ThreadPoolExecutor ITDragonCachedThreadPool() {
ExecutorService executor = Executors.newCachedThreadPool();
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
return threadPoolExecutor;
}
}
class MyRunnableTest implements Runnable {
private Integer num; // 正在執行的任務數
public MyRunnableTest(Integer num) {
this.num = num;
}
public void run() {
System.out.println("正在執行的MyRunnable " + num);
try {
Thread.sleep(500);// 模擬執行事務需要耗時
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("MyRunnable " + num + "執行完畢");
}
}
隊列
隊列,是一種數據結構。大部分的隊列都是以FIFO(先進先出)的方式對各個元素進行排序的(PriorityBlockingQueue是根據優先級排序的)。隊列的頭移除元素,隊列的末尾插入元素。插入的元素建議不能為null。Queue主要分兩類,一類是高性能隊列 ConcurrentLinkedQueue;一類是阻塞隊列 BlockingQueue。本章重點介紹BlockingQueue
ConcurrentLinkedQueue
ConcurrentLinkedQueue性能好於BlockingQueue。是基於鏈接節點的無界限線程安全隊列。該隊列的元素遵循先進先出的原則。不允許null元素。
BlockingQueue
ArrayBlockingQueue: 基於數組的阻塞隊列,在內部維護了一個定長數組,以便緩存隊列中的數據對象。並沒有實現讀寫分離,也就意味着生產和消費不能完全並行。是一個有界隊列
LinkedBlockingQueue:基於列表的阻塞隊列,在內部維護了一個數據緩沖隊列(由一個鏈表構成),實現采用分離鎖(讀寫分離兩個鎖),從而實現生產者和消費者操作的完全並行運行。是一個無界隊列,也可以指定隊列大小
SynchronousQueue: 沒有緩存的隊列,生存者生產的數據直接會被消費者獲取並消費。若沒有數據就直接調用出棧方法則會報錯。
三種隊列使用場景
newFixedThreadPool 線程池采用的隊列是LinkedBlockingQueue。其優點是無界可緩存,內部實現讀寫分離,並發的處理能力高於ArrayBlockingQueue
newCachedThreadPool 線程池采用的隊列是SynchronousQueue。其優點就是無緩存,接收到的任務均可直接處理,再次強調,慎用!
並發量不大,服務器性能較好,可以考慮使用SynchronousQueue。
並發量較大,服務器性能較好,可以考慮使用LinkedBlockingQueue。
並發量很大,服務器性能無法滿足,可以考慮使用ArrayBlockingQueue。系統的穩定最重要。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
/**
* 阻塞隊列
* ArrayBlockingQueue :有界
* LinkedBlockingQueue :無界
* SynchronousQueue :無緩沖直接用
* 非阻塞隊列
* ConcurrentLinkedQueue :高性能
*/
public class ITDragonQueue {
/**
* ArrayBlockingQueue : 基於數組的阻塞隊列實現,在內部維護了一個定長數組,以便緩存隊列中的數據對象。
* 內部沒有實現讀寫分離,生產和消費不能完全並行,
* 長度是需要定義的,
* 可以指定先進先出或者先進后出,
* 是一個有界隊列。
*/
@Test
public void ITDragonArrayBlockingQueue() throws Exception {
ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5); // 可以嘗試 隊列長度由3改到5
array.offer("offer 插入數據方法---成功返回true 否則返回false");
array.offer("offer 3秒后插入數據方法", 3, TimeUnit.SECONDS);
array.put("put 插入數據方法---但超出隊列長度則阻塞等待,沒有返回值");
array.add("add 插入數據方法---但超出隊列長度則提示 java.lang.IllegalStateException"); // java.lang.IllegalStateException: Queue full
System.out.println(array);
System.out.println(array.take() + " \t還剩元素 : " + array); // 從頭部取出元素,並從隊列里刪除,若隊列為null則一直等待
System.out.println(array.poll() + " \t還剩元素 : " + array); // 從頭部取出元素,並從隊列里刪除,執行poll 后 元素減少一個
System.out.println(array.peek() + " \t還剩元素 : " + array); // 從頭部取出元素,執行peek 不移除元素
}
/**
* LinkedBlockingQueue:基於列表的阻塞隊列,在內部維護了一個數據緩沖隊列(該隊列由一個鏈表構成)。
* 其內部實現采用讀寫分離鎖,能高效的處理並發數據,生產者和消費者操作的完全並行運行
* 可以不指定長度,
* 是一個無界隊列。
*/
@Test
public void ITDragonLinkedBlockingQueue() throws Exception {
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
queue.offer("1.無界隊列");
queue.add("2.語法和ArrayBlockingQueue差不多");
queue.put("3.實現采用讀寫分離");
List<String> list = new ArrayList<String>();
System.out.println("返回截取的長度 : " + queue.drainTo(list, 2));
System.out.println("list : " + list);
}
/**
* SynchronousQueue:沒有緩沖的隊列,生存者生產的數據直接會被消費者獲取並消費。
*/
@Test
public void ITDragonSynchronousQueue() throws Exception {
final SynchronousQueue<String> queue = new SynchronousQueue<String>();
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("take , 在沒有取到值之前一直處理阻塞 : " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread1.start();
Thread.sleep(2000);
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
queue.add("進值!!!");
}
});
thread2.start();
}
/**
* ConcurrentLinkedQueue:是一個適合高並發場景下的隊列,通過無鎖的方式,實現了高並發狀態下的高性能,性能好於BlockingQueue。
* 它是一個基於鏈接節點的無界限線程安全隊列。該隊列的元素遵循先進先出的原則。頭是最先加入的,尾是最后加入的,不允許null元素。
* 無阻塞隊列,沒有 put 和 take 方法
*/
@Test
public void ITDragonConcurrentLinkedQueue() throws Exception {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
queue.offer("1.高性能無阻塞");
queue.add("2.無界隊列");
System.out.println(queue);
System.out.println(queue.poll() + " \t : " + queue); // 從頭部取出元素,並從隊列里刪除,執行poll 后 元素減少一個
System.out.println(queue.peek() + " \t : " + queue); // 從頭部取出元素,執行peek 不移除元素
}
}
關鍵字
關鍵字是為了線程安全服務的,哪什么是線程安全呢?當多個線程訪問某一個類(對象或方法)時,這個對象始終都能表現出正確的行為,那么這個類(對象或方法)就是線程安全的。
線程安全的兩個特性:原子性和可見性。synchronized 同步,原子性。volatile 可見性。wait,notify 負責多個線程之間的通信。
synchronized
synchronized 可以在任意對象及方法上加鎖,而加鎖的這段代碼稱為"互斥區"或"臨界區",若一個線程想要執行synchronized修飾的代碼塊,首先要
step1 嘗試獲得鎖
step2 如果拿到鎖,執行synchronized代碼體內容
step3 如果拿不到鎖,這個線程就會不斷的嘗試獲得這把鎖,直到拿到為止,而且是多個線程同時去競爭這把鎖。
注*(線程多了也就是會出現鎖競爭的問題,多個線程執行的順序是按照CPU分配的先后順序而定的,而並非代碼執行的先后順序)
synchronized 可以修飾方法,修飾代碼塊,這些都是對象鎖。若和static一起使用,則升級為類鎖。
synchronized 鎖是可以重入的,當一個線程得到了一個對象的鎖后,再次請求此對象時是可以再次得到該對象的鎖。鎖重入的機制,也支持在父子類繼承的場景。
synchronized 同步異步,一個線程得到了一個對象的鎖后,其他線程是可以執行非加鎖的方法(異步)。但是不能執行其他加鎖的方法(同步)。
synchronized 鎖異常,當一個線程執行的代碼出現異常時,其所持有的鎖會自動釋放。
/**
* synchronized 關鍵字,可以修飾方法,也可以修飾代碼塊。建議采用后者,通過減小鎖的粒度,以提高系統性能。
* synchronized 關鍵字,如果以字符串作為鎖,請注意String常量池的緩存功能和字符串改變后鎖是否的情況。
* synchronized 鎖重入,當一個線程得到了一個對象的鎖后,再次請求此對象時是可以再次得到該對象的鎖。
* synchronized 同異步,一個線程獲得鎖后,另外一個線程可以執行非synchronized修飾的方法,這是異步。若另外一個線程執行任何synchronized修飾的方法則需要等待,這是同步
* synchronized 類鎖,用static + synchronized 修飾則表示對整個類進行加鎖
*/
public class ITDragonSynchronized {
private void thisLock () { // 對象鎖
synchronized (this) {
System.out.println("this 對象鎖!");
}
}
private void classLock () { // 類鎖
synchronized (ITDragonSynchronized.class) {
System.out.println("class 類鎖!");
}
}
private Object lock = new Object();
private void objectLock () { // 任何對象鎖
synchronized (lock) {
System.out.println("object 任何對象鎖!");
}
}
private void stringLock () { // 字符串鎖,注意String常量池的緩存功能
synchronized ("string") { // 用 new String("string") t4 和 t5 同時進入。用string t4完成后,t5在開始
try {
for(int i = 0; i < 3; i++) {
System.out.println("thread : " + Thread.currentThread().getName() + " stringLock !");
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private String strLock = "lock"; // 字符串鎖改變
private void changeStrLock () {
synchronized (strLock) {
try {
System.out.println("thread : " + Thread.currentThread().getName() + " changeLock start !");
strLock = "changeLock";
Thread.sleep(500);
System.out.println("thread : " + Thread.currentThread().getName() + " changeLock end !");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private synchronized void method1() { // 鎖重入
System.out.println("^^^^^^^^^^^^^^^^^^^^ method1");
method2();
}
private synchronized void method2() {
System.out.println("-------------------- method2");
method3();
}
private synchronized void method3() {
System.out.println("******************** method3");
}
private synchronized void syncMethod() {
try {
System.out.println(Thread.currentThread().getName() + " synchronized method!");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 若次方法也加上了synchronized,就必須等待t1線程執行完后,t2才能調用,兩個synchronized塊之間具有互斥性,synchronized塊獲得的是一個對象鎖,鎖定的是整個對象
private void asyncMethod() {
System.out.println(Thread.currentThread().getName() + " asynchronized method!");
}
// static + synchronized 修飾則表示類鎖,打印的結果是thread1線程先執行完,然后在執行thread2線程。若沒有被static修飾,則thread1和 thread2幾乎同時執行,同時結束
private synchronized void classLock(String args) {
System.out.println(args + "start......");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(args + "end......");
}
public static void main(String[] args) throws Exception {
final ITDragonSynchronized itDragonSynchronized = new ITDragonSynchronized();
System.out.println("------------------------- synchronized 代碼塊加鎖 -------------------------");
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.thisLock();
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.classLock();
}
});
Thread thread3 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.objectLock();
}
});
thread1.start();
thread2.start();
thread3.start();
Thread.sleep(2000);
System.out.println("------------------------- synchronized 字符串加鎖 -------------------------");
// 如果字符串鎖,用new String("string") t4,t5線程是可以獲取鎖的,如果直接使用"string" ,若鎖不釋放,t5線程一直處理等待中
Thread thread4 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.stringLock();
}
}, "t4");
Thread thread5 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.stringLock();
}
}, "t5");
thread4.start();
thread5.start();
Thread.sleep(3000);
System.out.println("------------------------- synchronized 字符串變鎖 -------------------------");
// 字符串變了,鎖也會改變,導致t7線程在t6線程未結束后變開始執行,但一個對象的屬性變了,不影響這個對象的鎖。
Thread thread6 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.changeStrLock();
}
}, "t6");
Thread thread7 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.changeStrLock();
}
}, "t7");
thread6.start();
thread7.start();
Thread.sleep(2000);
System.out.println("------------------------- synchronized 鎖重入 -------------------------");
Thread thread8 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.method1();
}
}, "t8");
thread8.start();
Thread thread9 = new Thread(new Runnable() {
@Override
public void run() {
SunClass sunClass = new SunClass();
sunClass.sunMethod();
}
}, "t9");
thread9.start();
Thread.sleep(2000);
System.out.println("------------------------- synchronized 同步異步 -------------------------");
Thread thread10 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.syncMethod();
}
}, "t10");
Thread thread11 = new Thread(new Runnable() {
@Override
public void run() {
itDragonSynchronized.asyncMethod();
}
}, "t11");
thread10.start();
thread11.start();
Thread.sleep(2000);
System.out.println("------------------------- synchronized 同步異步 -------------------------");
ITDragonSynchronized classLock1 = new ITDragonSynchronized();
ITDragonSynchronized classLock2 = new ITDragonSynchronized();
Thread thread12 = new Thread(new Runnable() {
@Override
public void run() {
classLock1.classLock("classLock1");
}
});
thread12.start();
Thread thread13 = new Thread(new Runnable() {
@Override
public void run() {
classLock2.classLock("classLock2");
}
});
thread13.start();
}
// 有父子繼承關系的類,如果都使用了synchronized 關鍵字,也是線程安全的。
static class FatherClass {
public synchronized void fatherMethod(){
System.out.println("#################### fatherMethod");
}
}
static class SunClass extends FatherClass{
public synchronized void sunMethod() {
System.out.println("@@@@@@@@@@@@@@@@@@@@ sunMethod");
this.fatherMethod();
}
}
}
volatile
volatile 關鍵字雖然不具備synchronized關鍵字的原子性(同步)但其主要作用就是使變量在多個線程中可見。也就是可見性。
用法很簡單,直接用來修飾變量。因為其不具備原子性,可以用Atomic類代替。美中不足的是多個Atomic類也不具備原子性,所以還需要synchronized來修飾。
volatile 關鍵字工作原理
每個線程都有自己的工作內存,如果線程需要用到一個變量的時,會從主內存拷貝一份到自己的工作內存中。從而提高了效率。每次執行完線程后再將變量從工作內存同步回主內存中。
這樣就存在一個問題,變量在不同線程中可能存在不同的值。如果用volatile 關鍵字修飾變量,則會讓線程的執行引擎直接從主內存中獲取值。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* volatile 關鍵字主要作用就是使變量在多個線程中可見。
* volatile 關鍵字不具備原子性,但Atomic類是具備原子性和可見性。
* 美中不足的是多個Atomic類不具備原子性,還是需要synchronized 關鍵字幫忙。
*/
public class ITDragonVolatile{
private volatile boolean flag = true;
private static volatile int count;
private static AtomicInteger atomicCount = new AtomicInteger(0); // 加 static 是為了避免每次實例化對象時初始值為零
// 測試volatile 關鍵字的可見性
private void volatileMethod() {
System.out.println("thread start !");
while (flag) { // 如果flag為true則一直處於阻塞中,
}
System.out.println("thread end !");
}
// 驗證volatile 關鍵字不具備原子性
private int volatileCountMethod() {
for (int i = 0; i < 10; i++) {
// 第一個線程還未將count加到10的時候,就可能被另一個線程開始修改。可能會導致最后一次打印的值不是1000
count++ ;
}
return count;
}
// 驗證Atomic類具有原子性
private int atomicCountMethod() {
for (int i = 0; i < 10; i++) {
atomicCount.incrementAndGet();
}
// 若最后一次打印為1000則表示具備原子性,中間打印的信息可能是受println延遲影響。
return atomicCount.get();// 若最后一次打印為1000則表示具備原子性
}
// 驗證多個 Atomic類操作不具備原子性,加synchronized關鍵字修飾即可
private synchronized int multiAtomicMethod(){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicCount.addAndGet(1);
atomicCount.addAndGet(2);
atomicCount.addAndGet(3);
atomicCount.addAndGet(4);
return atomicCount.get(); //若具備原子性,則返回的結果一定都是10的倍數,需多次運行才能看到結果
}
/**
* volatile 關鍵字可見性原因
* 這里有兩個線程 :一個是main的主線程,一個是thread的子線程
* jdk線程工作流程 :為了提高效率,每個線程都有一個工作內存,將主內存的變量拷貝一份到工作內存中。線程的執行引擎就直接從工作內存中獲取變量。
* So 問題來了 :thread線程用的是自己的工作內存,主線程將變量修改后,thread線程不知道。這就是數據不可見的問題。
* 解決方法 :變量用volatile 關鍵字修飾后,線程的執行引擎就直接從主內存中獲取變量。
*
*/
public static void main(String[] args) throws InterruptedException {
// 測試volatile 關鍵字的可見性
/*ITDragonVolatile itDragonVolatile = new ITDragonVolatile();
Thread thread = new Thread(itDragonVolatile);
thread.start();
Thread.sleep(1000); // 等線程啟動了,再設置值
itDragonVolatile.setFlag(false);
System.out.println("flag : " + itDragonVolatile.isFlag());*/
// 驗證volatile 關鍵字不具備原子性 和 Atomic類具有原子性
final ITDragonVolatile itDragonVolatile = new ITDragonVolatile();
List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < 100; i++) {
threads.add(new Thread(new Runnable() {
@Override
public void run() {
// 中間打印的信息可能是受println延遲影響,請看最后一次打印的結果
System.out.println(itDragonVolatile.multiAtomicMethod());
}
}));
}
for(Thread thread : threads){
thread.start();
}
}
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
}
wait,notify
使用 wait/ notify 方法實現線程間的通信,模擬BlockingQueue隊列。有兩點需要注意:
1)wait 和 notify 必須要配合 synchronized 關鍵字使用
2)wait方法是釋放鎖的, notify方法不釋放鎖。
線程通信概念:線程是操作系統中獨立的個體,但這些個體如果不經過特殊的處理,就不能成為一個整體,線程之間的通信就成為整體的必用方法之一。
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
/**
* synchronized 可以在任意對象及方法上加鎖,而加鎖的這段代碼稱為"互斥區"或"臨界區",一般給代碼塊加鎖,通過減小鎖的粒度從而提高性能。
* Atomic* 是為了彌補volatile關鍵字不具備原子性的問題。雖然一個Atomic*對象是具備原子性的,但不能確保多個Atomic*對象也具備原子性。
* volatile 關鍵字不具備synchronized關鍵字的原子性其主要作用就是使變量在多個線程中可見。
* wait / notify
* wait() 使線程阻塞運行,notify() 隨機喚醒等待隊列中等待同一共享資源的一個線程繼續運行,notifyAll() 喚醒所有等待隊列中等待同一共享資源的線程繼續運行。
* 1)wait 和 notify 必須要配合 synchronized 關鍵字使用
* 2)wait方法是釋放鎖的, notify方法不釋放鎖
*/
public class ITDragonMyQueue {
//1 需要一個承裝元素的集合
private LinkedList<Object> list = new LinkedList<Object>();
//2 需要一個計數器 AtomicInteger (保證原子性和可見性)
private AtomicInteger count = new AtomicInteger(0);
//3 需要制定上限和下限
private final Integer minSize = 0;
private final Integer maxSize ;
//4 構造方法
public ITDragonMyQueue(Integer size){
this.maxSize = size;
}
//5 初始化一個對象 用於加鎖
private final Object lock = new Object();
//put(anObject): 把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷,直到BlockingQueue里面有空間再繼續.
public void put(Object obj){
synchronized (lock) {
while(count.get() == this.maxSize){
try {
lock.wait(); // 當Queue沒有空間時,線程被阻塞 ,這里為了區分,命名為wait1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(obj); //1 加入元素
count.incrementAndGet(); //2 計數器累加
lock.notify(); //3 新增元素后,通知另外一個線程wait2,隊列多了一個元素,可以做移除操作了。
System.out.println("新加入的元素為: " + obj);
}
}
//take: 取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入.
public Object take(){
Object ret = null;
synchronized (lock) {
while(count.get() == this.minSize){
try {
lock.wait(); // 當Queue沒有值時,線程被阻塞 ,這里為了區分,命名為wait2
} catch (InterruptedException e) {
e.printStackTrace();
}
}
ret = list.removeFirst(); //1 做移除元素操作
count.decrementAndGet(); //2 計數器遞減
lock.notify(); //3 移除元素后,喚醒另外一個線程wait1,隊列少元素了,可以再添加操作了
}
return ret;
}
public int getSize(){
return this.count.get();
}
public static void main(String[] args) throws Exception{
final ITDragonMyQueue queue = new ITDragonMyQueue(5);
queue.put("a");
queue.put("b");
queue.put("c");
queue.put("d");
queue.put("e");
System.out.println("當前容器的長度: " + queue.getSize());
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
queue.put("f");
queue.put("g");
}
},"thread1");
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("移除的元素為:" + queue.take()); // 移除一個元素后再進一個,而並非同時移除兩個,進入兩個元素。
System.out.println("移除的元素為:" + queue.take());
}
},"thread2");
thread1.start();
Thread.sleep(2000);
thread2.start();
}
}
死鎖
死鎖是一個很糟糕的情況,鎖遲遲不能解開,其他線程只能一直處於等待阻塞狀態。比如線程A擁有鎖一,卻還想要鎖二。線程B擁有鎖二,卻還想要鎖一。兩個線程互不相讓,兩個線程將永遠等待。
排查:
第一步:控制台輸入jps用於獲得當前JVM進程的pid
第二步:jstack pid 用於打印堆棧信息
第三步:解讀,"Thread-1" 是線程的名字,prio 是線程的優先級,tid 是線程id, nid 是本地線程id, waiting to lock 等待去獲取的鎖,locked 自己擁有的鎖。
"Thread-1" #11 prio=5 os_prio=0 tid=0x0000000055ff1800 nid=0x1bd4 waiting for monitor entry [0x0000000056e2e000]
java.lang.Thread.State: BLOCKED (on object monitor)
at com.itdragon.keyword.ITDragonDeadLock.rightLeft(ITDragonDeadLock.java:37)
- waiting to lock <0x00000000ecfdf9d0> (a java.lang.Object)
- locked <0x00000000ecfdf9e0> (a java.lang.Object)
at com.itdragon.keyword.ITDragonDeadLock$2.run(ITDragonDeadLock.java:54)
at java.lang.Thread.run(Thread.java:748)
/**
* 死鎖: 線程A擁有鎖一,卻還想要鎖二。線程B擁有鎖二,卻還想要鎖一。兩個線程互不相讓,兩個線程將永遠等待。
* 避免: 在設計階段,了解鎖的先后順序,減少鎖的交互數量。
* 排查:
* 第一步:控制台輸入 jps 用於獲得當前JVM進程的pid
* 第二步:jstack pid 用於打印堆棧信息
* "Thread-1" #11 prio=5 os_prio=0 tid=0x0000000055ff1800 nid=0x1bd4 waiting for monitor entry [0x0000000056e2e000]
* - waiting to lock <0x00000000ecfdf9d0> - locked <0x00000000ecfdf9e0>
* "Thread-0" #10 prio=5 os_prio=0 tid=0x0000000055ff0800 nid=0x1b14 waiting for monitor entry [0x0000000056c7f000]
* - waiting to lock <0x00000000ecfdf9e0> - locked <0x00000000ecfdf9d0>
* 可以看出,兩個線程持有的鎖都是對方想要得到的鎖(得不到的永遠在騷動),而且最后一行也打印了 Found 1 deadlock.
*/
public class ITDragonDeadLock {
private final Object left = new Object();
private final Object right = new Object();
public void leftRight(){
synchronized (left) {
try {
Thread.sleep(2000); // 模擬持有鎖的過程
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (right) {
System.out.println("leftRight end!");
}
}
}
public void rightLeft(){
synchronized (right) {
try {
Thread.sleep(2000); // 模擬持有鎖的過程
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (left) {
System.out.println("rightLeft end!");
}
}
}
public static void main(String[] args) {
ITDragonDeadLock itDragonDeadLock = new ITDragonDeadLock();
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
itDragonDeadLock.leftRight();
}
});
thread1.start();
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
itDragonDeadLock.rightLeft();
}
});
thread2.start();
}
}
多線程案例
若有Thread1、Thread2、Thread3、Thread4四條線程分別統計C、D、E、F四個盤的大小,所有線程都統計完畢交給Thread5線程去做匯總,應當如何實現
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 若有Thread1、Thread2、Thread3、Thread4四條線程分別統計C、D、E、F四個盤的大小,所有線程都統計完畢交給Thread5線程去做匯總,應當如何實現?
* 思考:匯總,說明要把四個線程的結果返回給第五個線程,若要線程有返回值,推薦使用callable。Thread和Runnable都沒返回值
*/
public class ITDragonThreads {
public static void main(String[] args) throws Exception {
// 無緩沖無界線程池
ExecutorService executor = Executors.newFixedThreadPool(8);
// 相對ExecutorService,CompletionService可以更精確和簡便地完成異步任務的執行
CompletionService<Long> completion = new ExecutorCompletionService<Long>(executor);
CountWorker countWorker = null;
for (int i = 0; i < 4; i++) { // 四個線程負責統計
countWorker = new CountWorker(i+1);
completion.submit(countWorker);
}
// 關閉線程池
executor.shutdown();
// 主線程相當於第五個線程,用於匯總數據
long total = 0;
for (int i = 0; i < 4; i++) {
total += completion.take().get();
}
System.out.println(total / 1024 / 1024 / 1024 +"G");
}
}
class CountWorker implements Callable<Long>{
private Integer type;
public CountWorker() {
}
public CountWorker(Integer type) {
this.type = type;
}
@Override
public Long call() throws Exception {
ArrayList<String> paths = new ArrayList<>(Arrays.asList("c:", "d:", "e:", "f:"));
return countDiskSpace(paths.get(type - 1));
}
// 統計磁盤大小
private Long countDiskSpace (String path) {
File file = new File(path);
long totalSpace = file.getTotalSpace();
System.out.println(path + " 總空間大小 : " + totalSpace / 1024 / 1024 / 1024 + "G");
return totalSpace;
}
}
查考面試題
1 常見創建線程的方式和其優缺點
(1)繼承Thread類 (2)實現Runnable接口
優缺點:實現一個接口比繼承一個類要靈活,減少程序之間的耦合度。缺點就是代碼多了一點。
2 start()方法和run()方法的區別
start方法可以啟動線程,而run方法只是thread的一個普通方法調用。
3 多線程的作用
(1)發揮多核CPU的優勢,提高CPU的利用率(2)防止阻塞,提高效率
4 什么是線程安全
當多個線程訪問某一個類(對象或方法)時,這個對象始終都能表現出正確的行為,那么這個類(對象或方法)就是線程安全的。
5 線程安全級別
(1)不可變(2)絕對線程安全(3)相對線程安全(4)線程非安全
6 如何在兩個線程之間共享數據
線程之間數據共享,其實可以理解為線程之間的通信,可以用wait/notify/notifyAll 進行等待和喚醒。
7 用線程池的好處
避免頻繁地創建和銷毀線程,達到線程對象的重用,提高性能,減輕服務器壓力。使用線程池還可以根據項目靈活地控制並發的數目。
8 sleep方法和wait方法有什么區別
sleep方法和wait方法都可以用來放棄CPU一定的時間,sleep是thread的方法,不會釋放鎖。wait是object的方法,會釋放鎖。
總結
1 線程池核心參數有 初始核心線程數,線程池運行最大線程數,空閑線程存活時間,時間單位,任務隊列。
2 隊列是一種數據結構,主要有兩類 阻塞隊列BlockingQueue,和非阻塞高性能隊列ConcurrentLinkedQueue。
3 線程安全的兩個特性,原子性和可見性。synchronized 關鍵字具備原子性。volatile 關鍵字具備可見性。
4 單個Atomic類具備原子性和可見性,多個Atomic類不具備原子性,需要synchronized 關鍵字修飾。
5 兩個線程持有的鎖都是對方想要得到的鎖時容易出現死鎖的情況,從設計上盡量減少鎖的交互。
本章到這里就結束了,涉及的知識點比較多,請參考流程圖來學習。如有什么問題可以指出。喜歡的朋友可以點個"推薦"