folly無鎖隊列,嘗試添加新的函數


1. folly是facebook開源的關於無鎖隊列的庫,實現過程很精妙。folly向隊列中添加節點過程,符合標准庫中的隊列的設計,而取出節點的過程,則會造成多個線程的分配不均。我曾經試着提供一次 取出一個節點的函數,雖然存在一些問題,不過還是有很多可以學習的地方。我新增的函數,在下面代碼中,會在注釋中標識“新增函數”。

/*
* Copyright 2014-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <atomic>
#include <cassert>
#include <utility>

namespace folly {

    /**
    * A very simple atomic single-linked list primitive.
    *
    * Usage:
    *
    * class MyClass {
    *   AtomicIntrusiveLinkedListHook<MyClass> hook_;
    * }
    *
    * AtomicIntrusiveLinkedList<MyClass, &MyClass::hook_> list;
    * list.insert(&a);
    * list.sweep([] (MyClass* c) { doSomething(c); }
    */
    template <class T>
    struct AtomicIntrusiveLinkedListHook {
        T* next{ nullptr };
    };

    template <class T, AtomicIntrusiveLinkedListHook<T> T::*HookMember>
    class AtomicIntrusiveLinkedList {
    public:
        AtomicIntrusiveLinkedList() {}
        AtomicIntrusiveLinkedList(const AtomicIntrusiveLinkedList&) = delete;
        AtomicIntrusiveLinkedList& operator=(const AtomicIntrusiveLinkedList&) =
            delete;
        AtomicIntrusiveLinkedList(AtomicIntrusiveLinkedList&& other) noexcept {
            auto tmp = other.head_.load();
            other.head_ = head_.load();
            head_ = tmp;
        }
        AtomicIntrusiveLinkedList& operator=(
            AtomicIntrusiveLinkedList&& other) noexcept {
            auto tmp = other.head_.load();
            other.head_ = head_.load();
            head_ = tmp;

            return *this;
        }

        /**
        * Note: list must be empty on destruction.
        */
        ~AtomicIntrusiveLinkedList() {
            assert(empty());
        }

        bool empty() const {
            return head_.load() == nullptr;
        }

        /**
        * Atomically insert t at the head of the list.
        * @return True if the inserted element is the only one in the list
        *         after the call.
        */
        bool insertHead(T* t) {
            assert(next(t) == nullptr);

            auto oldHead = head_.load(std::memory_order_relaxed);
            do {
                next(t) = oldHead;
                /* oldHead is updated by the call below.
                NOTE: we don't use next(t) instead of oldHead directly due to
                compiler bugs (GCC prior to 4.8.3 (bug 60272), clang (bug 18899),
                MSVC (bug 819819); source:
                http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange */
            } while (!head_.compare_exchange_weak(oldHead, t,
                std::memory_order_release,
                std::memory_order_relaxed));

            return oldHead == nullptr;
        }

        /**
        * Replaces the head with nullptr,
        * and calls func() on the removed elements in the order from tail to head.
        * Returns false if the list was empty.
        */
        template <typename F>
        bool sweepOnce(F&& func) {
            if (auto head = head_.exchange(nullptr)) {
                auto rhead = reverse(head);
                unlinkAll(rhead, std::forward<F>(func));
                return true;
            }
            return false;
        }

        // 新增函數
        // if std::memory_order_acquire applies to next(oldHead)(the first one, the argument of compare_exchange_weak)
        // and I don't know if following bugs affect the code
        // GCC prior to 4.8.3 (bug 60272), clang prior to 2014-05-05 (bug 18899)
        // MSVC prior to 2014-03-17 (bug 819819). 
        template <typename F>
        bool sweepHead(F&& func)
        {
            // handle if the list is not empty
            auto oldHead = head_.load(std::memory_order_relaxed);

            while (oldHead != nullptr &&
                   !head_.compare_exchange_weak(oldHead, next(oldHead),
                                                std::memory_order_acquire, std::memory_order_relaxed))
                ;
            // if drop out head successfully
            if (oldHead)
            {
                next(oldHead) = nullptr;
                unlinkAll(oldHead, std::forward<F>(func));
                return true;
            }

            return false;
        }

        // 新增函數
        // if std::memory_order_acquire does not apply to next(oldHead)
        // and I don't know if following bugs affect the code
        // GCC prior to 4.8.3 (bug 60272), clang prior to 2014-05-05 (bug 18899)
        // MSVC prior to 2014-03-17 (bug 819819). 
        template <typename F>
        bool dropHead(F&& func)
        {
            T* oldHead = nullptr;
            // handle if the list is not empty
            while ((oldHead = head_.load(std::memory_order_acquire)))
            {
                // because insert and drop out will be involving with head_, they 
                // will change head_ first, then others
                bool res = head_.compare_exchange_weak(oldHead, next(oldHead), std::memory_order_relaxed,
                    std::memory_order_relaxed);
                if (res/* && oldHead != nullptr*/)
                {
                    next(oldHead) = nullptr;
                    unlinkAll(oldHead, std::forward<F>(func));
                    return true;
                }
            }

            return false;
        }

        /**
        * Repeatedly replaces the head with nullptr,
        * and calls func() on the removed elements in the order from tail to head.
        * Stops when the list is empty.
        */
        template <typename F>
        void sweep(F&& func) {
            while (sweepOnce(func)) {
            }
        }

        /**
        * Similar to sweep() but calls func() on elements in LIFO order.
        *
        * func() is called for all elements in the list at the moment
        * reverseSweep() is called.  Unlike sweep() it does not loop to ensure the
        * list is empty at some point after the last invocation.  This way callers
        * can reason about the ordering: elements inserted since the last call to
        * reverseSweep() will be provided in LIFO order.
        *
        * Example: if elements are inserted in the order 1-2-3, the callback is
        * invoked 3-2-1.  If the callback moves elements onto a stack, popping off
        * the stack will produce the original insertion order 1-2-3.
        */
        template <typename F>
        void reverseSweep(F&& func) {
            // We don't loop like sweep() does because the overall order of callbacks
            // would be strand-wise LIFO which is meaningless to callers.
            auto head = head_.exchange(nullptr);
            unlinkAll(head, std::forward<F>(func));
        }

    private:
        std::atomic<T*> head_{ nullptr };

        static T*& next(T* t) {
            return (t->*HookMember).next;
        }

        /* Reverses a linked list, returning the pointer to the new head
        (old tail) */
        static T* reverse(T* head) {
            T* rhead = nullptr;
            while (head != nullptr) {
                auto t = head;
                head = next(t);
                next(t) = rhead;
                rhead = t;
            }
            return rhead;
        }

        /* Unlinks all elements in the linked list fragment pointed to by `head',
        * calling func() on every element */
        template <typename F>
        void unlinkAll(T* head, F&& func) {
            while (head != nullptr) {
                auto t = head;
                head = next(t);
                next(t) = nullptr;
                func(t);
            }
        }
    };

} // namespace folly

下面是我測試時使用的代碼:

// Test.cpp : 定義控制台應用程序的入口點。
//

#include <memory>
#include <cassert>

#include <iostream>
#include <vector>
#include <thread>
#include <future>
#include <random>
#include <cmath>

#include "folly.h"

using namespace folly;

struct student_name
{
    //student_name(const std::string& name)
    //    : name(name)
    //{

    //}

    //std::string name;
    student_name(int age = 0)
        : age(age)
    {

    }

    int age;
    AtomicIntrusiveLinkedListHook<student_name> node;
};

AtomicIntrusiveLinkedList<student_name, &student_name::node> g_students;

std::atomic<int> g_inserts; // insert num (successful)
std::atomic<int> g_drops;   // drop num (successful)

std::atomic<int> g_printNum;    // as same as g_drops

std::atomic<long> g_ageInSum;   // age sum when producing student_name
std::atomic<long> g_ageOutSum;  // age sum when consuming student_name

std::atomic<bool> goOn(true);

constexpr int ONE_THREAD_PRODUCE_NUM = 2000000;    // when testing, no more than this number, you know 20,000,00 * 100 * 10~= MAX_INT

constexpr int PRODUCE_THREAD_NUM = 10;     // producing thread number
constexpr int CONSUME_THREAD_NUM = 9;     // consuming thread number

void printOne(student_name* t)
{
    g_printNum.fetch_add(1, std::memory_order_relaxed);
    g_ageOutSum.fetch_add(t->age, std::memory_order_relaxed);
    // use memory_order_relaxed avoiding affect folly memory order
    g_drops.fetch_add(1, std::memory_order_relaxed);
    // clean node
    // delete t;
}

void insert_students()
{
    std::default_random_engine dre(time(nullptr));
    std::uniform_int_distribution<int> ageDi(1, 99);

    for (int i = 0; i != ONE_THREAD_PRODUCE_NUM; ++i)
    {
        int newAge = ageDi(dre);
        g_ageInSum.fetch_add(newAge, std::memory_order_relaxed);
        g_students.insertHead(new student_name(newAge));
        // use memory_order_relaxed avoiding affect folly memory order
        g_inserts.fetch_add(1, std::memory_order_relaxed);
    }
}

void drop_students()
{
    while (goOn.load(std::memory_order_relaxed))
    {
        g_students.dropHead(printOne);
    }
}

int main()
{
    std::vector<std::future<void>> insert_threads;
    for (int i = 0; i != PRODUCE_THREAD_NUM; ++i)
    {
        insert_threads.push_back(std::async(insert_students));
    }

    std::vector<std::future<void>> drop_threads;
    for (int i = 0; i != CONSUME_THREAD_NUM; ++i)
    {
        drop_threads.push_back(std::async(drop_students));
    }

    for (auto& item : insert_threads)
    {
        item.get();
    }

    goOn.store(std::memory_order_relaxed);

    for (auto& item : drop_threads)
    {
        item.get();
    }

    g_students.sweep(printOne);

    std::cout << "insert count1: " << g_inserts.load() << std::endl;
    std::cout << "drop count1: " << g_drops.load() << std::endl;
    std::cout << "print num1: " << g_printNum.load() << std::endl;

    std::cout << "age in1: " << g_ageInSum.load() << std::endl;
    std::cout << "age out1: " << g_ageOutSum.load() << std::endl;

    std::cout << std::endl;
}

 

我將我測試中的主要要點說一下:

(1)以上代碼,將printOne函數中的// delete t;前面的注釋符號(“//”)去掉。我在ubuntu測試結果正常。

(2)如果將main函數中的insert_threads.push_back(std::async(insert_students, i));改為insert_threads.push_back(std::async(std::launch::async, insert_students, i));,將main函數中的drop_threads.push_back(std::async(drop_students, i));改為drop_threads.push_back(std::async(std::launch::async, drop_students, i));,printOne函數中的// delete t;前面的注釋符號(“//”)去掉。在ubuntu下運行,會出現內存訪問的問題。(這個在多線程測試時值得注意)

(3)如果將(2)中的delete t注釋掉,則不會出現內存訪問的問題。

(4)如果PRODUCE_THTREAD_NUM依舊為10,而CONSUME_THREAD_NUM改為1,也不會出現內存訪問的問題。

(5)如果PRODUCE_THTREAD_NUM為1,而CONSUME_THREAD_NUM依舊為9,會出現內存訪問的問題。

(6)修改PRODUCE_THTREAD_NUM和PRODUCE_THTREAD_NUM,只要不是1改成非1,非1改成1,對結果沒有影響。

(7)如果大幅度減小HANDLE_NUM,例如改為20000,在PRODUCE_THREAD_NUM為10,CONSUME_THREAD_NUM也沒有出現內存訪問的問題。

說明:

由測試結果表明,新增函數(兩個函數測試效果相同)存在問題,不能適用於多線程取出。問題的原因,經過分析,應該是head_.compare_exchange_weak(oldHead, next(oldHead), std::memory_order_relaxed, std::memory_order_relaxed);(對於dropHead)和delete t之間的沖突,如果在該函數調用之前,對應節點已經被刪除,則會出現內存訪問的問題,在我看來,對於compare_exchange_weak失敗的情況,如果不用分析next(oldHead),則沒有任何問題,也就是說如果compare_exchange_weak使用類似於&&或者||的短路設計方式的話,以上代碼可以正常運行。但是compare_exchange_weak的實現,為標准庫提供,不能修改,所以如果想要使用以上的函數,則需要考慮以上的沖突。

 

2. 我之前嘗試寫上面的函數的目的是為了將節點均勻分配到不同的線程。我有一個設想, 沒有實際的代碼,各位可以參考一下,如果覺得可用,可以考慮實現。

dropHead的實現步驟如下:

(1)使用folly的sweepOnce函數,一次取出所有的節點。

(2)判斷sweepOnce中的head是否為nullptr,next(head)是否為nullptr,如果head不為nullptr,而且next(head)不為nullptr,則將_head與next(head)交換。

(3)將next(head)指向的節點隊列添加到_head指向的無鎖隊列中,處理head指向的節點。

以上的代碼,因為(2)中的操作步驟很少(兩個判斷),所以(3)中的插入應該也會很少。所以新增函數的負擔應該不大,甚至可以考慮(3)中的節點不再進行回插,總的來說,應該會使每個線程的處理量更加均勻。不過,考慮到線程可能會在(2)中被中斷,所以建議進行認真測試的情況下再使用。

 

如果有什么疑問,或者有什么好的想法,希望能夠告訴我。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM