從源代碼分析Universal-Image-Loader中的線程池


一般來講一個網絡訪問就需要App創建一個線程來執行,但是這也導致了當網絡訪問比較多的情況下,線程的數目可能積聚增多,雖然Android系統理論上說可以創建無數個線程,但是某一時間段,線程數的急劇增加可能導致系統OOM。在UIL中引入了線程池這種技術來管理線程。合理利用線程池能夠帶來三個好處。第一:降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。第二:提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。第三:提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。

 

前面我們有講到ImageLoader.displayImage(…)函數中的圖片處理流程,但當時有意忽略了線程方面的額處理。UIL中將線程池相關的東西封裝在ImageLoaderEngine類中了。讓我們回到圖片下載的源代碼中,也就是ImageLoader.displayImage(…)函數。

 1     public void displayImage(String uri, ImageAware imageAware, DisplayImageOptions options,
 2             ImageLoadingListener listener, ImageLoadingProgressListener progressListener) {
 3         //檢查UIL的配置是否被初始化
 4         checkConfiguration();
 5         if (imageAware == null) {
 6             throw new IllegalArgumentException(ERROR_WRONG_ARGUMENTS);
 7         }
 8         if (listener == null) {
 9             listener = emptyListener;
10         }
11         if (options == null) {
12             options = configuration.defaultDisplayImageOptions;
13         }
14 
15         if (TextUtils.isEmpty(uri)) {
16             engine.cancelDisplayTaskFor(imageAware);
17             listener.onLoadingStarted(uri, imageAware.getWrappedView());
18             if (options.shouldShowImageForEmptyUri()) {
19                 imageAware.setImageDrawable(options.getImageForEmptyUri(configuration.resources));
20             } else {
21                 imageAware.setImageDrawable(null);
22             }
23             listener.onLoadingComplete(uri, imageAware.getWrappedView(), null);
24             return;
25         }
26         //計算Bitmap的大小,以便后面解析圖片時用
27         ImageSize targetSize = ImageSizeUtils.defineTargetSizeForView(imageAware, configuration.getMaxImageSize());
28         String memoryCacheKey = MemoryCacheUtils.generateKey(uri, targetSize);
29         engine.prepareDisplayTaskFor(imageAware, memoryCacheKey);
30 
31         listener.onLoadingStarted(uri, imageAware.getWrappedView());
32         //Bitmap是否緩存在內存?
33         Bitmap bmp = configuration.memoryCache.get(memoryCacheKey);
34         if (bmp != null && !bmp.isRecycled()) {
35             L.d(LOG_LOAD_IMAGE_FROM_MEMORY_CACHE, memoryCacheKey);
36 
37             if (options.shouldPostProcess()) {
38                 ImageLoadingInfo imageLoadingInfo = new ImageLoadingInfo(uri, imageAware, targetSize, memoryCacheKey,
39                         options, listener, progressListener, engine.getLockForUri(uri));
40                 //處理並顯示圖片
41                 ProcessAndDisplayImageTask displayTask = new ProcessAndDisplayImageTask(engine, bmp, imageLoadingInfo,
42                         defineHandler(options));
43                 if (options.isSyncLoading()) {
44                     displayTask.run();
45                 } else {
46                     engine.submit(displayTask);
47                 }
48             } else {
49                 //顯示圖片
50                 options.getDisplayer().display(bmp, imageAware, LoadedFrom.MEMORY_CACHE);
51                 listener.onLoadingComplete(uri, imageAware.getWrappedView(), bmp);
52             }
53         } else {
54             if (options.shouldShowImageOnLoading()) {
55                 imageAware.setImageDrawable(options.getImageOnLoading(configuration.resources));
56             } else if (options.isResetViewBeforeLoading()) {
57                 imageAware.setImageDrawable(null);
58             }
59             
60             ImageLoadingInfo imageLoadingInfo = new ImageLoadingInfo(uri, imageAware, targetSize, memoryCacheKey,
61                     options, listener, progressListener, engine.getLockForUri(uri));
62             //啟動一個線程,加載並顯示圖片
63             LoadAndDisplayImageTask displayTask = new LoadAndDisplayImageTask(engine, imageLoadingInfo,
64                     defineHandler(options));
65             if (options.isSyncLoading()) {
66                 displayTask.run();
67             } else {
68                 engine.submit(displayTask);
69             }
70         }
71     }

注意上面代碼塊中的第48行和第68行,當需要加載顯示圖片的時候,相關的task通過engine.submit(...)函數提交執行,那么submit之后發生了什么呢?engine是ImageLoaderEngine類的一個實例,他主要用來響應displayTask的執行。我們跟進ImageLoaderEngine中看看相關的字段和方法。

 1 class ImageLoaderEngine {
 2 
 3     final ImageLoaderConfiguration configuration;
 4 
 5     private Executor taskExecutor;
 6     private Executor taskExecutorForCachedImages;
 7     private Executor taskDistributor;
 8 
 9     private final Map<Integer, String> cacheKeysForImageAwares = Collections
10             .synchronizedMap(new HashMap<Integer, String>());
11     private final Map<String, ReentrantLock> uriLocks = new WeakHashMap<String, ReentrantLock>();
12 
13     private final AtomicBoolean paused = new AtomicBoolean(false);
14     private final AtomicBoolean networkDenied = new AtomicBoolean(false);
15     private final AtomicBoolean slowNetwork = new AtomicBoolean(false);
16 
17     private final Object pauseLock = new Object();
18 
19     ImageLoaderEngine(ImageLoaderConfiguration configuration) {
20         this.configuration = configuration;
21 
22         taskExecutor = configuration.taskExecutor;
23         taskExecutorForCachedImages = configuration.taskExecutorForCachedImages;
24 
25         taskDistributor = DefaultConfigurationFactory.createTaskDistributor();
26     }
27 
28     /** Submits task to execution pool */
29     void submit(final LoadAndDisplayImageTask task) {
30         taskDistributor.execute(new Runnable() {
31             @Override
32             public void run() {
33                 File image = configuration.diskCache.get(task.getLoadingUri());
34                 boolean isImageCachedOnDisk = image != null && image.exists();
35                 initExecutorsIfNeed();
36                 if (isImageCachedOnDisk) {
37                     taskExecutorForCachedImages.execute(task);
38                 } else {
39                     taskExecutor.execute(task);
40                 }
41             }
42         });
43     }
44 
45     /** Submits task to execution pool */
46     void submit(ProcessAndDisplayImageTask task) {
47         initExecutorsIfNeed();
48         taskExecutorForCachedImages.execute(task);
49     }
50 
51     private void initExecutorsIfNeed() {
52         if (!configuration.customExecutor && ((ExecutorService) taskExecutor).isShutdown()) {
53             taskExecutor = createTaskExecutor();
54         }
55         if (!configuration.customExecutorForCachedImages && ((ExecutorService) taskExecutorForCachedImages)
56                 .isShutdown()) {
57             taskExecutorForCachedImages = createTaskExecutor();
58         }
59     }
60 
61     private Executor createTaskExecutor() {
62         return DefaultConfigurationFactory
63                 .createExecutor(configuration.threadPoolSize, configuration.threadPriority,
64                 configuration.tasksProcessingType);
65     }
66     //省略部分代碼....
67 }

注意到第29行submit(final LoadAndDisplayImageTask task)函數,我們發現這個函數通過taskDistributor.execute來執行一個Runnable對象的run(),從代碼中不難知道它就是先試讀取磁盤緩存,再根據isImageCachedOnDisk判斷文件是否有緩存在磁盤中,最后通過不同的taskExecutor來執行對應的任務。我們注意到這個submit函數中出現了taskExecutorForCachedImages、taskExecutor、taskDistributor這三個對象。通過觀察ImageLoaderEngine的類字段(第5~7行),我們發現它們其實都是Executor接口的實例。

     我們先來學習一下Executor接口,它規定了線程池的接口。

Executor接口執行已提交的 Runnable 任務的對象。此接口提供一種將任務提交與每個任務將如何運行的機制(包括線程使用的細節、調度等)分離開來的方法。通常使用 Executor 而不是顯式地創建線程。例如,可能會使用以下方法,而不是為一組任務中的每個任務調用 new Thread(new(RunnableTask())).start():

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...

 上面這段話,說白了我們就是通過Exccutor對象將線程放入線程池中運行的。

    通過觀察我們發現,taskExecutorForCachedImages和taskExecutor都是在ImageLoaderEngine.createTaskExecutor()中創建,經過分析我們發現他在DefaultConfigurationFactory.createExecutor中被初始化成ThreadPoolExecutor類型的對象(這是默認情況)。需要注意的是,ThreadPoolExecutor其實是實現了ExecutorService接口的一個實體類。線程池實際表現為 ExecutorService 類的一個實例。通過使用 ExecutorService ,我們可以提交將在未來完成的任務。需要補充說明的是,ExecutorService繼承自Executor接口。

    接下來讓我們看看ThreadPoolExecutor初始化所需要的參數。

創建一個ThreadPoolExecutor需要的參數:

  • corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大於線程池基本大小時就不再創建。如果調用了線程池的prestartAllCoreThreads方法,線程池會提前創建並啟動所有基本線程。
  • runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。 可以選擇以下幾個阻塞隊列。
    • ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
    • LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
    • SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
    • PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。
  • maximumPoolSize(線程池最大大小):線程池允許創建的最大線程數。如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務。值得注意的是如果使用了無界的任務隊列這個參數就沒什么效果。
  • ThreadFactory:用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字。
  • RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處於飽和狀態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。以下是JDK1.5提供的四種策略。
    • AbortPolicy:直接拋出異常。
    • CallerRunsPolicy:只用調用者所在線程來運行任務。
    • DiscardOldestPolicy:丟棄隊列里最近的一個任務,並執行當前任務。
    • DiscardPolicy:不處理,丟棄掉。
    • 當然也可以根據應用場景需要來實現RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務。
  • keepAliveTime(線程活動保持時間):線程池的工作線程空閑后,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高線程的利用率。
  • TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鍾(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

 

接下來,再讓我們分析taskDistributor的創建過程。分析發現,taskDistributor在DefaultConfigurationFactory.createTaskDistributor()中被創建,它是一個ThreadPoolExecutor類型的對象,通過Executors線程池工廠創建。官方文檔推薦程序員用它來創建線程池,因為它已經配置好常見的線程池情景。接下來讓我們來了解一下Executors工廠方法所能創建的線程池類型。

用Executors靜態工廠方法創建的線程池類型
    a) newFixedThreadPool:創建一個定長的線程池。達到最大線程數后,線程數不再增長。如果一個線程由於非預期Exception而結束,線程池會補充一個新的線程。
    b) newCachedThreadPool:創建一個可緩存的線程池。當池長度超過處理需求時,可以回收空閑的線程。
    c) newSingleThreadPool:創建一個單線程executor。
    d) newScheduledThreadPool:創建一個定長的線程池,而且支持定時的以及周期性的任務執行。類似於Timer。但是,Timer是基於絕對時間,對系統時鍾的改變是敏感的,而ScheduledThreadPoolExecutor只支持相對時間。
         1) Timer是創建唯一的線程來執行所有的timer任務。如果一個任務超時了,會導致其他的TimerTask時間准確性出問題。
         2)如果TimerTask拋出uncheck 異常,Timer將會產生無法預料的行為。因此,ScheduledThreadPoolExecutor可以完全代替Timer。

 

再回到上文提到的ImageLoaderEngine.submit(...),從函數中分析可以得知:taskDistributor用來嘗試讀取磁盤中是否有圖片緩存,因為涉及磁盤操作,需要用線程來執行。根據是否有對應的圖片緩存,將圖片加載的任務分發到對應的執行器。如果圖片已經緩存在磁盤,則通過taskExecutorForCachedImages執行,如果圖片沒有緩存在磁盤,則通過taskExecutor執行。我們注意到這三個都實現了Executor接口,那么為什么要將任務細分在三個線程池中進行呢?這其實這跟線程池的調優有關,如果我們將所有的任務都放在同一個線程池中運行當然是可以的,但是這樣的話所有的任務就都只能采取同一種任務優先級和運行策略。顯然果要有更好的性能,在線程數比較多並且線程承擔的任務不同的情況下,App中最好還是按任務的類別來划分線程池。

     上面的分析又引出一個問題,我們究竟應該如何配置自己的線程池。

 合理的配置線程池

要想合理的配置線程池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:

  1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
  2. 任務的優先級:高,中和低。
  3. 任務的執行時間:長,中和短。
  4. 任務的依賴性:是否依賴其他系統資源,如數據庫連接。

任務性質不同的任務可以用不同規模的線程池分開處理。CPU密集型任務配置盡可能小的線程,如配置Ncpu+1個線程的線程池。IO密集型任務則由於線程並不是一直在執行任務,則配置盡可能多的線程,如2*Ncpu。混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐率要高於串行執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。

優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先得到執行,需要注意的是如果一直有優先級高的任務提交到隊列里,那么優先級低的任務可能永遠不能執行。

執行時間不同的任務可以交給不同規模的線程池來處理,或者也可以使用優先級隊列,讓執行時間短的任務先執行。

依賴數據庫連接池的任務,因為線程提交SQL后需要等待數據庫返回結果,如果等待的時間越長CPU空閑時間就越長,那么線程數應該設置越大,這樣才能更好的利用CPU。

建議使用有界隊列,有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點,比如幾千。有一次我們組使用的后台任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,通過排查發現是數據庫出現了問題,導致執行SQL變得非常緩慢,因為后台任務線程池里的任務全是需要向數據庫查詢和插入數據的,所以導致線程池里的工作線程全部阻塞住,任務積壓在線程池里。如果當時我們設置成無界隊列,線程池的隊列就會越來越多,有可能會撐滿內存,導致整個系統不可用,而不只是后台任務出現問題。當然我們的系統所有的任務是用的單獨的服務器部署的,而我們使用不同規模的線程池跑不同類型的任務,但是出現這樣問題時也會影響到其他任務。

 

接下來,讓我們看看UIL中線程池的配置。

讓我們來分析一下,taskDistributor由於在每創建一個新的線程的時候都需要讀取一下磁盤,屬於IO操作。需要圖片緩存的應用一般在需要加載圖片的時候,同時創建很多(>5)線程,這些線程一般來得猛去的也快,存活時間不必太長。taskDistributor和taskExecutorForCachedImages涉及網絡和磁盤的讀取和寫入操作,比較耗時。主線程數默認為3,感覺定的低了,實際上IO密集的操作應該定得高一點,以便合理利用CPU的。線程優先級(10為最高,1為最低)為4是比較合理的,因為這些操作只需要后台完成即可,優先級太高可能讓界面失去響應。

 

 

參考鏈接

聊聊並發(三)——JAVA線程池的分析和使用

線程池的介紹及簡單實現

java Executor  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM