一、解決問題和適用范圍
主要是用來等待一個條件,這個條件可能需要另一個線程來滿足這個條件。這個和我們平常適用的pthread_mutex_lock的最大不同在於后者保護的一般是一個代碼段(也就是關鍵區),或者一個變量,但是由於一般來說這個變量的訪問是在一個關鍵區中,所以可以認為是一個關鍵區。
但是對於條件變量,是需要的是一個事件,只有事件滿足的時候才會執行后面的操作,此時就出現一個問題:如果不滿足我們應該怎么辦?如果如果使用簡單信號量,可能另一方觸發了這個條件,然后通過unlock來喚醒一個線程,但是此時經過多次喚醒其實這邊根本沒有等待,那么信號就可能丟失。如果這邊一直的空等,那么對於CPU的利用率又非常大,所以就引發了條件等待的概念。
二、網絡上的例子代碼
一下代碼來自http://download.oracle.com/docs/cd/E19455-01/806-5257/6je9h032r/index.html
void producer(buffer_t *b, char item) { pthread_mutex_lock(&b->mutex); while (b->occupied >= BSIZE) pthread_cond_wait(&b->less, &b->mutex); assert(b->occupied < BSIZE); b->buf[b->nextin++] = item; b->nextin %= BSIZE; b->occupied++; /* now: either b->occupied < BSIZE and b->nextin is the index of the next empty slot in the buffer, or b->occupied == BSIZE and b->nextin is the index of the next (occupied) slot that will be emptied by a consumer (such as b->nextin == b->nextout) */ pthread_cond_signal(&b->more); pthread_mutex_unlock(&b->mutex); }
char consumer(buffer_t *b) { char item; pthread_mutex_lock(&b->mutex); while(b->occupied <= 0) pthread_cond_wait(&b->more, &b->mutex); assert(b->occupied > 0); item = b->buf[b->nextout++]; b->nextout %= BSIZE; b->occupied--; /* now: either b->occupied > 0 and b->nextout is the index of the next occupied slot in the buffer, or b->occupied == 0 and b->nextout is the index of the next (empty) slot that will be filled by a producer (such as b->nextout == b->nextin) */ pthread_cond_signal(&b->less); pthread_mutex_unlock(&b->mutex); return(item); }
三、Glibc的實現
1、數據結構
/* Data structure for conditional variable handling. The structure of
the attribute type is not exposed on purpose. */
typedef union
{
struct
{
int __lock;保護多線程中cond結構本身的變量操作不會並發,例如對於total_seq進而wakup_seq的使用和遞增操作。
unsigned int __futex;另一個線程和這個線程之間在條件點上同步的方式,也就是如果需要和其它線程同步的話,使用這個互斥鎖替換pthread_cond_wait傳入的互斥鎖進行同步。
__extension__ unsigned long long int __total_seq;這個表示在這個條件變量上有多少個線程在等待這個信號。
__extension__ unsigned long long int __wakeup_seq;已經在這個條件變量上執行了多少次喚醒操作。
__extension__ unsigned long long int __woken_seq;這個條件變量中已經被真正喚醒的線程數目。
void *__mutex;保存pthread_cond_wait傳入的互斥鎖,需要保證pthread_cond_wait和pthread_cond_signal傳入的值都是相同值。
unsigned int __nwaiters;表示這個cond結構現在還有多少個線程在使用,當有人在使用的時候,pthread_cond_destroy需要等待所有的操作完成
unsigned int __broadcast_seq; 廣播動作發生了多少次,也就是執行了多少次broadcast
} __data;
char __size[__SIZEOF_PTHREAD_COND_T];
__extension__ long long int __align;
} pthread_cond_t;
2、lll_futex_wait的意義
lll_futex_wait (&cond->__data.__futex, futex_val, pshared);
lll_futex_wake (&cond->__data.__nwaiters, 1, pshared);
對於第一個wait,需要傳入一個我們用戶態判斷時使用的futex值,也就是這里的第二個參數futex_val,這樣內核會判斷進行真正的wait掛起的時候這個地址的是不是還是這個值,如果不是這個wait失敗。但是進行wakup的時候不需要傳入判斷值,可能是假設此時已經獲得互斥鎖,所以不會有其它線程來競爭了吧。
這個要和pthread_mutex_lock使用的0、1、2三值區分開來,因為這些都是C庫規定的語義,內核對他們沒有任何特殊要求和語義判斷,所以用戶態可以隨意的改變這個變量的值。
3、pthread_cond_wait的操作
int
__pthread_cond_wait (cond, mutex)
pthread_cond_t *cond;
pthread_mutex_t *mutex;
{
struct _pthread_cleanup_buffer buffer;
struct _condvar_cleanup_buffer cbuffer;
int err;
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;
/* Make sure we are along. */
lll_lock (cond->__data.__lock, pshared);即將對cond結構的成員進行操作和判斷,所以首先獲得結構本身保護互斥鎖。
/* Now we can release the mutex. */
err = __pthread_mutex_unlock_usercnt (mutex, 0);釋放用戶傳入的互斥鎖,此時另外一個執行pthread_cond_signal的線程可以通過pthread_mutex_lock執行可能的signal判斷,但是我們還沒有釋放數據操作互斥鎖,所以另一方執行pthread_cond_signal的時候依然可能會等待。
if (__builtin_expect (err, 0))
{
lll_unlock (cond->__data.__lock, pshared);
return err;
}
/* We have one new user of the condvar. */
++cond->__data.__total_seq;增加系統中所有需要執行的喚醒次數。
++cond->__data.__futex;增加futex,主要是為了保證用戶態數據一致性。
cond->__data.__nwaiters += 1 << COND_NWAITERS_SHIFT;增加cond結構的使用次數。
/* Remember the mutex we are using here. If there is already a
different address store this is a bad user bug. Do not store
anything for pshared condvars. */
if (cond->__data.__mutex != (void *) ~0l)
cond->__data.__mutex = mutex;
/* Prepare structure passed to cancellation handler. */
cbuffer.cond = cond;
cbuffer.mutex = mutex;
/* Before we block we enable cancellation. Therefore we have to
install a cancellation handler. */
__pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer);注冊撤銷點。
/* The current values of the wakeup counter. The "woken" counter
must exceed this value. */
unsigned long long int val;
unsigned long long int seq;
val = seq = cond->__data.__wakeup_seq;
/* Remember the broadcast counter. */
cbuffer.bc_seq = cond->__data.__broadcast_seq;
do
{
unsigned int futex_val = cond->__data.__futex;
/* Prepare to wait. Release the condvar futex. */
lll_unlock (cond->__data.__lock, pshared);此處真正釋放cond操作互斥鎖,我們已經不再對其中的變量進行操作。
/* Enable asynchronous cancellation. Required by the standard. */
cbuffer.oldtype = __pthread_enable_asynccancel ();
/* Wait until woken by signal or broadcast. */
lll_futex_wait (&cond->__data.__futex, futex_val, pshared);等待在futex變量上,由於我們剛才保存了futex的原始值,所以如果在上面我們釋放了data.lock之后另一個線程修改了這個變量的值,那么這里的lll_futex_wait將會返回失敗,所以會繼續進行下一輪的while循環,直到連個執行相同,說明我們做的判斷時正確的。
/* Disable asynchronous cancellation. */如果執行到這里,說明我們已經被signal喚醒。
__pthread_disable_asynccancel (cbuffer.oldtype);
/* We are going to look at shared data again, so get the lock. */
lll_lock (cond->__data.__lock, pshared);訪問變量,需要獲得互斥鎖。
/* If a broadcast happened, we are done. */
if (cbuffer.bc_seq != cond->__data.__broadcast_seq)
goto bc_out;
/* Check whether we are eligible for wakeup. */
val = cond->__data.__wakeup_seq;
}
while (val == seq || cond->__data.__woken_seq == val); 當val!=seq&&cond->data.wokenup!=val的時候可以進行喚醒,也就是另一個放修改了已經執行了喚醒的次數並且已經被喚醒的線程還有名額的時候。
/* Another thread woken up. */
++cond->__data.__woken_seq;增加系統中已經被喚醒的線程的數目。
bc_out: broadcast跳轉到這里。
cond->__data.__nwaiters -= 1 << COND_NWAITERS_SHIFT;
/* If pthread_cond_destroy was called on this varaible already,
notify the pthread_cond_destroy caller all waiters have left
and it can be successfully destroyed. */
if (cond->__data.__total_seq == -1ULL
&& cond->__data.__nwaiters < (1 << COND_NWAITERS_SHIFT))
lll_futex_wake (&cond->__data.__nwaiters, 1, pshared);
/* We are done with the condvar. */
lll_unlock (cond->__data.__lock, pshared);
/* The cancellation handling is back to normal, remove the handler. */
__pthread_cleanup_pop (&buffer, 0);
/* Get the mutex before returning. */
return __pthread_mutex_cond_lock (mutex);再次獲得mutex互斥鎖,可能會睡眠,因為我們的這個釋放是對上層透明的,而在進入函數的時候我們已經釋放了這個互斥鎖,所以此時還要進行一次獲得操作,從而配對。
}
4、pthread_cond_signal的操作
int
__pthread_cond_signal (cond)
pthread_cond_t *cond;
{
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;
/* Make sure we are alone. */
lll_lock (cond->__data.__lock, pshared);
/* Are there any waiters to be woken? */
if (cond->__data.__total_seq > cond->__data.__wakeup_seq)如果待喚醒次數比已經喚醒的次數多,那么此時就進行一個喚醒操作。
{
/* Yes. Mark one of them as woken. */
++cond->__data.__wakeup_seq;
++cond->__data.__futex;改變futex的值,這個值的具體意義並不重要,只是為了告訴另一方,這個值已經變化,如果另一方使用的是原始值,那么對futex的wait操作將會失敗。
/* Wake one. */
if (! __builtin_expect (lll_futex_wake_unlock (&cond->__data.__futex, 1,
1, &cond->__data.__lock,
pshared), 0))
return 0;
lll_futex_wake (&cond->__data.__futex, 1, pshared);
}
/* We are done. */
lll_unlock (cond->__data.__lock, pshared);
return 0;
}
5、__pthread_cond_broadcast
int
__pthread_cond_broadcast (cond)
pthread_cond_t *cond;
{
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;
/* Make sure we are alone. */
lll_lock (cond->__data.__lock, pshared);
/* Are there any waiters to be woken? */
if (cond->__data.__total_seq > cond->__data.__wakeup_seq)判斷是否有等待喚醒的線程。
{
/* Yes. Mark them all as woken. */
cond->__data.__wakeup_seq = cond->__data.__total_seq;
cond->__data.__woken_seq = cond->__data.__total_seq;
cond->__data.__futex = (unsigned int) cond->__data.__total_seq * 2;
int futex_val = cond->__data.__futex;
/* Signal that a broadcast happened. */
++cond->__data.__broadcast_seq;
/* We are done. */
lll_unlock (cond->__data.__lock, pshared);
/* Do not use requeue for pshared condvars. */
if (cond->__data.__mutex == (void *) ~0l)
goto wake_all;
/* Wake everybody. */
pthread_mutex_t *mut = (pthread_mutex_t *) cond->__data.__mutex;
/* XXX: Kernel so far doesn't support requeue to PI futex. */
/* XXX: Kernel so far can only requeue to the same type of futex,
in this case private (we don't requeue for pshared condvars). */
if (__builtin_expect (mut->__data.__kind
& (PTHREAD_MUTEX_PRIO_INHERIT_NP
| PTHREAD_MUTEX_PSHARED_BIT), 0))
goto wake_all;
/* lll_futex_requeue returns 0 for success and non-zero
for errors. */
if (__builtin_expect (lll_futex_requeue (&cond->__data.__futex, 1,
INT_MAX, &mut->__data.__lock,
futex_val, LLL_PRIVATE), 0))把futex上的轉移到data.lock中並喚醒,如果失敗則直接喚醒而不轉移。
{
/* The requeue functionality is not available. */
wake_all:
lll_futex_wake (&cond->__data.__futex, INT_MAX, pshared);這里的INT_MAX就是告訴內核喚醒所有在這個變量上等待的線程。
}
/* That's all. */
return 0;
}
/* We are done. */
lll_unlock (cond->__data.__lock, pshared);
return 0;
}