C++多线程队列实现
介绍
在项目中,进行多线程队列实现是一个比较麻烦的事, 找到了一个实现比较好的多线程队列实现, 自己做了一点修改更加适应自己的项目, 记录下来, 有需要的自己进行修改使用.
代码写的并不是很好, 封装起来的实现也是并不是很好用, 个人水平的一个记录, 希望理解
多线程队列实现
- 初始化一定长度的空间存储数据
- 每次压入或者弹出操作的时候需要获取锁, 保证同时只有一个操作可以被执行,
- 压入或者弹出数据的时候, 如果队列已经满了或者空的, 另外一个线程可能需要等待, 或者返回 false 具体看程序注释,考虑自己情况进行程序修改
- 最终清理对象数据, 清空队列,退出线程
具体实现代码
#ifndef CQUEUE_H__
#define CQUEUE_H__
#pragma once
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
/** * @class Queue CQueue.h Code\inc\CQueue.h * * @brief 线程安全队列实现 * * 因为有std::mutex和std::condition_variable类成员,所以此类不支持复制构造函数也不支持赋值操作符(=) * * @author IRIS_Chen * @date 2019/10/10 * * @tparam T Generic type parameter. */
template <class T> /** * @class CQueue CQueue.h Code\inc\CQueue.h * * @brief Queue of cs. * * @author IRIS_Chen * @date 2019/10/17 */ class CQueue {
protected:
// Data
std::queue<T> _queue; ///< 存储数据的真实队列, 不是线程安全的
private:
typename std::queue<T>::size_type _size_max; ///< 队列的最大长度
// Thread gubbins
std::mutex _mutex; ///< 线程操作 锁
std::condition_variable _fullQue; ///< 队列满了的信号
std::condition_variable _empty; ///< 队列为空的信号
// Exit
// 原子操作
std::atomic_bool _quit; ///< { false }; // 退出信号
std::atomic_bool _finished; ///< { false }; // 完成信号 // 表示不再继续输入数据
public:
/** * @fn CQueue::CQueue(const size_t size_max) * * @brief 初始化队列长度,并将退出标志和 满信号标志置空 * * @author IRIS_Chen * @date 2019/10/17 * * @param size_max 队列的最长尺寸 */
CQueue(const size_t size_max) :_size_max(size_max) {
_quit = ATOMIC_VAR_INIT(false);
_finished = ATOMIC_VAR_INIT(false);
}
/** * @fn CQueue::CQueue(CONST CQueue&) = delete; * * @brief 不允许拷贝构造函数 * * @author IRIS_Chen * @date 2019/10/17 * * @param parameter1 The first parameter */
CQueue(CONST CQueue&) = delete; ///< 不允许拷贝构造函数
/** * @fn CQueue::~CQueue() * * @brief Finalizes an instance of the CQueue class 销毁队列, 退出线程 清除数据 // 存在问题 * * @author IRIS_Chen * @date 2019/11/8 */
~CQueue()
{
Quit();
while (_queue.size())
;
}
/** * @fn bool CQueue::Push(T& data) * * @brief 队列中加入新的 对象 根据情况决定 满信号之后 新数据丢弃或者等待 * * @author IRIS_Chen * @date 2019/10/10 * * @param [in,out] data The data to Push. * * @return True if it succeeds, false if it fails. */
bool Push(T& data) {
std::unique_lock<std::mutex> lock(_mutex);
while (!_quit && !_finished)
{
if (_queue.size() < _size_max)
{
_queue.push(std::move(data));
//_queue.Push(data);
_empty.notify_all();
return true;
}
else
{
// wait的时候自动释放锁,如果wait到了会获取锁
// _fullQue.wait(lock);
return false; ///< 如果满了 这里不进行等待 避免出现问题
}
}
return false;
}
/** * @fn bool CQueue::Pop(T &data) * * @brief 返回队列最前面的元素 并且弹出 // 如果空 如果finish 则直接返回fasle 否则 等待队列加入元素 * * @author IRIS_Chen * @date 2019/10/14 * * @param [in,out] data The data to Pop. * * @return True if it succeeds, false if it fails. */
bool Pop(T &data) {
std::unique_lock<std::mutex> lock(_mutex);
while (!_quit)
{
if (!_queue.empty()) // 队列非空
{
//data = std::move(_queue.front());
data = _queue.front();
_queue.pop();
_fullQue.notify_all(); // 通知所有 由于满队无法加入的线程
return true;
}
else if (_queue.empty() && _finished) // 队列为空 且不再加入
{
return false;
}
else
{
// _empty.wait(lock); // 等待队列加入元素
return false; ///< 不等待元素加入数据
}
}
return false;
}
/** * @fn std::shared_ptr<T> CQueue::Pop(void) * * @brief 弹出一个元素 直接返回 出错无法报错 * * @author IRIS_Chen * @date 2019/10/14 * * @return The previous top-of-stack object. */
std::shared_ptr<T> Pop(void)
{
std::unique_lock<std::mutex> lock(_mutex);
std::shared_ptr<T> res = nullptr;
while (!_quit)
{
if (!_queue.empty()) // 队列非空
{
//data = std::move(_queue.front());
res = std::make_shared<T>(_queue.front());
_queue.pop();
_fullQue.notify_all(); // 通知所有 由于满队无法加入的线程
return res;
}
else if (_queue.empty() && _finished) // 队列为空 且不再加入
{
return res; // 无数据进入 智能返回一个空指针 (可能出错)
}
else
{
_empty.wait(lock); // 等待队列加入元素
}
}
return false;
}
/** * @fn void CQueue::Finished() * * @brief The queue has Finished accepting input 标识队列完成输入 不再继续输入 * * @author IRIS_Chen * @date 2019/10/14 */
void Finished() {
_finished = true;
_empty.notify_all();
}
/** * @fn void CQueue::Quit() * * @brief Quits this CQueue 退出队列, 无法再加入压入或者弹出数据 * * @author IRIS_Chen * @date 2019/10/14 */
void Quit() {
_quit = true;
_empty.notify_all();
_fullQue.notify_all();
}
/** * @fn int CQueue::Length() * * @brief Gets the Length 返回队列目前长度 * * @author IRIS_Chen * @date 2019/10/14 * * @return An int. */
int Length() {
std::unique_lock<std::mutex> lock(_mutex);
return static_cast<int>(_queue.size());
}
/** * @fn int CQueue::Size() * * @brief Gets the Size 返回当前队列长度 * * @author IRIS_Chen * @date 2019/10/14 * * @return An int. */
int Size() {
std::unique_lock<std::mutex> lock(_mutex);
return static_cast<int>(_queue.size());
}
/** * @fn bool CQueue::empty(void) * * @brief 判断是否为空 * * @author IRIS_Chen * @date 2019/10/17 * * @return True if it succeeds, false if it fails */
bool Empty(void) {
std::unique_lock<std::mutex> lock(_mutex);
return (0 == _queue.size());
}
/** * @fn bool CQueue::Clear(void) * * @brief 清空队列 * * @author IRIS_Chen * @date 2019/10/17 * * @return True if it succeeds, false if it fails */
bool Clear(void) {
std::unique_lock<std::mutex> lock(_mutex);
while (!_queue.empty ())
{
Pop(); // 依次弹出数据
}
return true;
}
};
#endif
更多
有另外的多线程实现, 来自网上找到的, 可能找不到参考链接了, 贴出来 供参考
线程安全队列 1
具体实现代码
/** * @class ThreadSafeQueue CQueue.h Code\inc\CQueue.h * * @brief 线程安全队列实现 * * @author IRIS_Chen * @date 2019/10/10 * * @tparam T Generic type parameter. */
template<typename T>
/** * @class ThreadSafeQueue CQueue.h Code\inc\CQueue.h * * @brief Queue of thread safes. * * @author IRIS_Chen * @date 2019/10/17 */
class ThreadSafeQueue {
private:
/** * @property mutable std::mutex mut * * @brief Gets the mut * * @return The mut */
mutable std::mutex mut;
std::queue<T> data_queue; ///< Queue of data
std::condition_variable data_cond; ///< The data condition
public:
/** * @fn ThreadSafeQueue::ThreadSafeQueue() * * @brief Initializes a new instance of the ThreadSafeQueue class * * @author IRIS_Chen * @date 2019/10/17 */
ThreadSafeQueue() {}
/** * @fn ThreadSafeQueue::ThreadSafeQueue(ThreadSafeQueue const& other) * * @brief 拷贝构造函数 * * @author IRIS_Chen * @date 2019/10/17 * * @param other The other */
ThreadSafeQueue(ThreadSafeQueue const& other)
{
std::lock_guard<std::mutex> lk(other.mut);
data_queue = other.data_queue;
}
/** * @fn void ThreadSafeQueue::Push(T& new_value) * * @brief Pushes an object onto this stack * * @author IRIS_Chen * @date 2019/10/17 * * @param [in,out] new_value The new value to Push */
void push(T& new_value)//入队操作 {
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
/** * @fn void ThreadSafeQueue::wait_and_pop(T& value) * * @brief Wait and Pop * * @author IRIS_Chen * @date 2019/10/17 * * @param [in,out] value The value */
void wait_and_pop(T& value)//直到有元素可以删除为止 {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]
{
return !data_queue.empty();
});
value = data_queue.front();
data_queue.pop();
}
/** * @fn std::shared_ptr<T> ThreadSafeQueue::wait_and_pop() * * @brief Wait and pop * * @author IRIS_Chen * @date 2019/10/17 * * @return A std::shared_ptr<T> */
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]
{
return !data_queue.empty();
});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
/** * @fn bool ThreadSafeQueue::try_pop(T& value) * * @brief Attempts to pop * * @author IRIS_Chen * @date 2019/10/17 * * @param [in,out] value The value * * @return True if it succeeds, false if it fails */
bool try_pop(T& value)//不管有没有队首元素直接返回 {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = data_queue.front();
data_queue.pop();
return true;
}
/** * @fn std::shared_ptr<T> ThreadSafeQueue::try_pop() * * @brief Try pop * * @author IRIS_Chen * @date 2019/10/17 * * @return A std::shared_ptr<T> */
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
/** * @fn bool ThreadSafeQueue::empty() const * * @brief Empties this object * * @author IRIS_Chen * @date 2019/10/17 * * @return True if it succeeds, false if it fails */
bool empty() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
线程安全队列 2
具体实现代码
#include <queue>
#include <mutex>
#include <condition_variable>
#include <initializer_list>
/* * 线程安全队列 * T为队列元素类型 * 因为有std::mutex和std::condition_variable类成员,所以此类不支持复制构造函数也不支持赋值操作符(=) * */
template<typename T>
/** * @class threadsafe_queue CQueue.h Code\inc\CQueue.h * * @brief Queue of threadsafes. * * @author IRIS_Chen * @date 2019/10/17 */
class threadsafe_queue {
private:
/** * @property mutable std::mutex mut * * @brief data_queue访问信号量 * * @return The mut */
mutable std::mutex mut;
/** * @property mutable std::condition_variable data_cond * * @brief Gets the data condition * * @return The data condition */
mutable std::condition_variable data_cond;
using queue_type = std::queue<T>; ///< Type of the queue
queue_type data_queue; ///< Queue of data
public:
using value_type = typename queue_type::value_type; ///< Type of the value
using container_type = typename queue_type::container_type; ///< Type of the container
/** * @fn threadsafe_queue::threadsafe_queue() = default; * * @brief Initializes a new instance of the threadsafe_queue class * * @author IRIS_Chen * @date 2019/10/17 */
threadsafe_queue() = default;
/** * @fn threadsafe_queue::threadsafe_queue(const threadsafe_queue&) = delete; * * @brief Initializes a new instance of the threadsafe_queue class * * @author IRIS_Chen * @date 2019/10/17 * * @param parameter1 The first parameter */
threadsafe_queue(const threadsafe_queue&) = delete;
/** * @fn threadsafe_queue& threadsafe_queue::operator=(const threadsafe_queue&) = delete; * * @brief Assignment operator * * @author IRIS_Chen * @date 2019/10/17 * * @param parameter1 The first parameter * * @return A shallow copy of this object */
threadsafe_queue& operator=(const threadsafe_queue&) = delete;
/* * 使用迭代器为参数的构造函数,适用所有容器对象 * */
template<typename _InputIterator>
/** * @fn threadsafe_queue::threadsafe_queue(_InputIterator first, _InputIterator last) * * @brief Initializes a new instance of the threadsafe_queue class * * @author IRIS_Chen * @date 2019/10/17 * * @param first The first * @param last The last */
threadsafe_queue(_InputIterator first, _InputIterator last) {
for (auto itor = first; itor != last; ++itor)
{
data_queue.push(*itor);
}
}
/** * @fn explicit threadsafe_queue::threadsafe_queue(const container_type &c) * * @brief Initializes a new instance of the threadsafe_queue class * * @author IRIS_Chen * @date 2019/10/17 * * @param c A container_type to process */
explicit threadsafe_queue(const container_type &c) :data_queue(c) {}
/* * 使用初始化列表为参数的构造函数 * */
/** * @fn threadsafe_queue::threadsafe_queue(std::initializer_list<value_type> list) * * @brief Initializes a new instance of the threadsafe_queue class * * @author IRIS_Chen * @date 2019/10/17 * * @param list The list */
threadsafe_queue(std::initializer_list<value_type> list) :threadsafe_queue(list.begin(), list.end()) {
}
/* * 将元素加入队列 * */
/** * @fn void threadsafe_queue::push(const value_type &new_value) * * @brief Pushes an object onto this stack * * @author IRIS_Chen * @date 2019/10/17 * * @param new_value The new value to push */
void push(const value_type &new_value) {
std::lock_guard<std::mutex>lk(mut);
data_queue.push(std::move(new_value));
data_cond.notify_one();
}
/* * 从队列中弹出一个元素,如果队列为空就阻塞 * */
/** * @fn value_type threadsafe_queue::wait_and_pop() * * @brief Wait and pop * * @author IRIS_Chen * @date 2019/10/17 * * @return A value_type */
value_type wait_and_pop() {
std::unique_lock<std::mutex>lk(mut);
data_cond.wait(lk, [this]
{
return !this->data_queue.empty();
});
auto value = std::move(data_queue.front());
data_queue.pop();
return value;
}
/* * 从队列中弹出一个元素,如果队列为空返回false * */
/** * @fn bool threadsafe_queue::try_pop(value_type& value) * * @brief Attempts to pop * * @author IRIS_Chen * @date 2019/10/17 * * @param [in,out] value The value * * @return True if it succeeds, false if it fails */
bool try_pop(value_type& value) {
std::lock_guard<std::mutex>lk(mut);
if (data_queue.empty())
return false;
value = std::move(data_queue.front());
data_queue.pop();
return true;
}
/* * 返回队列是否为空 * */
/** * @fn auto threadsafe_queue::empty() const->decltype(data_queue.empty()) * * @brief Gets the empty * * @author IRIS_Chen * @date 2019/10/17 * * @return An auto */
auto empty() const->decltype(data_queue.empty()) {
std::lock_guard<std::mutex>lk(mut);
return data_queue.empty();
}
/* * 返回队列中元素数个 * */
/** * @fn auto threadsafe_queue::Size() const->decltype(data_queue.Size()) * * @brief Gets the Size * * @author IRIS_Chen * @date 2019/10/17 * * @return An auto */
auto size() const->decltype(data_queue.size()) {
std::lock_guard<std::mutex>lk(mut);
return data_queue.size();
}
}; /* threadsafe_queue */