Java並發編程之java.util.concurrent包下常見類的使用


一,Condition

一個場景,兩個線程數數,同時啟動兩個線程,線程A數1、2、3,然后線程B數4、5、6,最后線程A數7、8、9,程序結束,這涉及到線程之間的通信。

public class ConditionTest {
    static class NumberWrapper {
        public int value = 1;
    }

    public static void main(String[] args) {
        //初始化可重入鎖
        final Lock lock = new ReentrantLock();
        
        //第一個條件當屏幕上輸出到3 
        final Condition reachThreeCondition = lock.newCondition();
        //第二個條件當屏幕上輸出到6
        final Condition reachSixCondition = lock.newCondition();
        
        //NumberWrapper只是為了封裝一個數字,一邊可以將數字對象共享,並可以設置為final
        //注意這里不要用Integer, Integer 是不可變對象
        final NumberWrapper num = new NumberWrapper();
        //初始化A線程
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                //需要先獲得鎖
                lock.lock();
                System.out.println("ThreadA獲得lock");
                try {
                    System.out.println("threadA start write");
                    //A線程先輸出前3個數
                    while (num.value <= 3) {
                        System.out.println(num.value);
                        num.value++;
                    }
                    //輸出到3時要signal,告訴B線程可以開始了
                    reachThreeCondition.signal();
                } finally {
                    lock.unlock();
                    System.out.println("ThreadA釋放lock");
                }
                lock.lock();
                try {
                    //等待輸出6的條件
                    System.out.println("ThreadA獲得lock");
                    reachSixCondition.await();
                    System.out.println("threadA start write");
                    //輸出剩余數字
                    while (num.value <= 9) {
                        System.out.println(num.value);
                        num.value++;
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                    System.out.println("ThreadA釋放lock");
                }
            }

        });
        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock.lock();
                    System.out.println("ThreadB獲得lock");
                    Thread.sleep(5000);//是await方法釋放了鎖
                    while (num.value <= 3) {
                        //等待3輸出完畢的信號
                        reachThreeCondition.await();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                    System.out.println("ThreadB釋放lock");
                }
                try {
                    lock.lock();
                    System.out.println("ThreadB獲得lock");
                    //已經收到信號,開始輸出4,5,6
                    System.out.println("threadB start write");
                    while (num.value <= 6) {
                        System.out.println(num.value);
                        num.value++;
                    }
                    //4,5,6輸出完畢,告訴A線程6輸出完了
                    reachSixCondition.signal();
                } finally {
                    lock.unlock();
                    System.out.println("ThreadB釋放lock");
                }
            }
        });
        //啟動兩個線程
        threadB.start();
        threadA.start();
    }
}
View Code

創建方式:通過Lock創建,Lock.newCondition();

常用方法:

await():阻塞,直到相同的Condition調用了signal方法。
signal():通知。

總結:Condition必須與Lock一起使用(wait()、notify()必須與synchronized一起使用,否則運行會報錯java.lang.IllegalMonitorStateException),相比於wait與notify更加的靈活,可以設置各種情形,如上例中的到達3和到達6兩個條件。

執行結果:

 

二,CountDownLatch

看代碼:

public class CountDownLatchTest {
    public static void main(String[] args) {
        final CountDownLatch c = new CountDownLatch(3);//總數3
        Thread t1 = new Thread(new Runnable(){
            @Override
            public void run() {
                try {
                    System.out.println("開始等");
                    c.await();//阻塞,等待countDown,當countDown到0就執行后面的完事了
                    System.out.println("完事");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
        });
        Thread t2 = new Thread(new Runnable(){
            @Override
            public void run() {
                for(int i=3;i>0;i--){
                    c.countDown();//減1
                }
            }
            
        });
        t1.start();
        t2.start();
    }
}
View Code

創建方式:直接創建,new CountDownLatch(int num);

常用方法:

await():阻塞,直到countDown方法被執行了num次。
countDown():減

總結:適用於一個線程等待其他線程的情景。

執行結果:

三,CyclicBarrier

與CountDownLatch有什么區別?

CyclicBarrier強調的是n個線程,大家相互等待,只要有一個沒完成,所有人都得等着。正如上例,只有5個人全部跑到終點,大家才能開喝,否則只能全等着。

CountDownLatch強調一個線程等多個線程完成某件事情。CyclicBarrier是多個線程互等,等大家都完成。

另外:

1.CountDownLatch減計數,CyclicBarrier加計數。 
2.CountDownLatch是一次性的,CyclicBarrier可以重用。 

public class MainMission {
    private CyclicBarrier barrier;
    private final static int threadCounts = 5;
    public void runMission() {
        ExecutorService exec=Executors.newFixedThreadPool(threadCounts);
        //new 的時候要傳入數字,我發現,這個類似semaphore,如果位置不足,線程會搶位置。數字要是threadCounts+1為主線程留一個位子,但實際測試中發現,只要等於threadCount就可以
         barrier=new CyclicBarrier(threadCounts+1); 
        for(int i=0;i<5;i++){
            exec.execute(new Mission(barrier));
        }
        try {
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("所有任務都執行完了");
        exec.shutdown();//如果不關閉,程序一直處於運行狀態
    }
    public static void main(String[] args) {
        MainMission m = new MainMission();
        m.runMission();
    }
}
class Mission implements Runnable{
    private CyclicBarrier barrier;
    public Mission(CyclicBarrier barrier){
        this.barrier = barrier;
    }
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+"開始執行任務");
        try {
            int sleepSecond = new Random().nextInt(10)*1000;
            System.out.println(Thread.currentThread().getName()+"要執行"+sleepSecond+"秒任務");
            Thread.sleep(sleepSecond);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+"執行完畢");
    }
}
View Code

創建方式:直接創建,new CyclicBarrier(int num);

常用方法:

await():阻塞,直到阻塞的線程數量達到num個。

總結:想想一下百米跑,所有運動員都就位之后才會發令起跑,線程調用await意味着說,我准備好了。

執行結果:

,Semaphore

下面是一個上廁所的例子,廁所位置有限,想用得排隊了。實現使用的就是信號量,可以看出信號量可以用來做限流。

public class MySemaphore implements Runnable{
    Semaphore position;    
    private int id;
    public MySemaphore(int i,Semaphore s){
        this.id=i;
        this.position=s;
    }

    @Override
    public void run() {
        try{
              if(position.availablePermits()>0){
               System.out.println("顧客["+this.id+"]進入廁所,有空位");
              }
              else{
               System.out.println("顧客["+this.id+"]進入廁所,沒空位,排隊");
              }
              position.acquire();//只有在acquire之后才能真正的獲得了position
              System.out.println("#########顧客["+this.id+"]獲得坑位");
              Thread.sleep((int)(Math.random()*100000));
              System.out.println("@@@@@@@@@顧客["+this.id+"]使用完畢");
              position.release();
         }catch(Exception e){
             e.printStackTrace();
         }
    }
    
    public static void main(String args[]){
        ExecutorService list=Executors.newCachedThreadPool();
        Semaphore position=new Semaphore(2);
        for(int i=0;i<10;i++){
         list.submit(new MySemaphore(i+1,position));
        }
        list.shutdown();
        position.acquireUninterruptibly(2);
        System.out.println("使用完畢,需要清掃了");
        position.release(2);
    }


}
View Code

創建方式:直接創建,new Semaphore(int num);

常用方法:

availablePermits():看現在可用的信號量。

acquire():嘗試獲取一個位置,如果獲取不到則阻塞。

release():釋放位置。

acquireUninterruptibly(int num):嘗試獲取num個許可,如果沒有足夠的許可則阻塞,一直阻塞到有足夠的許可釋放出來。調用這個方法的線程具有優先獲取許可的權利。如果調用線程被interrupted,該線程並不會被打斷,它會繼續阻塞等待許可。

總結:搶位置。

執行結果:

,ReentrantLock

創建方式:

new ReentrantLock(); 此種創建方式會創建出一個非公平鎖。

new ReentrantLock(true); 此種方式會創建出一個公平鎖。

非公平鎖:當鎖處於無線程占有的狀態,此時其他線程和在隊列中等待的線程都可以搶占該鎖。 
公平鎖:當鎖處於無線程占有的狀態,在其他線程搶占該鎖的時候,都需要先進入隊列中等待。

tryLock()方法:嘗試去獲取鎖,如果沒有獲取到直接返回,不等待。

細節看這個吧,https://blog.csdn.net/jiangjiajian2008/article/details/52226189,寫的挺好。

六,ReentrantReadWriteLock

創建方式:new ReentrantReadWriteLock();

常用方法:

readLock().lock();寫鎖

writeLock().lock();讀鎖

readLock().unlock();解鎖

writeLock().unlock();解鎖

總結:

 * 如果目前是讀鎖,其他讀鎖也可以進請求,寫鎖不能進。
 * 如果目前是寫鎖,那么其他所有的鎖都不可以進。

 * 適用於讀多寫少的情況,如果是寫多讀少用ReentrantLock。

七,Callable接口

*Callable接口支持返回執行結果,此時需要調用FutureTask.get()方法實現,此方法會阻塞主線程直到獲取結果;當不調用此方法時,主線程不會阻塞!

與Runnable對比:

1.Callable可以有返回值,Runnable沒有

2.Callable接口的call()方法允許拋出異常;而Runnable接口的run()方法的異常只能在內部消化,不能繼續上拋;

八,線程池

提供的線程池有幾種:

//有數量限制的線程池
ExecutorService service=Executors.newFixedThreadPool(4);
//沒有數量限制的線程池
ExecutorService service=Executors.newCachedThreadPool();
//單線程池
ExecutorService service=Executors.newSingleThreadExecutor();
他們都是通過下面這個線程池實現的
有數量線程池的實現方式

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads/*核心線程數*/, nThreads/*最高線程數*/,
                                      0L/*高出核心線程數的線程最高存活時間*/, TimeUnit.MILLISECONDS/*高出核心線程數的線程最高存活時間單位*/,
                                      new LinkedBlockingQueue<Runnable>()/*任務隊列*/);

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM