1 java並發框架
2 Fork/Join框架
3 定義
4 一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架
5 核心思想
6 分治
7 fork分解任務,join收集任務
8 工作竊取算法
9 定義
10 工作竊取算法work-stealing: 某個線程從其他隊列里竊取任務來執行
11 背景
12 將一個不較大的任務分割為若干個互不依賴的子任務,為了減少線程之間的競爭,把這些子任務分別放到不同的隊列里,並未每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應。
13 執行快的線程幫助執行慢的線程執行任務,提升整個任務效率
14 為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列
15 竊取任務線程永遠從雙端隊列的尾部拿任務執行
16 被竊取任務線程永遠從雙端隊列的頭部拿任務執行
17 優點
18 充分利用線程進行並行計算,減少了線程間的競爭
19 缺點
20 在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時,並且該算法會消耗了更多的資源,比如創建多個線程和多個雙端隊列
21 Fork/Join框架設計
22 2大步驟
23 分割任務
24 fork類把大任務分割成子任務,子任務繼續分割,直到子任務足夠小
25 執行任務並合並結果
26 分割的子任務放在雙端隊列里,然后幾個啟動線程分別從雙端隊列里獲取任務執行。子任務執行完的結果都統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合並這些數據。
27 核心類
28 ForkJoinTask
29 子類,用於繼承
30 繼承子類 RecursiveAction
31 用於沒有返回結果的任務
32 繼承子類 RecursiveTask
33 用於有返回結果的任務
34 方法
35 fork
36 分解任務
37 join
38 合並任務結果
39 isCompletedAbnormally()
40 檢查任務是否已經拋出異常或已經被取消了
41 getException()
42 獲取異常
43 ForkJoinWorkerThread
44 執行任務的工作線程
45 ForkJoinPool
46 執行任務ForkJoinTask的線程池,ForkJoinTask需要通過ForkJoinPool來執行
47 submit(task);
48 內部結構
49 ForkJoinTask數組
50 存放任務
51 ForkJoinWorkerThread數組
52 執行任務
53 Executor框架
54 兩級調度模型
55 上層:Java多線程程序通常把應用分解為若干個任務,然后使用用戶級的調度器(Executor框架)將這些任務映射為固定數量的線程
56 底層: 操作系統內核OSkernel將這些線程映射到硬件處理器CPU上。
57 Executor框架控制上層的調度,下層的調度由操作系統內核控制,下層的調度不受應用程序的控制
58 Executor框架的結構
59 任務
60 包括被執行任務需要實現的接口
61 Runnable接口
62 Callable接口
63 任務的執行
64 任務執行機制的核心接口
65 Executor接口
66 繼承自Executor的接口
67 ExecutorService接口
68 execute(Runnable command)
69 submit(Runnable task)
70 submit(Callable<T>task)
71 返回值是FutureTask對象
72 實現了ExecutorService接口的實現類
73 ThreadPoolExecutor
74 ScheduledThreadPoolExecutor
75 異步計算的結果
76 Future接口
77 get
78 等待任務執行完成
79 cancel
80 取消任務完成
81 實現了Future接口的實現類
82 FutureTask
83 Executor框架的成員
84 Executor是一個接口,是框架的基礎,將任務的提交和任務的執行分離開來
85 Runnable接口和Callable接口的實現類
86 任務
87 Runnable接口
88 不會返回結果
89 Callable接口
90 會返回結果
91 工廠類Executor可以把一個Runnable包裝成一個Callable
92 callable(Runnable task)
93 返回值是null
94 callable(Runnable task, T result)
95 返回值是result對象
96 ThreadPoolExecutor
97 是線程池的核心實現類,用來執行被提交的任務
98 ScheduledThreadPoolExecutor
99 可以在給定的延遲后運行命令,或者定期執行命令
100 Future接口和實現Future接口的FutureTask
101 異步計算的結果
102 ThreadPoolExecutor
103 是線程池的核心實現類,用來執行被提交的任務
104 線程池的創建
105 ThreadPoolExecutor
106 corePoolSize
107 線程池中核心線程的數量。當提交一個任務時,線程池會新建一個線程來執行任務,直到當前線程數等於corePoolSize。如果調用了線程池的prestartAllCoreThreads()方法,線程池會提前創建並啟動所有基本線程。
108 maximumPoolSize
109 線程池中允許的最大線程數。線程池的阻塞隊列滿了之后,如果還有任務提交,如果當前的線程數小於maximumPoolSize,則會新建線程來執行任務。注意,如果使用的是無界隊列,該參數也就沒有什么效果了。
110 keepAliveTime
111 線程空閑的時間。線程的創建和銷毀是需要代價的。線程執行完任務后不會立即銷毀,而是繼續存活一段時間:keepAliveTime。默認情況下,該參數只有在線程數大於corePoolSize時才會生效。
112 unit
113 keepAliveTime的單位。TimeUnit
114 workQueue
115 用來保存等待執行的任務的阻塞隊列,等待的任務必須實現Runnable接口。我們可以選擇如下幾種:
116 分類
117 ArrayBlockingQueue:基於數組結構的有界阻塞隊列,FIFO。
118 LinkedBlockingQueue:基於鏈表結構的有界阻塞隊列,FIFO。
119 SynchronousQueue:不存儲元素的阻塞隊列,每個插入操作都必須等待一個移出操作,反之亦然。
120 PriorityBlockingQueue:具有優先界別的無界阻塞隊列。
121 threadFactory
122 用於設置創建線程的工廠。可以通過線程工廠給每個創建出來的線程設置更有意義的名字,該對象可以通過Executors.defaultThreadFactory()
123 handler
124 RejectedExecutionHandler,線程池的拒絕策略。所謂拒絕策略,是指將任務添加到線程池中時,線程池拒絕該任務所采取的相應策略。當向線程池中提交任務時,如果此時線程池中的線程已經飽和了,而且阻塞隊列也已經滿了,則線程池會選擇一種拒絕策略來處理該任務。
125 四種拒絕策略
126 AbortPolicy:直接拋出異常,默認策略;
127 CallerRunsPolicy:用調用者所在的線程來執行任務;
128 DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
129 DiscardPolicy:直接丟棄任務;
130 當然我們也可以實現自己的拒絕策略,例如記錄日志、持久化存儲不能處理的任務等等,實現RejectedExecutionHandler接口自定義即可。
131 工廠類Executors創建3種類型的ThreadPoolExecutor
132 SingleThreadExecutor
133 使用單個worker線程的Executor
134 應用場景
135 適用於需要保證順序地執行各個任務;並且在任意時間點,不會有多個線程是活動的應用場景
136 特點
137 corePool和maximumPoolSize均被設置為1
138 使用的是相當於無界的有界阻塞隊列LinkedBlockingQueue,所以帶來的影響和FixedThreadPool一樣。
139 FixedThreadPool
140 固定線程數的線程池
141 應用場景
142 為了滿足資源管理的需求,需要限制當前線程數量的應用場景
143 適用於負載比較重的服務器
144 特點
145 corePoolSize 和 maximumPoolSize都設置為創建FixedThreadPool時指定的參數nThreads,意味着當線程池滿時且阻塞隊列也已經滿時,如果繼續提交任務,則會直接走拒絕策略
146 默認的拒絕策略,即AbortPolicy,則直接拋出異常。
147 keepAliveTime設置為0L,表示空閑的線程會立刻終止。
148 workQueue則是使用LinkedBlockingQueue,但是沒有設置范圍,那么則是最大值(Integer.MAX_VALUE),這基本就相當於一個無界隊列了。
149 無界隊列對線程池的影響
150 1. 當線程池中的線程數量等於corePoolSize 時,如果繼續提交任務,該任務會被添加到無界阻塞隊列workQueue中,因此線程中的線程數不會超過corePoolSize
151 2. 由於1,使用無界隊列時的 maximumPoolSize是一個無效參數
152 3. 由於1和2,使用無界隊列時的 keepAliveTime 是一個無效參數
153 4. 不會拒絕任務
154 CachedThreadPool
155 根據需要創建新線程,是大小無界的線程池
156 應用場景
157 適用於執行很多的短期異步任務的小程序
158 適用於負載較輕的服務器
159 特點
160 corePool為0,maximumPoolSize為Integer.MAX_VALUE,這就意味着所有的任務一提交就會加入到阻塞隊列中。
161 keepAliveTime這是為60L,unit設置為TimeUnit.SECONDS,意味着空閑線程等待新任務的最長時間為60秒,空閑線程超過60秒后將會被終止。
162 阻塞隊列采用的SynchronousQueue,每個插入操作都必須等待另一個線程對應的移除操作,此處把主線程提交的任務傳遞給空閑線程去執行。
163 SynchronousQueue是一個沒有元素的阻塞隊列,加上corePool = 0 ,maximumPoolSize = Integer.MAX_VALUE,這樣就會存在一個問題,如果主線程提交任務的速度遠遠大於CachedThreadPool的處理速度,則CachedThreadPool會不斷地創建新線程來執行任務,這樣有可能會導致系統耗盡CPU和內存資源,所以在使用該線程池是,一定要注意控制並發的任務數,否則創建大量的線程可能導致嚴重的性能問題。
164 重要操作
165 offer
166 主線程執行offer操作與空閑線程執行的poll操作配對成功后,主線程把任務交給空閑線程執行
167 execute
168 執行任務
169 poll
170 讓空閑線程在SynchronousQueue中等待60s,如果等待到新任務則執行,否則,空閑線程將終止
171 向線程池提交任務
172 execute()
173 用於提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功
174 工作原理
175 1. 如果當前運行的線程少於corePoolSize,則創建新線程來執行任務(執行這一步驟需要獲取全局鎖)
176 2. 如果運行的線程等於或多於corePoolSize(完成預熱之后),則將任務加入BlockingQueue,這一步不需要全局鎖
177 3. 如果無法將任務加入BlockingQueue(隊列已滿),則創建新的線程來處理任務 (執行這一步驟需要獲取全局鎖)
178 4. 如果創建的線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExcution()方法
179 submit()
180 用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過它可以判定任務是否執行成功。
181 future.get()方法用來獲取返回值,它會阻塞當前線程直到任務完成
182 future.get(long timeout, TimeUnit unit) 方法會阻塞當前線程一段時間后立即返回,這時候可能任務沒有執行完
183 關閉線程池
184 原理
185 遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷地的任務可能永遠無法終止
186 方法
187 shutdown
188 只是將線程池的狀態設置成SHUTDOWN狀態,然后中斷所有沒有正在執行任務的線程
189 shutdownNow
190 首先將線程池的狀態設置成STOP,然后嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表,任務不一定要執行完
191 合理配置線程池
192 任務的性質
193 CPU密集型任務
194 N(cpu)+1 個線程的線程池
195 IO密集型任務
196 2*N(cpu) 個線程的線程池
197 混合型任務
198 拆分成一個CPU密集型任務和一個IO密集型任務
199 任務的優先級
200 高
201 中
202 低
203 用PriorityBlockingQueue
204 任務的執行時間
205 長
206 中
207 短
208 任務的依賴性
209 是否依賴其他系統資源,比如數據庫連接
210 等待返回結果的時間越長,CPU空閑時間越長,那么線程數應該設置的越大,以便更好利用線程池
211 建議使用有界隊列。有界隊列能夠提高系統的穩定性和預警能力 ,無界隊列會直接撐爆內存,導致系統不可用
212 用PriorityBlockingQueue ,讓執行時間短的任務先執行
213 線程池的監控
214 方便出問題時,可以根據線程池的使用情況,快速定位問題
215 參數
216 taskCount
217 線程池需要執行的任務數量
218 completedTaskCount
219 線程池在運行過程中已經完成的任務數量
220 largestPoolSize
221 線程池曾將創建過的最大線程數量。如果該數字等於線程池的最大大小,表示線程池曾經滿過
222 getPoolSize
223 線程池的線程數量
224 getActiveCount
225 獲取活動的線程數
226 重寫方法
227 beforeExecute()
228 任務執行前
229 afterExecute()
230 任務執行后
231 terminated()
232 線程池關閉前
233 ScheduledThreadPoolExecutor
234 可以在給定的延遲后運行命令,或者定期執行命令,與Timer類似,但比其功能更強大,更靈活
235 ScheduledThreadPoolExecutor V.S. Timer
236 ScheduledThreadPoolExecutor可以再構造函數中指定多個對應的后台進程數
237 Timer對應的是單個后台進程
238 內部類
239 DelayedWorkQueue
240 所使用的阻塞隊列變成了DelayedWorkQueue
241 DelayedWorkQueue為ScheduledThreadPoolExecutor中的內部類,它其實和阻塞隊列DelayQueue有點兒類似
242 DelayedWorkQueue中的任務必然是按照延遲時間從短到長來進行排序的
243 Reentrant+Condition
244 ScheduledFutureTask
245 待調度的任務
246 ScheduledFutureTask內部繼承FutureTask,實現RunnableScheduledFuture接口
247 三個比較重要的變量
248 private final long sequenceNumber;
249 /** 任務被添加到ScheduledThreadPoolExecutor中的序號 */
250 private long time;
251 /** 任務要執行的具體時間 */
252 private final long period;
253 /** 任務的間隔周期 */
254 compareTo方法,提供一個排序算法,該算法規則是:首先按照time排序,time小的排在前面,大的排在后面,如果time相同,則使用sequenceNumber排序,小的排在前面,大的排在后面。
255 compareTo()方法使用於DelayedWorkQueue隊列對其元素ScheduledThreadPoolExecutor task進行排序的算法
256 創建
257 通常使用工廠類Executors來創建
258 2種類型
259 ScheduledThreadPoolExecutor
260 包含固定個數個線程
261 適用於資源管理而需要限制線程數的場景
262 SingleThreadScheduledExecutor
263 包含一個線程
264 適用於單個線程,需要保證順序執行各個任務的場景
265 4個調度器
266 schedule(Callable callable, long delay, TimeUnit unit) :創建並執行在給定延遲后啟用的 ScheduledFuture。
267 schedule(Runnable command, long delay, TimeUnit unit) :創建並執行在給定延遲后啟用的一次性操作。
268 scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :創建並執行一個在給定初始延遲后首次啟用的定期操作,后續操作具有給定的周期;也就是將在 initialDelay 后開始執行,然后在 initialDelay+period 后執行,接着在 initialDelay + 2 * period 后執行,依此類推。
269 scheduleAtFixedRate是周期固定,也就說它是不會受到這個延遲的影響的,每個線程的調度周期在初始化時就已經絕對了,是什么時候調度就是什么時候調度,它不會因為上一個線程的調度失效延遲而受到影響。
270 scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) :創建並執行一個在給定初始延遲后首次啟用的定期操作,隨后,在每一次執行終止和下一次執行開始之間都存在給定的延遲。
271 scheduleWithFixedDelay是每個線程的調度間隔固定,也就是說第一個線程與第二線程之間間隔delay,第二個與第三個間隔delay,以此類推。
272 調度和執行
273 run()
274 1. 調用isPeriodic()獲取該線程是否為周期性任務標志,然后調用canRunInCurrentRunState()方法判斷該線程是否可以執行,如果不可以執行則調用cancel()取消任務。
275 2. 如果當線程已經到達了執行點,則調用run()方法執行task,該run()方法是在FutureTask中定義的。
276 3. 否則調用runAndReset()方法運行並重置狀態,調用setNextRunTime()方法重新計算任務的下次執行時間,重新把任務添加到隊列中,讓該任務可以重復執行。
277 reExecutePeriodic重要的是調用super.getQueue().add(task);將任務task加入的隊列DelayedWorkQueue中
278 FutureTask
279 實現了Future接口和Runnable接口,代表異步計算的結果,
280 因為實現了Runnable接口,可以交給Executor執行,也可以由調用線程直接執行
281 重要操作
282 run() 3種狀態
283 未啟動
284 已啟動
285 已完成
286 正常結束
287 取消而結束
288 異常而結束
289 get()
290 3種狀態
291 未啟動
292 阻塞
293 底層是 LockSupport.park();
294 已啟動
295 阻塞
296 底層是 LockSupport.park();
297 已完成
298 立即返回結果或者拋出異常
299 cannel()
300 3種狀態
301 未啟動
302 任務不會被執行
303 已啟動
304 cannel(true):中斷執行任務的線程
305 cannel(false):不中斷執行任務的線程
306 已完成
307 返回false
308 應用場景
309 當一個線程需要等待另一個線程把某個任務執行完后它才能繼續執行