R語言多線程運算操作(解決R循環慢的問題)


已經大半年沒有更新博客了。。最近都跑去寫分析報告半年沒有R

這次記錄下關於R循環(百萬級以上)死慢死慢的問題,這個問題去年就碰到過,當時也嘗試過多線程,but failed......昨天試了下,終於跑通了,而且過程還挺順利

step1

先查下自己電腦幾核的,n核貌似應該選跑n個線程,線程不是越多越好,線程個數和任務運行時間是條開口向下的拋物線,最高點預計在電腦的核數上。

detectCores( )檢查當前電腦可用核數 我的是4所以step2選的是4

1
2
library(parallel)
cl.cores <- detectCores()

step 2

多線程計算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
setwd( "C:\\Users\\siyuanmao\\Documents\\imdada\\0-渠道投放和新人券聯動模型\\測算" )
options(scipen=3)  ##取消科學計數法
channel_ad_ios_data<- seq (0,50000,5000)
channel_ad_android_data<- seq (0,100000,10000)
library(parallel)
func <- function (n){ #n=1
   result_data<- read .csv( "發券方案.csv" ,stringsAsFactors=FALSE)
   total_coupon_solution_data<- read .csv( "結果表框架.csv" ,stringsAsFactors=FALSE)
   coupon_solution_data<-subset(result_data,solution== paste ( '方案' ,n,sep= "" ))
   
   for (i in 1:11){ #i=3
     coupon_solution_data$channel_ad_cost[3]<-5000*(i-1)
     
     for (j in 1:11){ #j=5
       coupon_solution_data$channel_ad_cost[4]<-10000*(j-1)
       solution_mark<- paste ( '方案' ,n,i,j,sep= "-" )
       coupon_solution_data$solution<-solution_mark
       
       total_coupon_solution_data<-rbind(total_coupon_solution_data,coupon_solution_data)
     }
   }
   print(solution_mark)
   return (total_coupon_solution_data)
}
#func(10)
system. time ({
x <- 1:7776
cl <- makeCluster(4) # 初始化四核心集群
results <- parLapply(cl,x,func) # lapply的並行版本
res. df <- do .call( 'rbind' ,results) # 整合結果
stopCluster(cl) # 關閉集群
})
df =as.data.frame(res. df )

原來非多線程的時候,我預計要跑12個小時以上,電腦發出呼呼~~的響聲,查了下Python循環會快點,然后改為python版(已經很久沒有用了,連個range都不會寫,摸索了大半天才改好,但是速度還是慢==),於是改成多線程,運行25分鍾就出結果了~~

補充:R語言 多線程

parallel包

包的安裝

1
2
install .packages( "parallel" )
library(parallel)

包中常用函數

detectCores() 檢查當前的可用核數

clusterExport() 配置當前環境

makeCluster() 分配核數

stopCluster() 關閉集群

parLapply() lapply()函數的並行版本

其實R語言本來就是一門向量化語言,如果是對於一個向量的操作,使用apply函數一族能獲得比較高的效率,相比於for循環,這種高效來自於:

用C實現了for循環

減少對於data.frame等數據結構等不必要的拷貝

但是很多時候,如果想更快的話,光apply函數一族還不足夠,這時候就能用上多線程。

R語言parallel包可以幫助實現多線程。

parLapply的簡單代碼實戰

檢查當前核數

1
2
3
4
cl.cores <- detectCores()
#結果
> cl.cores
[1] 8

啟動集群和關閉集群

1
2
3
cl <- makeCluster(4) # 初始化四核心集群
###並行任務
stopCluster(cl) # 關閉集群

parLapply執行多線程計算

1
2
3
4
5
#定義計算平方函數
square <- function (x)
{
     return (x^2)
}
1
2
3
4
5
6
7
8
9
#利用並行計算計算平方函數
num <- c(1:3)
cl <- makeCluster(4) # 初始化四核心集群
results <- parLapply(cl,num,square) #調用parLapply並行計算平方函數
final <- do .call( 'c' ,results) #整合結果
stopCluster(cl) # 關閉集群
#結果
> final
[1] 1,4,9

思考:在如此小的計算方式下,開4個核計算是否比開一個核要快

答案:當然是不一定,因為涉及到調度方式等額外開銷,所以不一定快,因為真正並行起作用的地方在於大數據量的計算。

時間開銷對比

兩段對比代碼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#定義計算平方函數
square <- function (x)
{
    #########
    #一段冗余代碼增加執行時間
     y = 2*x
     if (y <300)
     {z = y}
     else
     {z = x}
    ##########  
     return (x^2)
}
num <- c(1:10000000)
1
2
3
4
5
6
7
8
9
10
#並行計算
print(system. time ({
     cl <- makeCluster(4) # 初始化四核心集群
     results <- parLapply(cl,num,square) #調用parLapply並行計算平方函數
final <- do .call( 'c' ,results) #整合結果
stopCluster(cl) # 關閉集群
}))
#結果
用戶  系統  流逝
  7.89  0.27 19.01
1
2
3
4
5
6
7
8
#普通計算
print(system. time ({
     results <- lapply(num,square)
     final <- do .call( 'c' ,results) #整合結果
}))
#結果
用戶  系統  流逝
29.74  0.00 29.79

顯然在數據量比較大的時候,並行計算的時間幾乎就是於核數反比。不過,也不是多開幾個核就好,注意內存很容易超支的,每個核都分配相應的內存,所以要注意內存開銷。出現內存問題的時候,需要檢查是否代碼是否合理,R語言版本(64位會比32位分配的內存大),核分配是否合理。

上一級環境中變量的引入

R語言里邊對於環境變量有着有趣的定義,一層套一層,這里不做深入展開。

類似於在c語言函數中使用全局變量,R在執行並行計算的時候,如果需要計算的函數出現在全局(上一級),那么就需要聲明引入這個變量,否則將會報錯。

1
2
3
4
5
6
7
#定義計算冪函數
base = 2
square <- function (x)
{
     return (x^base)
}
num <- c(1:1000000)
1
2
3
4
5
6
7
8
#利用並行計算計算冪函數
cl <- makeCluster(4) # 初始化四核心集群
results <- parLapply(cl,num,square) #調用parLapply並行計算平方函數
final <- do .call( 'c' ,results) #整合結果
stopCluster(cl) # 關閉集群
#結果報錯
Error in checkForRemoteErrors(val) :
   4 nodes produced errors; first error: 找不到對象 'base'
1
2
3
4
5
6
7
8
9
#利用並行計算計算冪函數
cl <- makeCluster(4) # 初始化四核心集群
clusterExport(cl, "base" ,envir = environment())
results <- parLapply(cl,num,square) #調用parLapply並行計算平方函數
final <- do .call( 'c' ,results) #整合結果
stopCluster(cl) # 關閉集群
#結果
> final
[1] 1,4,9,16,25.......

foreach包

除了parallel包以外,還有針對並行for循環的foreach包,foreach()的使用也與parLapply()類似,兩個功能也類似,其中遇到的問題也類似。

包的安裝

1
2
install .packages( "foreach" )
library(parallel)

foreach的使用

1
2
3
4
5
#定義計算冪函數
square <- function (x)
{
     return (x^2)
}

非並行情況的使用:

參數中的combine就是整合結果的函數,可以是c,可以是rbind,也可以是+等

1
2
3
4
results = foreach(x = c(1:3),.combine = 'c' ) % do % square(x)
#結果
> results
[1] 1,4,9

並行情況的使用:

注意並行情況的時候,需要與parallel包進行配合,引入library(doParallel)。同時%do%需要改成%dopar%。另外與parallel包不一樣的是,需要多加一句registerDoParallel(cl)來注冊核進行使用。

1
2
3
4
cl <- makeCluster(4)
registerDoParallel(cl)
results = foreach(x = c(1:100000),.combine = 'c' ) %dopar% square(x)
stopCluster(cl)

上一級環境中變量的引入

同parallel包並行計算前需要clusterExport()來引入全局變量一樣,foreach也同樣需要聲明,不同的是,foreach聲明方式直接寫在foreach()的參數export里邊。

1
2
3
4
5
6
7
8
9
10
#定義計算冪函數
base = 2
square <- function (x)
{
     return (x^base)
}
cl <- makeCluster(4)
registerDoParallel(cl)
results = foreach(x = c(1:100000),.combine = 'c' ,. export = 'base' ) %dopar% square(x)
stopCluster(cl)
 

以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。如有錯誤或未考慮完全的地方,望不吝賜教。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM