java.util.concurrent.locks.LockSupport用法


  在看AQS內部的時候發現很多使用java.util.concurrent.locks.LockSupport類的東西。 比如CountDownLatch.await 阻塞的時候以及使用阻塞隊列進行take、take 方法在線程阻塞的時候也是使用的該類。下面研究其主要的使用方法。

1. 線程狀態簡單理解

一開始學習線程的時候線程的狀態如下:

1、新建狀態NEW

  new了線程但是沒有開始執行,比如: Thread t1 = new Thread();t1就是一個新建狀態的線程。

2、可運行狀態RUNNABLE

  new出來線程,調用start()方法即處於RUNNABLE狀態了。處於RUNNABLE狀態的線程可能正在Java虛擬機中運行,也可能正在等待處理器的資源,因為一個線程必須獲得CPU的資源后,才可以運行其run()方法中的內容,否則排隊等待

3、阻塞BLOCKED

  如果某一線程正在等待監視器鎖,以便進入一個同步的塊/方法,那么這個線程的狀態就是阻塞BLOCKED

4、等待WAITING

  某一線程因為調用不帶超時的Object的wait()方法、不帶超時的Thread的join()方法、LockSupport的park()方法,就會處於等待WAITING狀態

5、超時等待TIMED_WAITING

  某一線程因為調用帶有指定正等待時間的Object的wait()方法、Thread的join()方法、Thread的sleep()方法、LockSupport的parkNanos()方法、LockSupport的parkUntil()方法,就會處於超時等待TIMED_WAITING狀態

6、終止狀態TERMINATED

  線程調用終止或者run()方法執行結束后,線程即處於終止狀態。處於終止狀態的線程不具備繼續運行的能力。

  可以看到當調用park 方法之后進入WAITING 狀態。

2. 主要API

1. 主要API如下:

源碼如下:

/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 */

/*
 *
 *
 *
 *
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent.locks;
import sun.misc.Unsafe;

/**
 * Basic thread blocking primitives for creating locks and other
 * synchronization classes.
 *
 * <p>This class associates, with each thread that uses it, a permit
 * (in the sense of the {@link java.util.concurrent.Semaphore
 * Semaphore} class). A call to {@code park} will return immediately
 * if the permit is available, consuming it in the process; otherwise
 * it <em>may</em> block.  A call to {@code unpark} makes the permit
 * available, if it was not already available. (Unlike with Semaphores
 * though, permits do not accumulate. There is at most one.)
 *
 * <p>Methods {@code park} and {@code unpark} provide efficient
 * means of blocking and unblocking threads that do not encounter the
 * problems that cause the deprecated methods {@code Thread.suspend}
 * and {@code Thread.resume} to be unusable for such purposes: Races
 * between one thread invoking {@code park} and another thread trying
 * to {@code unpark} it will preserve liveness, due to the
 * permit. Additionally, {@code park} will return if the caller's
 * thread was interrupted, and timeout versions are supported. The
 * {@code park} method may also return at any other time, for "no
 * reason", so in general must be invoked within a loop that rechecks
 * conditions upon return. In this sense {@code park} serves as an
 * optimization of a "busy wait" that does not waste as much time
 * spinning, but must be paired with an {@code unpark} to be
 * effective.
 *
 * <p>The three forms of {@code park} each also support a
 * {@code blocker} object parameter. This object is recorded while
 * the thread is blocked to permit monitoring and diagnostic tools to
 * identify the reasons that threads are blocked. (Such tools may
 * access blockers using method {@link #getBlocker(Thread)}.)
 * The use of these forms rather than the original forms without this
 * parameter is strongly encouraged. The normal argument to supply as
 * a {@code blocker} within a lock implementation is {@code this}.
 *
 * <p>These methods are designed to be used as tools for creating
 * higher-level synchronization utilities, and are not in themselves
 * useful for most concurrency control applications.  The {@code park}
 * method is designed for use only in constructions of the form:
 *
 *  <pre> {@code
 * while (!canProceed()) { ... LockSupport.park(this); }}</pre>
 *
 * where neither {@code canProceed} nor any other actions prior to the
 * call to {@code park} entail locking or blocking.  Because only one
 * permit is associated with each thread, any intermediary uses of
 * {@code park} could interfere with its intended effects.
 *
 * <p><b>Sample Usage.</b> Here is a sketch of a first-in-first-out
 * non-reentrant lock class:
 *  <pre> {@code
 * class FIFOMutex {
 *   private final AtomicBoolean locked = new AtomicBoolean(false);
 *   private final Queue<Thread> waiters
 *     = new ConcurrentLinkedQueue<Thread>();
 *
 *   public void lock() {
 *     boolean wasInterrupted = false;
 *     Thread current = Thread.currentThread();
 *     waiters.add(current);
 *
 *     // Block while not first in queue or cannot acquire lock
 *     while (waiters.peek() != current ||
 *            !locked.compareAndSet(false, true)) {
 *       LockSupport.park(this);
 *       if (Thread.interrupted()) // ignore interrupts while waiting
 *         wasInterrupted = true;
 *     }
 *
 *     waiters.remove();
 *     if (wasInterrupted)          // reassert interrupt status on exit
 *       current.interrupt();
 *   }
 *
 *   public void unlock() {
 *     locked.set(false);
 *     LockSupport.unpark(waiters.peek());
 *   }
 * }}</pre>
 */
public class LockSupport {
    private LockSupport() {} // Cannot be instantiated.

    private static void setBlocker(Thread t, Object arg) {
        // Even though volatile, hotspot doesn't need a write barrier here.
        UNSAFE.putObject(t, parkBlockerOffset, arg);
    }

    /**
     * Makes available the permit for the given thread, if it
     * was not already available.  If the thread was blocked on
     * {@code park} then it will unblock.  Otherwise, its next call
     * to {@code park} is guaranteed not to block. This operation
     * is not guaranteed to have any effect at all if the given
     * thread has not been started.
     *
     * @param thread the thread to unpark, or {@code null}, in which case
     *        this operation has no effect
     */
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

    /**
     * Disables the current thread for thread scheduling purposes unless the
     * permit is available.
     *
     * <p>If the permit is available then it is consumed and the call returns
     * immediately; otherwise
     * the current thread becomes disabled for thread scheduling
     * purposes and lies dormant until one of three things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread upon return.
     *
     * @param blocker the synchronization object responsible for this
     *        thread parking
     * @since 1.6
     */
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

    /**
     * Disables the current thread for thread scheduling purposes, for up to
     * the specified waiting time, unless the permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of four
     * things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The specified waiting time elapses; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread, or the elapsed time
     * upon return.
     *
     * @param blocker the synchronization object responsible for this
     *        thread parking
     * @param nanos the maximum number of nanoseconds to wait
     * @since 1.6
     */
    public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, nanos);
            setBlocker(t, null);
        }
    }

    /**
     * Disables the current thread for thread scheduling purposes, until
     * the specified deadline, unless the permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of four
     * things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
     * current thread; or
     *
     * <li>The specified deadline passes; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread, or the current time
     * upon return.
     *
     * @param blocker the synchronization object responsible for this
     *        thread parking
     * @param deadline the absolute time, in milliseconds from the Epoch,
     *        to wait until
     * @since 1.6
     */
    public static void parkUntil(Object blocker, long deadline) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(true, deadline);
        setBlocker(t, null);
    }

    /**
     * Returns the blocker object supplied to the most recent
     * invocation of a park method that has not yet unblocked, or null
     * if not blocked.  The value returned is just a momentary
     * snapshot -- the thread may have since unblocked or blocked on a
     * different blocker object.
     *
     * @param t the thread
     * @return the blocker
     * @throws NullPointerException if argument is null
     * @since 1.6
     */
    public static Object getBlocker(Thread t) {
        if (t == null)
            throw new NullPointerException();
        return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
    }

    /**
     * Disables the current thread for thread scheduling purposes unless the
     * permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of three
     * things happens:
     *
     * <ul>
     *
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread upon return.
     */
    public static void park() {
        UNSAFE.park(false, 0L);
    }

    /**
     * Disables the current thread for thread scheduling purposes, for up to
     * the specified waiting time, unless the permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of four
     * things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The specified waiting time elapses; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread, or the elapsed time
     * upon return.
     *
     * @param nanos the maximum number of nanoseconds to wait
     */
    public static void parkNanos(long nanos) {
        if (nanos > 0)
            UNSAFE.park(false, nanos);
    }

    /**
     * Disables the current thread for thread scheduling purposes, until
     * the specified deadline, unless the permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of four
     * things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The specified deadline passes; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread, or the current time
     * upon return.
     *
     * @param deadline the absolute time, in milliseconds from the Epoch,
     *        to wait until
     */
    public static void parkUntil(long deadline) {
        UNSAFE.park(true, deadline);
    }

    /**
     * Returns the pseudo-randomly initialized or updated secondary seed.
     * Copied from ThreadLocalRandom due to package access restrictions.
     */
    static final int nextSecondarySeed() {
        int r;
        Thread t = Thread.currentThread();
        if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
            r ^= r << 13;   // xorshift
            r ^= r >>> 17;
            r ^= r << 5;
        }
        else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
            r = 1; // avoid zero
        UNSAFE.putInt(t, SECONDARY, r);
        return r;
    }

    // Hotspot implementation via intrinsics API
    private static final sun.misc.Unsafe UNSAFE;
    private static final long parkBlockerOffset;
    private static final long SEED;
    private static final long PROBE;
    private static final long SECONDARY;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            parkBlockerOffset = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            SEED = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSeed"));
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
            SECONDARY = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
        } catch (Exception ex) { throw new Error(ex); }
    }

}
View Code

2. 使用

import java.util.concurrent.locks.LockSupport;

public class PlainTest {

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            System.out.println("111222");
            LockSupport.park();
            System.out.println("222333");
            try {
                Thread.sleep(3*1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("333444");
        });
        thread.start();

        Thread.sleep(1*1000);
        System.out.println("thread.getState(): " + thread.getState() + "\t1");
        LockSupport.unpark(thread);
        Thread.sleep(1*1000);
        System.out.println("thread.getState(): " + thread.getState() + "\t2");
        Thread.sleep(3*1000);
        System.out.println("thread.getState(): " + thread.getState() + "\t3");
    }
}

結果:

111222
thread.getState(): WAITING    1
222333
thread.getState(): TIMED_WAITING    2
333444
thread.getState(): TERMINATED    3

2. park 方法

  park用於掛起當前線程。 當前線程進入等待或者超時等待狀態。其恢復的條件是調用unpark、其它線程中斷了線程、帶參數的park 時間到達指定時間。

1. park 可以設置一個blocker 參數, 也可以不設置。設置之后可以獲取到當前線程阻塞的信息。

設置的時候會通過unsafe(parkBlockerOffset 偏移量獲取到thread 對象的parkBlocker 的偏移量), 然后設置到java.lang.Thread#parkBlocker。

java.lang.Thread#parkBlocker:

    /**
     * The argument supplied to the current call to
     * java.util.concurrent.locks.LockSupport.park.
     * Set by (private) java.util.concurrent.locks.LockSupport.setBlocker
     * Accessed using java.util.concurrent.locks.LockSupport.getBlocker
     */
    volatile Object parkBlocker;

(1) 不設置:

import java.util.concurrent.locks.LockSupport;

public class PlainTest {

    public static void main(String[] args) {
        LockSupport.park();
    }
}

jstack查看線程信息:

"main" #1 prio=5 os_prio=0 tid=0x00000224f146d000 nid=0x3524 waiting on condition [0x00000081e89fe000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
        at PlainTest.main(PlainTest.java:6)

(2) 設置blocker

代碼:

import java.util.concurrent.locks.LockSupport;

public class PlainTest {

    public static void main(String[] args) {
        LockSupport.park(new Object());
    }
}

結果:

"main" #1 prio=5 os_prio=0 tid=0x000002402c6ad800 nid=0x4a14 waiting on condition [0x000000d0ed6ff000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000076caeab80> (a java.lang.Object)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at PlainTest.main(PlainTest.java:6)

(3) 設置超時時間

import java.util.concurrent.locks.LockSupport;

public class PlainTest {

    public static void main(String[] args) {
        LockSupport.parkNanos(Long.MAX_VALUE);
    }
}

結果:

"main" #1 prio=5 os_prio=0 tid=0x000001e39c82c800 nid=0x3e70 waiting on condition [0x000000215d5ff000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:338)
        at PlainTest.main(PlainTest.java:6)

3. unpark 需要設置一個線程進行解除阻塞

  用於解除線程的阻塞。注意也可以先unpark, 在park。 只不過先unpark、后park, 調用park的時候相當於線程不會進行阻塞。多次unpark 和 一次unpark 的效果一樣, 只能對一次park 生效。

  下面研究其原理。

3. 原理

1. park 原理

park 調用的最終是: sun.misc.Unsafe#park, 是一個native 方法

    public native void park(boolean var1, long var2);

  其參數有兩個, 第一個是是否是相對時間(isAbsolute), 第二個參數是時間。

  對於第一個參數,LockSupport 使用的時候只有Ijava.util.concurrent.locks.LockSupport#parkUntil(java.lang.Object, long)傳遞的是true(代表是相對時間), 其他是false。

接下來查看其調用到C++相關方法。

1. \openjdk\hotspot\src\share\vm\prims\unsafe.cpp 內部的方法:

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
  UnsafeWrapper("Unsafe_Park");
  EventThreadPark event;
#ifndef USDT2
  HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
   HOTSPOT_THREAD_PARK_BEGIN(
                             (uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
  JavaThreadParkedState jtps(thread, time != 0);
  thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
  HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
  HOTSPOT_THREAD_PARK_END(
                          (uintptr_t) thread->parker());
#endif /* USDT2 */
  if (event.should_commit()) {
    oop obj = thread->current_park_blocker();
    event.set_klass((obj != NULL) ? obj->klass() : NULL);
    event.set_timeout(time);
    event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0);
    event.commit();
  }
UNSAFE_END

 核心是在thread->parker()->park(isAbsolute != 0, time); 這一行代碼,調用線程內部的parker() 獲取到parker 之后繼續調用 park 方法。(每個線程對象都有一個parker對象)

parker 對象:openjdk\hotspot\src\share\vm\runtime\park.hpp

class Parker : public os::PlatformParker {
private:
  volatile int _counter ;
  Parker * FreeNext ;
  JavaThread * AssociatedWith ; // Current association

public:
  Parker() : PlatformParker() {
    _counter       = 0 ;
    FreeNext       = NULL ;
    AssociatedWith = NULL ;
  }
protected:
  ~Parker() { ShouldNotReachHere(); }
public:
  // For simplicity of interface with Java, all forms of park (indefinite,
  // relative, and absolute) are multiplexed into one call.
  void park(bool isAbsolute, jlong time);
  void unpark();

  // Lifecycle operators
  static Parker * Allocate (JavaThread * t) ;
  static void Release (Parker * e) ;
private:
  static Parker * volatile FreeList ;
  static volatile int ListLock ;

};

  _counter 屬性是起重要作用的屬性。

  其父類有互斥變量等屬性:\openjdk\hotspot\src\os\linux\vm\os_linux.hpp

class PlatformParker : public CHeapObj<mtInternal> {
  protected:
    enum {
        REL_INDEX = 0,
        ABS_INDEX = 1
    };
    int _cur_index;  // which cond is in use: -1, 0, 1
    pthread_mutex_t _mutex [1] ;
    pthread_cond_t  _cond  [2] ; // one for relative times and one for abs.

  public:       // TODO-FIXME: make dtor private
    ~PlatformParker() { guarantee (0, "invariant") ; }

  public:
    PlatformParker() {
      int status;
      status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
      assert_status(status == 0, status, "cond_init rel");
      status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
      assert_status(status == 0, status, "cond_init abs");
      status = pthread_mutex_init (_mutex, NULL);
      assert_status(status == 0, status, "mutex_init");
      _cur_index = -1; // mark as unused
    }
};

#endif // OS_LINUX_VM_OS_LINUX_HPP

2. park 方法根據操作系統不同交給對應的實現:

比如: openjdk\hotspot\src\os\solaris\vm\os_solaris.cpp

void Parker::park(bool isAbsolute, jlong time) {
  // Ideally we'd do something useful while spinning, such
  // as calling unpackTime().

  // Optional fast-path check:
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
    // 如果_counter 屬性大於0, 代表有許可,直接返回
  if (Atomic::xchg(0, &_counter) > 0) return;

  // Optional fast-exit: Check interrupt before trying to wait
  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;
  if (Thread::is_interrupted(thread, false)) {
    return;
  }

  // First, demultiplex/decode time arguments
  timespec absTime;
   // 如果time 參數小於0, 或者是絕對時間且時間等於0, 直接返回 
 if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
    return;
  }
  if (time > 0) {
    // Warning: this code might be exposed to the old Solaris time
    // round-down bugs.  Grep "roundingFix" for details.
  // 將時間換算后保存起來
    unpackTime(&absTime, isAbsolute, time);
  }

  // Enter safepoint region
  // Beware of deadlocks such as 6317397.
  // The per-thread Parker:: _mutex is a classic leaf-lock.
  // In particular a thread must never block on the Threads_lock while
  // holding the Parker:: mutex.  If safepoints are pending both the
  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
  ThreadBlockInVM tbivm(jt);

  // Don't wait if cannot get lock since interference arises from
  // unblocking.  Also. check interrupt before trying wait
  // 如果線程被中斷,或者是在嘗試給互斥變量加鎖的過程中,加鎖失敗,比如被其它線程鎖住了,直接返回
  if (Thread::is_interrupted(thread, false) ||
      os::Solaris::mutex_trylock(_mutex) != 0) {
    return;
  }

  int status ;

  // 走到這里代表有_counter 大於0, 則將其重置為0。
  if (_counter > 0)  { // no wait needed
    _counter = 0;
  // 對互斥變量解鎖
    status = os::Solaris::mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other and Java-level accesses.
    OrderAccess::fence();
    return;
  }

#ifdef ASSERT
  // Don't catch signals while blocked; let the running threads have the signals.
  // (This allows a debugger to break into the running thread.)
  sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Solaris::allowdebug_blocked_signals();
  thr_sigsetmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif

  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  jt->set_suspend_equivalent();
  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

  // Do this the hard way by blocking ...
  // See http://monaco.sfbay/detail.jsf?cr=5094058.
  // TODO-FIXME: for Solaris SPARC set fprs.FEF=0 prior to parking.
  // Only for SPARC >= V8PlusA
#if defined(__sparc) && defined(COMPILER2)
  if (ClearFPUAtPark) { _mark_fpu_nosave() ; }
#endif

  if (time == 0) {
    status = os::Solaris::cond_wait (_cond, _mutex) ;
  } else {
    status = os::Solaris::cond_timedwait (_cond, _mutex, &absTime);
  }
  // Note that an untimed cond_wait() can sometimes return ETIME on older
  // versions of the Solaris.
  assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");

#ifdef ASSERT
  thr_sigsetmask(SIG_SETMASK, &oldsigs, NULL);
#endif
  _counter = 0 ;
  status = os::Solaris::mutex_unlock(_mutex);
  assert_status(status == 0, status, "mutex_unlock") ;
  // Paranoia to ensure our locked and lock-free paths interact
  // correctly with each other and Java-level accesses.
  OrderAccess::fence();

  // If externally suspended while waiting, re-suspend
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
}

(1) ThreadBlockInVM tbivm(jt); 是修改線程狀態為阻塞。 相當於創建一個ThreadBlockInVM  對象, 變量名為tbivm, 參數為jt

\openjdk\hotspot\src\share\vm\runtime\interfaceSupport.hpp

class ThreadBlockInVM : public ThreadStateTransition {
 public:
  ThreadBlockInVM(JavaThread *thread)
  : ThreadStateTransition(thread) {
    // Once we are blocked vm expects stack to be walkable
    thread->frame_anchor()->make_walkable(thread);
    trans_and_fence(_thread_in_vm, _thread_blocked);
  }
  ~ThreadBlockInVM() {
    trans_and_fence(_thread_blocked, _thread_in_vm);
    // We don't need to clear_walkable because it will happen automagically when we return to java
  }
};

 然后調用到: D:\study\sourcecode\openjdk\openjdk\hotspot\src\share\vm\runtime\interfaceSupport.hpp 中的 transition_and_fence

  // transition_and_fence must be used on any thread state transition
  // where there might not be a Java call stub on the stack, in
  // particular on Windows where the Structured Exception Handler is
  // set up in the call stub. os::write_memory_serialize_page() can
  // fault and we can't recover from it on Windows without a SEH in
  // place.
  static inline void transition_and_fence(JavaThread *thread, JavaThreadState from, JavaThreadState to) {
    assert(thread->thread_state() == from, "coming from wrong thread state");
    assert((from & 1) == 0 && (to & 1) == 0, "odd numbers are transitions states");
    // Change to transition state (assumes total store ordering!  -Urs)
    thread->set_thread_state((JavaThreadState)(from + 1));

    // Make sure new state is seen by VM thread
    if (os::is_MP()) {
      if (UseMembar) {
        // Force a fence between the write above and read below
        OrderAccess::fence();
      } else {
        // Must use this rather than serialization page in particular on Windows
        InterfaceSupport::serialize_memory(thread);
      }
    }

    if (SafepointSynchronize::do_call_back()) {
      SafepointSynchronize::block(thread);
    }
    thread->set_thread_state(to);

    CHECK_UNHANDLED_OOPS_ONLY(thread->clear_unhandled_oops();)
  }

定義的線程狀態: openjdk\hotspot\src\share\vm\utilities\globalDefinitions.hpp

enum JavaThreadState {
  _thread_uninitialized     =  0, // should never happen (missing initialization)
  _thread_new               =  2, // just starting up, i.e., in process of being initialized
  _thread_new_trans         =  3, // corresponding transition state (not used, included for completness)
  _thread_in_native         =  4, // running in native code
  _thread_in_native_trans   =  5, // corresponding transition state
  _thread_in_vm             =  6, // running in VM
  _thread_in_vm_trans       =  7, // corresponding transition state
  _thread_in_Java           =  8, // running in Java or in stub code
  _thread_in_Java_trans     =  9, // corresponding transition state (not used, included for completness)
  _thread_blocked           = 10, // blocked in vm
  _thread_blocked_trans     = 11, // corresponding transition state
  _thread_max_state         = 12  // maximum thread state+1 - used for statistics allocation
};

2. unpark 原理

\openjdk\hotspot\src\os\solaris\vm\os_solaris.cpp 中 unpark 方法如下:

void Parker::unpark() {
  int s, status ;
  // 獲取互斥鎖
  status = os::Solaris::mutex_lock (_mutex) ;
  assert (status == 0, "invariant") ;
  // s記錄原來的_counter 的值
  s = _counter;
  // _counter 設置為0
  _counter = 1;
  // 釋放互斥鎖
  status = os::Solaris::mutex_unlock (_mutex) ;
  assert (status == 0, "invariant") ;

  // 如果原來的_counter為0, 證明有線程調用park 在等待信號。 則調用下面方法通知線程解除阻塞。 則原來park 等待的線程會繼續后面的代碼
  if (s < 1) {
    status = os::Solaris::cond_signal (_cond) ;
    assert (status == 0, "invariant") ;
  }
}

  unpark本身就是將_counter 設置為1,並通知條件阻塞的線程已經可以結束等待了. 如果多次連續調用unpark 方法,則s < 1 不成立, 也就不會走下面的方法。

cond_signal 調用 \openjdk\hotspot\src\os\solaris\vm\os_solaris.hpp 中 下面代碼:

  static int cond_signal(cond_t *cv)            { return _cond_signal(cv); }

 

總結: 每個線程都有一個parker 對象,內部包含_counter 可以視作許可證。 每次park 的時候相當於等待該許可證(等待該變量改為1), 調用unpark 相當於將許可證變量改為1。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM