不精通多線程優化的程序員,不是好程序員,連碼農都不是。
——並行計算時代掌握多線程的重要性
線程與操作系統
用戶線程與內核線程
廣義上線程分為用戶線程和內核線程。
前者已經絕跡,它一般只存在於早期不支持多線程的系統中。
它用模擬的方式實現一個模擬的多線程系統,不支持異步。
即,一個線程被阻塞了,其它線程也會被阻塞。
當今的操作系統幾乎都默認提供了內核線程API,底層由操作系統實現。
內核線程的好處在於,它們之間支持異步,是"真"多線程。
操作系統的流氓軟件
不過,內核線程也給線程的使用帶來了操作系統捆綁性。
不同操作系統平台,其內核線程的實現與提供的API不同,給跨平台帶來麻煩。
比如在Windows上,MFC就是封裝了Windows內核線程。
在Linux上,廣泛使用的pThread就是POSIX系列系統流傳下來的內核線程。
第三方跨平台內核線程庫
有幸的是,歷史上有許多跨平台的項目庫。
我最早知道是Qt,GTK,這倆個比較特殊,因為它們是Application Framework。
是在90年代左右,C++為了對抗Java等后期開發之秀,而專門寫成的跨平台C++庫。
主要以GUI為作戰武器,對抗Java。
Boost庫同樣提供了優秀了內核線程庫,還是跨平台的。
所以Caffe移植到Windows,是不需要改動線程系統的。
何以線程用?
生產者與消費者
生產者與消費者是一個經典的資源分配問題。
它的核心要點主要體現在兩方面:
①阻塞
②臨界
首先看①:
為什么需要阻塞?因為生產者比較快,消費者比較慢。
一次消費過程,包括整個正向傳播和反向傳播,這需要不少的時間。
而一次生產過程,就是對一個Batch數據的預緩沖,這不需要很多時間。
生產者總不能一直生產下去,然后爆掉緩沖區吧?
所以,生產者在檢測到緩沖區滿了之后,就要進入阻塞狀態。
那么問題來了,如果我們不用多線程,將阻塞代碼放在主進程中執行,會怎么樣?
讀取,阻塞,前向傳播失敗,反向傳播不可能,死鎖。
這是為什么I/O代碼需要多線程處理的根本原因。
再來看②:
為什么需要臨界?因為生產者和消費者之間必然是異步的,換言之,存在於不同線程。
證明如下:
生產者有一定概率被阻塞,假設生產者和消費者在同一個線程實體中,那么必然觸發死鎖。
又因為系統執行必然不應該有死鎖,所以假設不成立。
結論:生產者和消費者之間必然是異步的。
異步臨界資源問題,則需要加鎖(Mutex)來維持資源的一致性,具體在第陸章中詳解。
破除因果律
多線程程序設計的核心原則就是:將非因果連續的代碼,並行化。
也就是說,只要代碼前后不是上下文相關的,都能夠並行執行。
那么Caffe的I/O模型中,有哪幾處不是上下文相關的?
答案有二:
①Datum和Blob(Batch)不是上下文相關的。
Blob包含着正向傳播的shape信息,這些信息只有初始化網絡在初始化時才能確定。
而Datum則只是與輸入樣本有關。
所以,Datum的讀取工作可以在網絡未初始化之前就開始,這就是DataReader采用線程設計的內涵。
同時,這種不相關性,也為生產者和消費者對於臨界資源訪問的設計埋下伏筆。
②GPU之間不是上下文相關的。
Caffe的多GPU方案,是讓不同GPU覆蓋不同段的數據,最后不在網絡結構上做融合。
這點和AlexNet略有不同(AlexNet兩個GPU的網絡結構最后交叉了)
這樣的多GPU方案,使得每個GPU至少存在一個DataReader,覆蓋不一樣的數據段。
在網絡結構上,通過共享root網絡即可,如圖所示:
上圖是一個經典的多GPU流水線編程方案。3個GPU擁有各自的DataReader,但是共享所有Layer。
GPU0由主進程控制,GPU1由線程1控制,GPU2由線程2控制。
Caffe在主機端,也就是CPU主進程和次線程,每個Layer的前向傳播被一個mutex鎖住,而反向傳播卻沒有。
這樣,盡管主進程、線程1、線程2是並行調用Layer.Foward(),但不能同時訪問同一Layer,此時Layer為互斥臨界資源。
這種行為會構造出一個人工的流水線,比如:
GPU0在Conv1時,GPU1、GPU2會被鎖住。
GPU0在Conv3時,Conv1和Conv2是空閑的,會被其它GPU占用。
反向傳播之所以不鎖,是因為前向傳播和反向傳播是符合因果律的,前向傳播成流水線,
反向傳播自然也是流水線,非常優美的設計。
影分身之術
俗話說,一個好漢三個幫。
本篇所述的多線程,均指的是CPU多線程。
實際上,由於GPGPU的異構計算引入,CPU線程基本都在做一些后勤工作。
主要是數據的讀取、與GPU顯存的數據交換。
CPU主進程負責全程調度GPU執行計算代碼,在這點上,CPU利用率並不高。
而從線程對數據的預緩沖任務也不是很艱難。
所以,相對於計算密集型CPU多線程設計而言,Caffe的多線程任務相對輕松。
我們很難將CPU的利用率榨到100%,在這點上,為深度學習Online應用系統埋下伏筆。
試想一下,在后台構建一個基於Socket的深度學習應用服務器,同時CPU並發線程可達幾千,
我覺得只有這樣,才能真正榨干CPU計算力。至於Caffe的訓練,其實對CPU的要求不是很高。
代碼實戰
Boost線程的創建
使用boost::thread, 需要#include "boost/thread/thread.hpp"
與Qt、MFC等Application Framework提供的線程庫不同,
boost::thread的封裝性比較強,一般不建議繼承和改寫boost::thread類(沒見過有人這么用)
為了線程能夠執行自定義代碼,需要在其構造時,傳入執行函數的函數指針。
函數指針分為兩類:
①普通函數指針
②類成員函數指針
boost::function結合bind函數提供了一個函數指針的載體。(style1)
也可以直接將函數指針的構造方式傳入thread。(style2)
建議配合boost::shared_ptr使用。(style3)
若是普通函數指針,可用:
// style 1 void helloworld(int a,string b); boost::function<void()> f=bind(helloworld,1,"helloworld"); boost::thread(f); // style 2 boost::thread(helloworld,1,"helloworld") // style 3(Caffe style) boost::shared_ptr<boost::thread> thread; thread.reset(new boost::thread(helloworld,1,"helloworld"));
當然,為了系統的開發,我們顯然需要一個封裝類,如將boost::thread封裝為DragonThread類。
即,將boost::shared_ptr<boost::thread> thread作為類成員。
基於類的函數指針綁定需要傳入類this指針,寫法做如下更改:
class DragonThread{ void helloworld(int a,string b); }; // style 1 boost::function<void()> f=bind(&DragonThread::helloworld,this,1,"helloworld"); boost::thread(f); // style 2 boost::thread(&DragonThread::helloworld,this,1,"helloworld"); // style 3(Caffe style) boost::shared_ptr<boost::thread> thread; thread.reset(new boost::thread(&DragonThread::helloworld,this,1,"helloworld"));
Boost線程的生與死
boost::thread一旦被構造后,就會立刻以異步的方式執行傳入的函數。
在debug線程的過程中,注意boost::thread將晚於主進程的代碼的執行。
如果不在main函數中循環等待,很有可能boost::thread還沒有執行,main函數已經退出了。
想要立刻終結一個boost線程是不可能的,一些強大的Application Framework的線程庫
通常會提供thread.terminate(),來立刻終結線程的執行(比如Qt),但是boost沒有提供。
因為這種方式是相當不安全的,在Java設計模式中,更鼓勵開發者讓線程函數自動檢測終結條件而退出。
這種檢測函數在Caffe里是must_stop()函數,它使用了boost::thread提供的中斷點檢測功能。
bool DragonThread::must_stop(){ return boost::this_thread::interruption_requested(); }
注意,中斷請求的檢測,只能在異步線程執行函數中執行,主進程從外部調用是無效的。
主進程可以從外部觸發interrupt操作,通知正在異步執行的線程,該方法封裝為stopThread函數:
void DragonThread::stopThread(){ if (is_start()){ thread->interrupt(); } try{thread->join();} catch (boost::thread_interrupted&) {} catch (std::exception& e){ LOG(FATAL) << "Thread exception: " << e.what(); } }
有時候,interrupt的線程可能處於阻塞睡眠狀態,我們需要從外部立即喚醒它,讓其檢測中斷請求。
所以在interrupt操作后,需要立即后接join操作。最后,還可以選擇性地補上異常檢測。
數據結構
建立dragon_thread.hpp。
class DragonThread { public: DragonThread() {} virtual ~DragonThread(); void initializeThread(int device, Dragon::Mode mode, int rand_seed, int solver_count, bool root_solver); void startThread(); void stopThread(); //the interface implements for specific working task virtual void interfaceKernel() {} bool is_start(); bool must_stop(); boost::shared_ptr<thread> thread; };
在第叄章,我們提到了全局管理器是線程獨立的,因此每一個dragon線程,
需要從主管理器轉移一些參數,包括(GPU設備、計算模式、隨機種子、root_solver&solver_count)
成員函數包括:
boost::thread的傳入函數initializeThread,這個函數里最后又嵌套了interfaceKernel。
前者負責轉移參數,后者默認是一個空函數,你也可以寫成純虛函數。
由於boost::thread沒有繼承的用法,所以Caffe二度封裝以后,提供了這種用法。
所有繼承DragonThread的類,只要重載interfaceKernel()這個虛函數就行了。
startThread應該最先被執行,它包括獲取主進程管理器參數,以及構造thread。
由於thread構造結束,就會立刻執行,所以startThread不負其名,就是啟動了線程。
stopThread的功能如上所述。
實現
建立dragon_thread.cpp。
首先是thread的傳入函數initializeThread:
void DragonThread::initializeThread(int device, Dragon::Mode mode, int rand_seed, int solver_count, bool root_solver){ #ifndef CPU_ONLY CUDA_CHECK(cudaSetDevice(device)); #endif Dragon::set_random_seed(rand_seed); Dragon::set_mode(mode); Dragon::set_solver_count(solver_count); Dragon::set_root_solver(root_solver); interfaceKernel(); //do nothing }
然后是外部調用的startThread函數:
void DragonThread::startThread(){ CHECK(!is_start()); int device = 0; #ifndef CPU_ONLY CUDA_CHECK(cudaGetDevice(&device)); #endif Dragon::Mode mode = Dragon::get_mode(); unsigned int seed = Dragon::get_random_value(); int solver_count = Dragon::get_solver_count(); bool root_solver = Dragon::get_root_solver(); try{ thread.reset(new boost::thread(&DragonThread::initializeThread, this, device, mode, seed, solver_count, root_solver)); } catch (std::exception& e){ LOG(FATAL) << "Thread exception: " << e.what(); } }
由於該函數是在主進程中執行,Dragon::get()與initializeThread里的Dragon::set()
操作的其實不是同一個全局管理器,所以需要這樣麻煩的轉移參數過程。
最后是線程控制與析構:
void DragonThread::stopThread(){ if (is_start()){ thread->interrupt(); } try{thread->join();} catch (boost::thread_interrupted&) {} catch (std::exception& e){ LOG(FATAL) << "Thread exception: " << e.what(); } } bool DragonThread::is_start(){ return thread&&thread->joinable(); } bool DragonThread::must_stop(){ return boost::this_thread::interruption_requested(); } DragonThread::~DragonThread(){ stopThread(); }
完整代碼
dragon_thread.hpp:
https://github.com/neopenx/Dragon/blob/master/Dragon/include/dragon_thread.hpp
dragon_thread.cpp:
https://github.com/neopenx/Dragon/blob/master/Dragon/src/dragon_thread.cpp