Effective Java 第三版——81. 優先使用並發實用程序替代wait和notify


Tips
書中的源代碼地址:https://github.com/jbloch/effective-java-3e-source-code
注意,書中的有些代碼里方法是基於Java 9 API中的,所以JDK 最好下載 JDK 9以上的版本。

Effective Java, Third Edition

81. 優先使用並發實用程序替代wait和notify

本書的第一版專門用一個條目來介紹正確使用wait和notify方法[Bloch01,Item 50]。 它的建議仍然有效,並在本條目末尾進行了總結,但這個建議遠不如以前那么重要了。 這是因為沒有太多理由再使用wait和notify了。 從Java 5開始,該平台提供了更高級別的並發實用程序,可以執行以前必須在wait和notify時手動編寫代碼的各種操作。 鑒於正確使用wait和notify的困難,應該使用更高級別的並發實用程序

java.util.concurrent包中的高級實用程序分為三類:Executor Framework,在條目 80中簡要介紹了它;並發集合(concurrent collections) 和同步器(synchronizers)。 本條目簡要介紹后兩者。

並發集合是標准集合接口(如List,Queue和Map)的高性能並發實現。 為了提供高並發性,這些實現在內部管理自己的同步(條目 79)。 因此,不可能從並發集合中排除並發活動; 鎖定它只會使程序變慢

因為不能排除並發集合上的並發活動,所以也不能以原子方式組合對它們的方法調用。 因此,並發集合接口配備了依賴於狀態的修改操作,這些操作將幾個基本操作組合成單個原子操作。 事實證明,這些操作對並發集合非常有用,它們使用默認方法(條目 21)添加到Java 8中相應的集合接口中。

例如,Map的putIfAbsent(key, value)方法插入鍵的映射(如果不存在)並返回與鍵關聯的之前的值,如果沒有則返回null。
這樣可以輕松實現線程安全的規范化Map。 此方法模擬String.intern`方法的行為:

// Concurrent canonicalizing map atop ConcurrentMap - not optimal
private static final ConcurrentMap<String, String> map =
        new ConcurrentHashMap<>();

public static String intern(String s) {
    String previousValue = map.putIfAbsent(s, s);
    return previousValue == null ? s : previousValue;
}

事實上,你可以做得更好。ConcurrentHashMap針對get等檢索操作進行了優化。因此,只有在get表明有必要時,才首先調用get並調用putIfAbsent方法:

// Concurrent canonicalizing map atop ConcurrentMap - faster!
public static String intern(String s) {
    String result = map.get(s);
    if (result == null) {
        result = map.putIfAbsent(s, s);
        if (result == null)
            result = s;
    }
    return result;
}

除了提供出色的並發性外,ConcurrentHashMap非常快。 在我的機器上,上面的intern方法比String.intern快6倍(但請記住,String.intern必須采用一些策略來防止在長期運行的應用程序中泄漏內存)。 並發集合使基於同步的集合在很大程度上已經過時了。 例如,使用ConcurrentHashMap優先於Collections.synchronizedMap。 簡單地用並發Map替換同步Map以顯着提高並發應用程序的性能。

一些集合接口使用阻塞操作進行擴展,這些操作等待(或阻塞)直到可以成功執行。 例如,BlockingQueue擴展了Queue並添加了幾個方法,包括take,它從隊列中刪除並返回head元素,等待隊列為空。 這允許阻塞隊列用於工作隊列(也稱為生產者——消費者隊列),一個或多個生產者線程將工作項入隊,並且一個或多個消費者線程從哪個隊列變為可用時出隊並處理項目。 正如所期望的那樣,大多數ExecutorService實現(包括ThreadPoolExecutor)都使用BlockingQueue(條目 80)。

同步器是使線程能夠彼此等待的對象,允許它們協調各自的活動。 最常用的同步器是CountDownLatch和Semaphore。 不太常用的是CyclicBarrier和Exchanger。 最強大的同步器是Phaser。

倒計時鎖存器(CountDownLatch)是一次性使用的屏障,允許一個或多個線程等待一個或多個其他線程執行某些操作。 CountDownLatch的唯一構造方法接受一個int類型的參數,它是在允許所有等待的線程繼續之前,必須在latch上調用countDown方法的次數。

在這個簡單的原語上構建有用的東西非常容易。例如,假設想要構建一個簡單的框架來為一個操作的並發執行計時。這個框架由一個方法組成,該方法使用一個執行器executor來執行操作,一個表示要並發執行的操作數量並發級別,以及一個表示該操作的runnable組成。所有工作線程都准備在計時器線程啟動時鍾之前運行操作。當最后一個工作線程准備好運行該操作時,計時器線程“發號施令(fires the starting gun)”,允許工作線程執行該操作。一旦最后一個工作線程完成該操作,計時器線程就停止計時。在wait和notify的基礎上直接實現這種邏輯至少會有點麻煩,但是在CountDownLatch的基礎上實現起來卻非常簡單:

// Simple framework for timing concurrent execution
public static long time(Executor executor, int concurrency,
            Runnable action) throws InterruptedException {
    CountDownLatch ready = new CountDownLatch(concurrency);
    CountDownLatch start = new CountDownLatch(1);
    CountDownLatch done  = new CountDownLatch(concurrency);

    for (int i = 0; i < concurrency; i++) {
        executor.execute(() -> {
            ready.countDown(); // Tell timer we're ready
            try {
                start.await(); // Wait till peers are ready
                action.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();  // Tell timer we're done
            }
        });
    }
    ready.await();     // Wait for all workers to be ready
    long startNanos = System.nanoTime();
    start.countDown(); // And they're off!
    done.await();      // Wait for all workers to finish
    return System.nanoTime() - startNanos;
}

請注意,該方法使用三個倒計時鎖存器。 第一個ready,由工作線程來告訴計時器線程何時准備就緒。 工作線程然后等待第二個鎖存器,即start。 當最后一個工作線程調用ready.countDown時,計時器線程記錄開始時間並調用start.countDown,允許所有工作線程繼續。 然后,計時器線程等待第三個鎖存器完成,直到最后一個工作線程完成運行並調用done.countDown。 一旦發生這種情況,計時器線程就會喚醒並記錄結束時間。

還有一些細節值得注意。傳遞給time方法的executor必須允許創建至少與給定並發級別相同數量的線程,否則測試將永遠不會結束。這被稱為線程飢餓死鎖(thread starvation deadlock)[Goetz06, 8.1.1]。如果工作線程捕捉到InterruptedException異常,它使用習慣用法thread.currentthread ().interrupt()重新斷言中斷,並從它的run方法返回。這允許執行程序按照它認為合適的方式處理中斷。System.nanoTime用於計算活動的時間。**對於間隔計時,請始終使用System.nanoTime而不是System.currentTimeMillisSystem.nanoTime更准確,更精確,不受系統實時時鍾調整的影響。最后,請注意,本例中的代碼不會產生准確的計時,除非action做了相當多的工作,比如一秒鍾或更長時間。准確的微基准測試是非常困難的,最好是借助諸如jmh [JMH]這樣的專業框架來完成。

這個條目只涉及使用並發實用程序做一些皮毛的事情。 例如,前一個示例中的三個倒計時鎖存器可以由單個CyclicBarrier或Phaser實例替換。 結果代碼會更簡潔,但可能更難理解。

雖然應該始終優先使用並發實用程序來等替換wait和notify方法,但你可能必須維護使用wait和notify的舊代碼。 wait方法用於使線程等待某些條件。 必須在同步區域內調用它,該區域鎖定調用它的對象。 下面是使用wait方法的標准習慣用法:

// The standard idiom for using the wait method
synchronized (obj) {
    while (<condition does not hold>)
        obj.wait(); // (Releases lock, and reacquires on wakeup)
    ... // Perform action appropriate to condition
}

始終要在循環中調用wait方法;永遠不要在循環之外調用它。循環用於測試wait前后的條件。

如果條件已經存在,則在wait之前測試條件並跳過等待以確保活性(liveness)。 如果條件已經存在並且在線程等待之前已經調用了notify(或notifyAll)方法,則無法保證線程將從等待中喚醒。

為了確保安全,需要在等待之后再測試條件,如果條件不成立,則再次等待。如果線程在條件不成立的情況下繼續執行該操作,它可能會破壞由鎖保護的不變式(invariant)。當條件不成立時,以下幾個原因可以把線程喚醒:

  • 另一個線程可以獲得鎖並在線程調用notify和等待線程醒來之間改變了保護狀態。
  • 當條件不成立時,另一個線程可能意外地或惡意地調用notify方法。類通過等待公共可訪問的對象來暴露自己。公共可訪問對象的同步方法中的任何wait方法都容易受到這個問題的影響。
  • 通知線程在喚醒等待線程時可能過於“慷慨”。例如,即使只有一些等待線程的滿足條件,通知線程也可能調用notifyAll。
  • 在沒有通知的情況下,等待的線程可能(很少)被喚醒。這被稱為虛假的喚醒(spurious wakeup)[POSIX, 11.4.3.6.1;Java9-api]。

一個相關的問題是,為了喚醒等待的線程,是使用notify還是notifyAll。(回想一下notify喚醒一個等待線程,假設存在這樣一個線程,notifyAll喚醒所有等待線程)。有時人們會說,應該始終使用notifyAll。這是合理的、保守的建議。它總是會產生正確的結果,因為它保證喚醒所有需要被喚醒的線程。可能還會喚醒其他一些線程,但這不會影響程序的正確性。這些線程將檢查它們正在等待的條件,如果發現為false,將繼續等待。

作為一種優化,如果所有線程都在等待相同的條件,並且每次只有一個線程可以從條件變為true中喚醒,那么可以選擇調用notify而不是notifyAll。

即使滿足了這些先決條件,也可能有理由使用notifyAll來代替notify。正如將wait方法調用放在循環中可以防止公共訪問對象上的意外或惡意通知一樣,使用notifyAll代替notify可以防止不相關線程的意外或惡意等待。否則,這樣的等待可能會“吞下”一個關鍵通知,讓預期的接收者無限期地等待。

總之,與java.util.concurrent提供的高級語言相比,直接使用wait和notify就像在“並發匯編語言”中編程一樣。在新代碼中基本上不存在使用wait和notify的理由。 如果正在維護使用wait和notify的代碼,請確保它始終使用標准慣用法在while循環內調用wait方法。 通常應優先使用notifyAll方法進行通知。 如果使用notify,必須非常小心以確保程序的活性。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM