c++11 並發編程 --- 條件變量(condition_variable) wait,wait_for


介紹condition_variable, wait,wait_for
直接上代碼如下:

#include <iostream>                // std::cout
#include <thread>                // std::thread
#include <mutex>                // std::mutex, std::unique_lock
#include <condition_variable>    // std::condition_variable

std::mutex mtx; // 全局互斥鎖.
std::condition_variable cv; // 全局條件變量.
bool ready = false; // 全局標志位.


void do_print_id(int id)
{
    std::unique_lock <std::mutex> lck(mtx); // 加鎖互斥量
    while (!ready) 
    {
        cv.wait(lck); // 當ready==false的時候,while語句執行到wait這里,然后就堵塞到這行,等到通知信號,同時解鎖互斥量,不影響其他線程獲取鎖。 
    }                 //當 cv.notify_all(); // 喚醒所有線程. 執行到這句wait就收到了信號就被喚醒開始干活,首先就是不斷的嘗試重新獲取並加鎖互斥量。
                      //若獲取不到鎖就卡在這里反復嘗試加鎖
                      //若獲取到了鎖才往下執行
  
    std::cout << "thread " << id << '\n';
}

void go()
{
    std::unique_lock <std::mutex> lck(mtx);
    ready = true; // 設置全局標志位為 true.
    cv.notify_all(); // 喚醒所有線程.
}

int main()
{
    std::thread threads[10];
    // spawn 10 threads:
    for (int i = 0; i < 10; ++i)
        threads[i] = std::thread(do_print_id, i);

    std::cout << "10 threads ready to race...\n";
    go(); // go!

  for (auto & th:threads)
        th.join();

    return 0;
}

wait_for

與std::condition_variable::wait() 類似,不過 wait_for可以指定一個時間段,在當前線程收到通知或者指定的時間 rel_time 超時之前,該線程都會處於阻塞狀態。
而一旦超時或者收到了其他線程的通知,wait_for返回,剩下的處理步驟和 wait()類似。

#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <chrono>             // std::chrono::seconds
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable, std::cv_status
std::condition_variable cv;
int value;
void do_read_value()
{
    std::cin >> value;
    cv.notify_one(); //只有鍵盤敲入一個字符,才往下執行cv.notify_one();
}

int main ()
{
    std::cout << "Please, enter an integer (I'll be printing dots): \n";
    std::thread th(do_read_value);
    std::mutex mtx;
    std::unique_lock<std::mutex> lck(mtx); //加鎖互斥量
    while (cv.wait_for(lck,std::chrono::seconds(1)) == std::cv_status::timeout) { //這里wait_for堵塞到這一行,解鎖互斥量。
               std::cout << '.';                                                  //當超時1s的時候,相當於收到了通知信號,就被喚醒干活了。 加鎖互斥量
               std::cout.flush();                                                 //while語句滿足就執行打印.
    }                                                                             //然后再次循環再wait等待1s,循環反復。
                                                                                  //但是當收到cv.notify_one();的時候,不滿足 std::cv_status::timeout,就會退出循環。
    std::cout << "You entered: " << value << '\n';                                //這個時候不斷嘗試加鎖互斥量,加鎖成功往下執行。加鎖不成功不斷嘗試加鎖。
    th.join();
    return 0;
}

這里的現像就是終端不斷的在打印.

Please, enter an integer (I'll be printing dots): 
.................................

當我敲一個字符的時候,就會停止打印

Please, enter an integer (I'll be printing dots): 
...............................................................................q.
You entered: 0
按 <RETURN> 來關閉窗口...

在多線程任務中,往往需要多個線程往同一個隊列添加或者取數據,就需要用到條件變量,當你push了一個數據到隊列,你就可以通知另外取數據的線程可以去取數據了。
當你取走一個數據的時候,你就可以通知壓(push)數據的線程可以往線程壓數據了,隊列剛剛空出一個位置了就可以壓數據了。
代碼如下:

#include <mutex>      
#include <deque>
#include <condition_variable>


template <class T>
class my_buffer
{
public:
	my_buffer(const char* name, int max_size = 30)
		:_terminal(false)
		, _max_size(max_size)
		, _name(name)
		, _last_warning_time(0)
	{

	}

	~my_buffer(void)
	{
	}

	bool push(const T& value)
	{
		std::unique_lock <std::mutex> lck(_mutex);
		while (_job_list.size() >= _max_size && !_terminal)
		{
			_push_cond.wait(lck);
		}

		if (_terminal)
		{
			return false;
		}

		_job_list.push_back(value);
		_pop_cond.notify_one();//push了一個數據就可以通知取數據pop的線程有數據可以取了。
		return true;
	}

	bool async_push(const T& value, float& capacity_rate)
	{
		std::unique_lock <std::mutex> lck(_mutex);
		/*while (_job_list.size() >= _max_size && !_terminal)
		{
			_push_cond.wait(lck);
		}*/

		if (_job_list.size() >= _max_size)
		{
			capacity_rate = _job_list.size() / ((float)_max_size);
			return false;
		}

		/*if (_terminal)
		{
			return false;
		}*/

		_job_list.push_back(value);
		_pop_cond.notify_one();//push了一個數據就可以通知取數據pop的線程有數據可以取了。

		capacity_rate = _job_list.size() / ((float)_max_size);
		return true;
	}

	bool push_front(const T& value)
	{
		std::unique_lock <std::mutex> lck(_mutex);
		while (_job_list.size() >= _max_size && !_terminal)
		{
			_push_cond.wait(lck);
		}

		if (_terminal)
		{
			return false;
		}

		_job_list.push_front(value);
		_pop_cond.notify_one();//push了一個數據就可以通知取數據pop的線程有數據可以取了。
		return true;
	}

	bool pop(T& value)
	{
		std::unique_lock <std::mutex> lck(_mutex);
		while (_job_list.empty() && !_terminal)
		{
			_pop_cond.wait(lck);
		}
			
		if (_terminal)
		{
			return false;
		}

		value = *_job_list.begin();
		_job_list.pop_front();
		_push_cond.notify_one();//取走一個數據就可以通知壓數據的條件變量了

		return true;
	}

	bool pop_wait(T& value, int second)
	{
		std::unique_lock <std::mutex> lck(_mutex);
		if (_job_list.empty())
		{
            if (std::cv_status::timeout == _pop_cond.wait_for(lck, std::chrono::milliseconds(second)))
			{
				return false;
			}		
		}

		value = *_job_list.begin();
		_job_list.pop_front();
		_push_cond.notify_one();//取走一個數據就可以通知壓數據的條件變量了

		return true;
	}
	
	void clear(void)
	{
		_terminal = true;
		_push_cond.notify_all();
		_pop_cond.notify_all();
	}

	void flush(void)
	{
		std::unique_lock <std::mutex> lck(_mutex);
		_job_list.clear();
	}

	void reset(void)
	{
		_terminal = false;
	}

	size_t size(void)
	{
		std::unique_lock <std::mutex> lck(_mutex);
		return _job_list.size();
	}

	float capacity_rate(void)
	{
		std::unique_lock <std::mutex> lck(_mutex);
		return _job_list .size()/ ((float)_max_size);
	}
private:
	std::deque<T> _job_list; //隊列				
	std::mutex _mutex;  //互斥量						
	std::condition_variable _pop_cond;		
	std::condition_variable _push_cond;	
	volatile bool _terminal;				
	unsigned long _max_size;//容器最大空間
	std::string _name;
	time_t _last_warning_time;
}; 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM