終於開始攻克並行這一塊了,有點小興奮,來看看網絡上R語言並行辦法有哪些:
趙鵬老師(R與並行計算)做的總結已經很到位。現在並行可以分為:
隱式並行:隱式計算對用戶隱藏了大部分細節,用戶不需要知道具體數據分配方式 ,算法的實現或者底層的硬件資源分配。系統會根據當前的硬件資源來自動啟動計算核心。顯然,這種模式對於大多數用戶來說是最喜聞樂見的。
顯性並行:顯式計算則要求用戶能夠自己處理算例中數據划分,任務分配,計算以及最后的結果收集。因此,顯式計算模式對用戶的要求更高,用戶不僅需要理解自己的算法,還需要對並行計算和硬件有一定的理解。值得慶幸的是,現有R中的並行計算框架,如parallel (snow,multicores),Rmpi和foreach等采用的是映射式並行模型(Mapping),使用方法簡單清晰,極大地簡化了編程復雜度。R用戶只需要將現有程序轉化為*apply或者for的循環形式之后,通過簡單的API替換來實現並行計算。
簡單總結就是:
隱式並行:OpenBLAS,Intel MKL,NVIDIA cuBLAS,H2O(參考我的博客)等
顯性並行:parallel(主打lapply應用)、foreach(主打for循環)、SupR、還有利用GPU的辦法(gpuR)
同時並行時對內存的消耗極大,超級容易爆發內存問題,而且R的內存問題一直都是R很難解決的問題,這邊筆者也把看到的一些方式列出來。
當然在使用一些高大上的並行包以及框架之前,如果你能夠從編碼小細節優化,效率也能提高很多,譬如:
- 方法:速度, nrow(df)/time_taken = n 行每秒
- 原始方法:1X, 856.2255行每秒(正則化為1)
- 向量化方法:738X, 631578行每秒
- 只考慮真值情況:1002X,857142.9行每秒
- ifelse:1752X,1500000行每秒
- which:8806X,7540364行每秒
- Rcpp:13476X,11538462行每秒
- apply處理並行
——————————————————————————————————————————————————————
在最后筆者在實踐中遇到的問題,進行對應的解決:
應用一:使用parallel包時,能不能clusterExport整個函數呢?
應用二:在使用parallel包時,報錯:Error in unserialize(node$con) : error reading from connection
——————————————————————————————————
一、parallel包的使用方法
多數內容參考:R語言並行化基礎與提高
parallel是base包,所以不用install.packages就可以直接調用。
原理:是利用CPU的核心進行訓練。
應用場景:跟apply族(lapply/sapply效果一致)(
R語言︱數據分組統計函數族——apply族用法與心得
)
1、使用步驟
設置核心數:no_cores <- detectCores() - 1
步驟分群環境:cl <- makeCluster(no_cores)
用到的變量與包復制給不同的核心:clusterEvalQ(包)、clusterExport(變量)
運行算法:clusterApply(cl, c(9,5), get("+"), 3)
關閉集群:
stopCluster(cl)
就OK啦。但是這里面很從前不一樣的是,如果有環境里面的外置變量(自己定義)那么需要額外插入,復制到不同核上面,而且如果有不同包里面的函數,都要額外加載、復制多份給不同的電腦核心。
2、案例
- library(parallel)
- cl <- makeCluster(getOption("cl.cores", 2))
- clusterApply(cl, c(9,5), get("+"), 3) #加減乘除
- parSapply(cl, c(9,5), get("+"), 3)
案例一:c1就是設置的核心數,此時是2核心,然后就可以利用clusterApply/parSapply等函數進行調用。
- xx <- 1
- clusterExport(cl, "xx")
- clusterCall(cl, function(y) xx + y, 2)
案例二:這個里面有xx這個變量是額外定義的,所以需要額外加載,需要用clusterExport函數,導入到並行環境中。
3、parallel內存優化與管理
(1)注意數據容量的均勻分布
- parLapply <- function (cl = NULL, X, fun, ...)
- {
- cl <- defaultCluster(cl)
- do.call(c, clusterApply(cl, x = splitList(X, length(cl)),
- fun = lapply, fun, ...), quote = TRUE)
- }
注意到splitList(X, length(cl)) ,他會將任務分割成多個部分,然后將他們發送到不同的集群中。這里一個問題就是,譬如假設有一個list,里面數據量分別是:
(99,99,99,2,5,2)
如果是兩個核數據分為了(99,99,99)、(2,5,2),第一個核分為到了那么多任務,第二個核很少,那么就會空閑,於是乎,效率還是不高,所以數據容量要盡量均勻分布。
(2)集群內存類型:FORK和PSOCK
FORK適用unix/max,實現內存共享以及節省內存,大數據環境下內存問題報錯少
PSOCK適用所有(一般window都是這個)
parallel包中通過函數來設置:
- makeCluster(4,type="FORK")
FORK對性能提升很顯著,但是window下不可適用。
4、parallel萬一報錯了咋辦?
lapply在使用的時候也會出現這樣的問題,如果出現問題,那么就白跑了,而且也不可能給你停頓下來。那么如何讓lapply運行中跳過報錯的辦法呢?
R語言相關的報錯處理函數可見:R語言-處理異常值或報錯的三個示例
用tryCatch跳過:
- result = tryCatch(
- {expr},
- warning = function(w) {warning-handler-code},
- error = function(e) { error-handler-code},
- finally = {cleanup-code}
- )
出現warning、error時候怎么處理,就可以跳過了。例子:
- result = tryCatch(
- {segmentCN(txt)},
- warning = function(w) {"出警告啦"},
- error = function(e) { "出錯啦"},
- )
分詞時候,容易因為Lapply中斷之后,就不會運行了,這樣功虧一簣所以可以用這個辦法跳過。
5、parSapply/parLapply函數使用技巧
函數的大體結構是:
- parSapply(cl,x,fun)
其中cl是預先設定好的,x是需要循環的變量,而fun是函數。
那么一般來說,fun之中要使用的任何內容都需要用clusterEvalQ(包)、clusterExport(變量)復制到不同的核心之中。
而x則可以不用布置到全局,因為他是在源環境下調用出來,並拆分任務的。
——————————————————————————————————
二、foreach包的使用方法
1、簡單使用案例
設計foreach
包的思想可能想要創建一個lapply和for循環的標准,初始化的過程有些不同,你需要register
注冊集群:
library(foreach)
library(doParallel)
cl<-makeCluster(no_cores)
registerDoParallel(cl)
要記得最后要結束集群(不是用stopCluster()
):
stopImplicitCluster()
foreach函數可以使用參數.combine
控制你匯總結果的方法:
> foreach(exponent = 2:4,
.combine = c) %dopar%
base^exponent
[1] 4 8 16
> foreach(exponent = 2:4, .combine = rbind) %dopar% base^exponent [,1] result.1 4 result.2 8 result.3 16
foreach(exponent = 2:4, .combine = list, .multicombine = TRUE) %dopar% base^exponent [[1]] [1] 4 [[2]] [1] 8 [[3]] [1] 16
注意到最后list的combine方法是默認的。在這個例子中用到一個.multicombine
參數,他可以幫助你避免嵌套列表。比如說list(list(result.1, result.2), result.3)
:
> foreach(exponent = 2:4,
.combine = list) %dopar%
base^exponent
[[1]]
[[1]][[1]]
[1] 4
[[1]][[2]]
[1] 8
[[2]]
[1] 16
2、變量作用域
在foreach中,變量作用域有些不同,它會自動加載本地的環境到函數中:
> base <- 2
> cl<-makeCluster(2)
> registerDoParallel(cl)
> foreach(exponent = 2:4,
.combine = c) %dopar%
base^exponent
stopCluster(cl)
[1] 4 8 16
但是,對於父環境的變量則不會加載,以下這個例子就會拋出錯誤:
test <- function (exponent) {
foreach(exponent = 2:4,
.combine = c) %dopar%
base^exponent
}
test()
Error in base^exponent : task 1 failed - "object 'base' not found"
為解決這個問題你可以使用.export
這個參數而不需要使用clusterExport
。注意的是,他可以加載最終版本的變量,在函數運行前,變量都是可以改變的:
base <- 2
cl<-makeCluster(2)
registerDoParallel(cl)
base <- 4
test <- function (exponent) {
foreach(exponent = 2:4,
.combine = c,
.export = "base") %dopar%
base^exponent
}
test()
stopCluster(cl)
[1] 4 8 16
相似的你可以使用.packages
參數來加載包,比如說:.packages = c("rms", "mice")
——————————————————————————————————
三、SupR
通過對現有R 內核的改進實現在單機上的多線程和在集群上的分布式計算功能。SupR目前仍處在內部試用和補充完善階段。
據說supR很好用,而且小象學院的講師(游皓麟)已經開始教授這個系統的使用方法,挺好用的。
——————————————————————————————————
四、內存管理
方法有三:
一、升級硬件
二、改進算法
三、修改操作系統分配給R的內存上限, memory.size(T)查看已分配內存
- memory.size(F)#查看已使用內存
- memory.limit()#查看內存上限
- object.size()#看每個變量占多大內存。
- memory.size()#查看現在的work space的內存使用
- memory.limit()#查看系統規定的內存使用上限。如果現在的內存上限不夠用,可以通過memory.limit(newLimit)更改到一個新的上限。注意,在32位的R中,封頂上限為4G,無法在一個程序上使用超過4G (數位上限)。這種時候,可以考慮使用64位的版本。
詳情看:R語言︱大數據集下運行內存管理 以及 R語言之內存管理
——————————————————————————————————
應用一:使用parallel包時,能不能clusterExport整個函數呢?
R語言在使用Parallel時候,會出現這樣的疑問,一些東西都需要廣播給不同的核心,那么在clusterExport步驟怎么辦呢?能不能clusterExport一整個函數?然后直接parLapply呢?
答案否定的。筆者在用的時候,怎么樣都不能把整個函數加載進去,所以只能另想辦法。
既然不能clusterExport整個函數,那就只能改造我們的函數去適應parallel包了。
來看幾個函數“被”改造的例子,一般來說有兩個辦法:
1、方法一:通過.GlobalEnv廣播成全局變量
- clusterExport(cl=cl, varlist=c("text.var", "ntv", "gc.rate", "pos"), envir=environment())
在函數導入的時候,加入envir變量讓其廣播給不同的核心,這個可以放在函數之中來使用。
2、方法二:把parLapply嵌套進函數之中
- par.test <- function(text.var, gc.rate=10){
- require(parallel)
- pos <- function(i) {
- paste(sapply(strsplit(tolower(i), " "), nchar), collapse=" | ")
- }
- cl <- makeCluster(mc <- getOption("cl.cores", 4))
- parLapply(cl, text.var, function(text.vari, gc.rate, pos) {
- x <- pos(text.vari)
- if (i%%gc.rate==0) gc()
- x
- }, gc.rate, pos)
- }
可以看到的這個例子,就是把內容嵌套到parLapply之中了。同時也可以學習到,parLapply使用方法也很不錯,也可以學習一下。
再來看一個例子:
- mainFunction <- function(cl) {
- fa <- function(x) fb(x)
- fb <- function(x) fc(x)
- fc <- function(x) x
- y <- 7
- workerFunction <- function(i) {
- do.call(functionNames[[i]], list(y))
- }
- environment(workerFunction) <- .GlobalEnv
- environment(fa) <- .GlobalEnv
- environment(fb) <- .GlobalEnv
- environment(fc) <- .GlobalEnv
- functionNames <- c("fa", "fb", "fc")
- clusterExport(cl, varlist=c("functionNames", functionNames, "y"),
- envir=environment())
- parLapply(cl, seq_along(functionNames), workerFunction)
- }
- library(parallel)
- cl <- makeCluster(detectCores())
- mainFunction(cl)
- stopCluster(cl)
——————————————————————————————————
應用二:在使用parallel包時,報錯:Error in unserialize(node$con) : error reading from connection
在R語言中使用並行算法的時候,會出現報錯,無法連接到核心,即使在本來連接上的時候。通過查閱文獻看到了,這是因為“調用核心數--計算機內存”的不匹配造成的。
如果你的數據集很大,調用了很多核心,那么你的計算機內存如果不夠匹配,就會出現連接不上的不錯,甚至還出現卡機,一動不動的情況(當然,只要耐心等待,其實他還是會繼續運行的...等待的時候會有點長)
解決辦法一:調用FORK,window不能...
FORK適用unix/max,實現內存共享以及節省內存,大數據環境下內存問題報錯少
PSOCK適用所有(一般window都是這個)
不過調用FORK也還是治標不治本。
解決辦法二:分開並行,小步迭代
譬如10萬數據,那么就“2萬+2萬+2萬+2萬+2萬”的跑,如果還出現脫機,就用之前tryCatch跳過,讓損失降低到最小。
最好的辦法了。
參考文獻:How-to go parallel in R – basics + tips
——————————————————————————————————
參考文獻
2、R與並行計算
3、sparklyr包:實現Spark與R的接口,會用dplyr就能玩Spark
6、R用戶的福音︱TensorFlow:TensorFlow的R接口
8、碎片︱R語言與深度學習
轉自:http://blog.csdn.net/sinat_26917383/article/details/52719232