生產者
雙緩沖組與信號量機制
在第陸章中提到了,如何模擬,以及取代根本不存的Q.full()函數。
其本質是:除了為生產者提供一個成品緩沖隊列,還提供一個零件緩沖隊列。
當我們從外部給定了固定容量的零件之后,生產者的產能就受到了限制。
由兩個阻塞隊列組成的QueuePair,並不是Caffe的獨創,它實際上是生產者與消費者的編程方式之一。
在大部分操作系統教材中,雙緩沖區free、full通常由兩個信號量empty、full實現。
信號量(Semaphore)由操作系統底層實現,並且幾乎沒有人會直接使用信號量去編程。
因為在邏輯上,可以由信號量可由mutex+計數器模擬得到。
信號量的名字很有趣,它實際上由兩部分組成,信號(激活信號)、量(計數器)。
漢語的博大精深恰當地詮釋的信號量的語義精神,而從Semaphore中,你讀不出任何精華。
激活信號掩蓋了mutex的功與名,信號量的第一大功能,就是mutex鎖。
量,顯然表明信號量可以計數,實際上,信號量經常會被拿來為臨界資源計數。
下面的偽代碼摘自我的操作系統課本,《計算機操作系統 <第四版> 湯小丹等 著》:
int in=0,out=0; item buffer[n]; semaphore mutex=1,empty=n,full=0; void wait(S){ while(S<=0); S--; } void signal(S) {S++;} void producer{ while(1){ produce an item in nexp; ... wait(empty); wait(mutex); buffer[in]=nexp; in=(in+1)%n; signal(mutex); signal(full); } }
可以看到,除了mutex履行其互斥鎖的職責之外,empty和full用來計數。
作為生產者,每次生產時,都要讓empty減1,讓full加1。
當empty小於等於零時,形成第二把鎖,當然,這把鎖不是為了互斥,只是為了阻塞。
為了增加效率,這第二把鎖可以修改成條件阻塞,讓生產者交出CPU控制權,當然這需要操作系統的支持。
信號量在現代編程中是多余的,事實上,也沒有哪個線程庫會提供。
當"量"為1時,信號量通常是去實現互斥鎖功能。
當"量"為臨界資源數量時,信號量通常是去實現資源計數、並且條件阻塞的功能。
這兩部分的精神內涵都在Blocking Queue中實現了,So,忘記信號量吧。
多生產者單緩沖區
作為一般的機器學習玩家,你是用不着考慮多生產者的。
如果你比較有錢,經常喜歡擺弄4-way泰坦交火,那么就需要考慮一下多生產者的模型了。
在第肆章中,介紹了多GPU的基本運行原理,給出了如下這張圖:
對於每個GPU而言,它至少需要一個對它負責的DataReader,每個DataRedaer應當有不同的數據來源。
Caffe中,將控制一個數據來源的類對象稱為Body,默認有一個類靜態成員的Body關聯容器:
class DataReader { public: ..... private: static map<string, boost::weak_ptr<Body> > global_bodies; };
值得注意的是,此處應該使用weak_ptr,而不是shared_ptr,因為Body本身將由一個shared_ptr控制。
將Body的shared_ptr存入map容器,將會導致指針計數器永遠為1。
這樣,當我們准備將Body從map容器中清除時,無法獲知它是否已經被釋放。
而weak_ptr指向shared_ptr時,不會增加指針計數器計數,當計數為0時,即可將其從map里清除。
每一個DataReader只能擁有一個Body,而每個Body可以有多個成品存儲緩沖區(非用於零件緩沖,下節講)。
每個Body控制一個數據來源,不同的數據來源可以用關鍵字來hash,默認Caffe提供的關鍵字是:
static string source_key(const LayerParameter& param){ return param.name() + ":" + param.data_param().source(); }
即Layer名,加上數據庫路徑。
多生產者主要用於多數據庫同時並行訓練,這是一種非常經典的模型。
一部分代碼涉及到上層的DataLayer,將后續詳解。
另外一種模型是單生產者,以單數據庫,不同數據區域同時並行訓練,該方法也可以采用。(下節講)
Caffe的默認源碼中,既沒有完整實現多生產者並行模型,也沒有完整實現單生產者並行模型,這點令人遺憾。
不過,從源碼中仍然可以看出一點端倪,本教程只介紹大體思路,同樣並不提供具體代碼。
單生產者多緩沖區
在這種模型下,將只有一個DataReader,一個Body,但是有多個Pair,如圖:
有趣的是,Body結構體中,提供了QueuePair數組容器:
class Body :public DragonThread{ public: ....... BlockingQueue<boost::shared_ptr<QueuePair> > new_pairs; };
但是,Caffe源碼中的DataReader,默認只會使用該容器數組的第一個QueuePair,並沒有完整實現多緩沖區:
class DataReader { public: DataReader(const LayerParameter& param){ ........ ptr_body->new_pairs.push(ptr_pair); } BlockingQueue<Datum*>& free() const { return ptr_pair->free; } BlockingQueue<Datum*>& full() const { return ptr_pair->full; } private: boost::shared_ptr<QueuePair> ptr_pair; boost::shared_ptr<Body> ptr_body; };
可以看到,盡管我們設置了Body,存儲多個QueuePair,但是提供的外部訪問接口,居然直接使用了ptr_pair。
當然,如果你要編程使用多緩沖區,一定要修改DataReader的訪問接口。
對於單個數據庫的順序數據讀取,如何將順序資源,平攤到多個緩沖區?
Caffe使用了循環讀取法:
void Body::interfaceKernel(){ boost::shared_ptr<DB> db(GetDB(param.data_param().backend())); db->Open(param.data_param().source(), DB::READ); boost::shared_ptr<Cursor> cursor(db->NewCursor()); vector<boost::shared_ptr<QueuePair> > container; try{ ............... while (!must_stop()){ for (int i = 0; i < solver_count; i++) read_one(cursor.get(), container[i].get()); } } catch (boost::thread_interrupted&) {} }
可以看到,在Body的線程函數中,利用全局管理器提供的solver_count,循環均攤數據到多個QueuePair中。
當你將solver_count設置成大於1時,將可以使用Body中的多個緩沖區QueuePair,這點需要注意。
單生產者單緩沖區(默認代碼)
仔細思考一下,就會發現,單生產者多緩沖區方案是毫無意義的,看起來我們似乎模擬了多緩沖區。
但是實質只是一個線程,把資源分了一下組,多個組在DataLayer進行消費的時候,又會被合並成一個Batch:
如圖,因為一個DataLayer只能有一個Prefetching Thread,所以必然是每次從各個Pair里取一次。
如果我們先把Pair0取完,再取Pair1,再取Pair2,這樣也是可以的,是一種不錯的shuffle,但是需要追加代碼。
從計算角度分析,多緩沖區不會加速,反而會減速,如果是為了做上述的shuffle,是情有可原的。
如果不是,只是單純地為了負載均衡,輪流從各個Pair里取,那么本質上,就會退化成單生產者單緩沖區。
————————————————————————————————————————————————————
這可能是Caffe源碼的本意。在這種方案中,DataReader和DataLayer是無須改動代碼的。
只要我們加大DataParameter里的prefech數值,讓CPU多緩沖幾個Batch,為多個GPU准備就好了。
三種速度方案排名:
多生產者單緩沖區>單生產者單緩沖區>單生產者多緩沖區
線程嵌套線程與Socket
Caffe的源碼真的很有啟發性,在DataReader的構造和析構函數中,可以發現貢獻者悄悄加了mutex:
DataReader::DataReader(const LayerParameter& param){ ...... boost::mutex::scoped_lock lock(bodies_mutex); ...... } DataReader::~DataReader(){ ...... boost::mutex::scoped_lock lock(bodies_mutex); ...... }
熟悉C++的人應該知道,在常規情況下,構造和析構函數是不會並行執行的,也就是不會被線程執行。
線程並行的僅僅是工作函數,工作之前主進程構造,工作之后,主進程析構。
如果偏要認為構造和析構可能並行的話,那么將出現一種好玩的情況:
由於DataReader本身是線程,線程並行線程,將導致線程嵌套線程。
在我的操作系統課上,我的老師這么說:
線程僅僅擁有進程的少部分資源,權限很小。
那么線程能夠嵌套線程么?經過百度之后,我發現真還可以。
當今的操作系統,無論是Linux,還是Windows,線程的資源權限都是非常大的。
————————————————————————————————————————————————————
線程嵌套線程,會不會和多GPU有關?我認為無關。
每個GPU的監督線程,這里我們假設使用DragonThread,在需要工作時,
只需要傳入:Solver::solve函數就可以了,Solver、Net、Layer的構造和析構,顯然是在主進程里執行的。
那么,線程嵌套線程,有什么意義,有什么情況是必須在線程里觸發構造函數?
很有趣,一般來講,只有Socket線程是這樣的。
Socket線程無須使用DragonThread,實際上,Boost的Socket也是由boost::asio而不是boost::thread實現的。
不像多GPU,我們無法預估,在某一時刻,實際有多少個Socket在執行,有多少個用戶發出了訪問請求。
因此,不能直接把Solver、Net、Layer的構造,放在主進程當中。不然你知道你要構造多少份嘛?顯然你不知道。
所以,從直覺上,將這些的構造,放在每一個啟動的Socket線程里,用多少,構造多少,看起來不錯,如圖:
這樣,假如這幾個Solver使用了不同數據來源,那么global_bodies就有被幾個Solver同時修改的可能。
這是構造和析構函數里,需要加mutex的直接原因。
————————————————————————————————————————————————————
Socket的意義何在?
①從訓練角度,多個用戶可以遠程操控一台主機,訓練不同的Net。
這點與多GPU訓練一個模型是不一樣的。一般而言,我們不會認為,多個用戶通過Socket,居然想要訓練同一個模型。
當然,這也是可以的。
②從測試角度,多個用戶,可以利用同一個Net的參數,並行得到自己提供的數據的測試結果。
注意,這樣就不要share整個Net,每個用戶的solver使用獨立的Net,獨立讀取訓練好的參數。
否則,多個用戶會在一個Net上卡半天。
代碼實戰
建立data_reader.hpp、data_reader.cpp。
QueuePair
class QueuePair{ public: QueuePair(const int size); ~QueuePair(); BlockingQueue<Datum*> free; // as producter queue BlockingQueue<Datum*> full; // as consumer queue };
QueuePair的結構在上一章已經介紹過,每一個QueuePair將作為一個緩沖區。
QueuePair只需要實現構造函數和析構函數:
QueuePair::QueuePair(const int size){ // set the upbound for a producter for (int i = 0; i < size; i++) free.push(new Datum()); } QueuePair::~QueuePair(){ // release and clear Datum *datum; while (free.try_pop(&datum)) delete datum; while (full.try_pop(&datum)) delete datum; }
在構造函數中,我們進行"零件"的填充,注意里面的Datum全是空元素,且存入隊列的應該是指針。
切記勿存入實體對象Datum,這在應用程序開發中是大忌,因為C++並非Python,默認執行的深拷貝。
深拷貝大內存數據結構體,會嚴重拖慢程序執行,而且還是沒有意義的,傳遞指針更恰當。
在析構函數中,實際上這是唯一一處對Protocol Buffer對象的主動析構,因為Datum沒有用shared_ptr。
主動析構主要利用Blocking Queue提供的try,來控制循環進度。
此處切記不要把pop寫成peek,否則會造成對空指針的delete,導致程序崩潰。
LayerParameter
DataReader的上層是DataLayer,它是DataLayer的成員變量之一,需要DataLayer提供proto參數。
在你的proto腳本中,追加如下項:
message DataParameter{ enum DB{ LEVELDB=0; LMDB=1; } optional string source=1; optional uint32 batch_size=2; optional DB backend=3 [default=LMDB]; //4-way pre-buffering is enough for normal machines optional uint32 prefech=4 [default=4]; } message LayerParameter{ optional string name=1; optional string type=2; optional DataParameter data_param=8; }
重新編譯后,覆蓋你的舊頭文件和源文件。
DataParameter中,包含:數據庫源路徑、batch大小、數據庫類型,以及預緩沖區大小。
比較特別的是預緩沖大小,默認是開4個Batch的預緩沖。如果你的GPU計算速度過快,明顯大於
CPU供給數據的速度,消費者(DataLayer)經常提示缺數據,你得考慮加大預緩沖區數量。
將DataParameter嵌入到LayerParameter中去。
LayerParameter是一個巨型的數據結構,將包含所有類型Layer的超參數,你可以將其視為基類。
Body
class Body :public DragonThread{ public: Body(const LayerParameter& param); virtual ~Body(); vector<boost::shared_ptr<QueuePair>> new_pairs; protected: void interfaceKernel(); void read_one(Cursor *cursor, QueuePair *pair); LayerParameter param; };
Body實際上是一個線程,而DataReader卻不是,盡管Body是DataReader成員變量。
Body的構造函數和析構函數就是啟動線程和停止線程:
Body::Body(const LayerParameter& param) :param(param) { startThread();} Body::~Body() { stopThread();}
線程工作函數比較復雜:
void Body::interfaceKernel(){ boost::shared_ptr<DB> db(GetDB(param.data_param().backend())); db->Open(param.data_param().source(), DB::READ); boost::shared_ptr<Cursor> cursor(db->NewCursor()); try{ // default solver_count=1 int solver_count = param.phase() == TRAIN ? Dragon::get_solver_count() : 1; // working period while (!must_stop()){ for (int i = 0; i < solver_count; i++) read_one(cursor.get(), new_pairs[i].get()); } // complex condition } catch (boost::thread_interrupted&) {} }
該函數將會一直卡在循環里,直到訓練結束,Body執行析構函數,將線程執行停止。
Body-DataReader構成了Caffe數據緩沖的第一級別:數據庫->Datum 。
在DataLayer中,還會進行第二級別的緩沖:Datum->Blob->Batch,將在后續分析。
最后,還剩下一個read_one函數:
void Body::read_one(Cursor *cursor, QueuePair *pair){ Datum *datum = pair->free.pop(); datum->ParseFromString(cursor->value()); pair->full.push(datum); cursor->Next(); if (!cursor->valid()){ DLOG(INFO) << "Restarting data prefeching from start.\n"; cursor->SeekToFirst(); } }
read_one每次從一個雙緩沖組的free隊列中取出空Datum指針。
利用Protocol Buffer的反序列化函數ParseFromString,從數據庫中還原Datum,再扔到full隊列里。
感謝Protocol Buffer,否則這部分的代碼估計不下200行。
當數據庫跑完之后,需要回到開頭,再次重讀,為迭代過程反復提供數據。
這一步只適合訓練過程,如果你要一次測試自己的數據,請忘記這個函數,重寫一個不要反復讀的版本。
DataReader
class DataReader { public: DataReader(const LayerParameter& param); BlockingQueue<Datum*>& free() const { return ptr_pair->free; } BlockingQueue<Datum*>& full() const { return ptr_pair->full; } ~DataReader(); static string source_key(const LayerParameter& param){ return param.name() + ":" + param.data_param().source(); } private: LayerParameter param; boost::shared_ptr<QueuePair> ptr_pair; boost::shared_ptr<Body> ptr_body; static map<string, boost::weak_ptr<Body> > global_bodies; };
該結構上文已經全面解析過。
在cpp的實現中,首先完成類靜態成員變量的外部初始化。
map<string, boost::weak_ptr<Body> > DataReader::global_bodies;
以及一個靜態mutex的定義:
static boost::mutex bodies_mutex;
該mutex是Caffe挖的坑之一,雖然默認不會生效,倒是給出了不錯的指導。
當構建多生產者單緩沖區時,我們將會有多個Body,即多個DataReader,即多個DragonThread。
這意味着,Body的Hash容器將成為一個互斥資源。
該Hash容器的存在不是沒有必要的,由於:
每個數據來源只能用一次,為了避免重復路徑,顯然需要Hash。
DataReader::DataReader(const LayerParameter& param){ ptr_pair.reset(new QueuePair( param.data_param().prefech()*param.data_param().batch_size())); boost::mutex::scoped_lock lock(bodies_mutex); string hash_key = source_key(param); boost::weak_ptr<Body> weak = global_bodies[hash_key]; ptr_body = weak.lock(); if (!ptr_body){ ptr_body.reset(new Body(param)); global_bodies[hash_key] = boost::weak_ptr<Body>(ptr_body); } ptr_body->new_pairs.push(ptr_pair); }
DataReader的構造函數首先根據用戶指定的預緩沖區大小,初始化默認的雙緩沖隊列組。
接下來,要在Body的Hash容器中登記,mutex鎖住,修改之后解鎖。
登記所使用的是weak_ptr,weak_ptr可看作shared_ptr的助手,通常視為觀察者(Viewer)。
不可使用->,只能調用lock函數獲得shared_ptr。
DataReader的析構,主要任務是析構Body,以及從Hash容器中反登記。
DataReader::~DataReader(){ string hash_key = source_key(param); ptr_body.reset(); boost::mutex::scoped_lock lock(bodies_mutex); if (global_bodies[hash_key].expired()) global_bodies.erase(hash_key); }
析構體系
DataReader中涉及幾個比較重要的析構,這里以圖描述下:
完整代碼
data_reader.hpp
https://github.com/neopenx/Dragon/blob/master/Dragon/data_include/data_reader.hpp
data_reader.cpp
https://github.com/neopenx/Dragon/blob/master/Dragon/data_src/data_reader.cpp