自定義線程池實踐 不影響主程序的情況下, 調用第三方接口,超時停止訪問 實踐


情景: 在高並發的請求中,  立刻返回響應消息,  調用第三方接口 繼續執行, 且超時調用停止該線程,  

 

package com.cjcx.inter.utils;

import java.util.concurrent.*;

/**
 *
 */
public class ThreadPoolTest {

    // 線程池維護線程的最少數量
    private static Integer corePoolSize = 2;

    // 線程池維護線程的最大數量
    private static Integer maxPoolSize = 10;

    // 線程池維護線程所允許的空閑時間
    private static Integer keepAliveTime = 90;

    // 線程池所使用的緩沖隊列大小
    private static Integer workQueueSize = 1024;


    public static void main(String[] args) {
        try {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize,
                    maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(),
                    new ThreadPoolExecutor.CallerRunsPolicy());

            // 每秒輸出線程池情況
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            Executors.newFixedThreadPool(2);
                            Thread.sleep(3000);
                            System.out.println("============================================活躍的線程數:" + pool.getActiveCount() + ",核心線程數:" + pool.getCorePoolSize() + ",線程池大小:" + pool.getPoolSize() + ",隊列的大小" + pool.getQueue().size() + "==============================================");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();

            // 多任務執行
            for (int i = 0; i < 2000; i++) {
                final int k = i;
                System.out.println("i:" + i);
                /*
                可丟棄任務(超出隊列) 后台定時沖DB拉取再處理的場景,
                if(pool.getQueue().size() > 1000){
                    return;
                }*/
                
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 任務 小票接口 上傳給IBC
                        try {
                            long s = System.currentTimeMillis();
                            System.out.println(Thread.currentThread().getName() + "===================== 任務" + k + " 開始");
                            Future<Integer> future = executorService.submit(new Callable<Integer>() {
                                public Integer call() throws Exception {
                                    return executeTask(k);
                                }
                            });
                            try {
                                Integer result = future.get(1, TimeUnit.SECONDS);
                                System.out.println("設備ID: " + k + ", 結果:" + result + " 上傳接口程序 正常完成");
                                UploadDescribe.getInstance().record(System.currentTimeMillis() - s);
                            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                                future.cancel(true);
                                UploadDescribe.getInstance().cancel("DV_" + k);
                                System.out.println("設備ID: " + k + " 上傳接口程序 1秒內未完成, 停止上傳.退出");
                            } finally {
                                System.out.println(Thread.currentThread().getName() + "清理資源===================== 任務" + k + " 結束, 用時:" + (System.currentTimeMillis() - s));
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 上傳IBC任務
    static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    private static Integer executeTask(Integer k) {
        Integer r = -1;
        try {
            int s = (int) (Math.random() * 2000);
            System.out.println("k:" + k + ", sleep:" + s);
            Thread.sleep(s);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        r = 0;
        return r;
    }
}

 

 // 編寫一個類記錄 執行的成功,失敗次數,記錄成功的平均耗時

package com.cjcx.inter.utils;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 接口上傳情況
 * @author e58
 *
 */
public class UploadDescribe {
    
    private Logger logger = LoggerFactory.getLogger(UploadDescribe.class);
    
    private static UploadDescribe describe;
    
    private UploadDescribe() {}
    
    public synchronized static UploadDescribe getInstance() {
        if (describe == null) {
            describe = new UploadDescribe();
        }
        return describe;
    }
    
    private ReentrantLock put = new ReentrantLock();
    
    // 上傳超時時間ms
    private Long connectTimeOut = 5000L;
    
    // 上傳總次數
    private Integer uploadTimes = 0; 
    // 上傳平均耗時ms
    private Long uploadAverageTime = 0L;
    // 超時總次數
    private Integer cancelTimes = 0;
    // 超時的設備ID
    private Set<String> sets = new HashSet<String>();
    
    /**
     * 請求成功,記錄次數和耗時
     */
    public void record(Long increTime){
        try {
            put.lock();
            // 次數+1
            uploadTimes = uploadTimes + 1;
            // 平均時間
            if(uploadTimes != 0)
                uploadAverageTime = (uploadAverageTime + increTime ) / uploadTimes;
            
            desc();
        } catch (Exception e) {
            logger.error("UploadDescribe record:", e);
        } finally{
            put.unlock();
        }
    }
    
    /**
     * 設備請求超時耗時
     */
    public void cancel(String deviceId){
        try {
            put.lock();
            // 次數+1
            cancelTimes = cancelTimes + 1;
            // 平均時間
            sets.add(deviceId);
            
            desc();
        } catch (Exception e) {
            logger.error("UploadDescribe cancel:", e);
        } finally{
            put.unlock();
        }
    }
    
    /**
     * 清空數據
     */
    public void clear(){
        try {
            put.lock();
            uploadTimes = 0; 
            uploadAverageTime = 0L;
            cancelTimes = 0;
            sets.clear();
        } catch (Exception e) {
            logger.error("UploadDescribe clear:", e);
        } finally{
            put.unlock();
        }
    }
    
    /**
     * 輸出描述信息
     */
    public void desc(){
        logger.info("uploadTimes=" + uploadTimes + ", uploadAverageTime=" + uploadAverageTime + ", cancelTimes=" + cancelTimes + "]");
    }
    
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM