安全性和活躍度通常相互牽制。我們使用鎖來保證線程安全,但是濫用鎖可能引起鎖順序死鎖。類似地,我們使用線程池和信號量來約束資源的使用,
但是缺不能知曉哪些管轄范圍內的活動可能形成的資源死鎖。Java應用程序不能從死鎖中恢復,所以確保你的設計能夠避免死鎖出現的先決條件是非常有價值。
一.死鎖
經典的“哲學家進餐”問題很好的闡釋了死鎖。5個哲學家一起出門去吃中餐,他們圍坐在一個圓桌邊。他們只有五只筷子(不是5雙),每兩個人中間放有一只。
哲學家邊吃邊思考,交替進行。每個人都需要獲得兩只筷子才能吃東西,但是吃后要把筷子放回原處繼續思考。有一些管理筷子的算法,使每一個人都能夠或多或少,及時
吃到東西(一個飢餓的哲學家試圖獲得兩只臨近的筷子,但是如果其中的一只正在被別人占用,那么他英愛放棄其中一只可用的筷子,等待幾分鍾再嘗試)。但是這樣做可能導致
一些哲學家或者所有哲學家都餓死 (每個人都迅速捉住自己左邊的筷子,然后等待自己右邊的筷子變成可用,同時並不放下左邊的筷子)。這最后一種情況,當每個人都擁有他人需要的
資源,並且等待其他人正在占有的資源,如果大家一致占有資源,直到獲得自己需要卻沒占有的其他資源,如果大家一致占有資源,直到獲得自己需要卻沒被占有的其他資源,那么就會產生死鎖。
當一個線程永遠占有一個鎖,而其他線程嘗試去獲得這個鎖,那么他們將永遠被阻塞。當線程Thread1占有鎖A時,想要獲得鎖B,但是同時線程Thread2持有B鎖,並嘗試獲得A鎖,兩個線程將永遠等待下去。
這種情況是死鎖最簡單的形式.
例子如下代碼:
public class DeadLock { private static Object lockA = new Object(); private static Object lockB = new Object(); public static void main(String[] args) { new DeadLock().deadLock(); } private void deadLock() { Thread thread1 = new Thread(new Runnable() { public void run() { synchronized (lockA){ try { System.out.println(Thread.currentThread().getName() + "獲取A鎖 ing!"); Thread.sleep(500); System.out.println(Thread.currentThread().getName() + "睡眠500ms"); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "需要B鎖!!!"); synchronized (lockB){ System.out.println(Thread.currentThread().getName() + "B鎖獲取成功"); } } } },"Thread1"); Thread thread2 = new Thread(new Runnable() { public void run() { synchronized (lockB){ try { System.out.println(Thread.currentThread().getName() + "獲取B鎖 ing!"); Thread.sleep(500); System.out.println(Thread.currentThread().getName() + "睡眠500ms"); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "需要A鎖!!!"); synchronized (lockA){ System.out.println(Thread.currentThread().getName() + "A鎖獲取成功"); } } } },"Thread2"); thread1.start(); thread2.start(); } }
運行結果如下圖:

結果很明顯了,這兩個線程陷入了死鎖狀態了,發生死鎖的原因是,兩個線程試圖通過不同的順序獲得多個相同的鎖。如果請求鎖的順序相同,
就不會出現循環的鎖依賴現象(你等我放鎖,我等你放鎖),也就不會產生死鎖了。如果你能夠保證同時請求鎖A和鎖B的每一個線程,都是按照從鎖A到鎖B的順序,那么就不會發生死鎖了。
如果所有線程以通用的固定秩序獲取鎖,程序就不會出現鎖順序死鎖問題了。
什么情況下會發生死鎖呢?
1.鎖的嵌套容易發生死鎖。解決辦法:獲取鎖時,查看是否有嵌套。盡量不要用鎖的嵌套,如果必須要用到鎖的嵌套,就要指定鎖的順序,因為參數的順序是超乎我們控制的,為了解決這個問題,我們必須指定鎖的順序,並且在整個應用程序中,
獲得鎖都必須始終遵守這個既定的順序。
上面的例子出現死鎖的根本原因就是獲取所的順序是亂序的,超乎我們控制的。上面例子最理想的情況就是把業務邏輯抽離出來,把獲取鎖的代碼放在一個公共的方法里面,讓這兩個線程獲取鎖
都是從我的公共的方法里面獲取,當Thread1線程進入公共方法時,獲取了A鎖,另外Thread2又進來了,但是A鎖已經被Thread1線程獲取了,Thread1接着又獲取鎖B,Thread2線程就不能再獲取不到了鎖A,更別說再去獲取鎖B了,這樣就有一定的順序了。
上面例子的改造如下:
public class DeadLock { private static Object lockA = new Object(); private static Object lockB = new Object(); public static void main(String[] args) { new DeadLock().deadLock(); } private void deadLock() { Thread thread1 = new Thread(new Runnable() { public void run() { getLock(); } },"Thread1"); Thread thread2 = new Thread(new Runnable() { public void run() { getLock(); } },"Thread2"); thread1.start(); thread2.start(); } public void getLock() { synchronized (lockA){ try { System.out.println(Thread.currentThread().getName() + "獲取A鎖 ing!"); Thread.sleep(500); System.out.println(Thread.currentThread().getName() + "睡眠500ms"); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "需要B鎖!!!"); synchronized (lockB){ System.out.println(Thread.currentThread().getName() + "B鎖獲取成功"); } } } }
運行結果如下:

可以看到把業務邏輯抽離出來,把獲取鎖的代碼放在一個公共的方法里面,獲得鎖都必須始終遵守這個既定的順序。
2.引入顯式鎖的超時機制特性來避免死鎖
超時機制是監控死鎖和從死鎖中恢復的技術,是使用每個顯式所Lock類中定時tryLock特性,來替代使用顳部所機制。在內部鎖的機制中,只要沒有獲得鎖,就永遠保持等待,而
顯示的鎖使你能狗定義超時的時間,在規定時間之后tryLock還沒有獲得鎖就會返回失敗。通過使用超時,盡管這段時間比你預期能夠獲得所的時間長很多,你仍然可以在意外發生后重新
獲得控制權。當嘗試獲得定時鎖失敗時,你並不需要知道原因。也許是因為有死鎖發生,也許是線程在持有鎖的時候錯誤地進入無限循環;也有可能是執行一些活動所花費的時間比你
預期慢了許多。不過至少你有機會了解到你的嘗試已經失敗,記錄下這次嘗試中有用的信息,並重新開始計算,這遠比關閉整個線程要優雅得多。
即使定時鎖並沒有應用於整個系統,使用它來獲得多重鎖還是能夠有效應對死鎖。如果獲取鎖的請求超時,你可以釋放這個鎖,並后退,等待一會后再嘗試,這很可能消除了死鎖發生的條件,
並且循序程序恢復。(這項技術只有在同時獲得兩個鎖的時候才有效;如果多個鎖是在嵌套的方法中被請求的,你無法僅僅釋放外層的鎖,盡管你知道自己已經持有該鎖)
顯式鎖Lock,Lock是一個接口,定義了一些抽象的所操作。與內部鎖機制不同,Lock提供了無條件,可輪詢,定時的,可中斷的鎖獲取操作,所有加鎖和解鎖的方法都是顯式的。
Lock的實現必須提供舉報與內部鎖相同的內存可見性的語義。但是加鎖的語義,調度算法,順序保證,性能特性這些可以不同。
Lock接口源碼如下:
public interface Lock { //加鎖 void lock(); //可中斷的鎖,打算線程的等待狀態,即A線程已經獲取該鎖,B線程又來獲 //取,但是A線程會通知B,來打算B線程的等待。 void lockInterruptibly() throws InterruptedException; //嘗試去獲取鎖,失敗返回False boolean tryLock(); //超時機制獲取鎖 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; //釋放鎖 void unlock(); Condition newCondition(); }
ReentranLock實現了Lock接口,提供了與synchronized相同的互斥和內存可見性的保證。獲得ReentrantLock的鎖與進入synchronized塊有着相同內存含義,釋放ReentrantLock鎖與退出synchronized塊有着相同內存含義。
ReentrantLock提供了與synchronized一樣可重入加鎖的語義。ReentrantLock支持Lock接口定義的所有獲取鎖的方式。與synchronized相比,ReentranLock為處理不可用的鎖提供了更多靈活性。
但是對於現在的JDK的更新,synchronized的性能被優化的越來越好,內部鎖(synchronized)已經獲得相當可觀的性能,性能不僅僅是個不斷變化的目標,而且變化的非常快。
如下圖:

看到圖,隨着JDK的更新迭代,內部鎖的性能越來越快,這不是ReentrantLock的衰退,而是內部鎖(synchronized)越來越快,特別在JDK目前跟新到現在1.9.
下面用顯式鎖Lock再來改造上面的例子
public class DeadLock { Lock lock = new ReentrantLock(); private static Object lockA = new Object(); private static Object lockB = new Object(); public static void main(String[] args) { new DeadLock().deadLock(); } private void deadLock() { Thread thread1 = new Thread(new Runnable() { public void run() { try { lock.lock(); System.out.println(Thread.currentThread().getName() + "獲取A鎖 ing!"); Thread.sleep(500); System.out.println(Thread.currentThread().getName() + "睡眠500ms"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } System.out.println(Thread.currentThread().getName() + "需要B鎖!!!"); try { lock.lock(); System.out.println(Thread.currentThread().getName() + "B鎖獲取成功"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }, "Thread1"); Thread thread2 = new Thread(new Runnable() { public void run() { try { lock.lock(); System.out.println(Thread.currentThread().getName() + "獲取B鎖 ing!"); Thread.sleep(500); System.out.println(Thread.currentThread().getName() + "睡眠500ms"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } System.out.println(Thread.currentThread().getName() + "需要A鎖!!!"); try { lock.lock(); System.out.println(Thread.currentThread().getName() + "A鎖獲取成功"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }, "Thread1"); thread1.start(); thread2.start(); } }
運行結果如下:

可以看到顯示鎖Lock是可以避免死鎖的。
注意:Lock接口規范形式。這種模式在某種程度上比使用內部鎖更加復雜:鎖必須在finally塊中釋放。另一方面,如果鎖守護的代碼在try塊之外拋出了異常,它將永遠都不會被釋放了;如果對象
能夠被置於不一致狀態,可能需要額外的try-catch,或try-finally塊。(當你在使用任何形式的鎖時,你總是應該關注異常帶來的影響,包括內部鎖)。
忘記時候finally釋放Lock是一個定時炸彈。當不幸發生的時候,你將很難追蹤到錯誤的發生點,因為根本沒有記錄鎖本應該被釋放的位置和時間。這就是ReentrantLock不能完全替代synchronized的原因:它更加危險,
因為當程序的控制權離開守護的塊,不會自動清除鎖。盡管記得在finally塊中釋放鎖並不苦難,但忘記的可能仍然存在。
sy
可輪詢的和可定時的鎖請求
可定時的與可輪詢的鎖獲取模式,是由tryLock方法實現,與物體愛建的鎖獲取相比,它具有更完善的錯誤恢復機制。在內部鎖中,死鎖是致命的,唯一的恢復方法是重新啟動程序,唯一的預防方法是在構建程序時不要出錯,
所以不可能循序不一致的鎖順序。可定時的與可輪詢的鎖提供了另外一個選擇:可以規避死鎖的放生。
如果你不能獲得所有需要的鎖,那么使用可定時的與可輪詢的獲取方式(tryLock)使你能夠重新拿到控制權,它會釋放你已經獲得的這些鎖,然后再重新嘗試(或者至少會記錄這個失敗,抑或者采取其他措施)。使用tryLock試圖獲得兩個鎖,
如果不能同時獲得兩個,就回退,並重新嘗試。休眠時間由一個特定的組件管理,並由一個隨機組件減少活鎖發生的可能性。如果一定時間內,沒有獲得所有需要的鎖,就會返回一個失敗狀態,這樣操作就能優雅的失敗了。
tryLock()經常與if esle一起使用。
讀-寫鎖
ReentrantLock實現了標准的互斥鎖:一次最多只有一個線程能夠持有相同ReentrantLock。但是互斥通常做為保護數據一致性的很強的加鎖約束,因此,過分的限制了並發性。互斥是保守的加鎖策略,避免了
“寫/寫”和“寫/讀"的重讀,但是同樣避開了"讀/讀"的重疊。在很多情況下,數據結構是”頻繁被讀取“的——它們是可變的,有時候會被改變,但多數訪問只進行讀操作。此時,如果能夠放寬,允許多個讀者同時訪問數據結構就
非常好了。只要每個線程保證能夠讀到最新的數據(線程的可見性),並且在讀者讀取數據的時候沒有其他線程修改數據,就不會發生問題。這就是讀-寫鎖允許的情況:一個資源能夠被多個讀者訪問,或者被一個寫者訪問,兩者不能同時進行。
ReadWriteLock,暴露了2個Lock對象,一個用來讀,另一個用來寫。讀取ReadWriteLock鎖守護的數據,你必須首先獲得讀取的鎖,當需要修改ReadWriteLock守護的數據,你必須首先獲得寫入鎖。
ReadWriteLock源碼接口如下:
public interface ReadWriteLock { /** * Returns the lock used for reading. * * @return the lock used for reading */ Lock readLock(); /** * Returns the lock used for writing. * * @return the lock used for writing */ Lock writeLock(); }
讀寫鎖實現的加鎖策略允許多個同時存在的讀者,但是只允許一個寫者。與Lock一樣,ReadWriteLock允許多種實現,造成性能,調度保證,獲取優先,公平性,以及加鎖語義等方面的不盡相同。
讀寫鎖的設計是用來進行性能改進的,使得特定情況下能夠有更好的並發性。時間實踐中,當多處理器系統中,頻繁的訪問主要為讀取數據結構的時候哦,讀寫鎖能夠改進性能;在其他情況下運行的情況比獨占
的鎖要稍微差一些,這歸因於它更大的復雜性。使用它能否帶來改進,最好通過對系統進行剖析來判斷:好在ReadWriteLock使用Lock作為讀寫部分的鎖,所以如果剖析得的結果發現讀寫鎖沒有能提高性能,把讀寫鎖置換為獨占鎖是比較容易。
下面我們用synchonized來進行讀操作,對於讀操作性能如何呢?
例子如下:
public class ReadWriteLockTest { private ReentrantReadWriteLock rw1 = new ReentrantReadWriteLock(); public static void main(String[] args) { final ReadWriteLockTest test = new ReadWriteLockTest(); new Thread(){ @Override public void run() { test.get(Thread.currentThread()); } }.start(); new Thread(){ @Override public void run() { test.get(Thread.currentThread()); } }.start(); } public synchronized void get(Thread thread) { long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start <= 1){ System.out.println(thread.getName() + "正在讀操作"); } System.out.println(thread.getName() + "讀操作完成"); } }
運行結果如下:

可以看到要線程Thread0讀操作完了,Thread1才能進行讀操作。明顯這樣性能很慢。
現在我們用ReadWriteLock來進行讀操作,看一下性能如何
例子如下:
public class ReadWriteLockTest { private ReentrantReadWriteLock rw1 = new ReentrantReadWriteLock(); public static void main(String[] args) { final ReadWriteLockTest test = new ReadWriteLockTest(); new Thread(){ @Override public void run() { test.get(Thread.currentThread()); } }.start(); new Thread(){ @Override public void run() { test.get(Thread.currentThread()); } }.start(); } public void get(Thread thread) { try { rw1.readLock().lock(); long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start <= 1){ System.out.println(thread.getName() + "正在讀操作"); } System.out.println(thread.getName() + "讀操作完成"); } catch (Exception e) { e.printStackTrace(); } finally { rw1.readLock().unlock(); } } }
運行結果如下:

可以看到線程間是不用排隊來讀操作的。這樣效率明顯很高。
我們再看一下寫操作,如下:
public class ReadWriteLockTest { private ReentrantReadWriteLock rw1 = new ReentrantReadWriteLock(); public static void main(String[] args) { final ReadWriteLockTest test = new ReadWriteLockTest(); new Thread(){ @Override public void run() { test.get(Thread.currentThread()); } }.start(); new Thread(){ @Override public void run() { test.get(Thread.currentThread()); } }.start(); } public void get(Thread thread) { try { rw1.writeLock().lock(); long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start <= 1){ System.out.println(thread.getName() + "正在寫操作"); } System.out.println(thread.getName() + "寫操作完成"); } catch (Exception e) { e.printStackTrace(); } finally { rw1.writeLock().unlock(); } } }
運行結果如下:

可以看到ReadWriteLock只允許一個寫者。
公平鎖
ReentrantReadWriteLock為兩個鎖提供了可重入的加鎖語義,它是繼承了ReadWriteLock,擴展了ReadWriteLock。它與ReadWriteLock相同,ReentrantReadWriteLock能夠被構造
為非公平鎖(構造方法不設置參數,默認是非公平),或者公平。在公平鎖中,選擇權交給等待時間最長的線程;如果鎖由讀者獲得,而一個線程請求寫入鎖,那么不再允許讀者獲得讀取鎖,直到寫者被受理,平且已經釋放了寫鎖。
在非公平的鎖中,線程允許訪問的順序是不定的。由寫者降級為讀者是允許的;從讀者升級為寫者是不允許的(嘗試這樣的行為會導致死鎖)
當鎖被持有的時間相對較長,並且大部分操作都不會改變鎖守護的資源,那么讀寫鎖能夠改進並發性。ReadWriteMap使用了ReentrantReadWriteLock來包裝Map,使得它能夠在多線程間
被安全的共享,並仍然能夠避免 "讀-寫" 或者 ”寫-寫“沖突。顯示中ConcurrentHashMap並發容器的性能已經足夠好了,所以你可以是使用他,而不必使用這個新的解決方案,如果你需要並發的部分
只有哈希Map,但是如果你需要為LinkedHashMap這種可替換元素Map提供更好的並發訪問,那么這項技術是非常有用的。
用讀寫鎖包裝的Map如下圖:

讀寫鎖的性能如下圖:

總結:
顯式的Lock與內部鎖相比提供了一些擴展的特性,包括處理不可用的鎖時更好的靈活性,以及對隊列行為更好的控制。但是ReentrantLock不能完全替代synchronized;只有當你需要
synchronized沒能提供的特性時才應該使用。
讀-寫鎖允許多個讀者並發訪問被守護的對象,當訪問多為讀取數據結構的時候,它具有改進可伸縮性的潛力。
數據庫層面上的鎖——悲觀鎖和樂觀鎖
樂觀鎖:他對世界比較樂觀,認為別人訪問正在改變的數據的概率是很低的,所以直到修改完成准備提交所做的的修改到數據庫的時候才會將數據鎖住。完成更改后釋放。
我想一下一個這樣的業務場景:我們從數據庫中獲取了一條數據,我們正要修改他的數據時,剛好另外一個用戶此時已經修改過了這條數據,這是我們是不知道別人修改過這條數據的。
解決辦法,我們可以在表中增加一個version字段,讓這個version自增或者自減,或者用一個時間戳字段,這個時間搓字段是唯一的。我們寫數據的時候帶上version,也就是每個人更新的時候都會判斷當前的版本號是否跟我查詢出來得到的版本號是否一致,不一致就更新失敗,一致就更新這條記錄並更改版本號。
例子如下:
1.查詢出商品信息 select (status,status,version) from t_goods where id=#{id} 2.根據商品信息生成訂單 3.修改商品status為2 update t_goods set status=2,version=version+1 where id=#{id} and version=#{version};
用戶體驗表現層面通常表現為系統繁忙之類的。
在這里還要注意樂觀鎖的一個細節:就是version字段要自增或者自減,否者會出現ABA問題。
ABA問題:線程Thread1拿到了version字段為A,由於CAS操作(即先進行比較然后設值),線程Thread2先拿到的version,將version改成B,線程Thread3來拿到version,將version值又改回了A。此時Thread1的CAS(先比較后set值)操作結束了,繼續執行,它發現version的值還是A,以為沒有發生變化,所以就繼續執行了。這個過程中,version從A變為B,再由B變為A就被形象地稱為ABA問題了。
悲觀鎖:也稱排它鎖,當事務在操作數據時把這部分數據進行鎖定,直到操作完畢后再解鎖,其他事務操作才可操作該部分數據。這將防止其他進程讀取或修改表中的數據。
一般使用 select ...for update 對所選擇的數據進行加鎖處理,例如
select * from account where name=”JAVA” for update,
這條sql 語句鎖定了account 表中所有符合檢索條件(name=”JAVA”)的記錄。本次事務提交之前(事務提交時會釋放事務過程中的鎖),外界無法修改這些記錄。
用戶界面常表現為轉圈圈等待。
如果數據庫分庫分表了,不再是單個數據庫了,那么我們可以用分布式鎖,比如redis的setnx特性,zookeeper的節點唯一性和順序性特性來做分布式鎖。
