先上代碼案例:
主要的操作:
library(parallel);#加載並行計算包
cl <- makeCluster(8);# 初始化cpu集群
clusterEvalQ(cl,library(RODBC));#添加並行計算中用到的包
clusterExport(cl,'variablename');#添加並行計算中用到的環境變量(如當前上下文中定義的方法)
dt <- parApply(cl,stasList, 1, stasPowerPre_Time);# apply的並行版本
all_predata_time <- do.call('rbind',dt);# 整合結果
R並行計算相關
parallel
包
library(parallel)
#使用parallel
包,首先要初始化一個集群,這個集群的數量最好是你CPU核數-1。如果一台8核的電腦建立了數量為8的集群,那你的CPU就干不了其他事情了。所以可以這樣啟動一個集群:
# Calculate the number of cores
no_cores <- detectCores() - 1
# Initiate cluster
cl <- makeCluster(no_cores)
#現在只需要使用並行化版本的lapply
,parLapply
就可以了
parLapply(cl, 2:4,function(exponent) 2^exponent)
#結束后,要記得關閉集群,否則你電腦的內存會始終被R占用
stopCluster(cl)
變量作用域
在Mac/Linux中你可以使用 makeCluster(no_core, type="FORK")
這一選項,從而使你並行運行的時候可以包含所有環境變量。
在Windows中由於使用的是Parallel Socket Cluster (PSOCK),所以每個集群只會加載base包,所以你運行時要指定加載特定的包或變量:
cl<-makeCluster(no_cores)
base <-
2
clusterExport(cl,
"base"
)
parLapply(cl,
2
:
4
,
function(exponent) base^exponent)
stopCluster(cl)
注意:你需要用clusterExport(cl, "base")
把base這一個變量加載到集群當中。
如果你在函數中使用了一些其他的包就要使用clusterEvalQ
加載進去,比如說,使用rms包,那么就用clusterEvalQ(cl, library(rms))
。
要注意的是,在clusterExport 加載某些變量后,這些變量的任何變化都會被忽略:
cl<-makeCluster(no_cores) clusterExport(cl, "base") base <- 4 # Run parLapply(cl, 2:4, function(exponent) base^exponent) # Finish stopCluster(cl) 運行結果: [[1]] [1] 4 [[2]] [1] 8 [[3]]
使用parSapply
如果你想程序返回一個向量或者矩陣。而不是一個列表,那么就應該使用sapply
,他同樣也有並行版本parSapply
:
parSapply(cl, 2:4, function(exponent) base^exponent) [1] 4 8 16
輸出矩陣並顯示行名和列名(因此才需要使用as.character
):
parSapply(cl, as.character(2:4), function(exponent){ x <- as.numeric(exponent) c(base = base^x, self = x^x) }) 2 3 4 base 4 8 16 self 4 27 256
foreach
包
設計foreach
包的思想可能想要創建一個lapply和for循環的標准,初始化的過程有些不同,你需要register
注冊集群:
library(foreach) library(doParallel) cl<-makeCluster(no_cores) registerDoParallel(cl) #要記得最后要結束集群(不是用stopCluster()): stopImplicitCluster()
foreach函數可以使用參數.combine
控制你匯總結果的方法:
foreach(exponent = 2:4, .combine = c) %dopar% base^exponent [1] 4 8 16</code> #------------------------------------------------- foreach(exponent = 2:4, .combine = rbind) %dopar% base^exponent [,1] result.1 4 result.2 8 result.3 16 #------------------------------------------------- foreach(exponent = 2:4, .combine = list, .multicombine = TRUE) %dopar% base^exponent [[1]] [1] 4 [[2]] [1] 8 [[3]] [1] 16
注意到最后list的combine方法是默認的。在這個例子中用到一個.multicombine
參數,他可以幫助你避免嵌套列表。比如說list(list(result.1, result.2), result.3)
:
foreach(exponent = 2:4, .combine = list) %dopar% base^exponent [[1]] [[1]][[1]] [1] 4 [[1]][[2]] [1] 8 [[2]] [1] 16
變量作用域
在foreach中,變量作用域有些不同,它會自動加載本地的環境到函數中:
base <- 2 cl<-makeCluster(2) registerDoParallel(cl) foreach(exponent = 2:4, .combine = c) %dopar% base^exponent stopCluster(cl) [1] 4 8 16</code>
但是,對於父環境的變量則不會加載,以下這個例子就會拋出錯誤:
test <- function (exponent) { foreach(exponent = 2:4, .combine = c) %dopar% base^exponent } test() Error in base^exponent : task 1 failed - "object 'base' not found"
為解決這個問題你可以使用.export
這個參數而不需要使用clusterExport
。注意的是,他可以加載最終版本的變量,在函數運行前,變量都是可以改變的:
base <- 2 cl<-makeCluster(2) registerDoParallel(cl) base <- 4 test <- function (exponent) { foreach(exponent = 2:4, .combine = c, .export = "base") %dopar% base^exponent } test() stopCluster(cl) [1] 4 8 16
相似的你可以使用.packages
參數來加載包,比如說:.packages = c("rms", "mice")
使用Fork還是Psock?
這兩個的區別:
FORK:”to pide in branches and go separate ways”
系統:Unix/Mac (not Windows)
環境: 所有
PSOCK:並行socket集群
系統: All (including Windows)
環境: 空
內存控制
如果你不打算使用windows的話,建議你嘗試FORK模式,它可以實現內存共享,節省你的內存。
PSOCK:
library(pryr) # Used for memory analyses cl<-makeCluster(no_cores) clusterExport(cl, "a") clusterEvalQ(cl, library(pryr)) parSapply(cl, X = 1:10, function(x) {address(a)}) == address(a) [1] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE
FORK :
cl<-makeCluster(no_cores, type="FORK") parSapply(cl, X = 1:10, function(x) address(a)) == address(a) [1] TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE
你不需要花費太多時間去配置你的環境,有趣的是,你不需要擔心變量沖突:
b <- 0 parSapply(cl, X = 1:10, function(x) {b <- b + 1; b}) # [1] 1 1 1 1 1 1 1 1 1 1 parSapply(cl, X = 1:10, function(x) {b <<- b + 1; b}) # [1] 1 2 3 4 5 1 2 3 4 5 b # [1] 0
調試
當你在並行環境中工作是,debug是很困難的,你不能使用browser
/cat
/print
等參數來發現你的問題。
tryCatch
-list
方法
使用stop()
函數這不是一個好方法,因為當你收到一個錯誤信息時,很可能這個錯誤信息你在很久之前寫的,都快忘掉了,但是當你的程序跑了1,2天后,突然彈出這個錯誤,就只因為這一個錯誤,你的程序終止了,並把你之前的做的計算全部扔掉了,這是很討厭的。為此,你可以嘗試使用tryCatch
去捕捉那些錯誤,從而使得出現錯誤后程序還能繼續執行:
foreach(x=list(1, 2, "a")) %dopar% { tryCatch({ c(1/x, x, 2^x) }, error = function(e) return(paste0("The variable '", x, "'", " caused the error: '", e, "'"))) } [[1]] [1] 1 1 2 [[2]] [1] 0.5 2.0 4.0 [[3]] [1] "The variable 'a' caused the error: 'Error in 1/x: non-numeric argument to binary operator\n'"
這也正是我喜歡list的原因,它可以方便的將所有相關的數據輸出,而不是只輸出一個錯誤信息。這里有一個使用rbind
在lapply
進行conbine的例子:
out <- lapply(1:3, function(x) c(x, 2^x, x^x)) do.call(rbind, out) [,1] [,2] [,3] [1,] 1 2 1 [2,] 2 4 4 [3,] 3 8 27
創建一個文件輸出
當我們無法在控制台觀測每個工作時,我們可以設置一個共享文件,讓結果輸出到文件當中,這是一個想當舒服的解決方案:
cl<-makeCluster(no_cores, outfile = "debug.txt") registerDoParallel(cl) foreach(x=list(1, 2, "a")) %dopar% { print(x) } stopCluster(cl) starting worker pid=7392 on localhost:11411 at 00:11:21.077 starting worker pid=7276 on localhost:11411 at 00:11:21.319 starting worker pid=7576 on localhost:11411 at 00:11:21.762 [1] 2] [1] "a"
創建一個結點專用文件
一個或許更為有用的選擇是創建一個結點專用的文件,如果你的數據集存在一些問題的時候,可以方便觀測:
cl<-makeCluster(no_cores, outfile = "debug.txt") registerDoParallel(cl) foreach(x=list(1, 2, "a")) %dopar% { cat(dput(x), file = paste0("debug_file_", x, ".txt")) } stopCluster(cl)
partools
包
partools
這個包有一個dbs()函數或許值得一看(使用非windows系統值得一看),他允許你聯合多個終端給每個進程進行debug。
Caching
當做一個大型計算時,我強烈推薦使用一些緩存。這或許有多個原因你想要結束計算,但是要遺憾地浪費了計算的寶貴的時間。這里有一個包可以做緩存,R.cache,但是我發現自己寫個函數來實現更加簡單。你只需要嵌入digest
包就可以。digest()
函數是一個散列函數,把一個R對象輸入進去可以輸出一個md5值或sha1等從而得到一個唯一的key值,當你key匹配到你保存的cache中的key時,你就可以繼續你的計算了,而不需要將算法重新運行,以下是一個使用例子:
cacheParallel <- function(){ vars <- 1:2 tmp <- clusterEvalQ(cl, library(digest)) parSapply(cl, vars, function(var){ fn <- function(a) a^2 dg <- digest(list(fn, var)) cache_fn <- sprintf("Cache_%s.Rdata", dg) if (file.exists(cache_fn)){ load(cache_fn) }else{ var <- fn(var); Sys.sleep(5) save(var, file = cache_fn) } return(var) }) }
這個例子很顯然在第二次運行的時候並沒有啟動Sys.sleep,而是檢測到了你的cache文件,加載了上一次計算后的cache,你就不必再計算Sys.sleep了,因為在上一次已經計算過了。
system.time(out <- cacheParallel()) # user system elapsed # 0.003 0.001 5.079 out # [1] 1 4 system.time(out <- cacheParallel()) # user system elapsed # 0.001 0.004 0.046 out # [1] 1 4 # To clean up the files just do: file.remove(list.files(pattern = "Cache.+\.Rdata"))
載入平衡
任務載入
需要注意的是,無論parLapply還是foreach都是一個包裝(wrapper)的函數。這意味着他們不是直接執行並行計算的代碼,而是依賴於其他函數實現的。在parLapply中的定義如下:
parLapply <- function (cl = NULL, X, fun, ...) { cl <- defaultCluster(cl) do.call(c, clusterApply(cl, x = splitList(X, length(cl)), fun = lapply, fun, ...), quote = TRUE) }
注意到splitList(X, length(cl))
,他會將任務分割成多個部分,然后將他們發送到不同的集群中。如果你有很多cache或者存在一個任務比其他worker中的任務都大,那么在這個任務結束之前,其他提前結束的worker都會處於空閑狀態。為了避免這一情況,你需要將你的任務盡量平均分配給每個worker。舉個例子,你要計算優化神經網絡的參數,這一過程你可以並行地以不同參數來訓練神經網絡,你應該將如下代碼:
# From the nnet example parLapply(cl, c(10, 20, 30, 40, 50), function(neurons) nnet(ir[samp,], targets[samp,], size = neurons))
改為:
# From the nnet example parLapply(cl, c(10, 50, 30, 40, 20), function(neurons) nnet(ir[samp,], targets[samp,], size = neurons))
內存載入
在大數據的情況下使用並行計算會很快的出現問題。因為使用並行計算會極大的消耗內存,你必須要注意不要讓你的R運行內存到達內存的上限,否則這將會導致崩潰或非常緩慢。使用Forks是一個控制內存上限的一個重要方法。Fork是通過內存共享來實現,而不需要額外的內存空間,這對性能的影響是很顯著的(我的系統時16G內存,8核心):
> rm(list=ls()) > library(pryr) > library(magrittr) > a <- matrix(1, ncol=10^4*2, nrow=10^4) > object_size(a) 1.6 GB > system.time(mean(a)) user system elapsed 0.338 0.000 0.337 > system.time(mean(a + 1)) user system elapsed 0.490 0.084 0.574 > library(parallel) > cl <- makeCluster(4, type = "PSOCK") > system.time(clusterExport(cl, "a")) user system elapsed 5.253 0.544 7.289 > system.time(parSapply(cl, 1:8, function(x) mean(a + 1))) user system elapsed 0.008 0.008 3.365 > stopCluster(cl); gc(); > cl <- makeCluster(4, type = "FORK") > system.time(parSapply(cl, 1:8, function(x) mean(a + 1))) user system elapsed 0.009 0.008 3.123 > stopCluster(cl)
FORKs可以讓你並行化從而不用崩潰:
> cl <- makeCluster(8, type = "PSOCK") > system.time(clusterExport(cl, "a")) user system elapsed 10.576 1.263 15.877 > system.time(parSapply(cl, 1:8, function(x) mean(a + 1))) Error in checkForRemoteErrors(val) : 8 nodes produced errors; first error: cannot allocate vector of size 1.5 Gb Timing stopped at: 0.004 0 0.389 > stopCluster(cl) > cl <- makeCluster(8, type = "FORK") > system.time(parSapply(cl, 1:8, function(x) mean(a + 1))) user system elapsed 0.014 0.016 3.735 > stopCluster(cl)
當然,他並不能讓你完全解放,如你所見,當我們創建一個中間變量時也是需要消耗內存的:
> a <- matrix(1, ncol=10^4*2.1, nrow=10^4) > cl <- makeCluster(8, type = "FORK") > parSapply(cl, 1:8, function(x) { + b <- a + 1 + mean(b) + }) Error in unserialize(node$con) : error reading from connection</code>
內存建議
盡量使用rm()避免無用的變量 盡量使用gc()釋放內存。即使這在R中是自動執行的,但是當它沒有及時執行,在一個並行計算的情況下,如果沒有及時釋放內存,那么它將不會將內存返回給操作系統,從而影響了其他worker的執行。 通常並行化在大規模運算下很有用,但是,考慮到R中的並行化存在內存的初始化成本,所以考慮到內存的情況下,顯然小規模的並行化可能會更有用。 有時候在並行計算時,不斷做緩存,當達到上限時,換回串行計算。 你也可以手動的控制每個核所使用的內存數量,一個簡單的方法就是:memory.limit()/memory.size() = max cores
其他建議
一個常用的CPU核數檢測函數:max(
1
, detectCores() -
1
)
永遠不要使用set.seed()
,使用clusterSetRNGStream()來代替設置種子,如果你想重現結果。 如果你有Nvidia 顯卡,你可以嘗試使用gputools 包進行GPU加速(警告:安裝可能會很困難) 當使用mice並行化時記得使用ibind()
來合並項。
轉自:
https://www.2cto.com/kf/201606/517963.html
R中的apply族函數和多線程計算
一.apply族函數
1.apply 應用於矩陣和數組
# apply # 1代表行,2代表列 # create a matrix of 10 rows x 2 columns m <- matrix(c(1:10, 11:20), nrow = 10, ncol = 2) # mean of the rows apply(m, 1, mean) [1] 6 7 8 9 10 11 12 13 14 15 # mean of the columns apply(m, 2, mean) [1] 5.5 15.5 # divide all values by 2 apply(m, 1:2, function(x) x/2)
2.eapply 應用於環境中的變量
# a new environment e <- new.env() # two environment variables, a and b e$a <- 1:10 e$b <- 11:20 # mean of the variables eapply(e, mean) $b [1] 15.5 $a [1] 5.5
3.lapply應用於列表,返回列表,實際data.frame也是一種list,一種由多個長度相同的向量cbind一起的list:lapply(list, function)
sapply(iris[,1:4],mean) Sepal.Length Sepal.Width Petal.Length Petal.Width 5.843333 3.057333 3.758000 1.199333 lapply(iris[,1:4],mean) $Sepal.Length [1] 5.843333 $Sepal.Width [1] 3.057333 $Petal.Length [1] 3.758 $Petal.Width [1] 1.199333
4.sapply 是lapply的友好形式.lapply和sapply都可應用於list,data.frame。只是返回的對象類型不一樣,前者是list,后者看情況,如果是每一個list下面的元素長度都一樣,返回的結果就會被就會簡化。舉例說明。
# 下面兩個返回的結果是一樣一樣的,都是list sapply(iris,unique) lapply(iris,unique) # 下面兩個前者返回向量,后者返回list sapply(iris[,1:4],mean) lapply(iris[,1:4],mean) #下面兩個前者返回data.frame,后者反回list sapply(iris[,1:4], function(x) x/2) lapply(iris[,1:4], function(x) x/2) # sapply會根據返回結果,選最合適的對象類型來存放對象,而list反悔的統統都是list # 以下兩者返回結果一樣 library(magrittr) lapply(iris[,1:4],mean)%>%unlist() sapply(iris[,1:4],mean)
5.vapply要求提供第三個參數,即輸出的格式
l <- list(a = 1:10, b = 11:20) # fivenum of values using vapply l.fivenum <- vapply(l, fivenum, c(Min.=0, "1st Qu."=0, Median=0, "3rd Qu."=0, Max.=0)) class(l.fivenum) [1] "matrix" # let's see it l.fivenum a b Min. 1.0 11.0 1st Qu. 3.0 13.0 Median 5.5 15.5 3rd Qu. 8.0 18.0 Max. 10.0 20.0
6.replicate
Description: “replicate is a wrapper for the common use of sapply for repeated evaluation of an expression (which will usually involve random number generation).”
replicate(10, rnorm(10))
7.mapply可傳遞多個參數進去.
mapply is a multivariate version of sapply. mapply applies FUN to the first elements of each ... argument, the second elements, the third elements, and so on. Arguments are recycled if necessary.
l1 <- list(a = c(1:10), b = c(11:20)) l2 <- list(c = c(21:30), d = c(31:40)) # sum the corresponding elements of l1 and l2 mapply(sum, l1$a, l1$b, l2$c, l2$d) [1] 64 68 72 76 80 84 88 92 96 100 #mapply像是可以傳遞多個參數的saply mapply(rep, 1:4, 5) [,1] [,2] [,3] [,4] [1,] 1 2 3 4 [2,] 1 2 3 4 [3,] 1 2 3 4 [4,] 1 2 3 4 [5,] 1 2 3 4
8.rapply
Description: “rapply is a recursive version of lapply.”
# let's start with our usual simple list example l <- list(a = 1:10, b = 11:20) # log2 of each value in the list rapply(l, log2) a1 a2 a3 a4 a5 a6 a7 a8 0.000000 1.000000 1.584963 2.000000 2.321928 2.584963 2.807355 3.000000 a9 a10 b1 b2 b3 b4 b5 b6 3.169925 3.321928 3.459432 3.584963 3.700440 3.807355 3.906891 4.000000 b7 b8 b9 b10 4.087463 4.169925 4.247928 4.321928 # log2 of each value in each list rapply(l, log2, how = "list") $a [1] 0.000000 1.000000 1.584963 2.000000 2.321928 2.584963 2.807355 3.000000 [9] 3.169925 3.321928 $b [1] 3.459432 3.584963 3.700440 3.807355 3.906891 4.000000 4.087463 4.169925 [9] 4.247928 4.321928 # what if the function is the mean? rapply(l, mean) a b 5.5 15.5 rapply(l, mean, how = "list") $a [1] 5.5 $b [1] 15.5
二.多線程計算
下面用歐拉問題14,來演示R中的向量化編程(利用apply組函數)和多線程
#-----Longest Collatz sequence Problem 14 func <- function(x) { n = 1 raw <- x while (x > 1) { x <- ifelse(x%%2==0,x/2,3*x+1) n = n + 1 } return(c(raw,n)) } #方法1 向量化編程 library(magrittr) system.time({ x <- 1:1e5 res1 <- sapply(x, func)%>%t() }) 用戶 系統 流逝 37.960 0.360 41.315 #方法2 向量化編程 system.time({ x <- 1:1e5 res2 <- do.call('rbind',lapply(x,func)) }) 用戶 系統 流逝 36.031 0.181 36.769 #方法3 多線程計算 library(parallel) # 用system.time來返回計算所需時間 system.time({ x <- 1:1e5 cl <- makeCluster(4) # 初始化四核心集群 results <- parLapply(cl,x,func) # lapply的並行版本 res.df <- do.call('rbind',results) # 整合結果 stopCluster(cl) # 關閉集群 }) 用戶 系統 流逝 0.199 0.064 20.038 # 方法4 for 循環 system.time({ m <- matrix(nrow = 0,ncol = 2) for(i in 1:1e5){ m <- rbind(m,func(i)) } }) #方法4用時太長
轉自:
http://www.cnblogs.com/litao1105/p/5573373.html