[源碼解析] 機器學習參數服務器Paracel (3)------數據處理
0x00 摘要
Paracel是豆瓣開發的一個分布式計算框架,它基於參數服務器范式,用於解決機器學習的問題:邏輯回歸、SVD、矩陣分解(BFGS,sgd,als,cg),LDA,Lasso...。
Paracel支持數據和模型的並行,為用戶提供簡單易用的通信接口,比mapreduce式的系統要更加靈活。Paracel同時支持異步的訓練模式,使迭代問題收斂地更快。此外,Paracel程序的結構與串行程序十分相似,用戶可以更加專注於算法本身,不需將精力過多放在分布式邏輯上。
前文介紹了PyTorch 的數據處理部分,本文接着介紹Paracel的數據處理部分,正好可以與PyTorch做一下印證。
為了行文完整,本文部分基礎知識與前文重復,另外在解析時候會刪除部分非主體代碼。
參數服務器系列其他文章如下:
[源碼解析] 機器學習參數服務器ps-lite 之(1) ----- PostOffice
[源碼解析] 機器學習參數服務器ps-lite(2) ----- 通信模塊Van
[源碼解析] 機器學習參數服務器ps-lite 之(3) ----- 代理人Customer
[源碼解析]機器學習參數服務器ps-lite(4) ----- 應用節點實現
[源碼解析] 機器學習參數服務器 Paracel (1)-----總體架構
[源碼解析] PyTorch 分布式(1) --- 數據加載之DistributedSampler
[源碼解析] PyTorch 分布式(2) --- 數據加載之DataLoader
0x01 切分需要
1.1 切分的好處
深度學習領域的特點是:海量數據 + 海量運算。因為會出現運算時間過長或者模型過大的情況,所以會對數據或者模型進行切分,從而並行的分布式的解決問題,就是我們常常聽到的數據並行或者模型並行。
切分問題包括對訓練數據和訓練模型的切分。即:切分模型以便處理大模型,切分數據以加速訓練。
1.2 數據並行
比如下圖中,每一個節點都擁有一個模型的完整拷貝,但是每個節點的訓練數據不同。每個節點上運行一個訓練進程,我們稱之為 worker。這些worker讀取一個批次數據,各自完成前向計算和后向傳播,得到梯度,然后把各自的梯度提交到參數服務器上,由參數服務器進行歸並/更新參數操作,然后參數服務器把更新后的模型回傳給各個節點,然后每個計算節點負責對本地模型的參數進行更新。進行新一輪迭代訓練。
1.3 模型並行
如果可以對模型進行有意義的分割,然后分段加載並且傳送到參數服務器上,算法也支持分段並行處理,那么理論上就可以進行模型並行。我們首先可以把模型分為線性可分模型和非線性模型(神經網絡)。
1.3.1 線性模型
針對線性模型,我們可以把模型和數據按照特征維度進行划分,分配到不同的計算節點上,每個節點的局部模型參數計算不依賴於其他維度的特征,彼此相對獨立,不需要與其他節點進行參數交換。這樣就可以在每個計算節點上采用梯度下降優化算法進行優化,進行模型並行處理。
某些機器學習問題,如矩陣因子化、主題建模和線性回歸,由於使用的小批量大小不是非常大,從而提高了統計效率,因此模型並行通常可以實現比數據並行更快的訓練時間。
1.3.2 非線性模型(神經網絡)
神經網絡的模型與傳統機器學習模型不同,具有如下特點:
- 神經網絡具有很強的非線性,參數之間有較強的關聯依賴。
- 深度學習的計算本質上是矩陣運算,這些矩陣保存在GPU顯存之中。
- 因為過於復雜,所以神經網絡需要較高的網絡帶寬來完成節點之間的通信。
根據這些特征,神經網絡可以分為 層間分割 和 層內分割:
- 層間分割:橫向按層划分或縱向跨層划分進行網絡划分。每個計算節點計算然后通過RPC將參數傳遞到其他節點上進行參數的合並。從網絡角度來看,就是把神經網絡結構拆分。
- 層內分割:如果矩陣過大,則一張顯卡無法加載整個矩陣,這就需要把一張巨大矩陣拆分開來放到不同GPU上去計算,每個GPU只負責模型的一部分。從計算角度看就是把矩陣做分塊拆分處理。
具體可以參見下圖:
1.4 混合使用
有的時候數據並行和模型並行會被同時用上。
- 對於與數據相關的這類模型(如矩陣分解,pagerank,svd等,換句話說就是model是key-value的樣子,key對應一個物體或人),我們可以通過對數據的切分來控制切分模型的方式。
- 有些模型不直接和數據相關(如LR、神經網絡等),這時只要分別對數據和模型做各自的切分即可。
比如:
- 卷積神經網絡中卷積層計算量大,但所需參數系數 W 少,適合使用數據並行。
- 全連接層計算量小,所需參數系數 W 多。適合使用模型並行。
就像這樣:
0x02 切分機制與數據格式
2.1 切分原則
切分數據意味着減少計算量,切分模型的方式則決定了計算和通信的拓撲。不同的划分方式可能導致計算性能上的差異。所以我們要試圖尋找切分的一些原則:
-
切分數據的同時盡量保證切開的模型大小均衡以及通信較優。
-
要能保證參數服務器負載均衡,降低參數服務器單點性能瓶頸,降低網絡傳輸成本(比如在網絡中傳輸Embedding模型參數,整個時延和成本將是不可接受的),因此原則如下:
- 關聯的數據/模型在同一個參數服務器上。
- 盡量將一個模型平均分配到所有參數服務器節點上。
- 對於非常小的模型,將它們盡量放在一個參數服務器節點上。
- 對於多行的模型,盡量將同一行放在一個參數服務器節點上。
-
提供定制化的需求,因為各個算法,或者一個算法的各種實現,對划分方式要求都不一樣。
2.2 模型和數據格式
因為分布式實際上不僅包括計算分布式,也涉及到存儲分布式。這就要求模型文件和數據文件的格式必須天生支持切分。
對於與數據相關的模型(如矩陣分解,pagerank,svd等,即模型表示成key-value格式),可以通過對數據的切分來控制切分模型的方式。另一些情況是模型不直接和數據相關(如LR、神經網絡等),只要分別對數據和模型做各自的切分即可。
在這個方面,各個公司也做了自己的努力。比如騰訊的Angel的模型是以矩陣為單位來保存的。默認情況下, Angel將模型(矩陣)切分成大小相等的矩形區域,每一個矩陣在模型保存路徑下對應一個以矩陣名命名的文件夾,里面包含矩陣的元數據文件和數據文件。一個矩陣只有一個元數據文件(元數據主要由矩陣特征,分區索引和行相關索引組成),但是一般有多個數據文件。
2.3 Paracel 數據機制
Paracel 提供了豐富的數據切分方式,我們需要從幾個方面一一說明。
2.3.1 數據表示
Paracel用圖和矩陣來表示訓練數據。
有四種類型的圖:
- bigraph
- bigraph_continuous
- digraph
- undirected_graph
Paracel使用 Eigen3庫來支撐矩陣/向量的操作,因此支持兩種矩陣:
- SparseMatrix
- MatrixXd
我們以bigraph為例看看。
在圖論的數學領域中,bigraph的頂點可以划分為兩個不相交的集合U和V(即U和V是各自獨立的集合),使得U中的一個頂點與V中的一個頂點相連。
定義如下:
template <class T = paracel::default_id_type>
class bigraph {
private:
size_t v_sz = 0;
size_t e_sz = 0;
paracel::dict_type<T, paracel::dict_type<T, double> > adj;
public:
MSGPACK_DEFINE(v_sz, e_sz, adj);
public:
bigraph();
bigraph(std::unordered_map<T, std::unordered_map<T, double> > edge_info);
bigraph(std::vector<std::tuple<T, T> > tpls);
bigraph(std::vector<std::tuple<T, T, double> > tpls);
void add_edge(const T & v, const T & w);
void add_edge(const T & v, const T & w, double wgt);
// return bigraph data
std::unordered_map<T, std::unordered_map<T, double> > get_data();
// traverse bigraph edge using functor func
template <class F>
void traverse(F & func);
// traverse vertex v’s related edges using functor func
template <class F>
void traverse(const T & v, F & func);
// return U bag
std::vector<T> left_vertex_bag();
// return U set
std::unordered_set<T> left_vertex_set();
// out: tpls
void dump2triples(std::vector<std::tuple<T, T, double> > & tpls);
// out: dict
void dump2dict(std::unordered_map > & dict);
// return number of vertexes in U
inline size_t v();
// return number of edges in bigraph
inline size_t e();
// return adjacent info of vertex v
std::unordered_map<T, double> adjacent(const T & v);
// return outdegree of vertex u in U
inline size_t outdegree(const T & u);
// return indegree of vertex v in V
inline size_t indegree(const T & v);
};
2.3.2 數據加載
Paracel為加載輸入文件提供了各種接口。在最新版本中,所有與加載相關的接口都只支持文本格式的文件,這樣會占用多一些內存。
用戶可以並行讀取數據的一個分區對應的行,然后構造自定義的數據結構,也可以直接將輸入數據加載為Paracel的“graph”或“matrix”類型。在后一種情況下,必須使用'pattern'和'mix_flag'變量來描述輸入文件的結構。Pattern
還決定輸入數據的分區方法。
Paracel用變量“pattern”定義了幾個模式:
pattern | structure | line example |
---|---|---|
linesplit(default) | 用行來確定分區 | all structures |
fmap | first-second case(value set to 1.0) first-second-value case 依據第一個字段進行分區 |
a,b a,b,0.2 |
smap | second-first case(value set to 1.0) second-first-value case 依據第二個字段進行分區 |
a,b a,b,0.2 |
fsmap | support the same structure as fmap and smap 用兩個字段一起分區 |
a,b or a,b,0.2 |
fvec | id,feature1,…,feature k, 依據id分區 |
1001 0.1|0.2|0.3|0.4 |
fset | attr1,attr2,attr3,… attr1,attr2|value2,attr3|value3,… 依據第一個字段進行分區 |
a,b,c or a,b|0.2,c|0.4 |
變量mix_flag
表示圖形/矩陣的鏈接關系是否在一行中定義。如下面的示例所示,當mix_flag
設置為false時,節點 “a” 的所有鏈接關系都展開為三行。如果“pattern”等於“fvec”和“fset”,則mix_flag
始終為“true”。
mix_flag | example |
---|---|
true | a,b,c,d b,c,d … |
true | a,b a,c,d b,c b,d … |
false(default) | a,b a,c a,d b,c b,d … |
如上所述,pattern
不僅決定數據格式,還決定分區策略,而mix_flag
告訴Paracel鏈接關系是否在一行中混合。
0x03 數據加載
3.1 並行處理
AI框架的數據處理主要如下並行處理:
- 數據加載/處理使用CPU。
- 訓練使用GPU。
在理想狀態下,應該是每輪迭代訓練之前,CPU就完成加載,准備好訓練數據,這樣訓練就可以持續無縫迭代。
然而,GPU算力每年會提升一倍,CPU的提升速度遠遠落后於GPU,所以CPU會是拖后腿的那個角色。這里不僅僅是CPU算力不足的問題,也包括從存儲中讀取數據速度不足的問題。
因此,機器學習對於數據加載和前期預處理的要求越來越高,必須在GPU計算時間內,完成下一迭代數據的准備工作,不能讓GPU因為等待訓練數據而空閑。
3.2 流水線
對於機器學習訓練,加載數據可以分為三個步驟:
- 將數據從磁盤或者分布式存儲加載到主機(CPU)。
- 將數據從主機可分頁內存傳輸到主機固定內存。
- 將數據從主機固定內存轉移到主機GPU。
因此,流行的深度學習框架會依據加載步驟的特點和異構硬件的特點來進行流水線處理,從而提高數據處理過程的吞吐量。
流水線一般包括多個算子,每個算子內部由數據隊列組成一個緩沖區,上游算子完成處理之后會傳給給下游算子進行處理。這樣每個算子任務會彼此獨立,算子內部可以使用細粒度的多線程/多進程來並行加速,每個算子可以獨立控制處理速度和內存以適配不同網絡對於處理速度的需求。
如果算子內部數據隊列不為空,模型就會一直源源不斷獲得數據,就不會因為等待訓練數據而產生瓶頸。
下面是串行處理邏輯:
+------+ +-----------+ +---------------------------+
| | | | | |
| Data +----------> | Load Data +---------> | Transfer to Pinned Memory |
| | | | | |
+------+ +-----------+ +---------------------------+
下面是並行流水線邏輯:
+------------+
+--------+ | |
| | | Process 1 |
| Data 1 +--------> | +------+
| | | Load Data | |
+--------+ | | |
+------------+ |
|
|
|
+------------+ |
+--------+ | | | +-----------------------------+
| | | Process 2 | +------> | Pin-memory process |
| Data 2 +--------> | | | |
| | | Load Data +-------------> | |
+--------+ | | | Transfer to Pinned Memory |
+------------+ +-----> +-----------------------------+
|
|
|
+--------+ +------------+ |
| | | | |
| Data 3 +--------> | Process 3 +-------+
| | | |
+--------+ | Load Data |
| |
+------------+
3.3 GPU
本文到現在是解決CPU側的數據傳輸問題,即:從磁盤加載數據,從可分頁到固定內存。
但是,從固定內存到GPU的數據傳輸(tensor.cuda()
)也可以使用CUDA流進行流水線處理。
另外,深度學習應用程序需要復雜的多階段數據處理管道,包括加載、解碼、裁剪、調整大小和許多其他增強功能。這些目前在 CPU 上執行的數據處理管道已經成為瓶頸,限制了訓練和推理的性能和可擴展性。
Nvidia DALI 通過將數據預處理放到 GPU 處理來解決 CPU 瓶頸問題,用戶可以依據自己模型的特點,構建基於 GPU 的 pipeline,或者基於CPU的pipeline。
0x04 Paracel數據加載
前面提到,Paracel使用圖,矩陣等進行數據加載,接下來我們就看看具體如何實現。
4.1 樣例代碼
我們從源碼中選取樣例,恰好里面有model partition 和 data partition 字樣。
其實,這里的意思是:並行加載模型和並行加載數據。
class adjust_ktop_s : public paracel::paralg {
public:
adjust_ktop_s(paracel::Comm comm,
std::string hosts_dct_str,
std::string _rating_input,
std::string _fmt,
std::string _sim_input,
int _low_limit,
std::string _output) :
paracel::paralg(hosts_dct_str, comm, _output),
rating_input(_rating_input),
fmt(_fmt),
sim_input(_sim_input),
low_limit(_low_limit) {}
virtual void solve() {
// load sim_G, model partition 並行加載模型
auto local_parser = [] (const std::string & line) {
auto tmp = paracel::str_split(line, '\t');
auto adj = paracel::str_split(tmp[1], '|');
std::vector<std::string> stuff = {tmp[0]};
stuff.insert(stuff.end(), adj.begin(), adj.end());
return stuff;
};
auto parser_func = paracel::gen_parser(local_parser);
paracel_load_as_graph(sim_G,
sim_input,
parser_func,
"fset");
// load rating_G, data partition 並行加載數據
auto local_parser_rating = [] (const std::string & line) {
return paracel::str_split(line, ',');
};
auto local_parser_rating_sfv = [] (const std::string & line) {
std::vector<std::string> tmp = paracel::str_split(line, ',');
std::vector<std::string> r({tmp[1], tmp[0], tmp[2]});
return r;
};
auto rating_parser_func = paracel::gen_parser(local_parser_rating);
if(fmt == "sfv") {
rating_parser_func = paracel::gen_parser(local_parser_rating_sfv);
}
paracel_load_as_graph(rating_G,
rating_input,
rating_parser_func,
fmt);
// init rating_G
paracel::dict_type<std::string, double> tmp_msg;
auto init_lambda = [&] (const node_t & uid,
const node_t & iid,
double v) {
std::string key = std::to_string(uid) + "_" + std::to_string(iid);
tmp_msg[key] = v;
};
rating_G.traverse(init_lambda);
paracel_write_multi(tmp_msg);
paracel_sync();
// learning
cal_low_peak();
}
private:
std::string rating_input, fmt;
std::string sim_input;
int low_limit = 1;
paracel::bigraph<node_t> sim_G;
paracel::bigraph<node_t> rating_G;
paracel::dict_type<node_t, int> ktop_result;
double training_rmse = 0., original_rmse = 0.;
}; // class adjust_ktop_s
4.2 加載圖
對於圖結構的數據或者模型,首先每個 worker 會通過fixload並行加載文件,然后會通過paracel_sync進行同步。
注意:每個worker都會執行以下函數,內部會通過MPI進行協調和統一。
template <class T, class G>
void paracel_load_as_graph(paracel::bigraph<G> & grp,
const T & fn,
parser_type & parser,
const paracel::str_type & pattern = "fmap",
bool mix_flag = false) {
if(pattern == "fset") {
mix_flag = true;
}
// TODO: check pattern
// load lines
paracel::loader<T> ld(fn, worker_comm, parser, pattern, mix_flag);
// 並行加載,fixload 里面有一個all2all交換
paracel::list_type<paracel::str_type> lines = ld.fixload();
paracel_sync(); //這里進行同步,確保所有worker都完成加載
// create graph
ld.create_graph(lines, grp); // 此時才開始建立圖
set_decomp_info(pattern);
lines.resize(0); lines.shrink_to_fit(); paracel::cheat_to_os();
}
4.3 加載文件
Paracel 的數據(模型),可能由多個數據文件構成,因此可以進行並行加載:
- 首先,依據文件列表和world size來分區,確保多個加載的worker之間不會出現workload不均衡的情況
- 其次,調用 structure_load 來做並行加載,和pytorch 暗合。
paracel::list_type<paracel::str_type> fixload() {
paracel::scheduler scheduler(m_comm, pattern, mix);
auto fname_lst = paracel::expand(filenames); //文件名字列表
// 依據文件列表和world size來分區,確保多個加載的worker之間不會出現workload不均衡的情況
paracel::partition partition_obj(fname_lst,
m_comm.get_size(),
pattern);
partition_obj.files_partition();
// parallel loading lines 此時才並行加載
auto linelst = scheduler.structure_load(partition_obj);
m_comm.synchronize();
if(m_comm.get_rank() == 0) std::cout << "lines got" << std::endl;
return linelst;
}
4.3.1 分區
於是就涉及到了一個問題,假如有6個worker,12個文件,那么每個worker怎么做到並行加載呢?
可能有同學會說:每個worker 加載兩個文件。但是這種情況只適用於文件大小基本一致的情況,如果文件大小不一致,比如一個文件15000行,一個文件50行,一個文件20000行.....,那么就會造成worker的 load 不均衡,導致無法達到並行加載的效果。
所以需要按照所有文件的總行數進行分配。比如12個文件一共120000行,則每個worker負責加載10000行。
第一個worker可能負責加載第一個文件的10000行,第二個worker負責加載第一個文件的后5000行 和第二個文件的50行,第三個文件的 xxx 行.....
4.3.2 分區定義
我們看看分區是如何定義的:
-
namelst :是模型文件或者數據文件名字列表。
-
slst 其中第 i 個元素是第 i 個分區的起始行數。
-
elst : 第 i 個元素是第 i 個分區的終止行數。
-
np : 是所有worker個數。
-
displs :第 i 個元素是第 i 個文件在所有文件行數的起始行數。比如:第一個文件5行,第二個文件6行,第三個文件6行,則displs[0] = 0,displs[1] = 5,displs[2] = 11 ...
具體如下:
class partition {
public:
partition(paracel::list_type<paracel::str_type> namelst_in,
int np_in, paracel::str_type pattern_in)
: namelst(namelst_in), np(np_in), pattern(pattern_in) {}
private:
paracel::list_type<paracel::str_type> namelst;
int np; // world size
paracel::str_type pattern;
paracel::list_type<long> slst, elst, displs;
}; // class partition
4.3.3 均衡分區
目的就是計算所有文件的總行數,然后在各個worker之中進行均衡分配。
const int BLK_SZ = 32;
void files_partition(int blk_sz = paracel::BLK_SZ) {
if(pattern == "linesplit" || pattern == "fvec") {
blk_sz = 1;
}
slst.resize(0);
elst.resize(0);
displs.resize(0);
displs.resize(namelst.size() + 1, 0); // 擴展為文件個數
for(size_t i = 0; i < displs.size() - 1; ++i) {
std::ifstream f(namelst[i], std::ios::ate); // ate作用是寫入的數據被加入到文件末尾
long tmp = f.tellg(); // 得到某個文件的行數
f.close();
displs[i + 1] = displs[i] + tmp; // 計算每個文件在總行數中的位置
}
long sz = displs.back(); //得到所有文件的總行數
int nbk = np * blk_sz; // 每個worker負責的范圍
long bk_sz = sz / static_cast<long>(nbk); //每個partition的大小
long s, e;
for(int i = 0; i < nbk; ++i) { // nbk是每個worker負責的范圍,其中每個范圍是s, e,s和e之間大小是BLK_SZ。
s = static_cast<long>(i) * bk_sz; // 加載起始行
if(i == nbk - 1) {
e = sz;
} else {
e = (i + 1) * bk_sz; // 加載終止行
}
assert(s < e);
slst.push_back(s); //插入起始行
elst.push_back(e); //插入終止行
}
}
4.4 並行加載
回憶一下前面的代碼,當我們用分區做負載均衡之后,就可以用scheduler實施並行加載:
partition_obj.files_partition();
// parallel loading lines 此時才並行加載
auto linelst = scheduler.structure_load(partition_obj);
scheduler可以認為是調度器,負責調度多個進程並行加載。
比如某worker,rank = 2, 則依據自己的rank來計算,得到本worker加載的起始,終止位置是:st = 64, en = 96。然后使用 files_load_lines_impl 具體加載。
paracel::list_type<paracel::str_type>
scheduler::structure_load(partition & partition_obj) {
paracel::list_type<paracel::str_type> result;
int blk_sz = paracel::BLK_SZ;
if(pattern == "fvec" || pattern == "linesplit") {
blk_sz = 1;
}
int st = m_comm.get_rank() * blk_sz; // 依據自己的rank來計算,看看自己這個進程從哪里加載。
int en = (m_comm.get_rank() + 1) * blk_sz; // 加載到哪里結束
auto slst = partition_obj.get_start_list();
auto elst = partition_obj.get_end_list();
for(int i = st; i < en; ++i) { // 遍歷 64 ~ 96
// 去找 slst[64 ~ 96], elst[64 ~ 96]的來逐一加載
auto lines = partition_obj.files_load_lines_impl(slst[i], elst[i]); // 自己應該加載什么
result.insert(result.end(), lines.begin(), lines.end());
}
return result;
}
files_load_lines_impl完成了對具體文件的加載功能。
template <class F>
void files_load_lines_impl(long st, long en, F & func) {
// to locate files index to load from
int fst = 0;
int fen = 0;
long offset;
// 找到st, en分別屬於哪個文件,即在 displs 的位置,找到哪些files
for(size_t i = 0; i < namelst.size(); ++i) {
if(st >= displs[i]) {
fst = i; // st所在文件的idx
}
if(en > displs[i + 1]) {
fen = i + 1; // en所在文件的idx
}
}
assert(fst <= fen);
bool flag = false;
// load from files
for(auto fi = fst; fi < fen + 1; ++fi) { // 遍歷加載 fst, fen之間的文件
if(flag) {
offset = 0;
} else {
offset = st - displs[fi];
}
assert(offset >= 0);
std::ifstream f(namelst[fi]); // 加載某個file
// 依據文件行數,找到對應在哪個文件之中,然后加載
if(offset) {
f.seekg(offset - 1);
paracel::str_type l;
std::getline(f, l);
offset += l.size();
}
if(fi == fen) {
while(offset + displs[fi] < en) {
paracel::str_type l;
std::getline(f, l);
offset += l.size() + 1;
func(l);
}
} else {
flag = true;
while(1) {
paracel::str_type l;
std::getline(f, l);
if(l.size() == 0) {
break;
}
func(l);
}
}
f.close();
} // end of for
}
4.5 建立圖
加載完成之后,會調用create_graph完成對圖的構建。
void create_graph(paracel::list_type<paracel::str_type> & linelst,
paracel::bigraph<paracel::default_id_type> & grp) {
paracel::scheduler scheduler(m_comm, pattern, mix);
// hash lines into slotslst,每個worker構建自己負責的部分
paracel::list_type<paracel::list_type<paracel::compact_triple_type> > result;
scheduler.lines_organize(linelst,
parserfunc,
result);
linelst.resize(0); linelst.shrink_to_fit(); paracel::cheat_to_os();
m_comm.synchronize();
// alltoall exchange,讓每個worker都擁有全部的數據
paracel::list_type<paracel::compact_triple_type> stf;
scheduler.exchange(result, stf);
result.resize(0); result.shrink_to_fit(); paracel::cheat_to_os();
m_comm.synchronize();
for(auto & tpl : stf) {
grp.add_edge(std::get<0>(tpl),
std::get<1>(tpl),
std::get<2>(tpl));
}
stf.resize(0); stf.shrink_to_fit(); paracel::cheat_to_os();
}
在構建過程中,使用lines_organize完成了對具體數據行的處理,具體就是依據文件中行的格式來進行解析,比如文件類型是fset?還是 fsv?還是 bfs 等,針對每種格式進行不同的處理。
template <class F = std::function< paracel::list_type<paracel::str_type>(paracel::str_type) > >
listlistriple_type
lines_organize(const paracel::list_type<paracel::str_type> & lines,
F && parser_func = default_parser) {
listlistriple_type line_slot_lst(m_comm.get_size());
paracel::str_type delimiter("[:| ]*");
for(auto & line : lines) {
auto stf = parser_func(line);
if(stf.size() == 2) {
// bfs or part of fset case
// ['a', 'b'] or ['a', 'b:0.2']
auto tmp = paracel::str_split(stf[1], delimiter);
if(tmp.size() == 1) {
paracel::triple_type tpl(stf[0], stf[1], 1.);
line_slot_lst[h(stf[0], stf[1], npx, npy)].push_back(tpl);
} else {
paracel::triple_type tpl(stf[0], tmp[0], std::stod(tmp[1]));
line_slot_lst[h(stf[0], tmp[0], npx, npy)].push_back(tpl);
}
} else if(mix) {
// fset case
// ['a', 'b', 'c'] or ['a', 'b|0.2', 'c|0.4']
// but ['a', '0.2', '0.4'] is not supported here
for(paracel::default_id_type i = 1; i < stf.size(); ++i) {
auto item = stf[i];
auto tmp = paracel::str_split(item, delimiter);
if(tmp.size() == 1) {
paracel::triple_type tpl(stf[0], item, 1.);
line_slot_lst[h(stf[0], item, npx, npy)].push_back(tpl);
} else {
paracel::triple_type tpl(stf[0], tmp[0], std::stod(tmp[1]));
line_slot_lst[h(stf[0], tmp[0], npx, npy)].push_back(tpl);
}
} // end of for
} else {
// fsv case
paracel::triple_type tpl(stf[0], stf[1], std::stod(stf[2]));
line_slot_lst[h(stf[0], stf[1], npx, npy)].push_back(tpl);
} // end of if
} // end of for
return line_slot_lst;
}
0x05 總結
現在歸納下總體邏輯如下,我們假設有兩個workers對若干文件進行並行加載。最終每個worker都把數據和模型加載進入自己的進程。
+------------------------------------------------------------------------------------------------------------------------------------------------------+
| worker 1 +------------------+ |
| | partition | +-----------------+ |
| | | 3 structure_load | scheduler | |
| | slst +---------------------------> | | 5 6 8 |
| 1 fixload | | | +----> paracel_sync +-----> create_graph +----> lines_organize |
| +-------------> | elst | <---------------------------+ | |
| | | 4 files_load_lines_impl +-----------------+ ^ ^ |
| | displs | | | |
| | | | | |
| +------------------+ | | |
| | | |
| ^ | | |
| | | | |
| |2 files_partition | | |
+------------------------------------------------------------------------------------------------------------------------------------------------------+
| | |
+------------------------------+ | |
| | | | |
| | | | |
+----+----+ +---+----+ +----+---+ | 7 +
| File 1 | | File 2 | | File n | | scheduler.exchange
+----+----+ +---+----+ +----+---+ | +
| | | | |
| | | | |
+------------------------------+ | |
| | |
+------------------------------------------------------------------------------------------------------------------------------------------------------+
| worker 2 |2 files_partition | | |
| v | | |
| | | |
| +-------------------+ | | |
| | partition | | | |
| | | +------------------+ | | |
| 1 fixload | slst | 3 structure_load | scheduler | v v |
| +-------------> | +------------------------> | | 8 |
| | elst | | +----> paracel_sync +-----> create_graph +-----> lines_organize |
| | | <-------------------------+ | 5 6 |
| | displs | 4 files_load_lines_impl +------------------+ |
| | | |
| +-------------------+ |
+------------------------------------------------------------------------------------------------------------------------------------------------------+
手機如下:
至此,Paracel分析完畢,我們下一篇開始介紹 GPipe,敬請期待。
0xFF 參考
卷積神經網絡的並行化模型--One weird trick for parallelizing convolutional neural networks
PyTorch 源碼解讀之 torch.utils.data:解析數據處理全流程
pytorch(分布式)數據並行個人實踐總結——DataParallel/DistributedDataParallel