多線程系列五:並發工具類和並發容器


一、並發容器

1.ConcurrentHashMap

為什么要使用ConcurrentHashMap

在多線程環境下,使用HashMap進行put操作會引起死循環,導致CPU利用率接近100%,HashMap在並發執行put操作時會引起死循環,是因為多線程會導致HashMap的Entry鏈表

形成環形數據結構,一旦形成環形數據結構,Entry的next節點永遠不為空,就會產生死循環獲取Entry。

HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的情況下HashTable的效率非常低下。因為當一個線程訪問HashTable的同步方法,其他線程也訪問HashTable的同步方法時,會進入阻塞或輪詢狀態。如線程1使用put進行元素添加,線程2不但不能使用put方法添加元素,也不能使用get方法來獲取元素,所以競爭越激烈效率越低。

ConcurrentHashMap的一些有用的方法

很多時候我們希望在元素不存在時插入元素,我們一般會像下面那樣寫代碼

synchronized(map){

  if (map.get(key) == null){

      return map.put(key, value);

  } else{

      return map.get(key);

  }

}

putIfAbsent(key,value)方法原子性的實現了同樣的功能

putIfAbsent(K key, V value)  

 如果key對應的value不存在,則put進去,返回null。否則不put,返回已存在的value  

boolean remove(Object key, Object value)  

  如果key對應的值是value,則移除K-V,返回true。否則不移除,返回false  

boolean replace(K key, V oldValue, V newValue)  

 如果key對應的當前值是oldValue,則替換為newValue,返回true。否則不替換,返回false

Hash的解釋

散列任意長度的輸入通過一種算法變換成固定長度的輸出。屬於壓縮的映射。

hash算法示例圖演示:

 

類似於HaspMap的實現就是使用散列,比如把1000個元素放到長度為10的hashmap里面去,放入之前會把這1000個數經過hash算法映射到10個數組里面去,這時候就會存在相同的映射值在一個數組的相同位置,就會產生hash碰撞,此時hashmap就會在產生碰撞的數組的后面使用Entry鏈表來存儲相同映射的值,然后使用equals方法來判斷同一個鏈表存儲的值是否一樣來獲取值,鏈表就是hashmap用來解決碰撞的方法,所以我們一般在寫一個類的時候要寫自己的hashcode方法和equals方法,如果鍵的hashcode相同,再使用鍵的equals方法判斷鍵內容是不是一樣的,一樣的就獲取值

Md5Sha,取余都是散列算法,ConcurrentHashMap中是wang/jenkins算法

 ConcurrentHashMap在1.7下的實現

分段鎖的設計思想

分段鎖的思想示例圖:

說明:

a)傳統的hashtable是很小空間的數組整段鎖住,這樣性能比較低

b)ConcurrentHashMap是在很小空間數組的前面再加一個數組,映射的時候先映射到前面的數組,然后再映射到后面的很小空間的數組;讀取的時候只需要把前面的數組鎖住就可以了。這就是分段鎖的思想

ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment實際是一種可重入鎖(ReentrantLock),也就是用於分段的鎖。HashEntry則用於存儲鍵值對數據。一個ConcurrentHashMap里包含一個Segment數組。Segment的結構和HashMap類似,是一種數組和鏈表結構。一個Segment里包含一個HashEntry數組,每個HashEntry是一個鏈表結構的元素,每個Segment守護着一個HashEntry數組里的元素,當對HashEntry數組的數據進行修改時,必須首先獲得與它對應的Segment鎖。

 

說明:上圖存在兩次散列的過程:比如插入一個1000的數,首先是把1000的位數(最多是高16位)做一次散列找到在segments數組中的位置,然后再把1000本身做一次散列找到在table中的位置

獲取值時一樣

ConcurrentHashMap初始化方法是通過initialCapacity、loadFactor和concurrencyLevel(參數concurrencyLevel是用戶估計的並發級別,就是說你覺得最多有多少線程共同修改這個map,根據這個來確定Segment數組的大小concurrencyLevel默認是DEFAULT_CONCURRENCY_LEVEL = 16;)

ConcurrentHashMap完全允許多個讀操作並發進行,讀操作並不需要加鎖。ConcurrentHashMap實現技術是保證HashEntry幾乎是不可變的。HashEntry代表每個hash鏈中的一個節點可以看到其中的對象屬性要么是final要么是volatile的。

總結:ConcurrentHashMap在1.7及以下的實現使用數組+鏈表的方式,采用了分段鎖的思想

ConcurrentHashMap在1.8下的實現

改進一:取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存數據,采用table數組元素作為鎖,從而實現了對每一行數據進行加鎖,進一步減少並發沖突的概率。

改進二:將原先table數組+單向鏈表的數據結構,變更為table數組+單向鏈表+紅黑樹的結構。對於個數超過8(默認值)的列表,jdk1.8中采用了紅黑樹的結構,那么查詢的時間復雜度可以降低到O(logN),可以改進性能。

總結:ConcurrentHashMap在1.8下的實現使用數組+鏈表+紅黑樹的方式,當鏈表個數超過8的時候就把原來的鏈表轉成紅黑樹,使用紅黑樹來存取,采用了元素鎖的思想

2. ConcurrentSkipListMap  ConcurrentSkipListSet

ConcurrentSkipListMap    TreeMap的並發實現

ConcurrentSkipListSet     TreeSet的並發實現

了解什么是SkipList

二分查找和AVL樹查找

二分查找要求元素可以隨機訪問,所以決定了需要把元素存儲在連續內存。這樣查找確實很快,但是插入和刪除元素的時候,為了保證元素的有序性,就需要大量的移動元素了。

如果需要的是一個能夠進行二分查找,又能快速添加和刪除元素的數據結構,首先就是二叉查找樹,二叉查找樹在最壞情況下可能變成一個鏈表。

於是,就出現了平衡二叉樹,根據平衡算法的不同有AVL樹,B-TreeB+Tree,紅黑樹等,但是AVL樹實現起來比較復雜,平衡操作較難理解,這時候就可以用SkipList跳躍表結構。

傳統意義的單鏈表是一個線性結構,向有序的鏈表中插入一個節點需要O(n)的時間,查找操作需要O(n)的時間。

 

如果我們使用上圖所示的跳躍表,就可以減少查找所需時間為O(n/2),因為我們可以先通過每個節點的最上面的指針先進行查找,這樣子就能跳過一半的節點。

比如我們想查找19,首先和6比較,大於6之后,在和9進行比較,然后在和12進行比較......最后比較到21的時候,發現21大於19,說明查找的點在1721之間,從這個過程中,我們可以看出,查找的時候跳過了3712等點,因此查找的復雜度為O(n/2)

跳躍表其實也是一種通過“空間來換取時間”的一個算法,通過在每個節點中增加了向前的指針,從而提升查找的效率。

跳躍表又被稱為概率,或者說是隨機化的數據結構,目前開源軟件 Redis lucence都有用到它。

3. ConcurrentLinkedQueue  無界非阻塞隊列

ConcurrentLinkedQueue   LinkedList 並發版本

Add,offer:添加元素

Peek()get頭元素並不把元素拿走

poll()get頭元素把元素拿走

4. CopyOnWriteArrayListCopyOnWriteArraySet

寫的時候進行復制,可以進行並發的讀。

適用讀多寫少的場景:比如白名單,黑名單,商品類目的訪問和更新場景,假如我們有一個搜索網站,用戶在這個網站的搜索框中,輸入關鍵字搜索內容,但是某些關鍵字不允許被搜索。這些不能被搜索的關鍵字會被放在一個黑名單當中,黑名單每天晚上更新一次。當用戶搜索時,會檢查當前關鍵字在不在黑名單當中,如果在,則提示不能搜索。

弱點:內存占用高,數據一致性弱

總結:寫的時候重新復制一份數據,然后在復制的數據里面寫入數據,寫完以后再把原來的數據的引用執行復制的數據,所以存在數據的弱一致性,適用於讀多寫少的場景

5.什么是阻塞隊列

取數據和存數據不滿足要求時,會對線程進行阻塞。例如取數據時發現隊列里面沒有數據就在那里阻塞等着有數據了再取;存數據時發現隊列已經滿了就在那里阻塞等着有數據被取走時再存

方法

拋出異常

返回值

一直阻塞

超時退出

插入

Add

offer

put

offer

移除

remove

poll

take

poll

檢查

element

peek

沒有

沒有

常用阻塞隊列

ArrayBlockingQueue 數組結構組成有界阻塞隊列。

先進先出原則,初始化必須傳大小,takeput時候用的同一把鎖

LinkedBlockingQueue:鏈表結構組成的有界阻塞隊列

先進先出原則,初始化可以不傳大小,puttake鎖分離

PriorityBlockingQueue:支持優先級排序的無界阻塞隊列,

排序,自然順序升序排列,更改順序:類自己實現compareTo()方法,初始化PriorityBlockingQueue指定一個比較器Comparator

DelayQueue: 使用了優先級隊列的無界阻塞隊列

支持延時獲取,隊列里的元素要實現Delay接口。DelayQueue非常有用,可以將DelayQueue運用在以下應用場景。

緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。

還有訂單到期,限時支付等等。

SynchronousQueue:不存儲元素的阻塞隊列

每個put操作必須要等take操作

LinkedTransferQueue:鏈表結構組成的界阻塞隊列

Transfer,tryTransfer,生產者put時,當前有消費者take,生產者直接把元素傳給消費者

LinkedBlockingDeque:鏈表結構組成的雙向阻塞隊列

可以在隊列的兩端插入和移除,xxxFirst頭部操作,xxxLast尾部操作。工作竊取模式。

了解阻塞隊列的實現原理

使用了Condition實現。

生產者消費者模式

在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生

產線程和消費線程的工作能力來提高程序整體處理數據的速度。

在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發

中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理

完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這種生產消費能力不均衡的問題,便有了生產者和消費者模式。

生產者和消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而是通過阻塞隊列來進行通信,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

什么是Fork/Join框架

並行執行任務的框架把大任務拆分成很多的小任務匯總每個小任務的結果得到大任務的結果

 

工作竊取算法

工作竊取(work-stealing)算法是指某個線程從其他隊列里竊取任務來執行,執行完以后把結果放回去

那么,為什么需要使用工作竊取算法呢?假如我們需要做一個比較大的任務,可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭,把這些子任務分別放到不同的隊列里,並為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應。

比如A線程負責處理A隊列里的任務。但是,有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理。干完活的線程與其等着,不如去幫其他線程干活,於是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

Fork/Join框架的使用

Fork/Join使用兩個類來完成以上兩件事情。

①ForkJoinTask:我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務

中執行fork()和join()操作的機制。通常情況下,我們不需要直接繼承ForkJoinTask類,只需要繼承它的子類,Fork/Join框架提供了以下兩個子類。

·RecursiveAction:用於沒有返回結果的任務。

·RecursiveTask:用於有返回結果的任務。

②ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執行。

Fork/Join有同步和異步兩種方式。

案例1:孫悟空摘桃子fork/join的案例

 1 /**
 2  * 孫悟空摘桃子fork/join的案例,孫悟空去摘桃子時發現桃子太多就讓猴子猴孫去幫忙在桃子,
 3  * 摘完以后再統一匯總求和
 4  */
 5 public class ForkJoinWuKong {
 6 
 7   private static class XiaoWuKong extends RecursiveTask<Integer>{
 8 
 9       private final static int THRESHOLD = 100;//閾值,數組多小的時候,不再進行任務拆分操作
10       private PanTao[] src;
11       private int fromIndex;
12       private int toIndex;
13       private IPickTaoZi pickTaoZi;
14 
15       public XiaoWuKong(PanTao[] src, int fromIndex, int toIndex, IPickTaoZi pickTaoZi) {
16           this.src = src;
17           this.fromIndex = fromIndex;
18           this.toIndex = toIndex;
19           this.pickTaoZi = pickTaoZi;
20       }
21 
22       @Override
23       protected Integer compute() {
24           //計算完以后結果匯總
25           if (toIndex-fromIndex<THRESHOLD){
26               int count =0 ;
27               for(int i=fromIndex;i<toIndex;i++){
28                   if (pickTaoZi.pick(src,i)) count++;
29               }
30               return count;
31           }
32           //大任務拆分成小任務
33           else{
34               //fromIndex....mid......toIndex
35               int mid = (fromIndex+toIndex)/2;
36               XiaoWuKong left = new XiaoWuKong(src,fromIndex,mid,pickTaoZi);
37               XiaoWuKong right = new XiaoWuKong(src,mid,toIndex,pickTaoZi);
38               invokeAll(left,right);
39               return left.join()+right.join();
40 
41           }
42       }
43   }
44 
45     public static void main(String[] args) {
46 
47         ForkJoinPool pool = new ForkJoinPool();
48         PanTao[] src = MakePanTaoArray.makeArray();
49         IProcessTaoZi processTaoZi = new WuKongProcessImpl();
50         IPickTaoZi pickTaoZi = new WuKongPickImpl(processTaoZi);
51 
52         long start = System.currentTimeMillis();
53 
54         //構造一個ForkJoinTask
55         XiaoWuKong xiaoWuKong = new XiaoWuKong(src,0,
56                 src.length-1,pickTaoZi);
57 
58         //ForkJoinTask交給ForkJoinPool來執行。
59         pool.invoke(xiaoWuKong);
60 
61         System.out.println("The count is "+ xiaoWuKong.join()
62                 +" spend time:"+(System.currentTimeMillis()-start)+"ms");
63 
64     }
65 
66 }

案例2:使用Fork/Join框架實現計算1+2+3+....+100的結果

package com.study.demo.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

/**
 * Fork/Join框架設計思路:
 * 第一步:分割任務。首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,所以還需要
 *         不停的分割,直到分割出的子任務足夠小。
 * 第二步:執行任務並合並結果。分割的子任務分別放在雙端隊列里,然后啟動幾個線程分別從雙端隊列里獲取任務執行。
 *         子任務執行完的結果都統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合並這些數據。 
 * 
 * Fork/Join框架的具體實現:
 * Fork/Join使用兩個類來完成以上兩件事情:
 * ForkJoinTask:我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務中執行fork()和join()
 *               操作的機制,通常情況下我們不需要直接繼承ForkJoinTask類,而只需要繼承它的子類,Fork/Join框架提供了以下兩個子類:
 *               RecursiveAction:用於沒有返回結果的任務。
 *               RecursiveTask :用於有返回結果的任務。
 * ForkJoinPool :ForkJoinTask需要通過ForkJoinPool來執行,任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,
 *                進入隊列的頭部。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務。
 *                
 * 實戰:使用Fork/Join框架實現計算1+2+3+....+100的結果-100個數拆分成10個(閾值)子任務來執行最后匯總結果
 *
 */
public class CountTask extends RecursiveTask<Integer> {

    /**
     * 序列化
     */
    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 10;// 閾值
    private int start;
    private int end;
    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {

        int sum = 0;

        // 如果任務足夠小就計算任務
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }

        } else {

            // 如果任務大於閥值,就分裂成兩個子任務計算
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);

            // 執行子任務
            leftTask.fork();
            rightTask.fork();

            // 等待子任務執行完,並得到其結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合並子任務
            sum = leftResult + rightResult;

        }

        return sum;

    }

    public static void main(String[] args) {

        ForkJoinPool forkJoinPool = new ForkJoinPool();

        // 生成一個計算任務,負責計算1+2+3+4
        CountTask task = new CountTask(1, 100);

        // 執行一個任務
        Future result = forkJoinPool.submit(task);

        try {

            System.out.println(result.get());

        } catch (InterruptedException e) {

        } catch (ExecutionException e) {

        }

    }

}

 

二、並發工具類

1. CountDownLatch

允許一個或多個線程等待其他線程完成操作。CountDownLatch的構造函數接收一個int類型的參數作為計數器,如果你想等待N個點完成,這里就傳入N。當我們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前線程,直到N變成零。

由於countDown方法可以用在任何地方,所以這里說的N個點,可以是N個線程,也可以是1個線程里的N個執行步驟。用在多個線程時,只需要把這個CountDownLatch的引用傳遞到線程里即可。

 1 public class CountDownLatchCase {
 2 
 3     static CountDownLatch c = new CountDownLatch(7);
 4 
 5     private static class SubThread implements Runnable{
 6 
 7         @Override
 8         public void run() {
 9             System.out.println(Thread.currentThread().getId());
10             c.countDown();
11             System.out.println(Thread.currentThread().getId()+" is done");
12         }
13     }
14 
15     public static void main(String[] args) throws InterruptedException {
16 
17         new Thread(new Runnable() {
18             @Override
19             public void run() {
20                 System.out.println(Thread.currentThread().getId());
21                 c.countDown();
22                 System.out.println("sleeping...");
23                 try {
24                     Thread.sleep(1500);
25                 } catch (InterruptedException e) {
26                     e.printStackTrace();
27                 }
28                 System.out.println("sleep is completer");
29                 c.countDown();
30             }
31         }).start();
32 
33         for(int i=0;i<=4;i++){
34             Thread thread = new Thread(new SubThread());
35             thread.start();
36         }
37 
38         c.await();
39         System.out.println("Main will gone.....");
40     }
41 }

 

2. CyclicBarrier

CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運行。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然后當前線程被阻塞。

 1 public class CyclicBarrriesBase {
 2 
 3     static CyclicBarrier c = new CyclicBarrier(2);
 4 
 5     public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
 6         new Thread(new Runnable() {
 7             @Override
 8             public void run() {
 9                 System.out.println(Thread.currentThread().getId());
10                 try {
11                     c.await();//等待主線程完成
12                     System.out.println(Thread.currentThread().getId()+"is going");
13                 } catch (InterruptedException e) {
14                     e.printStackTrace();
15                 } catch (BrokenBarrierException e) {
16                     e.printStackTrace();
17                 }
18                 System.out.println("sleeping...");
19 
20             }
21         }).start();
22 
23         System.out.println("main will sleep.....");
24         Thread.sleep(2000);
25         c.await();////等待子線程完成
26 
27         System.out.println("All are complete.");
28     }
29 
30 
31 
32 }

 

CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties,Runnable barrierAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。

CyclicBarrier可以用於多線程計算數據,最后合並計算結果的場景。

 1 public class CyclicBarrierSum {
 2 
 3     static CyclicBarrier c = new CyclicBarrier(5,new SumThread());
 4     //子線程結果存放的緩存
 5     private static ConcurrentHashMap<String,Integer> resultMap =
 6             new ConcurrentHashMap<>();
 7 
 8     //所有子線程達到屏障后,會執行這個Runnable的任務
 9     private static class SumThread implements Runnable{
10 
11         @Override
12         public void run() {
13             int result =0;
14             for(Map.Entry<String,Integer> workResult:resultMap.entrySet()){
15                 result = result+workResult.getValue();
16             }
17             System.out.println("result = "+result);
18             System.out.println("完全可以做與子線程,統計無關的事情.....");
19         }
20     }
21 
22     //工作線程,也就是子線程
23     private static class WorkThread implements Runnable{
24 
25         private Random t = new Random();
26 
27         @Override
28         public void run() {
29             int r = t.nextInt(1000)+1000;
30             System.out.println(Thread.currentThread().getId()+":r="+r);
31             resultMap.put(Thread.currentThread().getId()+"",r);
32             try {
33                 Thread.sleep(1000+r);
34                 c.await();
35             } catch (InterruptedException e) {
36                 e.printStackTrace();
37             } catch (BrokenBarrierException e) {
38                 e.printStackTrace();
39             }
40 
41         }
42     }
43 
44     public static void main(String[] args) {
45         for(int i=0;i<=4;i++){
46             Thread thread = new Thread(new WorkThread());
47             thread.start();
48         }
49     }
50 }

 

CyclicBarrierCountDownLatch的區別

CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置,CountDownLatch.await一般阻塞主線程,所有的工作線程執行countDown,CyclicBarrierton通過工作線程調用await從而阻塞工作線程直到所有工作線程達到屏障

4. 控制並發線程數的Semaphore

Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。應用場景Semaphore可以用於做流量控制,特別是公用資源有限的應用場景,比如數據庫連接。假如有一個需求,要讀取幾萬個文件的數據,因為都是IO密集型任務,我們可以啟動幾十個線程並發地讀取,但是如果讀到內存后,還需要存儲到數據庫中,而數據庫的連接數只有10個,這時我們必須控制只有10個線程同時獲取數據庫連接保存數據,否則會報錯無法獲取數據庫連接。這個時候,就可以使用Semaphore來做流量控制。。Semaphore的構造方法Semaphore(int permits)接受一個整型的數字,表示可用的許可證數量。Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()方法獲取一個許可證,使用完之后調用release()方法歸還許可證。還可以用tryAcquire()方法嘗試獲取許可證。

 1 public class SemaphporeCase<T> {
 2 
 3     private final Semaphore items;//有多少元素可拿
 4     private final Semaphore space;//有多少空位可放元素
 5     private List queue = new LinkedList<>();
 6 
 7     public SemaphporeCase(int itemCounts){
 8         this.items = new Semaphore(0);
 9         this.space = new Semaphore(itemCounts);
10     }
11 
12     //放入數據
13     public void put(T x) throws InterruptedException {
14         space.acquire();//拿空位的許可,沒有空位線程會在這個方法上阻塞
15         synchronized (queue){
16             queue.add(x);
17         }
18         items.release();//有元素了,可以釋放一個拿元素的許可
19     }
20 
21     //取數據
22     public T take() throws InterruptedException {
23         items.acquire();//拿元素的許可,沒有元素線程會在這個方法上阻塞
24         T t;
25         synchronized (queue){
26             t = (T)queue.remove(0);
27         }
28         space.release();//有空位了,可以釋放一個存在空位的許可
29         return t;
30     }
31 }

 

Semaphore還提供一些其他方法,具體如下。

·intavailablePermits():返回此信號量中當前可用的許可證數。

·intgetQueueLength():返回正在等待獲取許可證的線程數。

·booleanhasQueuedThreads():是否有線程正在等待獲取許可證。

·void reducePermits(int reduction):減少reduction個許可證,是個protected方法。

·Collection getQueuedThreads():返

5. Exchanger

Exchanger(交換者)是一個用於線程間協作的工具類。Exchanger用於進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數據。這兩個線程通過exchange方法交換數據,如果第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。

 1 public class ExchangeCase {
 2 
 3     static final Exchanger<List<String>> exgr = new Exchanger<>();
 4 
 5     public static void main(String[] args) {
 6 
 7         new Thread(new Runnable() {
 8 
 9             @Override
10             public void run() {
11                 try {
12                     List<String> list = new ArrayList<>();
13                     list.add(Thread.currentThread().getId()+" insert A1");
14                     list.add(Thread.currentThread().getId()+" insert A2");
15                     list = exgr.exchange(list);//交換數據
16                     for(String item:list){
17                         System.out.println(Thread.currentThread().getId()+":"+item);
18                     }
19                 } catch (InterruptedException e) {
20                     e.printStackTrace();
21                 }
22             }
23         }).start();
24 
25         new Thread(new Runnable() {
26 
27             @Override
28             public void run() {
29                 try {
30                     List<String> list = new ArrayList<>();
31                     list.add(Thread.currentThread().getId()+" insert B1");
32                     list.add(Thread.currentThread().getId()+" insert B2");
33                     list.add(Thread.currentThread().getId()+" insert B3");
34                     System.out.println(Thread.currentThread().getId()+" will sleep");
35                     Thread.sleep(1500);
36                     list = exgr.exchange(list);//交換數據
37                     for(String item:list){
38                         System.out.println(Thread.currentThread().getId()+":"+item);
39                     }
40                 } catch (InterruptedException e) {
41                     e.printStackTrace();
42                 }
43             }
44         }).start();
45 
46     }
47 
48 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM