1. Fork/Join框架
2. Executor框架
3. ThreadPoolExecutor
4. ScheduledThreadPoolExecutor
5. FutureTask
6. txt

1 java並發框架 2 Fork/Join框架 3 定義 4 一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架 5 核心思想 6 分治 7 fork分解任務,join收集任務 8 工作竊取算法 9 定義 10 工作竊取算法work-stealing: 某個線程從其他隊列里竊取任務來執行 11 背景 12 將一個不較大的任務分割為若干個互不依賴的子任務,為了減少線程之間的競爭,把這些子任務分別放到不同的隊列里,並未每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應。 13 執行快的線程幫助執行慢的線程執行任務,提升整個任務效率 14 為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列 15 竊取任務線程永遠從雙端隊列的尾部拿任務執行 16 被竊取任務線程永遠從雙端隊列的頭部拿任務執行 17 優點 18 充分利用線程進行並行計算,減少了線程間的競爭 19 缺點 20 在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時,並且該算法會消耗了更多的資源,比如創建多個線程和多個雙端隊列 21 Fork/Join框架設計 22 2大步驟 23 分割任務 24 fork類把大任務分割成子任務,子任務繼續分割,直到子任務足夠小 25 執行任務並合並結果 26 分割的子任務放在雙端隊列里,然后幾個啟動線程分別從雙端隊列里獲取任務執行。子任務執行完的結果都統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合並這些數據。 27 核心類 28 ForkJoinTask 29 子類,用於繼承 30 繼承子類 RecursiveAction 31 用於沒有返回結果的任務 32 繼承子類 RecursiveTask 33 用於有返回結果的任務 34 方法 35 fork 36 分解任務 37 join 38 合並任務結果 39 isCompletedAbnormally() 40 檢查任務是否已經拋出異常或已經被取消了 41 getException() 42 獲取異常 43 ForkJoinWorkerThread 44 執行任務的工作線程 45 ForkJoinPool 46 執行任務ForkJoinTask的線程池,ForkJoinTask需要通過ForkJoinPool來執行 47 submit(task); 48 內部結構 49 ForkJoinTask數組 50 存放任務 51 ForkJoinWorkerThread數組 52 執行任務 53 Executor框架 54 兩級調度模型 55 上層:Java多線程程序通常把應用分解為若干個任務,然后使用用戶級的調度器(Executor框架)將這些任務映射為固定數量的線程 56 底層: 操作系統內核OSkernel將這些線程映射到硬件處理器CPU上。 57 Executor框架控制上層的調度,下層的調度由操作系統內核控制,下層的調度不受應用程序的控制 58 Executor框架的結構 59 任務 60 包括被執行任務需要實現的接口 61 Runnable接口 62 Callable接口 63 任務的執行 64 任務執行機制的核心接口 65 Executor接口 66 繼承自Executor的接口 67 ExecutorService接口 68 execute(Runnable command) 69 submit(Runnable task) 70 submit(Callable<T>task) 71 返回值是FutureTask對象 72 實現了ExecutorService接口的實現類 73 ThreadPoolExecutor 74 ScheduledThreadPoolExecutor 75 異步計算的結果 76 Future接口 77 get 78 等待任務執行完成 79 cancel 80 取消任務完成 81 實現了Future接口的實現類 82 FutureTask 83 Executor框架的成員 84 Executor是一個接口,是框架的基礎,將任務的提交和任務的執行分離開來 85 Runnable接口和Callable接口的實現類 86 任務 87 Runnable接口 88 不會返回結果 89 Callable接口 90 會返回結果 91 工廠類Executor可以把一個Runnable包裝成一個Callable 92 callable(Runnable task) 93 返回值是null 94 callable(Runnable task, T result) 95 返回值是result對象 96 ThreadPoolExecutor 97 是線程池的核心實現類,用來執行被提交的任務 98 ScheduledThreadPoolExecutor 99 可以在給定的延遲后運行命令,或者定期執行命令 100 Future接口和實現Future接口的FutureTask 101 異步計算的結果 102 ThreadPoolExecutor 103 是線程池的核心實現類,用來執行被提交的任務 104 線程池的創建 105 ThreadPoolExecutor 106 corePoolSize 107 線程池中核心線程的數量。當提交一個任務時,線程池會新建一個線程來執行任務,直到當前線程數等於corePoolSize。如果調用了線程池的prestartAllCoreThreads()方法,線程池會提前創建並啟動所有基本線程。 108 maximumPoolSize 109 線程池中允許的最大線程數。線程池的阻塞隊列滿了之后,如果還有任務提交,如果當前的線程數小於maximumPoolSize,則會新建線程來執行任務。注意,如果使用的是無界隊列,該參數也就沒有什么效果了。 110 keepAliveTime 111 線程空閑的時間。線程的創建和銷毀是需要代價的。線程執行完任務后不會立即銷毀,而是繼續存活一段時間:keepAliveTime。默認情況下,該參數只有在線程數大於corePoolSize時才會生效。 112 unit 113 keepAliveTime的單位。TimeUnit 114 workQueue 115 用來保存等待執行的任務的阻塞隊列,等待的任務必須實現Runnable接口。我們可以選擇如下幾種: 116 分類 117 ArrayBlockingQueue:基於數組結構的有界阻塞隊列,FIFO。 118 LinkedBlockingQueue:基於鏈表結構的有界阻塞隊列,FIFO。 119 SynchronousQueue:不存儲元素的阻塞隊列,每個插入操作都必須等待一個移出操作,反之亦然。 120 PriorityBlockingQueue:具有優先界別的無界阻塞隊列。 121 threadFactory 122 用於設置創建線程的工廠。可以通過線程工廠給每個創建出來的線程設置更有意義的名字,該對象可以通過Executors.defaultThreadFactory() 123 handler 124 RejectedExecutionHandler,線程池的拒絕策略。所謂拒絕策略,是指將任務添加到線程池中時,線程池拒絕該任務所采取的相應策略。當向線程池中提交任務時,如果此時線程池中的線程已經飽和了,而且阻塞隊列也已經滿了,則線程池會選擇一種拒絕策略來處理該任務。 125 四種拒絕策略 126 AbortPolicy:直接拋出異常,默認策略; 127 CallerRunsPolicy:用調用者所在的線程來執行任務; 128 DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務; 129 DiscardPolicy:直接丟棄任務; 130 當然我們也可以實現自己的拒絕策略,例如記錄日志、持久化存儲不能處理的任務等等,實現RejectedExecutionHandler接口自定義即可。 131 工廠類Executors創建3種類型的ThreadPoolExecutor 132 SingleThreadExecutor 133 使用單個worker線程的Executor 134 應用場景 135 適用於需要保證順序地執行各個任務;並且在任意時間點,不會有多個線程是活動的應用場景 136 特點 137 corePool和maximumPoolSize均被設置為1 138 使用的是相當於無界的有界阻塞隊列LinkedBlockingQueue,所以帶來的影響和FixedThreadPool一樣。 139 FixedThreadPool 140 固定線程數的線程池 141 應用場景 142 為了滿足資源管理的需求,需要限制當前線程數量的應用場景 143 適用於負載比較重的服務器 144 特點 145 corePoolSize 和 maximumPoolSize都設置為創建FixedThreadPool時指定的參數nThreads,意味着當線程池滿時且阻塞隊列也已經滿時,如果繼續提交任務,則會直接走拒絕策略 146 默認的拒絕策略,即AbortPolicy,則直接拋出異常。 147 keepAliveTime設置為0L,表示空閑的線程會立刻終止。 148 workQueue則是使用LinkedBlockingQueue,但是沒有設置范圍,那么則是最大值(Integer.MAX_VALUE),這基本就相當於一個無界隊列了。 149 無界隊列對線程池的影響 150 1. 當線程池中的線程數量等於corePoolSize 時,如果繼續提交任務,該任務會被添加到無界阻塞隊列workQueue中,因此線程中的線程數不會超過corePoolSize 151 2. 由於1,使用無界隊列時的 maximumPoolSize是一個無效參數 152 3. 由於1和2,使用無界隊列時的 keepAliveTime 是一個無效參數 153 4. 不會拒絕任務 154 CachedThreadPool 155 根據需要創建新線程,是大小無界的線程池 156 應用場景 157 適用於執行很多的短期異步任務的小程序 158 適用於負載較輕的服務器 159 特點 160 corePool為0,maximumPoolSize為Integer.MAX_VALUE,這就意味着所有的任務一提交就會加入到阻塞隊列中。 161 keepAliveTime這是為60L,unit設置為TimeUnit.SECONDS,意味着空閑線程等待新任務的最長時間為60秒,空閑線程超過60秒后將會被終止。 162 阻塞隊列采用的SynchronousQueue,每個插入操作都必須等待另一個線程對應的移除操作,此處把主線程提交的任務傳遞給空閑線程去執行。 163 SynchronousQueue是一個沒有元素的阻塞隊列,加上corePool = 0 ,maximumPoolSize = Integer.MAX_VALUE,這樣就會存在一個問題,如果主線程提交任務的速度遠遠大於CachedThreadPool的處理速度,則CachedThreadPool會不斷地創建新線程來執行任務,這樣有可能會導致系統耗盡CPU和內存資源,所以在使用該線程池是,一定要注意控制並發的任務數,否則創建大量的線程可能導致嚴重的性能問題。 164 重要操作 165 offer 166 主線程執行offer操作與空閑線程執行的poll操作配對成功后,主線程把任務交給空閑線程執行 167 execute 168 執行任務 169 poll 170 讓空閑線程在SynchronousQueue中等待60s,如果等待到新任務則執行,否則,空閑線程將終止 171 向線程池提交任務 172 execute() 173 用於提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功 174 工作原理 175 1. 如果當前運行的線程少於corePoolSize,則創建新線程來執行任務(執行這一步驟需要獲取全局鎖) 176 2. 如果運行的線程等於或多於corePoolSize(完成預熱之后),則將任務加入BlockingQueue,這一步不需要全局鎖 177 3. 如果無法將任務加入BlockingQueue(隊列已滿),則創建新的線程來處理任務 (執行這一步驟需要獲取全局鎖) 178 4. 如果創建的線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExcution()方法 179 submit() 180 用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過它可以判定任務是否執行成功。 181 future.get()方法用來獲取返回值,它會阻塞當前線程直到任務完成 182 future.get(long timeout, TimeUnit unit) 方法會阻塞當前線程一段時間后立即返回,這時候可能任務沒有執行完 183 關閉線程池 184 原理 185 遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷地的任務可能永遠無法終止 186 方法 187 shutdown 188 只是將線程池的狀態設置成SHUTDOWN狀態,然后中斷所有沒有正在執行任務的線程 189 shutdownNow 190 首先將線程池的狀態設置成STOP,然后嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表,任務不一定要執行完 191 合理配置線程池 192 任務的性質 193 CPU密集型任務 194 N(cpu)+1 個線程的線程池 195 IO密集型任務 196 2*N(cpu) 個線程的線程池 197 混合型任務 198 拆分成一個CPU密集型任務和一個IO密集型任務 199 任務的優先級 200 高 201 中 202 低 203 用PriorityBlockingQueue 204 任務的執行時間 205 長 206 中 207 短 208 任務的依賴性 209 是否依賴其他系統資源,比如數據庫連接 210 等待返回結果的時間越長,CPU空閑時間越長,那么線程數應該設置的越大,以便更好利用線程池 211 建議使用有界隊列。有界隊列能夠提高系統的穩定性和預警能力 ,無界隊列會直接撐爆內存,導致系統不可用 212 用PriorityBlockingQueue ,讓執行時間短的任務先執行 213 線程池的監控 214 方便出問題時,可以根據線程池的使用情況,快速定位問題 215 參數 216 taskCount 217 線程池需要執行的任務數量 218 completedTaskCount 219 線程池在運行過程中已經完成的任務數量 220 largestPoolSize 221 線程池曾將創建過的最大線程數量。如果該數字等於線程池的最大大小,表示線程池曾經滿過 222 getPoolSize 223 線程池的線程數量 224 getActiveCount 225 獲取活動的線程數 226 重寫方法 227 beforeExecute() 228 任務執行前 229 afterExecute() 230 任務執行后 231 terminated() 232 線程池關閉前 233 ScheduledThreadPoolExecutor 234 可以在給定的延遲后運行命令,或者定期執行命令,與Timer類似,但比其功能更強大,更靈活 235 ScheduledThreadPoolExecutor V.S. Timer 236 ScheduledThreadPoolExecutor可以再構造函數中指定多個對應的后台進程數 237 Timer對應的是單個后台進程 238 內部類 239 DelayedWorkQueue 240 所使用的阻塞隊列變成了DelayedWorkQueue 241 DelayedWorkQueue為ScheduledThreadPoolExecutor中的內部類,它其實和阻塞隊列DelayQueue有點兒類似 242 DelayedWorkQueue中的任務必然是按照延遲時間從短到長來進行排序的 243 Reentrant+Condition 244 ScheduledFutureTask 245 待調度的任務 246 ScheduledFutureTask內部繼承FutureTask,實現RunnableScheduledFuture接口 247 三個比較重要的變量 248 private final long sequenceNumber; 249 /** 任務被添加到ScheduledThreadPoolExecutor中的序號 */ 250 private long time; 251 /** 任務要執行的具體時間 */ 252 private final long period; 253 /** 任務的間隔周期 */ 254 compareTo方法,提供一個排序算法,該算法規則是:首先按照time排序,time小的排在前面,大的排在后面,如果time相同,則使用sequenceNumber排序,小的排在前面,大的排在后面。 255 compareTo()方法使用於DelayedWorkQueue隊列對其元素ScheduledThreadPoolExecutor task進行排序的算法 256 創建 257 通常使用工廠類Executors來創建 258 2種類型 259 ScheduledThreadPoolExecutor 260 包含固定個數個線程 261 適用於資源管理而需要限制線程數的場景 262 SingleThreadScheduledExecutor 263 包含一個線程 264 適用於單個線程,需要保證順序執行各個任務的場景 265 4個調度器 266 schedule(Callable callable, long delay, TimeUnit unit) :創建並執行在給定延遲后啟用的 ScheduledFuture。 267 schedule(Runnable command, long delay, TimeUnit unit) :創建並執行在給定延遲后啟用的一次性操作。 268 scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :創建並執行一個在給定初始延遲后首次啟用的定期操作,后續操作具有給定的周期;也就是將在 initialDelay 后開始執行,然后在 initialDelay+period 后執行,接着在 initialDelay + 2 * period 后執行,依此類推。 269 scheduleAtFixedRate是周期固定,也就說它是不會受到這個延遲的影響的,每個線程的調度周期在初始化時就已經絕對了,是什么時候調度就是什么時候調度,它不會因為上一個線程的調度失效延遲而受到影響。 270 scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) :創建並執行一個在給定初始延遲后首次啟用的定期操作,隨后,在每一次執行終止和下一次執行開始之間都存在給定的延遲。 271 scheduleWithFixedDelay是每個線程的調度間隔固定,也就是說第一個線程與第二線程之間間隔delay,第二個與第三個間隔delay,以此類推。 272 調度和執行 273 run() 274 1. 調用isPeriodic()獲取該線程是否為周期性任務標志,然后調用canRunInCurrentRunState()方法判斷該線程是否可以執行,如果不可以執行則調用cancel()取消任務。 275 2. 如果當線程已經到達了執行點,則調用run()方法執行task,該run()方法是在FutureTask中定義的。 276 3. 否則調用runAndReset()方法運行並重置狀態,調用setNextRunTime()方法重新計算任務的下次執行時間,重新把任務添加到隊列中,讓該任務可以重復執行。 277 reExecutePeriodic重要的是調用super.getQueue().add(task);將任務task加入的隊列DelayedWorkQueue中 278 FutureTask 279 實現了Future接口和Runnable接口,代表異步計算的結果, 280 因為實現了Runnable接口,可以交給Executor執行,也可以由調用線程直接執行 281 重要操作 282 run() 3種狀態 283 未啟動 284 已啟動 285 已完成 286 正常結束 287 取消而結束 288 異常而結束 289 get() 290 3種狀態 291 未啟動 292 阻塞 293 底層是 LockSupport.park(); 294 已啟動 295 阻塞 296 底層是 LockSupport.park(); 297 已完成 298 立即返回結果或者拋出異常 299 cannel() 300 3種狀態 301 未啟動 302 任務不會被執行 303 已啟動 304 cannel(true):中斷執行任務的線程 305 cannel(false):不中斷執行任務的線程 306 已完成 307 返回false 308 應用場景 309 當一個線程需要等待另一個線程把某個任務執行完后它才能繼續執行