R︱並行計算以及提高運算效率的方式(parallel包、clusterExport函數、SupR包簡介)


終於開始攻克並行這一塊了,有點小興奮,來看看網絡上R語言並行辦法有哪些:

     趙鵬老師(R與並行計算)做的總結已經很到位。現在並行可以分為:

 

     隱式並行:隱式計算對用戶隱藏了大部分細節,用戶不需要知道具體數據分配方式 ,算法的實現或者底層的硬件資源分配。系統會根據當前的硬件資源來自動啟動計算核心。顯然,這種模式對於大多數用戶來說是最喜聞樂見的。

    顯性並行:顯式計算則要求用戶能夠自己處理算例中數據划分,任務分配,計算以及最后的結果收集。因此,顯式計算模式對用戶的要求更高,用戶不僅需要理解自己的算法,還需要對並行計算和硬件有一定的理解。值得慶幸的是,現有R中的並行計算框架,如parallel (snow,multicores),Rmpi和foreach等采用的是映射式並行模型(Mapping),使用方法簡單清晰,極大地簡化了編程復雜度。R用戶只需要將現有程序轉化為*apply或者for的循環形式之后,通過簡單的API替換來實現並行計算。

 

簡單總結就是:

    隱式並行:OpenBLAS,Intel MKL,NVIDIA cuBLAS,H2O(參考我的博客)等

    顯性並行:parallel(主打lapply應用)、foreach(主打for循環)、SupR、還有利用GPU的辦法(gpuR)

 

      同時並行時對內存的消耗極大,超級容易爆發內存問題,而且R的內存問題一直都是R很難解決的問題,這邊筆者也把看到的一些方式列出來。

      當然在使用一些高大上的並行包以及框架之前,如果你能夠從編碼小細節優化,效率也能提高很多,譬如:

 

 

[html]  view plain  copy
 
 print?在CODE上查看代碼片派生到我的代碼片
  1. 方法:速度, nrow(df)/time_taken = n 行每秒  
  2. 原始方法:1X, 856.2255行每秒(正則化為1)  
  3. 向量化方法:738X, 631578行每秒  
  4. 只考慮真值情況:1002X,857142.9行每秒  
  5. ifelse:1752X,1500000行每秒  
  6. which:8806X,7540364行每秒  
  7. Rcpp:13476X,11538462行每秒  
  8. apply處理並行  

 

 

——————————————————————————————————————————————————————

 

在最后筆者在實踐中遇到的問題,進行對應的解決:

 

應用一:使用parallel包時,能不能clusterExport整個函數呢?

應用二:在使用parallel包時,報錯:Error in unserialize(node$con) : error reading from connection

 

 

——————————————————————————————————

 

一、parallel包的使用方法

 

多數內容參考:R語言並行化基礎與提高

parallel是base包,所以不用install.packages就可以直接調用。

原理:是利用CPU的核心進行訓練。

應用場景:跟apply族(lapply/sapply效果一致)( 

R語言︱數據分組統計函數族——apply族用法與心得

 

 

1、使用步驟

     設置核心數:no_cores <- detectCores() - 1

     步驟分群環境:cl <- makeCluster(no_cores)

     用到的變量與包復制給不同的核心:clusterEvalQ(包)、clusterExport(變量)

     運行算法:clusterApply(cl, c(9,5), get("+"), 3) 

     關閉集群:

stopCluster(cl)

 

 

     就OK啦。但是這里面很從前不一樣的是,如果有環境里面的外置變量(自己定義)那么需要額外插入,復制到不同核上面,而且如果有不同包里面的函數,都要額外加載、復制多份給不同的電腦核心。

 

2、案例

 

[html]  view plain  copy
 
 print?在CODE上查看代碼片派生到我的代碼片
  1. library(parallel)  
  2. cl <- makeCluster(getOption("cl.cores", 2))  
  3. clusterApply(cl, c(9,5), get("+"), 3)   #加減乘除  
  4. parSapply(cl, c(9,5), get("+"), 3)     

       案例一:c1就是設置的核心數,此時是2核心,然后就可以利用clusterApply/parSapply等函數進行調用。

[html]  view plain  copy
 
 print?在CODE上查看代碼片派生到我的代碼片
  1. xx <- 1  
  2. clusterExport(cl, "xx")  
  3. clusterCall(cl, function(y) xx + y, 2)  

      案例二:這個里面有xx這個變量是額外定義的,所以需要額外加載,需要用clusterExport函數,導入到並行環境中。

 

3、parallel內存優化與管理

(1)注意數據容量的均勻分布

 

[html]  view plain  copy
 
 print?在CODE上查看代碼片派生到我的代碼片
  1. parLapply <- function (cl = NULL, X, fun, ...)   
  2. {  
  3.     cl <- defaultCluster(cl)  
  4.     do.call(c, clusterApply(cl, x = splitList(X, length(cl)),   
  5.         fun = lapply, fun, ...), quote = TRUE)  
  6. }  


    注意到splitList(X, length(cl)) ,他會將任務分割成多個部分,然后將他們發送到不同的集群中。這里一個問題就是,譬如假設有一個list,里面數據量分別是:

 

(99,99,99,2,5,2)

    如果是兩個核數據分為了(99,99,99)、(2,5,2),第一個核分為到了那么多任務,第二個核很少,那么就會空閑,於是乎,效率還是不高,所以數據容量要盡量均勻分布。

 

(2)集群內存類型:FORK和PSOCK

FORK適用unix/max,實現內存共享以及節省內存,大數據環境下內存問題報錯少

PSOCK適用所有(一般window都是這個)

 

parallel包中通過函數來設置:

[html]  view plain  copy
 
 print?在CODE上查看代碼片派生到我的代碼片
  1. makeCluster(4,type="FORK")  


FORK對性能提升很顯著,但是window下不可適用。

 

 

4、parallel萬一報錯了咋辦?

      lapply在使用的時候也會出現這樣的問題,如果出現問題,那么就白跑了,而且也不可能給你停頓下來。那么如何讓lapply運行中跳過報錯的辦法呢?

      R語言相關的報錯處理函數可見:R語言-處理異常值或報錯的三個示例

      用tryCatch跳過:

 

[html]  view plain  copy
 
 print?在CODE上查看代碼片派生到我的代碼片
  1. result = tryCatch(  
  2.         {expr},   
  3.         warning = function(w) {warning-handler-code},   
  4.         error = function(e) { error-handler-code},   
  5.         finally = {cleanup-code}  
  6.         )  

出現warning、error時候怎么處理,就可以跳過了。例子:

 

 

[html]  view plain  copy
 
 print?在CODE上查看代碼片派生到我的代碼片
  1. result = tryCatch(  
  2.         {segmentCN(txt)},   
  3.         warning = function(w) {"出警告啦"},   
  4.         error = function(e) { "出錯啦"},   
  5.         )  

分詞時候,容易因為Lapply中斷之后,就不會運行了,這樣功虧一簣所以可以用這個辦法跳過。

 

5、parSapply/parLapply函數使用技巧

       函數的大體結構是:

 

[html]  view plain  copy
 
 print?在CODE上查看代碼片派生到我的代碼片
  1. parSapply(cl,x,fun)  


      其中cl是預先設定好的,x是需要循環的變量,而fun是函數。

 

      那么一般來說,fun之中要使用的任何內容都需要用clusterEvalQ(包)、clusterExport(變量)復制到不同的核心之中。

       而x則可以不用布置到全局,因為他是在源環境下調用出來,並拆分任務的。

 

 

 

 



 

——————————————————————————————————

 

二、foreach包的使用方法

 

1、簡單使用案例

 

設計foreach包的思想可能想要創建一個lapply和for循環的標准,初始化的過程有些不同,你需要register注冊集群:

library(foreach)
library(doParallel)

cl<-makeCluster(no_cores)
registerDoParallel(cl)

要記得最后要結束集群(不是用stopCluster()):

stopImplicitCluster()

foreach函數可以使用參數.combine控制你匯總結果的方法:

> foreach(exponent = 2:4, 
        .combine = c)  %dopar%  
  base^exponent
  [1]  4  8 16
> foreach(exponent = 2:4, .combine = rbind) %dopar%  base^exponent [,1] result.1 4 result.2 8 result.3 16
foreach(exponent = 2:4, .combine = list, .multicombine = TRUE) %dopar%  base^exponent [[1]] [1] 4 [[2]] [1] 8 [[3]] [1] 16

注意到最后list的combine方法是默認的。在這個例子中用到一個.multicombine參數,他可以幫助你避免嵌套列表。比如說list(list(result.1, result.2), result.3) :

> foreach(exponent = 2:4, 
        .combine = list)  %dopar%  
  base^exponent
[[1]]
[[1]][[1]]
[1] 4

[[1]][[2]]
[1] 8


[[2]]
[1] 16

2、變量作用域

在foreach中,變量作用域有些不同,它會自動加載本地的環境到函數中:

> base <- 2
> cl<-makeCluster(2)
> registerDoParallel(cl)
> foreach(exponent = 2:4, 
        .combine = c)  %dopar%  
  base^exponent
stopCluster(cl)
 [1]  4  8 16

但是,對於父環境的變量則不會加載,以下這個例子就會拋出錯誤:

test <- function (exponent) {
  foreach(exponent = 2:4, 
          .combine = c)  %dopar%  
    base^exponent
}
test()

 Error in base^exponent : task 1 failed - "object 'base' not found" 

為解決這個問題你可以使用.export這個參數而不需要使用clusterExport。注意的是,他可以加載最終版本的變量,在函數運行前,變量都是可以改變的:

base <- 2
cl<-makeCluster(2)
registerDoParallel(cl)
 
base <- 4
test <- function (exponent) {
  foreach(exponent = 2:4, 
          .combine = c,
          .export = "base")  %dopar%  
    base^exponent
}
test()
 
stopCluster(cl)

 [1]  4  8 16

相似的你可以使用.packages參數來加載包,比如說:.packages = c("rms", "mice")

 

——————————————————————————————————

 

三、SupR

通過對現有R 內核的改進實現在單機上的多線程和在集群上的分布式計算功能。SupR目前仍處在內部試用和補充完善階段。

       據說supR很好用,而且小象學院的講師(游皓麟)已經開始教授這個系統的使用方法,挺好用的。

 

——————————————————————————————————

 

四、內存管理

 

方法有三:
      一、升級硬件
      二、改進算法
      三、修改操作系統分配給R的內存上限, memory.size(T)查看已分配內存 

 

 

[html]  view plain  copy
 
 print?在CODE上查看代碼片派生到我的代碼片
  1. memory.size(F)#查看已使用內存    
  2. memory.limit()#查看內存上限   
  3. object.size()#看每個變量占多大內存。  
  4. memory.size()#查看現在的work space的內存使用  
  5. memory.limit()#查看系統規定的內存使用上限。如果現在的內存上限不夠用,可以通過memory.limit(newLimit)更改到一個新的上限。注意,在32位的R中,封頂上限為4G,無法在一個程序上使用超過4G (數位上限)。這種時候,可以考慮使用64位的版本。  


詳情看:R語言︱大數據集下運行內存管理   以及  R語言之內存管理

 

 

——————————————————————————————————

 

應用一:使用parallel包時,能不能clusterExport整個函數呢?

 

 

      R語言在使用Parallel時候,會出現這樣的疑問,一些東西都需要廣播給不同的核心,那么在clusterExport步驟怎么辦呢?能不能clusterExport一整個函數?然后直接parLapply呢?

      答案否定的。筆者在用的時候,怎么樣都不能把整個函數加載進去,所以只能另想辦法。

      既然不能clusterExport整個函數,那就只能改造我們的函數去適應parallel包了。

      來看幾個函數“被”改造的例子,一般來說有兩個辦法:

 

1、方法一:通過.GlobalEnv廣播成全局變量

 

[plain]  view plain  copy
 
 print?在CODE上查看代碼片派生到我的代碼片
  1. clusterExport(cl=cl, varlist=c("text.var", "ntv", "gc.rate", "pos"), envir=environment())  


      在函數導入的時候,加入envir變量讓其廣播給不同的核心,這個可以放在函數之中來使用。

 

 

2、方法二:把parLapply嵌套進函數之中

 

[plain]  view plain  copy
 
 print?在CODE上查看代碼片派生到我的代碼片
  1. par.test <- function(text.var, gc.rate=10){   
  2.     require(parallel)  
  3.     pos <-  function(i) {  
  4.         paste(sapply(strsplit(tolower(i), " "), nchar), collapse=" | ")  
  5.     }  
  6.     cl <- makeCluster(mc <- getOption("cl.cores", 4))  
  7.     parLapply(cl, text.var, function(text.vari, gc.rate, pos) {  
  8.         x <- pos(text.vari)  
  9.         if (i%%gc.rate==0) gc()  
  10.         x  
  11.     }, gc.rate, pos)  
  12. }  


       可以看到的這個例子,就是把內容嵌套到parLapply之中了。同時也可以學習到,parLapply使用方法也很不錯,也可以學習一下。

 

      再來看一個例子

 

[plain]  view plain  copy
 
 print?在CODE上查看代碼片派生到我的代碼片
  1. mainFunction <- function(cl) {  
  2.     fa <- function(x) fb(x)  
  3.     fb <- function(x) fc(x)  
  4.     fc <- function(x) x  
  5.     y <- 7  
  6.     workerFunction <- function(i) {  
  7.         do.call(functionNames[[i]], list(y))  
  8.     }  
  9.     environment(workerFunction) <- .GlobalEnv  
  10.     environment(fa) <- .GlobalEnv  
  11.     environment(fb) <- .GlobalEnv  
  12.     environment(fc) <- .GlobalEnv  
  13.     functionNames <- c("fa", "fb", "fc")  
  14.     clusterExport(cl, varlist=c("functionNames", functionNames, "y"),  
  15.                   envir=environment())  
  16.     parLapply(cl, seq_along(functionNames), workerFunction)  
  17. }  
  18.   
  19. library(parallel)  
  20. cl <- makeCluster(detectCores())  
  21. mainFunction(cl)  
  22. stopCluster(cl)  

 

 

——————————————————————————————————

應用二:在使用parallel包時,報錯:Error in unserialize(node$con) : error reading from connection

 

      在R語言中使用並行算法的時候,會出現報錯,無法連接到核心,即使在本來連接上的時候。通過查閱文獻看到了,這是因為“調用核心數--計算機內存”的不匹配造成的。

      如果你的數據集很大,調用了很多核心,那么你的計算機內存如果不夠匹配,就會出現連接不上的不錯,甚至還出現卡機,一動不動的情況(當然,只要耐心等待,其實他還是會繼續運行的...等待的時候會有點長)

 

解決辦法一:調用FORK,window不能...

 

      FORK適用unix/max,實現內存共享以及節省內存,大數據環境下內存問題報錯少

      PSOCK適用所有(一般window都是這個)

      不過調用FORK也還是治標不治本。

 

解決辦法二:分開並行,小步迭代

      譬如10萬數據,那么就“2萬+2萬+2萬+2萬+2萬”的跑,如果還出現脫機,就用之前tryCatch跳過,讓損失降低到最小。

      最好的辦法了。

 

 

參考文獻:How-to go parallel in R – basics + tips

 

——————————————————————————————————

 

參考文獻

 

1、R語言並行化基礎與提高

 

2、R與並行計算

 

3、sparklyr包:實現Spark與R的接口,會用dplyr就能玩Spark

4、Sparklyr與Docker的推薦系統實戰

 

5、R語言︱H2o深度學習的一些R語言實踐——H2o包

 

6、R用戶的福音︱TensorFlow:TensorFlow的R接口

7、mxnet:結合R與GPU加速深度學習

8、碎片︱R語言與深度學習

 

轉自:http://blog.csdn.net/sinat_26917383/article/details/52719232

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM