Java 並發系列之十:java 並發框架(2個)


1. Fork/Join框架

2. Executor框架

3. ThreadPoolExecutor

4. ScheduledThreadPoolExecutor

5. FutureTask

6. txt

  1 java並發框架
  2     Fork/Join框架
  3         定義
  4             一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架
  5          核心思想
  6              分治
  7              fork分解任務,join收集任務
  8          工作竊取算法
  9             定義
 10                  工作竊取算法work-stealing: 某個線程從其他隊列里竊取任務來執行
 11             背景
 12                 將一個不較大的任務分割為若干個互不依賴的子任務,為了減少線程之間的競爭,把這些子任務分別放到不同的隊列里,並未每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應。
 13                  執行快的線程幫助執行慢的線程執行任務,提升整個任務效率
 14                 為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列
 15 竊取任務線程永遠從雙端隊列的尾部拿任務執行
 16 被竊取任務線程永遠從雙端隊列的頭部拿任務執行
 17             優點
 18                 充分利用線程進行並行計算,減少了線程間的競爭
 19             缺點
 20                 在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時,並且該算法會消耗了更多的資源,比如創建多個線程和多個雙端隊列
 21         Fork/Join框架設計
 22             2大步驟
 23                 分割任務
 24                     fork類把大任務分割成子任務,子任務繼續分割,直到子任務足夠小
 25                 執行任務並合並結果
 26                     分割的子任務放在雙端隊列里,然后幾個啟動線程分別從雙端隊列里獲取任務執行。子任務執行完的結果都統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合並這些數據。
 27         核心類
 28             ForkJoinTask
 29                 子類,用於繼承
 30                     繼承子類 RecursiveAction
 31                         用於沒有返回結果的任務
 32                     繼承子類 RecursiveTask
 33                         用於有返回結果的任務
 34                 方法
 35                     fork
 36                         分解任務
 37                     join
 38                         合並任務結果
 39                     isCompletedAbnormally()
 40                         檢查任務是否已經拋出異常或已經被取消了
 41                     getException()
 42                         獲取異常
 43              ForkJoinWorkerThread
 44                  執行任務的工作線程
 45             ForkJoinPool
 46                  執行任務ForkJoinTask的線程池,ForkJoinTask需要通過ForkJoinPool來執行
 47                     submit(task);
 48                 內部結構
 49                     ForkJoinTask數組
 50                         存放任務
 51                     ForkJoinWorkerThread數組
 52                         執行任務
 53     Executor框架
 54         兩級調度模型
 55             上層:Java多線程程序通常把應用分解為若干個任務,然后使用用戶級的調度器(Executor框架)將這些任務映射為固定數量的線程
 56             底層: 操作系統內核OSkernel將這些線程映射到硬件處理器CPU上。
 57             Executor框架控制上層的調度,下層的調度由操作系統內核控制,下層的調度不受應用程序的控制
 58         Executor框架的結構
 59             任務
 60                 包括被執行任務需要實現的接口
 61                     Runnable接口
 62                     Callable接口
 63             任務的執行
 64                 任務執行機制的核心接口
 65                     Executor接口
 66                 繼承自Executor的接口
 67                     ExecutorService接口
 68                         execute(Runnable command)
 69                         submit(Runnable task)
 70                         submit(Callable<T>task)
 71                         返回值是FutureTask對象
 72                 實現了ExecutorService接口的實現類
 73                     ThreadPoolExecutor
 74                     ScheduledThreadPoolExecutor
 75             異步計算的結果
 76                 Future接口
 77                     get
 78                         等待任務執行完成
 79                     cancel
 80                         取消任務完成
 81                 實現了Future接口的實現類
 82                     FutureTask
 83         Executor框架的成員
 84             Executor是一個接口,是框架的基礎,將任務的提交和任務的執行分離開來
 85             Runnable接口和Callable接口的實現類
 86                 任務
 87                     Runnable接口
 88                         不會返回結果
 89                     Callable接口
 90                         會返回結果
 91                         工廠類Executor可以把一個Runnable包裝成一個Callable
 92                         callable(Runnable task)
 93                             返回值是null
 94                         callable(Runnable task, T result)
 95                             返回值是result對象
 96             ThreadPoolExecutor
 97                 是線程池的核心實現類,用來執行被提交的任務
 98             ScheduledThreadPoolExecutor
 99                 可以在給定的延遲后運行命令,或者定期執行命令
100             Future接口和實現Future接口的FutureTask
101                 異步計算的結果
102     ThreadPoolExecutor
103         是線程池的核心實現類,用來執行被提交的任務
104         線程池的創建
105             ThreadPoolExecutor
106                 corePoolSize
107                     線程池中核心線程的數量。當提交一個任務時,線程池會新建一個線程來執行任務,直到當前線程數等於corePoolSize。如果調用了線程池的prestartAllCoreThreads()方法,線程池會提前創建並啟動所有基本線程。
108                 maximumPoolSize
109                     線程池中允許的最大線程數。線程池的阻塞隊列滿了之后,如果還有任務提交,如果當前的線程數小於maximumPoolSize,則會新建線程來執行任務。注意,如果使用的是無界隊列,該參數也就沒有什么效果了。
110                 keepAliveTime
111                     線程空閑的時間。線程的創建和銷毀是需要代價的。線程執行完任務后不會立即銷毀,而是繼續存活一段時間:keepAliveTime。默認情況下,該參數只有在線程數大於corePoolSize時才會生效。
112                 unit
113                     keepAliveTime的單位。TimeUnit
114                 workQueue
115                     用來保存等待執行的任務的阻塞隊列,等待的任務必須實現Runnable接口。我們可以選擇如下幾種:
116                     分類
117                         ArrayBlockingQueue:基於數組結構的有界阻塞隊列,FIFO。
118                         LinkedBlockingQueue:基於鏈表結構的有界阻塞隊列,FIFO。
119                         SynchronousQueue:不存儲元素的阻塞隊列,每個插入操作都必須等待一個移出操作,反之亦然。
120                         PriorityBlockingQueue:具有優先界別的無界阻塞隊列。
121                 threadFactory
122                     用於設置創建線程的工廠。可以通過線程工廠給每個創建出來的線程設置更有意義的名字,該對象可以通過Executors.defaultThreadFactory()
123                 handler
124                     RejectedExecutionHandler,線程池的拒絕策略。所謂拒絕策略,是指將任務添加到線程池中時,線程池拒絕該任務所采取的相應策略。當向線程池中提交任務時,如果此時線程池中的線程已經飽和了,而且阻塞隊列也已經滿了,則線程池會選擇一種拒絕策略來處理該任務。
125                     四種拒絕策略
126                         AbortPolicy:直接拋出異常,默認策略;
127                         CallerRunsPolicy:用調用者所在的線程來執行任務;
128                         DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
129                         DiscardPolicy:直接丟棄任務;
130                     當然我們也可以實現自己的拒絕策略,例如記錄日志、持久化存儲不能處理的任務等等,實現RejectedExecutionHandler接口自定義即可。
131             工廠類Executors創建3種類型的ThreadPoolExecutor
132                 SingleThreadExecutor
133                     使用單個worker線程的Executor
134                     應用場景
135                         適用於需要保證順序地執行各個任務;並且在任意時間點,不會有多個線程是活動的應用場景
136                     特點
137                         corePool和maximumPoolSize均被設置為1
138                         使用的是相當於無界的有界阻塞隊列LinkedBlockingQueue,所以帶來的影響和FixedThreadPool一樣。
139                 FixedThreadPool
140                     固定線程數的線程池
141                     應用場景
142                         為了滿足資源管理的需求,需要限制當前線程數量的應用場景
143                         適用於負載比較重的服務器
144                     特點
145                         corePoolSize 和 maximumPoolSize都設置為創建FixedThreadPool時指定的參數nThreads,意味着當線程池滿時且阻塞隊列也已經滿時,如果繼續提交任務,則會直接走拒絕策略
146                         默認的拒絕策略,即AbortPolicy,則直接拋出異常。
147                         keepAliveTime設置為0L,表示空閑的線程會立刻終止。
148                         workQueue則是使用LinkedBlockingQueue,但是沒有設置范圍,那么則是最大值(Integer.MAX_VALUE),這基本就相當於一個無界隊列了。
149                         無界隊列對線程池的影響
150                             1. 當線程池中的線程數量等於corePoolSize 時,如果繼續提交任務,該任務會被添加到無界阻塞隊列workQueue中,因此線程中的線程數不會超過corePoolSize
151                             2. 由於1,使用無界隊列時的 maximumPoolSize是一個無效參數
152                             3. 由於1和2,使用無界隊列時的 keepAliveTime 是一個無效參數
153                             4.  不會拒絕任務
154                 CachedThreadPool
155                     根據需要創建新線程,是大小無界的線程池
156                     應用場景
157                         適用於執行很多的短期異步任務的小程序
158                         適用於負載較輕的服務器
159                     特點
160                         corePool為0,maximumPoolSize為Integer.MAX_VALUE,這就意味着所有的任務一提交就會加入到阻塞隊列中。
161                         keepAliveTime這是為60L,unit設置為TimeUnit.SECONDS,意味着空閑線程等待新任務的最長時間為60秒,空閑線程超過60秒后將會被終止。
162                         阻塞隊列采用的SynchronousQueue,每個插入操作都必須等待另一個線程對應的移除操作,此處把主線程提交的任務傳遞給空閑線程去執行。
163                         SynchronousQueue是一個沒有元素的阻塞隊列,加上corePool = 0 ,maximumPoolSize = Integer.MAX_VALUE,這樣就會存在一個問題,如果主線程提交任務的速度遠遠大於CachedThreadPool的處理速度,則CachedThreadPool會不斷地創建新線程來執行任務,這樣有可能會導致系統耗盡CPU和內存資源,所以在使用該線程池是,一定要注意控制並發的任務數,否則創建大量的線程可能導致嚴重的性能問題。
164                     重要操作
165                         offer
166                             主線程執行offer操作與空閑線程執行的poll操作配對成功后,主線程把任務交給空閑線程執行
167                         execute
168                             執行任務
169                         poll
170                             讓空閑線程在SynchronousQueue中等待60s,如果等待到新任務則執行,否則,空閑線程將終止
171         向線程池提交任務
172             execute()
173                 用於提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功
174                 工作原理
175                     1. 如果當前運行的線程少於corePoolSize,則創建新線程來執行任務(執行這一步驟需要獲取全局鎖)
176                     2. 如果運行的線程等於或多於corePoolSize(完成預熱之后),則將任務加入BlockingQueue,這一步不需要全局鎖
177                     3. 如果無法將任務加入BlockingQueue(隊列已滿),則創建新的線程來處理任務 (執行這一步驟需要獲取全局鎖)
178                     4. 如果創建的線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExcution()方法
179             submit()
180                 用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過它可以判定任務是否執行成功。
181                 future.get()方法用來獲取返回值,它會阻塞當前線程直到任務完成
182                 future.get(long timeout, TimeUnit unit) 方法會阻塞當前線程一段時間后立即返回,這時候可能任務沒有執行完
183         關閉線程池
184             原理
185                 遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷地的任務可能永遠無法終止
186             方法
187                 shutdown
188                     只是將線程池的狀態設置成SHUTDOWN狀態,然后中斷所有沒有正在執行任務的線程
189                 shutdownNow
190                     首先將線程池的狀態設置成STOP,然后嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表,任務不一定要執行完
191         合理配置線程池
192             任務的性質
193                 CPU密集型任務
194                     N(cpu)+1  個線程的線程池
195                 IO密集型任務
196                     2*N(cpu) 個線程的線程池
197                 混合型任務
198                     拆分成一個CPU密集型任務和一個IO密集型任務
199             任務的優先級
200 201 202 203                 用PriorityBlockingQueue
204             任務的執行時間
205 206 207 208             任務的依賴性
209                 是否依賴其他系統資源,比如數據庫連接
210                 等待返回結果的時間越長,CPU空閑時間越長,那么線程數應該設置的越大,以便更好利用線程池
211             建議使用有界隊列。有界隊列能夠提高系統的穩定性和預警能力 ,無界隊列會直接撐爆內存,導致系統不可用
212             用PriorityBlockingQueue ,讓執行時間短的任務先執行
213         線程池的監控
214             方便出問題時,可以根據線程池的使用情況,快速定位問題
215             參數
216                 taskCount
217                     線程池需要執行的任務數量
218                 completedTaskCount
219                     線程池在運行過程中已經完成的任務數量
220                 largestPoolSize
221                     線程池曾將創建過的最大線程數量。如果該數字等於線程池的最大大小,表示線程池曾經滿過
222                 getPoolSize
223                     線程池的線程數量
224                 getActiveCount
225                     獲取活動的線程數
226             重寫方法
227                 beforeExecute()
228                     任務執行前
229                 afterExecute()
230                     任務執行后
231                 terminated()
232                     線程池關閉前
233     ScheduledThreadPoolExecutor
234         可以在給定的延遲后運行命令,或者定期執行命令,與Timer類似,但比其功能更強大,更靈活
235         ScheduledThreadPoolExecutor  V.S. Timer
236             ScheduledThreadPoolExecutor可以再構造函數中指定多個對應的后台進程數
237             Timer對應的是單個后台進程
238         內部類
239             DelayedWorkQueue
240                 所使用的阻塞隊列變成了DelayedWorkQueue
241                 DelayedWorkQueue為ScheduledThreadPoolExecutor中的內部類,它其實和阻塞隊列DelayQueue有點兒類似
242                 DelayedWorkQueue中的任務必然是按照延遲時間從短到長來進行排序的
243                 Reentrant+Condition
244             ScheduledFutureTask
245                 待調度的任務
246                 ScheduledFutureTask內部繼承FutureTask,實現RunnableScheduledFuture接口
247                 三個比較重要的變量
248                             private final long sequenceNumber;
249                         /** 任務被添加到ScheduledThreadPoolExecutor中的序號 */
250                             private long time;
251                                 /** 任務要執行的具體時間 */
252                             private final long period;
253                                 /**  任務的間隔周期 */
254                 compareTo方法,提供一個排序算法,該算法規則是:首先按照time排序,time小的排在前面,大的排在后面,如果time相同,則使用sequenceNumber排序,小的排在前面,大的排在后面。
255                 compareTo()方法使用於DelayedWorkQueue隊列對其元素ScheduledThreadPoolExecutor task進行排序的算法
256         創建
257             通常使用工廠類Executors來創建
258             2種類型
259                 ScheduledThreadPoolExecutor
260                     包含固定個數個線程
261                     適用於資源管理而需要限制線程數的場景
262                 SingleThreadScheduledExecutor
263                     包含一個線程
264                     適用於單個線程,需要保證順序執行各個任務的場景
265         4個調度器
266             schedule(Callable callable, long delay, TimeUnit unit) :創建並執行在給定延遲后啟用的 ScheduledFuture。
267             schedule(Runnable command, long delay, TimeUnit unit) :創建並執行在給定延遲后啟用的一次性操作。
268             scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :創建並執行一個在給定初始延遲后首次啟用的定期操作,后續操作具有給定的周期;也就是將在 initialDelay 后開始執行,然后在 initialDelay+period 后執行,接着在 initialDelay + 2 * period 后執行,依此類推。
269 scheduleAtFixedRate是周期固定,也就說它是不會受到這個延遲的影響的,每個線程的調度周期在初始化時就已經絕對了,是什么時候調度就是什么時候調度,它不會因為上一個線程的調度失效延遲而受到影響。
270             scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) :創建並執行一個在給定初始延遲后首次啟用的定期操作,隨后,在每一次執行終止和下一次執行開始之間都存在給定的延遲。
271 scheduleWithFixedDelay是每個線程的調度間隔固定,也就是說第一個線程與第二線程之間間隔delay,第二個與第三個間隔delay,以此類推。
272         調度和執行
273             run()
274                 1. 調用isPeriodic()獲取該線程是否為周期性任務標志,然后調用canRunInCurrentRunState()方法判斷該線程是否可以執行,如果不可以執行則調用cancel()取消任務。
275                 2. 如果當線程已經到達了執行點,則調用run()方法執行task,該run()方法是在FutureTask中定義的。
276                 3. 否則調用runAndReset()方法運行並重置狀態,調用setNextRunTime()方法重新計算任務的下次執行時間,重新把任務添加到隊列中,讓該任務可以重復執行。
277                 reExecutePeriodic重要的是調用super.getQueue().add(task);將任務task加入的隊列DelayedWorkQueue中
278     FutureTask
279         實現了Future接口和Runnable接口,代表異步計算的結果,
280         因為實現了Runnable接口,可以交給Executor執行,也可以由調用線程直接執行
281         重要操作
282             run()  3種狀態
283                 未啟動
284                 已啟動
285                 已完成
286                     正常結束
287                     取消而結束
288                     異常而結束
289             get()
290                 3種狀態
291                     未啟動
292                         阻塞
293                             底層是 LockSupport.park();
294                     已啟動
295                         阻塞
296                             底層是 LockSupport.park();
297                     已完成
298                         立即返回結果或者拋出異常
299             cannel()
300                 3種狀態
301                     未啟動
302                         任務不會被執行
303                     已啟動
304                         cannel(true):中斷執行任務的線程
305                         cannel(false):不中斷執行任務的線程
306                     已完成
307                         返回false
308         應用場景
309             當一個線程需要等待另一個線程把某個任務執行完后它才能繼續執行
View Code

 

7. 參考網址

  1. 參考來源:http://cmsblogs.com/wp-content/resources/img/sike-juc.png
  2. 《Java並發編程的藝術》_方騰飛PDF 提取碼:o9vr
  3. http://ifeve.com/the-art-of-java-concurrency-program-1/
  4. Java並發學習系列-緒論
  5. Java並發編程實戰
  6. 死磕 Java 並發精品合集


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM