條件變量 避免驚群 虛假喚醒


 

1. 為什么調用pthread_cond_wait之前需要檢查條件

在線程調用 pthread_cond_signal() 之前,如果沒有線程調用 pthread_cond_wait() 處於阻塞狀態,那么什么都不會發生;

在線程調用 pthread_cond_signal() 之后,線程調用了 pthread_cond_wait() ,那么這個線程將永遠被阻塞,所以要在調用了 pthread_cond_wait() 之前檢查條件

比如下面的例子線程B如果是在線程A執行過pthread_cond_signal()之后調用,那么將一直被阻塞

如果線程A

pthread_mutex_lock(&m);
cond = true;
pthread_cond_signal(&c);
pthread_mutex_unlock(&m);

線程B不判斷條件直接wait

pthread_mutex_lock(&m);
pthread_cond_wait(&c, &m);
/* cond now true */​
pthread_mutex_unlock(&m);​

2. 為什么條件變量要和互斥鎖一塊使用

如果不配合互斥鎖使用,會導致丟失喚醒

線程A

pthread_mutex_lock(&m);
while (cond == FALSE)
    pthread_cond_wait(&c, &m);
pthread_mutex_unlock(&m);​

線程B

condition = TRUE;
pthread_cond_signal(&cond);​

考慮下面的執行序列,在線程A判斷過 cond == FALSE后,線程B修改了cond的值為TRUE並且調用了pthread_cond_signal 然后線程A才調用pthread_cond_wait 那么線程B的signal就被丟失了,線程A會被一直阻塞

 

Thread A                       Thread B
 
pthread_mutex_lock(&m);
while (cond == FALSE) {
 
                               cond = TRUE;
                               pthread_cond_signal(&c);
 
    pthread_cond_wait(&c, &m);​

如果將線程B修改為如下,保證了線程A的cond == FALSE和pthread_cond_wait是原子操作,那么此時線程B因為拿不到鎖,就無法調用pthread_cond_signal

pthread_mutex_lock(&m);
cond = TRUE;
pthread_cond_signal(&c);
pthread_mutex_unlock(&m);​

 

3. 為什么要用while來檢查條件

在wait前必須使用while來等待條件變量而不使用if語句,原因在於要避免spurious wakeups,即虛假喚醒。

1)什么是虛假喚醒:  即使沒有線程broadcast 或者signal條件變量,pthread_cond_wait也可能偶爾返回

2)出現虛假喚醒的兩種case

a. 函數 pthread_cond_wait() 底層實現是用的  futex 系統調用,每一個阻塞的系統調用在進程收到信號后立刻返回一個 EINTR 錯誤,然后 pthread_cond_wait() 不能重啟調用futex等待,因為在收到EINTR錯誤到重新調用futex 期間pthread_cond_signal有可能被調用,那么就錯過了真正的喚醒,所以直接返回,造成了虛假喚醒,並不是pthread_cond_signal 喚醒的

b. 函數 pthread_cond_wait()是被pthread_cond_signal喚醒的,但是發現條件不成立。這是可能因為線程調度,被條件變量喚醒的線程在本線程內真正執行「加鎖並返回」前,另一個線程插了進來,完整地進行了一套「加鎖、改條件、釋放鎖鎖」的操作。

3)為什么while可以避免虛假喚醒

case 1:線程A,出現虛假喚醒后,重新進入while循環判斷cond是FALSE,重新進入wait等待

 

Thread A                        Thread B
 
pthread_mutex_lock(&m);
while (cond == FALSE) {
    //spurious wakeups
    pthread_cond_wait(&c, &m);​
}                                        
                                pthread_mutex_lock(&m);
                                cond = TRUE;
                                pthread_cond_signal(&c);
                                pthread_mutex_unlock(&m);​
pthread_mutex_unlock(&m);​

case2:線程A調用pthread_cond_wait后,線程B拿到了鎖,此時發生了虛假喚醒,線程B設置條件為TRUE,觸發喚醒並釋放鎖,線程Apthread_cond_wait拿到鎖后返回,while循環判斷cond 已經變成TRUE,不會進入pthread_cond_wait,這種情況其實是丟失了真正的喚醒,虛假喚醒起了作用,但是程序卻沒有問題

Thread A                          Thread B
 
pthread_mutex_lock(&m);
while (cond == FALSE) {              
    // unlock鎖,阻塞wait函數
    // 發生spurious wakeups       
    pthread_cond_wait(&c, &m);    pthread_mutex_lock(&m);
                                  cond = TRUE;
                                  pthread_cond_signal(&c);
                                  pthread_mutex_unlock(&m);​
    //函數返回,加鎖等B釋放鎖   
    pthread_cond_wait(&c, &m);​                                 
}
pthread_mutex_unlock(&m);​

注意:pthread_cond_wait函數會先unlock鎖,返回時會lock鎖(不管是否是虛假喚醒還是真的喚醒)

 

typedef union
{
  struct
  {
    int __lock;保護多線程中cond結構本身的變量操作不會並發,例如對於total_seq進而wakup_seq的使用和遞增操作。
    unsigned int __futex;另一個線程和這個線程之間在條件點上同步的方式,也就是如果需要和其它線程同步的話,使用這個互斥鎖替換pthread_cond_wait傳入的互斥鎖進行同步。
    __extension__ unsigned long long int __total_seq;這個表示在這個條件變量上有多少個線程在等待這個信號。
    __extension__ unsigned long long int __wakeup_seq;已經在這個條件變量上執行了多少次喚醒操作。
    __extension__ unsigned long long int __woken_seq;這個條件變量中已經被真正喚醒的線程數目。
    void *__mutex;保存pthread_cond_wait傳入的互斥鎖,需要保證pthread_cond_wait和pthread_cond_signal傳入的值都是相同值。
    unsigned int __nwaiters;表示這個cond結構現在還有多少個線程在使用,當有人在使用的時候,pthread_cond_destroy需要等待所有的操作完成
    unsigned int __broadcast_seq; 廣播動作發生了多少次,也就是執行了多少次broadcast
  } __data;
  char __size[__SIZEOF_PTHREAD_COND_T];
  __extension__ long long int __align;
} pthread_cond_t;

 

pthread_cond_wait的操作

 

 

pthread_cond_wait :1.首先解鎖相當於pthread_mutex_unlock。2.然后建立鎖與條件變量的聯系,3.等待喚醒,4.喚醒后第一件事情是上鎖相當於pthread_mutex_lock

__pthread_cond_wait (cond, mutex)
     pthread_cond_t *cond;
     pthread_mutex_t *mutex;
{
  struct _pthread_cleanup_buffer buffer;
  struct _condvar_cleanup_buffer cbuffer;
  int err;
  int pshared = (cond->__data.__mutex == (void *) ~0l)
    ? LLL_SHARED : LLL_PRIVATE;
  /* Make sure we are along.  */
  lll_lock (cond->__data.__lock, pshared);即將對cond結構的成員進行操作和判斷,所以首先獲得結構本身保護互斥鎖。
  /* Now we can release the mutex.  */
  err = __pthread_mutex_unlock_usercnt (mutex, 0);
釋放用戶傳入的互斥鎖,此時另外一個執行pthread_cond_signal的線程可以通過pthread_mutex_lock執行可能的signal判斷,
但是我們還沒有釋放數據操作互斥鎖,所以另一方執行pthread_cond_signal的時候依然可能會等待。
if (__builtin_expect (err, 0)) { lll_unlock (cond->__data.__lock, pshared); return err; } /* We have one new user of the condvar. */ ++cond->__data.__total_seq;增加系統中所有需要執行的喚醒次數。 ++cond->__data.__futex;增加futex,主要是為了保證用戶態數據一致性。 cond->__data.__nwaiters += 1 << COND_NWAITERS_SHIFT;增加cond結構的使用次數。 /* Remember the mutex we are using here. If there is already a different address store this is a bad user bug. Do not store anything for pshared condvars. */ if (cond->__data.__mutex != (void *) ~0l) cond->__data.__mutex = mutex; /* Prepare structure passed to cancellation handler. */ cbuffer.cond = cond; cbuffer.mutex = mutex; /* Before we block we enable cancellation. Therefore we have to install a cancellation handler. */ __pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer);注冊撤銷點。 /* The current values of the wakeup counter. The "woken" counter must exceed this value. */ unsigned long long int val; unsigned long long int seq; val = seq = cond->__data.__wakeup_seq; /* Remember the broadcast counter. */ cbuffer.bc_seq = cond->__data.__broadcast_seq; do { unsigned int futex_val = cond->__data.__futex; /* Prepare to wait. Release the condvar futex. */ lll_unlock (cond->__data.__lock, pshared);此處真正釋放cond操作互斥鎖,我們已經不再對其中的變量進行操作。 /* Enable asynchronous cancellation. Required by the standard. */ cbuffer.oldtype = __pthread_enable_asynccancel (); /* Wait until woken by signal or broadcast. */ lll_futex_wait (&cond->__data.__futex, futex_val, pshared);
等待在futex變量上,由於我們剛才保存了futex的原始值,所以如果在上面我們釋放了data.lock之后另一個線程修改了這個變量的值,那么這里的lll_futex_wait將會返回失敗,
所以會繼續進行下一輪的while循環,直到連個執行相同,說明我們做的判斷時正確的。
/* Disable asynchronous cancellation. */如果執行到這里,說明我們已經被signal喚醒。 __pthread_disable_asynccancel (cbuffer.oldtype); /* We are going to look at shared data again, so get the lock. */ lll_lock (cond->__data.__lock, pshared);訪問變量,需要獲得互斥鎖。 /* If a broadcast happened, we are done. */ if (cbuffer.bc_seq != cond->__data.__broadcast_seq) goto bc_out; /* Check whether we are eligible for wakeup. */ val = cond->__data.__wakeup_seq; } while (val == seq || cond->__data.__woken_seq == val); 當val!=seq&&cond->data.wokenup!=val的時候可以進行喚醒,也就是另一個放修改了已經執行了喚醒的次數並且已經被喚醒的線程還有名額的時候。 /* Another thread woken up. */ ++cond->__data.__woken_seq;增加系統中已經被喚醒的線程的數目。 bc_out: broadcast跳轉到這里。 cond->__data.__nwaiters -= 1 << COND_NWAITERS_SHIFT; /* If pthread_cond_destroy was called on this varaible already, notify the pthread_cond_destroy caller all waiters have left and it can be successfully destroyed. */ if (cond->__data.__total_seq == -1ULL && cond->__data.__nwaiters < (1 << COND_NWAITERS_SHIFT)) lll_futex_wake (&cond->__data.__nwaiters, 1, pshared); /* We are done with the condvar. */ lll_unlock (cond->__data.__lock, pshared); /* The cancellation handling is back to normal, remove the handler. */ __pthread_cleanup_pop (&buffer, 0); /* Get the mutex before returning. */ return __pthread_mutex_cond_lock (mutex);
//再次獲得mutex互斥鎖,可能會睡眠,因為我們的這個釋放是對上層透明的,而在進入函數的時候我們已經釋放了這個互斥鎖,所以此時還要進行一次獲得操作,從而配對。 }
 

1. 為什么是pthread_cond_wait(cond, mutex)而不是pthread_cond_wait(cond)

我當初學習條件變量時,也有過和樓主相同的疑問,在上操作系統實踐課程時,班上的個別學生也問過這個問題。相信這是一個初學者的共性問題,但很少有書籍仔細解釋這個問題。

為什么pthread_cond_wait的api被設計為

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

而不是被設計為

int pthread_cond_wait(pthread_cond_t *cond);

pthread_cond_wait(cond, mutex)的功能有3個:

  • 調用者線程首先釋放mutex
  • 然后阻塞,等待被別的線程喚醒
  • 當調用者線程被喚醒后,調用者線程會再次獲取mutex

pthread_cond_wait(cond)的功能只有1個:

  • 調用者線程阻塞,等待被別的線程喚醒。

這里首先給一個簡潔的回答:

  • 通常的應用場景下,當前線程執行pthread_cond_wait時,處於臨界區訪問共享資源,存在一個mutex與該臨界區相關聯,這是理解pthread_cond_wait帶有mutex參數的關鍵
  • 當前線程執行pthread_cond_wait前,已經獲得了和臨界區相關聯的mutex;執行pthread_cond_wait會阻塞,但是在進入阻塞狀態前,必須釋放已經獲得的mutex,讓其它線程能夠進入臨界區
  • 當前線程執行pthread_cond_wait后,阻塞等待的條件滿足,條件滿足時會被喚醒;被喚醒后,仍然處於臨界區,因此被喚醒后必須再次獲得和臨界區相關聯的mutex

綜上,調用pthread_cond_wait時,線程總是位於某個臨界區,該臨界區與mutex相關,pthread_cond_wait需要帶有一個參數mutex,用於釋放和再次獲取mutex。

 

while (predicates do not hold) {
    /* 1 */
    pthread_cond_wait(&cond);
}

 

若在1這個點正好條件滿足且signal了一下,那么這個signal就丟了,上面的代碼可能會陷入永久的等待。mutex就是確保1這個位置不可能穿插signal代碼。當然相應地,signal一端也要加鎖,否則仍然無法確保這點,這也是一個常見錯誤,因為pthread_cond_signal並不要求mutex。

一種特殊情況是,當你的條件簡單到只有一條原子指令時,就可以直接使用 futex了。事實上condition主要是簡化多線程用戶代碼的開發,當需要編寫較為底層且性能關鍵的代碼時,你需要深入地了解 atomicMemory barrier


 
pthread_cond_signal的操作
int
__pthread_cond_signal (cond)
     pthread_cond_t *cond;
{
  int pshared = (cond->__data.__mutex == (void *) ~0l)
  ? LLL_SHARED : LLL_PRIVATE;
  /* Make sure we are alone.  */
  lll_lock (cond->__data.__lock, pshared);
  /* Are there any waiters to be woken?  */
  if (cond->__data.__total_seq > cond->__data.__wakeup_seq)如果待喚醒次數比已經喚醒的次數多,那么此時就進行一個喚醒操作。
    {
      /* Yes.  Mark one of them as woken.  */
      ++cond->__data.__wakeup_seq;
      ++cond->__data.__futex;改變futex的值,這個值的具體意義並不重要,只是為了告訴另一方,這個值已經變化,如果另一方使用的是原始值,那么對futex的wait操作將會失敗。
      /* Wake one.  */
      if (! __builtin_expect (lll_futex_wake_unlock (&cond->__data.__futex, 1,
           1, &cond->__data.__lock,
           pshared), 0))
 return 0;
      lll_futex_wake (&cond->__data.__futex, 1, pshared);
    }
  /* We are done.  */
  lll_unlock (cond->__data.__lock, pshared);
  return 0;
}
5、__pthread_cond_broadcast 
int
__pthread_cond_broadcast (cond)
     pthread_cond_t *cond;
{
  int pshared = (cond->__data.__mutex == (void *) ~0l)
  ? LLL_SHARED : LLL_PRIVATE;
  /* Make sure we are alone.  */
  lll_lock (cond->__data.__lock, pshared);
  /* Are there any waiters to be woken?  */
  if (cond->__data.__total_seq > cond->__data.__wakeup_seq)判斷是否有等待喚醒的線程。
    {
      /* Yes.  Mark them all as woken.  */
      cond->__data.__wakeup_seq = cond->__data.__total_seq;
      cond->__data.__woken_seq = cond->__data.__total_seq;
      cond->__data.__futex = (unsigned int) cond->__data.__total_seq * 2;
      int futex_val = cond->__data.__futex;
      /* Signal that a broadcast happened.  */
      ++cond->__data.__broadcast_seq;
      /* We are done.  */
      lll_unlock (cond->__data.__lock, pshared);
      /* Do not use requeue for pshared condvars.  */
      if (cond->__data.__mutex == (void *) ~0l)
 goto wake_all;
      /* Wake everybody.  */
      pthread_mutex_t *mut = (pthread_mutex_t *) cond->__data.__mutex;
      /* XXX: Kernel so far doesn't support requeue to PI futex.  */
      /* XXX: Kernel so far can only requeue to the same type of futex,
  in this case private (we don't requeue for pshared condvars).  */
      if (__builtin_expect (mut->__data.__kind
       & (PTHREAD_MUTEX_PRIO_INHERIT_NP
          | PTHREAD_MUTEX_PSHARED_BIT), 0))
 goto wake_all;
      /* lll_futex_requeue returns 0 for success and non-zero
  for errors.  */
      if (__builtin_expect (lll_futex_requeue (&cond->__data.__futex, 1,
            INT_MAX, &mut->__data.__lock,
            futex_val, LLL_PRIVATE), 0))把futex上的轉移到data.lock中並喚醒,如果失敗則直接喚醒而不轉移。
 {
   /* The requeue functionality is not available.  */
 wake_all:
   lll_futex_wake (&cond->__data.__futex, INT_MAX, pshared);這里的INT_MAX就是告訴內核喚醒所有在這個變量上等待的線程。
 }
      /* That's all.  */
      return 0;
    }
  /* We are done.  */
  lll_unlock (cond->__data.__lock, pshared);
  return 0;
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM