管程
管程可以視為一個線程安全的數據結構,其內部提供了互斥與同步操作,向外提供訪問共享數據的專用接口(接口被稱為管程的過程),通過管程提供的接口即可達成共享數據的保護與線程間同步。
使用管程,可以簡化線程間互斥、同步的編碼復雜度(否則需自己控制互斥、同步機制,並保證正確),可以集中分散的互斥、同步操作代碼,更容易驗證、查錯,也更可讀(反之,信號量的PV操作可能分散到各個地方,驗證、閱讀相對麻煩)
管程的特點
- 共享數據僅能被管程的過程訪問
- 線程通過調用管程的過程進入管程
- 任何時候僅能有一個線程在管程中執行,其他阻塞直到可用
- 管程內共享數據不可用時
- 需要共享數據的線程將阻塞並釋放管程
- 其他線程可進入管程構造數據可用條件,並通知阻塞線程
- 管程內共享數據可用時, 被阻塞線程將在合適時間重新進入管程
管程分類
管程中僅能有一個線程在其中執行,根據發起通知時,被喚醒線程(T1)執行,還是喚醒線程(T2)繼續執行,可將管程分為三種:
- Mesa管程(Lampson/Redell):T2繼續執行直到退出,T1進入入口隊列,后面與其他線程公平競爭(由於不是立刻執行,所以當T1執行時,條件可能已經不滿足,因此需循環檢測條件是否仍舊達成)
- Hoare管程:T2立即阻塞並釋放管程,T1馬上執行,退出后T2恢復執行(多兩次線程切換)
- Brinch Hanson管程:T2的通知操作僅允許在退出時發送通知,通知發送后T2結束,T1開始執行
管程的實現
根據前述管程特點,管程應該是一個對象,內部封裝了資源,該對象實現了互斥及與阻塞、喚醒機制
使用鎖,搭配條件變量來實現互斥及與阻塞、喚醒機制
下面簡單介紹條件變量,后面的管程代碼有一個小優化,涉及對條件變量實現的理解
條件變量
條件變量是同步原語,其內部有一個隊列,用於存放被wait阻塞的線程,當另一個線程發起通知時,如果隊列不為空,隊頭線程將被喚醒,否則,什么也不做。條件變量也可以一次性喚醒隊列中的全部阻塞線程。
條件變量的一種實現
條件變量里有一個存儲阻塞線程的隊列。由於阻塞線程和喚醒通知線程都需要訪問這同一隊列,所以還有一個用於保護隊列的鎖,鎖粒度不大,並且不需要線程切換,應為自旋鎖
wait函數會將當前線程入隊,並原子的進行鎖釋放與當前線程阻塞,阻塞直到另一線程通知才解除,解除后重新獲取鎖。鎖釋放與當前線程阻塞必須是原子的,否則,別的線程發出的喚醒通知,可能發生在當前線程阻塞之前,這會造成喚醒丟失
signal函數出隊一個阻塞線程並喚醒他,當隊列為空時,不做任何事情
broadcast函數將逐個喚醒隊列中的所有阻塞線程
管程實現代碼
下面實現了一個管程,同時符合mesa與hanson管程的定義,並且做了一些優化,取消掉了不必要的通知操作
#ifndef __MONITOR_H_
#define __MONITOR_H_
#include <list>
#include <mutex>
#include <utility>
#include <condition_variable>
template<typename T>
class Monitor{
public:
Monitor(): m_iMaxCount(100), m_bStop(false){
}
Monitor(int iMaxCount) : m_iMaxCount(iMaxCount), m_bStop(false){
}
~Monitor() = default;
void Enqueue(const T& data){
Append(data);
}
void Enqueue(T&& data){
Append(std::forward<T>(data)); //轉發data的原屬性,此處轉發data的右值引用
}
void Dequeue(T& data){
std::unique_lock<std::mutex> lk(m_mMutex);
m_cvNotEmpty.wait(lk, [this](){return m_bStop || !IsEmpty(); });
if(m_bStop){
return;
}
bool bNeedNotify = IsFull();
data = m_listData.front();
m_listData.pop_front();
lk.unlock();
if(bNeedNotify){
m_cvNotFull.notify_one();
}
}
void Stop(){
{
std::lock_guard<std::mutex> lk(m_mMutex);
m_bStop = true;
}
m_cvNotEmpty.notify_all();
m_cvNotFull.notify_all();
}
private:
template<typename U>
void Append(U&& data){ //實現通用引用
std::unique_lock<std::mutex> lk(m_mMutex);
m_cvNotFull.wait(lk, [this](){return m_bStop || !IsFull(); });
if(m_bStop){
return;
}
bool bNeedNotify = IsEmpty();
m_listData.emplace_back(std::forward<U>(data)); //再次轉發
lk.unlock();
if(bNeedNotify){
m_cvNotEmpty.notify_one();
}
}
bool IsFull(){
return static_cast<int>(m_listData.size()) == m_iMaxCount;
}
bool IsEmpty(){
return 0 == static_cast<int>(m_listData.size());
}
Monitor(const Monitor& rhs) = delete;
Monitor(Monitor&& rhs) = delete;
Monitor& operator=(const Monitor& rhs) = delete;
Monitor& operator=(Monitor&& rhs) = delete;
private:
int m_iMaxCount;
bool m_bStop;
std::mutex m_mMutex;
std::list<T> m_listData;
std::condition_variable m_cvNotEmpty;
std::condition_variable m_cvNotFull;
};
#endif //!__MONITOR_H_
- 代碼中條件變量使用了帶可調用對象作為參數的wait,它的實現是這樣的
template<typename _Predicate>
void
wait(unique_lock<mutex>& __lock, _Predicate __p)
{
while (!__p())
wait(__lock);
}
可調用對象返回true則什么都不做,否則,重復wait到滿足條件,這可以避免虛假喚醒,同時也是Mesa管程的行為
- 進行隊列不滿、不空通知前,檢查隊列原狀態 ,通過狀態判斷是否進行notify
- 在由滿到不滿,空到不空這兩種狀態變化下,才進行通知
- 根據上面的條件變量的signal函數實現可知,通知時會使用到鎖,雖然鎖粒度很小,並且是自旋鎖,但是也存在較大的性能消耗,通過一個簡單的判斷,避免了通知,也就避免了鎖
- 模板方法Append實現了通用引用,使得Append同時支持引用左值與右值,配合std::forward<>可將參數原屬性(左值引用或右值引用)轉發給另一個函數當參數
- 實現通用引用必須含有&&,並且需要發生類型推導,所以Append必須是不同的模板參數U,而不能和類模板Monitor一樣為T(T的類型在模板實例化時固定,將不能發生類型推導)
- 一切變量都是左值(變量本身分配有內存空間,可以對其取地址),所以在調用實現了通用引用函數時,應該std::forward<>一下,否則通用引用綁定的是變量本身(左值),即得到左值引用
- std::forward<>靠引用折疊實現對原屬性的轉發:含左值引用折疊后為左值引用,否則為右值引用
template<typename _Tp>
constexpr _Tp&&
forward(typename std::remove_reference<_Tp>::type& __t) noexcept
{ return static_cast<_Tp&&>(__t); }
template<typename _Tp>
constexpr _Tp&&
forward(typename std::remove_reference<_Tp>::type&& __t) noexcept
{
static_assert(!std::is_lvalue_reference<_Tp>::value, "template argument"
" substituting _Tp is an lvalue reference type");
return static_cast<_Tp&&>(__t);
}
- 參數_Tp若為type &(左值引用)
則static_cast<_Tp&&>為static_cast<type& &&>,折疊后為static_cast<type&>,
所以轉發返回值仍為為左值引用
- 參數_Tp若為type &&(右值引用)
則static_cast<_Tp&&>為static_cast<type&& &&>,折疊后為static_cast<type&&>,
所以轉發返回仍為右值引用
- Enqueue最好不要以通用引用方式實現,除非調用者明確知道使用std::forward<>(不用的代價為丟失右值語義)
同步隊列
管程其實也是一種同步隊列,現在
- 在管程的基礎上添加Try類操作,Try類操作操作時,若條件無法滿足,則立刻返回false,不阻塞
- 添加獲取元素個數的方法
得到同步隊列如下
代碼
#ifndef __SyncQueue_H_
#define __SyncQueue_H_
#include <list>
#include <mutex>
#include <utility>
#include <condition_variable>
template<typename T>
class SyncQueue{
public:
SyncQueue() : m_iMaxCount(100), m_bStop(false){
}
SyncQueue(int iMaxCount) : m_iMaxCount(iMaxCount), m_bStop(false){
}
~SyncQueue() = default;
void Enqueue(const T& data){
Append(data);
}
void Enqueue(T&& data){
Append(std::forward<T>(data)); //轉發data的原屬性,此處轉發data的右值引用
}
bool TryEnqueue(const T& data){
return TryAppend(data);
}
bool TryEnqueue(T&& data){
return TryAppend(std::forward<T>(data));
}
void Dequeue(T& data){
std::unique_lock<std::mutex> lk(m_mMutex);
m_cvNotEmpty.wait(lk, [this](){return m_bStop || !IsEmpty(); });
if(m_bStop){
return;
}
bool bNeedNotify = IsFull();
data = m_listData.front();
m_listData.pop_front();
lk.unlock();
if(bNeedNotify){
m_cvNotFull.notify_one();
}
}
bool TryDequeue(T& data){
std::unique_lock<std::mutex> lk(m_mMutex);
if(m_bStop || IsEmpty()){
return false;
}
bool bNeedNotify = IsFull();
data = m_listData.front();
m_listData.pop_front();
lk.unlock();
if(bNeedNotify){
m_cvNotFull.notify_one();
}
return true;
}
void Stop(){
{
std::lock_guard<std::mutex> lk(m_mMutex);
m_bStop = true;
}
m_cvNotEmpty.notify_all();
m_cvNotFull.notify_all();
}
int Size(){
std::lock_guard<std::mutex> lk(m_mMutex);
return static_cast<int>(m_listData.size());
}
private:
template<typename U>
bool TryAppend(U&& data){ //實現通用引用
std::unique_lock<std::mutex> lk(m_mMutex);
if(m_bStop || IsFull()){
return false;
}
bool bNeedNotify = IsEmpty();
m_listData.emplace_back(std::forward<U>(data)); //再次轉發
lk.unlock();
if(bNeedNotify){
m_cvNotEmpty.notify_one();
}
return true;
}
template<typename U>
void Append(U&& data){ //實現通用引用
std::unique_lock<std::mutex> lk(m_mMutex);
m_cvNotFull.wait(lk, [this](){return m_bStop || !IsFull(); });
if(m_bStop){
return;
}
bool bNeedNotify = IsEmpty();
m_listData.emplace_back(std::forward<U>(data)); //再次轉發
lk.unlock();
if(bNeedNotify){
m_cvNotEmpty.notify_one();
}
}
bool IsFull(){
return static_cast<int>(m_listData.size()) == m_iMaxCount;
}
bool IsEmpty(){
return 0 == static_cast<int>(m_listData.size());
}
SyncQueue(const SyncQueue& rhs) = delete;
SyncQueue(SyncQueue&& rhs) = delete;
SyncQueue& operator=(const SyncQueue& rhs) = delete;
SyncQueue& operator=(SyncQueue&& rhs) = delete;
private:
int m_iMaxCount;
bool m_bStop;
std::mutex m_mMutex;
std::list<T> m_listData;
std::condition_variable m_cvNotEmpty;
std::condition_variable m_cvNotFull;
};
#endif //!__SyncQueue_H_
參考資料
Monitors and Condition Variables
操作系統精髓與設計原理 原書第6版第五章第四節
Universal References in C++11 -- Scott Meyers
