第八章:線程池的使用——Java並發編程實戰


一、在任務和執行策略之間隱性耦合

Executor框架將任務的提交和它的執行策略解耦開來。雖然Executor框架為制定和修改執行策略提供了相當大的靈活性,但並非所有的任務都能適用所有的執行策略。

  • 依賴性任務:依賴其他同步任務的結果,使其不得不順序執行,影響活躍性
  • 使用線程封閉的任務:在單線程的Executor中執行,任務可以不是線程安全的,但是一旦提交到線程池時,就會失去線程安全
  • 對響應時間敏感的任務:在單個線程或含有少量線程的線程池中執行是不可接受的
  • 使用ThreadLocal的任務:ThreadLocal使每個線程都可以擁有某個變量的一個私有"版本",而線程池中的線程是重復使用的,即一次使用完后,會被重新放回線程池,可被重新分配使用。因此,ThreadLocal線程變量,如果保存的信息只是針對一次請求的,放回線程池之前需要清空這些Threadlocal變量的值(或者取得線程之后,首先清空這些Threadlocal變量的值)

只有任務都是同類型並且相互獨立時,線程池的效率達到最佳

1、線程飢餓死鎖——在線程池中所有正在執行任務的線程都由於等待其他仍處於工作隊列中的任務而阻塞

  例1:在單線程池中,正在執行的任務阻塞等待隊列中的某個任務執行完畢

  例2:線程池不夠大時,通過柵欄機制協調多個任務時

  例3:由於其他資源的隱性限制,每個任務都需要使用有限的數據庫連接資源,那么不管線程池多大,都會表現出和和連接資源相同的大小 

每當提交了一個有依賴性的Executor任務時,要清楚地知道可能會出現線程"飢餓"死鎖,因此需要在代碼或配置Executor地配置文件中記錄線程池地大小限制或配置限制

2、運行時間較長的任務

  線程池的大小應該超過有較長執行時間的任務數量,否則可能造成線程池中線程均服務於長時間任務導致其它短時間任務也阻塞導致性能下降

緩解策略:限定任務等待資源的時間,如果等待超時,那么可以把任務標示為失敗,然后中止任務或者將任務重新返回隊列中以便隨后執行。這樣,無論任務的最終結果是否成功,這種方法都能確保任務總能繼續執行下去,並將線程釋放出來以執行一些能更快完成的任務。例如Thread.join、BlockingQueue.put、CountDownLatch.await以及Selector.select等

 

二、設置線程池的大小

線程池的理想大小取決於被提交任務的類型及所部署系統的特性

  • 線程池過大,那么大量的線程將在相對很少的CPU和內存資源上發生競爭,這不僅會導致更高的內存使用量,而且還可能耗盡資源
  • 如果線程池過小,那么將導致許多空閑的處理器無法執行工作,從而降低吞吐量

對於計算密集型的任務,在擁有Ncpu個處理器的系統上,當線程池的大小為Ncpu+1時,通常能實現最優的利用率;對於包含I/O操作或者其他阻塞操作的任務,由於線程並不會一直執行,因此線程池的規模應該更大

N(threads)=N(cpu)*U(cpu)*(1+W/C)   N(cpu)=CPU的數量=Runtime.getRuntime().availableProcessors(); U(cpu)= 期望CPU的使用率,0<=U(cpu)<=1 ;W/C=等待時間與運行時間的比率

三、配置ThreadPoolExecutor

 

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

1、線程的創建與銷毀

  • CorePoolSize: 線程池基本大小,在創建ThreadPoolExecutor初期,線程並不會立即啟動,而是等到有任務提交時才會啟動,除非調用prestartAllCoreThreads,並且只有在工作隊列滿了的情況下才會創建超出這個數量的線程。
  • MaxmumPooSize: 線程池最大大小表示可同時活動的線程數量的上限。若某個線程的空閑時間超過了keepAliveTime, 則被標記為可回收的

newFixedThreadPool: CorePoolSize = MaxmumPoolSize

newCachedThreadPool: CorePoolSize=0,MaxmumPoolSize=Integer.MAX_VALUE,線程池可被無限擴展,需求降低時自動回收

2、管理隊列任務

  • workQueue:用於保存超過線程池線程處理速率的Runnable任務的隊列 (三種:無界隊列、有界隊列和同步移交)

newFixedThreadPool和newSingleThreadPool在默認情況下將使用一個無界的LinkedBlockingQueue,有更好的性能

使用有界隊列有助於避免資源耗盡的情況發生,為了避免當隊列填滿后,在使用有界的工作隊列時,隊列的大小與線程池的大小必須一起調節,能防止過載

對於非常大的或者無界的線程池,可以通過使用SynchronousQueue來避免任務排隊,要將一個元素放入SynchronousQueue中,必須有另一個線程正在等待接受這個元素,任務會直接移交給執行它的線程,否則將拒絕任務。newCachedThreadPool工廠方法中就使用了SynchronousQueue

使用優先隊列PriorityBlockingQueue可以控制任務被執行的順序

3、飽和策略

  • AbortPolicy(中止策略),默認的飽和策略。會拋出RejectedExecutionException異常(拋棄當前任務vs拋棄最舊任務)
  • 調用者運行:下一個任務在調用了execute方法的主線程中進行運行,主線程至少在一段時間內不能提交任何任務。到達的請求將被保存在TCP層的隊列中而不是在應用程序的隊列中,導致服務器在高負載下實現一種平緩的性能降低

其他:對執行策略進行修改,使用信號量,控制處於執行中的任務

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;
    
    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }
    
    public void submitTask(final Runnable command){
        try {
            semaphore.acquire(); //提交任務前請求信號量
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    try{
                        command.run();
                    } finally{
                        semaphore.release(); //執行完釋放信號
                    }
                }
            });
        } catch (InterruptedException e) {
            // handle exception
        }
    }
}

4、線程工廠

通過自定義線程工廠可以對其進行擴展加入新的功能實現

當應用需要利用安全策略來控制某些特殊代碼庫的訪問權,可以利用PrivilegedThreadFactory來定制自己的線程工廠,以免出現安全性異常。將與創建privilegedThreadFactory的線程擁有相同的訪問權限、AccessControlContext和contextClassLoader

public class MyThreadFactory implements ThreadFactory {
    private final String poolName;
    
    public MyThreadFactory(String poolName) {
        super();
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new MyAppThread(r);
    }
}

public class MyAppThread extends Thread {
    public static final String DEFAULT_NAME="MyAppThread";
    private static volatile boolean debugLifecycle = false;
    private static final AtomicInteger created = new AtomicInteger();
    private static final AtomicInteger alive = new AtomicInteger();
    private static final Logger log = Logger.getAnonymousLogger();
    
    public MyAppThread(Runnable r) {
        this(r, DEFAULT_NAME);
    }

    public MyAppThread(Runnable r, String name) {
        super(r, name+ "-" + created.incrementAndGet());
        setUncaughtExceptionHandler( //設置未捕獲的異常發生時的處理器
                new Thread.UncaughtExceptionHandler() {
                    @Override
                    public void uncaughtException(Thread t, Throwable e) {
                        log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
                    }
                });
    }

    @Override
    public void run() {
        boolean debug = debugLifecycle;
        if (debug) 
            log.log(Level.FINE, "running thread " + getName());
        try {
            alive.incrementAndGet();
            super.run();
        } finally {
            alive.decrementAndGet();
            if (debug) 
                log.log(Level.FINE, "existing thread " + getName());
        }
    }
}
自定義線程工廠

 

5、在調用構造函數后在定制ThreadPoolExecutor

  • 可以在創建線程池后,再通過Setter方法設置其基本屬性(將ExecutorService擴展為ThreadPoolExecutor)
  • 在Executors中包含一個unconfigurableExecutorService工廠方法,該方法對一個現有的ExecutorService進行包裝,使其只暴露出ExecutorService的方法,因此不能對它進行配置

 

四、擴展ThreadPoolExecutor

ThreadPoolExecutor使用了模板方法模式,提供了beforeExecute、afterExecute和terminated擴展方法

  • 線程執行前調用beforeExecute(如果beforeExecute拋出了一個RuntimeException,那么任務將不會被執行)
  • 線程執行后調用afterExecute(拋出異常也會調用,如果任務在完成后帶有一個Error,那么就不會調用afterExecute)
  • 在線程池完成關閉操作時調用terminated,也就是所有任務都已經完成並且所有工作者線程也已經關閉后
public class TimingThreadPoolExecutor extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();//任務執行開始時間
    private final Logger log = Logger.getAnonymousLogger();
    private final AtomicLong numTasks = new AtomicLong(); //統計任務數
    private final AtomicLong totalTime = new AtomicLong(); //線程池運行總時間

    public TimingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        log.fine(String.format("Thread %s: start %s", t, r));
        startTime.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try{
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));
        } finally{
            super.afterExecute(r, t);
        }
    }

    @Override
    protected void terminated() {
        try{
            //任務執行平均時間
            log.info(String.format("Terminated: average time=%dns", totalTime.get() / numTasks.get()));
        }finally{
            super.terminated();
        }
    }
}
增加日志和記時等功能的線程池

 

五、遞歸算法的並行化

  • 如果循環中的迭代操作都是獨立的,並且不需要等待所有的迭代操作都完成再繼續執行,那么就可以使用Executor將串行循環轉化為並行循環
  • 如果需要提交一個任務集並等待它們完成,那么可以使用ExecutorService.invokeAll
  • 如果遞歸執行的任務中,在每個迭代操作中都不需要來自於后續遞歸迭代的結果,可以創建一個特定於遍歷過程的Executor,並使用shutdown和awaitTermination等方法,等待上面並行運行的結果


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM