並發王者課-鉑金9:互通有無-Exchanger如何完成線程間的數據交換


歡迎來到《並發王者課》,本文是該系列文章中的第22篇,鉑金中的第9篇

在前面的文章中,我們已經介紹了ReentrantLockCountDownLatchCyclicBarrierSemaphore等同步工具。在本文中,將為你介紹最后一個同步工具,即Exchanger.

Exchanger用於兩個線程在某個節點時進行數據交換。在用法上,Exchanger並不復雜,但是實現上會稍微有點費解。所以,考慮到Exchanger在平時使用的場景並不多,況且多數讀者對一些“枯燥”的源碼的耐受度有限(可能引起不適或煩躁等不良情緒,阻礙學習),本文將側重講它的使用和思想,對於源碼不會過多展開,點到為止。

一、Exchanger的使用場景

在峽谷中,鎧和蘭陵王都是擅長打野的英雄,各自對野怪的偏好也不完全相同。所以,為了能得到自己想要的野怪,他們經常會在峽谷的交易中心交換各自的獵物。

這一天,鎧打到了一只棕熊,而蘭陵王則收獲了一只野狼,並且彼此都想要對方的野怪。於是,他們約定在峽谷交易中心交換雙方的野怪,誰先到了就先等會。這個過程,可以用下面這幅圖來表示:

在鎧和蘭陵王交換獵物的過程中,有三個點需要你留意:

  • 交換的雙方有明確的交易地點(峽谷交易中心);
  • 交換的雙方具有明確的交易對象(比如棕熊和野狼);
  • 誰先到了就等會兒(他們中總會有先來后到)。

如果用代碼來實現的話,也是有多種方式可以選擇,比如前面所學過的同步方法等。不過,雖然做也是可以做的,只是沒那么方便。所以,接下來我們就用Exchanger來實現這一過程。

在下面的代碼中,我們定義了一個exchanger,它就類似於峽谷交易中心,而它的類型Exchanger<WildMonster> 則明確表示交換的對象是野怪

接着,我們再定義兩個線程,分別代表蘭陵王。在其線程的內部,會通過前面定義的exchanger對象來和對方進行交換數據。交換完成后,他們彼此將獲得對方的物品

 public static void main(String[] args) {
     Exchanger<WildMonster> exchanger = new Exchanger<> (); // 定義交換地點和交換類型

     Thread 鎧 = newThread("鎧", () -> {
         try {
             WildMonster wildMonster = new Bear("棕熊");
             say("我手里有一只:" + wildMonster.getName());
             WildMonster exchanged = exchanger.exchange(wildMonster); // 交換后將獲得對方的物品
             say("交易完成,我獲得了:", wildMonster.getName(), "->", exchanged.getName());
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     });

     Thread 蘭陵王 = newThread("蘭陵王", () -> {
         try {
             WildMonster wildMonster = new Wolf("野狼");
             say("我手里有一只:" + wildMonster.getName());
             WildMonster exchanged = exchanger.exchange(wildMonster);
             say("交易完成,我獲得了:", wildMonster.getName(), "->", exchanged.getName());
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     });
     鎧.start();
     蘭陵王.start();
 }

下面是上面代碼用到的內部類:

 @Data
 private static class WildMonster {
     protected String name;
 }

 private static class Wolf extends WildMonster {
     public Wolf(String name) {
         this.name = name;
     }
 }

 private static class Bear extends WildMonster {
     public Bear(String name) {
         this.name = name;
     }
 }

示例代碼運行結果如下:

鎧:我手里有一只:棕熊
蘭陵王:我手里有一只:野狼
蘭陵王:交易完成,我獲得了:野狼->棕熊
鎧:交易完成,我獲得了:棕熊->野狼

Process finished with exit code 0

從結果中可以看到,鎧用棕熊換到了野狼,而蘭陵王則用野狼換到了棕熊,他們完成了交換。

以上就是Exchanger的用法,看起來還是非常簡單的,事實上也確實很簡單。在使用Exchanger的時候要注意下面幾點:

  • 定義Exchanger對象,各線程通過這個對象完成交換
  • 在Exchanger對象中要定義類型,也就是這兩個線程要交換什么
  • 線程在調用Exchanger進行交換時,要特別注意的是,先到的那個線程會原地等待另外一個線程的出現。比如,鎧先到交換地點,可這時候蘭陵王還沒有到,那么鎧會等待蘭陵王的出現,除非超過設置的時間限制,比如蘭陵王中途被妲己蹲了草叢。反之亦然,蘭陵王先到也到等鎧的出現。

二、Exchanger的源碼與實現

雖然理解Exchanger的思想很容易,了解其用法也很簡單,但是若要理清它幾百余行的源碼卻並非易事。其原因在於,槽是Exchanger中的核心概念和屬性,Exchanger中的數據交換分為單槽交換多槽交換,其中單槽交換源碼簡單,但多槽交換卻很復雜。所以,下文對Exchanger源碼的闡述以概括為主,不會對源碼深究。如果你有興趣,可以參考閱讀這篇文章,作者對其源碼的解讀較為詳細。

1. 核心構造

與其他同步工具不同的是,Exchanger有且僅有一個構造函數。在這個構造中,也只初始化了一個對象participant.

public Exchanger() {
    participant = new Participant();
}

從繼承關系看,Participant本質上是一個ThreadLocal,而其中的Node則是線程的本地變量。

static final class Participant extends ThreadLocal<Node> {
    public Node initialValue() { 
        return new Node(); 
    }
}

2. 核心屬性

Exchanger有四個核心變量,如下所示。當然,除此之外,還有一些用以計算的其他變量。不過,為避免引入不必要的復雜度,本文暫不提及。

//ThreadLocal變量,每個線程都有自己的一個副本
private final Participant participant;

//多槽位,高並發下使用,保存待匹配的Node實例
private volatile Node[] arena;

//單槽位,arena未初始化時使用的保存待匹配的Node實例
private volatile Node slot;

//初始值為0,當創建arena后會被賦值成SEQ,用來記錄arena數組的可用最大索引,會隨着並發的增大而增大直到等於最大值FULL,會隨着並行的線程逐一匹配成功而減少恢復成初始值
private volatile int bound;

Node的具體細節,注意其中的item和match.

 @sun.misc.Contended static final class Node {
        int index;              //arena的下標,多個槽位的時候使用
        int bound;              // 上一次記錄的Exchanger.bound
        int collides;           // 記錄的 CAS 失敗數
        int hash;               // 用於自旋
        Object item;            //  這個線程的數據項
        volatile Object match;  // 交換的數據
        volatile Thread parked; //  當阻塞時,設置此線程,不阻塞的話會自旋
    }

3. 核心方法

// 交換數據
// 如果一個線程達到后,會等待其他線程的到達(除非自己被中斷)。然后,該線程會和到達的線程交換數據。
// 如果線程在到達后,已經有其他線程在等待。那么,將會喚起該線程並交換數據。
public V exchange(V x) throws InterruptedException {...}
    
//帶有超時限制的交換
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {...}

所以,從源碼上看上文的示例,那么鎧和蘭陵王交換數據的過程應該是下面這樣的:

小結

以上就是關於Exchanger的全部內容。在學習Exchanger時,要側重理解它所要解決的問題場景,以及它的基本用法。對於其源碼,當前階段可以選擇“不求甚解”,以降維的方式降低學習難度,日后再循序漸進理解。我在寫本文時,也曾多次考慮是否要講清楚源碼,最終還是決定暫緩,畢竟現階段理解它、學會它才是重點。

正文到此結束,恭喜你又上了一顆星✨

夫子的試煉

  • 使用Exchanger實現生產者與消費者。

延伸閱讀與參考資料

關於作者

關注【技術八點半】,及時獲取文章更新。傳遞有品質的技術文章,記錄平凡人的成長故事,偶爾也聊聊生活和理想。早晨8:30推送作者品質原創,晚上20:30推送行業深度好文。

如果本文對你有幫助,歡迎點贊關注監督,我們一起從青銅到王者


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM