C++實現管程與同步隊列


toc

管程

管程可以視為一個線程安全的數據結構,其內部提供了互斥與同步操作,向外提供訪問共享數據的專用接口(接口被稱為管程的過程),通過管程提供的接口即可達成共享數據的保護與線程間同步。

使用管程,可以簡化線程間互斥、同步的編碼復雜度(否則需自己控制互斥、同步機制,並保證正確),可以集中分散的互斥、同步操作代碼,更容易驗證、查錯,也更可讀(反之,信號量的PV操作可能分散到各個地方,驗證、閱讀相對麻煩)

管程的特點

  • 共享數據僅能被管程的過程訪問
  • 線程通過調用管程的過程進入管程
  • 任何時候僅能有一個線程在管程中執行,其他阻塞直到可用
  • 管程內共享數據不可用時
    • 需要共享數據的線程將阻塞並釋放管程
    • 其他線程可進入管程構造數據可用條件,並通知阻塞線程
  • 管程內共享數據可用時, 被阻塞線程將在合適時間重新進入管程

管程分類

管程中僅能有一個線程在其中執行,根據發起通知時,被喚醒線程(T1)執行,還是喚醒線程(T2)繼續執行,可將管程分為三種:

  • Mesa管程(Lampson/Redell):T2繼續執行直到退出,T1進入入口隊列,后面與其他線程公平競爭(由於不是立刻執行,所以當T1執行時,條件可能已經不滿足,因此需循環檢測條件是否仍舊達成)
  • Hoare管程:T2立即阻塞並釋放管程,T1馬上執行,退出后T2恢復執行(多兩次線程切換)
  • Brinch Hanson管程:T2的通知操作僅允許在退出時發送通知,通知發送后T2結束,T1開始執行

管程的實現

根據前述管程特點,管程應該是一個對象,內部封裝了資源,該對象實現了互斥及與阻塞、喚醒機制
使用鎖,搭配條件變量來實現互斥及與阻塞、喚醒機制
下面簡單介紹條件變量,后面的管程代碼有一個小優化,涉及對條件變量實現的理解

條件變量

條件變量是同步原語,其內部有一個隊列,用於存放被wait阻塞的線程,當另一個線程發起通知時,如果隊列不為空,隊頭線程將被喚醒,否則,什么也不做。條件變量也可以一次性喚醒隊列中的全部阻塞線程

條件變量的一種實現


條件變量里有一個存儲阻塞線程的隊列。由於阻塞線程和喚醒通知線程都需要訪問這同一隊列,所以還有一個用於保護隊列的,鎖粒度不大,並且不需要線程切換,應為自旋鎖

wait函數會將當前線程入隊,並原子的進行鎖釋放與當前線程阻塞,阻塞直到另一線程通知才解除,解除后重新獲取鎖。鎖釋放與當前線程阻塞必須是原子的,否則,別的線程發出的喚醒通知,可能發生在當前線程阻塞之前,這會造成喚醒丟失

signal函數出隊一個阻塞線程並喚醒他,當隊列為空時,不做任何事情

broadcast函數將逐個喚醒隊列中的所有阻塞線程

管程實現代碼

下面實現了一個管程,同時符合mesa與hanson管程的定義,並且做了一些優化,取消掉了不必要的通知操作

#ifndef __MONITOR_H_
#define __MONITOR_H_

#include <list>
#include <mutex>
#include <utility>
#include <condition_variable>

template<typename T>
class Monitor{
public:
    Monitor(): m_iMaxCount(100), m_bStop(false){
    }
    Monitor(int iMaxCount) : m_iMaxCount(iMaxCount), m_bStop(false){
    }
    ~Monitor() = default;

    void Enqueue(const T& data){
        Append(data);
    }

    void Enqueue(T&& data){
        Append(std::forward<T>(data));    //轉發data的原屬性,此處轉發data的右值引用
    }

    void Dequeue(T& data){
        std::unique_lock<std::mutex> lk(m_mMutex);
        m_cvNotEmpty.wait(lk, [this](){return m_bStop || !IsEmpty(); });
        if(m_bStop){
            return;
        }
        bool bNeedNotify = IsFull();
        data = m_listData.front();
        m_listData.pop_front();
        lk.unlock();
        if(bNeedNotify){
            m_cvNotFull.notify_one();
        }
    }

    void Stop(){
        {
            std::lock_guard<std::mutex> lk(m_mMutex);
            m_bStop = true;
        }
        m_cvNotEmpty.notify_all();
        m_cvNotFull.notify_all();
    }

private:
    template<typename U>
    void Append(U&& data){                                //實現通用引用
        std::unique_lock<std::mutex> lk(m_mMutex);
        m_cvNotFull.wait(lk, [this](){return m_bStop || !IsFull(); });
        if(m_bStop){
            return;
        }
        bool bNeedNotify = IsEmpty();
        m_listData.emplace_back(std::forward<U>(data));    //再次轉發
        lk.unlock();
        if(bNeedNotify){
            m_cvNotEmpty.notify_one();
        }
    }

    bool IsFull(){
        return static_cast<int>(m_listData.size()) == m_iMaxCount;
    }

    bool IsEmpty(){
        return 0 == static_cast<int>(m_listData.size());
    }

    Monitor(const Monitor& rhs) = delete;
    Monitor(Monitor&& rhs) = delete;
    Monitor& operator=(const Monitor& rhs) = delete;
    Monitor& operator=(Monitor&& rhs) = delete;

private:
    int m_iMaxCount;
    bool m_bStop;
    std::mutex m_mMutex;
    std::list<T> m_listData;
    std::condition_variable m_cvNotEmpty;
    std::condition_variable m_cvNotFull;
};

#endif //!__MONITOR_H_
  • 代碼中條件變量使用了帶可調用對象作為參數的wait,它的實現是這樣的
template<typename _Predicate>
  void
  wait(unique_lock<mutex>& __lock, _Predicate __p)
  {
    while (!__p())
        wait(__lock);
  }

可調用對象返回true則什么都不做,否則,重復wait到滿足條件,這可以避免虛假喚醒,同時也是Mesa管程的行為

  • 進行隊列不滿、不空通知前,檢查隊列原狀態 ,通過狀態判斷是否進行notify
    • 在由滿到不滿,空到不空這兩種狀態變化下才進行通知
    • 根據上面的條件變量的signal函數實現可知,通知時會使用到,雖然鎖粒度很小,並且是自旋鎖,但是也存在較大的性能消耗,通過一個簡單的判斷,避免了通知,也就避免了鎖
  • 模板方法Append實現了通用引用,使得Append同時支持引用左值與右值,配合std::forward<>可將參數原屬性(左值引用或右值引用)轉發給另一個函數當參數
    • 實現通用引用必須含有&&,並且需要發生類型推導,所以Append必須是不同的模板參數U,而不能和類模板Monitor一樣為T(T的類型在模板實例化時固定,將不能發生類型推導)
    • 一切變量都是左值(變量本身分配有內存空間,可以對其取地址),所以在調用實現了通用引用函數時,應該std::forward<>一下,否則通用引用綁定的是變量本身(左值),即得到左值引用
    • std::forward<>靠引用折疊實現對原屬性的轉發:含左值引用折疊后為左值引用,否則為右值引用
template<typename _Tp>
    constexpr _Tp&&
    forward(typename std::remove_reference<_Tp>::type& __t) noexcept
    { return static_cast<_Tp&&>(__t); }

template<typename _Tp>
    constexpr _Tp&&
    forward(typename std::remove_reference<_Tp>::type&& __t) noexcept
    {
    static_assert(!std::is_lvalue_reference<_Tp>::value, "template argument"
    " substituting _Tp is an lvalue reference type");
    return static_cast<_Tp&&>(__t);
    }
  • 參數_Tp若為type &(左值引用)
則static_cast<_Tp&&>為static_cast<type& &&>,折疊后為static_cast<type&>,
所以轉發返回值仍為為左值引用
  • 參數_Tp若為type &&(右值引用)
則static_cast<_Tp&&>為static_cast<type&& &&>,折疊后為static_cast<type&&>,
所以轉發返回仍為右值引用
  • Enqueue最好不要通用引用方式實現,除非調用者明確知道使用std::forward<>(不用的代價為丟失右值語義)

同步隊列

管程其實也是一種同步隊列,現在

  • 在管程的基礎上添加Try類操作,Try類操作操作時,若條件無法滿足,則立刻返回false,不阻塞
  • 添加獲取元素個數的方法

得到同步隊列如下

代碼

#ifndef __SyncQueue_H_
#define __SyncQueue_H_

#include <list>
#include <mutex>
#include <utility>
#include <condition_variable>

template<typename T>
class SyncQueue{
public:
    SyncQueue() : m_iMaxCount(100), m_bStop(false){
    }
    SyncQueue(int iMaxCount) : m_iMaxCount(iMaxCount), m_bStop(false){
    }
    ~SyncQueue() = default;

    void Enqueue(const T& data){
        Append(data);
    }

    void Enqueue(T&& data){
        Append(std::forward<T>(data));    //轉發data的原屬性,此處轉發data的右值引用
    }

    bool TryEnqueue(const T& data){
        return TryAppend(data);
    }

    bool TryEnqueue(T&& data){
        return TryAppend(std::forward<T>(data));
    }

    void Dequeue(T& data){
        std::unique_lock<std::mutex> lk(m_mMutex);
        m_cvNotEmpty.wait(lk, [this](){return m_bStop || !IsEmpty(); });
        if(m_bStop){
            return;
        }
        bool bNeedNotify = IsFull();
        data = m_listData.front();
        m_listData.pop_front();
        lk.unlock();
        if(bNeedNotify){
            m_cvNotFull.notify_one();
        }
    }

    bool TryDequeue(T& data){
        std::unique_lock<std::mutex> lk(m_mMutex);
        if(m_bStop || IsEmpty()){
            return false;
        }
        bool bNeedNotify = IsFull();
        data = m_listData.front();
        m_listData.pop_front();
        lk.unlock();
        if(bNeedNotify){
            m_cvNotFull.notify_one();
        }
        return true;
    }

    void Stop(){
        {
            std::lock_guard<std::mutex> lk(m_mMutex);
            m_bStop = true;
        }
        m_cvNotEmpty.notify_all();
        m_cvNotFull.notify_all();
    }

    int Size(){
        std::lock_guard<std::mutex> lk(m_mMutex);
        return static_cast<int>(m_listData.size());
    }

private:
    template<typename U>
    bool TryAppend(U&& data){                                //實現通用引用
        std::unique_lock<std::mutex> lk(m_mMutex);
        if(m_bStop || IsFull()){
            return false;
        }
        bool bNeedNotify = IsEmpty();
        m_listData.emplace_back(std::forward<U>(data));    //再次轉發
        lk.unlock();
        if(bNeedNotify){
            m_cvNotEmpty.notify_one();
        }
        return true;
    }

    template<typename U>
    void Append(U&& data){                                //實現通用引用
        std::unique_lock<std::mutex> lk(m_mMutex);
        m_cvNotFull.wait(lk, [this](){return m_bStop || !IsFull(); });
        if(m_bStop){
            return;
        }
        bool bNeedNotify = IsEmpty();
        m_listData.emplace_back(std::forward<U>(data));    //再次轉發
        lk.unlock();
        if(bNeedNotify){
            m_cvNotEmpty.notify_one();
        }
    }

    bool IsFull(){
        return static_cast<int>(m_listData.size()) == m_iMaxCount;
    }

    bool IsEmpty(){
        return 0 == static_cast<int>(m_listData.size());
    }

    SyncQueue(const SyncQueue& rhs) = delete;
    SyncQueue(SyncQueue&& rhs) = delete;
    SyncQueue& operator=(const SyncQueue& rhs) = delete;
    SyncQueue& operator=(SyncQueue&& rhs) = delete;

private:
    int m_iMaxCount;
    bool m_bStop;
    std::mutex m_mMutex;
    std::list<T> m_listData;
    std::condition_variable m_cvNotEmpty;
    std::condition_variable m_cvNotFull;
};

#endif //!__SyncQueue_H_

參考資料

Monitors and Condition Variables
操作系統精髓與設計原理 原書第6版第五章第四節
Universal References in C++11 -- Scott Meyers






免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM