Java並發之Semaphore源碼解析(二)


在上一章,我們學習了信號量(Semaphore)是如何請求許可證的,下面我們來看看要如何歸還許可證。

可以看到當我們要歸還許可證時,不論是調用release()或是release(int permits),都會調用AQS實現的releaseShared(int arg)方法。在releaseShared(int arg)方法中會先調用子類實現的tryReleaseShared(int arg)方法,這個方法會向信號量歸還許可證,在歸還完畢后,會調用doReleaseShared()方法嘗試喚醒信號量等待隊列中需要許可證的線程,這也印證了筆者之前所說的線程在歸還信號量后,會嘗試喚醒等待隊列中等待許可證的線程。

那我們來看看信號量(Semaphore)靜態內部類Sync實現的tryReleaseShared(int releases)是怎么完成歸還許可證,首先會調用getState()獲取信號量當前剩余的許可證,加上外部線程歸還的許可證數量算出總許可證數量:current + releases,如果能用CAS的方式修改成功,則退出方法,否則一直輪詢直到歸還成功,這里CAS失敗的原因有可能是外部也在請求和歸還許可證,可能在執行完代碼<1>處后和執行代碼<2>處之前,信號量內部的許可證數量已經變了,所以CAS失敗。歸還信號量成功后就會調用doReleaseShared(),這個方法前面已經講解過了,這里就不再贅述了。

public class Semaphore implements java.io.Serializable {
	//...
	abstract static class Sync extends AbstractQueuedSynchronizer {
		//...
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();//<1>
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))//<2>
                    return true;
            }
        }
		//...
	}
	//...
    public void release() {
        sync.releaseShared(1);
    }
	//...
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }
	//...
}

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//...
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
	//...
	protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
	//...
}

  

下面我們再來看看tryAcquire(long timeout, TimeUnit unit)和tryAcquire(int permits, long timeout, TimeUnit unit)的實現,這兩個方法會在給定的時間范圍內嘗試獲取許可證,如果獲取成功則返回true,獲取失敗則返回false。

這兩個方法都會調用AQS實現的tryAcquireSharedNanos(int arg, long nanosTimeout),這個方法其實和先前講得doAcquireShared(int arg)十分相似,只是多了一個超時返回的功能。

這里筆者簡單過一下這個方法的實現:先在代碼<1>處算出超時時間,然后封裝線程對應的節點Node並將其入隊,如果判斷節點的前驅節點是頭節點,且申請許可證成功,這里會調用setHeadAndPropagate(node, r)將頭節點指向當前節點,並嘗試喚醒下一個節點對應的線程。如果申請許可證失敗,會在<2>處算出還剩多少的阻塞時間nanosTimeout,如果剩余阻塞時間小於等於0,代表線程獲取許可證失敗,這里會調用<3>處的cancelAcquire(node) 將節點從等待隊列中移除,具體的移除邏輯可以看筆者寫的ReentrantLock源碼解析第二章。如果剩余阻塞時間大於0,則會執行shouldParkAfterFailedAcquire(p, node)將前驅節點的等待狀態改為SIGNAL,在第二次循環時,如果前驅節點的狀態為SIGNAL,且剩余阻塞時間大於SPIN_FOR_TIMEOUT_THRESHOLD(1000ns),則陷入阻塞,直到被中斷拋出異常,或者被喚醒,檢查是否能獲取許可證,如果不能獲取許可證且超時,則會返回false表示在超時時間內沒有獲取到許可證。

public class Semaphore implements java.io.Serializable {
	//...
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }
	//...
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
	//...
}

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	//...
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }
	//...
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;//<1>
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();//<2>
                if (nanosTimeout <= 0L) {
                    cancelAcquire(node);//<3>
                    return false;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }
	//...
}

 

下面我們對照一下FairSync和NonfairSync,其實NonfairSync基本沒有什么實現,都是調用其父類Sync的方法,以非公平的方式競爭許可證也是調用其父類nonfairTryAcquireShared(acquires)方法。而FairSync自身是有實現以公平的方式獲取許可證,實現邏輯也非常簡單。先判斷信號量的等待隊列是否有節點,有的話則返回獲取失敗,如果沒有再獲取當前的可用許可證數量available,扣去申請的許可證數量available - acquires,用CAS的方式把扣減完的值remaining存放進state,由於扣減的時候可能存在其他線程也在申請/歸還許可證,所以available的值並非一直有效,如果在獲取available后有其他線程也申請和歸還許可證,那么這里的CAS很可能會失敗,判斷CAS失敗后,又會開始新的一輪嘗試獲取許可證邏輯。

static final class FairSync extends Sync {
	private static final long serialVersionUID = 2014338818796000944L;

	FairSync(int permits) {
		super(permits);
	}

	protected int tryAcquireShared(int acquires) {
		for (;;) {
			if (hasQueuedPredecessors())
				return -1;
			int available = getState();
			int remaining = available - acquires;
			if (remaining < 0 ||
				compareAndSetState(available, remaining))
				return remaining;
		}
	}
}

static final class NonfairSync extends Sync {
	private static final long serialVersionUID = -2694183684443567898L;

	NonfairSync(int permits) {
		super(permits);
	}

	protected int tryAcquireShared(int acquires) {
		return nonfairTryAcquireShared(acquires);
	}
}

    

對照完公平FairSync和非公平NonfairSync的差別后,我們來看看Sync類實現的方法,Sync類的實現其實也不算復雜,主要就下面4個方法,其中:nonfairTryAcquireShared(int acquires)和tryReleaseShared(int releases)先前已經將結果了,下面我們專注:reducePermits(int reductions)和drainPermits()。

abstract static class Sync extends AbstractQueuedSynchronizer {
	final int nonfairTryAcquireShared(int acquires) {
		//...
	}
	protected final boolean tryReleaseShared(int releases) {
		//...
	}
	final void reducePermits(int reductions) {
		//...
	}
	final int drainPermits() {
		//...
	}
}

  

Sync類實現的的reducePermits(int reductions)的作用是降低許可證數量,比如當雙11來臨時,淘寶京東可以對一些服務進行擴容和配置升級,使得原本可以承受10W並發量的服務提高到可以承受50W,這里可以在不調用acquire()的前提下,調用release()方法增加信號量的許可證,當雙11的壓力過去后,需要對服務進行縮容,由50W的並發量回到10W,這里可以用reducePermits(int reductions)降低許可證數量。在這個方法中會先獲取當前許可證數量,減去我們要扣除的許可證數量current - reductions,並判斷其結果是否溢出,如果溢出則拋出異常,沒有溢出用CAS的方式設置最新的許可證數量。

public class Semaphore implements java.io.Serializable {
	//...
    abstract static class Sync extends AbstractQueuedSynchronizer {
		//...
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
		//...
	}
	//...
	protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }
	//...
}

  

需要注意兩點:

  1. 這個方法的訪問權限是protected,如果要使用此方法需要用一個類去繼承,並修改此方法的訪問權限。
  2. 這個方法可能導致信號量的剩余許可證數量為負,比如一個信號量原先的許可證數量為10,且被借走了9個許可證,當前許可證數量為1。這時想把許可證數量從原先的10扣降到3,向reducePermits(int reduction)傳入7,此時current-reductions=1-7=-6,如果CAS成功,那么信號量目前的許可證數量為-6,不過沒關系,如果前面借走的9個許可證最終會歸還,信號量的許可證數量最終會回到3。
class MySemaphore extends Semaphore {
	public MySemaphore(int permits) {
		super(permits);
	}

	@Override
	public void reducePermits(int reduction) {
		super.reducePermits(reduction);
	}
}

public static void main(String[] args) {
	MySemaphore semaphore = new MySemaphore(8);
	System.out.println("初始信號量的許可證數量:" + semaphore.availablePermits());
	//初始化完信號量后,增加信號量的許可證數量
	int add = 2;
	semaphore.release(add);
	System.out.printf("增加%d個許可證后,許可證數量:%d\n", add, semaphore.availablePermits());
	//申請9個許可證
	int permits = 9;
	try {
		semaphore.acquire(permits);
		System.out.printf("申請%d個許可證后剩余許可證數量:%d\n", permits, semaphore.availablePermits());
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
	//這里要將原先10個許可證扣除到只剩3個,所以傳入7,扣除7個許可證
	semaphore.reducePermits(7);
	System.out.println("扣除7個許可證數量后,剩余許可證數量:" + semaphore.availablePermits());
	//歸還原先出借的9個許可證
	semaphore.release(permits);
	System.out.printf("歸還原先出借的%d信號量后,剩余信號量:%d\n", permits, semaphore.availablePermits());
}

    

執行結果:

初始信號量的許可證數量:8
增加2個許可證后,許可證數量:10
申請9個許可證后剩余許可證數量:1
扣除7個許可證數量后,剩余許可證數量:-6
歸還原先出借的9信號量后,剩余信號量:3

  

Sync類實現的drainPermits()可以一次性扣除信號量目前所有的許可證數量並返回,通過這個API,我們可以得知資源目前最大的訪問限度。還是拿上一章遠程服務為例,判定服務能承受的並發是5000,用於限流的semaphore信號量的最大許可證數量也是5000。假設目前信號量剩余的許可證數量為2000,即有3000個線程正在並發訪問遠程服務,我們可以通過drainPermits()方法獲取剩余的允許訪問數量2000,然后創建2000個線程訪問遠程服務,這個API一般用於計算量大且計算內容比較獨立的場景。

public class Semaphore implements java.io.Serializable {
	//...
    abstract static class Sync extends AbstractQueuedSynchronizer {
		//...
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
		//...
	}
	//...
    public int drainPermits() {
        return sync.drainPermits();
    }
	//...
}

 

最后,筆者介紹一個Semaphore在JDK1.6.0_17時期的BUG,便結束對Semaphore的源碼解析。

當時AQS的setHeadAndPropagate(Node node, int propagate)和releaseShared(int arg) 兩個方法的實現是下面這樣的,這個代碼可能導致隊列被阻塞。

private void setHeadAndPropagate(Node node, int propagate) {
	setHead(node);
	if (propagate > 0 && node.waitStatus != 0) {
		Node s = node.next;
		if (s == null || s.isShared())
			unparkSuccessor(node);
	}
}

public final boolean releaseShared(int arg) {
	if (tryReleaseShared(arg)) {
		Node h = head;
		if (h != null && h.waitStatus != 0)
			unparkSuccessor(h);
		return true;
	}
	return false;
}

  

按照上面代碼的實現,會讓下面的代碼出現隊列被阻塞的情況。t1和t2線程用於請求許可證,t3和t4線程用於歸還許可證,循環10000000次只是為了增加出現阻塞的概率,現在說說什么樣的場景下會出現隊列被阻塞的情況。

程序開始時,信號量的許可證數量為0,所以t1和t2只能進入隊列等待,t1和t2在隊列中的節點對應N1和N2,節點的排序為:head->N1->N2(tail)。t3歸還許可證時發現頭節點不為null且頭節點的等待狀態為SIGNAL,於是會調用unparkSuccessor(h)方法喚醒頭節點的后繼節點N1對應的線程t1,在執行unparkSuccessor(h)的時候會把head的等待狀態改為0。

t1被喚醒后獲取到許可證,返回剩余許可證數量為0,即之后調用setHeadAndPropagate(Node node, int propagate)方法傳入的propagate為0,但尚未調用。此時t4也歸還了許可證,但發現head節點的等待狀態為0,就不會調用unparkSuccessor(h)。

t1執行setHeadAndPropagate(Node node, int propagate),將頭節點指向自身線程對應的節點N1,雖然此時信號量里有剩余的許可證,但t1原先拿到的propagate為0,所以不會執行unparkSuccessor(node)喚醒t4。

那么新版本的setHeadAndPropagate(Node node, int propagate)和releaseShared(int arg)又是如何保證有許可證被歸還時喚醒隊列中被阻塞的線程呢?這里其實和PROPAGATE有關,讓我們按照新版的setHeadAndPropagate和releaseShared走一遍上面的流程。

t1和t2進入隊列中等待,t3歸還許可證發現頭節點不為null,且頭節點等待狀態為SIGNAL,於是調用unparkSuccessor(h)方法喚醒頭節點的后繼節點N1對應的線程t1,在執行unparkSuccessor(h)的時候會把head的等待狀態改為0。

t1被喚醒后獲取到許可證,返回剩余許可證數量為0,在調用setHeadAndPropagate(Node node, int propagate)之前,t4歸還了許可證,發現頭節點的等待狀態為0,將其改為PROPAGATE。

t1執行setHeadAndPropagate(Node node, int propagate),獲取原先頭節點h,並將頭節點指向N1,此時雖然propagate為0,但原先頭節點h的等待狀態<0,可以執行doReleaseShared()喚醒后繼節點N2對應的線程t2。

import java.util.concurrent.Semaphore;

public class TestSemaphore {

    private static Semaphore sem = new Semaphore(0);

    private static class Thread1 extends Thread {
        @Override
        public void run() {
            sem.acquireUninterruptibly();
        }
    }

    private static class Thread2 extends Thread {
        @Override
        public void run() {
            sem.release();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10000000; i++) {
            Thread t1 = new Thread1();
            Thread t2 = new Thread1();
            Thread t3 = new Thread2();
            Thread t4 = new Thread2();
            t1.start();
            t2.start();
            t3.start();
            t4.start();
            t1.join();
            t2.join();
            t3.join();
            t4.join();
            System.out.println(i);
        }
    }
}

   

至此,Semaphore的源碼解析就到此結束了。筆者在這里並沒有全部介紹完所有Semaphore的API,例如:acquireUninterruptibly()和acquireUninterruptibly(int permits),因為這兩個方法實在與之前介紹的acquire(),如果大家能理解清楚前面講解的內容,這兩個API相信對大家不在話下。

本章我們也初次見到AQS內部類Node的不同狀態和使用方式,即節點除了獨占(Node.EXCLUSIVE),還會有共享的狀態(Node.SHARED),這里我們也首次見到等待狀態為PROPAGATE的節點,代表傳播的意思,通過這個狀態,不但可以提升信號量整體的吞吐量,還可以避免高並發場景下節點沒有被喚醒的情況。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM