javade多任務處理之Executors框架(線程池)實現的內置幾種方式與兩種基本自定義方式


一 Executors框架(線程池)

主要是解決開發人員進行線程的有效控制,原理可以看jdk源碼,主要是由java.uitl.concurrent.ThreadPoolExecutor類實現的,這里只列出簡單用法

根據Executors可以創建不同功能的線程池,主要有四種:

1 newFixedThreadPool : 返回一個固定數量的線程池,並且池中數量一致保持不變,有任務時如果有空閑線程則立即執行,沒有就暫時存放到隊列等待空閑線程

//創建一個有10個線程的線程池,任務多於10個會一直等待,直到10個線程中有空閑線程為止
//ExecutorService pool = Executors.newFixedThreadPool(10);
//開啟線程
pool.execute(new Thread(()->{System.out.println(Thread.currentThread().getName()+"執行中.....");}));
pool.execute(new Thread(()->{System.out.println(Thread.currentThread().getName()+"執行中.....");}));
pool.execute(new Thread(()->{System.out.println(Thread.currentThread().getName()+"執行中.....");}));

 在ThreadPoolExecutor類中有幾個非常重要的方法:

    execute()   執行線程

  submit()  執行線程,並返回執行的結果

  shutdown()  關閉線程,如果有線程正在執行,則會等線程執行完,即所有線程空閑之后再關閉

  shutdownNow()  關閉線程,立刻關閉

看源碼可知,底層任務使用了阻塞隊列,任務大於線程數量后會進入阻塞隊列等待執行

2 newSingleThreadExecutor

		ExecutorService poolSingle = Executors.newSingleThreadExecutor();
		poolSingle.execute(new Thread(()->{
			System.out.println("run1");
			while(true){
			}
		}));				
		//下面永不會執行
		poolSingle.execute(new Thread(()->{System.out.println("run2");}));

  創建只有一個線程的線程池,屬於上面固定數量的一個特例,個人感覺這個單例線程用處不是很大

3 newCachedThreadPool

		不指定數量.如果任務多,且沒有空閑線程則會一直創建線程,並且空閑線程會在60秒后回收
		ExecutorService cachePool = Executors.newCachedThreadPool();
		cachePool.execute(new Thread(()->{
			System.out.println("run1");
			while(true){
			}
		}));
		cachePool.execute(new Thread(()->{
			System.out.println("run2");
			while(true){
			}
		}));

  底層實現:

由於使用了無緩沖隊列,任務隊列使用無緩沖隊列,任務不會被存儲,直接去交給線程執行.且空閑線程60秒之后會被回收,除非60秒之內去執行新的任務

 

4 newScheduledThreadPool

class RunTest implements Runnable{
	@Override
	public void run() {
		System.out.println("run");
	}
}
public class ScheculedTest {
	public static void main(String[] args) {
		ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
		//可實現任務定時,但實際業務中一般都用spring的定時器,這個技術比較老了
		//pool.schedule(new RunTest(), 2, TimeUnit.SECONDS);//2秒后執行此任務
		//可實現任務的輪詢,指定時間間隔循環執行此任務     任務    延遲時間(即2秒后開始執行)   間隔多少秒之后再執行(一直循環)
		pool.scheduleAtFixedRate(new RunTest(), 2, 2, TimeUnit.SECONDS);
		pool.scheduleWithFixedDelay(new RunTest(), 2, 2, TimeUnit.SECONDS);
	}
}

  不用想也知道,底層采用延遲隊列裝任務

 

二 自定義線程池實現的

再來看自定義的,實際業務中可能用到更多的就是自定義了,

自定義需要實現ThreadPoolExecutor這個類,主要有兩種,分別是使用有界隊列裝任務和無界隊列:

1 使用有界隊列:

public class MyCustom01 {

	public static void main(String[] args) {
		//有界隊列自定義線程池
		/**在使用有界隊列時,如果有任務需要執行,如果線程池實際線程數小於corePoolSize,則優先創建線程,
		 * 若大於corePoolSize,則會將任務加入隊列,
		 * 若隊列已滿,則在總線程數不大於maximumPoolSize的前提下,創建新的線程,
		 * 若線程數大於maximumPoolSize,則執行拒絕策略。或其他自定義方式。
		 */
		ThreadPoolExecutor pool = new ThreadPoolExecutor(
				1,   //核心線程數
				2,   //最大線程數
				60,   //空閑時回收時間
				TimeUnit.SECONDS, 
				new ArrayBlockingQueue<Runnable>(3),//指定隊列//指定隊列
				//DiscardOldestPolicy :丟棄最老的請求,即當6進來時,將隊列中最早的2丟棄,執行6
				//new ThreadPoolExecutor.DiscardOldestPolicy()//指定拒絕策略.不指定默認拋異常
				//自定義拒絕策略
				new RejectedExecutionHandler(){
					@Override        //r:被拒絕任務     executor 當前線程池
					public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
						//實例工作中默認策略都不可取,一般是把拒絕的任務記錄日志,然后空閑時間解析再重新執行
						//或者直接返回客戶端,讓他稍后再提交此任務
						System.out.println("自定義處理..");
						System.out.println("當前被拒絕任務為:" + r.toString());
						System.out.println(executor.getPoolSize());
					}}
		); 
		MyTask mt1 = new MyTask(1, "任務1");
		MyTask mt2 = new MyTask(2, "任務2");
		MyTask mt3 = new MyTask(3, "任務3");
		MyTask mt4 = new MyTask(4, "任務4");
		MyTask mt5 = new MyTask(5, "任務5");
		MyTask mt6 = new MyTask(6, "任務6");
		pool.execute(mt1);
		pool.execute(mt2);//如果有兩個任務,則會將第二個任務入隊,等待核心線程線程空閑
		pool.execute(mt3);//同上
		pool.execute(mt4);//同上,知道隊列滿 為止
		pool.execute(mt5);//當有5個任務時,隊列已滿,則會創建第二個線程
		pool.execute(mt6); //有6個時,此時隊列已滿,且當前線程達到最大線程數,所以無法執行,執行拒絕策略
		//關閉,等到所有線程空閑時才會關閉
		pool.shutdown();
	}
}

  

2 使用無界隊列:

public class MyCustom02 implements Runnable{
	//並發數字運算
	private static AtomicInteger count = new AtomicInteger(0);
	@Override
	public void run() {
		try {
			int temp = count.incrementAndGet();//count累加,區分任務
			System.out.println("任務" + temp);
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}	
	}
	public static void main(String[] args) throws Exception{
		//無界隊列
		BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
		ExecutorService executor  = new ThreadPoolExecutor(
					5, 		
					10,//無界時,任務多會一直添加到隊列中,不會創建新的線程,線程數會一直是核心數量
					120L, 	
					TimeUnit.SECONDS,
					queue //指定無界隊列
		);
		for(int i = 0 ; i < 20; i++){
			executor.execute(new MyCustom02()); //會5個一起執行
		}
		Thread.sleep(1000);
		System.out.println("queue size:" + queue.size());
		Thread.sleep(2000);
	}
}

  然后拒絕策略個人認為也是比較常用的,實際業務中,因為默認的策略都不太友好


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM