重寫ThreadFactory方法和拒絕策略


最近項目中要用到多線程處理任務,自然就用到了ThreadPoolTaskExecutor這個對象,這個是spring對於Java的concurrent包下的ThreadPoolExecutor類的封裝,對於超出等待隊列大小的任務默認是使用RejectedExecutionHandler去處理拒絕的任務,而這個Handler的默認策略是AbortPolicy,直接拋出RejectedExecutionException異常,這個不符合我們的業務場景,

業務需求:我希望是對於超出的任務,主線程進行阻塞,直到有可用線程,簡單的代碼如下

package com.quant.dev.modules.dev.enetity;

import lombok.extern.slf4j.Slf4j;

import java.net.URL;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @program: dev
 * @description:
 * @author: Mr.EternityZhang
 * @create: 2019-07-08 17:41
 */
@Slf4j
public class TestThread {


    static class ThreadFactoryCustom implements ThreadFactory{
        private final AtomicInteger threadNum=new AtomicInteger(1);
        private final String namePrefix;

        private ThreadFactoryCustom(String namePrefix){
            this.namePrefix=namePrefix+"-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t=new Thread(r,namePrefix+threadNum.getAndIncrement());
            if(t.isDaemon()){
                t.setDaemon(true);
            }
            if(t.getPriority()!=Thread.NORM_PRIORITY){
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }

    static class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

        private final String threadName;

        private final URL url;

        public AbortPolicyWithReport(String threadName, URL url) {
            this.threadName = threadName;
            this.url = url;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            String msg = String.format("Provider端線程池滿!" +
                            " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                            " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)" ,
                    threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                    e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
            log.warn(msg);
            if (!e.isShutdown()) {
                try {
                    log.info("start get queue");
                    e.getQueue().put(r);
                    log.info("end get queue");
                } catch (InterruptedException ee) {
                    log.error(ee.toString(), ee);
                    Thread.currentThread().interrupt();
                }
            }
        }

    }

    public static ThreadFactory getThreadFactoryCustom(String name){
        return new ThreadFactoryCustom(name);
    }

    public static void main(String[] args) {
        String poolName="eternity";
        ThreadFactory factory=getThreadFactoryCustom(poolName);
        log.info("核數={}",Runtime.getRuntime().availableProcessors());
        ThreadPoolExecutor executor=
                new ThreadPoolExecutor(100,400,5,
                        TimeUnit.SECONDS,new LinkedBlockingDeque<>(400),factory,new AbortPolicyWithReport(poolName,null));
        Long begin=System.currentTimeMillis();
        CountDownLatch count=new CountDownLatch(2000);
        AtomicInteger integer=new AtomicInteger(1);
        for(int i=0;i<2000;i++){
            executor.execute(()->{
                try {
                    log.info("當前線程為={},數值={}",Thread.currentThread().getName(),integer);
                    integer.getAndIncrement();
                    Thread.sleep(500);
                    count.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        try {
            count.await();
            log.info("阻塞數值={}",count.getCount());
            log.info("活躍數量={}",executor.getActiveCount());
            if(executor.getActiveCount()==0){
                executor.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.info("耗時={}------結果={}",System.currentTimeMillis()-begin,integer);
    }
}

阻塞原理:

之所以能實現阻塞,是基於BlockingQueue的put方法來實現的,當阻塞隊列滿時,put方法會一直等待

參考

https://www.jianshu.com/p/3cfd943996a1


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM