JAVA線程池: https://juejin.im/post/5a743c526fb9a063557d7eba
blockingqueue參考資料:https://blog.csdn.net/vernonzheng/article/details/8247564
一: ThreadPoolTaskExecutor是一個spring的線程池技術,查看代碼可以看到這樣一個字段:
private ThreadPoolExecutor threadPoolExecutor;
可以發現,spring的 ThreadPoolTaskExecutor是使用的jdk中的java.util.concurrent.ThreadPoolExecutor進行實現,
直接看代碼:
@Override protected ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); ThreadPoolExecutor executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; }
這是ThreadPoolTaskExecutor用來初始化threadPoolExecutor的方法,BlockingQueue是一個阻塞隊列,這個我們先不管。由於ThreadPoolTaskExecutor的實現方式完全是使用threadPoolExecutor進行實現,我們需要知道這個threadPoolExecutor的一些參數。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
這個是調用的構造函數:
int corePoolSize:線程池維護線程的最小數量.
int maximumPoolSize:線程池維護線程的最大數量.
long keepAliveTime:空閑線程的存活時間.
TimeUnit unit: 時間單位,現有納秒,微秒,毫秒,秒枚舉值.
BlockingQueue<Runnable> workQueue:持有等待執行的任務隊列.
RejectedExecutionHandler handler:
用來拒絕一個任務的執行,有兩種情況會發生這種情況。
一是在execute方法中若addIfUnderMaximumPoolSize(command)為false,即線程池已經飽和;
二是在execute方法中, 發現runState!=RUNNING || poolSize == 0,即已經shutdown,就調用ensureQueuedTaskHandled(Runnable command),在該方法中有可能調用reject。
ThreadPoolExecutor池子的處理流程如下:
1)當池子大小小於corePoolSize就新建線程,並處理請求
2)當池子大小等於corePoolSize,把請求放入workQueue中,池子里的空閑線程就去從workQueue中取任務並處理
3)當workQueue放不下新入的任務時,新建線程入池,並處理請求,如果池子大小撐到了maximumPoolSize就用RejectedExecutionHandler來做拒絕處理
4)另外,當池子的線程數大於corePoolSize的時候,多余的線程會等待keepAliveTime長的時間,如果無請求可處理就自行銷毀
其會優先創建 CorePoolSiz 線程, 當繼續增加線程時,先放入Queue中,當 CorePoolSiz 和 Queue 都滿的時候,就增加創建新線程,當線程達到MaxPoolSize的時候,就會拋出錯 誤 org.springframework.core.task.TaskRejectedException
另外MaxPoolSize的設定如果比系統支持的線程數還要大時,會拋出java.lang.OutOfMemoryError: unable to create new native thread 異常。
這個是ThreadPoolExecutor的運算流程,既然ThreadPoolTaskExecutor是直接使用ThreadPoolExecutor進行處理,所以運算規則肯定一樣。
在spring中使用ThreadPoolTaskExecutor的配置:
<!-- 異步線程池 --> <bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 核心線程數 --> <property name="corePoolSize" value="3" /> <!-- 最大線程數 --> <property name="maxPoolSize" value="10" /> <!-- 隊列最大長度 >=mainExecutor.maxSize --> <property name="queueCapacity" value="25" /> <!-- 線程池維護線程所允許的空閑時間 --> <property name="keepAliveSeconds" value="300" /> <!-- 線程池對拒絕任務(無線程可用)的處理策略 ThreadPoolExecutor.CallerRunsPolicy策略 ,調用者的線程會執行該任務,如果執行器已關閉,則丟棄. --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> </property> </bean>
Reject策略預定義有四種:
Reject 策略詳解:https://blog.csdn.net/jgteng/article/details/54411423
(1)ThreadPoolExecutor.AbortPolicy策略,是默認的策略,處理程序遭到拒絕將拋出運行時 RejectedExecutionException。
(2)ThreadPoolExecutor.CallerRunsPolicy策略 ,調用者的線程會執行該任務,如果執行器已關閉,則丟棄.
(3)ThreadPoolExecutor.DiscardPolicy策略,不能執行的任務將被丟棄.
(4)ThreadPoolExecutor.DiscardOldestPolicy策略,如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然后重試執行程序(如果再次失敗,則重復此過程).
向spring容器中加入ThreadPoolTaskExecutor后,使用時只需要調用其的execute方法,其參數為一個Runnable。
threadPool.execute(new Runnable() { @Override public void run() { System.out.println("======="); } });
ThreadPoolTaskExecutor有兩個execute的重載,但翻看代碼可以知道調用的是同一個方法,所以只調用execute就可以了

@Override public void execute(Runnable task) { Executor executor = getThreadPoolExecutor(); try { executor.execute(task); } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); } } @Override public void execute(Runnable task, long startTimeout) { execute(task); }
在execute中調用的是ThreadPoolExecutor中的execute方法,執行了上面的處理流程后執行任務。

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
二:阻塞隊列BlockingQueue
在ThreadPoolTaskExecutor源碼中我們看到了BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);這樣一句話用來得到一個隊列,這個隊列是用來存放任務的。當線程池中有空閑線程時就回去任務隊列中拿任務並處理。BlockingQueue是一個阻塞並線程安全的一個隊列
多線程環境中,通過隊列可以很容易實現數據共享,比如經典的“生產者”和“消費者”模型中,通過隊列可以很便利地實現兩者之間的數據共享。假設我們有若干 生產者線程,另外又有若干個消費者線程。如果生產者線程需要把准備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就可以很方便地解決他們之間的數 據共享問題。但如果生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的情況呢?理想情況下,如果生產者產出數據的速度大於消費者消費的速度,並 且當生產出來的數據累積到一定程度的時候,那么生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。
BlockingQueue的核心方法:
放入數據:
offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,
則返回true,否則返回false.(本方法不阻塞當前執行方法的線程)
offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往隊列中
加入BlockingQueue,則返回失敗。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷
直到BlockingQueue里面有空間再繼續.
獲取數據:
poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,
取不到時返回null;
poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內,
隊列一旦有數據可取,則立即返回隊列中的數據。否則知道時間超時還沒有數據可取,返回失敗。
take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到
BlockingQueue有新的數據被加入;
drainTo():一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),
通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。
查看 ThreadPoolTaskExecutor的代碼可以發現,其主要是使用 BlockingQueue的一種實現LinkedBlockingQueue進行實現。
/** * Create the BlockingQueue to use for the ThreadPoolExecutor. * <p>A LinkedBlockingQueue instance will be created for a positive * capacity value; a SynchronousQueue else. * @param queueCapacity the specified queue capacity * @return the BlockingQueue instance * @see java.util.concurrent.LinkedBlockingQueue * @see java.util.concurrent.SynchronousQueue */ protected BlockingQueue<Runnable> createQueue(int queueCapacity) { if (queueCapacity > 0) { return new LinkedBlockingQueue<Runnable>(queueCapacity); } else { return new SynchronousQueue<Runnable>(); } }
LinkedBlockingQueue是一個基於鏈表的阻塞隊列,其內部也維持着一個數據緩沖隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者立 即返回;只有當隊列緩沖區達到最大值緩存容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生產者隊列,直到消費者從 隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理 並發數據,還因為其對於生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味着在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以 此來提高整個隊列的並發性能。
生成LinkedBlockingQueue時有一個大小限制,其默認為Integer.MAX_VALUE.
另外LinkedBlockingQueue不接受null值,當添加null的時候,會直接拋出NullPointerException:
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException();
3.另外最近的項目中使用的多線程和隊列比較多,多線程自不用說,我百度了一下隊列的優點,感覺說的很好,特此抄過來:
1. 解耦
在項目啟動之初來預測將來項目會碰到什么需求,是極其困難的。消息隊列在處理過程中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2. 冗余
有時在處理數據的時候處理過程會失敗。除非數據被持久化,否則將永遠丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。在被許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理過程明確的指出該消息已經被處理完畢,確保你的數據被安全的保存直到你使用完畢。
3. 擴展性
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的;只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。擴展就像調大電力按鈕一樣簡單。
4. 靈活性 & 峰值處理能力
在訪問量劇增的情況下,你的應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見;如果為 以能處理這類峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住增長的訪問壓力,而不是因為超出負荷的請求而完全崩潰。
5. 可恢復性
當體系的一部分組件失效,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。而這種允許重試或者延后處理請求的能力通常是造就一個略感不便的用戶和一個沮喪透頂的用戶之間的區別。
6. 送達保證
消息隊列提供的冗余機制保證了消息能被實際的處理,只要一個進程讀取了該隊列即可。在此基礎上,IronMQ提供了一個"只送達一次"保證。無論有多少進 程在從隊列中領取數據,每一個消息只能被處理一次。這之所以成為可能,是因為獲取一個消息只是"預定"了這個消息,暫時把它移出了隊列。除非客戶端明確的 表示已經處理完了這個消息,否則這個消息會被放回隊列中去,在一段可配置的時間之后可再次被處理。
7.排序保證
在許多情況下,數據處理的順序都很重要。消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。
8.緩沖
在任何重要的系統中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列通過一個緩沖層來幫助任務最高效率的執行--寫入隊列的處理會盡可能的快速,而不受從隊列讀的預備處理的約束。該緩沖有助於控制和優化數據流經過系統的速度。
9. 理解數據流
在一個分布式系統里,要得到一個關於用戶操作會用多長時間及其原因的總體印象,是個巨大的挑戰。消息系列通過消息被處理的頻率,來方便的輔助確定那些表現不佳的處理過程或領域,這些地方的數據流都不夠優化。
10. 異步通信
很多時候,你不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許你把一個消息放入隊列,但並不立即處理它。你想向隊列中放入多少消息就放多少,然后在你樂意的時候再去處理它們。
ThreadPoolTaskExecutor