Java線程池使用和常用參數
多線程問題:
1、java中為什么要使用多線程
使用多線程,可以把一些大任務分解成多個小任務來執行,多個小任務之間互不影像,同時進行,這樣,充分利用了cpu資源。
2、java中簡單的實現多線程的方式
繼承Thread類,重寫run方法;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
class
MyTread
extends
Thread{
public
void
run() {
System.out.println(Thread.currentThread().getName());
}
}
實現Runable接口,實現run方法;
class
MyRunnable
implements
Runnable{
public
void
run() {
System.out.println(Thread.currentThread().getName());
}
}
class
ThreadTest {
public
static
void
main(String[] args) {
MyTread thread =
new
Mythread();
thread.start();
//開啟一個線程
MyRunnable myRunnable =
new
MyRunnable();
Thread runnable =
new
Thread(myRunnable);
runnable.start();
//開啟一個線程
}
}
|
3、java線程的狀態
創建:當new了一個線程,並沒有調用start之前,線程處於創建狀態;
就緒:當調用了start之后,線程處於就緒狀態,這是,線程調度程序還沒有設置執行當前線程;
運行:線程調度程序執行到線程時,當前線程從就緒狀態轉成運行狀態,開始執行run方法里邊的代碼;
阻塞:線程在運行的時候,被暫停執行(通常等待某項資源就緒后在執行,sleep、wait可以導致線程阻塞),這是該線程處於阻塞狀態;
死亡:當一個線程執行完run方法里邊的代碼或調用了stop方法后,該線程結束運行
4、為什么要引入線程池
當我們需要的並發執行線程數量很多時,且每個線程執行很短的時間就結束了,這樣,我們頻繁的創建、銷毀線程就大大降低了工作效率(創建和銷毀線程需要時間、資源)。
java中的線程池可以達到這樣的效果:一個線程執行完任務之后,繼續去執行下一個任務,不被銷毀,這樣線程利用率提高了。
5、java中的線程池(ThreadPoolExecutor)
說起java中的線程池,就想到java.util.concurrent.ThreadPoolExecutor。ThreadPoolExecutor類是java線程池中的核心類。他的實現方式有四種:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
public
class
ThreadPoolExecutor
extends
AbstractExecutorService {
public
ThreadPoolExecutor(
int
corePoolSize,
int
maximumPoolSize,
long
keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this
(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public
ThreadPoolExecutor(
int
corePoolSize,
int
maximumPoolSize,
long
keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this
(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public
ThreadPoolExecutor(
int
corePoolSize,
int
maximumPoolSize,
long
keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this
(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public
ThreadPoolExecutor(
int
corePoolSize,
int
maximumPoolSize,
long
keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if
(corePoolSize <
0
||
maximumPoolSize <=
0
||
maximumPoolSize < corePoolSize ||
keepAliveTime <
0
)
throw
new
IllegalArgumentException();
if
(workQueue ==
null
|| threadFactory ==
null
|| handler ==
null
)
throw
new
NullPointerException();
this
.corePoolSize = corePoolSize;
this
.maximumPoolSize = maximumPoolSize;
this
.workQueue = workQueue;
this
.keepAliveTime = unit.toNanos(keepAliveTime);
this
.threadFactory = threadFactory;
this
.handler = handler;
}
|
通過ThreadPoolExecutor類的源碼可以看出,ThreadPoolExecutor類繼承AbstractExecutorService,提供四個構造方法,通過構造方法可以看出前面三個最終掉了最后一個
下面介紹下構造方法中的參數:
corePoolSize:線程池的大小。線程池創建之后不會立即去創建線程,而是等待線程的到來。當當前執行的線程數大於改值是,線程會加入到緩沖隊列;
maximumPoolSize:線程池中創建的最大線程數;
keepAliveTime:空閑的線程多久時間后被銷毀。默認情況下,改值在線程數大於corePoolSize時,對超出corePoolSize值得這些線程起作用。
unit:TimeUnit枚舉類型的值,代表keepAliveTime時間單位,可以取下列值:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時
TimeUnit.MINUTES; //分鍾
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
workQueue:阻塞隊列,用來存儲等待執行的任務,決定了線程池的排隊策略,有以下取值:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
threadFactory:線程工廠,是用來創建線程的。默認new Executors.DefaultThreadFactory();
handler:線程拒絕策略。當創建的線程超出maximumPoolSize,且緩沖隊列已滿時,新任務會拒絕,有以下取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
以下是具體的實現方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
//默認策略。使用該策略時,如果線程池隊列滿了丟掉這個任務並且拋出RejectedExecutionException異常
class
AbortPolicy
implements
RejectedExecutionHandler{
public
void
rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw
new
RejectedExecutionException(
"Task "
+ r.toString() +
" rejected from "
+
executor.toString());
}
}
//如果線程池隊列滿了,會直接丟掉這個任務並且不會有任何異常
class
DiscardPolicy
implements
RejectedExecutionHandler{
public
void
rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
}
//丟棄最老的,會將最早進入隊列的任務刪掉騰出空間,再嘗試加入隊列
class
DiscardOldestPolicy
implements
RejectedExecutionHandler{
public
void
rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if
(!executor.isShutdown()) {
//移除隊頭元素
executor.getQueue().poll();
//再嘗試入隊
executor.execute(r);
}
}
}
//主線程會自己去執行該任務,不會等待線程池中的線程去執行
class
CallerRunsPolicy
implements
RejectedExecutionHandler{
public
void
rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if
(!executor.isShutdown()) {
//直接執行run方法
r.run();
}
}
}
|
以下是ThreadPoolExecutor具體的繼承結構
1
2
3
|
public
abstract
class
AbstractExecutorService
implements
ExecutorService {
}
|
這是一個抽象類,實現了ExecutorService接口,並實現了ExecutorService里邊的方法,下面看下ExecutorService接口的具體實現
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public
interface
ExecutorService
extends
Executor {
void
shutdown();
List<Runnable> shutdownNow();
boolean
isShutdown();
boolean
isTerminated();
boolean
awaitTermination(
long
timeout, TimeUnit unit)
throws
InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<?
extends
Callable<T>> tasks)
throws
InterruptedException;
<T> List<Future<T>> invokeAll(Collection<?
extends
Callable<T>> tasks,
long
timeout, TimeUnit unit)
throws
InterruptedException;
<T> T invokeAny(Collection<?
extends
Callable<T>> tasks)
throws
InterruptedException, ExecutionException;
<T> T invokeAny(Collection<?
extends
Callable<T>> tasks,
long
timeout, TimeUnit unit)
throws
InterruptedException, ExecutionException, TimeoutException;
}
|
ExecutorService繼承Executor接口,下面是Executor接口的具體實現
1
2
3
|
public
interface
Executor {
void
execute(Runnable command);
}
|
Executor接口是頂層接口,只聲明了一個execute方法,該方法是用來執行傳遞進來的任務的。
回過頭來,咱么重新看ThreadPoolExecutor類,改類里邊有以下兩個重要的方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public
void
execute(Runnable command) {
if
(command ==
null
)
throw
new
NullPointerException();
int
c = ctl.get();
if
(workerCountOf(c) < corePoolSize) {
if
(addWorker(command,
true
))
return
;
c = ctl.get();
}
if
(isRunning(c) && workQueue.offer(command)) {
int
recheck = ctl.get();
if
(! isRunning(recheck) && remove(command))
reject(command);
else
if
(workerCountOf(recheck) ==
0
)
addWorker(
null
,
false
);
}
else
if
(!addWorker(command,
false
))
reject(command);
}
public
<T> Future<T> submit(Callable<T> task) {
if
(task ==
null
)
throw
new
NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return
ftask;
}
|
execute()方法是Executor中聲明的方法,在ThreadPoolExecutor有了具體的實現,這個方法是ThreadPoolExecutor的核心方法,
通過這個方法可以向線程池提交一個任務,交由線程池去執行
submit()方法是ExecutorService中聲明的方法,在AbstractExecutorService中進行了實現,Executor中並沒有對其進行重寫。從實現中可以看出,submit方法最終也調用了execute
方法,也是執行一個人去,但submit方法可以返回執行結果,利用Future來獲取任務執行結果。
6、Spring中的線程池
Spring中的線程池是由ThreadPoolTaskExecutor類來實現的。該類的實現原理最終也是調用了java中的ThreadPoolExecutor類中的一些方法。具體的實現讀者可以自己去翻閱Spring
的源碼,這里筆者就不羅列了。我們看下ThreadPoolTaskExecutor的初始化。
ThreadPoolTaskExecutor有兩種常用的有兩種初始化方式:xml配置,java代碼初始化
xml配置:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="5" /> <property name="keepAliveSeconds" value="200" /> <property name="maxPoolSize" value="10" /> <property name="queueCapacity" value="20" /> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> </property> </bean>
看過上面的內容,讀者應該很清楚上面的一些參數代表的意思了吧。筆者在這里不一一去解釋了。
1
2
3
4
5
6
7
8
9
10
11
12
|
public
MyThreadPoolTaskExecutor {
@Autowired
private
ThreadPoolTaskExecutor taskExecutor;
private
void
test(){
taskExecutor.execute(
new
Runnable(){
@Override
public
void
run() {
//執行的代碼
}});
}
}
|
Java代碼初始化:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
private
void
test2(){
ThreadPoolTaskExecutor executor =
new
ThreadPoolTaskExecutor();
executor.setCorePoolSize(
10
);
executor.setMaxPoolSize(
15
);
executor.setKeepAliveSeconds(
1
);
executor.setQueueCapacity(
5
);
executor.setRejectedExecutionHandler(
new
ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
executor.execute(
new
Runnable(){
@Override
public
void
run() {
//執行的代碼
}
});
}
|
常用參數總結:
關於Java線程池的參數設置: 線程池是Java多線程里開發里的重要內容,使用難度不大,但如何用好就要明白參數的含義和如何去設置。干貨里的內容大多是參考別人的,加入了一些知識點的擴充和看法。希望能對多線程開發學習的童鞋有些啟發和幫助。
一、ThreadPoolExecutor的重要參數
1、corePoolSize:核心線程數
* 核心線程會一直存活,及時沒有任務需要執行
* 當線程數小於核心線程數時,即使有線程空閑,線程池也會優先創建新線程處理
* 設置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關閉
2、queueCapacity:任務隊列容量(阻塞隊列)
* 當核心線程數達到最大時,新任務會放在隊列中排隊等待執行
3、maxPoolSize:最大線程數
* 當線程數>=corePoolSize,且任務隊列已滿時。線程池會創建新線程來處理任務
* 當線程數=maxPoolSize,且任務隊列已滿時,線程池會拒絕處理任務而拋出異常
4、 keepAliveTime:線程空閑時間
* 當線程空閑時間達到keepAliveTime時,線程會退出,直到線程數量=corePoolSize
* 如果allowCoreThreadTimeout=true,則會直到線程數量=0
5、allowCoreThreadTimeout:允許核心線程超時
6、rejectedExecutionHandler:任務拒絕處理器
* 兩種情況會拒絕處理任務:
- 當線程數已經達到maxPoolSize,切隊列已滿,會拒絕新任務
- 當線程池被調用shutdown()后,會等待線程池里的任務執行完畢,再shutdown。如果在調用shutdown()和線程池真正shutdown之間提交任務,會拒絕新任務
* 線程池會調用rejectedExecutionHandler來處理這個任務。如果沒有設置默認是AbortPolicy,會拋出異常
* ThreadPoolExecutor類有幾個內部實現類來處理這類情況:
- AbortPolicy 丟棄任務,拋運行時異常
- CallerRunsPolicy 執行任務
- DiscardPolicy 忽視,什么都不會發生
- DiscardOldestPolicy 從隊列中踢出最先進入隊列(最后一個執行)的任務
* 實現RejectedExecutionHandler接口,可自定義處理器
二、ThreadPoolExecutor執行順序
線程池按以下行為執行任務
1. 當線程數小於核心線程數時,創建線程。
2. 當線程數大於等於核心線程數,且任務隊列未滿時,將任務放入任務隊列。
3. 當線程數大於等於核心線程數,且任務隊列已滿
- 若線程數小於最大線程數,創建線程
- 若線程數等於最大線程數,拋出異常,拒絕任務
三、如何設置參數
1、默認值
* corePoolSize=1
* queueCapacity=Integer.MAX_VALUE
* maxPoolSize=Integer.MAX_VALUE
* keepAliveTime=60s
* allowCoreThreadTimeout=false
* rejectedExecutionHandler=AbortPolicy()
2、如何來設置
* 需要根據幾個值來決定
- tasks :每秒的任務數,假設為500~1000
- taskcost:每個任務花費時間,假設為0.1s
- responsetime:系統允許容忍的最大響應時間,假設為1s
* 做幾個計算
- corePoolSize = 每秒需要多少個線程處理?
* threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 個線程。corePoolSize設置應該大於50
* 根據8020原則,如果80%的每秒任務數小於800,那么corePoolSize設置為80即可
- queueCapacity = (coreSizePool/taskcost)*responsetime
* 計算可得 queueCapacity = 80/0.1*1 = 800。意思是隊列里的線程可以等待1s,超過了的需要新開線程來執行
* 切記不能設置為Integer.MAX_VALUE,這樣隊列會很大,線程數只會保持在corePoolSize大小,當任務陡增時,不能新開線程來執行,響應時間會隨之陡增。
- maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
* 計算可得 maxPoolSize = (1000-800)/10 = 20
* (最大任務數-隊列容量)/每個線程每秒處理能力 = 最大線程數
- rejectedExecutionHandler:根據具體情況來決定,任務不重要可丟棄,任務重要則要利用一些緩沖機制來處理
- keepAliveTime和allowCoreThreadTimeout采用默認通常能滿足
3、 以上都是理想值,實際情況下要根據機器性能來決定。如果在未達到最大線程數的情況機器cpu load已經滿了,則需要通過升級硬件(呵呵)和優化代碼,降低taskcost來處理。
設置。