java線程池技術(一):ThreadFactory與BlockingQueue


版權聲明:本文出自汪磊的博客,轉載請務必注明出處。

一、ThreadFactory概述以及源碼分析

ThreadFactory很簡單,就是一個線程工廠也就是負責生產線程的,我們看下ThreadFactory源碼;

 1 public interface ThreadFactory {
 2 
 3     /**
 4      * Constructs a new {@code Thread}.  Implementations may also initialize
 5      * priority, name, daemon status, {@code ThreadGroup}, etc.
 6      *
 7      * @param r a runnable to be executed by new thread instance
 8      * @return constructed thread, or {@code null} if the request to
 9      *         create a thread is rejected
10      */
11     Thread newThread(Runnable r);
12 }

很簡單吧,就是一個接口,newThread方法就是用來生產線程的,子類需要實現這個方法來根據自己規則生產相應的線程。

那安卓中什么地方用到了ThreadFactory呢?稍有經驗的就會知道線程池中用到了,我們看下平常使用線程池是怎么創建的,以下是我一個項目中用到的:

1     // 創建線程池對象
2     public static final Executor poolExecutor = new ThreadPoolExecutor(
3             CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS,
4             new LinkedBlockingQueue<Runnable>());

咦?沒有用到線程池啊,別急,我們看下ThreadPoolExecutor創建的源碼:

1 public ThreadPoolExecutor(int corePoolSize,
2                               int maximumPoolSize,
3                               long keepAliveTime,
4                               TimeUnit unit,
5                               BlockingQueue<Runnable> workQueue) {
6         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
7              Executors.defaultThreadFactory(), defaultHandler);
8 }

看到了吧,最終調用的如下構造函數來創建線程池:

 1     public ThreadPoolExecutor(int corePoolSize,
 2                               int maximumPoolSize,
 3                               long keepAliveTime,
 4                               TimeUnit unit,
 5                               BlockingQueue<Runnable> workQueue,
 6                               ThreadFactory threadFactory,
 7                               RejectedExecutionHandler handler) {
 8         if (corePoolSize < 0 ||
 9             maximumPoolSize <= 0 ||
10             maximumPoolSize < corePoolSize ||
11             keepAliveTime < 0)
12             throw new IllegalArgumentException();
13         if (workQueue == null || threadFactory == null || handler == null)
14             throw new NullPointerException();
15         this.corePoolSize = corePoolSize;
16         this.maximumPoolSize = maximumPoolSize;
17         this.workQueue = workQueue;
18         this.keepAliveTime = unit.toNanos(keepAliveTime);
19         this.threadFactory = threadFactory;
20         this.handler = handler;
21     }
ThreadFactory傳入的參數是Executors.defaultThreadFactory(),那我們繼續看下ExecutorsdefaultThreadFactory吧:
 1     /**
 2      * The default thread factory
 3      */
 4     static class DefaultThreadFactory implements ThreadFactory {
 5         private static final AtomicInteger poolNumber = new AtomicInteger(1);
 6         private final ThreadGroup group;
 7         private final AtomicInteger threadNumber = new AtomicInteger(1);
 8         private final String namePrefix;
 9 
10         DefaultThreadFactory() {
11             SecurityManager s = System.getSecurityManager();
12             group = (s != null) ? s.getThreadGroup() :
13                                   Thread.currentThread().getThreadGroup();
14             namePrefix = "pool-" +
15                           poolNumber.getAndIncrement() +
16                          "-thread-";
17         }
18 
19         public Thread newThread(Runnable r) {
20             Thread t = new Thread(group, r,
21                                   namePrefix + threadNumber.getAndIncrement(),
22                                   0);
23             if (t.isDaemon())
24                 t.setDaemon(false);
25             if (t.getPriority() != Thread.NORM_PRIORITY)
26                 t.setPriority(Thread.NORM_PRIORITY);
27             return t;
28         }
29     }
DefaultThreadFactory實現了ThreadFactory接口,newThread中生產了一個個線程並且設置為不是守護線程,線程優先級均為Thread.NORM_PRIORITY。
在我們使用線程池的時候如果不自己創建線程工廠類,那么系統會給我們創建一個默認的線程工廠來生產線程(Executors中defaultThreadFactory())
好了,關於線程工廠就見到這里了,知道有這么個玩意用來生產線程的就可以了。
二、BlockingQueue概述以及源碼分析
BlockingQueue顧名思義:阻塞隊列,簡單說就是放入,取出數據都會發生阻塞。比如取出數據時發現容器沒有數據,就會等待產生阻塞一直等到有數據
為止,同樣放入數據時如果容器已經滿了,那么就回等待一直到容器有空間可以放入數據。
BlockingQueue就是一個接口,定義了插入,取出數據的接口,如下:
 1 public interface BlockingQueue<E> extends Queue<E> {
 2     
 3     //添加元素到隊列里,添加成功返回true,由於容量滿了添加失敗會拋出IllegalStateException異常
 4     boolean add(E e);
 5 
 6     //添加元素到隊列里,成功返回true,失敗返回false
 7     boolean offer(E e);
 8 
 9     //添加元素到隊列里,如果容量滿了會阻塞直到容量不滿
10     void put(E e) throws InterruptedException;
11 
12 
13     //添加元素到隊列里,如果容器已滿會阻塞列隊,但是不會一直阻塞,只會阻塞timeout時間,在這期間內添加成功則返回true,否則返回false
14     boolean offer(E e, long timeout, TimeUnit unit)
15         throws InterruptedException;
16 
17 
18     //從隊列中獲取元素,如果隊列為空,則一直阻塞
19     E take() throws InterruptedException;
20 
21     //從隊列中獲取元素,如果容器為空會阻塞列隊,但是不會一直阻塞,只會阻塞timeout時間,在這期間內獲取成功則返回對應元素,否則返回null
22     E poll(long timeout, TimeUnit unit)
23         throws InterruptedException;
24 
25     //存儲數據的隊列剩余空間大小
26     int remainingCapacity();
27 
28     //刪除指定的元素,成功返回true,失敗返回false
29     boolean remove(Object o);
30 
31     public boolean contains(Object o);
32 
33     int drainTo(Collection<? super E> c);
34 
35     int drainTo(Collection<? super E> c, int maxElements);
36 }
主要方法已經給出注釋。
BlockingQueue的具體子類如下圖所示:

其中最最常用的就是ArrayBlockingQueue以及LinkedBlockingQueue,我們以ArrayBlockingQueue為例詳細分析一下實現過程。
三、ArrayBlockingQueue源碼分析
ArrayBlockingQueue內部是以數組為容器盛放元素,並且放入和取出元素的時候使用同一個鎖,也就是放入的時候不能同時取出元素。
ArrayBlockingQueue中主要屬性:
 1     //存儲元素的數組,是個循環數組,至於為什么是循環數組下面會講到
 2     final Object[] items;
 3 
 4     //下一次拿數據的時候的索引
 5     int takeIndex;
 6 
 7     //下一次放數據的時候的索引
 8     int putIndex;
 9 
10     //隊列中已經存儲元素的個數
11     int count;
12 
13     //鎖,只有這一把鎖
14     final ReentrantLock lock;
15 
16     //等待拿數據的的條件對象
17     private final Condition notEmpty;
18 
19     //等待放數據的的條件對象
20     private final Condition notFull;

已經給出詳細注釋就不一一詳細解釋了。

接下來我們看下構造函數;

 1     public ArrayBlockingQueue(int capacity) {
 2         this(capacity, false);
 3     }
 4 
 5     public ArrayBlockingQueue(int capacity, boolean fair) {
 6         if (capacity <= 0)
 7             throw new IllegalArgumentException();
 8         this.items = new Object[capacity];
 9         lock = new ReentrantLock(fair);
10         notEmpty = lock.newCondition();
11         notFull =  lock.newCondition();
12     }

初始化的時候我們需要指定盛放元素容器的大小,並且初始化一些屬性。

接下來我們分析下ArrayBlockingQueue中添加的方法:add,offer以及put方法。

先看add方法:

1 public boolean add(E e) {
2         return super.add(e);
3 }

直接調用的父類的方法,我們只好去看看父類中的add方法了:

1 public boolean add(E e) {
2         if (offer(e))
3             return true;
4         else
5             throw new IllegalStateException("Queue full");
6 }

是不是邏輯很簡單,add方法內部調用了offer方法如果offer方法返回true則add直接返回true,否則拋出IllegalStateException異常。

接下來我們看下offer方法:

 1     public boolean offer(E e) {
 2         if (e == null) throw new NullPointerException();
 3         final ReentrantLock lock = this.lock;
 4         lock.lock();
 5         try {
 6             if (count == items.length)
 7                 return false;
 8             else {
 9                 enqueue(e);
10                 return true;
11             }
12         } finally {
13             lock.unlock();
14         }
15     }

第2行,檢查是否為null,為null則拋出空指針異常。

3,4行加鎖,保證只有一個線程操作。

6,7行檢查當前容器是否已將滿了,如果滿了則不能再放入元素,直接返回false。

9,10行如果容器沒滿則執行enqueue方法放入元素,然后返回true,enqueue方法后面會分析。

13行解除鎖。以上就是offer方法的邏輯,比較簡單,該說的都說了。

接下來分析put方法:

 1 public void put(E e) throws InterruptedException {
 2         if (e == null) throw new NullPointerException();
 3         final ReentrantLock lock = this.lock;
 4         lock.lockInterruptibly();
 5         try {
 6             while (count == items.length)
 7                 notFull.await();
 8             enqueue(e);
 9         } finally {
10             lock.unlock();
11         }
12 }

第4行這里調用的是lockInterruptibly方法,與lock方法相比,lockInterruptibly可以被中斷,中斷的時候產生InterruptedException異常。

6,7行同樣判斷容器是否已經滿了,如果已經滿了則執行wait邏輯等待,這里就是阻塞隊列的核心,是不是覺得原來如此啊。

8行,如果容器有空間執行enqueue方法,向容器中加入元素。

offer與put都用到了enqueue方法向容器中加入元素,接下來我們看下enqueue方法:

1 private void enqueue(E x) {
2         // assert lock.getHoldCount() == 1;
3         // assert items[putIndex] == null;
4         final Object[] items = this.items;
5         items[putIndex] = x;
6         if (++putIndex == items.length) putIndex = 0;
7         count++;
8         notEmpty.signal();
9 }

4,5行就是向數組items中放入元素x。

6行,放元素的索引putIndex加1后與數組長度比較,如果達到數組長度則將putIndex置為0,相當於下一次放入元素位置為數組的第一次位置,從頭開始放入元素,上面說過items是個循環數組,就是在這里體現出來的,如果我們初始化的時候設定容器items大小為10,然而我們不停放入數據也是沒問題的,只不過后面加入的數據會覆蓋前面的數據。

8行,喚醒取數據的線程,告訴其哥們我放入了一個數據你可以取數據了。

到這放入數據的核心部分就分析完了,是不是很簡單???放入數據還有個offer(E e, long timeout, TimeUnit unit)方法,同樣很簡單,可以自行分析。

接下來分析取出數據的方法,取出數據主要是poll與take方法。

先來看下poll方法;

1 public E poll() {
2         final ReentrantLock lock = this.lock;
3         lock.lock();
4         try {
5             return (count == 0) ? null : dequeue();
6         } finally {
7             lock.unlock();
8         }
9 }

2,3行同樣是先鎖住,保證單線程操作。

5行,判斷當前線程中元素數量是否為0,如果為0則沒有元素返回null,否則執行dequeue方法取出數據並返回,后續會分析dequeue方法。

7行解除鎖。

poll方法是不是很簡單,同樣poll(long timeout, TimeUnit unit)也不難分析可自行分析。

接下來看下take方法:

 1 public E take() throws InterruptedException {
 2         final ReentrantLock lock = this.lock;
 3         lock.lockInterruptibly();
 4         try {
 5             while (count == 0)
 6                 notEmpty.await();
 7             return dequeue();
 8         } finally {
 9             lock.unlock();
10         }
11 }

take方法也不難理解,核心就是5,6行邏輯,如果count為0也就是容器內沒有數據則執行wait方法,線程一直處於等待狀態,如果不為0則執行dequeue

方法取出數據。

我們再看下dequeue方法:

 1 private E dequeue() {
 2         // assert lock.getHoldCount() == 1;
 3         // assert items[takeIndex] != null;
 4         final Object[] items = this.items;
 5         @SuppressWarnings("unchecked")
 6         E x = (E) items[takeIndex];
 7         items[takeIndex] = null;
 8         if (++takeIndex == items.length) takeIndex = 0;
 9         count--;
10         if (itrs != null)
11             itrs.elementDequeued();
12         notFull.signal();
13         return x;
14 }

6,7,13行從數組items中取出數據,並將原數組位置處置為null,最后13行處返回取出的數據。

8行,取數據索引takeIndex加1后與數組總長度比較如果達到數組長度則將takeIndex置為0,下一次從數組開始處取數據。

10,11行通知迭代器有數據取出。

12行通知放入數據的線程有數據取出了,你可以放入數據了。

好了,以上就是ArrayBlockingQueue中放入取出數據的操作源碼分析,多線程中總會提到生產者消費者模式,其實用ArrayBlockingQueue實現是很簡單的,ArrayBlockingQueue只是將wait,notify操作進行了封裝而已。

LinkedBlockingQueue源碼就不一一分析了,主要區別的LinkedBlockingQueue內部是用鏈表來存儲數據的,並且有兩把鎖,放數據鎖與取數據鎖,也就是放入數據的線程和取出數據的線程可以同時操作LinkedBlockingQueue,而ArrayBlockingQueue中放數據線程與取數據線程是互斥的,不能同時操作,LinkedBlockingQueue初始化的時候可以不指定容器大小,如果不指定則容器大小為Integer.MAX_VALUE,而ArrayBlockingQueue則必須指定容器大小。

好了以上就是本篇全部內容了,希望對你有用。

聲明:文章將會陸續搬遷到個人公眾號,以后文章也會第一時間發布到個人公眾號,及時獲取文章內容請關注公眾號

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM