你說你學過操作系統這門課?寫個無Bug的生產者和消費者模型試試!
——你真的學好了操作系統這門課嘛?
在第壹章,展示過這樣圖:
其中,左半部分構成了新版Caffe最惱人、最龐大的IO系統。
也是歷來最不重視的一部分。
第伍章又對左半部分的獨立性進行了分析,我是這么描述到:
Datum和Blob(Batch)不是上下文相關的。
Blob包含着正向傳播的shape信息,這些信息只有初始化網絡在初始化時才能確定。
而Datum則只是與輸入樣本有關。
所以,Datum的讀取工作可以在網絡未初始化之前就開始,這就是DataReader采用線程設計的內涵。
所以,左半部分又可以分為左左半部分,和左右半部分。
阻塞隊列
生產者與消費者
第伍章講到,在一個機器學習系統中,生產者和消費者的執行周期是不一樣的。
為了平衡在周期上的差異,節約計算資源,我們顯然需要對生產者做一定限制。
存儲生產資源,可以用數組,也可以用STL容器。
再考慮生產者和消費者的行為:
①不存在隨機訪問:
顯然,消費者是按照固定順序訪問緩沖區的。
我們沒有必要考慮隨機訪問的情況。
②不存在隨機寫入:
顯然,生產者每次只需要將資源放置於緩沖區兩端。
我們沒有必要考慮在線性表中間位置寫入的情況。
由於vector底層由順序表實現,其訪問速度隨着元素數量的遞增而遞減,
而queue底層由鏈式表實現,其訪問速度不隨元素數量的遞增而遞減,且沒有隨機寫入/訪問的情況。
所以,選擇queue作為緩沖區是比較優異的。
為了限制生產者的行為,我們需要在STL提供的queue基礎上,改進出一種新的數據結構——Blocking Queue。
互斥鎖
第肆章簡單提到了mutex問題,這是阻塞隊列除了Blocking之外,需要考慮的第二大問題。
並且已經證明了:生產者和消費者之間必然是異步的。
我們以隊列的push和pop操作為例,分析一下,為什么在多線程情況下,需要加mutex。
假設線程A預備執行push操作,所以它是一個生產者;
假設線程B預備執行pop操作,所以它是一個消費者;
設有臨界緩沖區隊列Q,在某時刻T,線程A發出push操作,在T+1時候,線程B發出pop操作,
且push需要10個單位時間,pop只需要一個單位時間,問T+2時刻,pop出去的資源你敢用嘛?
顯然,沒人敢用這個執行push的半成品。
發生上述問題的症結在於,兩個異步線程對於同一個資源,產生了爭奪行為。
解決方案就是:在push時,鎖住資源,禁止pop;在pop時,鎖住資源,禁止push。
廣義上,我們可以認為,需要將push和pop函數變成原子函數,即:執行期間不可中斷的函數。
———————————————————————————————————————————————————————————
另外,需要注意的是,mutex與blocking是兩個概念。
在廣義上,mutex會將多個線程對同一個資源的異步並行操作,拉成一個串行執行隊列,串行等待執行。
而blocking則是將線程休眠,CPU會暫時放棄對其控制。
在程序員界,雖然有時候會把mutex和blocking都稱為阻塞,但其原理和內涵是完全不同的。
———————————————————————————————————————————————————————————
boost提供不俗的mutex功能,使用前需要 #include "boost/thread/mutex.hpp"
你可以將一個boost::mutex對象嵌入到一個類當中,這樣,允許每一個類對象擁有一把鎖。
由於對一個queue對象,主要是鎖住來自該對象的push和pop操作,
所以,mutex理應當是以類對象為一個單位的,參考代碼如下:
template <typename T> class BlockingQueue{ public: void push(const T& t){ boost::mutex::scoped_lock lock(mutex); Q.push(t); } T pop(){ boost::mutex::scoped_lock lock(mutex); T t = Q.front(); Q.pop(); return t; } private: boost::mutex mutex; queue<T> Q; };
boost::mutex::scoped_lock lock提供局部鎖定功能。
它與boost::scoped_ptr有類似的效果,scoped_ptr在作用域結束后,就立即釋放對象。
而scoped_lock在作用域結束后,會立即解鎖,如果不用scoped_lock,我們可以這么寫:
void push(const T& t){ mutex.lock(); Q.push(t); mutex.unlock(); }
條件阻塞與激活
前面幾章說了那么久的阻塞,其中大部分指的應該是blocking。
mutex大部分情況下,都只是在鎖一個局部函數,阻塞周期非常短。
唯一的例外是Layer的正向傳播函數forward,mutex鎖住的周期非常長。
blocking和mutex的唯一不同在於:
blocking之后,操作系統會唆使CPU放棄對線程的處理。
這是非常危險的一個行為,因為該線程被家長趕去睡覺了,而且不能反抗家長的命令。
除非家長通知它:噢,你可以活動了。在此之前,該線程將永遠處於無效狀態。
上面的例子有兩個重點:
①CPU放棄線程
②不可主動激活
既然如此,為了激活這個線程,模型就必須設計成“對偶模型”,而生產者和消費者,恰恰正是對偶的。
———————————————————————————————————————————————————————————
boost::condition_variable提供了簡單的blocking功能,為了統一控制,可以將其與mutex捆在一起:
template <typename T> class BlockingQueue { public: class Sync{ public: boost::mutex mutex; boost::condition_variable condition; }; private: queue<T> Q; boost::shared_ptr<Sync> sync; };
現在考慮一下,何時需要注銷、阻塞一個線程,大致有兩種情況:
①緩沖區空,此時消費者不能消費,拒絕pop操作之后,可以交出CPU控制權。
②緩沖區滿,此時生產者不能生產,拒絕push操作之后,可以交出CPU控制權。
為了激活彼此,就需要模型是對偶的:
①經歷緩沖區空之后,突然push了一個元素,此時應當由生產者激活消費者線程。
②經歷緩沖區滿之后,突然pop了一個元素,此時應當由消費者激活生產者線程。
看起來,我們可以將代碼寫成這樣:
void BlockingQueue<T>::push(const T& t){ boost::mutex::scoped_lock lock(sync->mutex); while (Q.full()){ sync->condition.wait(lock); //suspend, spare CPU clock } Q.push(t); sync->condition.notify_one(); } template<typename T> T BlockingQueue<T>::pop(const string& log_waiting_msg){ boost::mutex::scoped_lock lock(sync->mutex); while (Q.empty()){ sync->condition.wait(lock); //suspend, spare CPU clock } T t = Q.front(); Q.pop(); sync->condition.notify_one(); return t; }
其中,sync->condition.wait(lock)表示使用當前mutex為標記,交出CPU控制權。
sync->condition.notify_one()則表示激活一個線程的CPU控制權。
可以看到,blocking和activating的代碼是完全對偶的,blocking自己,activating對方。
雙阻塞隊列
上節代碼是不可能實現的,因為沒有Q.full()這個函數。
在傳統生產者、消費者程序中,通常會使用單緩沖隊列。
使用單緩沖隊列是沒有問題的,因為在這種簡單的代碼結構中,我們很容易知道緩沖隊列的上界。
比如,設定緩沖隊列大小為20,在編程中,可以通過檢測 if(count==20)來達到。
當代碼結構復雜時,比如,緩沖隊列大小變量通常在非常上層上層上層的位置,而處於底層的緩沖隊列,
是無法探知何謂“緩沖隊列滿”的含義的,這就為編程帶來很大的難題。
———————————————————————————————————————————————————————————
解決方案是,使用雙緩沖隊列組方案,我們設定兩個阻塞隊列,一個叫free,一個叫full。
兩者組成一個QueuePair:
class QueuePair{ public: QueuePair(const int size); ~QueuePair(); BlockingQueue<Datum*> free; // as producter queue BlockingQueue<Datum*> full; // as consumer queue };
為了避免檢測緩沖隊列的上界,我們可以先放置與上界數量等量的空元素指針到free隊列。
每次生產者生產時,從free隊列中pop一個空Datum元素,填充,再扔進full隊列。
這樣,BlockingQueue的push操作就不需要檢測上界了。
原理很簡單,生產者想要push,之前必須pop,pop可以通過檢測緩沖隊列空來實現。
這樣,就用檢測一個緩沖隊列的空,模擬且替代了檢測另一個緩沖隊列的滿。
對於上層代碼而言,我們僅僅需要預先填充N個元素至free隊列中即可,非常方便。
這部分是DataReader的設計核心。
代碼實戰
★數據結構
———————————————————————————————————————————————————————————
建立blocking_queue.hpp。

template <typename T> class BlockingQueue { public: BlockingQueue(); void push(const T& t); T pop(const string& log_waiting_msg=""); T peek(); size_t size(); // try_func return false when need blocking // try_func for destructor bool try_pop(T* t); bool try_peek(T* t); class Sync{ public: boost::mutex mutex; boost::condition_variable condition; }; private: queue<T> Q; boost::shared_ptr<Sync> sync; };
除了push和pop之外,追加隊列第三個常用操作——peek。
peek目的是取出隊首元素,但是不從隊列里pop掉。
peek用於實驗性讀取Datum,為DataTransfomer初始化所用。
除了通過返回值之外獲取之外,我們還要准備try系列函數。
try除了獲取元素外,同時返回一個bool值,表明成功或者失敗。
主要用於對Datum的析構,這也是所有代碼里,唯一一處對protobuff數值的析構。
★實現
———————————————————————————————————————————————————————————
建立blocking_queue.cpp。
整體代碼沒有什么好說的,細節以及在上文講解了。

template<typename T> BlockingQueue<T>::BlockingQueue() :sync(new Sync()) {} template<typename T> void BlockingQueue<T>::push(const T& t){ // function_local mutex and unlock automaticly // cause another thread could call pop externally // when this thread is calling push pop&peer at the same time boost::mutex::scoped_lock lock(sync->mutex); Q.push(t); // must wake one opposite operation avoid deadlock // formula: wait_kind_num = notify_kind_num // referring Producter-Consumer Model and it's semaphore setup method sync->condition.notify_one(); } template<typename T> T BlockingQueue<T>::pop(const string& log_waiting_msg){ boost::mutex::scoped_lock lock(sync->mutex); while (Q.empty()){ if (!log_waiting_msg.empty()){ LOG_EVERY_N(INFO, 1000) << log_waiting_msg; } sync->condition.wait(lock); //suspend, spare CPU clock } T t = Q.front(); Q.pop(); return t; } template<typename T> T BlockingQueue<T>::peek(){ boost::mutex::scoped_lock lock(sync->mutex); while (Q.empty()) sync->condition.wait(lock); T t = Q.front(); return t; } template<typename T> bool BlockingQueue<T>::try_pop(T* t){ boost::mutex::scoped_lock lock(sync->mutex); if (Q.empty()) return false; *t = Q.front(); Q.pop(); return true; } template<typename T> bool BlockingQueue<T>::try_peek(T* t){ boost::mutex::scoped_lock lock(sync->mutex); if (Q.empty()) return false; *t = Q.front(); return true; } template<typename T> size_t BlockingQueue<T>::size(){ boost::mutex::scoped_lock lock(sync->mutex); return Q.size(); }
模板實例化
在第壹章,我們提到了INSTANTIATE_CLASS(classname)宏的作用。
本段將重點解釋,出現在blocking_queue.cpp最后的實例化代碼。
模板機制與編譯空間
template<typename T>可以說是整個Caffe里出現頻率最高的代碼了。
C++編譯器有個好玩的特性,就是對於在cpp文件里出現的模板定義代碼,
只檢查最基本的語法錯誤,比如標點符號之類的。甚至你把變量名拼錯了,編譯仍然能通過。
所以,我在最初山寨Caffe的時候,寫了一堆錯誤的代碼,編譯器都沒告訴我。
后來在醫院體檢時,偶然轉了幾圈,大概猜到了編譯器應該是為模板代碼開了獨立的編譯檢查空間。
為了便於理解,參考圖如下:
由於C/C++是強類型檢查語言,類型檢查處於編譯先鋒位置。
而未確定類型的模板定義代碼,將不會進行大部分詞法分析、語法分析、語義分析。
頭文件與源文件
奇怪的是,如果你將模板定義代碼寫在頭文件里,那么它就會被上升到普通編譯空間。
原理大致如下:
編譯器不會對未include的頭文件進行最終編譯。
這意味着,如果你要使用一個模板類型,比如A<int> a;
必然處於include下,此時必然是指定類型的,編譯器就不必將代碼push到模板空間。
或者,存在一種轉移,編譯器將定義代碼由模板空間轉到普通空間,進行下一步分析。
然而,如果我們將模板定義代碼寫在源文件A.cpp里,然后在B.cpp里,使用A<int> a,
此時編譯器應該去哪里找模板類A的定義代碼?按照編譯鏈追溯,應該是到A.hpp里,
再由A.hpp,找到A.cpp。
這種思路在模板定義於A.cpp是不可能實現的,如圖所示:
這是兩種空間本質區別,由於模板空間的分析沒有結束,C++不會讓你由hpp找到cpp中的定義代碼的。
實例化
為了能讓編譯A.cpp時,從模板空間遷移到普通空間,我們必須為其提供明確的類型。
比如在blocking_queue.cpp的結尾,你應該添加以下代碼:
template class BlockingQueue<Batch<float>*>; template class BlockingQueue<Batch<double>*>; template class BlockingQueue < Datum* > ; template class BlockingQueue < boost::shared_ptr<QueuePair> > ;
這四行代碼枚舉了BlockingQueue中可能出現的所有具體類型,此時編譯器才會對A.cpp進行完整的編譯。
在common.hpp中的實例化宏則要簡單的多,
#define INSTANTIATE_CLASS(classname) \ template class classname<float>; \ template class classname<double>
該宏用於Blob、Layer、Net和Solver四大數據結構,因為它們的類型,除了float,就是double。
特殊化
模板機制中存在模板特殊化的概念,它在功能上等效於實例化。
模板特殊化在math_functions.cpp中將會大量存在。
比如此函數:
template<> void dragon_cpu_gemm<double>(const CBLAS_TRANSPOSE transA, const CBLAS_TRANSPOSE transB, const int M, const int N, const int K, const double alpha, const double* A, const double* B, const double beta, double *C){ int lda = (transA == CblasNoTrans) ? K : M; int ldb = (transB == CblasNoTrans) ? N : K; cblas_dgemm(CblasRowMajor, transA, transB, M, N, K, alpha, A, lda, B, ldb, beta, C, N); }
注意實例化與特殊化template附近的區別,特殊化需要添加<>。
模板特殊化必須要明確給出指定類型的代碼,而實例化則不必給出。
模板實例化本質是模板特殊化的特例,條件是:所有類型,執行相同的代碼。
而這份相同的代碼,以下述形式給出:
template<typename T> XXX<T>::Y(){ ...... ...... }
你可以將實例化視為聲明,特殊化視為定義。
兩者給出其一,就能讓編譯器完整編譯分離的模板定義代碼,前提是,必須寫在cpp文件中。
CUDA與NVCC編譯器
NVCC編譯cu文件時,會無視A.cpp里的任何實例化、特殊化代碼。
Caffe中給出的解決方案是,追加對cu文件中函數的特別實例化。
由以下幾個宏實現:
#define INSTANTIATE_LAYER_GPU_FORWARD(classname) \ template void classname<float>::forward_gpu( \ const vector<Blob<float>*>& bottom, \ const vector<Blob<float>*>& top); \ template void classname<double>::forward_gpu( \ const vector<Blob<double>*>& bottom, \ const vector<Blob<double>*>& top); #define INSTANTIATE_LAYER_GPU_BACKWARD(classname) \ template void classname<float>::backward_gpu( \ const vector<Blob<float>*>& top, \ const vector<bool> &data_need_bp, \ const vector<Blob<float>*>& bottom); \ template void classname<double>::backward_gpu( \ const vector<Blob<double>*>& top, \ const vector<bool> &data_need_bp, \ const vector<Blob<double>*>& bottom) #define INSTANTIATE_LAYER_GPU_FUNCS(classname) \ INSTANTIATE_LAYER_GPU_FORWARD(classname); \ INSTANTIATE_LAYER_GPU_BACKWARD(classname)
更多參考
見CSDN板塊的討論:http://bbs.csdn.net/topics/380250382
完整代碼
blocking_queue.hpp
https://github.com/neopenx/Dragon/blob/master/Dragon/include/blocking_queue.hpp
blocking_queue.cpp
https://github.com/neopenx/Dragon/blob/master/Dragon/src/blocking_queue.cpp