從零開始山寨Caffe·肆:線程系統


不精通多線程優化的程序員,不是好程序員,連碼農都不是。

                          ——並行計算時代掌握多線程的重要性

線程與操作系統

用戶線程與內核線程  

廣義上線程分為用戶線程和內核線程。

前者已經絕跡,它一般只存在於早期不支持多線程的系統中。

它用模擬的方式實現一個模擬的多線程系統,不支持異步。

即,一個線程被阻塞了,其它線程也會被阻塞。

當今的操作系統幾乎都默認提供了內核線程API,底層由操作系統實現。

內核線程的好處在於,它們之間支持異步,是"真"多線程。

操作系統的流氓軟件

不過,內核線程也給線程的使用帶來了操作系統捆綁性。

不同操作系統平台,其內核線程的實現與提供的API不同,給跨平台帶來麻煩。

比如在Windows上,MFC就是封裝了Windows內核線程。

在Linux上,廣泛使用的pThread就是POSIX系列系統流傳下來的內核線程。

第三方跨平台內核線程庫

有幸的是,歷史上有許多跨平台的項目庫。

我最早知道是Qt,GTK,這倆個比較特殊,因為它們是Application Framework。

是在90年代左右,C++為了對抗Java等后期開發之秀,而專門寫成的跨平台C++庫。

主要以GUI為作戰武器,對抗Java。

Boost庫同樣提供了優秀了內核線程庫,還是跨平台的。

所以Caffe移植到Windows,是不需要改動線程系統的。

何以線程用?

生產者與消費者

生產者與消費者是一個經典的資源分配問題。

它的核心要點主要體現在兩方面:

①阻塞

②臨界

首先看①:

為什么需要阻塞?因為生產者比較快,消費者比較慢。

一次消費過程,包括整個正向傳播和反向傳播,這需要不少的時間。

而一次生產過程,就是對一個Batch數據的預緩沖,這不需要很多時間。

生產者總不能一直生產下去,然后爆掉緩沖區吧?

所以,生產者在檢測到緩沖區滿了之后,就要進入阻塞狀態。

那么問題來了,如果我們不用多線程,將阻塞代碼放在主進程中執行,會怎么樣?

讀取,阻塞,前向傳播失敗,反向傳播不可能,死鎖。

這是為什么I/O代碼需要多線程處理的根本原因。

再來看②:

為什么需要臨界?因為生產者和消費者之間必然是異步的,換言之,存在於不同線程。

證明如下:

生產者有一定概率被阻塞,假設生產者和消費者在同一個線程實體中,那么必然觸發死鎖。

又因為系統執行必然不應該有死鎖,所以假設不成立。

結論:生產者和消費者之間必然是異步的。

異步臨界資源問題,則需要加鎖(Mutex)來維持資源的一致性,具體在第陸章中詳解。

破除因果律

多線程程序設計的核心原則就是:將非因果連續的代碼,並行化。

也就是說,只要代碼前后不是上下文相關的,都能夠並行執行。

那么Caffe的I/O模型中,有哪幾處不是上下文相關的?

答案有二:

①Datum和Blob(Batch)不是上下文相關的。

Blob包含着正向傳播的shape信息,這些信息只有初始化網絡在初始化時才能確定。

而Datum則只是與輸入樣本有關。

所以,Datum的讀取工作可以在網絡未初始化之前就開始,這就是DataReader采用線程設計的內涵。

同時,這種不相關性,也為生產者和消費者對於臨界資源訪問的設計埋下伏筆。

 

②GPU之間不是上下文相關的。

Caffe的多GPU方案,是讓不同GPU覆蓋不同段的數據,最后不在網絡結構上做融合。

這點和AlexNet略有不同(AlexNet兩個GPU的網絡結構最后交叉了)

這樣的多GPU方案,使得每個GPU至少存在一個DataReader,覆蓋不一樣的數據段。

在網絡結構上,通過共享root網絡即可,如圖所示:

上圖是一個經典的多GPU流水線編程方案。3個GPU擁有各自的DataReader,但是共享所有Layer。

GPU0由主進程控制,GPU1由線程1控制,GPU2由線程2控制。

Caffe在主機端,也就是CPU主進程和次線程,每個Layer的前向傳播被一個mutex鎖住,而反向傳播卻沒有。

這樣,盡管主進程、線程1、線程2是並行調用Layer.Foward(),但不能同時訪問同一Layer,此時Layer為互斥臨界資源。

這種行為會構造出一個人工的流水線,比如:

GPU0在Conv1時,GPU1、GPU2會被鎖住。

GPU0在Conv3時,Conv1和Conv2是空閑的,會被其它GPU占用。

反向傳播之所以不鎖,是因為前向傳播和反向傳播是符合因果律的,前向傳播成流水線,

反向傳播自然也是流水線,非常優美的設計。

影分身之術

俗話說,一個好漢三個幫。

本篇所述的多線程,均指的是CPU多線程。

實際上,由於GPGPU的異構計算引入,CPU線程基本都在做一些后勤工作。

主要是數據的讀取、與GPU顯存的數據交換。

CPU主進程負責全程調度GPU執行計算代碼,在這點上,CPU利用率並不高。

而從線程對數據的預緩沖任務也不是很艱難。

所以,相對於計算密集型CPU多線程設計而言,Caffe的多線程任務相對輕松。

我們很難將CPU的利用率榨到100%,在這點上,為深度學習Online應用系統埋下伏筆。

試想一下,在后台構建一個基於Socket的深度學習應用服務器,同時CPU並發線程可達幾千,

我覺得只有這樣,才能真正榨干CPU計算力。至於Caffe的訓練,其實對CPU的要求不是很高。

代碼實戰

Boost線程的創建

使用boost::thread,  需要#include "boost/thread/thread.hpp"

與Qt、MFC等Application Framework提供的線程庫不同,

boost::thread的封裝性比較強,一般不建議繼承和改寫boost::thread類(沒見過有人這么用)

為了線程能夠執行自定義代碼,需要在其構造時,傳入執行函數的函數指針。

函數指針分為兩類:

①普通函數指針

②類成員函數指針

boost::function結合bind函數提供了一個函數指針的載體。(style1)

也可以直接將函數指針的構造方式傳入thread。(style2)

建議配合boost::shared_ptr使用。(style3)

若是普通函數指針,可用:

//    style 1
void helloworld(int a,string b);
boost::function<void()> f=bind(helloworld,1,"helloworld");
boost::thread(f);

//    style 2
boost::thread(helloworld,1,"helloworld")

//    style 3(Caffe style)
boost::shared_ptr<boost::thread> thread;
thread.reset(new boost::thread(helloworld,1,"helloworld"));

當然,為了系統的開發,我們顯然需要一個封裝類,如將boost::thread封裝為DragonThread類。

即,將boost::shared_ptr<boost::thread> thread作為類成員。

基於類的函數指針綁定需要傳入類this指針,寫法做如下更改:

class DragonThread{
void helloworld(int a,string b);
};

//    style 1
 boost::function<void()> f=bind(&DragonThread::helloworld,this,1,"helloworld");
boost::thread(f);

//    style 2
boost::thread(&DragonThread::helloworld,this,1,"helloworld");

//    style 3(Caffe style)
boost::shared_ptr<boost::thread> thread;
thread.reset(new boost::thread(&DragonThread::helloworld,this,1,"helloworld"));

Boost線程的生與死

boost::thread一旦被構造后,就會立刻以異步的方式執行傳入的函數。

在debug線程的過程中,注意boost::thread將晚於主進程的代碼的執行。

如果不在main函數中循環等待,很有可能boost::thread還沒有執行,main函數已經退出了。

 

想要立刻終結一個boost線程是不可能的,一些強大的Application Framework的線程庫

通常會提供thread.terminate(),來立刻終結線程的執行(比如Qt),但是boost沒有提供。

因為這種方式是相當不安全的,在Java設計模式中,更鼓勵開發者讓線程函數自動檢測終結條件而退出。

這種檢測函數在Caffe里是must_stop()函數,它使用了boost::thread提供的中斷點檢測功能。

bool DragonThread::must_stop(){
    return boost::this_thread::interruption_requested();
}

注意,中斷請求的檢測,只能在異步線程執行函數中執行,主進程從外部調用是無效的。

主進程可以從外部觸發interrupt操作,通知正在異步執行的線程,該方法封裝為stopThread函數:

void DragonThread::stopThread(){
    if (is_start()){
        thread->interrupt();
    }
    try{thread->join();}
    catch (boost::thread_interrupted&) {}
    catch (std::exception& e){ LOG(FATAL) << "Thread exception: " << e.what(); }
}

有時候,interrupt的線程可能處於阻塞睡眠狀態,我們需要從外部立即喚醒它,讓其檢測中斷請求。

所以在interrupt操作后,需要立即后接join操作。最后,還可以選擇性地補上異常檢測。

數據結構

建立dragon_thread.hpp。

class DragonThread
{
public:
    DragonThread() {}
    virtual ~DragonThread();
    void initializeThread(int device, Dragon::Mode mode, int rand_seed, int solver_count, bool root_solver);
    void startThread();
    void stopThread();
    //the interface implements for specific working task 
    virtual void interfaceKernel() {}
    bool is_start();
    bool must_stop();
    boost::shared_ptr<thread> thread;
};

在第叄章,我們提到了全局管理器是線程獨立的,因此每一個dragon線程,

需要從主管理器轉移一些參數,包括(GPU設備、計算模式、隨機種子、root_solver&solver_count)

成員函數包括:

boost::thread的傳入函數initializeThread,這個函數里最后又嵌套了interfaceKernel

前者負責轉移參數,后者默認是一個空函數,你也可以寫成純虛函數。

由於boost::thread沒有繼承的用法,所以Caffe二度封裝以后,提供了這種用法。

所有繼承DragonThread的類,只要重載interfaceKernel()這個虛函數就行了。

startThread應該最先被執行,它包括獲取主進程管理器參數,以及構造thread。

由於thread構造結束,就會立刻執行,所以startThread不負其名,就是啟動了線程。

stopThread的功能如上所述。

實現

建立dragon_thread.cpp。

首先是thread的傳入函數initializeThread:

void DragonThread::initializeThread(int device, Dragon::Mode mode, int rand_seed, int solver_count, bool root_solver){
#ifndef CPU_ONLY
    CUDA_CHECK(cudaSetDevice(device));
#endif
    Dragon::set_random_seed(rand_seed);
    Dragon::set_mode(mode);
    Dragon::set_solver_count(solver_count);
    Dragon::set_root_solver(root_solver);
    interfaceKernel();  //do nothing
}

然后是外部調用的startThread函數:

void DragonThread::startThread(){
    CHECK(!is_start());
    int device = 0;
#ifndef CPU_ONLY
    CUDA_CHECK(cudaGetDevice(&device));
#endif
    Dragon::Mode mode = Dragon::get_mode();
    unsigned int seed = Dragon::get_random_value();
    int solver_count = Dragon::get_solver_count();
    bool root_solver = Dragon::get_root_solver();
    try{
        thread.reset(new boost::thread(&DragonThread::initializeThread,
                            this, device, mode, seed, solver_count, root_solver));
    }
    catch (std::exception& e){ LOG(FATAL) << "Thread exception: " << e.what(); }
}

由於該函數是在主進程中執行,Dragon::get()與initializeThread里的Dragon::set()

操作的其實不是同一個全局管理器,所以需要這樣麻煩的轉移參數過程。

最后是線程控制與析構:

void DragonThread::stopThread(){
    if (is_start()){
        thread->interrupt();
    }

    try{thread->join();}
    catch (boost::thread_interrupted&) {}
    catch (std::exception& e){ LOG(FATAL) << "Thread exception: " << e.what(); }
}

bool DragonThread::is_start(){
    return thread&&thread->joinable();
}

bool DragonThread::must_stop(){
    return boost::this_thread::interruption_requested();
}

DragonThread::~DragonThread(){
    stopThread();
}

完整代碼

dragon_thread.hpp:

https://github.com/neopenx/Dragon/blob/master/Dragon/include/dragon_thread.hpp

dragon_thread.cpp:

https://github.com/neopenx/Dragon/blob/master/Dragon/src/dragon_thread.cpp


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM