多線程編程(一)-Semaphore(信號量)的使用


  • Semaphore的介紹

    單詞Semaphore的中文含義就是信號、信號系統的意思,此類的主要作用就是限制線程並發的數量。

    舉個例子,一個屋子里有10個人,但只有一個窄門可以出去,這個窄門一次最多只能通過一人,這樣就限制了同時出門的人數,同理也就是限制了線程並發的數量,這也就是Semaphore類要達到的目的。

  • 類Semaphore的同步性

    多線程中的同步就是多個線程排隊去執行任務,一個一個執行,不會有線程安全的問題。

    構造函數參數permits是許可的意思,代表同一時間內,最多允許多少個線程同時執行acquire()和release()之間的代碼。

    無參方法acquire()的作用是默認使用1個許可。

package com.wjg.unit1;

import java.util.concurrent.Semaphore;

public class Service {
    private Semaphore semaphore = new Semaphore(1);
    public void testMethod(){
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName()+" begin timer="+System.currentTimeMillis());
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName()+" end   timer="+System.currentTimeMillis());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            semaphore.release();
        }
        
    }
}

//運行類
package com.wjg.unit1;

public class Run {
    public static void main(String[] args) {
        Service service = new Service();
        Run run = new Run();
        ThreadA a = run.new ThreadA(service);
        a.setName("a");
        a.start();
        
        ThreadB b = run.new ThreadB(service);
        b.setName("b");
        b.start();
        
        ThreadC c = run.new ThreadC(service);
        c.setName("c");
        c.start();
        
    }
    
    public class ThreadA extends Thread {

        private Service service;

        public ThreadA(Service service) {
            super();
            this.service = service;
        }

        @Override
        public void run() {
            service.testMethod();
        }
    }

    public class ThreadB extends Thread {

        private Service service;

        public ThreadB(Service service) {
            super();
            this.service = service;
        }

        @Override
        public void run() {
            service.testMethod();
        }
    }

    public class ThreadC extends Thread {

        private Service service;

        public ThreadC(Service service) {
            super();
            this.service = service;
        }

        @Override
        public void run() {
            service.testMethod();
        }
    }
}


運行結果:
a begin timer=1487748505823
a end   timer=1487748510826
b begin timer=1487748510827
b end   timer=1487748515828
c begin timer=1487748515828
c end   timer=1487748520833
  • 構造函數permits參數作用

    參數permits代表同一時間內,最多允許有x個線程可以執行acquire()和release()之間的代碼。我們將上例的Service改造一下,Run類不變

package com.wjg.unit1;

import java.util.concurrent.Semaphore;

public class Service {
    //我們將premits參數值改為2
    private Semaphore semaphore = new Semaphore(2);
    public void testMethod(){
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName()+" begin timer="+System.currentTimeMillis());
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName()+" end   timer="+System.currentTimeMillis());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            semaphore.release();
        }
        
    }
}

運行結果:
a begin timer=1487749198356
b begin timer=1487749198357
b end   timer=1487749203359
a end   timer=1487749203359
c begin timer=1487749203359
c end   timer=1487749208360
  • 方法acquire(int permits)使用

    方法acquire(int permits)功能就是每調用1次就使用x個許可。

    這個有一點說明一下,acquire方法做的是減法操作,release方法做的是加法操作,構造函數new Semaphore(5)中的5並不是最終的許可數量,僅僅是初始化的狀態值,是可以動態改變的。

  • 方法acquireUninterruptibly()使用

    此方法作用是使等待進入acquire()方法的線程,不允許被中斷。我們先看一下被中斷的例子

    

package com.wjg.unit1;

import java.util.concurrent.Semaphore;

public class Service {
    private Semaphore semaphore = new Semaphore(1);
    
    public void testMethod(){
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName()+" begin timer="+System.currentTimeMillis());
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                String newString = new String();
                Math.random();
            }
            System.out.println(Thread.currentThread().getName()+" end   timer="+System.currentTimeMillis());
        } catch (InterruptedException e) {
            System.out.println("線程"+Thread.currentThread().getName()+"進入了catch");
            e.printStackTrace();
        }finally {
            semaphore.release();
        }
    }
}




package com.wjg.unit1;

public class Run {
    public static void main(String[] args) throws InterruptedException {
        Service service = new Service();
        Run run  = new Run();
        ThreadA a = run.new ThreadA(service);
        a.setName("a");
        a.start();
        
        ThreadB b = run.new ThreadB(service);
        b.setName("b");
        b.start();
        
        Thread.sleep(1000);
        //中斷b線程
        b.interrupt();
        System.out.println("main  中斷了b");
    }
    
    public class ThreadA extends Thread{
        private Service service;

        public ThreadA(Service service) {
            super();
            this.service = service;
        }

        @Override
        public void run() {
            service.testMethod();
        }
        
    }
    
    public class ThreadB extends Thread{
        private Service service;

        public ThreadB(Service service) {
            super();
            this.service = service;
        }

        @Override
        public void run() {
            service.testMethod();
        }
        
    }
}


執行結果:
a begin timer=1487751504264
main  中斷了b
java.lang.InterruptedException
線程b進入了catch
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:996)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
    at java.util.concurrent.Semaphore.acquire(Semaphore.java:317)
    at com.wjg.unit1_1_4.Service.testMethod(Service.java:10)
    at com.wjg.unit1_1_4.Run$ThreadB.run(Run.java:46)
a end   timer=1487751505711

    以上的例子就是把等待獲取許可的線程B手動結束了,下面的例子是利用acquireUninterruptibly()方法阻止等待獲取許可的線程中斷。

package com.wjg.unit1_1_4;

import java.util.concurrent.Semaphore;

public class Service {
    private Semaphore semaphore = new Semaphore(1);
    
    public void testMethod(){
        try {
       //此處阻止等待獲取許可的線程被中斷 semaphore.acquireUninterruptibly(); System.out.println(Thread.currentThread().getName()
+" begin timer="+System.currentTimeMillis()); for (int i = 0; i < Integer.MAX_VALUE/50; i++) { String newString = new String(); Math.random(); } System.out.println(Thread.currentThread().getName()+" end timer="+System.currentTimeMillis()); } finally { semaphore.release(); } } }


執行結果
a begin timer=1487751777062
main  中斷了b
a end   timer=1487751778455
b begin timer=1487751778455
b end   timer=148775177963
  • 方法availablePermits()和drainPermits()使用

    availablePermits()   返回Semaphore對象中當前可用的許可數

    drainPermits()      獲取並返回立即可用的許可個數,並且將可用許可置0

  • 方法getQueueLength()和hasQueuedThreads()使用

    getQueueLength()   返回等待可續的線程個數

    hasQueuedThreads()   判斷是否還有等待許可的線程

    這兩個方法通常都是在判斷當前有沒有等待許可的線程信息時使用。

  • 公平與非公平信號量

    公平信號量是指獲得鎖的順序與線程啟動的順序有關,非公平信息量就是無關的了。

    非公平信號量線程啟動的順序與調用semaphore.acquire()的順序無關,也就是線程先啟動了並不代表先獲得許可。

    公平與不公平通過Semaphore類的構造函數new Semaphore(int permits,boolean fair)的第二個參數fair決定。

  • 方法tryAcquire()作用

    無參的tryAcquire()作用就是嘗試地獲得一個許可,如果獲取不到則返回false,此方法通常與if語句結合使用,具有不阻塞的特點。

  • 方法tryAcquire(int permits)使用

     作用是嘗試獲取x個許可,如果湖區不到則返回false。

  • 方法tryAcquire(long timeout,TimeUnit unit)使用

     作用是在指定的時間內嘗試地獲取1個許可,如果獲取不到則返回false,此處會有阻塞。

  • 方法tryAcquire(int permits,long timeout,TimeUnit unit)使用

    作用是在指定的時間內嘗試地獲取x個許可,如果獲取不到則返回false,此處會有阻塞。    

 

注:本系列筆記均參考自《Java並發編程 核心方法與框架》此書。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM