前三章中列出的大多數示例代碼都很短,並沒有涉及到復雜的操作。從本章開始將會把前面介紹的數據結構組合起來,構成真正的程序。大部分程序是由條件語句和循環語句控制,R 語言中的條件語句(if-else)和 C 語言中類似此處就不再介紹,循環語句包括 for
和 while
控制塊。循環是社交網絡分析的主旋律,比如使用 for
循環遍歷分析網絡中的每一個節點。當網絡規模足夠大時,並行處理又變得十分必要。熟練掌握本章的內容后,你的程序將會優雅而自然。
循環語句
while
while
循環作為最簡單的一種循環,只要滿足條件(condition 為 TRUE
),循環將會一直進行。
while (condition) {
# TODO
}
在 R 語言中還存在特殊的關鍵字 repeat
,在 repeat
控制塊內的語句將會無限的執行。下面的示例代碼效果是等價的:
repeat {
# TODO
}
while (TRUE) {
# TODO
}
for
R 語言中的 for
循環更像某些語言中的 foreach
,本質上就是遍歷向量(或其他數據結構)中的元素:
for (name in vector) {
# TODO
}
下面的示例將會輸出向量中的元素:
> v <- c("a", "b", "c")
> for (item in v) {
+ print(item)
+ }
[1] "a"
[1] "b"
[1] "c"
循環控制
有時當滿足條件時,需要使用 break
退出循環:
while (TRUE) {
# TODO
if (condition) {
break
}
}
或者使用 next
退出當前循環(類似其他語言的 continue
):
for (name in vector) {
# TODO
if (condition) {
next
}
}
apply() 系列函數
R 語言中循環語句的執行效率是無法忍受的,這是因為循環語句是基於 R 語言本身來實現的,而向量操作是基於 C 語言實現的,所以應避免使用顯式循環,使用 apply()
系列函數進行替代。舉個例子,對一個矩陣的行求和,並封裝一個函數,使用 for
循環應該是這樣:
func1 <- function(matrix) {
row_sum <- c()
for (i in 1: nrow(matrix)) {
row_sum[i] <- sum(matrix[i, ]) # 對每一行求和
}
return(row_sum)
}
使用 sapply()
可以這樣簡化代碼:
func2 <- function(matrix) {
return(sapply(1: nrow(matrix), function(i) { return(sum(matrix[i, ])) }))
}
下面測試一下兩種方法的性能消耗:
> m <- matrix(c(1: (10000 * 10000)), nrow = 10000) # 10000x10000 的方陣
> system.time(func1(m))
用戶 系統 流逝
0.79 0.00 0.79
> system.time(func2(m))
用戶 系統 流逝
0.72 0.00 0.72
上面的例子說明使用 for
循環不僅代碼冗余,而且 for
循環實現的計算是耗時最長的,這就是為什么要了解 apply()
系列函數的原因。apply()
系列函數本身就是解決數據循環處理的問題,為了面向不同的數據類型,不同的返回值,apply()
函數組成了一個函數族。一般使用最多的是對矩陣處理的函數 apply()
以及對向量處理的函數 sapply()
。
apply() 系列函數[1]
apply()
apply()
函數用於多維數據的處理,比如矩陣。其本質上是對 for
循環的進一步封裝,並不會加快計算速度。apply()
函數的定義如下:
apply(X, MARGIN, FUN)
💡 提示
要查看函數的文檔可以在 R 終端中鍵入“?函數名
”,比如查看 apply() 的文檔輸入 ?apply
。
其中 X
是要循環處理的數據,即矩陣;MARGIN
是數據處理的維度,1 是按行處理,2 是按列處理;FUN
是循環處理的函數。對一個矩陣的行求和使用 apply()
函數更簡單,但效率上不如 sapply()
。
func2 <- function(matrix) {
return(apply(matrix, 1, sum))
}
sapply()
sapply()
函數用於循環處理一維數據,比如向量。參數上更加精簡,處理完成的數據返回的結果集為向量,其定義如下:
sapply(X, FUN)
其中 X
是要循環處理的數據,即向量;FUN
是循環處理的函數。在不使用向量運算的前提下計算向量的平方,使用 sapply()
函數可以這樣:
> v <- c(1, 2, 3)
> sapply(v, function(item) { return(item ^ 2) })
[1] 1 4 9
使用 parallel 包並行處理
現代 CPU 通常擁有 4 個以上的核心,為了使計算機更努力的“工作”,將任務並行化處理變得很有意義。充分利用多核 CPU,運行速度可能會快四倍,這樣我們等待實驗的時間更少,並且可以運行更多的實驗。在開始將任務並行化之前,首先需要問自己一個問題:任務是否能夠並行?要回答這個問題,你需要思考任務是否具有“重復性”,即每個子任務可以保持計算的獨立性,只有可重復的任務才能分配到多個 CPU 上運行。回到上文中“對一個矩陣的行求和”這個問題上,“求和”是一個可重復的任務,矩陣的行數決定了“求和”的次數,對矩陣中某一行向量的求和並不會干擾其他行向量的求和,因此該問題可以進行並行處理。或者更簡單的說,包含在循環控制塊內的代碼基本都可以進行並行處理。
在 R 語言中並行計算有 snow
和 parallel
兩個包可選,兩個包功能上一樣,這里使用 parallel
,最直接的原因是 R 語言集成了這個包,無需額外安裝。並行函數的用法基本等同於 apply()
系列函數,比如:apply()
對應的並行計算函數為 parApply()
、sapply()
對應的並行計算函數為 parSapply()
等等。
在本機上並行
在本機上處理並行計算的概念很好理解,就是將需要並行處理的任務分配到計算機的多個 CPU 內核中,這也是最常見的場景。繼續以“對一個矩陣的行求和”為例,采用並行的方式解決這個問題。首先需要創建一個並行集群:
> library(parallel)
> parallel.cores <- detectCores() # 檢測本機的內核數
> cl <- makeCluster(parallel.cores) # 創建集群,從機的數量為內核數
💡 提示
通常創建集群的從機數量不要超過 最大內核數 - 1,最好保留 1~2 個內核供系統調度以及其他任務使用。
如果沒有任何錯誤提示的話,則本機集群創建完成,可以將創建的集群打印出來以查看信息。
> print(cl)
socket cluster with 16 nodes on host 'localhost'
💡 提示
本機集群的創建錯誤通常和端口占用有關,處理該問題可以查看端口的占用情況並結束程序,或者重啟計算機。
緊接着調用 parApply()
進行並行計算,並行計算的 parApply()
系列方法僅僅需要在第一個參數將創建的集群傳遞進去即可。
func3 <- function(cluster, matrix) {
return(parApply(cluster, matrix, 1, sum))
}
下面來測試一下並行計算的時間開銷:
> system.time(func3(cl, m))
用戶 系統 流逝
3.43 0.47 4.86
測試的結果似乎與想象的有些不同,時間變得更慢了。這是由於 parallel
創建的是套接字集群,從機之間的通信速度是較慢的,由於求和這個任務本身就很簡單,通信的開銷遠遠大於計算的時間消耗,因此導致了計算速度並沒有變得更快。這也告訴我們過於“輕松”的任務,並不需要並行執行。
最后在並行計算完成后需要及時關閉集群:
> stopCluster(cl)
由於集群是一個獨立的環境,本地環境所引入的包、擁有的變量在集群內是無法訪問的。在進行更復雜的並行任務時,需要將包或者變量傳遞至集群中:
> clusterEvalQ(cl, { library(igraph) }) # 為集群引入包
> clusterExport(cl, c("graph", "subgraph"), envir = environment()) # 為集群引入本地變量
在多台計算機上並行
由於 parallel
創建的是套接字集群,這使得將並行任務分配至多台計算機成為可能。當然這並不意味着計算機越多就能獲得更快的計算速度。parallel
分配任務的方式類似均分,如果計算機之間單核的性能差距過大,那么會出現一台計算機分配的任務已經完成而等待其他計算機的現象,這樣反而會出現計算速度的下降。並且並行計算的速度還與計算機之間的通信速度有關,從機的變量共享來自於主機,當網絡情況不佳時,通信的消耗也是不容忽視的。因此在多台計算機上進行並行任務時需要謹慎考慮。在多台計算機上並行與在本機上並行的區別僅在於集群的創建,因此本小節將只介紹集群創建的不同。
這里使用兩台計算機進行模擬實驗,主機的操作系統為 Windows 10,從機的操作系統為 Ubuntu 20.04,使用兩台安裝了不同操作系統的計算機模擬了最復雜的情況,拓撲圖如下所示。
💡 提示
計算機之間的通信需要 SSH,Windows 10 請在“可選功能”中添加“OpenSSH 服務器”,Ubuntu Desktop 請運行命令 apt install openssh-server
。
同時為了避免在創建集群時手動輸入 SSH 登錄密碼,請配置 SSH 密鑰登錄。
首先創建一個列表,用於配置集群計算機的信息。其中 host
為計算機的地址;user
為 SSH 登錄的用戶名;rscript
為 Rscript 程序的路徑,當主從機的操作系統相同時該字段可以省略;ncore
為分配的 CPU 內核數。
> master <- '192.168.122.100'
> addresses <- list(
+ list(host = master, user = "zhang", rscript = "C:/Program Files/R/R-4.0.5/bin/Rscript", ncore = 4),
+ list(host = "192.168.122.200", user = "zhang", rscript = "/usr/lib/R/bin/Rscript", ncore = 4)
+ )
由於 parallel
是將一個 CPU 內核作為從機,而上面的配置是按照計算機進行的,因此還需要根據 ncore
字段創建分配 CPU 內核數的從機:
> spec <- lapply(addresses, function(machine) {
+ rep(list(list(host = machine$host, user = machine$user, rscript = machine$rscript)), machine$ncore)
+ })
> spec <- unlist(spec, recursive = FALSE)
可以將創建的 spec
變量打印出來,觀察是否創建了 8 個從機的信息。
> length(addresses)
[1] 2
> length(spec)
[1] 8
緊接着就可以調用 makeCluster()
創建集群,此過程根據計算機的數量可能需要數分鍾。其中 manual
為是否手動激活從機,當創建集群出現問題時,可以將該字段設置為 TRUE
,根據提示手動激活從機,以此來觀察哪一台計算機出現了問題;outfile
為日志文件的存儲地址,當創建集群出現問題時,也可以查看該文件。
cl <- makeCluster(type = "PSOCK", master = master, spec = spec, manual = FALSE, outfile = "log.txt")
此時如果沒有提示任何錯誤,那么一個由多台計算機組成的集群已經創建完成。現在可以使用 parApply()
系列函數將任務並行的在多台計算機上運行。
> print(cl)
socket cluster with 8 nodes on hosts
'192.168.122.100', '192.168.122.200'
💡 提示
多台計算機集群的創建錯誤通常與 SSH 登錄和包的引用有關。SSH 登錄的錯誤根據提示信息進行處理,包引用的錯誤請確保計算機之間的 R 語言版本、包的版本一致。
✏️ 練習
1. 使用 for 循環倒序輸出 0~100;
2. 定義一個函數,使用 apply() 系列函數,求一個矩陣列向量的平均值。