基於AQS的前世今生,來學習並發工具類Semaphore。本文將從Semaphore的應用場景、源碼原理解析來學習這個並發工具類。
1、 應用場景
Semaphore用來控制同時訪問某個特定資源的操作數量,或者同時執行某個指定操作的數量。還可以用來實現某種資源池限制,或者對容器施加邊界。
1.1 當成鎖使用
控制同時訪問某個特定資源的操作數量,代碼如下:
public class SemaphoreLock { public static void main(String[] args) { //1、信號量為1時 相當於普通的鎖 信號量大於1時 共享鎖 Output o = new Output(); for (int i = 0; i < 5; i++) { new Thread(() -> o.output()).start(); } } } class Output { Semaphore semaphore = new Semaphore(1); public void output() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " start at " + System.currentTimeMillis()); Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " stop at " + System.currentTimeMillis()); }catch(Exception e) { e.printStackTrace(); }finally { semaphore.release(); } } }
1.2 線程通信信號
線程間通信,代碼如下:
public class SemaphoreCommunication { public static void main(String[] args) { //2、線程間進行通信 Semaphore semaphore = new Semaphore(1); new SendingThread(semaphore,"SendingThread"); new ReceivingThread(semaphore,"ReceivingThread"); } } class SendingThread extends Thread { Semaphore semaphore; String name; public SendingThread(Semaphore semaphore,String name) { this.semaphore = semaphore; this.name = name; new Thread(this).start(); } public void run() { try { semaphore.acquire(); for (int i = 0; i < 5; i++) { System.out.println(name + ":" + i); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } semaphore.release(); } } class ReceivingThread extends Thread { Semaphore semaphore; String name; public ReceivingThread(Semaphore semaphore,String name) { this.semaphore = semaphore; this.name = name; new Thread(this).start(); } public void run() { try { semaphore.acquire(); for (int i = 0; i < 5; i++) { System.out.println(name + ":" + i); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } semaphore.release(); } }
1.3 資源池限制
對資源池進行資源限制,代碼如下:
public class SemaphoreConnect { public static void main(String[] args) throws Exception { //3、模擬連接池數量限制 ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 200; i++) { executorService.submit(new Runnable() { @Override public void run() { Connection.getInstance().connect(); } }); } executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); } } class Connection { private static Connection instance = new Connection(); private Semaphore semaphores = new Semaphore(10,true); private int connections = 0; private Connection() { } public static Connection getInstance() { return instance; } public void connect() { try { semaphores.acquire(); doConnect(); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphores.release(); } } private void doConnect() { synchronized (this) { connections ++; System.out.println("current get connections is : " + connections); } try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (this) { connections --; System.out.println("after release current connections is : " + connections); } } }
1.4 容器邊界限制
對容器進行邊界限制,代碼如下:
public class SemaphoreBoundedList { public static void main(String[] args) { //4、容器邊界限制 final BoundedList ba = new BoundedList(5); Runnable runnable1 = new Runnable() { public void run() { try { ba.add("John"); ba.add("Martin"); ba.add("Adam"); ba.add("Prince"); ba.add("Tod"); System.out.println("Available Permits : " + ba.getSemaphore().availablePermits()); ba.add("Tony"); System.out.println("Final list: " + ba.getArrayList()); }catch (InterruptedException ie) { Thread.interrupted(); } } }; Runnable runnable2 = new Runnable() { public void run() { try { System.out.println("Before removing elements: "+ ba.getArrayList()); Thread.sleep(5000); ba.remove("Martin"); ba.remove("Adam"); }catch (InterruptedException ie) { Thread.interrupted(); } } }; Thread thread1 = new Thread(runnable1); Thread thread2 = new Thread(runnable2); thread1.start(); thread2.start(); } } class BoundedList<T> { private final Semaphore semaphore; private List arrayList; BoundedList(int limit) { this.arrayList = Collections.synchronizedList(new ArrayList()); this.semaphore = new Semaphore(limit); } public boolean add(T t) throws InterruptedException { boolean added = false; semaphore.acquire(); try { added = arrayList.add(t); return added; } finally { if (!added) semaphore.release(); } } public boolean remove(T t) { boolean wasRemoved = arrayList.remove(t); if (wasRemoved) semaphore.release(); return wasRemoved; } public void remove(int index) { arrayList.remove(index); semaphore.release(); } public List getArrayList() { return arrayList; } public Semaphore getSemaphore() { return semaphore; } }
2、 源碼原理解析
2.1 獲取信號
獲取信號的方法如下:
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1);//共享式獲取AQS的同步狀態 }
調用的是AQS的acquireSharedInterruptibly方法:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted())//線程中斷 說明信號量對線程中斷敏感 throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //獲取信號量失敗 線程進入同步隊列自旋等待 doAcquireSharedInterruptibly(arg); }
其中tryAcquireShared依賴的是Sync的實現,Sync提供了公平和非公平式的方式,先看非公平式。
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState();//同步狀態 當前的信號量許可數 int remaining = available - acquires;//減去釋放的信號量 剩余信號量許可數 if (remaining < 0 ||//剩余信號量小於0 直接返回remaining 不做CAS compareAndSetState(available, remaining))//CAS更新 return remaining; } }
再看下公平式的。
protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors())//判斷同步隊列如果存在前置節點 獲取信號量失敗 其他和非公平式是一致的 return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
最后來看下,如果未獲取到信號量的處理方法doAcquireSharedInterruptibly。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//線程進入同步隊列 boolean failed = true; try { for (;;) {//自旋 final Node p = node.predecessor(); if (p == head) {//當前節點的前置節點是AQS的頭節點 即自己是AQS同步隊列的第一個節點 int r = tryAcquireShared(arg); //再去獲取信號量 if (r >= 0) {//獲取成功 setHeadAndPropagate(node, r);//退出自旋 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); //獲取失敗 就取消獲取 } }
2.2 釋放信號
釋放信號的方法如下:
public void release() { sync.releaseShared(1); }
調用的是AQS的releaseShared方法:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//釋放信號量 doReleaseShared();//喚醒后續的線程節點 return true; } return false; }
tryReleaseShared交由子類Sync實現,代碼如下:
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState();//當前信號量許可數 int next = current + releases; //當前信號量許可數+釋放的信號量許可數 if (next < current) // overflow 這個分支我看着永遠走不進來呢 throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next))//CAS更新當前信號量許可數 return true; } }
釋放成功后,則繼續調用doReleaseShared,喚醒后續線程節點可以來爭取信號量了。
private void doReleaseShared() { for (;;) { Node h = head; //頭節點 if (h != null && h != tail) {//同步隊列中存在線程等待 int ws = h.waitStatus; //頭節點線程狀態 if (ws == Node.SIGNAL) {//頭節點線程狀態為SIGNAL 喚醒后續線程節點 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); //喚醒下個節點 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
總結:Semaphore使用AQS同步狀態來保存信號量的當前計數。它里面定義的acquireSharedInterruptibly方法會減少計數,當計數為非正值時阻塞線程,releaseShared方法會增加計數,在計數不超過信號量限制時要解除線程的阻塞。
參考資料:
https://github.com/lingjiango/ConcurrentProgramPractice
https://www.caveofprogramming.com/java-multithreading/java-multithreading-semaphores-part-12.html
https://java2blog.com/java-semaphore-example/
http://tutorials.jenkov.com/java-util-concurrent/semaphore.html