介紹condition_variable, wait,wait_for
直接上代碼如下:
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx; // 全局互斥鎖.
std::condition_variable cv; // 全局條件變量.
bool ready = false; // 全局標志位.
void do_print_id(int id)
{
std::unique_lock <std::mutex> lck(mtx); // 加鎖互斥量
while (!ready)
{
cv.wait(lck); // 當ready==false的時候,while語句執行到wait這里,然后就堵塞到這行,等到通知信號,同時解鎖互斥量,不影響其他線程獲取鎖。
} //當 cv.notify_all(); // 喚醒所有線程. 執行到這句wait就收到了信號就被喚醒開始干活,首先就是不斷的嘗試重新獲取並加鎖互斥量。
//若獲取不到鎖就卡在這里反復嘗試加鎖
//若獲取到了鎖才往下執行
std::cout << "thread " << id << '\n';
}
void go()
{
std::unique_lock <std::mutex> lck(mtx);
ready = true; // 設置全局標志位為 true.
cv.notify_all(); // 喚醒所有線程.
}
int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(do_print_id, i);
std::cout << "10 threads ready to race...\n";
go(); // go!
for (auto & th:threads)
th.join();
return 0;
}
wait_for
與std::condition_variable::wait() 類似,不過 wait_for可以指定一個時間段,在當前線程收到通知或者指定的時間 rel_time 超時之前,該線程都會處於阻塞狀態。
而一旦超時或者收到了其他線程的通知,wait_for返回,剩下的處理步驟和 wait()類似。
#include <iostream> // std::cout
#include <thread> // std::thread
#include <chrono> // std::chrono::seconds
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable, std::cv_status
std::condition_variable cv;
int value;
void do_read_value()
{
std::cin >> value;
cv.notify_one(); //只有鍵盤敲入一個字符,才往下執行cv.notify_one();
}
int main ()
{
std::cout << "Please, enter an integer (I'll be printing dots): \n";
std::thread th(do_read_value);
std::mutex mtx;
std::unique_lock<std::mutex> lck(mtx); //加鎖互斥量
while (cv.wait_for(lck,std::chrono::seconds(1)) == std::cv_status::timeout) { //這里wait_for堵塞到這一行,解鎖互斥量。
std::cout << '.'; //當超時1s的時候,相當於收到了通知信號,就被喚醒干活了。 加鎖互斥量
std::cout.flush(); //while語句滿足就執行打印.
} //然后再次循環再wait等待1s,循環反復。
//但是當收到cv.notify_one();的時候,不滿足 std::cv_status::timeout,就會退出循環。
std::cout << "You entered: " << value << '\n'; //這個時候不斷嘗試加鎖互斥量,加鎖成功往下執行。加鎖不成功不斷嘗試加鎖。
th.join();
return 0;
}
這里的現像就是終端不斷的在打印.
Please, enter an integer (I'll be printing dots):
.................................
當我敲一個字符的時候,就會停止打印
Please, enter an integer (I'll be printing dots):
...............................................................................q.
You entered: 0
按 <RETURN> 來關閉窗口...
在多線程任務中,往往需要多個線程往同一個隊列添加或者取數據,就需要用到條件變量,當你push了一個數據到隊列,你就可以通知另外取數據的線程可以去取數據了。
當你取走一個數據的時候,你就可以通知壓(push)數據的線程可以往線程壓數據了,隊列剛剛空出一個位置了就可以壓數據了。
代碼如下:
#include <mutex>
#include <deque>
#include <condition_variable>
template <class T>
class my_buffer
{
public:
my_buffer(const char* name, int max_size = 30)
:_terminal(false)
, _max_size(max_size)
, _name(name)
, _last_warning_time(0)
{
}
~my_buffer(void)
{
}
bool push(const T& value)
{
std::unique_lock <std::mutex> lck(_mutex);
while (_job_list.size() >= _max_size && !_terminal)
{
_push_cond.wait(lck);
}
if (_terminal)
{
return false;
}
_job_list.push_back(value);
_pop_cond.notify_one();//push了一個數據就可以通知取數據pop的線程有數據可以取了。
return true;
}
bool async_push(const T& value, float& capacity_rate)
{
std::unique_lock <std::mutex> lck(_mutex);
/*while (_job_list.size() >= _max_size && !_terminal)
{
_push_cond.wait(lck);
}*/
if (_job_list.size() >= _max_size)
{
capacity_rate = _job_list.size() / ((float)_max_size);
return false;
}
/*if (_terminal)
{
return false;
}*/
_job_list.push_back(value);
_pop_cond.notify_one();//push了一個數據就可以通知取數據pop的線程有數據可以取了。
capacity_rate = _job_list.size() / ((float)_max_size);
return true;
}
bool push_front(const T& value)
{
std::unique_lock <std::mutex> lck(_mutex);
while (_job_list.size() >= _max_size && !_terminal)
{
_push_cond.wait(lck);
}
if (_terminal)
{
return false;
}
_job_list.push_front(value);
_pop_cond.notify_one();//push了一個數據就可以通知取數據pop的線程有數據可以取了。
return true;
}
bool pop(T& value)
{
std::unique_lock <std::mutex> lck(_mutex);
while (_job_list.empty() && !_terminal)
{
_pop_cond.wait(lck);
}
if (_terminal)
{
return false;
}
value = *_job_list.begin();
_job_list.pop_front();
_push_cond.notify_one();//取走一個數據就可以通知壓數據的條件變量了
return true;
}
bool pop_wait(T& value, int second)
{
std::unique_lock <std::mutex> lck(_mutex);
if (_job_list.empty())
{
if (std::cv_status::timeout == _pop_cond.wait_for(lck, std::chrono::milliseconds(second)))
{
return false;
}
}
value = *_job_list.begin();
_job_list.pop_front();
_push_cond.notify_one();//取走一個數據就可以通知壓數據的條件變量了
return true;
}
void clear(void)
{
_terminal = true;
_push_cond.notify_all();
_pop_cond.notify_all();
}
void flush(void)
{
std::unique_lock <std::mutex> lck(_mutex);
_job_list.clear();
}
void reset(void)
{
_terminal = false;
}
size_t size(void)
{
std::unique_lock <std::mutex> lck(_mutex);
return _job_list.size();
}
float capacity_rate(void)
{
std::unique_lock <std::mutex> lck(_mutex);
return _job_list .size()/ ((float)_max_size);
}
private:
std::deque<T> _job_list; //隊列
std::mutex _mutex; //互斥量
std::condition_variable _pop_cond;
std::condition_variable _push_cond;
volatile bool _terminal;
unsigned long _max_size;//容器最大空間
std::string _name;
time_t _last_warning_time;
};