[Java多線程]-線程池的基本使用和部分源碼解析(創建,執行原理)


前面的文章:多線程爬坑之路-學習多線程需要來了解哪些東西?(concurrent並發包的數據結構和線程池,Locks鎖,Atomic原子類)

      多線程爬坑之路-Thread和Runable源碼解析

      多線程爬坑之路-Thread和Runable源碼解析之基本方法的運用實例

一.線程池ThreadPool的基本定義?

  線程池是一種多線程處理形式,處理過程中將任務添加到隊列,然后在創建線程后自動啟動這些任務。線程池線程都是后台線程。每個線程都使用默認的堆棧大小,以默認的優先級運行,並處於多線程單元中。如果某個線程在托管代碼中空閑(如正在等待某個事件),則線程池將插入另一個輔助線程來使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊列中包含掛起的工作,則線程池將在一段時間后創建另一個輔助線程但線程的數目永遠不會超過最大值。超過最大值的線程可以排隊,但他們要等到其他線程完成后才啟動。

二.線程池的背景及作用?

  線程池是用來解決多任務並發,頻繁創建線程和銷毀線程導致資源的浪費和效率低下的一種技術。類似於數據庫連接池,這種池化技術用於將部分資源重復利用,或者說是用已有的資源對多個任務進行服務。線程池就是創建固定的線程數量,服務於並發的任務,避免每一個任務,系統都進行線程的創建和銷毀,增加系統的負擔,線程數量過多時會造成內存溢出。

三.線程池的運用場景:

  頻繁的任務創建,快速的任務響應,同等的任務優先級。如果需要很長時間才有一個任務需要執行,那樣創建線程池就是浪費資源,如果執行一個任務需要幾個小時,那這個任務在線程池中執行一次就會阻塞大量的線程。任務的優先級在線程池沒有作用,因為線程池的任務執行順序是隊列。如果我們需要執行的任務有輕重緩急,那么不建議使用線程池。

四.線程池的基本類圖

五.線程池的創建過程:

線程池的類圖中我們可以得到最終的兩個實現類,ThreadPoolExeutorScheduleThreadPoolExeutor,這兩個類都可以創建一個線程池。但是通常我們使用這兩個類的封裝類Exeutors來創建一個線程池。

1、TheadPoolExeutor:運用這個類我們需要傳入四個參數

  • corePoolsize:核心線程數量
  • maximumPoolSize:最大線程數量
  • keepAliveTime:最大存活時間(當一個線程沒有任務執行時最大的存活時間)
  • until:用於格式化時間,指定時間類型。TimeUnit:一個枚舉類,詳情請看:http://www.importnew.com/7219.html
  • workQueue:任務隊列的接口聲明(BlockingQueue<Runnable>是一個繼承Queue的接口)
1 public ThreadPoolExecutor(int corePoolSize,
2                               int maximumPoolSize,
3                               long keepAliveTime,
4                               TimeUnit unit,
5                               BlockingQueue<Runnable> workQueue) {
6         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
7              Executors.defaultThreadFactory(), defaultHandler);
8     }

2、SheduleThreadPoolExeutor:這個類的四個構造函數可以根據不同的參數構造不同用途的線程池。

 public ScheduledThreadPoolExecutor(int corePoolSize) {
         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, //調用的是super()方法來構造線程池。
               new DelayedWorkQueue());
     }
   public ScheduledThreadPoolExecutor(int corePoolSize,
                                        RejectedExecutionHandler handler) {
         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
               new DelayedWorkQueue(), handler);
     }
  public ScheduledThreadPoolExecutor(int corePoolSize,
                                        ThreadFactory threadFactory) {
         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
               new DelayedWorkQueue(), threadFactory);
     }
 public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }

3、Exeutors:這個類主要是對以上兩個類做了包裝,通過這個類各種靜態方法我們可以創建滿足各種需求的線程池。Exeutors類通過傳遞不同的參數以達到創建不同類型線程池的目的。

Exeutors的方法摘要列表:(省略了部分)完整的類大家可以看源碼,API:http://www.javaweb.cc/help/JavaAPI1.6/java/util/concurrent/Executors.html

方法摘要
static Callable<Object> callable(PrivilegedAction<?> action) 
          返回 Callable 對象,調用它時可運行給定特權的操作並返回其結果。
static Callable<Object> callable(PrivilegedExceptionAction<?> action) 
          返回 Callable 對象,調用它時可運行給定特權的異常操作並返回其結果。
static Callable<Object> callable(Runnable task) 
          返回 Callable 對象,調用它時可運行給定的任務並返回 null
static
<T> Callable<T>
callable(Runnable task, T result) 
          返回 Callable 對象,調用它時可運行給定的任務並返回給定的結果。
static ThreadFactory defaultThreadFactory() 
          返回用於創建新線程的默認線程工廠。
static ExecutorService newCachedThreadPool() 
          創建一個可根據需要創建新線程的線程池,但是在以前構造的線程可用時將重用它們。
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) 
          創建一個可根據需要創建新線程的線程池,但是在以前構造的線程可用時將重用它們,並在需要時使用提供的 ThreadFactory 創建新線程。
static ExecutorService newFixedThreadPool(int nThreads) 
          創建一個可重用固定線程數的線程池,以共享的無界隊列方式來運行這些線程。
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) 
          創建一個可重用固定線程數的線程池,以共享的無界隊列方式來運行這些線程,在需要時使用提供的 ThreadFactory 創建新線程。
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 
          創建一個線程池,它可安排在給定延遲后運行命令或者定期地執行。
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) 
          創建一個線程池,它可安排在給定延遲后運行命令或者定期地執行。
static ExecutorService newSingleThreadExecutor() 
          創建一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。
static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) 
          創建一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程,並在需要時使用提供的 ThreadFactory 創建新線程。
static ScheduledExecutorService newSingleThreadScheduledExecutor() 
          創建一個單線程執行程序,它可安排在給定延遲后運行命令或者定期地執行。
static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) 
          創建一個單線程執行程序,它可安排在給定延遲后運行命令或者定期地執行。
static
<T> Callable<T>
privilegedCallable(Callable<T> callable) 
          返回 Callable 對象,調用它時可在當前的訪問控制上下文中執行給定的 callable 對象。
static
<T> Callable<T>
privilegedCallableUsingCurrentClassLoader(Callable<T> callable) 
          返回 Callable 對象,調用它時可在當前的訪問控制上下文中,使用當前上下文類加載器作為上下文類加載器來執行給定的 callable 對象。
static ThreadFactory privilegedThreadFactory() 
          返回用於創建新線程的線程工廠,這些新線程與當前線程具有相同的權限。
static ExecutorService unconfigurableExecutorService(ExecutorService executor) 
          返回一個將所有已定義的 ExecutorService 方法委托給指定執行程序的對象,但是使用強制轉換可能無法訪問其他方法。
static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) 
          返回一個將所有已定義的 ExecutorService 方法委托給指定執行程序的對象,但是使用強制轉換可能無法訪問其他方法。

列表中紅色部分就是一些常用的創建線程池的方法,這些方法的類型分為兩類scheduleExecutorService和ExecutorService,這是兩個接口,

scheduleExecutorService類型的線程池分兩類:

  1.newScheduleThreadPool:是有延遲執行或者定期執行的任務的線程池

  2.newSingleThreadScheduleExecutor:是有延遲執行或者定期執行的任務的單線程線程池

ExecutorService類型的線程池分三類:

  1.newCacheThreadPool:可重用,可擴展(按需創建)

  2.newFixedThreadPool:可重用,固定數量,共享的無界隊列

  3.newSingleThreadExeutor:單線程,無界隊列                      

Exeutors可通過調用上面五種封裝方法創建不同應用場景的線程池,這些方法的具體實現源碼如下:

1、newCachedThreadPool源碼:

  • 調用ThreadPoolExecutor創建線程:
    • 第一個參數為0表示他的核心線程數是0,可根據任務需要創建線程,每一個使用完得線程具有60秒的存活時間,
    • 第二個參數最大線程數:Integer.MAX_VALUE,int類型的最大值。
    • 第三個參數最大存活時間:60秒,第四個參數:時間的單位是SECONDS,秒。
    • 最后一個表示任務隊列:SynchronousQueue(這個類的實現后面再看),這里沒有指定線程工廠在ThreadPoolExecutor類中會使用默認的工廠。
 1 /**
 2      * Creates a thread pool that creates new threads as needed, but
 3      * will reuse previously constructed threads when they are
 4      * available.  These pools will typically improve the performance
 5      * of programs that execute many short-lived asynchronous tasks.
 6      * Calls to {@code execute} will reuse previously constructed
 7      * threads if available. If no existing thread is available, a new
 8      * thread will be created and added to the pool. Threads that have
 9      * not been used for sixty seconds are terminated and removed from
10      * the cache. Thus, a pool that remains idle for long enough will
11      * not consume any resources. Note that pools with similar
12      * properties but different details (for example, timeout parameters)
13      * may be created using {@link ThreadPoolExecutor} constructors.
14      *
15      * @return the newly created thread pool
16      */
17     public static ExecutorService newCachedThreadPool() {
18         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
19                                       60L, TimeUnit.SECONDS,
20                                       new SynchronousQueue<Runnable>());
21     }
  • 有線程工廠作為參數:與上面唯一不同的是多了一個參數,threadFactory會替代掉默認的工廠來創建線程。
 1  /**
 2      * Creates a thread pool that creates new threads as needed, but
 3      * will reuse previously constructed threads when they are
 4      * available, and uses the provided
 5      * ThreadFactory to create new threads when needed.
 6      * @param threadFactory the factory to use when creating new threads
 7      * @return the newly created thread pool
 8      * @throws NullPointerException if threadFactory is null
 9      */
10     public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
11         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
12                                       60L, TimeUnit.SECONDS,
13                                       new SynchronousQueue<Runnable>(),
14                                       threadFactory);
15     }

其他的類我就不一一列出源碼了,給出下面的表格,可以一目了然的看到他們之間的創建差異。(其中忽略掉線程工廠Threadfactory和Hanler(對於無法通過一個線程池執行任務的處理程序))因為他們無法對線程池的差異造成影響。

靜態方法中的參數主要有三個:ThreadFactor,corePoolsize,Handler.其中corePoolSize最為重要,設置的數量要根據具體的需要而定。過少會造成線程阻塞性能下降,過多會造成資源浪費。

Executors

     靜態方法       

          調用方法(new)      

                                                                調用方法Executor的參數

corePoolSize

 maximumPoolSize

keepAliveTime

 TimeUnit   

    workQueue

newCacheThreadPool

ThreadPoolExecutor

0

Integer.MAX_VALUE 

60s

 SECONDS(秒)      

SynchronousQueue

newFixedThreadPool

ThreadPoolExecutor

n(參數)

n(參數)

0s

 MILLISECONDS

LinkedBlockingQueue

newSingleThreadExecutor

ThreadPoolExecutor

1

1

0s

MILLISECONDS

LinkedBlockingQueue

newSingleThreadScheduledExecutor

ScheduledThreadPoolExecutor

1

Integer.MAX_VALUE 

0s

NANOSECONDS

DelayedWorkQueue

newScheduledThreadPool

ScheduledThreadPoolExecutor

n(參數)

Integer.MAX_VALUE 

0s

NANOSECONDS

DelayedWorkQueue

Integer.MAX_VALUE=int型的最大值,2^32-1=2147483647

4、如何選擇創建線程池的類型?

  corePoolSize:保證了最少有多少個存活的線程。如果我們的任務很穩定,一直能夠保證最少x個任務需要被執行,那我們可以將這個值設置為x,如果我們的任務有時多有時少,有時候沒有,我們可以設置為0,讓它自動根據需要創建線程,這就是newCacheThreadPool的方法,因為不確定接下來有沒有任務,所以這些線程使用完之后保持60秒的活時間,如果有任務則重用這些線程,沒任務則60秒后線程銷毀。如果我們的任務中,始終有一個任務需要執行,但是有時會有更多的線程,那我們可以選擇newSingleThreadScheduledExecutor,他保證了corePoolSize=1,始終有一個線程等待任務。最大值為Integer.MAX_VALUE。我們可以根據自己的場景需要選擇合適的線程池。

核心方法從Executors轉移到了ThreadPoolExecutor類和ScheduledThreadPoolExecutor類上。下面深入一下構建一個線程池需要的步驟和准備。理解了他是如何創建一個線程池之后我們可以手動寫自己的線程池。

6、ThreadPoolExecutor類視圖:

總結一下類里面就是三個部分:

  1.構造方法:第一個構造方法部分我們已經研究過了

  2.屬性操作:第二個部分屬性操作,就是獲取我們構造的線程池的一些屬性,基本就是構造方法中的參數。

  3.線程池管理:主要還是內部的調用,去對線程的創建和管理,對任務的執行和監控。

線程池一旦創建好了只需要往里面添加任務執行就好了。而我們需要關心的就是創建的線程池性能是否足夠,是否可以優化。

示例:創建線程池

1 import java.util.concurrent.ThreadFactory;
2 
3 public class ThreadFactorImpl implements ThreadFactory{//這里自己實現了一個線程工廠,只是為了下面創建線程池做准備,大家可以忽略掉默認的工廠沒這么簡單。后面給源碼
4     public Thread newThread(Runnable r) {
5         return new Thread(r);
6     }
7 }
 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 import java.util.concurrent.ScheduledThreadPoolExecutor;
 4 import java.util.concurrent.SynchronousQueue;
 5 import java.util.concurrent.ThreadPoolExecutor;
 6 import java.util.concurrent.TimeUnit;
 7 
 8 public class TestThreadPool {
 9     private int num;
10 
11     public int test() {
12         return num++;
13     }
14     public static void main(String[] args) {
15         ExecutorService es1 = Executors.newCachedThreadPool();
16         ExecutorService es2 = Executors.newCachedThreadPool(new ThreadFactorImpl());
17         ExecutorService es3 = Executors.newFixedThreadPool(10);
18         ExecutorService es4 = Executors.newFixedThreadPool(10, new ThreadFactorImpl());
19         ExecutorService es5 = Executors.newSingleThreadExecutor();
20         ExecutorService es6 = Executors.newSingleThreadExecutor(new ThreadFactorImpl());
21         ExecutorService es7 = Executors.newScheduledThreadPool(10);
22         ExecutorService es8 = Executors.newScheduledThreadPool(10, new ThreadFactorImpl());
23         ExecutorService es9 = new ScheduledThreadPoolExecutor(10);
24         ExecutorService es10 = new ThreadPoolExecutor(1, 10, 20, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
25     }
26 }

當Executors的構造方法滿足不了需求的時候,就需要直接通過原始類ThreadPoolExecutor類和ScheduledThreadPoolExecutor類來傳入合適的參數。

defaultThreadFactory默認的線程工廠:

 1 static class DefaultThreadFactory implements ThreadFactory {
 2         private static final AtomicInteger poolNumber = new AtomicInteger(1);
 3         private final ThreadGroup group;
 4         private final AtomicInteger threadNumber = new AtomicInteger(1);
 5         private final String namePrefix;
 6 
 7         DefaultThreadFactory() {
 8             SecurityManager s = System.getSecurityManager();
 9             group = (s != null) ? s.getThreadGroup() :
10                                   Thread.currentThread().getThreadGroup();
11             namePrefix = "pool-" +
12                           poolNumber.getAndIncrement() +
13                          "-thread-";
14         }
15 
16         public Thread newThread(Runnable r) {
17             Thread t = new Thread(group, r,
18                                   namePrefix + threadNumber.getAndIncrement(),
19                                   0);
20             if (t.isDaemon())
21                 t.setDaemon(false);
22             if (t.getPriority() != Thread.NORM_PRIORITY)
23                 t.setPriority(Thread.NORM_PRIORITY);
24             return t;
25         }
26     }

創建了線程池,有了線程工廠,當任務量>線程數的時候,工廠就會創建線程去執行任務。

六、線程池是如何管理和執行任務的?

執行任務只需要調用execute()方法就夠了,下面我們創建了四個任務分別給各自的對象num數據+1。然后把任務加入線程池,execute方法實際上是加入工作隊列的過程。

示例:execute執行任務

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 
 4 public class TestThreadPool implements Runnable{
 5     private int num;
 6     public TestThreadPool(int num){
 7         this.num = num;
 8     }
 9     public int getNum() {
10         return num;
11     }
12     public void setNum(int num) {
13         this.num = num;
14     }
15     public void run() {
16         setNum(++this.num);
17     }
18     public static void main(String[] args) {
19         ExecutorService es1 = Executors.newCachedThreadPool();
20         //任務t1,t2,t3,t4
21         TestThreadPool t1 = new TestThreadPool(1);
22         TestThreadPool t2 = new TestThreadPool(2);
23         TestThreadPool t3 = new TestThreadPool(3);
24         TestThreadPool t4 = new TestThreadPool(4);
25         es1.execute(t1);
26         es1.execute(t2);
27         es1.execute(t3);
28         es1.execute(t4);
29         try {
30             Thread.sleep(1000);
31         } catch (InterruptedException e) {
32             e.printStackTrace();
33         }
34         System.out.println(t1.getNum());
35         System.out.println(t2.getNum());
36         System.out.println(t3.getNum());
37         System.out.println(t4.getNum());
38     }
39 }

執行結果如下:跟我們預想的結果相同。

2
3
4
5

execute方法源碼:

由於執行任務的線程是內部工廠創建的所以我們無法觀測到線程的狀態和屬性,通過Execute的源碼來看一下它的執行過程:(不同的創建方法對應了不同的線程池類,同時也對應了不同的execute方法)下面是ThreadPoolExecutor類的execute方法源碼

 1 public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4         /*
 5          * Proceed in 3 steps:
 6          *
 7          * 1. If fewer than corePoolSize threads are running, try to
 8          * start a new thread with the given command as its first
 9          * task.  The call to addWorker atomically checks runState and
10          * workerCount, and so prevents false alarms that would add
11          * threads when it shouldn't, by returning false.
12          *
13          * 2. If a task can be successfully queued, then we still need
14          * to double-check whether we should have added a thread
15          * (because existing ones died since last checking) or that
16          * the pool shut down since entry into this method. So we
17          * recheck state and if necessary roll back the enqueuing if
18          * stopped, or start a new thread if there are none.
19          *
20          * 3. If we cannot queue task, then we try to add a new
21          * thread.  If it fails, we know we are shut down or saturated
22          * and so reject the task.
23          */
24         int c = ctl.get();
25         if (workerCountOf(c) < corePoolSize) {
26             if (addWorker(command, true))
27                 return;
28             c = ctl.get();
29         }
30         if (isRunning(c) && workQueue.offer(command)) {
31             int recheck = ctl.get();
32             if (! isRunning(recheck) && remove(command))
33                 reject(command);
34             else if (workerCountOf(recheck) == 0)
35                 addWorker(null, false);
36         }
37         else if (!addWorker(command, false))
38             reject(command);
39     }

英語好的直接看源碼的注釋部分,注釋中說:

excute的過程主要分三步:

  1. 如果當前正在運行的線程數小於corePoolSize的話,那么嘗試使用傳入的任務(command)作為第一個任務啟動一個新的線程執行。addWorker函數會原子性的檢查線程池的runState以及workerCount防止不應該添加的新線程被添加。如果是假警報的話,那么addWorker函數就會返回false,表示添加新線程失敗。
  2. 如果一個Task被成功的加入隊列了,然后仍然需要重新check一次是否需要重新添加一個線程,因為有可能在上一次檢查到這次檢查之間,已經存在的線程已經死亡。或者,自從進入這個方法后,線程池已經被關閉(shut down)。所以我們需要重新check狀態,並且在必要的時候,如果處於stopped狀態,需要重新回滾到隊列中,或者如果沒有的話,就需要重新啟動一個線程。
  3. 如果不能把task加入到隊列中,那么就會嘗試去添加一個新的線程,如果它失敗了,就知道是已經當前線程池是處於shut down或者處於飽和狀態,那么就執行拒絕(reject)操作。

  簡單的來說:就是檢查線程池的狀態,看是否能夠把任務加入到隊列或者是否需要用一個線程去執行它,最終就是,加入隊列等待,或者加入隊列執行,或者說拒絕(線程池狀態問題。。)

  再往下深入,就要到虛擬機的實現機制上的一些東西了包括同步機制,鎖,鎖優化問題,等等。至少到目前為止,我們知道了線程池的使用和它的重點在哪里,怎樣根據需求去創建一個合適的線程池。

  后面的話就要進入並發問題了,到了線程的難點部分。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM