C++任務隊列與多線程


摘要:

      很多場合之所以使用C++,一方面是由於C++編譯后的native code的高效性能,另一方面是由於C++優秀的並發能力。並行方式有多進程 和多線程之分,本章暫且只討論多線程,多進程方面的知識會在其他章節具體討論。多線程是開發C++服務器程序非常重要的基礎,如何根據需求具體的設計、分配線程以及線程間的通信,也是服務器程序非常重要的部分,除了能夠帶來程序的性能提高外,若設計失誤,則可能導致程序復雜而又混亂,變成bug滋生的溫床。所以設計、開發優秀的線程組件以供重用,無論如何都是值得的。

      線程相關的api並不復雜,然而無論是linux還是windows系統,都是c風格的接口,我們只需簡單的封裝成對象,方便易用即可。任務隊列是設計成用來進行線程間通信,使用任務隊列進行線程間通信設計到一些模式,原理並不難理解,我們需要做到是弄清楚,在什么場景下選用什么樣的模式即可。

任務隊列的定義:

      任務隊列對線程間通信進行了抽象,限定了線程間只能通過傳遞任務,而相關的數據及操作則被任務保存。任務隊列這個名詞可能在其他場景定義過其他意義,這里討論的任務隊列定義為:能夠把封裝了數據和操作的任務在多線程間傳遞的線程安全的先入先出的隊列。其與線程關系示意圖如下:

 
  clip_image001

      注:兩個虛線框分別表示線程A和線程B恩能夠訪問的數據邊界,由此可見 任務隊列是線程間通信的媒介。

任務隊列的實現:

任務的定義

      生產者消費者模型在軟件設計中是極其常見的模型,常常被用來實現對各個組件或系統解耦合。大到分布式的系統交互,小到網絡層對象和應用層對象的通訊,都會應用到生產者消費者模型,在任務隊列中,生產和消費的對象為“任務”。這里把任務定義為組合了數據和操作的對象,或者簡單理解成包含了void (void*) 類型的函數指針和void* 數據指針的結構。我們把任務定義成類task_t,下面來分析一下task_t的實現。

插入代碼:

class task_impl_i
{
public:
    virtual ~task_impl_i(){}
    virtual void run()          = 0;
    virtual task_impl_i* fork() = 0;
};

class task_impl_t: public task_impl_i
{
public:
    task_impl_t(task_func_t func_, void* arg_):
        m_func(func_),
        m_arg(arg_)
    {}

    virtual void run()
    {
        m_func(m_arg);
    }

    virtual task_impl_i* fork()
    {
        return new task_impl_t(m_func, m_arg);
    }

protected:
    task_func_t m_func;
    void*       m_arg;
};

struct task_t
{
    static void dumy(void*){}
    task_t(task_func_t f_, void* d_):
        task_impl(new task_impl_t(f_, d_))
    {
    }
    task_t(task_impl_i* task_imp_):
        task_impl(task_imp_)
    {
    }
    task_t(const task_t& src_):
        task_impl(src_.task_impl->fork())
    {
    }
    task_t()
    {
        task_impl = new task_impl_t(&task_t::dumy, NULL);
    }
    ~task_t()
    {
        delete task_impl;
    }
    task_t& operator=(const task_t& src_)
    {
        delete task_impl;
        task_impl = src_.task_impl->fork();
        return *this;
    }
    
    void run()
    {
        task_impl->run();
    }
    task_impl_i*    task_impl;
};

      Task最重要的接口是run,簡單的執行保存的操作,具體的操作保存在task_impl_i的基類中,由於對象本身就是數據加操作的集合,所以構造task_impl_i的子類對象時,為其賦予不同的數據和操作即可。這里使用了組合的方式實現了接口和實現的分離。這么做的優點是應用層只需知道task的概念即可,對應task_impl_i不需要了解。由於不同的操作和數據可能需要構造不同task_impl_i子類,我們需要提供一些泛型函數,能夠將用戶的所有操作和數據都能輕易的轉換成task對象。task_binder_t 提供一系列的gen函數,能夠轉換用戶的普通函數和數據為task_t對象。

struct task_binder_t
{
    //! C function
    
    static task_t gen(void (*func_)(void*), void* p_)
    {
        return task_t(func_, p_);
    }
    template<typename RET>
    static task_t gen(RET (*func_)(void))
    {
        struct lambda_t
        {
            static void task_func(void* p_)
            {
                (*(RET(*)(void))p_)();
            };
        };
        return task_t(lambda_t::task_func, (void*)func_);
    }
    template<typename FUNCT, typename ARG1>
    static task_t gen(FUNCT func_, ARG1 arg1_)
    {
        struct lambda_t: public task_impl_i
        {
            FUNCT dest_func;
            ARG1  arg1;
            lambda_t(FUNCT func_, const ARG1& arg1_):
                dest_func(func_),
                arg1(arg1_)
            {}
            virtual void run()
            {
                (*dest_func)(arg1);
            }
            virtual task_impl_i* fork()
            {
                return new lambda_t(dest_func, arg1);
            }
        };
        return task_t(new lambda_t(func_, arg1_));
生產任務

      函數封裝了用戶的操作邏輯,需要在某線程執行特定操作時,需要將操作對應的函數轉換成task_t,投遞到目的線程對應的任務隊列。任務隊列使用起來雖然像是在互相投遞消息,但是根本上仍然是共享數據式的數據交換方式。主要步驟如下:

l 用戶函數轉換成task_t對象

l 鎖定目的線程的任務隊列,將task_t 放到任務隊列尾,當隊列為空時,目的線程會wait在條件變量上,此時需要signal喚醒目的線程

實現的關鍵代碼如下:

void produce(const task_t& task_)
    {        
        lock_guard_t lock(m_mutex);
        bool need_sig = m_tasklist.empty();

        m_tasklist.push_back(task_);
        if (need_sig)
        {
            m_cond.signal();
        }
    }
消費任務

消費任務的線程會變成完全的任務驅動,該線程只有一個職責,執行任務隊列的所有任務,若當前任務隊列為空時,線程會阻塞在條件變量上,重新有新任務到來時,線程會被再次喚醒。實現代碼如下:

int   consume(task_t& task_)
    {
        lock_guard_t lock(m_mutex);
        while (m_tasklist.empty())
        {
            if (false == m_flag)
            {
                return -1;
            }
            m_cond.wait();
        }

        task_ = m_tasklist.front();
        m_tasklist.pop_front();

        return 0;
} 
int run()
    {
        task_t t;
        while (0 == consume(t))
        {
            t.run();
        }
        return 0;
    }

任務隊列的模式

單線程單任務隊列方式

任務隊列已經提供了run接口,綁定任務隊列的線程只需執行此函數即可,此函數除非用戶顯示的調用任務隊列的close接口,否則run函數永不返回。任務隊列的close接口是專門用來停止任務隊列的工作的,代碼如下:

void close()
    {
        lock_guard_t lock(m_mutex);
        m_flag = false;
        m_cond.broadcast();
}

首先設置了關閉標記,然后在條件變量上執行broadcast, 任務隊列的run函數也會由此退出。在回頭看一下run接口的代碼你會發現,檢查任務隊列是否關閉(m_flag 變量)的代碼是在任務隊列為空的時候才檢測的,這樣能夠保證任務隊列被全部執行后,run函數才返回。

下面是一個使用任務隊列的helloworld的示例:

class foo_t
{
public:
    void print(int data)
    {
        cout << "helloworld, data:" <<data << " thread id:"<< ::pthread_self() << endl;
    }
    void print_callback(int data, void (*callback_)(int))
    {
        callback_(data);
    }
    static void check(int data)
    {
        cout << "helloworld, data:" <<data << " thread id:"<< ::pthread_self() << endl;
    }
};

//  單線程單任務隊列
void test_1()
{
    thread_t thread;
    task_queue_t tq;

    thread.create_thread(task_binder_t::gen(&task_queue_t::run, &tq), 1);

    foo_t foo;
    for (int i = 0; i < 100; ++i)
    {
        cout << "helloworld, thread id:"<< ::pthread_self() << endl;
        tq.produce(task_binder_t::gen(&foo_t::print, &foo, i));
        sleep(1);
    }
    thread.join();
}
int main(int argc, char* argv[])
{
    test_1();
    return 0;
}

本例使用單線程單任務隊列的方式,由於只有一個線程綁定在任務隊列上,所以任務的執行會嚴格按照先入先出的方式執行。優點是能夠保證邏輯操作的有序性,所以最為常用。

多線程多任務隊列方式

如果想利用更多線程,那么創建更多線程的同時,仍然保證每個任務隊列綁定在單線程上。讓不同的任務隊列並行執行就可以了。

下面幾種情況適用此模式:

l 比如網游中數據庫一般會創建連接池,用戶的操作數據庫都是有數據庫線程池完成,在將結果投遞給邏輯層。對每個用戶的數據增刪改查操作都必須是有序的,所以每個用戶綁定一個固定的任務隊列。而不同的用戶的數據修改互不干擾,不同的用戶分配不同的任務隊列即可。

l 比如網絡層中的多個socket的讀寫是互不干擾的,可以創建兩個或更多線程,每個對應一個任務隊列,不同的socket的操作可以隨機的分配一個任務隊列(注意分配是隨機的,一旦分配了,單個socket的所有操作都會由這個任務隊列完成,保證邏輯有序性)。

示例代碼:

//! 多線程多任務隊列
void test_2()
{
    thread_t thread;
    task_queue_t tq[3];

    for (unsigned int i = 0; i < sizeof(tq)/sizeof(task_queue_t); ++i)
    {
        thread.create_thread(task_binder_t::gen(&task_queue_t::run, &(tq[i])), 1);
    }

    foo_t foo;
    cout << "helloworld, thread id:"<< ::pthread_self() << endl;
    for (unsigned int j = 0; j < 100; ++j)
    {
        tq[j % (sizeof(tq)/sizeof(task_queue_t))].produce(task_binder_t::gen(&foo_t::print, &foo, j));
        sleep(1);
    }
    thread.join();
}
多線程單任務隊列方式

有時候可能並不需要邏輯操作的完全有序,而是要求操作盡可能快的執行,只要有空閑線程,任務就投遞到空閑線程立刻執行。如果時序不影響結果,這種模式會更有效率,下面幾種情況可能用到這種模式:

l 比如social game中的好友是從platform的api獲取的,需要http協議通訊,若采用curl等http庫同步通訊時,會阻塞線程,這是可以使用多線程單隊列方式,請求投遞到任務隊列后,只要有空閑線程立馬執行,用戶A雖然比用戶B先到達任務隊列,但是並不能保證A比B一定先獲取到好友列表,如果A有2k好友,而B只有兩個呢,當然有可能B請求更快。

//! 多線程單任務隊列
void test_3()
{
    thread_t thread;
    task_queue_t tq;

    thread.create_thread(task_binder_t::gen(&task_queue_t::run, &tq), 3);

    foo_t foo;
    cout << "helloworld, thread id:"<< ::pthread_self() << endl;
    for (unsigned int j = 0; j < 100; ++j)
    {
        tq.produce(task_binder_t::gen(&foo_t::print, &foo, j));
        sleep(1);
    }
    thread.join();
}

任務隊列的高階用法

異步回調

任務隊列的模式中列舉的例子都是線程間單項通訊,線程A將請求投遞給了B,但B執行完畢后A並沒有檢測結果。實際中往往都是需要將執行結果進行額外處理或者投遞到另外任務隊列。異步回調可以很好的解決這個問題,原理就是投遞任務時,同時包含檢查任務執行結果的函數。示例代碼:

//! 異步回調
void test_4()
{
    thread_t thread;
    task_queue_t tq;

    thread.create_thread(task_binder_t::gen(&task_queue_t::run, &tq), 1);

    foo_t foo;
    cout << "helloworld, thread id:"<< ::pthread_self() << endl;
    for (unsigned int j = 0; j < 100; ++j)
    {
        tq.produce(task_binder_t::gen(&foo_t::print_callback, &foo, j, &foo_t::check));
        sleep(1);
    }
    thread.join();
}

異步是性能優化非常重要的手段,下面如下場合可以使用異步:

l 服務器程序要求很高的實時性,幾乎邏輯層不執行io操作,io操作通過任務隊列被io線程執行成功后再通過回調的方式傳回邏輯層。

l 網游中用戶登錄,需呀從數據庫載入用戶數據,數據庫層不需要知曉邏輯層如何處理用戶數據,當接口被調用時必須傳入回調函數,數據庫層載入數據后直接調用回調函數,而數據作為參數。

隱式任務隊列

使用任務隊列可以解耦多線程的設計。更加優秀的使用是將其封裝在接口之后。前邊的例子中都是顯示的操作了任務隊列對象。但這就限制了用戶必須知道某個接口需要綁定哪個任務隊列上,尤其是多線程多任務隊列的例子,如果當用戶操作socket接口時還要知道socket對應哪個任務隊列就顯得不夠優雅了。Socket自己本身可以保存對應任務隊列的引用,這樣使用者只需調用socket的接口,而接口內部再將請求投遞到爭取的任務隊列。示例代碼:

void socket_impl_t::async_send(const string& msg_)
{
    tq.produce(task_binder_t::gen(&socket_impl_t::send, &this, msg_));
}
void socket_impl_t::send(const string& msg_)
{
    //do send code
}

總結:

l 設計多線程程序時,往往設計使用任務隊列是關鍵,好用、高效、靈活的任務隊列組件十分必需,本節介紹的實現支持多種多線程模式,易用易理解。

l 異步回調在多線程程序中非常常見,異步往往是為了提高性能和系統吞吐量的,但是異步其不可避免的會帶來復雜性,所以盡量保證異步相關的步驟簡單。

l 任務隊列封裝對象接口的內部更佳,使用者直接調用接口,仿佛沒有任務隊列這回事,讓他在看不見的地方默默運行。

l 本節設計的任務隊列是線程安全的,並且關閉時已經投遞的任務能夠保證被 。

代碼:http://code.google.com/p/ffown/source/browse/trunk/#trunk%2Ffflib%2Finclude

相關連接

  1. 文檔 http://h2cloud.org
  2. 源碼 https://github.com/fanchy/h2engine
  3. 介紹 http://www.cnblogs.com/zhiranok/p/ffengine.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM