java 線程之executors線程池


一、線程池的作用

  平時的業務中,如果要使用多線程,那么我們會在業務開始前創建線程,業務結束后,銷毀線程。但是對於業務來說,線程的創建和銷毀是與業務本身無關的,只關心線程所執行的任務。因此希望把盡可能多的cpu用在執行任務上面,而不是用在與業務無關的線程創建和銷毀上面。而線程池則解決了這個問題。

  線程池的作用:線程池作用就是限制系統中執行線程的數量。根據系統的環境情況,可以自動或手動設置線程數量,達到運行的最佳效果,從而避免平凡的創建和銷毀線程帶來的系統開銷也有效的規避了因為創建的線程過多而耗盡系統資源導致服務器宕機。使用Runtime.getRuntime().availableProcessors();設置線程數量。

二、 java並發包提供的線程池 Executors類

  A、newFixedThreadPool  用來創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待

           ExecutorService fixedThreadPool =Executors.newFixedThreadPool(1);

 public static ExecutorService newFixedThreadPool(int nThreads) {
        //參數詳解 第一個參數核心線程數,線程池在實例化的時候初始化時線程數
        //第二個:該線程池最大線程數
        //第三個:線程空閑時間(0L表示沒有空閑時間即沒有使用就會被回收)
        //第四個:空閑時間單位
        //第五個:LinkedBlockingQueue 無界隊列 ,將沒有線程處理的任務加入該隊列中
        return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    }

  B、newSingleThreadExecutor 用來創建一個單線程化的線程池,它只用唯一的工作線程來執行任務,一次只支持一個,所有任務按照指定的順序執行

    ExecutorService fixedThreadPool = Executors.newSingleThreadExecutor();

public static ExecutorService newSingleThreadExecutor() {
        //參數詳解 第一個參數核心線程數,線程池在實例化的時候初始化時線程數
        //第二個:該線程池最大線程數
        //第三個:線程空閑時間(0L表示沒有空閑時間即沒有使用就會被回收)
        //第四個:空閑時間單位
        //第五個:LinkedBlockingQueue 無解隊列 ,將沒有線程處理的任務加入該隊列中
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
    }

  C、newCachedThreadPool  用來創建一個可緩存線程池,該線程池沒有長度限制,對於新的任務,如果有空閑的線程,則使用空閑的線程執行,如果沒有,則新建一個線程來執行任務。如果線程池長度超過處理需要,可靈活回收空閑線程。

ExecutorService fixedThreadPool = Executors.newCachedThreadPool(); 

 public static ExecutorService newCachedThreadPool() {
        //參數詳解 第一個參數核心線程數,線程池在實例化的時候初始化時線程數
        //第二個:Integer.MAX_VALUE 不限制該線程池的線程數
        //第三個:線程空閑時間(60L 表示線程空閑60秒之后被回收)
        //第四個:空閑時間單位 SECONDS 秒
        //第五個:SynchronousQueue 無容量隊列 ,將任務直接提交給線程處理自身不存儲任務
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }

  D、newScheduledThreadPool 用來創建一個定長線程池,並且支持定時和周期性的執行任務

    ScheduledExecutorService executorsScheduled=Executors.newScheduledThreadPool(2);

public ScheduledThreadPoolExecutor(int corePoolSize) {
        //參數詳解 第一個參數核心線程數,線程池在實例化的時候初始化時線程數
        //第二個:Integer.MAX_VALUE 不限制該線程池的線程數
        //第三個:線程空閑時間(0 表示線程空閑0秒之后被回收)
        //第四個:空閑時間單位 SECONDS 秒
        //第五個:DelayedWorkQueue 延時隊列 
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
    }

使用 newScheduledThreadPool實現定時器

package com.jalja.org.thread.executors;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class NewScheduledThreadPoolTest {
    public static void main(String[] args) {
        Runnable runnable=new ScheduledThread();
        //實現定時器
        ScheduledExecutorService executorsScheduled=Executors.newScheduledThreadPool(2);
        //runnable 需要執行的任務  1:初始化時間(初始化延遲1秒后執行)  3:輪詢時間(每隔3秒執行)
        //TimeUnit.SECONDS:時間單位
        ScheduledFuture<?> scheduledFuture= executorsScheduled.scheduleWithFixedDelay(runnable, 1,3, TimeUnit.SECONDS);
        System.out.println("scheduledFuture:"+scheduledFuture);
    }
} 
class  ScheduledThread implements Runnable{
    public void run() {
        System.out.println(Thread.currentThread().getName() +"=>開始");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() +"=>結束");
    }
}

 線程池中提交任務的兩種方式:

execute()方法:該方法是 ExecutorService 接口的父類(接口)方法,該接口只有這一個方法。

 

public interface Executor {
    void execute(Runnable command);
}

 

submit()方法:該方法是ExecutorService 接口的方法。

public interface ExecutorService extends Executor {
  ...
  <T> Future<T> submit(Callable<T> task);

  <T> Future<T> submit(Runnable task, T result);

  Future<?> submit(Runnable task);
  ...
}

 從上面的源碼以及講解可以總結execute()和submit()方法的區別:

  1. 接收的參數不一樣;

  2. submit()有返回值,而execute()沒有;

 

三、自定義線程池

  在Java線程池中的newCachedThreadPool,newFixedThreadPool,newSingleThreadExecutor,newScheduledThreadPool這四個線程池在底層都是調用了ThreadPoolExecutor()這個構造方法。若Executors這個類無法滿足我們的需求的時候,可以自己創建自定義的線程池。
ThreadPoolExecutor類的定義如下

                public ThreadPoolExecutor(int corePoolSize,//核心線程數--線程池初始化創建的線程數量  
                   int maximumPoolSize,//最大線程數,線程池中能創建的最大線程數  
                   long keepAliveTime,//線程存活時間  
                   TimeUnit unit,//線程存貨時間單位  
                   BlockingQueue<Runnable> workQueue,//一個阻塞隊列  
                   ThreadFactory threadFactory//拒絕策略  
                 ) {……}  

自定義線程池使用有界隊列(ArrayBlockingQueue 、LinkedBlockingQueue )

  若有新的任務需要執行,如果線程池實際線程數小於corePoolSize核心線程數的時候,則優先創建線程。若大於corePoolSize時,則會將多余的線程存放在隊列中,若隊列已滿,且最請求線程小於maximumPoolSize的情況下,則自定義的線程池會創建新的線程,若隊列已滿,且最請求線程大於maximumPoolSize的情況下,則執行拒絕策略,或其他自定義方式。

package com.jalja.org.thread.executors;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorsTest {
    public static void main(String[] args) {
        ThreadPoolExecutor test=new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2));
        test.execute(new ExecutorsTest().new ThreadTest());
        test.execute(new ExecutorsTest().new ThreadTest());
        test.execute(new ExecutorsTest().new ThreadTest());
        test.execute(new ExecutorsTest().new ThreadTest());
        test.execute(new ExecutorsTest().new ThreadTest());
        test.shutdown();
    }
    class ThreadTest implements Runnable{
        public void run() {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }
}

結果:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.jalja.org.thread.executors.ExecutorsTest$ThreadTest@70dea4e rejected from java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at com.jalja.org.thread.executors.ExecutorsTest.main(ExecutorsTest.java:17)
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2

看結果可知有一個任務是沒有執行直接拋出異常的。隊列已滿,且最請求線程大於maximumPoolSize的情況下,則執行拒絕策略,這里使用的是——AbortPolicy:直接拋出異常,系統正常工作(默認的策略)。

自定義線程池使用無界隊列:

  對於無界隊列除非系統資源耗盡,否則無界隊列不存在任務入隊失敗的情況,若系統的線程數小於corePoolSize時,則新建線程執行corePoolSize,當達到corePoolSize后,則把多余的任務放入隊列中等待執行若任務的創建和處理的速速差異很大,無界隊列會保持快速增長,直到耗盡系統內存為之,對於無界隊列的線程池maximumPoolSize並無真實用處。

四、拒絕策略

JDK提供策略:

 

1.AbortPolicy:直接拋出異常,系統正常工作。(默認的策略)

package com.jalja.org.thread.executors;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class ExecutorsTest {
    public static void main(String[] args) {
        BlockingQueue<Runnable> f=new LinkedBlockingQueue<Runnable>(2);
        ThreadPoolExecutor test=new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,f);
        try {
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
        } catch (RejectedExecutionException e) {
            e.printStackTrace();
            System.out.println("超過有界隊列的數據記錄日志");
        }
        test.shutdown();
    }
    class ThreadTest implements Runnable{
        public void run() {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }
}

2.CallerRunsPolicy:只要線程池未關閉,該策略直接在調用者線程中執行,運行當前被丟棄的任務。
3.DiscardOrderstPolicy:丟棄最老的請求,嘗試再次提交當前任務。
4.丟棄無法處理的任務,不給於任何處理。

自定義策略:需要實現RejectedExecutionHandler接口

package com.jalja.org.thread.executors;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class ExecutorsTest {
    public static void main(String[] args) {
        BlockingQueue<Runnable> f=new LinkedBlockingQueue<Runnable>(2);
        ThreadPoolExecutor test=new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,f, new MyRejected());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.shutdown();
    }
    class ThreadTest implements Runnable{
        public void run() {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }
}
class MyRejected implements RejectedExecutionHandler{
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("執行異常的任務加入日志");
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM