JDK並發包總結


本文主要介紹jdk中常用的同步控制工具以及並發容器, 其結構如下:

同步控制工具類

ReentrantLock

簡而言之, 就是自由度更高的synchronized, 主要具備以下優點.

  • 可重入: 單線程可以重復進入,但要重復退出
  • 可中斷: lock.lockInterruptibly()
  • 可限時: 超時不能獲得鎖,就返回false,不會永久等待構成死鎖
  • 公平鎖: 先來先得, public ReentrantLock(boolean fair), 默認鎖不公平的, 根據線程優先級競爭.

示例 

 1 public class ReenterLock implements Runnable {
 2     public static ReentrantLock lock = new ReentrantLock();
 3     public static int i = 0;
 4 
 5     @Override
 6     public void run() {
 7         for (int j = 0; j < 10000; j++) {
 8             lock.lock();
 9              // 超時設置
10 //            lock.tryLock(5, TimeUnit.SECONDS);
11             try {
12                 i++;
13             } finally {
14                 // 需要放在finally里釋放, 如果上面lock了兩次, 這邊也要unlock兩次
15                 lock.unlock();
16             }
17         }
18     }
19 
20     public static void main(String[] args) throws InterruptedException {
21         ReenterLock tl = new ReenterLock();
22         Thread t1 = new Thread(tl);
23         Thread t2 = new Thread(tl);
24         t1.start();
25         t2.start();
26         t1.join();
27         t2.join();
28         System.out.println(i);
29     }
30 }

中斷死鎖

線程1, 線程2分別去獲取lock1, lock2, 觸發死鎖. 最終通過DeadlockChecker來觸發線程中斷.

 1 public class DeadLock implements Runnable{
 2 
 3     public static ReentrantLock lock1 = new ReentrantLock();
 4     public static ReentrantLock lock2 = new ReentrantLock();
 5     int lock;
 6 
 7     public DeadLock(int lock) {
 8         this.lock = lock;
 9     }
10 
11     @Override
12     public void run() {
13         try {
14             if (lock == 1){
15                 lock1.lockInterruptibly();
16                 try {
17                     Thread.sleep(500);
18                 }catch (InterruptedException e){}
19                 lock2.lockInterruptibly();
20 
21             }else {
22                 lock2.lockInterruptibly();
23                 try {
24                     Thread.sleep(500);
25                 }catch (InterruptedException e){}
26                 lock1.lockInterruptibly();
27 
28             }
29         }catch (InterruptedException e){
30             e.printStackTrace();
31         }finally {
32             if (lock1.isHeldByCurrentThread())
33                 lock1.unlock();
34             if (lock2.isHeldByCurrentThread())
35                 lock2.unlock();
36             System.out.println(Thread.currentThread().getId() + "線程中斷");
37         }
38     }
39 
40     public static void main(String[] args) throws InterruptedException {
41         DeadLock deadLock1 = new DeadLock(1);
42         DeadLock deadLock2 = new DeadLock(2);
43         // 線程1, 線程2分別去獲取lock1, lock2. 導致死鎖
44         Thread t1 = new Thread(deadLock1);
45         Thread t2 = new Thread(deadLock2);
46         t1.start();
47         t2.start();
48         Thread.sleep(1000);
49         // 死鎖檢查, 觸發中斷
50         DeadlockChecker.check();
51 
52     }
53 }
 1 public class DeadlockChecker {
 2     private final static ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
 3     final static Runnable deadLockCheck = new Runnable() {
 4         @Override
 5         public void run() {
 6             while (true) {
 7                 long[] deadlockedThreadlds = mbean.findDeadlockedThreads();
 8 
 9                 if (deadlockedThreadlds != null) {
10                     ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadlds);
11                     for (Thread t : Thread.getAllStackTraces().keySet()) {
12                         for (int i = 0; i < threadInfos.length; i++) {
13                             if (t.getId() == threadInfos[i].getThreadId()) {
14                                 t.interrupt();
15                                 try {
16                                     Thread.sleep(5000);
17                                 } catch (InterruptedException e) {
18                                 }
19                             }
20                         }
21                     }
22                 }
23             }
24         }
25     };
26 
27     public static void check() {
28         Thread t = new Thread(deadLockCheck);
29         t.setDaemon(true);
30         t.start();
31     }
32 }
View Code

Condition

類似於 Object.wait()和Object.notify(), 需要與ReentrantLock結合使用.

具體API如下:

 1     // await()方法會使當前線程等待,同時釋放當前鎖,當其他線程中使用signal()時或者signalAll()方法時,
 2     // 線程會重新獲得鎖並繼續執行。或者當線程被中斷時,也能跳出等待。這和Object.wait()方法很相似。
 3     void await() throws InterruptedException;
 4     // awaitUninterruptibly()方法與await()方法基本相同,但是它並不會再等待過程中響應中斷。
 5     void awaitUninterruptibly();
 6     long awaitNanos(long nanosTimeout) throws InterruptedException;
 7     boolean await(long time, TimeUnit unit) throws InterruptedException;
 8     boolean awaitUntil(Date deadline) throws InterruptedException;
 9     // singal()方法用於喚醒一個在等待中的線程。相對的singalAll()方法會喚醒所有在等待中的線程。
10     // 這和Obejct.notify()方法很類似。
11     void signal();
12     void signalAll();

示例

 1 public class ReenterLockCondition implements Runnable{
 2 
 3     public static ReentrantLock lock = new ReentrantLock();
 4     public static Condition condition = lock.newCondition();
 5 
 6     @Override
 7     public void run() {
 8         try {
 9             lock.lock();
10             condition.await();
11             System.out.println("Thread is going on");
12         } catch (InterruptedException e) {
13             e.printStackTrace();
14         } finally {
15             // 注意放到finally中釋放
16             lock.unlock();
17         }
18     }
19 
20     public static void main(String[] args) throws InterruptedException {
21         ReenterLockCondition t1 = new ReenterLockCondition();
22         Thread tt = new Thread(t1);
23         tt.start();
24         Thread.sleep(2000);
25         System.out.println("after sleep, signal!");
26         // 通知線程tt繼續執行. 喚醒同樣需要重新獲得鎖
27         lock.lock();
28         condition.signal();
29         lock.unlock();
30     }
31 }

 Semaphore信號量

鎖一般都是互斥排他的, 而信號量可以認為是一個共享鎖,

允許N個線程同時進入臨界區, 但是超出許可范圍的只能等待.

如果N = 1, 則類似於lock.

具體API如下, 通過acquire獲取信號量, 通過release釋放

1     public void acquire()
2     public void acquireUninterruptibly()
3     public boolean tryAcquire()
4     public boolean tryAcquire(long timeout, TimeUnit unit)
5     public void release()

示例

模擬20個線程, 但是信號量只設置了5個許可.

因此線程是按序每2秒5個的打印job done.

 1 public class SemapDemo implements Runnable{
 2 
 3     // 設置5個許可
 4     final Semaphore semp = new Semaphore(5);
 5 
 6     @Override
 7     public void run() {
 8         try {
 9             semp.acquire();
10             // 模擬線程耗時操作
11             Thread.sleep(2000L);
12             System.out.println("Job done! " + Thread.currentThread().getId());
13         } catch (InterruptedException e) {
14             e.printStackTrace();
15         } finally {
16             semp.release();
17         }
18     }
19 
20     public static void main(String[] args){
21         ExecutorService service = Executors.newFixedThreadPool(20);
22         final SemapDemo demo = new SemapDemo();
23         for (int i = 0; i < 20; i++) {
24             service.submit(demo);
25         }
26     }
27 }

ReadWriteLock

讀寫分離鎖, 可以大幅提升系統並行度.

  • 讀-讀不互斥:讀讀之間不阻塞。
  • 讀-寫互斥:讀阻塞寫,寫也會阻塞讀。
  • 寫-寫互斥:寫寫阻塞。

示例

使用方法與ReentrantLock類似, 只是讀寫鎖分離.

1 private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();
2 private static Lock readLock = readWriteLock.readLock();
3 private static Lock writeLock = readWriteLock.writeLock();

CountDownLatch倒數計時器

一種典型的場景就是火箭發射。在火箭發射前,為了保證萬無一失,往往還要進行各項設備、儀器的檢查。

只有等所有檢查完畢后,引擎才能點火。這種場景就非常適合使用CountDownLatch。它可以使得點火線程, 

等待所有檢查線程全部完工后,再執行.

示例

 1 public class CountDownLatchDemo implements Runnable{
 2     static final CountDownLatch end = new CountDownLatch(10);
 3     static final CountDownLatchDemo demo = new CountDownLatchDemo();
 4 
 5     @Override
 6     public void run() {
 7         try {
 8             Thread.sleep(new Random().nextInt(10) * 1000);
 9             System.out.println("check complete!");
10             end.countDown();
11         } catch (InterruptedException e) {
12             e.printStackTrace();
13         }
14     }
15 
16     public static void main(String[] args) throws InterruptedException {
17         ExecutorService service = Executors.newFixedThreadPool(10);
18         for (int i = 0; i < 10; i++) {
19             service.submit(demo);
20         }
21         // 等待檢查
22         end.await();
23         // 所有線程檢查完畢, 發射火箭.
24         System.out.println("fire");
25         service.shutdown();
26     }
27 }

CyclicBarrier循環柵欄

Cyclic意為循環,也就是說這個計數器可以反復使用。比如,假設我們將計數器設置為10。那么湊齊

第一批10個線程后,計數器就會歸零,然后接着湊齊下一批10個線程.

示例

 1 public class CyclicBarrierDemo {
 2 
 3     public static class Soldier implements Runnable {
 4 
 5         private String soldier;
 6         private final CyclicBarrier cyclic;
 7 
 8         Soldier(CyclicBarrier cyclic, String soldier) {
 9             this.cyclic = cyclic;
10             this.soldier = soldier;
11         }
12 
13         @Override
14         public void run() {
15             try {
16                 // 等待所有士兵到期
17                 cyclic.await();
18                 doWork();
19                 // 等待所有士兵完成工作
20                 cyclic.await();
21             } catch (InterruptedException e) {
22                 e.printStackTrace();
23             } catch (BrokenBarrierException e) {
24                 e.printStackTrace();
25             }
26         }
27 
28         void doWork() {
29             try {
30                 Thread.sleep(Math.abs(new Random().nextInt() % 10000));
31             } catch (InterruptedException e) {
32                 e.printStackTrace();
33             }
34             System.out.println(soldier + " 任務完成!");
35         }
36     }
37 
38     public static class BarrierRun implements Runnable {
39         boolean flag;
40         int N;
41 
42         public BarrierRun(boolean flag, int n) {
43             this.flag = flag;
44             N = n;
45         }
46 
47         @Override
48         public void run() {
49             if (flag) {
50                 System.out.println("士兵:" + N + "個, 任務完成!");
51             } else {
52                 System.out.println("士兵:" + N + "個, 集合完畢!");
53                 flag = true;
54             }
55         }
56     }
57 
58     public static void main(String[] args){
59         final int N = 5;
60         Thread[] allSoldier = new Thread[N];
61         boolean flag = false;
62         CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
63         // 設置屏障點, 主要為了執行這個方法.
64         System.out.println("集合任務!");
65         for (int i = 0; i < N; i++) {
66             System.out.println("士兵" + i + " 報到!");
67             allSoldier[i] = new Thread(new Soldier(cyclic, "士兵" + i));
68             allSoldier[i].start();
69         }
70 
71     }
72

結果

集合任務!
士兵0 報到!
士兵1 報到!
士兵2 報到!
士兵3 報到!
士兵4 報到!
士兵:5個, 集合完畢!
士兵3 任務完成!
士兵1 任務完成!
士兵0 任務完成!
士兵4 任務完成!
士兵2 任務完成!
士兵:5個, 任務完成!

LockSupport

一個線程阻塞工具, 可以在任意位置讓線程阻塞.

與suspend()比較, 如果unpark發生在park之前, 並不會導致線程凍結, 也不需要獲取鎖.

API

1 LockSupport.park();
2 LockSupport.unpark(t1);

中斷響應

能夠響應中斷,但不拋出異常。

中斷響應的結果是,park()函數的返回,可以從Thread.interrupted()得到中斷標志

 1 public class LockSupportDemo {
 2     public static Object u = new Object();
 3     static ChangeObjectThread t1 = new ChangeObjectThread("t1");
 4     static ChangeObjectThread t2 = new ChangeObjectThread("t2");
 5     public static class ChangeObjectThread extends Thread {
 6 
 7         public ChangeObjectThread(String name) {
 8             super(name);
 9         }
10 
11         @Override
12         public void run() {
13             synchronized (u) {
14                 System.out.println("in " + getName());
15                 LockSupport.park();
16             }
17         }
18     }
19 
20     public static void main(String[] args) throws InterruptedException {
21         t1.start();
22         Thread.sleep(100);
23         t2.start();
24         LockSupport.unpark(t1);
25         LockSupport.unpark(t2);
26         t1.join();
27         t2.join();
28     }
29 }

並發容器

Collections.synchronizedMap

其本質是在讀寫map操作上都加了鎖, 因此不推薦在高並發場景使用.

ConcurrentHashMap

內部使用分區Segment來表示不同的部分, 每個分區其實就是一個小的hashtable. 各自有自己的鎖.

只要多個修改發生在不同的分區, 他們就可以並發的進行. 把一個整體分成了16個Segment, 最高支持16個線程並發修改.

代碼中運用了很多volatile聲明共享變量, 第一時間獲取修改的內容, 性能較好.

 1     public V put(K key, V value) {
 2         ConcurrentHashMap.Segment<K,V> s;
 3         if (value == null)
 4             throw new NullPointerException();
 5         int hash = hash(key);
 6         int j = (hash >>> segmentShift) & segmentMask;
 7         // 通過unsafe對j進行偏移來尋找key所對應的分區
 8         if ((s = (ConcurrentHashMap.Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
 9                 (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
10             // 如果分區不存在, 則創建新的分區
11             s = ensureSegment(j);
12         // kv放到分區中
13         return s.put(key, hash, value, false);
14     }

Segment.put源碼

 1     Segment(float lf, int threshold, ConcurrentHashMap.HashEntry<K,V>[] tab) {
 2         this.loadFactor = lf;
 3         this.threshold = threshold;
 4         this.table = tab;
 5     }
 6 
 7     final V put(K key, int hash, V value, boolean onlyIfAbsent) {
 8         // tryLock通過無鎖cas操作嘗試獲取鎖(無等待), 繼承自ReentrantLock.
 9         // 如果成功則, node = null
10         // 如果不成功, 則可能其他線程已經在插入數據了,
11         // 此時會嘗試繼續獲取鎖tryLock, 自旋MAX_SCAN_RETRIES次, 若還是拿不到鎖才開始lock
12         ConcurrentHashMap.HashEntry<K,V> node = tryLock() ? null :
13                 scanAndLockForPut(key, hash, value);
14         V oldValue;
15         try {
16             ConcurrentHashMap.HashEntry<K,V>[] tab = table;
17             // 獲取分區中哪一個entry鏈的index
18             int index = (tab.length - 1) & hash;
19             // 獲取第一個entry
20             ConcurrentHashMap.HashEntry<K,V> first = entryAt(tab, index);
21             for (ConcurrentHashMap.HashEntry<K,V> e = first;;) {
22                 // e != null , 存在hash沖突, 把他加到當前鏈表中
23                 if (e != null) {
24                     K k;
25                     if ((k = e.key) == key ||
26                             (e.hash == hash && key.equals(k))) {
27                         oldValue = e.value;
28                         if (!onlyIfAbsent) {
29                             e.value = value;
30                             ++modCount;
31                         }
32                         break;
33                     }
34                     e = e.next;
35                 }
36                 else {
37                     // 無hash沖突, new entry
38                     if (node != null)
39                         node.setNext(first);
40                     else
41                         node = new ConcurrentHashMap.HashEntry<K,V>(hash, key, value, first);
42                     int c = count + 1;
43                     // 空間大小超出閾值, 需要rehash, 翻倍空間.
44                     if (c > threshold && tab.length < MAXIMUM_CAPACITY)
45                         rehash(node);
46                     else
47                         //放到分區中
48                         setEntryAt(tab, index, node);
49                     ++modCount;
50                     count = c;
51                     oldValue = null;
52                     break;
53                 }
54             }
55         } finally {
56             unlock();
57         }
58         return oldValue;
59     }

如果想要對ConcurrentHashMap排序, 則可以使用ConcurrentSkipListMap,

他支持並發排序, 是一個線程安全的類似TreeMap的實現.

BlockingQueue

阻塞隊列, 主要用於多線程之間共享數據.

當一個線程讀取數據時, 如果隊列是空的, 則當前線程會進入等待狀態.

如果隊列滿了, 當一個線程嘗試寫入數據時, 同樣會進入等待狀態.

適用於生產消費者模型.

其源碼實現也相對簡單.

 1     public void put(E e) throws InterruptedException {
 2         checkNotNull(e);
 3         final ReentrantLock lock = this.lock;
 4         lock.lockInterruptibly();
 5         try {
 6             // 隊列滿了, 寫進入等待
 7             while (count == items.length)
 8                 notFull.await();
 9             insert(e);
10         } finally {
11             lock.unlock();
12         }
13     }
14 
15     public E take() throws InterruptedException {
16         final ReentrantLock lock = this.lock;
17         lock.lockInterruptibly();
18         try {
19             // 隊列空的, 讀進入等待
20             while (count == 0)
21                 notEmpty.await();
22             return extract();
23         } finally {
24             lock.unlock();
25         }
26     }

因為BlockingQueue在put take等操作有鎖, 因此非高性能容器, 

如果需要高並發支持的隊列, 則可以使用ConcurrentLinkedQueue. 他內部也是運用了大量無鎖操作.

CopyOnWriteArrayList

CopyOnWriteArrayList通過在新增元素時, 復制一份新的數組出來, 並在其中寫入數據, 之后將原數組引用指向到新數組.

其Add操作是在內部通過ReentrantLock進行鎖保護, 防止多線程場景復制多份數組.

而Read操作內部無鎖, 直接返回數組引用, 並發下效率高, 因此適用於讀多寫少的場景.

源碼

 1     public boolean add(E e) {
 2         final ReentrantLock lock = this.lock;
 3         // 寫數據的鎖
 4         lock.lock();
 5         try {
 6             Object[] elements = getArray();
 7             int len = elements.length;
 8             // 復制到新的數組
 9             Object[] newElements = Arrays.copyOf(elements, len + 1);
10             // 加入新元素
11             newElements[len] = e;
12             // 修改引用
13             setArray(newElements);
14             return true;
15         } finally {
16             lock.unlock();
17         }
18     }
19 
20     final void setArray(Object[] a) {
21         array = a;
22     }
23 
24     // 讀的時候無鎖
25     public E get(int index) {
26         return get(getArray(), index);
27     }

示例

使用10個讀線程, 100個寫線程. 如果使用ArrayList實現, 那么有可能是在運行過程中拋出ConcurrentModificationException.

原因很簡單, ArrayList在遍歷的時候會check modCount是否發生變化, 如果一邊讀一邊寫就會拋異常.

 1 public class CopyOnWriteListDemo {
 2 
 3     static List<UUID> list = new CopyOnWriteArrayList<UUID>();
 4 //    static List<UUID> list = new ArrayList<UUID>();
 5 
 6     // 往list中寫數據
 7     public static class AddThread implements Runnable {
 8 
 9         @Override
10         public void run() {
11             UUID uuid = UUID.randomUUID();
12             list.add(uuid);
13             System.out.println("++Add uuid : " + uuid);
14 
15         }
16     }
17 
18     // 從list中讀數據
19     public static class ReadThread implements Runnable {
20 
21         @Override
22         public void run() {
23             System.out.println("start read size: " + list.size() + " thread : " + Thread.currentThread().getName());
24             for (UUID uuid : list) {
25                 System.out.println("Read uuid : " + uuid + " size : " + list.size() + "thread: " + Thread.currentThread().getName());
26             }
27         }
28     }
29 
30 
31     public static void main(String[] args) throws InterruptedException {
32         initThread(new AddThread(), 10);
33         initThread(new ReadThread(), 100);
34     }
35 
36     private static void initThread(Runnable runnable, int maxNum) throws InterruptedException {
37         Thread[] ts = new Thread[maxNum];
38         for (int k = 0; k < maxNum; k++) {
39             ts[k] = new Thread(runnable);
40         }
41         for (int k = 0; k < maxNum; k++) {
42             ts[k].start();
43         }
44     }
45 }

下圖運行結果中可以看出來, 同一個線程, 即使在讀的過程中發生了size變化, 也不會拋出ConcurrentModificationException


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM