Android線程管理之ThreadPoolExecutor自定義線程池


前言:

     上篇主要介紹了使用線程池的好處以及ExecutorService接口,然后學習了通過Executors工廠類生成滿足不同需求的簡單線程池,但是有時候我們需要相對復雜的線程池的時候就需要我們自己來自定義一個線程池,今天來學習一下ThreadPoolExecutor,然后結合使用場景定義一個按照線程優先級來執行的任務的線程池。

     線程管理相關文章地址:

ThreadPoolExecutor

    ThreadPoolExecutor線程池用於管理線程任務隊列、若干個線程。

1.)ThreadPoolExecutor構造函數
ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue workQueue,RejectedExecutionHandler handler)
ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) 
ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory, RejectedExecutionHandler handler)

  corePoolSize: 線程池維護線程的最少數量
  maximumPoolSize:線程池維護線程的最大數量
  keepAliveTime: 線程池維護線程所允許的空閑時間
  unit: 線程池維護線程所允許的空閑時間的單位
  workQueue: 線程池所使用的緩沖隊列
  threadFactory:線程池用於創建線程
  handler: 線程池對拒絕任務的處理策略

2.)創建新線程

   默認使用Executors.defaultThreadFactory(),也可以通過如下方式

    /**
     * 創建線程工廠
     */
    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "download#" + mCount.getAndIncrement());
        }
    };
3.)線程創建規則

     ThreadPoolExecutor對象初始化時,不創建任何執行線程,當有新任務進來時,才會創建執行線程。構造ThreadPoolExecutor對象時,需要配置該對象的核心線程池大小和最大線程池大小

  1. 當目前執行線程的總數小於核心線程大小時,所有新加入的任務,都在新線程中處理。
  2. 當目前執行線程的總數大於或等於核心線程時,所有新加入的任務,都放入任務緩存隊列中。
  3. 當目前執行線程的總數大於或等於核心線程,並且緩存隊列已滿,同時此時線程總數小於線程池的最大大小,那么創建新線程,加入線程池中,協助處理新的任務。
  4. 當所有線程都在執行,線程池大小已經達到上限,並且緩存隊列已滿時,就rejectHandler拒絕新的任務。

4.)默認的RejectExecutionHandler拒絕執行策略

  1. AbortPolicy 直接丟棄新任務,並拋出RejectedExecutionException通知調用者,任務被丟棄
  2. CallerRunsPolicy 用調用者的線程,執行新的任務,如果任務執行是有嚴格次序的,請不要使用此policy
  3. DiscardPolicy 靜默丟棄任務,不通知調用者,在處理網絡報文時,可以使用此任務,靜默丟棄沒有幾乎處理的報文
  4. DiscardOldestPolicy 丟棄最舊的任務,處理網絡報文時,可以使用此任務,因為報文處理是有時效的,超過時效的,都必須丟棄

   我們也可以寫一些自己的RejectedExecutionHandler,例如拒絕時,直接將線程加入緩存隊列,並阻塞調用者,或根據任務的時間戳,丟棄超過限制的任務。

5.)任務隊列BlockingQueue 

    排隊原則

  1. 如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。

  2. 如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。

  3. 如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。

   常見幾種BlockingQueue實現

     1. ArrayBlockingQueue :  有界的數組隊列
  2. LinkedBlockingQueue : 可支持有界/無界的隊列,使用鏈表實現
  3. PriorityBlockingQueue : 優先隊列,可以針對任務排序
  4. SynchronousQueue : 隊列長度為1的隊列,和Array有點區別就是:client thread提交到block queue會是一個阻塞過程,直到有一個worker thread連接上來poll task。

6.)線程池執行

  execute()方法中,調用了三個私有方法
  addIfUnderCorePoolSize():在線程池大小小於核心線程池大小的情況下,擴展線程池
  addIfUnderMaximumPoolSize():在線程池大小小於線程池大小上限的情況下,擴展線程池
  ensureQueuedTaskHandled():保證在線程池關閉的情況下,新加入隊列的線程也能正確處理

7.)線程池關閉

  shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完后才終止,但再也不會接受新的任務
  shutdownNow():立即終止線程池,並嘗試打斷正在執行的任務,並且清空任務緩存隊列,返回尚未執行的任務

ThreadPoolExecutor實現優先級線程池

    1.)定義線程優先級枚舉
/**
 * 線程優先級
 */
public enum Priority {
    HIGH, NORMAL, LOW
}
 2.)定義線程任務
/**
 * 帶有優先級的Runnable類型
 */
/*package*/ class PriorityRunnable implements Runnable {

    public final Priority priority;//任務優先級
    private final Runnable runnable;//任務真正執行者
    /*package*/ long SEQ;//任務唯一標示

    public PriorityRunnable(Priority priority, Runnable runnable) {
        this.priority = priority == null ? Priority.NORMAL : priority;
        this.runnable = runnable;
    }

    @Override
    public final void run() {
        this.runnable.run();
    }
}
3.)定義一個PriorityExecutor繼承ThreadPoolExecutor
public class PriorityExecutor extends ThreadPoolExecutor {

    private static final int CORE_POOL_SIZE = 5;//核心線程池大小
    private static final int MAXIMUM_POOL_SIZE = 256;//最大線程池隊列大小
    private static final int KEEP_ALIVE = 1;//保持存活時間,當線程數大於corePoolSize的空閑線程能保持的最大時間。
    private static final AtomicLong SEQ_SEED = new AtomicLong(0);//主要獲取添加任務

    /**
     * 創建線程工廠
     */
    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "download#" + mCount.getAndIncrement());
        }
    };


    /**
     * 線程隊列方式 先進先出
     */
    private static final Comparator<Runnable> FIFO = new Comparator<Runnable>() {
        @Override
        public int compare(Runnable lhs, Runnable rhs) {
            if (lhs instanceof PriorityRunnable && rhs instanceof PriorityRunnable) {
                PriorityRunnable lpr = ((PriorityRunnable) lhs);
                PriorityRunnable rpr = ((PriorityRunnable) rhs);
                int result = lpr.priority.ordinal() - rpr.priority.ordinal();
                return result == 0 ? (int) (lpr.SEQ - rpr.SEQ) : result;
            } else {
                return 0;
            }
        }
    };

    /**
     * 線程隊列方式 后進先出
     */
    private static final Comparator<Runnable> LIFO = new Comparator<Runnable>() {
        @Override
        public int compare(Runnable lhs, Runnable rhs) {
            if (lhs instanceof PriorityRunnable && rhs instanceof PriorityRunnable) {
                PriorityRunnable lpr = ((PriorityRunnable) lhs);
                PriorityRunnable rpr = ((PriorityRunnable) rhs);
                int result = lpr.priority.ordinal() - rpr.priority.ordinal();
                return result == 0 ? (int) (rpr.SEQ - lpr.SEQ) : result;
            } else {
                return 0;
            }
        }
    };

    /**
     * 默認工作線程數5
     *
     * @param fifo 優先級相同時, 等待隊列的是否優先執行先加入的任務.
     */
    public PriorityExecutor(boolean fifo) {
        this(CORE_POOL_SIZE, fifo);
    }

    /**
     * @param poolSize 工作線程數
     * @param fifo     優先級相同時, 等待隊列的是否優先執行先加入的任務.
     */
    public PriorityExecutor(int poolSize, boolean fifo) {
        this(poolSize, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(MAXIMUM_POOL_SIZE, fifo ? FIFO : LIFO), sThreadFactory);
    }

    public PriorityExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    /**
     * 判斷當前線程池是否繁忙
     * @return
     */
    public boolean isBusy() {
        return getActiveCount() >= getCorePoolSize();
    }

    /**
     * 提交任務
     * @param runnable
     */
    @Override
    public void execute(Runnable runnable) {
        if (runnable instanceof PriorityRunnable) {
            ((PriorityRunnable) runnable).SEQ = SEQ_SEED.getAndIncrement();
        }
        super.execute(runnable);
    }
}

里面定義了兩種線程隊列模式: FIFO(先進先出) LIFO(后進先出) 優先級相同的按照提交先后排序

4.)測試程序
 ExecutorService executorService = new PriorityExecutor(5, false);
        for (int i = 0; i < 20; i++) {
            PriorityRunnable priorityRunnable = new PriorityRunnable(Priority.NORMAL, new Runnable() {
                @Override
                public void run() {
                    Log.e(TAG, Thread.currentThread().getName()+"優先級正常");
                }
            });
            if (i % 3 == 1) {
                priorityRunnable = new PriorityRunnable(Priority.HIGH, new Runnable() {
                    @Override
                    public void run() {
                        Log.e(TAG, Thread.currentThread().getName()+"優先級高");
                    }
                });
            } else if (i % 5 == 0) {
                priorityRunnable = new PriorityRunnable(Priority.LOW, new Runnable() {
                    @Override
                    public void run() {
                        Log.e(TAG, Thread.currentThread().getName()+"優先級低");
                    }
                });
            }
            executorService.execute(priorityRunnable);
        }

運行結果:不難發現優先級高基本上優先執行了 最后執行的基本上優先級比較低


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM