Java並發集合及線程池實現原理


本文簡要介紹Java並發編程方面常用的類和集合,並介紹下其實現原理。

1、AtomicInteger

可以用原子方式更新int值。類 AtomicBooleanAtomicIntegerAtomicLongAtomicReference 的實例各自提供對相應類型單個變量的訪問和更新。基本的原理都是使用CAS操作:

boolean compareAndSet(expectedValue, updateValue);

如果此方法(在不同的類間參數類型也不同)當前保持expectedValue,則以原子方式將變量設置為updateValue,並在成功時報告true

循環CAS,參考AtomicInteger中的實現:

public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

ABA問題

因為CAS需要在操作值的時候檢查下值有沒有發生變化,如果沒有發生變化則更新,但是如果一個值原來是A,變成了B,又變成了A,那么使用CAS進行檢查時會發現它的值沒有發生變化,但是實際上卻變化了。ABA問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那么A-B-A 就會變成1A-2B-3A。
從Java1.5開始JDK的atomic包里提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法作用是首先檢查當前引用是否等於預期引用,並且當前標志是否等於預期標志,如果全部相等,則以原子方式將該引用和該標志的值設置為給定的更新值。

public boolean compareAndSet(
        V      expectedReference,//預期引用
        V      newReference,//更新后的引用
        int    expectedStamp, //預期標志
        int    newStamp) //更新后的標志

2、ArrayBlockingQueue

一個由數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。隊列的頭部是在隊列中存在時間最長的元素。隊列的尾部是在隊列中存在時間最短的元素。新元素插入到隊列的尾部,隊列獲取操作則是從隊列頭部開始獲得元素。這是一個典型的“有界緩存區”,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦創建了這樣的緩存區,就不能再增加其容量。試圖向已滿隊列中放入元素會導致操作受阻塞;試圖從空隊列中提取元素將導致類似阻塞。

此類支持對等待的生產者線程和使用者線程進行排序的可選公平策略。默認情況下,不保證是這種排序。然而,通過將公平性(fairness)設置為true而構造的隊列允許按照 FIFO 順序訪問線程。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。

一些源代碼參考:

/** Main lock guarding all access */
    final ReentrantLock lock;

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }

    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }

    private E extract() {
        final Object[] items = this.items;
        E x = this.<E>cast(items[takeIndex]);
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

    final int dec(int i) {
        return ((i == 0) ? items.length : i) - 1;
    }

    @SuppressWarnings("unchecked")
    static <E> E cast(Object item) {
        return (E) item;
    }
View Code

ArrayBlockingQueue只使用了一個lock來控制互斥訪問,所有的互斥訪問都在這個lock的try finally中實現

3、LinkedBlockingQueue

一個基於已鏈接節點的、范圍任意的blocking queue。此隊列按 FIFO(先進先出)排序元素。隊列的頭部是在隊列中時間最長的元素。隊列的尾部是在隊列中時間最短的元素。新元素插入到隊列的尾部,並且隊列獲取操作會獲得位於隊列頭部的元素。鏈接隊列的吞吐量通常要高於基於數組的隊列,但是在大多數並發應用程序中,其可預知的性能要低。

可選的容量范圍構造方法參數作為防止隊列過度擴展的一種方法。如果未指定容量,則它等於Integer.MAX_VALUE。除非插入節點會使隊列超出容量,否則每次插入后會動態地創建鏈接節點。

如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。

一些實現代碼:

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
View Code

從源代碼實現來看,LinkedBlockingQueue使用了2個lock,一個takelock和一個putlock,讀和寫用不同的lock來控制,這樣並發效率更高

4、ConcurrentLinkedQueue

ArrayBlockingQueue和LinkedBlockingQueue都是使用lock來實現的,也就是阻塞式的隊列,而ConcurrentLinkedQueue使用CAS來實現,是非阻塞式的“lock-free”實現。

ConcurrentLinkedQueue源代碼的實現有點復雜,具體的可看這篇文章的分析:

http://www.infoq.com/cn/articles/ConcurrentLinkedQueue

5、ConcurrentHashMap

HashMap不是線程安全的。

HashTable容器使用synchronized來保證線程安全,在線程競爭激烈的情況下HashTable的效率非常低下。

ConcurrentHashMap采用了Segment分段技術,容器里有多把鎖,每把鎖用於鎖容器其中一部分數據,那么當多線程訪問容器里不同數據段的數據時,線程間就不會存在鎖競爭,從而可以有效的提高並發訪問效率。

ConcurrentHashMap結構:

ConcurrentHashMap的實現原理分析:

http://www.infoq.com/cn/articles/ConcurrentHashMap

既然ConcurrentHashMap使用分段鎖Segment來保護不同段的數據,那么在插入和獲取元素的時候,必須先通過哈希算法定位到Segment。可以看到ConcurrentHashMap會首先使用Wang/Jenkins hash的變種算法對元素的hashCode進行一次再哈希。

private static int hash(int h) {
        h += (h << 15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h << 3);
        h ^= (h >>> 6);
        h += (h << 2) + (h << 14);
        return h ^ (h >>> 16);
    }

之所以進行再哈希,其目的是為了減少哈希沖突,使元素能夠均勻的分布在不同的Segment上,從而提高容器的存取效率。假如哈希的質量差到極點,那么所有的元素都在一個Segment中,不僅存取元素緩慢,分段鎖也會失去意義。

ConcurrentHashMap的get操作:

public V get(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
}

get操作的高效之處在於整個get過程不需要加鎖,除非讀到的值是空的才會加鎖重讀,我們知道HashTable容器的get方法是需要加鎖的,那么ConcurrentHashMap的get操作是如何做到不加鎖的呢?原因是它的get方法里將要使用的共享變量都定義成volatile,如用於統計當前Segement大小的count字段和用於存儲值的HashEntry的value。定義成volatile的變量,能夠在線程之間保持可見性,能夠被多線程同時讀,並且保證不會讀到過期的值,但是只能被單線程寫(有一種情況可以被多線程寫,就是寫入的值不依賴於原值),在get操作里只需要讀不需要寫共享變量count和value,所以可以不用加鎖。之所以不會讀到過期的值,是根據java內存模型的happen before原則,對volatile字段的寫入操作先於讀操作,即使兩個線程同時修改和獲取volatile變量,get操作也能拿到最新的值,這是用volatile替換鎖的經典應用場景。

transient volatile int count;
volatile V value;

在定位元素的代碼里我們可以發現定位HashEntry和定位Segment的哈希算法雖然一樣,都與數組的長度減去一相與,但是相與的值不一樣,定位Segment使用的是元素的hashcode通過再哈希后得到的值的高位,而定位HashEntry直接使用的是再哈希后的值。其目的是避免兩次哈希后的值一樣,導致元素雖然在Segment里散列開了,但是卻沒有在HashEntry里散列開。

hash >>> segmentShift) & segmentMask //定位Segment所使用的hash算法
int index = hash & (tab.length - 1); // 定位HashEntry所使用的hash算法

ConcurrentHashMap的put操作:

由於put方法里需要對共享變量進行寫入操作,所以為了線程安全,在操作共享變量時必須得加鎖。Put方法首先定位到Segment,然后在Segment里進行插入操作。插入操作需要經歷兩個步驟,第一步判斷是否需要對Segment里的HashEntry數組進行擴容,第二步定位添加元素的位置然后放在HashEntry數組里。

是否需要擴容。在插入元素前會先判斷Segment里的HashEntry數組是否超過容量(threshold),如果超過閥值,數組進行擴容。值得一提的是,Segment的擴容判斷比HashMap更恰當,因為HashMap是在插入元素后判斷元素是否已經到達容量的,如果到達了就進行擴容,但是很有可能擴容之后沒有新元素插入,這時HashMap就進行了一次無效的擴容。

如何擴容。擴容的時候首先會創建一個兩倍於原容量的數組,然后將原數組里的元素進行再hash后插入到新的數組里。為了高效ConcurrentHashMap不會對整個容器進行擴容,而只對某個segment進行擴容。

ConcurrentHashMap的size操作:

如果我們要統計整個ConcurrentHashMap里元素的大小,就必須統計所有Segment里元素的大小后求和。Segment里的全局變量count是一個volatile變量,那么在多線程場景下,我們是不是直接把所有Segment的count相加就可以得到整個ConcurrentHashMap大小了呢?不是的,雖然相加時可以獲取每個Segment的count的最新值,但是拿到之后可能累加前使用的count發生了變化,那么統計結果就不准了。所以最安全的做法,是在統計size的時候把所有Segment的put,remove和clean方法全部鎖住,但是這種做法顯然非常低效。 因為在累加count操作過程中,之前累加過的count發生變化的幾率非常小,所以ConcurrentHashMap的做法是先嘗試2次通過不鎖住Segment的方式來統計各個Segment大小,如果統計的過程中,容器的count發生了變化,則再采用加鎖的方式來統計所有Segment的大小。

那么ConcurrentHashMap是如何判斷在統計的時候容器是否發生了變化呢?使用modCount變量,在put , remove和clean方法里操作元素前都會將變量modCount進行加1,那么在統計size前后比較modCount是否發生變化,從而得知容器的大小是否發生變化。

6、CopyOnWriteArrayList

CopyOnWrite容器即寫時復制的容器。往一個容器添加元素的時候,不直接往當前容器添加,而是先將當前容器進行Copy,復制出一個新的容器,然后新的容器里添加元素,添加完元素之后,再將原容器的引用指向新的容器。這樣做的好處是可以對CopyOnWrite容器進行並發的讀,而不需要加鎖,因為當前容器不會添加任何元素。所以CopyOnWrite容器也是一種讀寫分離的思想,讀和寫不同的容器。類似的有CopyOnWriteArraySet。

public boolean add(T e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // 復制出新數組
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        // 把新元素添加到新數組里
        newElements[len] = e;
        // 把原數組引用指向新數組
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}
 
final void setArray(Object[] a) {
    array = a;
}

讀的時候不需要加鎖,如果讀的時候有多個線程正在向ArrayList添加數據,讀還是會讀到舊的數據,因為寫的時候不會鎖住舊的ArrayList。

public E get(int index) {
    return get(getArray(), index);
}

7、AbstractQueuedSynchronizer 

為實現依賴於先進先出 (FIFO) 等待隊列的阻塞鎖和相關同步器(信號量、事件,等等)提供一個框架。此類的設計目標是成為依靠單個原子 int 值來表示狀態的大多數同步器的一個有用基礎。子類必須定義更改此狀態的受保護方法,並定義哪種狀態對於此對象意味着被獲取或被釋放。假定這些條件之后,此類中的其他方法就可以實現所有排隊和阻塞機制。子類可以維護其他狀態字段,但只是為了獲得同步而只追蹤使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法來操作以原子方式更新的 int 值。

使用示例
以下是一個非再進入的互斥鎖類,它使用值 0 表示未鎖定狀態,使用 1 表示鎖定狀態。當非重入鎖定不嚴格地需要當前擁有者線程的記錄時,此類使得使用監視器更加方便。它還支持一些條件並公開了一個檢測方法:

class Mutex implements Lock, java.io.Serializable {

    // Our internal helper class
    private static class Sync extends AbstractQueuedSynchronizer {
      // Report whether in locked state
      protected boolean isHeldExclusively() { 
        return getState() == 1; 
      }

      // Acquire the lock if state is zero
      public boolean tryAcquire(int acquires) {
        assert acquires == 1; // Otherwise unused
       if (compareAndSetState(0, 1)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
      }

      // Release the lock by setting state to zero
      protected boolean tryRelease(int releases) {
        assert releases == 1; // Otherwise unused
        if (getState() == 0) throw new IllegalMonitorStateException();
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
      }
       
      // Provide a Condition
      Condition newCondition() { return new ConditionObject(); }

      // Deserialize properly
      private void readObject(ObjectInputStream s) 
        throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
      }
    }

    // The sync object does all the hard work. We just forward to it.
    private final Sync sync = new Sync();

    public void lock()                { sync.acquire(1); }
    public boolean tryLock()          { return sync.tryAcquire(1); }
    public void unlock()              { sync.release(1); }
    public Condition newCondition()   { return sync.newCondition(); }
    public boolean isLocked()         { return sync.isHeldExclusively(); }
    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
    public void lockInterruptibly() throws InterruptedException { 
      sync.acquireInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit) 
        throws InterruptedException {
      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
 }

8、線程池Executor

ThreadPoolExecutor

ThreadPoolExecutor 的內部工作原理,整個思路總結起來就是 5 句話:

1.  如果當前池大小 poolSize 小於 corePoolSize ,則創建新線程執行任務。

2.  如果當前池大小 poolSize 大於 corePoolSize ,且等待隊列未滿,則進入等待隊列

3.  如果當前池大小 poolSize 大於 corePoolSize 且小於 maximumPoolSize ,且等待隊列已滿,則創建新線程執行任務。

4.  如果當前池大小 poolSize 大於 corePoolSize 且大於 maximumPoolSize ,且等待隊列已滿,則調用拒絕策略來處理該任務。

5.  線程池里的每個線程執行完任務后不會立刻退出,而是會去檢查下等待隊列里是否還有線程任務需要執行,如果在 keepAliveTime 里等不到新的任務了,那么線程就會退出。

排隊有三種通用策略:

1)直接提交。工作隊列的默認選項是SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用於立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。

2)無界隊列。使用無界隊列(例如,不具有預定義容量的LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。

3)有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。

ThreadFactory 和 RejectedExecutionHandler是ThreadPoolExecutor的兩個屬性,也 可以認為是兩個簡單的擴展點. ThreadFactory 是創建線程的工廠。默認的線程工廠會創建一個帶有“ pool-poolNumber-thread-threadNumber ”為名字的線程,如果我們有特別的需要,如線程組命名、優先級等,可以定制自己的ThreadFactory 。

  • RejectedExecutionHandler 是拒絕的策略。常見有以下幾種:
  • AbortPolicy :不執行,會拋出 RejectedExecutionException 異常。
  • CallerRunsPolicy :由調用者(調用線程池的主線程)執行。
  • DiscardOldestPolicy :拋棄等待隊列中最老的。
  • DiscardPolicy: 不做任何處理,即拋棄當前任務。

ScheduledThreadPoolExecutor 

ScheduledThreadPoolExecutor 是對ThreadPoolExecutor的集成。增加了定時觸發線程任務的功能。需要注意從內部實現看,ScheduledThreadPoolExecutor 使用的是  corePoolSize  線程和一個無界隊列的固定大小的池,所以調整 maximumPoolSize  沒有效果。無界隊列是一個內部自定義的 DelayedWorkQueue 。

1)FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {  
    return new ThreadPoolExecutor(nThreads, nThreads,  
                                  0L, TimeUnit.MILLISECONDS,  
                                  new LinkedBlockingQueue<Runnable>());  
}  

實際上就是個不支持keepalivetime,且corePoolSize和maximumPoolSize相等的線程池,等待隊列是LinkedBlockingQueue

2)SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {  
    return new FinalizableDelegatedExecutorService  
        (new ThreadPoolExecutor(1, 1,  
                                0L, TimeUnit.MILLISECONDS,  
                                new LinkedBlockingQueue<Runnable>()));  
}  

實際上就是個不支持keepalivetime,且corePoolSize和maximumPoolSize都等1的線程池,,等待隊列是LinkedBlockingQueue

3)CachedThreadPool

public static ExecutorService newCachedThreadPool() {  
      return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 
                                  60L, TimeUnit.SECONDS,  
                                  new SynchronousQueue<Runnable>());  
}  

實際上就是個支持keepalivetime時間是60秒,且corePoolSize為0,maximumPoolSize=Integer.MAX_VALUE無窮大的線程池。

SingleThreadScheduledExecutor

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {  
    return new DelegatedScheduledExecutorService  
        (new ScheduledThreadPoolExecutor(1, threadFactory));  
}  

實際上是個corePoolSize為1的ScheduledExecutor,但是maximumPoolSize=Integer.MAX_VALUE

ScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {  
    return new ScheduledThreadPoolExecutor(corePoolSize);  
}  

實際上是corePoolSize可設定的ScheduledExecutor,但是maximumPoolSize=Integer.MAX_VALUE

所以,《阿里java開發手冊》強制不允許使用Executor創建線程池:

【強制】線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
說明: Executors 返回的線程池對象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool :
允許的請求隊列長度為 Integer.MAX_VALUE ,可能會堆積大量的請求,從而導致 OOM 。
2) CachedThreadPool 和 ScheduledThreadPool :
允許的創建線程數量為 Integer.MAX_VALUE ,可能會創建大量的線程,從而導致 OOM 。

 線程池執行execute和submit區別

1、submit在執行過程中與execute不一樣,不會拋出異常而是把異常保存在成員變量中,在FutureTask.get阻塞獲取的時候再把異常拋出來。
2、Spring的@Schedule注解的內部實現就是使用submit,因此,如果你構建的任務內部有未檢查異常,你是永遠也拿不到這個異常的。
3、execute直接拋出異常之后線程就死掉了,submit保存異常線程沒有死掉,因此execute的線程池可能會出現沒有意義的情況,因為線程沒有得到重用,而submit不會出現這種情況。

參考:

《java並發編程的藝術》

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM