Java 8並發工具包簡介
Java 8並發工具包由3個包組成,分別是java.util.concurrent、java.util.concurrent.atomic和java.util.concurrent.locks,提供了大量關於並發的接口、類、原子操作類、鎖相關類。借助java.util.concurrent包,可以非常輕松地實現復雜的並發操作。java.util.concurrent包主要包含以下內容,后文將具體介紹:
阻塞隊列:多種阻塞隊列的實現,在隊列為空或滿時能夠阻塞消費者或生產者相關線程。
並發容器:用於並發場景的容器類型,一般無需加鎖。
線程池:創建單線程、固定大小或可緩存的線程池,支持周期性任務,也能夠實現異步任務的返回。
鎖:用於並發同步的鎖;
原子類型:用於實現原子操作的數據類型,包括AtomicBoolean、AtomicInteger、AtomicLong、AtomicReference等
並發工具:用於並發場景的類,可以控制並發線程的同步、等待。
阻塞隊列BlockingQueue
在BlockingQueue中,生產者可以持續向隊列插入新的元素,直到隊列滿為止,隊列滿后生產者線程被阻塞;消費者可以持續從隊列取出元素,直到隊列空為止,隊列空后消費者線程被阻塞。
BlockingQueue提供了四種類型的操作方法,在操作不能立即執行的情況下有不同的表現。
拋出異常 | 返回特殊值 | 阻塞 | 超時返回 | |
插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
刪除 | remove(o) | poll() | take() | poll(timeout, timeunit) |
檢查 | element() | peek() |
Java 8提供了多種類型的BlockingQueue實現類,
ArrayBlockingQueue:基於數組實現的有界阻塞隊列,創建后不能修改隊列的大小;
LinkedBlockingQueue:基於鏈表實現的有界阻塞隊列,默認大小為Integer.MAX_VALUE,有較好的吞吐量,但可預測性差。
PriorityBlockingQueue:具有優先級的無界阻塞隊列,不允許插入null,所有元素都必須可比較(即實現Comparable接口)。
SynchronousQueue:只有一個元素的同步隊列。若隊列中有元素插入操作將被阻塞,直到隊列中的元素被其他線程取走。
DelayQueue:無界阻塞隊列,每個元素都有一個延遲時間,在延遲時間之后才釋放元素。
阻塞雙端隊列BlockingDueue
Dueue是“Double Ended Queue”的縮寫。生產者和消費者可以在隊列的兩端進行插入和刪除操作。
在頭部提供了四種類型的操作方法,在操作不能立即執行的情況下有不同的表現。
拋出異常 | 返回特殊值 | 阻塞 | 超時返回 | |
插入 | addFirst(o) | offerFirst(o) | putFirst(o) | offerFirst(o, timeout, timeunit) |
刪除 | removeFirst(o) | pollFirst(o) | takeFirst(o) | pollFirst(timeout, timeunit) |
檢查 | getFirst(o) | peekFirst(o) |
在尾部提供了四種類型的操作方法,在操作不能立即執行的情況下有不同的表現。
拋出異常 | 返回特殊值 | 阻塞 | 超時返回 | |
插入 | addLast(o) | offerLast(o) | putLast(o) | offerLast(o, timeout, timeunit) |
刪除 | removeLast(o) | pollLast(o) | takeLast(o) | pollLast(timeout, timeunit) |
檢查 | getLast(o) | peekLast(o) |
Java 8只提供了一種類型的BlockingDueue實現類,
LinkedBlockingDeque:基於雙向鏈表實現的有界阻塞隊列,默認大小為Integer.MAX_VALUE,有較好的吞吐量,但可預測性差。
阻塞轉移隊列TransferQueue
TransferQueue接口繼承了BlockingQueue接口,因此具有BlockingQueue接口的所有方法,並增加了一些方法。方法及作用如下:
tryTransfer(E e):若當前存在一個正在等待獲取的消費者線程,則該方法會即刻轉移e,並返回true;若不存在則返回false,但是並不會將e插入到隊列中。這個方法不會阻塞當前線程,要么快速返回true,要么快速返回false。
transfer(E e):若當前存在一個正在等待獲取的消費者線程,即立刻將e移交之;否則將元素e插入到隊列尾部,並且當前線程進入阻塞狀態,直到有消費者線程取走該元素。
tryTransfer(E e, long timeout, TimeUnit unit):若當前存在一個正在等待獲取的消費者線程,會立即傳輸給它; 否則將元素e插入到隊列尾部,並且等待被消費者線程獲取消費掉。若在指定的時間內元素e無法被消費者線程獲取,則返回false,同時該元素從隊列中移除。
Java 8 提供了一個基於鏈表的實現類LinkedTransferQueue。
並發容器
工具包提供了隊列的並發實現類ConcurrentLinkedQueue和ConcurrentLinkedDeque,兩者都是無界非阻塞線程安全的隊列。
ConcurrentMap接口繼承了普通的Map接口,提供了線程安全和原子操作特性。Java 8 提供了實現類ConcurrentHashMap,ConcurrentHashMap不鎖定整個Map,只鎖定需要寫入的部分,因此並發性能比HashTable要高很多。
ConcurrentNavigableMap接口繼承了ConcurrentMap和NavigableMap接口,支持並發訪問NavigableMap,還能讓子Map具備並發訪問的能力。NavigableMap是擴展的 SortedMap,具有了針對給定搜索目標返回最接近匹配項的導航方法。
Java 8 提供了實現類ConcurrentSkipListMap,並沒有使用lock來保證線程的並發訪問和修改,而是使用了非阻塞算法來保證並發訪問,高並發時相對於TreeMap有明顯的優勢。
工具包提供了NavigableSet的並發實現類ConcurrentSkipListSet,是線程安全的有序集合,適用於高並發的場景,通過ConcurrentSkipListMap實現。
工具包提供了兩個寫時復制容器,即CopyOnWriteArrayList和CopyOnWriteArraySet。寫時復制技術是一種優化策略,多個線程可以並發訪問同一份數據,當有線程要修改時才進行復制然后修改。在Linux系統中,fork進程后,子進程先與父進程共享數據,需要修改時才用寫時復制得到自己的副本。在Java中,寫時復制容器在修改數據后,把原來容器的引用指向新容器,來實現讀寫分離,在並發讀寫中不需要加鎖。寫時復制容器適用於讀多寫少的場景,在復制時會占用較多內存,能夠保證最終一致性,但無法保證瞬時一致性。
線程池
工具包中Executor接口定義了執行器的基本功能,即execute方法,接收Runnable對象參數並執行Runnable中的操作。
ExecutorService接口繼承Executor接口后增加了關於執行器服務的定義,如關閉、立即關閉、檢查關閉、等待終止、提交有返回值的任務、批量提交任務等。通過Executors的工廠方法獲取ExecutorService的具體實現,目前Executors可以返回的實現類型如下:
FixedThreadPool:固定大小的線程池,創建時指定大小;
WorkStealingPool:擁有多個任務隊列(以便減少連接數)的線程池;
SingleThreadExecutor:單線程執行器,顧名思義只有一個線程執行任務;
CachedThreadPool:根據需要創建線程,可以重復利用已存在的線程來執行任務;
SingleThreadScheduledExecutor:根據時間計划延遲創建單個工作線程或者周期性創建的單線程執行器;
ScheduledThreadPool:能夠延后執行任務,或者按照固定的周期執行任務。
如果希望在任務執行完成后得到任務的返回值,可以調用submit方法傳入Callable任務,並通過返回的Future對象查看任務執行是否完成,並獲取返回值。
線程分叉與合並
ForkJoinPool 讓我們可以很方便地把任務分裂成幾個更小的任務,這些分裂出來的任務也將會提交給 ForkJoinPool。任務可以繼續分割成更小的子任務,只要它還能分割。分叉和合並原理包含兩個遞歸進行的步驟。兩個步驟分別是分叉步驟和合並步驟。
一個使用了分叉和合並原理的任務可以將自己分叉(分割)為更小的子任務,這些子任務可以被並發執行。如下圖所示:
通過把自己分割成多個子任務,每個子任務可以由不同的 CPU 並行執行,或者被同一個 CPU 上的不同線程執行。
只有當給的任務過大,把它分割成幾個子任務才有意義。把任務分割成子任務有一定開銷,因此對於小型任務,這個分割的消耗可能比每個子任務並發執行的消耗還要大。
什么時候把一個任務分割成子任務是有意義的,這個界限也稱作一個閥值。這要看每個任務對有意義閥值的決定。很大程度上取決於它要做的工作的種類。
當一個任務將自己分割成若干子任務之后,該任務將等待所有子任務結束。一旦子任務執行結束,該任務可以把所有結果合並到同一個結果。圖示如下:
鎖
使用鎖實現的同步機制很像synchronized塊,但是比synchronized塊更靈活。鎖和synchronized的主要區別在於:
Synchronized塊不能保證等待進入塊的線程的訪問順序;
Synchronized塊無法接收參數,不能在有超時時間限制的情況下嘗試訪問;
Synchronized塊必須包含在單個方法中,而鎖的lock和unlock操作可以在單獨的方法中。
工具包提供了以下幾種類型的鎖:
ReadWriteLock:讀寫鎖接口,允許多個線程讀取某個資源,但是一次只能有一個線程進行寫操作。內部有讀鎖、寫鎖兩個接口,分別保護讀操作和寫操作。實現類為ReentrantReadWriteLock。
ReentrantLock:可重入鎖,具有與使用 synchronized 方法和語句所訪問的隱式監視器鎖定相同的一些基本行為和語義,但功能更強大。ReentrantLock 將由最近成功獲得鎖定,並且還沒有釋放該鎖定的線程所擁有。當鎖定沒有被另一個線程所擁有時,調用 lock 的線程將成功獲取該鎖定並返回。如果當前線程已經擁有該鎖定,此方法將立即返回。內部有一個計數器,擁有鎖的線程每鎖定一次,計數器加1,每釋放一次計數器減1。
原子類型
工具包提供了一些可以用原子方式進行讀寫的變量類型,支持無鎖線程安全的單變量編程。
本質上,這些類都擴展了volatile的概念,使用一個volatile類型的變量來存儲實際數據。
工具包提供了4種類型的原子變量類型:
AtomicBoolean:可原子操作的布爾對象;
AtomicInteger:可原子操作的整形對象;
AtomicLong:可原子操作的長整形對象;
AtomicReference:可原子操作的對象引用。
以AtomicInteger為例。在Java中i++和++i操作並不是線程安全的,需要加鎖。AtomicInteger提供了以下幾種線程安全的操作方法:
方法 | 定義 | 作用 |
getAndIncrement | public final int getAndIncrement() | i++ |
getAndDecrement | public final int getAndDecrement() | i-- |
incrementAndGet | public final int incrementAndGet() | ++i |
decrementAndGet() | public final int decrementAndGet() | --i |
getAndAdd | public final int getAndAdd(int delta) | 增加delta返回舊值 |
addAndGet | public final int addAndGet(int delta) | 增加delta返回新值 |
在此基礎上,工具包還提供了原子性的數組類型,包括AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray。
並發工具
CountDownLatch
CountDownLatch用於一個或者多個線程等待一系列指定操作的完成。初始化時,給定一個數量,每調用一次countDown() 方法數量減一。其他線程調用await方法等待時,線程會阻塞到數量減到0才開始執行。
CyclicBarrier 柵欄
CyclicBarrier是一種同步機制,它能夠對處理一些算法的線程實現同步。換句話講,它就是一個所有線程必須等待的一個柵欄,直到所有線程都到達這里,然后所有線程才可以繼續做其他事情。在下圖的流程中,線程1和線程2都到達第一個柵欄后才能夠繼續運行。如果線程1先到線程2后到,則線程1需要等待線程2到達柵欄處,然后兩個線程才能繼續運行。
Exchanger 交換機
Exchanger類表示一種會合點,兩個線程可以在這里交換對象。兩個線程各自調用exchange方法進行交換,當線程A調用Exchange對象的exchange()方法后,它會陷入阻塞狀態,直到線程B也調用了exchange()方法,然后以線程安全的方式交換數據,之后線程A和B繼續運行。
Semaphore 信號量
Semaphore 可以很輕松完成信號量控制,Semaphore可以控制某個資源可被同時訪問的個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。
ThreadLocalRandom產生並發隨機數
使用Math.random()產生隨機數,使用原子變量來保存當前的種子,這樣兩個線程同時調用序列時得到的是偽隨機數,而不是相同數量的兩倍。ThreadLocalRandom提供並發產生的隨機數,能夠解決多個線程發生的競爭爭奪。
原文地址:Java 8並發工具包漫游指南