C++多線程隊列實現


C++多線程隊列實現

介紹

在項目中,進行多線程隊列實現是一個比較麻煩的事, 找到了一個實現比較好的多線程隊列實現, 自己做了一點修改更加適應自己的項目, 記錄下來, 有需要的自己進行修改使用.

代碼寫的並不是很好, 封裝起來的實現也是並不是很好用, 個人水平的一個記錄, 希望理解

多線程隊列實現

  1. 初始化一定長度的空間存儲數據
  2. 每次壓入或者彈出操作的時候需要獲取鎖, 保證同時只有一個操作可以被執行,
  3. 壓入或者彈出數據的時候, 如果隊列已經滿了或者空的, 另外一個線程可能需要等待, 或者返回 false 具體看程序注釋,考慮自己情況進行程序修改
  4. 最終清理對象數據, 清空隊列,退出線程
具體實現代碼
#ifndef CQUEUE_H__
#define CQUEUE_H__
#pragma once
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>

/** * @class Queue CQueue.h Code\inc\CQueue.h * * @brief 線程安全隊列實現 * * 因為有std::mutex和std::condition_variable類成員,所以此類不支持復制構造函數也不支持賦值操作符(=) * * @author IRIS_Chen * @date 2019/10/10 * * @tparam T Generic type parameter. */
template <class T> /** * @class CQueue CQueue.h Code\inc\CQueue.h * * @brief Queue of cs. * * @author IRIS_Chen * @date 2019/10/17 */ class CQueue {
    protected:
    // Data
    std::queue<T> _queue;   ///< 存儲數據的真實隊列, 不是線程安全的
    private:
    typename std::queue<T>::size_type _size_max;    ///< 隊列的最大長度
    // Thread gubbins
    std::mutex _mutex;  ///< 線程操作 鎖
    std::condition_variable _fullQue;   ///< 隊列滿了的信號 
    std::condition_variable _empty; ///< 隊列為空的信號

    // Exit
    // 原子操作
    std::atomic_bool _quit; ///< { false }; // 退出信號
    std::atomic_bool _finished; ///< { false }; // 完成信號 // 表示不再繼續輸入數據

    public:

    /** * @fn CQueue::CQueue(const size_t size_max) * * @brief 初始化隊列長度,並將退出標志和 滿信號標志置空 * * @author IRIS_Chen * @date 2019/10/17 * * @param size_max 隊列的最長尺寸 */

    CQueue(const size_t size_max) :_size_max(size_max) {
        _quit = ATOMIC_VAR_INIT(false);
        _finished = ATOMIC_VAR_INIT(false);
    }

    /** * @fn CQueue::CQueue(CONST CQueue&) = delete; * * @brief 不允許拷貝構造函數 * * @author IRIS_Chen * @date 2019/10/17 * * @param parameter1 The first parameter */

    CQueue(CONST CQueue&) = delete; ///< 不允許拷貝構造函數
    /** * @fn CQueue::~CQueue() * * @brief Finalizes an instance of the CQueue class 銷毀隊列, 退出線程 清除數據 // 存在問題 * * @author IRIS_Chen * @date 2019/11/8 */

    ~CQueue()
    {
        Quit();
        while (_queue.size())
            ;
    }
    /** * @fn bool CQueue::Push(T& data) * * @brief 隊列中加入新的 對象 根據情況決定 滿信號之后 新數據丟棄或者等待 * * @author IRIS_Chen * @date 2019/10/10 * * @param [in,out] data The data to Push. * * @return True if it succeeds, false if it fails. */

    bool Push(T& data) {
        std::unique_lock<std::mutex> lock(_mutex);
        while (!_quit && !_finished)
        {
            if (_queue.size() < _size_max)
            {
                _queue.push(std::move(data));
                //_queue.Push(data);
                _empty.notify_all();
                return true;
            }
            else
            {
                // wait的時候自動釋放鎖,如果wait到了會獲取鎖
                // _fullQue.wait(lock);
                return false;   ///< 如果滿了 這里不進行等待 避免出現問題
            }
        }

        return false;
    }

    /** * @fn bool CQueue::Pop(T &data) * * @brief 返回隊列最前面的元素 並且彈出 // 如果空 如果finish 則直接返回fasle 否則 等待隊列加入元素 * * @author IRIS_Chen * @date 2019/10/14 * * @param [in,out] data The data to Pop. * * @return True if it succeeds, false if it fails. */

    bool Pop(T &data) {
        std::unique_lock<std::mutex> lock(_mutex);
        while (!_quit)
        {
            if (!_queue.empty())                // 隊列非空
            {
                //data = std::move(_queue.front());
                data = _queue.front();
                _queue.pop();

                _fullQue.notify_all();       // 通知所有 由於滿隊無法加入的線程
                return true;
            }
            else if (_queue.empty() && _finished)   // 隊列為空 且不再加入
            {
                return false;
            }
            else
            {
                // _empty.wait(lock); // 等待隊列加入元素
                return false;   ///< 不等待元素加入數據
            }
        }
        return false;
    }

    /** * @fn std::shared_ptr<T> CQueue::Pop(void) * * @brief 彈出一個元素 直接返回 出錯無法報錯 * * @author IRIS_Chen * @date 2019/10/14 * * @return The previous top-of-stack object. */

    std::shared_ptr<T> Pop(void)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        std::shared_ptr<T> res = nullptr;
        while (!_quit)
        {
            if (!_queue.empty())                // 隊列非空
            {
                //data = std::move(_queue.front());
                res = std::make_shared<T>(_queue.front());
                _queue.pop();

                _fullQue.notify_all();       // 通知所有 由於滿隊無法加入的線程

                return res;
            }
            else if (_queue.empty() && _finished)   // 隊列為空 且不再加入
            {
                return res;     // 無數據進入 智能返回一個空指針 (可能出錯)
            }
            else
            {
                _empty.wait(lock);          // 等待隊列加入元素
            }
        }
        return false;
    }

    /** * @fn void CQueue::Finished() * * @brief The queue has Finished accepting input 標識隊列完成輸入 不再繼續輸入 * * @author IRIS_Chen * @date 2019/10/14 */

    void Finished() {
        _finished = true;
        _empty.notify_all();
    }

    /** * @fn void CQueue::Quit() * * @brief Quits this CQueue 退出隊列, 無法再加入壓入或者彈出數據 * * @author IRIS_Chen * @date 2019/10/14 */

    void Quit() {
        _quit = true;
        _empty.notify_all();
        _fullQue.notify_all();
    }

    /** * @fn int CQueue::Length() * * @brief Gets the Length 返回隊列目前長度 * * @author IRIS_Chen * @date 2019/10/14 * * @return An int. */

    int Length() {
        std::unique_lock<std::mutex> lock(_mutex);
        return static_cast<int>(_queue.size());
    }

    /** * @fn int CQueue::Size() * * @brief Gets the Size 返回當前隊列長度 * * @author IRIS_Chen * @date 2019/10/14 * * @return An int. */
    int Size() {
        std::unique_lock<std::mutex> lock(_mutex);
        return static_cast<int>(_queue.size());
    }

    /** * @fn bool CQueue::empty(void) * * @brief 判斷是否為空 * * @author IRIS_Chen * @date 2019/10/17 * * @return True if it succeeds, false if it fails */

    bool Empty(void) {
        std::unique_lock<std::mutex> lock(_mutex);
        return (0 == _queue.size());
    }

    /** * @fn bool CQueue::Clear(void) * * @brief 清空隊列 * * @author IRIS_Chen * @date 2019/10/17 * * @return True if it succeeds, false if it fails */

    bool Clear(void) {
        std::unique_lock<std::mutex> lock(_mutex);
        while (!_queue.empty ())
        {
            Pop();  // 依次彈出數據
        }
        return true;
    }
};
#endif

更多

有另外的多線程實現, 來自網上找到的, 可能找不到參考鏈接了, 貼出來 供參考

線程安全隊列 1

具體實現代碼
/** * @class ThreadSafeQueue CQueue.h Code\inc\CQueue.h * * @brief 線程安全隊列實現 * * @author IRIS_Chen * @date 2019/10/10 * * @tparam T Generic type parameter. */
template<typename T>

/** * @class ThreadSafeQueue CQueue.h Code\inc\CQueue.h * * @brief Queue of thread safes. * * @author IRIS_Chen * @date 2019/10/17 */

class ThreadSafeQueue {
    private:

    /** * @property mutable std::mutex mut * * @brief Gets the mut * * @return The mut */

    mutable std::mutex mut;
    std::queue<T> data_queue;   ///< Queue of data
    std::condition_variable data_cond;  ///< The data condition
    public:

    /** * @fn ThreadSafeQueue::ThreadSafeQueue() * * @brief Initializes a new instance of the ThreadSafeQueue class * * @author IRIS_Chen * @date 2019/10/17 */

    ThreadSafeQueue() {}

    /** * @fn ThreadSafeQueue::ThreadSafeQueue(ThreadSafeQueue const& other) * * @brief 拷貝構造函數 * * @author IRIS_Chen * @date 2019/10/17 * * @param other The other */

    ThreadSafeQueue(ThreadSafeQueue const& other)
    {
        std::lock_guard<std::mutex> lk(other.mut);
        data_queue = other.data_queue;
    }

    /** * @fn void ThreadSafeQueue::Push(T& new_value) * * @brief Pushes an object onto this stack * * @author IRIS_Chen * @date 2019/10/17 * * @param [in,out] new_value The new value to Push */

    void push(T& new_value)//入隊操作 {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(new_value);
        data_cond.notify_one();
    }

    /** * @fn void ThreadSafeQueue::wait_and_pop(T& value) * * @brief Wait and Pop * * @author IRIS_Chen * @date 2019/10/17 * * @param [in,out] value The value */

    void wait_and_pop(T& value)//直到有元素可以刪除為止 {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]
        {
            return !data_queue.empty();
        });
        value = data_queue.front();
        data_queue.pop();
    }

    /** * @fn std::shared_ptr<T> ThreadSafeQueue::wait_and_pop() * * @brief Wait and pop * * @author IRIS_Chen * @date 2019/10/17 * * @return A std::shared_ptr&lt;T&gt; */

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]
        {
            return !data_queue.empty();
        });
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }

    /** * @fn bool ThreadSafeQueue::try_pop(T& value) * * @brief Attempts to pop * * @author IRIS_Chen * @date 2019/10/17 * * @param [in,out] value The value * * @return True if it succeeds, false if it fails */

    bool try_pop(T& value)//不管有沒有隊首元素直接返回 {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return false;
        value = data_queue.front();
        data_queue.pop();
        return true;
    }

    /** * @fn std::shared_ptr<T> ThreadSafeQueue::try_pop() * * @brief Try pop * * @author IRIS_Chen * @date 2019/10/17 * * @return A std::shared_ptr&lt;T&gt; */

    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }

    /** * @fn bool ThreadSafeQueue::empty() const * * @brief Empties this object * * @author IRIS_Chen * @date 2019/10/17 * * @return True if it succeeds, false if it fails */

    bool empty() const {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

線程安全隊列 2

具體實現代碼
#include <queue>
#include <mutex>
#include <condition_variable>
#include <initializer_list>
/* * 線程安全隊列 * T為隊列元素類型 * 因為有std::mutex和std::condition_variable類成員,所以此類不支持復制構造函數也不支持賦值操作符(=) * */
template<typename T>

/** * @class threadsafe_queue CQueue.h Code\inc\CQueue.h * * @brief Queue of threadsafes. * * @author IRIS_Chen * @date 2019/10/17 */

class threadsafe_queue {
    private:

    /** * @property mutable std::mutex mut * * @brief data_queue訪問信號量 * * @return The mut */

    mutable std::mutex mut;

    /** * @property mutable std::condition_variable data_cond * * @brief Gets the data condition * * @return The data condition */

    mutable std::condition_variable data_cond;
    using queue_type = std::queue<T>;   ///< Type of the queue
    queue_type data_queue;  ///< Queue of data
    public:
    using value_type = typename queue_type::value_type; ///< Type of the value
    using container_type = typename queue_type::container_type; ///< Type of the container

    /** * @fn threadsafe_queue::threadsafe_queue() = default; * * @brief Initializes a new instance of the threadsafe_queue class * * @author IRIS_Chen * @date 2019/10/17 */

    threadsafe_queue() = default;

    /** * @fn threadsafe_queue::threadsafe_queue(const threadsafe_queue&) = delete; * * @brief Initializes a new instance of the threadsafe_queue class * * @author IRIS_Chen * @date 2019/10/17 * * @param parameter1 The first parameter */

    threadsafe_queue(const threadsafe_queue&) = delete;

    /** * @fn threadsafe_queue& threadsafe_queue::operator=(const threadsafe_queue&) = delete; * * @brief Assignment operator * * @author IRIS_Chen * @date 2019/10/17 * * @param parameter1 The first parameter * * @return A shallow copy of this object */

    threadsafe_queue& operator=(const threadsafe_queue&) = delete;
    /* * 使用迭代器為參數的構造函數,適用所有容器對象 * */
    template<typename _InputIterator>

    /** * @fn threadsafe_queue::threadsafe_queue(_InputIterator first, _InputIterator last) * * @brief Initializes a new instance of the threadsafe_queue class * * @author IRIS_Chen * @date 2019/10/17 * * @param first The first * @param last The last */

    threadsafe_queue(_InputIterator first, _InputIterator last) {
        for (auto itor = first; itor != last; ++itor)
        {
            data_queue.push(*itor);
        }
    }

    /** * @fn explicit threadsafe_queue::threadsafe_queue(const container_type &c) * * @brief Initializes a new instance of the threadsafe_queue class * * @author IRIS_Chen * @date 2019/10/17 * * @param c A container_type to process */

    explicit threadsafe_queue(const container_type &c) :data_queue(c) {}
    /* * 使用初始化列表為參數的構造函數 * */

    /** * @fn threadsafe_queue::threadsafe_queue(std::initializer_list<value_type> list) * * @brief Initializes a new instance of the threadsafe_queue class * * @author IRIS_Chen * @date 2019/10/17 * * @param list The list */

    threadsafe_queue(std::initializer_list<value_type> list) :threadsafe_queue(list.begin(), list.end()) {
    }
    /* * 將元素加入隊列 * */

    /** * @fn void threadsafe_queue::push(const value_type &new_value) * * @brief Pushes an object onto this stack * * @author IRIS_Chen * @date 2019/10/17 * * @param new_value The new value to push */

    void push(const value_type &new_value) {
        std::lock_guard<std::mutex>lk(mut);
        data_queue.push(std::move(new_value));
        data_cond.notify_one();
    }
    /* * 從隊列中彈出一個元素,如果隊列為空就阻塞 * */

    /** * @fn value_type threadsafe_queue::wait_and_pop() * * @brief Wait and pop * * @author IRIS_Chen * @date 2019/10/17 * * @return A value_type */

    value_type wait_and_pop() {
        std::unique_lock<std::mutex>lk(mut);
        data_cond.wait(lk, [this]
        {
            return !this->data_queue.empty();
        });
        auto value = std::move(data_queue.front());
        data_queue.pop();
        return value;
    }
    /* * 從隊列中彈出一個元素,如果隊列為空返回false * */

    /** * @fn bool threadsafe_queue::try_pop(value_type& value) * * @brief Attempts to pop * * @author IRIS_Chen * @date 2019/10/17 * * @param [in,out] value The value * * @return True if it succeeds, false if it fails */

    bool try_pop(value_type& value) {
        std::lock_guard<std::mutex>lk(mut);
        if (data_queue.empty())
            return false;
        value = std::move(data_queue.front());
        data_queue.pop();
        return true;
    }
    /* * 返回隊列是否為空 * */

    /** * @fn auto threadsafe_queue::empty() const->decltype(data_queue.empty()) * * @brief Gets the empty * * @author IRIS_Chen * @date 2019/10/17 * * @return An auto */

    auto empty() const->decltype(data_queue.empty()) {
        std::lock_guard<std::mutex>lk(mut);
        return data_queue.empty();
    }
    /* * 返回隊列中元素數個 * */

    /** * @fn auto threadsafe_queue::Size() const->decltype(data_queue.Size()) * * @brief Gets the Size * * @author IRIS_Chen * @date 2019/10/17 * * @return An auto */

    auto size() const->decltype(data_queue.size()) {
        std::lock_guard<std::mutex>lk(mut);
        return data_queue.size();
    }
}; /* threadsafe_queue */
## 參考鏈接
  1. C++11:基於std::queue和std::mutex構建一個線程安全的隊列
  2. Thread-safe concurrent FIFO queue in C++
  3. Java多線程總結之線程安全隊列Queue
  4. C++並發實戰12:線程安全的queue


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM