高級java必會系列一:常用線程池和調度類


眾所周知,開啟線程2種方法:第一是實現Runable接口,第二繼承Thread類。(當然內部類也算...)常用的,這里就不再贅述。

一、線程池

1.newCachedThreadPool

       (1)緩存型池子,先查看池中有沒有以前建立的線程,如果有,就reuse,如果沒有,就建立一個新的線程加入池中;

        (2)緩存型池子,通常用於執行一些生存周期很短的異步型任務;因此一些面向連接的daemon型server中用得不多;

        (3)能reuse的線程,必須是timeout IDLE內的池中線程,缺省timeout是60s,超過這個IDLE時長,線程實例將被終止及移出池。

        (4)注意,放入CachedThreadPool的線程不必擔心其結束,超過TIMEOUT不活動,其會自動被終止

2.newFixedThreadPool--本人常用

        (1)newFixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時建新的線程

        (2)其獨特之處:任意時間點,最多只能有固定數目的活動線程存在,此時如果有新的線程要建立,只能放在另外的隊列中等待,直到當前的線程中某個線程終止直接被移出池子

        (3)和cacheThreadPool不同,FixedThreadPool沒有IDLE機制(可能也有,但既然文檔沒提,肯定非常長,類似依賴上層的TCP或UDP IDLE機制之類的),所以FixedThreadPool多數針對一些很穩定很固定的正規並發線程,多用於服務器

        (4)從方法的源代碼看,cache池和fixed 池調用的是同一個底層池,只不過參數不同:
      fixed池線程數固定,並且是0秒IDLE(無IDLE)
      cache池線程數支持0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE 

3.ScheduledThreadPool

        (1)調度型線程池

        (2)這個池子里的線程可以按schedule依次delay執行,或周期執行

4.SingleThreadExecutor

        (1)單例線程,任意時間池中只能有一個線程

        (2)用的是和cache池和fixed池相同的底層池,但線程數目是1-1,0秒IDLE(無IDLE)

二、常用線程調度類

1.wait、notify、notifyAll-----不建議新手直接使用

顧名思義,wait是等待,notify是通知一個等待線程、notifyAll喚醒所有等待線程

2.CountDownLatch----很適合用來將一個任務分為n個獨立的部分,等這些部分都完成后繼續接下來的任務

隸屬於java.util.concurrent包。CountDownLatch類是一個同步計數器,構造時傳入int參數,該參數就是計數器的初始值,每調用一次countDown()方法,計數器減1,計數器大於0 時,await()方法會阻塞程序繼續執行.當多個線程達到預期時(latch.countDown()),喚醒多個其他等待中的線程,即執行latch.await()后面的代碼。樣例是,張三、李四合作完成任務,張三5秒,李四8秒,當張三李四都完成后,總任務結束。代碼如下:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {  
    final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
    
    public static void main(String[] args) throws InterruptedException {  
        CountDownLatch latch=new CountDownLatch(2);//兩個工人的協作  
        Worker worker1=new Worker("張三", 5000, latch);  
        Worker worker2=new Worker("李四", 8000, latch);  
        worker1.start(); 
        worker2.start();  
        latch.await();//阻塞!等待所有工人完成工作  
        System.out.println("all work done at "+sdf.format(new Date()));  
    }  
      
    static class Worker extends Thread{  
        String workerName;   
        int workTime;  
        CountDownLatch latch;  
        public Worker(String workerName ,int workTime ,CountDownLatch latch){  
             this.workerName=workerName;  
             this.workTime=workTime;  
             this.latch=latch;  
        } 
        
        public void run(){  
            System.out.println("Worker "+workerName+" do work begin at "+sdf.format(new Date()));  
            doWork();//工作了  
            System.out.println("Worker "+workerName+" do work complete at "+sdf.format(new Date()));  
            latch.countDown();//工人完成工作,計數器減一  

        }  
          
        private void doWork(){  
            try {  
                Thread.sleep(workTime);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }

-----------------------------------------------
Worker 李四 do work begin at 2016-11-02 18:25:28
Worker 張三 do work begin at 2016-11-02 18:25:28
Worker 張三 do work complete at 2016-11-02 18:25:33
Worker 李四 do work complete at 2016-11-02 18:25:36
all work done at 2016-11-02 18:25:36

測試可見,張三李四共同協作完成。

3.CyclicBarrier----適合多線程循環到達屏障后再執行

 字面意思循環屏障,可理解為柵欄,協同多個線程都執行到barrier.await時,如果構造CyclicBarrier barrier=new CyclicBarrier(2, Runnable)時,第一個參數代碼線程數,如果有第二參Runnable,那么所有線程都await時,先執行Runnable,再各自執行await后續的代碼。

CyclicBarrier和CountDownLatch區別

1.CountDownLatch在多個線程都執行完畢latch.countDown后喚醒await線程,多個countDown子線程在執行完countDown后可繼續執行后續代碼。

2.CyclicBarrier可循環使用,CountDownLatch只1次。見代碼示例:

3.CountDownLatch需要latch.countDownlatch.await()配合使用。CyclicBarrier就一個barrier.await。

下面舉例:鳥、魚2個線程同時運行問題。

 

  1 package study.thread;
  2 
  3 import java.util.concurrent.BrokenBarrierException;  
  4 import java.util.concurrent.CyclicBarrier;  
  5   
  6 /** 
  7  * 循環柵欄(屏障)
  8  * 問題:一個池塘,有很多鳥和很多魚,鳥每分鍾產生一個后代,魚每30秒鍾產生2個后代。
  9  * 鳥每10秒鍾要吃掉一條魚。建一個池塘,初始化一些魚和鳥,看看什么時候鳥把魚吃光。 
 10  * 
 11  */  
 12 public class CyclicBarrierDemo {  
 13   
 14     long time ;  
 15     long birdNum ;  
 16     long fishNum ;  
 17     Object lock = new Object() ;  
 18     CyclicBarrier barrier  ;  
 19       
 20     public CyclicBarrierDemo(long birdNum , long fishNum){  
 21         this.birdNum = birdNum ;  
 22         this.fishNum = fishNum ;  
 23     }  
 24   
 25     /**
 26      * 入口
 27      * @param args
 28      */
 29     public static void main(String[] args) { 
 30         //構造demo,初始化5只秒,20條魚
 31         CyclicBarrierDemo bf = new CyclicBarrierDemo(5 , 20) ;  
 32         //生態圈開啟
 33         bf.start();   
 34     }  
 35   
 36     //生態圈開啟
 37     public void start(){  
 38         //構造魚,鳥,時間線
 39         FishThread fish = new FishThread() ;  
 40         BirdThread bird = new BirdThread() ;  
 41         TimeLine tl = new TimeLine() ;  
 42   
 43         //初始化環形屏障,當barrier對象的await方法被調用兩次之后,將會執行tl線程  
 44         barrier = new CyclicBarrier(2, tl) ;//這里要注意第一個參數,如果大於調用await的線程數,會死鎖。  
 45   
 46         //魚、鳥動起來
 47         fish.start();  
 48         bird.start();  
 49   
 50     }  
 51   
 52     public void printInfo(String source){  
 53         System.out.printf(source+"time[%d]:birdNum[%d] ,fishNum[%d]\n" ,time , birdNum , fishNum);  
 54     }  
 55   
 56     private class TimeLine implements Runnable {  
 57         @Override  
 58         public void run() { //所有子任務都調用了await方法后,將會執行該方法, 然后所有子線程繼續執行  
 59             System.out.println("TimeLine start!");
 60             //如果魚數量<=0,結束程序
 61             if(fishNum <= 0){  
 62                 System.exit(-1);     
 63             }
 64             //時間加10秒
 65             time += 10 ; 
 66             System.out.println("TimeLine end,時間加10秒!");
 67         }  
 68     }  
 69   
 70     private class FishThread extends Thread {  
 71         @Override  
 72         public void run() {  
 73             //循環
 74             while(true){  
 75                 try { 
 76                     System.out.println("魚已經就位!到達await!");
 77                     barrier.await() ;   //進入睡眠, 等待所有子任務都進入睡眠  然后再繼續  
 78                 } catch (InterruptedException | BrokenBarrierException e) {  
 79                     e.printStackTrace();  
 80                 }  
 81                 synchronized (lock) {
 82                     //魚每30秒鍾產生2個后代
 83                     if(time % 30 == 0){
 84                         fishNum += fishNum * 2;  
 85                         printInfo("魚動作執行!");
 86                     }  
 87                 }  
 88             }  
 89         }  
 90     }  
 91   
 92     private class BirdThread extends Thread{  
 93         @Override  
 94         public void run() {
 95             //循環
 96             while(true){  
 97                 try {  
 98                     System.out.println("鳥已經就位!到達await!");
 99                     barrier.await() ;  //進入睡眠, 等待所有子任務都進入睡眠  然后再繼續  
100                 } catch (InterruptedException | BrokenBarrierException e) {  
101                     e.printStackTrace();  
102                 }    
103                 synchronized (lock) {
104                     //鳥每10秒鍾要吃掉一條魚
105                     if(time % 10 == 0){  
106                         fishNum = fishNum >= birdNum ? fishNum - birdNum : 0 ;    
107                         //鳥每分鍾產生一個后代
108                         if(time % 60 == 0){  
109                             birdNum += birdNum ;  
110                         }  
111                         printInfo("鳥動作執行!");  
112                     }  
113                 }  
114   
115             }  
116   
117         }  
118   
119     }  
120   
121 }  

 

4.Semaphore---通過控制操作系統的信號量數目來控制並發,比控制線程並發數粒度更細。

管理固定數值的信號量,用以控制並發的數量。把需要並發的代碼放在acquirerelease之間即可。acquire獲取信號,release釋放信號。如果Semaphore管理一個信號量,就是互斥鎖。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreTest {

     public static void main(String[] args) {  
        // 線程池 
        ExecutorService exec = Executors.newCachedThreadPool();  
        // 只能5個線程同時訪問 
        final Semaphore semp = new Semaphore(5);  
        // 模擬20個客戶端訪問 
        for (int index = 0; index < 20; index++) {
            final int NO = index;  
            Runnable run = new Runnable() {  
                public void run() {  
                    try {  
                        //獲取許可 
 semp.acquire();  
                        System.out.println("Accessing: " + NO);  
                        Thread.sleep(2000);  
                    } catch (InterruptedException e) { 
                        e.printStackTrace();
                    } finally{
                        //釋放 
 semp.release();
              System.out.println("-----------------"+semp.availablePermits());
} } }; exec.execute(run); }
// 退出線程池 exec.shutdown(); } }

5.Exchanger

用於兩個線程之間進行數據交換,先執行exchanger.exchange()的線程等待后來的線程到達,然后交換數據,最后再繼續向下執行。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

/**
 * 
 * @ClassName: ExchangerDemo
 * @Description: 用於兩個線程之間進行數據交換,先執行exchanger.exchange()的線程等待后來的線程到達,然后交換數據,最后再繼續向下執行。
 * @author denny.zhang
 * @date 2016年11月4日 下午1:27:29
 *
 */
public class ExchangerDemo {
    public static void main(String[] args) {
        final Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>();
        
        new Thread(){
            public void run(){
                List<Integer> list = new ArrayList<Integer>();
                list.add(1);
                list.add(2);
                try {
                    list = exchanger.exchange(list);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Thread1"+list);
            }
        }.start();
        
        new Thread(){
            public void run(){
                List<Integer> list = new ArrayList<Integer>();
                list.add(3);
                list.add(4);
                try {
                    list = exchanger.exchange(list);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Thread2"+list);
            }
        }.start();
    }
}

6.Future和FutrueTask---常用!

Future是接口,FutrueTask是接口實現類。場景:多線程並發執行,返回結果放進list.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 
 * @ClassName: FutureDemo
 * @Description: Future
 * @author denny.zhang
 * @date 2016年11月4日 下午1:50:32
 *
 */
public class FutureDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //結果集
        List<Integer> list = new ArrayList<Integer>();
        //開啟多線程
        ExecutorService exs = Executors.newFixedThreadPool(3);
        List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
        //啟動線程池,固定線程數為3
        for(int i=0;i<3;i++){
            //提交任務,添加返回
            futureList.add(exs.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return 1; } }));
        }
        //結果歸集
        for (Future<Integer> future : futureList) {
            while (true) {
                if (future.isDone()&& !future.isCancelled()) {
                    Integer i = future.get();
                    list.add(i);
                    break;
                } else {
                    Thread.sleep(100);
                }
            }
        }
        System.out.println("list="+list);
    }
}

返回:list=[1, 1, 1]

 ====================================

參考:

《大型網站系統與java中間件實踐》

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM