最近項目中要用多線程處理任務,自然就用到了 ThreadPoolTaskExecutor 這個對象。它是 Spring 對 java.util.concurrent 包下 ThreadPoolExecutor 類的封裝。當線程數已達到最大值、且等待隊列也已滿時,新提交的任務會交給 RejectedExecutionHandler 處理;而這個 Handler 的默認策略是 AbortPolicy,會直接拋出 RejectedExecutionException 異常,這不符合我們的業務場景。
業務需求:對於超出線程池容量的任務,我希望提交任務的主線程進行阻塞,直到線程池有空位(等待隊列可以放入該任務)為止。簡單的示例代碼如下:
package com.quant.dev.modules.dev.enetity;
import lombok.extern.slf4j.Slf4j;
import java.net.URL;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @program: dev
* @description:
* @author: Mr.EternityZhang
* @create: 2019-07-08 17:41
*/
@Slf4j
public class TestThread {
static class ThreadFactoryCustom implements ThreadFactory{
private final AtomicInteger threadNum=new AtomicInteger(1);
private final String namePrefix;
private ThreadFactoryCustom(String namePrefix){
this.namePrefix=namePrefix+"-";
}
@Override
public Thread newThread(Runnable r) {
Thread t=new Thread(r,namePrefix+threadNum.getAndIncrement());
if(t.isDaemon()){
t.setDaemon(true);
}
if(t.getPriority()!=Thread.NORM_PRIORITY){
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
static class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
private final String threadName;
private final URL url;
public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Provider端線程池滿!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)" ,
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
log.warn(msg);
if (!e.isShutdown()) {
try {
log.info("start get queue");
e.getQueue().put(r);
log.info("end get queue");
} catch (InterruptedException ee) {
log.error(ee.toString(), ee);
Thread.currentThread().interrupt();
}
}
}
}
public static ThreadFactory getThreadFactoryCustom(String name){
return new ThreadFactoryCustom(name);
}
public static void main(String[] args) {
String poolName="eternity";
ThreadFactory factory=getThreadFactoryCustom(poolName);
log.info("核數={}",Runtime.getRuntime().availableProcessors());
ThreadPoolExecutor executor=
new ThreadPoolExecutor(100,400,5,
TimeUnit.SECONDS,new LinkedBlockingDeque<>(400),factory,new AbortPolicyWithReport(poolName,null));
Long begin=System.currentTimeMillis();
CountDownLatch count=new CountDownLatch(2000);
AtomicInteger integer=new AtomicInteger(1);
for(int i=0;i<2000;i++){
executor.execute(()->{
try {
log.info("當前線程為={},數值={}",Thread.currentThread().getName(),integer);
integer.getAndIncrement();
Thread.sleep(500);
count.countDown();
} catch (Exception e) {
e.printStackTrace();
}
});
}
try {
count.await();
log.info("阻塞數值={}",count.getCount());
log.info("活躍數量={}",executor.getActiveCount());
if(executor.getActiveCount()==0){
executor.shutdown();
}
} catch (Exception e) {
e.printStackTrace();
}
log.info("耗時={}------結果={}",System.currentTimeMillis()-begin,integer);
}
}