C++11 —— 單生產者/單消費者 的 FIFO 無鎖隊列


  發現 zeromq 的 yqueue_t 模板類,其數據存儲理念設計得非常妙。借這一理念,按照 STL 的泛型類 queue 的接口標准,我設計了一個線程安全的 單生產者/單消費者(單線程push/單線程pop) FIFO 隊列,以此滿足更為廣泛的應用。

1. 數據存儲理念的結構圖

type_index

  • 隊列的整體結構上,使用鏈表的方式,將多個固定長度的 chunk 串聯起來;
  • 每個 chunk 則可用於存儲隊列所需要的元素;
  • 增加一個可交換的 chunk 單元,利於內存復用;
  • 隊列使用時,支持 單個線程的 push(生產) 和 單個線程 pop(消費)的並發操作(內部並未加鎖)。

2. 源碼 (xqueue.h)

/**
 * @file xqueue.h
 * Copyright (c) 2021 Gaaagaa. All rights reserved.
 * 
 * @author  : Gaaagaa
 * @date    : 2019-11-29
 * @version : 1.0.0.0
 * @brief   : 實現雙線程安全的 單生產者/單消費者 FIFO 隊列。
 */

/**
 * The MIT License (MIT)
 * Copyright (c) 2019, Gaaagaa All rights reserved.
 * 
 * Permission is hereby granted, free of charge, to any person obtaining a copy of
 * this software and associated documentation files (the "Software"), to deal in
 * the Software without restriction, including without limitation the rights to
 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is furnished to do
 * so, subject to the following conditions:
 * 
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 * 
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

#ifndef __XQUEUE_H__
#define __XQUEUE_H__

#include <memory>
#include <atomic>
#include <cassert>

////////////////////////////////////////////////////////////////////////////////
// xqueue_t : single producer/single consumer FIFO queue

/**
 * @class xqueue_t
 * @brief 雙線程安全的 單生產者/單消費者 FIFO隊列。
 * 
 * @tparam __elem_t  : 隊列存儲的節點元素類型。
 * @tparam __csize_v : 隊列中的存儲分塊可容納節點元素的數量。
 * @tparam __alloc_t : 元素對象分配器。
 */
template< typename __elem_t,
          size_t   __csize_v = 16,
          typename __alloc_t = std::allocator< __elem_t > >
class xqueue_t : protected __alloc_t
{
    static_assert(__csize_v >= 4,
                  "__csize_v size value must be greater than or equal to 4!");

    // common data types
public:
    using value_type      = __elem_t;
    using reference       = __elem_t &;
    using const_reference = const __elem_t &;
    using size_type       = size_t;

    static constexpr const size_type xchunk_size = __csize_v;

private:
    /** 前置聲明 存儲分塊 的類型 */
    struct x_chunk_t;

    using x_chkptr_t  = struct x_chunk_t *;
    using x_chkpos_t  = size_t;
    using x_array_t   = value_type[xchunk_size];
    using x_swapchk_t = std::atomic< x_chkptr_t >;
    using x_quesize_t = std::atomic< size_type >;

    using x_alvalue_t = typename std::allocator_traits<
                            __alloc_t >::template rebind_alloc< __elem_t >;
    using x_alchunk_t = typename std::allocator_traits<
                            __alloc_t >::template rebind_alloc< x_chunk_t >;

    /**
     * @struct x_chunk_t
     * @brief  存儲元素節點的連續內存塊結構體。
     */
    typedef struct x_chunk_t
    {
        x_chkptr_t xchk_next;  ///< 指向后一內存塊節點
        x_array_t  xchk_elem;  ///< 當前內存塊中的元素節點數組
    } x_chunk_t;

    // constructor/destructor
public:
    explicit xqueue_t(void)
        : m_chk_swap(nullptr)
        , m_que_size(0)
        , m_que_front({ nullptr, 0 })
        , m_que_back ({ nullptr, 0 })
    {
        m_que_front.m_chk_vptr = alloc_chunk();
        m_que_back.m_chk_vptr  = m_que_front.m_chk_vptr;
    }

    ~xqueue_t(void)
    {
        while (size() > 0)
            pop();

        assert(m_que_front.m_chk_vptr == m_que_back.m_chk_vptr);
        recyc_chunk(m_que_front.m_chk_vptr);
        recyc_chunk(nullptr);
    }

    xqueue_t(xqueue_t && xobject) = delete;
    xqueue_t & operator = (xqueue_t && xobject) = delete;
    xqueue_t(const xqueue_t & xobject) = delete;
    xqueue_t & operator = (const xqueue_t & xobject) = delete;

    // public interfaces
public:
    /**********************************************************/
    /**
     * @brief 當前隊列中的元素數量。
     */
    inline size_type size(void) const
    {
        return m_que_size;
    }

    /**********************************************************/
    /**
     * @brief 判斷隊列是否為空。
     */
    inline bool empty(void) const
    {
        return (0 == size());
    }

    /**********************************************************/
    /**
     * @brief 向隊列后端壓入一個元素。
     */
    void push(const value_type & xelem_value)
    {
        if (size() > 0)
        {
            back_forward();
        }

        x_alvalue_t::construct(
            &m_que_back.m_chk_vptr->xchk_elem[m_que_back.m_chk_npos],
            xelem_value);
        m_que_size.fetch_add(1);
    }

    /**********************************************************/
    /**
     * @brief 以右值引用方式,向隊列后端壓入一個元素。
     */
    void push(value_type && xelem_value)
    {
        if (size() > 0)
        {
            back_forward();
        }

        x_alvalue_t::construct(
            &m_que_back.m_chk_vptr->xchk_elem[m_que_back.m_chk_npos],
            std::forward< value_type >(xelem_value));
        m_que_size.fetch_add(1);
    }

    /**********************************************************/
    /**
     * @brief 向隊列后端壓入一個元素。
     */
    template< typename... __args_t >
    decltype(auto) emplace(__args_t &&... xargs)
    {
        if (size() > 0)
        {
            back_forward();
        }

        x_alvalue_t::construct(
            &m_que_back.m_chk_vptr->xchk_elem[m_que_back.m_chk_npos],
            std::forward< __args_t >(xargs)...);
        m_que_size.fetch_add(1);

        return back();
    }

    /**********************************************************/
    /**
     * @brief 從隊列前端彈出一個元素。
     */
    void pop(void)
    {
        assert(size() > 0);

        x_alvalue_t::destroy(
            &m_que_front.m_chk_vptr->xchk_elem[m_que_front.m_chk_npos]);
        if (m_que_size.fetch_sub(1) > 1)
        {
            front_forward();
        }
    }

    /**********************************************************/
    /**
     * @brief 返回隊列前端元素。
     */
    inline reference front(void)
    {
        assert(!empty());
        return m_que_front.m_chk_vptr->xchk_elem[m_que_front.m_chk_npos];
    }

    /**********************************************************/
    /**
     * @brief 返回隊列前端元素。
     */
    inline const_reference front(void) const
    {
        assert(!empty());
        return m_que_front.m_chk_vptr->xchk_elem[m_que_front.m_chk_npos];
    }

    /**********************************************************/
    /**
     * @brief 返回隊列后端元素。
     */
    inline reference back(void)
    {
        assert(!empty());
        return m_que_back.m_chk_vptr->xchk_elem[m_que_back.m_chk_npos];
    }

    /**********************************************************/
    /**
     * @brief 返回隊列后端元素。
     */
    inline const_reference back(void) const
    {
        assert(!empty());
        return m_que_back.m_chk_vptr->xchk_elem[m_que_back.m_chk_npos];
    }

    // internal invoking
private:
    /**********************************************************/
    /**
     * @brief 申請分塊。
     */
    inline x_chkptr_t alloc_chunk(void)
    {
        x_chkptr_t xchunk_ptr = m_chk_swap.exchange(nullptr);
        if (nullptr == xchunk_ptr)
        {
            x_alchunk_t xalloc_chunk;
            xchunk_ptr = xalloc_chunk.allocate(1);
            assert(nullptr != xchunk_ptr);
        }

        xchunk_ptr->xchk_next = nullptr;
        return xchunk_ptr;
    }

    /**********************************************************/
    /**
     * @brief 回收分塊。
     */
    inline void recyc_chunk(x_chkptr_t xchunk_ptr)
    {
        x_chkptr_t xchk_rptr = m_chk_swap.exchange(xchunk_ptr);
        if (nullptr != xchk_rptr)
        {
            x_alchunk_t xalloc_chunk;
            xalloc_chunk.deallocate(xchk_rptr, 1);
        }
    }

    /**********************************************************/
    /**
     * @brief 將前端位置向后移(該接口僅由 pop() 接口調用)。
     */
    inline void front_forward(void)
    {
        if (++m_que_front.m_chk_npos == xchunk_size)
        {
            assert(nullptr != m_que_front.m_chk_vptr);
            assert(nullptr != m_que_front.m_chk_vptr->xchk_next);

            x_chkptr_t xchunk_ptr = m_que_front.m_chk_vptr;
            m_que_front.m_chk_vptr = xchunk_ptr->xchk_next;
            m_que_front.m_chk_npos = 0;

            recyc_chunk(xchunk_ptr);
        }
    }

    /**********************************************************/
    /**
     * @brief 將后端位置向后移(該接口僅由 push() 接口調用)。
     */
    inline void back_forward(void)
    {
        if (++m_que_back.m_chk_npos == xchunk_size)
        {
            assert(nullptr != m_que_back.m_chk_vptr);
            assert(nullptr == m_que_back.m_chk_vptr->xchk_next);

            x_chkptr_t xchunk_ptr = alloc_chunk();
            m_que_back.m_chk_vptr->xchk_next = xchunk_ptr;

            m_que_back.m_chk_vptr = xchunk_ptr;
            m_que_back.m_chk_npos = 0;
        }
    }

    // data members
protected:
    x_swapchk_t     m_chk_swap;   ///< 用於保存交換內存塊(備用緩存塊)
    x_quesize_t     m_que_size;   ///< 隊列中的有效對象數量

    /** 指向隊列前端的分塊存儲信息 */
    struct
    {
        x_chkptr_t  m_chk_vptr;   ///< 存儲分塊
        x_chkpos_t  m_chk_npos;   ///< 當前存儲節點的索引號
    } m_que_front;

    /** 指向隊列后端的分塊存儲信息 */
    struct
    {
        x_chkptr_t  m_chk_vptr;   ///< 存儲分塊
        x_chkpos_t  m_chk_npos;   ///< 當前存儲節點的索引號
    } m_que_back;
};

////////////////////////////////////////////////////////////////////////////////

#endif // __XQUEUE_H__

3. 使用示例

#include "xqueue.h"
#include <iostream>
#include <thread>
#include <chrono>

////////////////////////////////////////////////////////////////////////////////

int main(int argc, char * argv[])
{
    using x_int_queue_t = xqueue_t< int, 8 >;

    x_int_queue_t spsc;

    std::cout << "sizeof(x_int_queue_t) : " << sizeof(x_int_queue_t) << std::endl;

    bool b_push_finished = false;
    std::thread xthread_in([&spsc, &b_push_finished](void) -> void
    {
        for (int i = 0; i < 1000; ++i)
        {
            spsc.push(i);
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }

        b_push_finished = true;
    });

    std::thread xthread_out([&spsc, &b_push_finished](void) -> void
    {
        int i = 0;
        while (true)
        {
            if (!spsc.empty())
            {
                std::cout << "[" << ++i   << "] "
                          << spsc.size()  << ", "
                          << spsc.front() << std::endl;
                spsc.pop();
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
            else if (b_push_finished)
            {
                break;
            }
        }
    });

    if (xthread_in.joinable())
    {
        xthread_in.join();
    }

    if (xthread_out.joinable())
    {
        xthread_out.join();
    }

    return 0;
}

////////////////////////////////////////////////////////////////////////////////


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM