轉自:https://blog.csdn.net/sunxianghuang/article/details/52287968
隊列同步器(AQS)
隊列同步器AbstractQueuedSynchronizer(以下簡稱同步器),是用來構建鎖或者其他同步組件的基礎框架,它使用了一個int成員變量表示同步狀態,通過內置的FIFO隊列來完成資源獲取線程的排隊工作,並發包的作者(Doug Lea)期望它能夠成為實現大部分同步需求的基礎。
隊列同步器的基本結構
同步器依賴內部的同步隊列(一個FIFO雙向隊列)來完成同步狀態的管理。同步隊列中的節點(Node)用來保存"獲取同步狀態失敗的線程"引用、等待狀態以及前驅和后繼節點。
同步器包含了兩個節點類型的引用,一個指向頭節點,而另一個指向尾節點。
1 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer 2 implements java.io.Serializable { 3 ...... 4 private transient volatile Node head;//頭節點 5 private transient volatile Node tail;//尾節點 6 private volatile int state;//*同步狀態* 7 ...... 8 static final class Node { 9 volatile int waitStatus;//等待狀態 10 volatile Node prev;//前驅 11 volatile Node next;//后繼 12 volatile Thread thread;//線程引用 13 ...... 14 } 15 ...... 16 }
注:Node類型的prev、next屬性以及AbstractQueuedSynchronizer類型的head 、tail屬性都設置為volatile,保證可見
自定義同步組件的設計思路
同步器的主要使用方式是繼承,子類通過繼承同步器並實現它的抽象方法來管理同步狀態,在抽象方法的實現過程中免不了要對同步狀態進行更改,這時就需要使用同步器提供的3個方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))來進行操作,因為它們能夠保證狀態的改變是安全的。
子類推薦被定義為自定義同步組件的靜態內部類,同步器自身沒有實現任何同步接口,它僅僅是定義了若干同步狀態獲取和釋放的方法來供自定義同步組件使用,同步器既可以支持獨占式地獲取同步狀態,也可以支持共享式地獲取同步狀態,這樣就可以方便實現不同類型的同步組件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。
同步器是實現鎖(也可以是任意同步組件)的關鍵,在鎖的實現中聚合(組合)同步器,利用同步器實現鎖的語義。可以這樣理解二者之間的關系:鎖是面向使用者的,它定義了使用者與鎖交互的接口(比如可以允許兩個線程並行訪問),隱藏了實現細節;同步器面向的是鎖的實現者,它簡化了鎖的實現方式,屏蔽了同步狀態管理、線程的排隊、等待與喚醒等底層操作。鎖和同步器很好地隔離了使用者和實現者所需關注的領域。
同步器的設計是基於模板方法模式的,也就是說,使用者需要繼承同步器並重寫指定的方法,隨后將同步器組合在自定義同步組件的實現中,並調用同步器提供的模板方法,而這些模板方法將會調用使用者重寫的方法。
重寫同步器指定的方法時,需要使用同步器提供的如下3個方法來訪問或修改同步狀態。
getState():獲取當前同步狀態。
setState(int newState):設置當前同步狀態。
compareAndSetState(int expect,int update):使用CAS設置當前狀態,該方法能夠保證狀態設置的原子性。
獨占式同步組件的設計
可重寫的方法
/*Attempts to acquire in exclusive mode. This method should query if the state of the object permits it to be acquired in the exclusive mode, and if so to acquire it.*/ //獨占式獲取同步狀態,實現該方法需要查詢當前狀態並判斷同步狀態是否符合預期,然后再進行CAS設置同步狀態 protected boolean tryAcquire(int arg) /*Attempts to set the state to reflect a release in exclusive mode.*/ //獨占式釋放同步狀態,等待獲取同步狀態的線程將有機會獲取同步狀態 protected boolean tryRelease(int arg) /*Returns true if synchronization is held exclusively with respect to the current (calling) thread. */ //當前同步器是否在獨占模式下被線程占用,一般該方法表示是否被當前線程所獨占 protected boolean isHeldExclusively()
同步器提供的模板方法
/*Acquires in exclusive mode, ignoring interrupts.*/ //獨占式獲取同步狀態,如果當前線程獲取同步狀態成功,立即返回。否則,將會進入同步隊列等待, //該方法將會重復調用重寫的tryAcquire(int arg)方法 public final void acquire(int arg) /*Acquires in exclusive mode, aborting if interrupted.*/ //與acquire(int arg)基本相同,但是該方法響應中斷。 public final void acquireInterruptibly(int arg) /* Releases in exclusive mode. Implemented by unblocking one or more threads if {@link #tryRelease} returns true. This method can be used to implement method {@link Lock#unlock}.*/ //獨占式釋放同步狀態,該方法會在釋放同步狀態后,將同步隊列中第一個節點包含的線程喚醒 public final boolean release(int arg)
acquire(int arg)模板方法
通過調用同步器的acquire(int arg)方法可以獲取同步狀態。該方法對中斷不敏感,也就是說,由於線程獲取同步狀態失敗后進入同步隊列中,后續對線程進行中斷操作時,線程不會從同步隊列中移除。
1 public final void acquire(int arg) {//**該方法是模板方法** 2 if (!tryAcquire(arg) &&//先通過tryAcquire獲取同步狀態 3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//獲取同步狀態失敗則生成節點並加入同步隊列 4 selfInterrupt(); 5 }
獨占式同步狀態獲取流程
主要邏輯:首先調用自定義同步器實現的tryAcquire(int arg)方法,該方法保證線程安全的獲取同步狀態,如果同步狀態獲取失敗,則構造同步節點(獨占式Node.EXCLUSIVE,同一時刻只能有一個線程成功獲取同步狀態)並通過addWaiter(Node node)方法將該節點加入到同步隊列的尾部,最后調用acquireQueued(Node node,int arg)方法,使得該節點以“死循環”的方式獲取同步狀態。
將節點加入同步隊列
當前線程獲取同步狀態失敗時,同步器會將當前線程、等待狀態等信息構造成為一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程。
試想一下,當一個線程成功地獲取了同步狀態(或者鎖),其他線程將無法獲取到同步狀態,轉而被構造成為節點並加入到同步隊列中,而這個加入隊列的過程必須要保證線程安全。
因此,同步器提供了一個基於CAS的設置尾節點的方法:compareAndSetTail(Nodeexpect,Nodeupdate),它需要傳遞當前線程“認為”的尾節點和當前節點,只有設置成功后,當前節點才正式與之前的尾節點建立關聯。
1 //將節點加入到同步隊列的尾部 2 private Node addWaiter(Node mode) { 3 Node node = new Node(Thread.currentThread(), mode);//生成節點(Node) 4 // Try the fast path of enq; backup to full enq on failure 5 //快速嘗試在尾部添加 6 Node pred = tail; 7 if (pred != null) { 8 node.prev = pred;//先將當前節點node的前驅指向當前tail 9 if (compareAndSetTail(pred, node)) {//CAS嘗試將tail設置為node 10 //如果CAS嘗試成功,就說明"設置當前節點node的前驅"與"CAS設置tail"之間沒有別的線程設置tail成功 11 //只需要將"之前的tail"的后繼節點指向node即可 12 pred.next = node; 13 return node; 14 } 15 } 16 enq(node);//否則,通過死循環來保證節點的正確添加 17 return node; 18 }
1 private Node enq(final Node node) { 2 for (;;) {//通過死循環來保證節點的正確添加 3 Node t = tail; 4 if (t == null) { // Must initialize 同步隊列為空的情況 5 if (compareAndSetHead(new Node())) 6 tail = head; 7 } else { 8 node.prev = t; 9 if (compareAndSetTail(t, node)) {//直到CAS成功為止 10 t.next = node; 11 return t;//結束循環 12 } 13 } 14 } 15 }
在enq(final Node node)方法中,同步器通過“死循環”來保證節點的正確添加,在“死循環”中只有通過CAS將節點設置成為尾節點之后,當前線程才能從該方法返回,否則,當前線程不斷地嘗試設置。可以看出,enq(final Node node)方法將並發添加節點的請求通過CAS變得“串行化”了。
串行化的優點
如果通過加鎖同步的方式添加節點,線程必須獲取鎖后才能添加尾節點,那么必然會導致其他線程等待加鎖而阻塞,獲取鎖的線程釋放鎖后阻塞的線程又會被喚醒,而線程的阻塞和喚醒需要依賴於系統內核完成,因此程序的執行需要從用戶態切換到核心態,而這樣的切換是非常耗時的操作。如果我們通過”循環CAS“來添加節點的話,所有線程都不會被阻塞,而是不斷失敗重試,線程不需要進行鎖同步,不僅消除了線程阻塞喚醒的開銷而且消除了加鎖解鎖的時間開銷。但是循環CAS也有其缺點,循環CAS通過不斷嘗試來添加節點,如果說CAS操作失敗那么將會占用處理器資源。
節點的自旋
節點進入同步隊列之后,就進入了一個自旋的過程,每個節點(或者說是線程)都在自省地觀察,當條件滿足,獲取到了同步狀態,就可以從這個自旋過程中退出,否則依舊留在這個自旋過程中。
1 final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true; 3 try { 4 boolean interrupted = false; 5 for (;;) {//無限循環 6 final Node p = node.predecessor(); 7 if (p == head && tryAcquire(arg)) {//前驅節點是首節點且獲取到了同步狀態 8 setHead(node); //設置首節點 9 p.next = null; // help GC 斷開引用 10 failed = false; 11 return interrupted;//從自旋中退出 12 } 13 if (shouldParkAfterFailedAcquire(p, node) &&//獲取同步狀態失敗后判斷是否需要阻塞或中斷 14 parkAndCheckInterrupt())//阻塞當前線程 15 interrupted = true; 16 } 17 } finally { 18 if (failed) 19 cancelAcquire(node); 20 } 21 }
1 /**Checks and updates status for a node that failed to acquire. 2 * Returns true if thread should block. This is the main signal control in all acquire loops.*/ 3 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 4 int ws = pred.waitStatus;//獲取前驅節點的等待狀態 5 if (ws == Node.SIGNAL) 6 //SIGNAL狀態:前驅節點釋放同步狀態或者被取消,將會通知后繼節點。因此,可以放心的阻塞當前線程,返回true。 7 /* This node has already set status asking a release to signal it, so it can safely park.*/ 8 return true; 9 if (ws > 0) {//前驅節點被取消了,跳過前驅節點並重試 10 /* Predecessor was cancelled. Skip over predecessors and indicate retry. */ 11 do { 12 node.prev = pred = pred.prev; 13 } while (pred.waitStatus > 0); 14 pred.next = node; 15 } else {//獨占模式下,一般情況下這里指前驅節點等待狀態為SIGNAL 16 /* waitStatus must be 0 or PROPAGATE. Indicate that we need a signal, but don't park yet. Caller will need to 17 * retry to make sure it cannot acquire before parking. */ 18 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//設置當前節點等待狀態為SIGNAL 19 } 20 return false; 21 }
1 /** Convenience method to park and then check if interrupted 。return {@code true} if interrupted */ 2 private final boolean parkAndCheckInterrupt() { 3 LockSupport.park(this);//阻塞當前線程 4 return Thread.interrupted(); 5 }
可以看到節點和節點之間在循環檢查的過程中基本不相互通信,而是簡單地判斷自己的前驅是否為頭節點,這樣就使得節點的釋
放規則符合FIFO。並且也便於對過早通知的處理(過早通知是指:前驅節點不是頭節點的線程由於中斷而被喚醒)。
當同步狀態獲取成功之后,當前線程從acquire(int arg)方法返回,如果對於鎖這種並發組件而言,代表着當前線程獲取了鎖。
設置首節點
同步隊列遵循FIFO,首節點是獲取同步狀態成功的節點,首節點的線程在釋放同步狀態時,將會喚醒后續節點,而后續節點將會在獲取同步狀態成功時將自己設置為首節點。
設置首節點是由獲取同步狀態成功的線程來完成的,由於只有一個線程能夠成功的獲取到同步狀態,因此設置頭節點的方法並不需要使用CAS來保證,它只需要將首節點設置成為原首節點后繼節點,並斷開首節點的next引用即可。
釋放同步狀態
當前線程獲取同步狀態並執行了相應邏輯之后,就需要釋放同步狀態,使得后續節點能夠繼續獲取同步狀態。通過調用同步器的release(int arg)方法可以釋放同步狀態,該方法在釋放了同步狀態之后,會"喚醒"其后繼節點(進而使后繼節點重新嘗試獲取同步狀態)。
1 public final boolean release(int arg) { 2 if (tryRelease(arg)) {//釋放同步狀態 3 Node h = head; 4 if (h != null && h.waitStatus != 0)//獨占模式下這里表示SIGNAL 5 unparkSuccessor(h);//喚醒后繼節點 6 return true; 7 } 8 return false; 9 }
1 /** Wakes up node's successor, if one exists.*/ 2 private void unparkSuccessor(Node node) { 3 int ws = node.waitStatus;//獲取當前節點等待狀態 4 if (ws < 0) 5 compareAndSetWaitStatus(node, ws, 0);//更新等待狀態 6 7 /* Thread to unpark is held in successor, which is normally just the next node. 8 But if cancelled or apparently null, 9 * traverse backwards from tail to find the actual non-cancelled successor.*/ 10 Node s = node.next; 11 if (s == null || s.waitStatus > 0) {//找到第一個沒有被取消的后繼節點(等待狀態為SIGNAL) 12 s = null; 13 for (Node t = tail; t != null && t != node; t = t.prev) 14 if (t.waitStatus <= 0) 15 s = t; 16 } 17 if (s != null) 18 LockSupport.unpark(s.thread);//喚醒后繼線程 19 }
總結:在獲取同步狀態時,同步器維護一個同步隊列,獲取狀態失敗的線程都會被加入到隊列中並在隊列中進行自旋;移出隊列
(或停止自旋)的條件是前驅節點為頭節點且成功獲取了同步狀態。在釋放同步狀態時,同步器調用tryRelease(int arg)方法釋放同步狀態,然后喚醒頭節點的后繼節點。
獨占鎖(Mutex)
1 import java.util.Collection; 2 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 3 4 public class Mutex { 5 // 靜態內部類,自定義同步器 6 private static class Sync extends AbstractQueuedSynchronizer { 7 // 是否處於占用狀態 8 protected boolean isHeldExclusively() { 9 return getState() == 1; 10 } 11 // 當狀態為0的時候獲取鎖 12 public boolean tryAcquire(int acquires) { 13 if (compareAndSetState(0, 1)) { 14 setExclusiveOwnerThread(Thread.currentThread()); 15 return true; 16 } 17 return false; 18 } 19 // 釋放鎖,將狀態設置為0 20 protected boolean tryRelease(int releases) { 21 if (getState() == 0) 22 throw new IllegalMonitorStateException(); 23 setExclusiveOwnerThread(null); 24 setState(0); 25 return true; 26 } 27 } 28 // 僅需要將操作代理到Sync上即可 29 private final Sync sync = new Sync(); 30 31 //獲取等待的線程 32 public Collection<Thread> getQueuedThreads(){ 33 return sync.getQueuedThreads(); 34 } 35 36 //獨占鎖的操作接口 37 public void lock() {//獲取鎖 38 sync.acquire(1); 39 } 40 41 public void unlock() {//釋放鎖 42 sync.release(1); 43 } 44 }
1 import java.util.Collection; 2 import java.util.Random; 3 4 public class MutexTestSecond { 5 private static Random r=new Random(47); 6 private static int threadCount=10; 7 private static Mutex mut=new Mutex(); 8 private static class Weight implements Runnable{//給蘋果稱重的任務 9 String name; 10 public Weight(String name){ 11 this.name=name; 12 } 13 @Override 14 public void run() { 15 mut.lock(); 16 System.out.println(name+"放蘋果!"); 17 System.out.println(name+"重量:"+(r.nextInt(10)+3)); 18 System.out.println(name+"取蘋果!"); 19 printQueuedThreads(mut.getQueuedThreads()); 20 mut.unlock(); 21 } 22 } 23 private static void printQueuedThreads(Collection<Thread> threads){ 24 System.out.print("等待隊列中的線程:"); 25 for(Thread t:threads){ 26 System.out.print(t.getName()+" "); 27 } 28 System.out.println(); 29 } 30 public static void main(String[] args) { 31 Thread[] threads=new Thread[threadCount]; 32 for(int i=0;i<threadCount;i++){ 33 threads[i]=new Thread(new Weight("Weight-"+i),"Thread-"+i); 34 } 35 for(int i=0;i<threadCount;i++){ 36 threads[i].start(); 37 } 38 } 39 }
輸出:
Weight-0放蘋果!
Weight-0重量:11
Weight-0取蘋果!
等待隊列中的線程:Thread-3 Thread-2 Thread-1
Weight-6放蘋果!
Weight-6重量:8
Weight-6取蘋果!
等待隊列中的線程:Thread-8 Thread-7 Thread-5 Thread-4 Thread-3 Thread-2 Thread-1
Weight-1放蘋果!
Weight-1重量:6
Weight-1取蘋果!
等待隊列中的線程:Thread-9 Thread-8 Thread-7 Thread-5 Thread-4 Thread-3 Thread-2
Weight-2放蘋果!
Weight-2重量:4
Weight-2取蘋果!
等待隊列中的線程:Thread-9 Thread-8 Thread-7 Thread-5 Thread-4 Thread-3
Weight-3放蘋果!
Weight-3重量:4
Weight-3取蘋果!
等待隊列中的線程:Thread-9 Thread-8 Thread-7 Thread-5 Thread-4
Weight-4放蘋果!
Weight-4重量:12
Weight-4取蘋果!
等待隊列中的線程:Thread-9 Thread-8 Thread-7 Thread-5
Weight-5放蘋果!
Weight-5重量:11
Weight-5取蘋果!
等待隊列中的線程:Thread-9 Thread-8 Thread-7
Weight-7放蘋果!
Weight-7重量:3
Weight-7取蘋果!
等待隊列中的線程:Thread-9 Thread-8
Weight-8放蘋果!
Weight-8重量:5
Weight-8取蘋果!
等待隊列中的線程:Thread-9
Weight-9放蘋果!
Weight-9重量:10
Weight-9取蘋果!
等待隊列中的線程:
從輸出中可以看出,我們的獨占鎖Mutex,保證了秤的獨占使用。
重入鎖
重入鎖ReentrantLock,顧名思義,就是支持重進入的鎖,它表示該鎖能夠支持一個線程對資源的重復加鎖。除此之外,該鎖的還支持獲取鎖時的公平和非公平性選擇。
對於獨占鎖(Mutex),考慮如下場景:當一個線程調用Mutex的lock()方法獲取鎖之后,如果再次調用lock()方法,則該線程將會被自己所阻塞,原因是Mutex在實現tryAcquire(int acquires)方法時沒有考慮占有鎖的線程再次獲取鎖的場景,而在調用tryAcquire(int acquires)方法時返回了false,導致該線程被阻塞。簡單地說,Mutex是一個不支持重進入的鎖。
synchronized關鍵字隱式的支持重進入,比如一個synchronized修飾的遞歸方法,在方法執行時,執行線程在獲取了鎖之后仍能連續多次地獲得該鎖,而不像Mutex由於獲取了鎖,而在下一次獲取鎖時出現阻塞自己的情況。
ReentrantLock雖然沒能像synchronized關鍵字一樣支持隱式的重進入,但是在調用lock()方法時,已經獲取到鎖的線程,能夠再次調用lock()方法獲取鎖而不被阻塞。
可重入的實現
重進入是指任意線程在獲取到鎖之后能夠再次獲取該鎖而不會被鎖所阻塞,該特性的實現需要解決以下兩個問題。
1)線程再次獲取鎖。鎖需要去識別獲取鎖的線程是否為當前占據鎖的線程,如果是,則再次成功獲取。
2)鎖的最終釋放。線程重復n次獲取了鎖,隨后在第n次釋放該鎖后,其他線程能夠獲取到該鎖。鎖的最終釋放要求鎖對於獲取進行計數自增,計數表示當前鎖被重復獲取的次數,而鎖被釋放時,計數自減,當計數等於0時表示鎖已經成功釋放。
ReentrantLock是通過組合自定義同步器來實現鎖的獲取與釋放。我們以非公平鎖為例:
1 public class ReentrantLock implements Lock, java.io.Serializable { 2 private final Sync sync; 3 4 ...... 5 abstract static class Sync extends AbstractQueuedSynchronizer { 6 private static final long serialVersionUID = -5179523762034025860L; 7 8 abstract void lock();//抽象方法 9 10 final boolean nonfairTryAcquire(int acquires) {//非公平的獲取鎖 11 final Thread current = Thread.currentThread(); 12 int c = getState(); 13 if (c == 0) {//首次獲取同步狀態 14 if (compareAndSetState(0, acquires)) {//只要設置成功就獲取到鎖 15 setExclusiveOwnerThread(current); 16 return true; 17 } 18 } 19 else if (current == getExclusiveOwnerThread()) {//再次獲取同步狀態(可重入的關鍵) 20 //如果是獲取鎖的線程再次請求,則將同步狀態值進行增加並返回true,表示獲取同步狀態成功。 21 int nextc = c + acquires; 22 if (nextc < 0) // overflow 23 throw new Error("Maximum lock count exceeded"); 24 setState(nextc); 25 return true; 26 } 27 return false; 28 } 29 30 protected final boolean tryRelease(int releases) { 31 int c = getState() - releases; 32 if (Thread.currentThread() != getExclusiveOwnerThread()) 33 throw new IllegalMonitorStateException(); 34 boolean free = false; 35 if (c == 0) {//當同步狀態為0時,將占有線程設置為null 36 free = true; 37 setExclusiveOwnerThread(null); 38 } 39 setState(c);//更新同步狀態 40 return free; 41 } 42 ...... 43 } 44 static final class NonfairSync extends Sync { 45 private static final long serialVersionUID = 7316153563782823691L; 46 47 /**Performs lock. Try immediate barge, backing up to normal acquire on failure. */ 48 final void lock() { 49 if (compareAndSetState(0, 1))//首次獲取鎖成功 50 setExclusiveOwnerThread(Thread.currentThread()); 51 else 52 acquire(1);//申請加鎖 53 } 54 55 protected final boolean tryAcquire(int acquires) { 56 return nonfairTryAcquire(acquires);//非公平獲取鎖 57 } 58 } 59 ...... 60 public void lock() { 61 sync.lock(); 62 } 63 public void unlock() { 64 sync.release(1); 65 } 66 ...... 67 }
1 import java.util.Random; 2 import java.util.concurrent.locks.ReentrantLock; 3 4 public class ReentrantLockTest { 5 private static Random r=new Random(47); 6 private static int threadCount=10; 7 private static ReentrantLock mut=new ReentrantLock(); 8 private static class Weight implements Runnable{//給蘋果稱重的任務 9 String name; 10 public Weight(String name){ 11 this.name=name; 12 } 13 @Override 14 public void run() { 15 mut.lock(); 16 System.out.println(name+"放蘋果!"); 17 System.out.println(name+"重量:"+(r.nextInt(10)+3)); 18 System.out.println(name+"取蘋果!"); 19 if(r.nextInt()%2==0){run();}//遞歸調用 20 mut.unlock(); 21 } 22 } 23 public static void main(String[] args) throws InterruptedException { 24 Thread[] threads=new Thread[threadCount]; 25 for(int i=0;i<threadCount;i++){ 26 threads[i]=new Thread(new Weight("Weight-"+i),"Thread-"+i); 27 } 28 for(int i=0;i<threadCount;i++){ 29 threads[i].start(); 30 Thread.sleep(10); 31 } 32 } 33 }
輸出:
Weight-0放蘋果!
Weight-0重量:11
Weight-0取蘋果!
Weight-0放蘋果!
Weight-0重量:6
Weight-0取蘋果!
Weight-0放蘋果!
Weight-0重量:4
Weight-0取蘋果!
Weight-1放蘋果!
Weight-1重量:11
Weight-1取蘋果!
Weight-2放蘋果!
Weight-2重量:5
Weight-2取蘋果!
Weight-2放蘋果!
Weight-2重量:11
Weight-2取蘋果!
Weight-2放蘋果!
Weight-2重量:4
Weight-2取蘋果!
Weight-3放蘋果!
Weight-3重量:12
Weight-3取蘋果!
Weight-3放蘋果!
Weight-3重量:11
Weight-3取蘋果!
Weight-3放蘋果!
Weight-3重量:3
Weight-3取蘋果!
Weight-3放蘋果!
Weight-3重量:9
Weight-3取蘋果!
Weight-5放蘋果!
Weight-5重量:4
Weight-5取蘋果!
Weight-7放蘋果!
Weight-7重量:7
Weight-7取蘋果!
Weight-8放蘋果!
Weight-8重量:9
Weight-8取蘋果!
Weight-6放蘋果!
Weight-6重量:3
Weight-6取蘋果!
Weight-4放蘋果!
Weight-4重量:7
Weight-4取蘋果!
Weight-4放蘋果!
Weight-4重量:3
Weight-4取蘋果!
Weight-9放蘋果!
Weight-9重量:5
Weight-9取蘋果!
Weight-9放蘋果!
Weight-9重量:6
Weight-9取蘋果!
Weight-9放蘋果!
Weight-9重量:7
Weight-9取蘋果!
從輸出中,可以看出可重入特性。如果,我們將可重入鎖換成獨占鎖Mutex程序將會阻塞,不具有可重入性。
此外,我們還發現,線程的執行是亂序的(從線程名稱的角度看),即與start()方法調用順序不一致。這是為什么呢?
原來重入鎖ReentrantLock默認采用非公平實現?那好,我們將可重入鎖設置為公平鎖:
private static ReentrantLock mut=new ReentrantLock(true);//設置為公平鎖
輸出:
Weight-0放蘋果!
Weight-0重量:11
Weight-0取蘋果!
Weight-0放蘋果!
Weight-0重量:6
Weight-0取蘋果!
Weight-0放蘋果!
Weight-0重量:4
Weight-0取蘋果!
Weight-1放蘋果!
Weight-1重量:11
Weight-1取蘋果!
Weight-2放蘋果!
Weight-2重量:5
Weight-2取蘋果!
Weight-2放蘋果!
Weight-2重量:11
Weight-2取蘋果!
Weight-2放蘋果!
Weight-2重量:4
Weight-2取蘋果!
Weight-3放蘋果!
Weight-3重量:12
Weight-3取蘋果!
Weight-3放蘋果!
Weight-3重量:11
Weight-3取蘋果!
Weight-3放蘋果!
Weight-3重量:3
Weight-3取蘋果!
Weight-3放蘋果!
Weight-3重量:9
Weight-3取蘋果!
Weight-7放蘋果!
Weight-7重量:4
Weight-7取蘋果!
Weight-6放蘋果!
Weight-6重量:7
Weight-6取蘋果!
Weight-4放蘋果!
Weight-4重量:9
Weight-4取蘋果!
Weight-5放蘋果!
Weight-5重量:3
Weight-5取蘋果!
Weight-8放蘋果!
Weight-8重量:7
Weight-8取蘋果!
Weight-8放蘋果!
Weight-8重量:3
Weight-8取蘋果!
Weight-9放蘋果!
Weight-9重量:5
Weight-9取蘋果!
Weight-9放蘋果!
Weight-9重量:6
Weight-9取蘋果!
Weight-9放蘋果!
Weight-9重量:7
Weight-9取蘋果!
從輸出中我們看到,線程的執行順序與對應start()方法被調用的順序依然不一樣,說好的公平鎖呢?
原因分析:start()語句調用的順序與線程進入Runnable狀態的順序不一定一致,也就是說先調用start()語句所對應的線程不一定先進入Runnable狀態,即使先進入Runnable狀態也不一定先分得處理器開始執行。
公平性與否是針對獲取鎖而言的,如果一個鎖是公平的,那么鎖的獲取順序就應該符合請求的絕對時間順序,也就是FIFO。
個人理解,如有偏頗,還望指正!!
非公平性的實現
1 static final class NonfairSync extends Sync { 2 private static final long serialVersionUID = 7316153563782823691L; 3 4 /**Performs lock. Try immediate barge, backing up to normal acquire on failure.*/ 5 final void lock() { 6 if (compareAndSetState(0, 1))//只要CAS更新同步狀態成功就獲取到鎖。 7 setExclusiveOwnerThread(Thread.currentThread()); 8 else 9 acquire(1); 10 } 11 12 protected final boolean tryAcquire(int acquires) { 13 return nonfairTryAcquire(acquires); 14 } 15 }
非公平性實例,如果Thread-1擁有鎖,Thread-2和Thread-3在同步隊列中,當Thread-1釋放鎖后會喚醒Thread-2,但是如果此時Thread-1重新申請鎖,可能依然是Thread-1獲取到鎖。甚至這時候Thread-4也申請鎖,Thread-4也可能比Thread-2先獲取鎖。
非公平鎖可能使得線程“飢餓”。當一個線程請求鎖時,只要獲取了同步狀態即成功獲取鎖。在這個前提下,剛釋放鎖的線程再次獲取同步狀態的幾率會非常大,使得其他線程只能在同步隊列中等待。
公平性的實現
1 static final class FairSync extends Sync { 2 private static final long serialVersionUID = -3000897897090466540L; 3 4 final void lock() { 5 acquire(1); 6 } 7 8 /** Fair version of tryAcquire. Don't grant access unless recursive call or no waiters or is first.*/ 9 protected final boolean tryAcquire(int acquires) { 10 final Thread current = Thread.currentThread(); 11 int c = getState(); 12 //這里沒有一進來就直接進行CAS操作 13 if (c == 0) { 14 if (!hasQueuedPredecessors() &&<span><span class="comment">//增加是否有前驅線程的判斷</span><span>,從而保證公平性</span></span> 15 compareAndSetState(0, acquires)) { 16 setExclusiveOwnerThread(current); 17 return true; 18 } 19 } 20 else if (current == getExclusiveOwnerThread()) { 21 int nextc = c + acquires; 22 if (nextc < 0) 23 throw new Error("Maximum lock count exceeded"); 24 setState(nextc); 25 return true; 26 } 27 return false; 28 } 29 }
公平鎖與非公平鎖的比較
公平性與否是針對獲取鎖而言的,如果一個鎖是公平的,那么鎖的獲取順序就應該符合請求的絕對時間順序,也就是FIFO。
公平性鎖每次都是從同步隊列中的第一個節點獲取到鎖,而非公平性鎖出現了一個線程連續獲取鎖的情況。
非公平性鎖可能使線程“飢餓”,當一個線程請求鎖時,只要獲取了同步狀態即成功獲取鎖。在這個前提下,剛釋放鎖的線程再次獲取同步狀態的幾率會非常大,使得其他線程只能在同步隊列中等待。
非公平鎖可能使線程“飢餓”,為什么它又被設定成默認的實現呢?非公平性鎖模式下線程上下文切換的次數少,因此其性能開銷更小。公平性鎖保證了鎖的獲取按照FIFO原則,而代價是進行大量的線程切換。非公平性鎖雖然可能造成線程“飢餓”,但極少的線程切換,保證了其更大的吞吐量。
共享式同步組件設計
可重寫的方法
1 /**Attempts to acquire in shared mode. This method should query if the state of the object 2 permits it to be acquired in the shared mode, and if so to acquire it.*/ 3 //共享式獲取同步狀態,返回大於等於0的值,表示獲取成功,反之獲取失敗 4 protected int tryAcquireShared(int arg) 5 6 7 /**Attempts to set the state to reflect a release in shared mode.*/ 8 //共享式釋放同步狀態 9 protected boolean tryReleaseShared(int arg)
同步器提供的模板方法
/**Acquires in shared mode, ignoring interrupts.*/ //共享式獲取同步狀態,如果當前線程未獲取到同步狀態,將會進入同步隊列等待 //與獨占式獲取的主要區別是在同一時刻可以有多個線程獲取到同步狀態 public final void acquireShared(int arg) <span style="font-size:14px;"><span style="font-size:14px;"></span></span><pre name="code" class="java"> /**Acquires in exclusive mode, aborting if interrupted.*/ //該方法可以響應中斷 public final void acquireInterruptibly(int arg)
/**Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true.*/ //共享式釋放同步狀態 public final boolean releaseShared(int arg)
共享式獲取與獨占式獲取最主要的區別在於同一時刻能否有多個線程同時獲取到同步狀態。以文件的讀寫為例,如果一個程序在對文件進行讀操作,那么這一時刻對於該文件的寫操作均被阻塞,而讀操作能夠同時進行。寫操作要求對資源的獨占式訪問,而讀操作可以是共享式訪問。
獲取同步狀態
調用同步器的acquireShared(int arg)方法可以共享式地獲取同步狀態。
1 public final void acquireShared(int arg) { 2 if (tryAcquireShared(arg) < 0) 3 doAcquireShared(arg); 4 }
在acquireShared(int arg)方法中,同步器調用tryAcquireShared(int arg)方法嘗試獲取同步狀態,tryAcquireShared(int arg)方法返回值為int類型,當返回值大於等於0時,表示能夠獲取到同步狀態。因此,在共享式獲取的自旋過程中,成功獲取到同步狀態並退出自旋的條件就是tryAcquireShared(int arg)方法返回值大於等於0。
在doAcquireShared(int arg)方法的自旋過程中,如果當前節點的前驅為頭節點時,嘗試獲取同步狀態,如果返回值大於等於0,表示該次獲取同步狀態成功並從自旋過程中退出。
釋放同步狀態
與獨占式一樣,共享式獲取也需要釋放同步狀態,通過調用releaseShared(int arg)方法可以釋放同步狀態。
1 public final boolean releaseShared(int arg) { 2 if (tryReleaseShared(arg)) { 3 doReleaseShared(); 4 return true; 5 } 6 return false; 7 }
該方法在釋放同步狀態之后,將會喚醒后續處於等待狀態的節點。對於能夠支持多個線程同時訪問的並發組件(比Semaphore),它和獨占式主要區別在於tryReleaseShared(int arg)方法必須確保同步狀態(或者資源數)線程安全釋放,一般是通過循環和CAS來保證的,因為釋放同步狀態的操作會同時來自多個線程。
CountDownLatch
1 public class CountDownLatch { 2 /**Synchronization control For CountDownLatch. Uses AQS state to represent count.*/ 3 private static final class Sync extends AbstractQueuedSynchronizer { 4 private static final long serialVersionUID = 4982264981922014374L; 5 6 Sync(int count) { 7 setState(count);//初始化同步狀態 8 } 9 10 int getCount() { 11 return getState(); 12 } 13 14 protected int tryAcquireShared(int acquires) { //同步狀態為0才返回成功 15 return (getState() == 0) ? 1 : -1; 16 } 17 18 protected boolean tryReleaseShared(int releases) {//減少同步狀態 19 // Decrement count; signal when transition to zero 20 for (;;) { //這里通過循環CAS來釋放同步狀態,從而保證線程安全性 21 int c = getState(); 22 if (c == 0) 23 return false; 24 int nextc = c-1; 25 if (compareAndSetState(c, nextc)) 26 return nextc == 0; 27 } 28 } 29 } 30 31 private final Sync sync;//組合一個同步器(AQS) 32 33 public CountDownLatch(int count) { 34 if (count < 0) throw new IllegalArgumentException("count < 0"); 35 this.sync = new Sync(count);//初始化同步狀態 36 } 37 38 /*Causes the current thread to wait until the latch has counted down to 39 * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.*/ 40 public void await() throws InterruptedException { 41 sync.acquireSharedInterruptibly(1);//當同步狀態為0時,acquireShared(1)才返回 42 } 43 44 public boolean await(long timeout, TimeUnit unit) 45 throws InterruptedException { 46 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); 47 } 48 public void countDown() { 49 sync.releaseShared(1);//釋放同步狀態 50 } 51 52 public long getCount() { 53 return sync.getCount(); 54 } 55 56 public String toString() { 57 return super.toString() + "[Count = " + sync.getCount() + "]"; 58 } 59 }
獨占式超時獲取同步狀態
Lock接口
鎖是用來控制多個線程訪問共享資源的方式,一般來說,一個鎖能夠防止多個線程同時訪問共享資源(但是有些鎖可以允許多個線程並發的訪問共享資源,比如讀寫鎖)。
在Lock接口出現之前,Java程序是靠synchronized關鍵字實現鎖功能的,而Java SE 5之后,並發包中新增了Lock接口(以及相關實現類)用來實現鎖功能,它提供了與synchronized關鍵字類似的同步功能,只是在使用時需要顯式地獲取和釋放鎖。雖然它缺少了(通過synchronized塊或者方法所提供的)隱式獲取釋放鎖的便捷性,但是卻擁有了鎖獲取與釋放的可操作性、可中斷的獲取鎖以及超時獲取鎖等多種synchronized關鍵字所不具備的同步特性。
使用synchronized關鍵字將會隱式地獲取鎖,但是它將鎖的獲取和釋放固化了,也就是先獲取再釋放。當然,這種方式簡化了同步的管理,可是擴展性沒有顯示的鎖獲取和釋放來的好。例如,針對一個場景,手把手進行鎖獲取和釋放,先獲得鎖A,然后再獲取鎖B,當鎖B獲得后,釋放鎖A同時獲取鎖C,當鎖C獲得后,再釋放B同時獲取鎖D,以此類推。這種場景下,synchronized關鍵字就不那么容易實現了,而使用Lock卻容易許多。
Lock的使用方式
1 Lock lock = new ReentrantLock(); 2 lock.lock(); 3 try { 4 。。。。。。 5 } finally { 6 lock.unlock(); 7 }
在finally塊中釋放鎖,目的是保證在獲取到鎖之后,最終能夠被釋放。
不要將獲取鎖的過程寫在try塊中,因為如果在獲取鎖(自定義鎖的實現)時發生了異常,異常拋出的同時,也會導致鎖無故釋放。
Lock接口提供的新特性
Lock接口
1 public interface Lock { 2 //獲取鎖,調用該方法將會獲取鎖,當鎖獲取后,從該方法返回 3 void lock(); 4 //可中斷地獲取鎖,和lock()方法的不同之處在於該方法會響應中斷,即在鎖的獲取過程中可以中斷當前線程 5 void lockInterruptibly() throws InterruptedException; 6 //嘗試非阻塞的獲取鎖,調用該方法后會立刻返回,如果能夠獲取則返回true,否則返回false 7 boolean tryLock(); 8 //超時地獲取鎖 1、當前線程在超時時間內成功獲取鎖。2、當前線程在超時時間內被中斷。3、超時時間結束返回false。 9 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; 10 //釋放鎖 11 void unlock(); 12 //獲取等待通知組件 13 Condition newCondition(); 14 }
Lock接口的實現
Lock接口的實現基本都是通過組合了一個隊列同步器(AbstractQueuedSynchronizer)的子類來完成線程訪問控制的。
例如,ReentrantLock(重入鎖)。
說在最后:關於可響應中斷和超時等待特性,文中基本略過,詳情可參看《Java並發編程的藝術》和JDK源碼。
參考:
JDK 1.7源碼
《Java並發編程的藝術》
《Java並發編程實踐》