發現 zeromq 的 yqueue_t 模板類,其數據存儲理念設計得非常妙。借這一理念,按照 STL 的泛型類 queue 的接口標准,我設計了一個線程安全的 單生產者/單消費者(單線程push/單線程pop) FIFO 隊列,以此滿足更為廣泛的應用。
1. 數據存儲理念的結構圖
- 隊列的整體結構上,使用鏈表的方式,將多個固定長度的 chunk 串聯起來;
- 每個 chunk 則可用於存儲隊列所需要的元素;
- 增加一個可交換的 chunk 單元,利於內存復用;
- 隊列使用時,支持 單個線程的 push(生產) 和 單個線程 pop(消費)的並發操作(內部並未加鎖)。
2. 源碼 (xqueue.h)
/**
* @file xqueue.h
* Copyright (c) 2021 Gaaagaa. All rights reserved.
*
* @author : Gaaagaa
* @date : 2019-11-29
* @version : 1.0.0.0
* @brief : 實現雙線程安全的 單生產者/單消費者 FIFO 隊列。
*/
/**
* The MIT License (MIT)
* Copyright (c) 2019, Gaaagaa All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is furnished to do
* so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef __XQUEUE_H__
#define __XQUEUE_H__
#include <memory>
#include <atomic>
#include <cassert>
////////////////////////////////////////////////////////////////////////////////
// xqueue_t : single producer/single consumer FIFO queue
/**
* @class xqueue_t
* @brief 雙線程安全的 單生產者/單消費者 FIFO隊列。
*
* @tparam __elem_t : 隊列存儲的節點元素類型。
* @tparam __csize_v : 隊列中的存儲分塊可容納節點元素的數量。
* @tparam __alloc_t : 元素對象分配器。
*/
template< typename __elem_t,
size_t __csize_v = 16,
typename __alloc_t = std::allocator< __elem_t > >
class xqueue_t : protected __alloc_t
{
static_assert(__csize_v >= 4,
"__csize_v size value must be greater than or equal to 4!");
// common data types
public:
using value_type = __elem_t;
using reference = __elem_t &;
using const_reference = const __elem_t &;
using size_type = size_t;
static constexpr const size_type xchunk_size = __csize_v;
private:
/** 前置聲明 存儲分塊 的類型 */
struct x_chunk_t;
using x_chkptr_t = struct x_chunk_t *;
using x_chkpos_t = size_t;
using x_array_t = value_type[xchunk_size];
using x_swapchk_t = std::atomic< x_chkptr_t >;
using x_quesize_t = std::atomic< size_type >;
using x_alvalue_t = typename std::allocator_traits<
__alloc_t >::template rebind_alloc< __elem_t >;
using x_alchunk_t = typename std::allocator_traits<
__alloc_t >::template rebind_alloc< x_chunk_t >;
/**
* @struct x_chunk_t
* @brief 存儲元素節點的連續內存塊結構體。
*/
typedef struct x_chunk_t
{
x_chkptr_t xchk_next; ///< 指向后一內存塊節點
x_array_t xchk_elem; ///< 當前內存塊中的元素節點數組
} x_chunk_t;
// constructor/destructor
public:
explicit xqueue_t(void)
: m_chk_swap(nullptr)
, m_que_size(0)
, m_que_front({ nullptr, 0 })
, m_que_back ({ nullptr, 0 })
{
m_que_front.m_chk_vptr = alloc_chunk();
m_que_back.m_chk_vptr = m_que_front.m_chk_vptr;
}
~xqueue_t(void)
{
while (size() > 0)
pop();
assert(m_que_front.m_chk_vptr == m_que_back.m_chk_vptr);
recyc_chunk(m_que_front.m_chk_vptr);
recyc_chunk(nullptr);
}
xqueue_t(xqueue_t && xobject) = delete;
xqueue_t & operator = (xqueue_t && xobject) = delete;
xqueue_t(const xqueue_t & xobject) = delete;
xqueue_t & operator = (const xqueue_t & xobject) = delete;
// public interfaces
public:
/**********************************************************/
/**
* @brief 當前隊列中的元素數量。
*/
inline size_type size(void) const
{
return m_que_size;
}
/**********************************************************/
/**
* @brief 判斷隊列是否為空。
*/
inline bool empty(void) const
{
return (0 == size());
}
/**********************************************************/
/**
* @brief 向隊列后端壓入一個元素。
*/
void push(const value_type & xelem_value)
{
if (size() > 0)
{
back_forward();
}
x_alvalue_t::construct(
&m_que_back.m_chk_vptr->xchk_elem[m_que_back.m_chk_npos],
xelem_value);
m_que_size.fetch_add(1);
}
/**********************************************************/
/**
* @brief 以右值引用方式,向隊列后端壓入一個元素。
*/
void push(value_type && xelem_value)
{
if (size() > 0)
{
back_forward();
}
x_alvalue_t::construct(
&m_que_back.m_chk_vptr->xchk_elem[m_que_back.m_chk_npos],
std::forward< value_type >(xelem_value));
m_que_size.fetch_add(1);
}
/**********************************************************/
/**
* @brief 向隊列后端壓入一個元素。
*/
template< typename... __args_t >
decltype(auto) emplace(__args_t &&... xargs)
{
if (size() > 0)
{
back_forward();
}
x_alvalue_t::construct(
&m_que_back.m_chk_vptr->xchk_elem[m_que_back.m_chk_npos],
std::forward< __args_t >(xargs)...);
m_que_size.fetch_add(1);
return back();
}
/**********************************************************/
/**
* @brief 從隊列前端彈出一個元素。
*/
void pop(void)
{
assert(size() > 0);
x_alvalue_t::destroy(
&m_que_front.m_chk_vptr->xchk_elem[m_que_front.m_chk_npos]);
if (m_que_size.fetch_sub(1) > 1)
{
front_forward();
}
}
/**********************************************************/
/**
* @brief 返回隊列前端元素。
*/
inline reference front(void)
{
assert(!empty());
return m_que_front.m_chk_vptr->xchk_elem[m_que_front.m_chk_npos];
}
/**********************************************************/
/**
* @brief 返回隊列前端元素。
*/
inline const_reference front(void) const
{
assert(!empty());
return m_que_front.m_chk_vptr->xchk_elem[m_que_front.m_chk_npos];
}
/**********************************************************/
/**
* @brief 返回隊列后端元素。
*/
inline reference back(void)
{
assert(!empty());
return m_que_back.m_chk_vptr->xchk_elem[m_que_back.m_chk_npos];
}
/**********************************************************/
/**
* @brief 返回隊列后端元素。
*/
inline const_reference back(void) const
{
assert(!empty());
return m_que_back.m_chk_vptr->xchk_elem[m_que_back.m_chk_npos];
}
// internal invoking
private:
/**********************************************************/
/**
* @brief 申請分塊。
*/
inline x_chkptr_t alloc_chunk(void)
{
x_chkptr_t xchunk_ptr = m_chk_swap.exchange(nullptr);
if (nullptr == xchunk_ptr)
{
x_alchunk_t xalloc_chunk;
xchunk_ptr = xalloc_chunk.allocate(1);
assert(nullptr != xchunk_ptr);
}
xchunk_ptr->xchk_next = nullptr;
return xchunk_ptr;
}
/**********************************************************/
/**
* @brief 回收分塊。
*/
inline void recyc_chunk(x_chkptr_t xchunk_ptr)
{
x_chkptr_t xchk_rptr = m_chk_swap.exchange(xchunk_ptr);
if (nullptr != xchk_rptr)
{
x_alchunk_t xalloc_chunk;
xalloc_chunk.deallocate(xchk_rptr, 1);
}
}
/**********************************************************/
/**
* @brief 將前端位置向后移(該接口僅由 pop() 接口調用)。
*/
inline void front_forward(void)
{
if (++m_que_front.m_chk_npos == xchunk_size)
{
assert(nullptr != m_que_front.m_chk_vptr);
assert(nullptr != m_que_front.m_chk_vptr->xchk_next);
x_chkptr_t xchunk_ptr = m_que_front.m_chk_vptr;
m_que_front.m_chk_vptr = xchunk_ptr->xchk_next;
m_que_front.m_chk_npos = 0;
recyc_chunk(xchunk_ptr);
}
}
/**********************************************************/
/**
* @brief 將后端位置向后移(該接口僅由 push() 接口調用)。
*/
inline void back_forward(void)
{
if (++m_que_back.m_chk_npos == xchunk_size)
{
assert(nullptr != m_que_back.m_chk_vptr);
assert(nullptr == m_que_back.m_chk_vptr->xchk_next);
x_chkptr_t xchunk_ptr = alloc_chunk();
m_que_back.m_chk_vptr->xchk_next = xchunk_ptr;
m_que_back.m_chk_vptr = xchunk_ptr;
m_que_back.m_chk_npos = 0;
}
}
// data members
protected:
x_swapchk_t m_chk_swap; ///< 用於保存交換內存塊(備用緩存塊)
x_quesize_t m_que_size; ///< 隊列中的有效對象數量
/** 指向隊列前端的分塊存儲信息 */
struct
{
x_chkptr_t m_chk_vptr; ///< 存儲分塊
x_chkpos_t m_chk_npos; ///< 當前存儲節點的索引號
} m_que_front;
/** 指向隊列后端的分塊存儲信息 */
struct
{
x_chkptr_t m_chk_vptr; ///< 存儲分塊
x_chkpos_t m_chk_npos; ///< 當前存儲節點的索引號
} m_que_back;
};
////////////////////////////////////////////////////////////////////////////////
#endif // __XQUEUE_H__
3. 使用示例
#include "xqueue.h"
#include <iostream>
#include <thread>
#include <chrono>
////////////////////////////////////////////////////////////////////////////////
int main(int argc, char * argv[])
{
using x_int_queue_t = xqueue_t< int, 8 >;
x_int_queue_t spsc;
std::cout << "sizeof(x_int_queue_t) : " << sizeof(x_int_queue_t) << std::endl;
bool b_push_finished = false;
std::thread xthread_in([&spsc, &b_push_finished](void) -> void
{
for (int i = 0; i < 1000; ++i)
{
spsc.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
b_push_finished = true;
});
std::thread xthread_out([&spsc, &b_push_finished](void) -> void
{
int i = 0;
while (true)
{
if (!spsc.empty())
{
std::cout << "[" << ++i << "] "
<< spsc.size() << ", "
<< spsc.front() << std::endl;
spsc.pop();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
else if (b_push_finished)
{
break;
}
}
});
if (xthread_in.joinable())
{
xthread_in.join();
}
if (xthread_out.joinable())
{
xthread_out.join();
}
return 0;
}
////////////////////////////////////////////////////////////////////////////////