一、在任務和執行策略之間隱性耦合
Executor框架將任務的提交和它的執行策略解耦開來。雖然Executor框架為制定和修改執行策略提供了相當大的靈活性,但並非所有的任務都能適用所有的執行策略。
只有任務都是同類型並且相互獨立時,線程池的效率達到最佳
1、線程飢餓死鎖——在線程池中所有正在執行任務的線程都由於等待其他仍處於工作隊列中的任務而阻塞
例1:在單線程池中,正在執行的任務阻塞等待隊列中的某個任務執行完畢
例2:線程池不夠大時,通過柵欄機制協調多個任務時
例3:由於其他資源的隱性限制,每個任務都需要使用有限的數據庫連接資源,那么不管線程池多大,都會表現出和和連接資源相同的大小
每當提交了一個有依賴性的Executor任務時,要清楚地知道可能會出現線程"飢餓"死鎖,因此需要在代碼或配置Executor地配置文件中記錄線程池地大小限制或配置限制
2、運行時間較長的任務
線程池的大小應該超過有較長執行時間的任務數量,否則可能造成線程池中線程均服務於長時間任務導致其它短時間任務也阻塞導致性能下降
緩解策略:限定任務等待資源的時間,如果等待超時,那么可以把任務標示為失敗,然后中止任務或者將任務重新返回隊列中以便隨后執行。這樣,無論任務的最終結果是否成功,這種方法都能確保任務總能繼續執行下去,並將線程釋放出來以執行一些能更快完成的任務。例如Thread.join、BlockingQueue.put、CountDownLatch.await以及Selector.select等
二、設置線程池的大小
線程池的理想大小取決於被提交任務的類型及所部署系統的特性
對於計算密集型的任務,在擁有Ncpu個處理器的系統上,當線程池的大小為Ncpu+1時,通常能實現最優的利用率;對於包含I/O操作或者其他阻塞操作的任務,由於線程並不會一直執行,因此線程池的規模應該更大
N(threads)=N(cpu)*U(cpu)*(1+W/C) N(cpu)=CPU的數量=Runtime.getRuntime().availableProcessors(); U(cpu)= 期望CPU的使用率,0<=U(cpu)<=1 ;W/C=等待時間與運行時間的比率
三、配置ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
1、線程的創建與銷毀
newFixedThreadPool: CorePoolSize = MaxmumPoolSize
newCachedThreadPool: CorePoolSize=0,MaxmumPoolSize=Integer.MAX_VALUE,線程池可被無限擴展,需求降低時自動回收
2、管理隊列任務
newFixedThreadPool和newSingleThreadPool在默認情況下將使用一個無界的LinkedBlockingQueue,有更好的性能
使用有界隊列有助於避免資源耗盡的情況發生,為了避免當隊列填滿后,在使用有界的工作隊列時,隊列的大小與線程池的大小必須一起調節,能防止過載
對於非常大的或者無界的線程池,可以通過使用SynchronousQueue來避免任務排隊,要將一個元素放入SynchronousQueue中,必須有另一個線程正在等待接受這個元素,任務會直接移交給執行它的線程,否則將拒絕任務。newCachedThreadPool工廠方法中就使用了SynchronousQueue
使用優先隊列PriorityBlockingQueue可以控制任務被執行的順序
3、飽和策略
其他:對執行策略進行修改,使用信號量,控制處於執行中的任務
public class BoundedExecutor { private final Executor exec; private final Semaphore semaphore; public BoundedExecutor(Executor exec, int bound) { this.exec = exec; this.semaphore = new Semaphore(bound); } public void submitTask(final Runnable command){ try { semaphore.acquire(); //提交任務前請求信號量 exec.execute(new Runnable() { @Override public void run() { try{ command.run(); } finally{ semaphore.release(); //執行完釋放信號 } } }); } catch (InterruptedException e) { // handle exception } } }
4、線程工廠
通過自定義線程工廠可以對其進行擴展加入新的功能實現
當應用需要利用安全策略來控制某些特殊代碼庫的訪問權,可以利用PrivilegedThreadFactory來定制自己的線程工廠,以免出現安全性異常。將與創建privilegedThreadFactory的線程擁有相同的訪問權限、AccessControlContext和contextClassLoader
public class MyThreadFactory implements ThreadFactory { private final String poolName; public MyThreadFactory(String poolName) { super(); this.poolName = poolName; } @Override public Thread newThread(Runnable r) { return new MyAppThread(r); } } public class MyAppThread extends Thread { public static final String DEFAULT_NAME="MyAppThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); } public MyAppThread(Runnable r, String name) { super(r, name+ "-" + created.incrementAndGet()); setUncaughtExceptionHandler( //設置未捕獲的異常發生時的處理器 new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e); } }); } @Override public void run() { boolean debug = debugLifecycle; if (debug) log.log(Level.FINE, "running thread " + getName()); try { alive.incrementAndGet(); super.run(); } finally { alive.decrementAndGet(); if (debug) log.log(Level.FINE, "existing thread " + getName()); } } }
5、在調用構造函數后在定制ThreadPoolExecutor
四、擴展ThreadPoolExecutor
ThreadPoolExecutor使用了模板方法模式,提供了beforeExecute、afterExecute和terminated擴展方法
public class TimingThreadPoolExecutor extends ThreadPoolExecutor { private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();//任務執行開始時間 private final Logger log = Logger.getAnonymousLogger(); private final AtomicLong numTasks = new AtomicLong(); //統計任務數 private final AtomicLong totalTime = new AtomicLong(); //線程池運行總時間 public TimingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); log.fine(String.format("Thread %s: start %s", t, r)); startTime.set(System.nanoTime()); } @Override protected void afterExecute(Runnable r, Throwable t) { try{ long endTime = System.nanoTime(); long taskTime = endTime - startTime.get(); numTasks.incrementAndGet(); totalTime.addAndGet(taskTime); log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime)); } finally{ super.afterExecute(r, t); } } @Override protected void terminated() { try{ //任務執行平均時間 log.info(String.format("Terminated: average time=%dns", totalTime.get() / numTasks.get())); }finally{ super.terminated(); } } }
五、遞歸算法的並行化
本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。