線程池
一 , java.util.concurrent
1,首先,為什么要用線程池包?
1,用線程池包和數據庫連接池一樣,為了節省線程的創建和關閉時間
2,擴充了返回類型,實現runable只能通過共享數據和主線程通訊,通過callable 可以接受返回類型,並可以拋出異常在主線程捕獲
3,擴充了些工具類
4,atomic支持計數
線程池最常用代碼應用方式,
1,實現Callable
2. 創建線程池
3. 執行並接收future參數
4. 關閉線程池,停止接收新的線程task
代碼如下
package org.benson.concurrent; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * * @author qq277803242 * */ public class Test4ConcurrentPool implements Callable<String> { public static int result = 0; public final static int LOOP_COUNT = 500000; @Override public String call() throws Exception { System.out.println("run.."); for (int i = 0; i < LOOP_COUNT; i++) result++; if(result>4500000) throw new Exception("my exception"); return "result is " + result; } public static void main(String[] args) throws ExecutionException { ExecutorService execu = Executors.newFixedThreadPool(10); Future<String> future = null; for (int i = 0; i < 10; i++) { future = execu.submit(new Test4ConcurrentPool()); try { System.out.println(future.get()); } catch (Exception e) { System.out.println("here can catch Exception,the exception is "+e.getMessage()); execu.shutdownNow(); } } execu.shutdown();//reduce accept new task // execu.shutdownNow();//reduce accept new task and try to stop exit task } }
看代碼這段可看到是調用了線程池創建了線程
// ExecutorService execu = Executors.newFixedThreadPool(10); ExecutorService execu = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
線程池的原理,工作流程如下
1,檢查基本線程是否已滿,如果未滿創建新的線程
2,檢查線程隊列是否已滿,如果未滿創建新的等待線程填入隊列
3,檢查最大線程數是否已滿,如果滿了執行飽和線程策略
如此再返回看線程池的參數,很明確
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
按參數順序解釋如下:
corePoolSize 在線程池中的基本大小數
maximumPoolSize 線程池所允許的最大線程數目(包括隊列中的線程數和基本線程中的)
keepAliveTime 線程空閑后,線程在線程池的保存時間。 線程池的作用就是為了減少創建和銷毀線程池的時間,所以當運行線程很短,但很多,間隔時間長時,此個值可以設置大點
workQueue 線程池隊列,runnableTaskQueue,阻塞線程,可選如下
runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。 可以選擇以下幾個阻塞隊列。
- ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
- LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
- PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。
ThreadFactory,用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字。
RejectedExecutionHandler,飽和處理策略,用於處理線程飽和后的機制,有如下類型
- AbortPolicy:直接拋出異常。
- CallerRunsPolicy:只用調用者所在線程來運行任務。
- DiscardOldestPolicy:丟棄隊列里最近的一個任務,並執行當前任務。
- DiscardPolicy:不處理,丟棄掉。
關於shutdownNow會循環線程池所有的線程,然后調用中斷方法,中斷方法拋出異常中斷當前線程。
2,並發包提供的一些工具類,使用方法及其原理
java.util.concurrent.Semaphore信號燈,固定執行線程數量,每次執行5個線程,應用例子如下
package org.benson.concurrent; import java.util.concurrent.Semaphore; /** * * @author qq277803242 * */ public class Test4Semaphore implements Runnable { Semaphore semaphore=new Semaphore(5); @Override public void run() { try { semaphore.acquire(); } catch (InterruptedException e) { System.out.println(e.getMessage()); } System.out.println(Thread.currentThread().getName() + " start runing 。。。"); try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.println(e.getMessage()); } semaphore.release(); System.out.println(Thread.currentThread().getName() + " finished"); } public static void main(String[] args) { Runnable runable=new Test4Semaphore(); for(int i=0;i<100;i++){ new Thread(runable).start(); } } }
最近把工作給辭了, 有時間了,想想還是分開篇幅寫,總結下, 呵呵,到時也整理份目錄出來,當作紀念
下篇打算講講基於Future 構建緩存
二 , java.util.concurrent.atomic
三 , java.util.concurrent.locks