Java中Semaphore(信號量)的使用


Java中Semaphore(信號量)的使用

Semaphore 的作用:

在 java 中,使用了 synchronized 關鍵字和 Lock 鎖實現了資源的並發訪問控制,在同一時間只允許唯一了線程進入臨界區訪問資源 (讀鎖除外),這樣子控制的主要目的是為了解決多個線程並發同一資源造成的數據不一致的問題。在另外一種場景下,一個資源有多個副本可供同時使用,比如打印機房有多個打印機、廁所有多個坑可供同時使用,這種情況下,Java 提供了另外的並發訪問控制 -- 資源的多副本的並發訪問控制,今天學習的信號量 Semaphore 即是其中的一種。

Semaphore 實現原理初探:

Semaphore 是用來保護一個或者多個共享資源的訪問,Semaphore 內部維護了一個計數器,其值為可以訪問的共享資源的個數。一個線程要訪問共享資源,先獲得信號量,如果信號量的計數器值大於 1,意味着有共享資源可以訪問,則使其計數器值減去 1,再訪問共享資源。

如果計數器值為 0, 線程進入休眠。當某個線程使用完共享資源后,釋放信號量,並將信號量內部的計數器加 1,之前進入休眠的線程將被喚醒並再次試圖獲得信號量。

就好比一個廁所管理員,站在門口,只有廁所有空位,就開門允許與空側數量等量的人進入廁所。多個人進入廁所后,相當於 N 個人來分配使用 N 個空位。為避免多個人來同時競爭同一個側衛,在內部仍然使用鎖來控制資源的同步訪問。

Semaphore 的使用:

Semaphore 使用時需要先構建一個參數來指定共享資源的數量,Semaphore 構造完成后即是獲取 Semaphore、共享資源使用完畢后釋放 Semaphore。

Semaphore semaphore = new Semaphore(10,true);
semaphore.acquire();
//do something here
semaphore.release();

下面的代碼就是模擬控制商場廁所的並發使用:

public class ResourceManage {  
    private final Semaphore semaphore ;  
    private boolean resourceArray[];  
    private final ReentrantLock lock;  
    public ResourceManage() {  
        this.resourceArray = new boolean[10];//存放廁所狀態  
        this.semaphore = new Semaphore(10,true);//控制10個共享資源的使用,使用先進先出的公平模式進行共享;公平模式的信號量,先來的先獲得信號量  
        this.lock = new ReentrantLock(true);//公平模式的鎖,先來的先選  
        for(int i=0 ;i<10; i++){  
            resourceArray[i] = true;//初始化為資源可用的情況  
        }  
    }  
    public void useResource(int userId){ 
		semaphore.acquire(); 
        try{  
            //semaphore.acquire();  
            int id = getResourceId();//占到一個坑  
            System.out.print("userId:"+userId+"正在使用資源,資源id:"+id+"\n");  
            Thread.sleep(100);//do something,相當於於使用資源  
            resourceArray[id] = true;//退出這個坑  
        }catch (InterruptedException e){  
            e.printStackTrace();  
        }finally {  
            semaphore.release();//釋放信號量,計數器加1  
        }  
    }  
    private int getResourceId(){  
        int id = -1; 
		lock.lock();
        try {  
            //lock.lock();//雖然使用了鎖控制同步,但由於只是簡單的一個數組遍歷,效率還是很高的,所以基本不影響性能。  
            for(int i=0; i<10; i++){  
                if(resourceArray[i]){  
                    resourceArray[i] = false;  
                    id = i;  
                    break;  
                }  
            }  
        }catch (Exception e){  
            e.printStackTrace();  
        }finally {  
            lock.unlock();  
        }  
        return id;  
    }  
}  
public class ResourceUser implements Runnable{  
    private ResourceManage resourceManage;  
    private int userId;  
    public ResourceUser(ResourceManage resourceManage, int userId) {  
        this.resourceManage = resourceManage;  
        this.userId = userId;  
    }  
    public void run(){  
        System.out.print("userId:"+userId+"准備使用資源...\n");  
        resourceManage.useResource(userId);  
        System.out.print("userId:"+userId+"使用資源完畢...\n");  
    }  
  
    public static void main(String[] args){  
        ResourceManage resourceManage = new ResourceManage();  
        Thread[] threads = new Thread[100];  
        for (int i = 0; i < 100; i++) {  
            Thread thread = new Thread(new ResourceUser(resourceManage,i));//創建多個資源使用者  
            threads[i] = thread;  
        }  
        for(int i = 0; i < 100; i++){  
            Thread thread = threads[i];  
            try {  
                thread.start();//啟動線程  
            }catch (Exception e){  
                e.printStackTrace();  
            }  
        }  
    }  
}

最后,Semaphore 除了控制資源的多個副本的並發訪問控制,也可以使用二進制信號量來實現類似 synchronized 關鍵字和 Lock 鎖的並發訪問控制功能(也就是只設置0和1的情況)

Java中的信號量和Linux中的思路是一樣的

信號量的原理

信號量維護了一個信號量許可集。線程可以通過調用 acquire() 來獲取信號量的許可;當信號量中有可用的許可時,線程能獲取該許可;否則線程必須等待,直到有可用的許可為止。 線程可以通過 release() 來釋放它所持有的信號量許可。

Semaphore 的函數列表

// 創建具有給定的許可數和非公平的公平設置的 Semaphore。
Semaphore(int permits)
// 創建具有給定的許可數和給定的公平設置的 Semaphore。
Semaphore(int permits, boolean fair)

// 從此信號量獲取一個許可,在提供一個許可前一直將線程阻塞,否則線程被中斷。
void acquire()
// 從此信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞,或者線程已被中斷。
void acquire(int permits)
// 從此信號量中獲取許可,在有可用的許可前將其阻塞。
void acquireUninterruptibly()
// 從此信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞。
void acquireUninterruptibly(int permits)
// 返回此信號量中當前可用的許可數。
int availablePermits()
// 獲取並返回立即可用的所有許可。
int drainPermits()
// 返回一個 collection,包含可能等待獲取的線程。
protected Collection<Thread> getQueuedThreads()
// 返回正在等待獲取的線程的估計數目。
int getQueueLength()
// 查詢是否有線程正在等待獲取。
boolean hasQueuedThreads()
// 如果此信號量的公平設置為 true,則返回 true。
boolean isFair()
// 根據指定的縮減量減小可用許可的數目。
protected void reducePermits(int reduction)
// 釋放一個許可,將其返回給信號量。
void release()
// 釋放給定數目的許可,將其返回到信號量。
void release(int permits)
// 返回標識此信號量的字符串,以及信號量的狀態。
String toString()
// 僅在調用時此信號量存在一個可用許可,才從信號量獲取許可。
boolean tryAcquire()
// 僅在調用時此信號量中有給定數目的許可時,才從此信號量中獲取這些許可。
boolean tryAcquire(int permits)
// 如果在給定的等待時間內此信號量有可用的所有許可,並且當前線程未被中斷,則從此信號量獲取給定數目的許可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
// 如果在給定的等待時間內,此信號量有可用的許可並且當前線程未被中斷,則從此信號量獲取一個許可。
boolean tryAcquire(long timeout, TimeUnit unit)

Semaphore 數據結構

Semaphore 的 UML 類圖如下:

img

從圖中可以看出:
(01) 和”ReentrantLock” 一樣,Semaphore 也包含了 sync 對象,sync 是 Sync 類型;而且Sync 是一個繼承於 AQS 的抽象類

AQS提供了一種實現阻塞鎖和一系列依賴FIFO同步隊列的同步器的框架,如下圖所示

img

信號量的底層是使用AQS的共享模式實現的:state的值就是可以同時共享的資源數,每次線程成功獲取state都減一

(02) Sync 包括兩個子類:” 公平信號量”FairSync 和 “非公平信號量”NonfairSync。sync 是”FairSync 的實例”,或者”NonfairSync 的實例”;默認情況下,sync 是 NonfairSync(即,默認是非公平信號量)。

Semaphore 源碼分析 (基於 JDK1.7.0_40)

Semaphore 是通過共享鎖實現的。根據共享鎖的獲取原則,Semaphore 分為” 公平信號量” 和” 非公平信號量”。

“公平信號量” 和” 非公平信號量” 的區別

“公平信號量” 和” 非公平信號量” 的釋放信號量的機制是一樣的!不同的是它們獲取信號量的機制:線程在嘗試獲取信號量許可時,對於公平信號量而言,如果當前線程不在 CLH 隊列的頭部,則排隊等候;而對於非公平信號量而言,無論當前線程是不是在 CLH 隊列的頭部,它都會直接獲取信號量。該差異具體的體現在,它們的 tryAcquireShared() 函數的實現不同。

“公平信號量” 類

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

“非公平信號量” 類

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

下面,我們逐步的對它們的源碼進行分析。

1. 信號量構造函數

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

從中,我們可以信號量分為 “公平信號量(FairSync)” 和“非公平信號量 (NonfairSync)”。Semaphore(int permits) 函數會默認創建“非公平信號量”。

2. 公平信號量獲取和釋放

2.1 公平信號量的獲取
Semaphore 中的公平信號量是 FairSync。它的獲取 API 如下:

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

信號量中的 acquire() 獲取函數,實際上是調用的 AQS 中的 acquireSharedInterruptibly()。

acquireSharedInterruptibly() 的源碼如下:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 如果線程是中斷狀態,則拋出異常。
    if (Thread.interrupted())
        throw new InterruptedException();
    // 否則,嘗試獲取“共享鎖”;獲取成功則直接返回,獲取失敗,則通過doAcquireSharedInterruptibly()獲取。
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

Semaphore 中”公平鎖 “對應的 tryAcquireShared() 實現如下:

protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 判斷“當前線程”是不是CLH隊列中的第一個線程線程,
        // 若是的話,則返回-1。
        if (hasQueuedPredecessors())
            return -1;
        // 設置“可以獲得的信號量的許可數”
        int available = getState();
        // 設置“獲得acquires個信號量許可之后,剩余的信號量許可數”
        int remaining = available - acquires;
        // 如果“剩余的信號量許可數>=0”,則設置“可以獲得的信號量許可數”為remaining。
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

說明:tryAcquireShared() 的作用是嘗試獲取 acquires 個信號量許可數。
對於 Semaphore 而言,state 表示的是 “當前可獲得的信號量許可數”。

下面看看 AQS 中 doAcquireSharedInterruptibly() 的實現:

private void doAcquireSharedInterruptibly(long arg)
    throws InterruptedException {
    // 創建”當前線程“的Node節點,且Node中記錄的鎖是”共享鎖“類型;並將該節點添加到CLH隊列末尾。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 獲取上一個節點。
            // 如果上一節點是CLH隊列的表頭,則”嘗試獲取共享鎖“。
            final Node p = node.predecessor();
            if (p == head) {
                long r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 當前線程一直等待,直到獲取到共享鎖。
            // 如果線程在等待過程中被中斷過,則再次中斷該線程(還原之前的中斷狀態)。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

說明:doAcquireSharedInterruptibly()會使當前線程一直等待,直到當前線程獲取到共享鎖 (或被中斷) 才返回。
(01) addWaiter(Node.SHARED)的作用是,創建”當前線程 “的 Node 節點,且 Node 中記錄的鎖的類型是” 共享鎖 “(Node.SHARED);並將該節點添加到 CLH 隊列末尾。關於 Node 和 CLH 在”Java 多線程系列–“JUC 鎖”03 之 公平鎖(一)” 已經詳細介紹過,這里就不再重復說明了。
(02) node.predecessor() 的作用是,獲取上一個節點。如果上一節點是 CLH 隊列的表頭,則” 嘗試獲取共享鎖 “。
(03) shouldParkAfterFailedAcquire() 的作用和它的名稱一樣,如果在嘗試獲取鎖失敗之后,線程應該等待,則返回 true;否則,返回 false。
(04) 當 shouldParkAfterFailedAcquire() 返回 ture 時,則調用 parkAndCheckInterrupt(),當前線程會進入等待狀態,直到獲取到共享鎖才繼續運行。
doAcquireSharedInterruptibly()中的 shouldParkAfterFailedAcquire(), parkAndCheckInterrupt 等函數在”Java 多線程系列–“JUC 鎖”03 之 公平鎖 (一)” 中介紹過,這里也就不再詳細說明了。

2.2 公平信號量的釋放

Semaphore 中公平信號量 (FairSync) 的釋放 API 如下:

public void release() {
    sync.releaseShared(1);
}

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

信號量的 releases() 釋放函數,實際上是調用的 AQS 中的 releaseShared()。

releaseShared() 在 AQS 中實現,源碼如下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

說明:releaseShared() 的目的是讓當前線程釋放它所持有的共享鎖。
它首先會通過 tryReleaseShared() 去嘗試釋放共享鎖。嘗試成功,則直接返回;嘗試失敗,則通過 doReleaseShared() 去釋放共享鎖。

Semaphore 重寫了 tryReleaseShared(),它的源碼如下:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 獲取“可以獲得的信號量的許可數”
        int current = getState();
        // 獲取“釋放releases個信號量許可之后,剩余的信號量許可數”
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // 設置“可以獲得的信號量的許可數”為next。
        if (compareAndSetState(current, next))
            return true;
    }
}

如果 tryReleaseShared() 嘗試釋放共享鎖失敗,則會調用 doReleaseShared() 去釋放共享鎖。doReleaseShared() 的源碼如下:

private void doReleaseShared() {
    for (;;) {
        // 獲取CLH隊列的頭節點
        Node h = head;
        // 如果頭節點不為null,並且頭節點不等於tail節點。
        if (h != null && h != tail) {
            // 獲取頭節點對應的線程的狀態
            int ws = h.waitStatus;
            // 如果頭節點對應的線程是SIGNAL狀態,則意味着“頭節點的下一個節點所對應的線程”需要被unpark喚醒。
            if (ws == Node.SIGNAL) {
                // 設置“頭節點對應的線程狀態”為空狀態。失敗的話,則繼續循環。
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 喚醒“頭節點的下一個節點所對應的線程”。
                unparkSuccessor(h);
            }
            // 如果頭節點對應的線程是空狀態,則設置“文件點對應的線程所擁有的共享鎖”為其它線程獲取鎖的空狀態。
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果頭節點發生變化,則繼續循環。否則,退出循環。
        if (h == head)                   // loop if head changed
            break;
    }
}

說明:doReleaseShared()會釋放 “共享鎖”。它會從前往后的遍歷 CLH 隊列,依次“喚醒” 然后 “執行” 隊列中每個節點對應的線程;最終的目的是讓這些線程釋放它們所持有的信號量。

3 非公平信號量獲取和釋放

Semaphore 中的非公平信號量是 NonFairSync。在 Semaphore 中,“非公平信號量許可的釋放 (release)” 與“公平信號量許可的釋放 (release)” 是一樣的。
不同的是它們獲取 “信號量許可” 的機制不同,下面是非公平信號量獲取信號量許可的代碼。

非公平信號量的 tryAcquireShared() 實現如下:

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

nonfairTryAcquireShared() 的實現如下:

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // 設置“可以獲得的信號量的許可數”
        int available = getState();
        // 設置“獲得acquires個信號量許可之后,剩余的信號量許可數”
        int remaining = available - acquires;
        // 如果“剩余的信號量許可數>=0”,則設置“可以獲得的信號量許可數”為remaining。
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

說明:非公平信號量的 tryAcquireShared()調用 AQS 中的 nonfairTryAcquireShared()。而在 nonfairTryAcquireShared()的 for 循環中,它都會直接判斷 “當前剩余的信號量許可數” 是否足夠;足夠的話,則直接“設置可以獲得的信號量許可數”,進而再獲取信號量。
而公平信號量的 tryAcquireShared() 中,在獲取信號量之前會通過 if (hasQueuedPredecessors()) 來判斷 “當前線程是不是在 CLH 隊列的頭部”,是的話,則返回 - 1。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM