版權聲明:本文出自汪磊的博客,轉載請務必注明出處。
一、ThreadFactory概述以及源碼分析
ThreadFactory很簡單,就是一個線程工廠也就是負責生產線程的,我們看下ThreadFactory源碼;
1 public interface ThreadFactory { 2 3 /** 4 * Constructs a new {@code Thread}. Implementations may also initialize 5 * priority, name, daemon status, {@code ThreadGroup}, etc. 6 * 7 * @param r a runnable to be executed by new thread instance 8 * @return constructed thread, or {@code null} if the request to 9 * create a thread is rejected 10 */ 11 Thread newThread(Runnable r); 12 }
很簡單吧,就是一個接口,newThread方法就是用來生產線程的,子類需要實現這個方法來根據自己規則生產相應的線程。
那安卓中什么地方用到了ThreadFactory呢?稍有經驗的就會知道線程池中用到了,我們看下平常使用線程池是怎么創建的,以下是我一個項目中用到的:
1 // 創建線程池對象 2 public static final Executor poolExecutor = new ThreadPoolExecutor( 3 CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, 4 new LinkedBlockingQueue<Runnable>());
咦?沒有用到線程池啊,別急,我們看下ThreadPoolExecutor創建的源碼:
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue) { 6 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 7 Executors.defaultThreadFactory(), defaultHandler); 8 }
看到了吧,最終調用的如下構造函數來創建線程池:
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler) { 8 if (corePoolSize < 0 || 9 maximumPoolSize <= 0 || 10 maximumPoolSize < corePoolSize || 11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.corePoolSize = corePoolSize; 16 this.maximumPoolSize = maximumPoolSize; 17 this.workQueue = workQueue; 18 this.keepAliveTime = unit.toNanos(keepAliveTime); 19 this.threadFactory = threadFactory; 20 this.handler = handler; 21 }
ThreadFactory傳入的參數是Executors.defaultThreadFactory(),那我們繼續看下Executors中defaultThreadFactory吧:
1 /** 2 * The default thread factory 3 */ 4 static class DefaultThreadFactory implements ThreadFactory { 5 private static final AtomicInteger poolNumber = new AtomicInteger(1); 6 private final ThreadGroup group; 7 private final AtomicInteger threadNumber = new AtomicInteger(1); 8 private final String namePrefix; 9 10 DefaultThreadFactory() { 11 SecurityManager s = System.getSecurityManager(); 12 group = (s != null) ? s.getThreadGroup() : 13 Thread.currentThread().getThreadGroup(); 14 namePrefix = "pool-" + 15 poolNumber.getAndIncrement() + 16 "-thread-"; 17 } 18 19 public Thread newThread(Runnable r) { 20 Thread t = new Thread(group, r, 21 namePrefix + threadNumber.getAndIncrement(), 22 0); 23 if (t.isDaemon()) 24 t.setDaemon(false); 25 if (t.getPriority() != Thread.NORM_PRIORITY) 26 t.setPriority(Thread.NORM_PRIORITY); 27 return t; 28 } 29 }
DefaultThreadFactory實現了ThreadFactory接口,newThread中生產了一個個線程並且設置為不是守護線程,線程優先級均為Thread.NORM_PRIORITY。
在我們使用線程池的時候如果不自己創建線程工廠類,那么系統會給我們創建一個默認的線程工廠來生產線程(Executors中defaultThreadFactory())
好了,關於線程工廠就見到這里了,知道有這么個玩意用來生產線程的就可以了。
二、BlockingQueue概述以及源碼分析
BlockingQueue顧名思義:阻塞隊列,簡單說就是放入,取出數據都會發生阻塞。比如取出數據時發現容器沒有數據,就會等待產生阻塞一直等到有數據
為止,同樣放入數據時如果容器已經滿了,那么就回等待一直到容器有空間可以放入數據。
BlockingQueue就是一個接口,定義了插入,取出數據的接口,如下:
1 public interface BlockingQueue<E> extends Queue<E> { 2 3 //添加元素到隊列里,添加成功返回true,由於容量滿了添加失敗會拋出IllegalStateException異常 4 boolean add(E e); 5 6 //添加元素到隊列里,成功返回true,失敗返回false 7 boolean offer(E e); 8 9 //添加元素到隊列里,如果容量滿了會阻塞直到容量不滿 10 void put(E e) throws InterruptedException; 11 12 13 //添加元素到隊列里,如果容器已滿會阻塞列隊,但是不會一直阻塞,只會阻塞timeout時間,在這期間內添加成功則返回true,否則返回false 14 boolean offer(E e, long timeout, TimeUnit unit) 15 throws InterruptedException; 16 17 18 //從隊列中獲取元素,如果隊列為空,則一直阻塞 19 E take() throws InterruptedException; 20 21 //從隊列中獲取元素,如果容器為空會阻塞列隊,但是不會一直阻塞,只會阻塞timeout時間,在這期間內獲取成功則返回對應元素,否則返回null 22 E poll(long timeout, TimeUnit unit) 23 throws InterruptedException; 24 25 //存儲數據的隊列剩余空間大小 26 int remainingCapacity(); 27 28 //刪除指定的元素,成功返回true,失敗返回false 29 boolean remove(Object o); 30 31 public boolean contains(Object o); 32 33 int drainTo(Collection<? super E> c); 34 35 int drainTo(Collection<? super E> c, int maxElements); 36 }
主要方法已經給出注釋。
BlockingQueue的具體子類如下圖所示:

其中最最常用的就是ArrayBlockingQueue以及LinkedBlockingQueue,我們以ArrayBlockingQueue為例詳細分析一下實現過程。
三、ArrayBlockingQueue源碼分析
ArrayBlockingQueue內部是以數組為容器盛放元素,並且放入和取出元素的時候使用同一個鎖,也就是放入的時候不能同時取出元素。
ArrayBlockingQueue中主要屬性:
1 //存儲元素的數組,是個循環數組,至於為什么是循環數組下面會講到 2 final Object[] items; 3 4 //下一次拿數據的時候的索引 5 int takeIndex; 6 7 //下一次放數據的時候的索引 8 int putIndex; 9 10 //隊列中已經存儲元素的個數 11 int count; 12 13 //鎖,只有這一把鎖 14 final ReentrantLock lock; 15 16 //等待拿數據的的條件對象 17 private final Condition notEmpty; 18 19 //等待放數據的的條件對象 20 private final Condition notFull;
已經給出詳細注釋就不一一詳細解釋了。
接下來我們看下構造函數;
1 public ArrayBlockingQueue(int capacity) { 2 this(capacity, false); 3 } 4 5 public ArrayBlockingQueue(int capacity, boolean fair) { 6 if (capacity <= 0) 7 throw new IllegalArgumentException(); 8 this.items = new Object[capacity]; 9 lock = new ReentrantLock(fair); 10 notEmpty = lock.newCondition(); 11 notFull = lock.newCondition(); 12 }
初始化的時候我們需要指定盛放元素容器的大小,並且初始化一些屬性。
接下來我們分析下ArrayBlockingQueue中添加的方法:add,offer以及put方法。
先看add方法:
1 public boolean add(E e) { 2 return super.add(e); 3 }
直接調用的父類的方法,我們只好去看看父類中的add方法了:
1 public boolean add(E e) { 2 if (offer(e)) 3 return true; 4 else 5 throw new IllegalStateException("Queue full"); 6 }
是不是邏輯很簡單,add方法內部調用了offer方法如果offer方法返回true則add直接返回true,否則拋出IllegalStateException異常。
接下來我們看下offer方法:
1 public boolean offer(E e) { 2 if (e == null) throw new NullPointerException(); 3 final ReentrantLock lock = this.lock; 4 lock.lock(); 5 try { 6 if (count == items.length) 7 return false; 8 else { 9 enqueue(e); 10 return true; 11 } 12 } finally { 13 lock.unlock(); 14 } 15 }
第2行,檢查是否為null,為null則拋出空指針異常。
3,4行加鎖,保證只有一個線程操作。
6,7行檢查當前容器是否已將滿了,如果滿了則不能再放入元素,直接返回false。
9,10行如果容器沒滿則執行enqueue方法放入元素,然后返回true,enqueue方法后面會分析。
13行解除鎖。以上就是offer方法的邏輯,比較簡單,該說的都說了。
接下來分析put方法:
1 public void put(E e) throws InterruptedException { 2 if (e == null) throw new NullPointerException(); 3 final ReentrantLock lock = this.lock; 4 lock.lockInterruptibly(); 5 try { 6 while (count == items.length) 7 notFull.await(); 8 enqueue(e); 9 } finally { 10 lock.unlock(); 11 } 12 }
第4行這里調用的是lockInterruptibly方法,與lock方法相比,lockInterruptibly可以被中斷,中斷的時候產生InterruptedException異常。
6,7行同樣判斷容器是否已經滿了,如果已經滿了則執行wait邏輯等待,這里就是阻塞隊列的核心,是不是覺得原來如此啊。
8行,如果容器有空間執行enqueue方法,向容器中加入元素。
offer與put都用到了enqueue方法向容器中加入元素,接下來我們看下enqueue方法:
1 private void enqueue(E x) { 2 // assert lock.getHoldCount() == 1; 3 // assert items[putIndex] == null; 4 final Object[] items = this.items; 5 items[putIndex] = x; 6 if (++putIndex == items.length) putIndex = 0; 7 count++; 8 notEmpty.signal(); 9 }
4,5行就是向數組items中放入元素x。
6行,放元素的索引putIndex加1后與數組長度比較,如果達到數組長度則將putIndex置為0,相當於下一次放入元素位置為數組的第一次位置,從頭開始放入元素,上面說過items是個循環數組,就是在這里體現出來的,如果我們初始化的時候設定容器items大小為10,然而我們不停放入數據也是沒問題的,只不過后面加入的數據會覆蓋前面的數據。
8行,喚醒取數據的線程,告訴其哥們我放入了一個數據你可以取數據了。
到這放入數據的核心部分就分析完了,是不是很簡單???放入數據還有個offer(E e, long timeout, TimeUnit unit)方法,同樣很簡單,可以自行分析。
接下來分析取出數據的方法,取出數據主要是poll與take方法。
先來看下poll方法;
1 public E poll() { 2 final ReentrantLock lock = this.lock; 3 lock.lock(); 4 try { 5 return (count == 0) ? null : dequeue(); 6 } finally { 7 lock.unlock(); 8 } 9 }
2,3行同樣是先鎖住,保證單線程操作。
5行,判斷當前線程中元素數量是否為0,如果為0則沒有元素返回null,否則執行dequeue方法取出數據並返回,后續會分析dequeue方法。
7行解除鎖。
poll方法是不是很簡單,同樣poll(long timeout, TimeUnit unit)也不難分析可自行分析。
接下來看下take方法:
1 public E take() throws InterruptedException { 2 final ReentrantLock lock = this.lock; 3 lock.lockInterruptibly(); 4 try { 5 while (count == 0) 6 notEmpty.await(); 7 return dequeue(); 8 } finally { 9 lock.unlock(); 10 } 11 }
take方法也不難理解,核心就是5,6行邏輯,如果count為0也就是容器內沒有數據則執行wait方法,線程一直處於等待狀態,如果不為0則執行dequeue
方法取出數據。
我們再看下dequeue方法:
1 private E dequeue() { 2 // assert lock.getHoldCount() == 1; 3 // assert items[takeIndex] != null; 4 final Object[] items = this.items; 5 @SuppressWarnings("unchecked") 6 E x = (E) items[takeIndex]; 7 items[takeIndex] = null; 8 if (++takeIndex == items.length) takeIndex = 0; 9 count--; 10 if (itrs != null) 11 itrs.elementDequeued(); 12 notFull.signal(); 13 return x; 14 }
6,7,13行從數組items中取出數據,並將原數組位置處置為null,最后13行處返回取出的數據。
8行,取數據索引takeIndex加1后與數組總長度比較如果達到數組長度則將takeIndex置為0,下一次從數組開始處取數據。
10,11行通知迭代器有數據取出。
12行通知放入數據的線程有數據取出了,你可以放入數據了。
好了,以上就是ArrayBlockingQueue中放入取出數據的操作源碼分析,多線程中總會提到生產者消費者模式,其實用ArrayBlockingQueue實現是很簡單的,ArrayBlockingQueue只是將wait,notify操作進行了封裝而已。
LinkedBlockingQueue源碼就不一一分析了,主要區別的LinkedBlockingQueue內部是用鏈表來存儲數據的,並且有兩把鎖,放數據鎖與取數據鎖,也就是放入數據的線程和取出數據的線程可以同時操作LinkedBlockingQueue,而ArrayBlockingQueue中放數據線程與取數據線程是互斥的,不能同時操作,LinkedBlockingQueue初始化的時候可以不指定容器大小,如果不指定則容器大小為Integer.MAX_VALUE,而ArrayBlockingQueue則必須指定容器大小。
好了以上就是本篇全部內容了,希望對你有用。
聲明:文章將會陸續搬遷到個人公眾號,以后文章也會第一時間發布到個人公眾號,及時獲取文章內容請關注公眾號