條件變量


條件變量是thread庫提供的一種用於等待的同步機制,可以實現線程間的通信,它必須與互斥量配合使用,等待另一個線程中某個事件的發生(滿足某個條件),然后線程才能繼續執行。

thread庫提供兩種條件變量對象condition_variable和condition_variable_any,一般情況下,我們應該使用condition_variable_any,它能夠適用更廣泛的互斥量類型。

用法:擁有條件變量的線程先鎖定互斥量,然后循環檢查某個條件,如果條件不滿足,那么就調用條件變量的成員函數wait()等待直至條件滿足。其他線程處理條件變量要求的條件,當條件滿足時調用它的成員函數notify_one()或notify_all(),以通知一個或者所有正在等待條件變量的線程停止等待繼續執行。

wait():當前線程調用wait()后將被阻塞,直到另外某個線程調用notify_*喚醒當前線程;當線程被阻塞時,該函數會自動調用std::mutex的unlock()釋放鎖,使得其它被阻塞在鎖競爭上的線程得以繼續執行。一旦當前線程獲得通知(notify,通常是另外某個線程調用notify_*喚醒了當前線程),wait()函數也是自動調用std::mutex的lock()。

生產者-消費者模式:

#include <boost/thread.hpp>
#include <boost/ref.hpp>
#include <iostream>
#include <stack>

boost::mutex io_mu;
class buffer
{
private:
	boost::mutex mu; // 互斥量,配合條件變量使用
	boost::condition_variable_any cond_put; // 寫條件變量
	boost::condition_variable_any cond_get; // 讀條件變量
	std::stack<int> stk; // 緩沖區對象
	int un_read, capaccity; 
	bool is_full() // 緩沖區滿判斷
	{
		return un_read == capaccity;
	}
	bool is_empty() // 緩沖區空判斷
	{
		return stk.size() == 0;
	}
public:
	buffer(std::size_t n) :un_read(0), capaccity(n){}
	void put(int x) // 寫數據
	{
		{ // 開始一個局部域
			boost::mutex::scoped_lock lock(mu); // 鎖定互斥量
			while (is_full()) // 檢查緩沖區是否滿
			{
				{ // 局部域,鎖定io_mu
					boost::mutex::scoped_lock lock(io_mu);
					std::cout << "full waiting..." << std::endl;
				}
				cond_put.wait(mu); // 條件變量等待
			} // 條件滿足,停止等待
			stk.push(x); // 壓棧,寫入數據
			++un_read;
		} // 通知前解鎖互斥量,條件變量的通知不需要互斥量鎖定
		cond_get.notify_one(); // 通知可以讀數據
	}

	void get(int *x) // 讀數據
	{
		{ // 局部域開始
			boost::mutex::scoped_lock lock(mu); // 鎖定互斥量
			while (is_empty()) // 檢查緩沖區是否空
			{
				{ // 鎖定io_mu
					boost::mutex::scoped_lock lock(io_mu);
					std::cout << "empty waiting..." << std::endl;
				}
				cond_get.wait(mu); // 條件變量等待
			} // 條件滿足,停止等待
			--un_read;
			*x = stk.top(); // 讀取數據
			stk.pop();
		} // 通知前解鎖
		cond_put.notify_one();
	}
};

buffer buf(5); // 定義一個緩沖區對象
void producer(int n) // 生產者
{
	for (int i = 0; i < n; ++i)
	{
		{
			boost::mutex::scoped_lock lock(io_mu);
			std::cout << "put: " << i << std::endl;
		}
		buf.put(i); // 寫入數據
	}
}

void consumer(int n) // 消費者
{
	int x;
	for (int i = 0; i < n; ++i)
	{
		{
			buf.get(&x); // 讀取數據
			boost::mutex::scoped_lock lock(io_mu);
			std::cout << "get: " << x << std::endl;
		}
	}
}

int main()
{

	boost::thread t1(producer, 20);
	boost::thread t2(consumer, 10);
	boost::thread t3(consumer, 10);
	t1.join();
	t2.join();
	t3.join();
	return 0;
}

  

運行截圖:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM