已經大半年沒有更新博客了。。最近都跑去寫分析報告半年沒有R
這次記錄下關於R循環(百萬級以上)死慢死慢的問題,這個問題去年就碰到過,當時也嘗試過多線程,but failed......昨天試了下,終於跑通了,而且過程還挺順利
step1
先查下自己電腦幾核的,n核貌似應該選跑n個線程,線程不是越多越好,線程個數和任務運行時間是條開口向下的拋物線,最高點預計在電腦的核數上。
detectCores( )檢查當前電腦可用核數 我的是4所以step2選的是4
1
2
|
library(parallel)
cl.cores <- detectCores()
|
step 2
多線程計算
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
setwd(
"C:\\Users\\siyuanmao\\Documents\\imdada\\0-渠道投放和新人券聯動模型\\測算"
)
options(scipen=3)
##取消科學計數法
channel_ad_ios_data<-
seq
(0,50000,5000)
channel_ad_android_data<-
seq
(0,100000,10000)
library(parallel)
func <-
function
(n){
#n=1
result_data<-
read
.csv(
"發券方案.csv"
,stringsAsFactors=FALSE)
total_coupon_solution_data<-
read
.csv(
"結果表框架.csv"
,stringsAsFactors=FALSE)
coupon_solution_data<-subset(result_data,solution==
paste
(
'方案'
,n,sep=
""
))
for
(i
in
1:11){
#i=3
coupon_solution_data$channel_ad_cost[3]<-5000*(i-1)
for
(j
in
1:11){
#j=5
coupon_solution_data$channel_ad_cost[4]<-10000*(j-1)
solution_mark<-
paste
(
'方案'
,n,i,j,sep=
"-"
)
coupon_solution_data$solution<-solution_mark
total_coupon_solution_data<-rbind(total_coupon_solution_data,coupon_solution_data)
}
}
print(solution_mark)
return
(total_coupon_solution_data)
}
#func(10)
system.
time
({
x <- 1:7776
cl <- makeCluster(4)
# 初始化四核心集群
results <- parLapply(cl,x,func)
# lapply的並行版本
res.
df
<-
do
.call(
'rbind'
,results)
# 整合結果
stopCluster(cl)
# 關閉集群
})
df
=as.data.frame(res.
df
)
|
原來非多線程的時候,我預計要跑12個小時以上,電腦發出呼呼~~的響聲,查了下Python循環會快點,然后改為python版(已經很久沒有用了,連個range都不會寫,摸索了大半天才改好,但是速度還是慢==),於是改成多線程,運行25分鍾就出結果了~~
補充:R語言 多線程
parallel包
包的安裝
1
2
|
install
.packages(
"parallel"
)
library(parallel)
|
包中常用函數
detectCores() 檢查當前的可用核數
clusterExport() 配置當前環境
makeCluster() 分配核數
stopCluster() 關閉集群
parLapply() lapply()函數的並行版本
其實R語言本來就是一門向量化語言,如果是對於一個向量的操作,使用apply函數一族能獲得比較高的效率,相比於for循環,這種高效來自於:
用C實現了for循環
減少對於data.frame等數據結構等不必要的拷貝
但是很多時候,如果想更快的話,光apply函數一族還不足夠,這時候就能用上多線程。
R語言parallel包可以幫助實現多線程。
parLapply的簡單代碼實戰
檢查當前核數
1
2
3
4
|
cl.cores <- detectCores()
#結果
> cl.cores
[1] 8
|
啟動集群和關閉集群
1
2
3
|
cl <- makeCluster(4)
# 初始化四核心集群
###並行任務
stopCluster(cl)
# 關閉集群
|
parLapply執行多線程計算
1
2
3
4
5
|
#定義計算平方函數
square <-
function
(x)
{
return
(x^2)
}
|
1
2
3
4
5
6
7
8
9
|
#利用並行計算計算平方函數
num <- c(1:3)
cl <- makeCluster(4)
# 初始化四核心集群
results <- parLapply(cl,num,square)
#調用parLapply並行計算平方函數
final <-
do
.call(
'c'
,results)
#整合結果
stopCluster(cl)
# 關閉集群
#結果
> final
[1] 1,4,9
|
思考:在如此小的計算方式下,開4個核計算是否比開一個核要快
答案:當然是不一定,因為涉及到調度方式等額外開銷,所以不一定快,因為真正並行起作用的地方在於大數據量的計算。
時間開銷對比
兩段對比代碼
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
#定義計算平方函數
square <-
function
(x)
{
#########
#一段冗余代碼增加執行時間
y = 2*x
if
(y <300)
{z = y}
else
{z = x}
##########
return
(x^2)
}
num <- c(1:10000000)
|
1
2
3
4
5
6
7
8
9
10
|
#並行計算
print(system.
time
({
cl <- makeCluster(4)
# 初始化四核心集群
results <- parLapply(cl,num,square)
#調用parLapply並行計算平方函數
final <-
do
.call(
'c'
,results)
#整合結果
stopCluster(cl)
# 關閉集群
}))
#結果
用戶 系統 流逝
7.89 0.27 19.01
|
1
2
3
4
5
6
7
8
|
#普通計算
print(system.
time
({
results <- lapply(num,square)
final <-
do
.call(
'c'
,results)
#整合結果
}))
#結果
用戶 系統 流逝
29.74 0.00 29.79
|
顯然在數據量比較大的時候,並行計算的時間幾乎就是於核數反比。不過,也不是多開幾個核就好,注意內存很容易超支的,每個核都分配相應的內存,所以要注意內存開銷。出現內存問題的時候,需要檢查是否代碼是否合理,R語言版本(64位會比32位分配的內存大),核分配是否合理。
上一級環境中變量的引入
R語言里邊對於環境變量有着有趣的定義,一層套一層,這里不做深入展開。
類似於在c語言函數中使用全局變量,R在執行並行計算的時候,如果需要計算的函數出現在全局(上一級),那么就需要聲明引入這個變量,否則將會報錯。
1
2
3
4
5
6
7
|
#定義計算冪函數
base = 2
square <-
function
(x)
{
return
(x^base)
}
num <- c(1:1000000)
|
1
2
3
4
5
6
7
8
|
#利用並行計算計算冪函數
cl <- makeCluster(4)
# 初始化四核心集群
results <- parLapply(cl,num,square)
#調用parLapply並行計算平方函數
final <-
do
.call(
'c'
,results)
#整合結果
stopCluster(cl)
# 關閉集群
#結果報錯
Error
in
checkForRemoteErrors(val) :
4 nodes produced errors; first error: 找不到對象
'base'
|
1
2
3
4
5
6
7
8
9
|
#利用並行計算計算冪函數
cl <- makeCluster(4)
# 初始化四核心集群
clusterExport(cl,
"base"
,envir = environment())
results <- parLapply(cl,num,square)
#調用parLapply並行計算平方函數
final <-
do
.call(
'c'
,results)
#整合結果
stopCluster(cl)
# 關閉集群
#結果
> final
[1] 1,4,9,16,25.......
|
foreach包
除了parallel包以外,還有針對並行for循環的foreach包,foreach()的使用也與parLapply()類似,兩個功能也類似,其中遇到的問題也類似。
包的安裝
1
2
|
install
.packages(
"foreach"
)
library(parallel)
|
foreach的使用
1
2
3
4
5
|
#定義計算冪函數
square <-
function
(x)
{
return
(x^2)
}
|
非並行情況的使用:
參數中的combine就是整合結果的函數,可以是c,可以是rbind,也可以是+等
1
2
3
4
|
results = foreach(x = c(1:3),.combine =
'c'
) %
do
% square(x)
#結果
> results
[1] 1,4,9
|
並行情況的使用:
注意並行情況的時候,需要與parallel包進行配合,引入library(doParallel)。同時%do%需要改成%dopar%。另外與parallel包不一樣的是,需要多加一句registerDoParallel(cl)來注冊核進行使用。
1
2
3
4
|
cl <- makeCluster(4)
registerDoParallel(cl)
results = foreach(x = c(1:100000),.combine =
'c'
) %dopar% square(x)
stopCluster(cl)
|
上一級環境中變量的引入
同parallel包並行計算前需要clusterExport()來引入全局變量一樣,foreach也同樣需要聲明,不同的是,foreach聲明方式直接寫在foreach()的參數export里邊。
1
2
3
4
5
6
7
8
9
10
|
#定義計算冪函數
base = 2
square <-
function
(x)
{
return
(x^base)
}
cl <- makeCluster(4)
registerDoParallel(cl)
results = foreach(x = c(1:100000),.combine =
'c'
,.
export
=
'base'
) %dopar% square(x)
stopCluster(cl)
|
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。如有錯誤或未考慮完全的地方,望不吝賜教。