日前,Rstudio公司發布了sparklyr
包。該包具有以下幾個功能:
- 實現R與Spark的連接—
sparklyr
包提供了一個完整的dplyr后端 - 篩選並聚合Spark數據集,接着在R中實現分析與可視化
- 利用Spark的MLlib機器學習庫在R中實現分布式機器學習算法
- 可以創建一個擴展,用於調用Spark API。並為Spark的所有包集提供了一個接口
- 未來在RStudio IDE中集成支持Spark和sparklyr包
安裝
通過devtools
包實現sparklyr
包的安裝:
install.packages("devtools")
devtools::install_github("rstudio/sparklyr")
接着,我們需要在本地安裝Spark:
library(sparklyr)
spark_install(version = "1.6.1")
如果用的是RStudio IDE,還需下載最新的預覽版IDE。它包含有實現與Spark交互的若干增強功能(詳情參考RStudio IDE)。
連接Spark
安裝好sparklyr
包之后,我們連接本地的Spark,也可以連接遠程的Spark集群。這里,我們使用spark_connect函數來連接本地的Spark:
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local")
返回的Spark連接(sc
)為Spark集群提供了一個遠程的dplyr數據源。更多連接遠程Spark集群的信息參考這里
讀取數據
使用copy_to函數可以實現將R中的數據框導入到Spark。下面我將R自帶的iris數據集,nycflights13包的flights數據集,以及Lahman包的Batting數據集復制到Spark(請確保安裝了這兩個包)。
iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")
使用dplyr的src_tbls
函數可以列出所有可用的表(包括預先加載在集群內的表)。
src_tbls(sc)
[1] "batting" "flights" "iris"
使用dplyr語法
我們利用dplyr語法來對集群內的所有表進行操作,下面是一個簡單的數據篩選案例:
# 篩選出飛機晚點兩分鍾的航班信息
flights_tbl %>% filter(dep_delay == 2)
Source: query [?? x 16]
Database: spark connection master=local app=sparklyr local=TRUE
year month day dep_time dep_delay arr_time arr_delay carrier tailnum flight origin dest
<int> <int> <int> <int> <dbl> <int> <dbl> <chr> <chr> <int> <chr> <chr>
1 2013 1 1 517 2 830 11 UA N14228 1545 EWR IAH
2 2013 1 1 542 2 923 33 AA N619AA 1141 JFK MIA
3 2013 1 1 702 2 1058 44 B6 N779JB 671 JFK LAX
4 2013 1 1 715 2 911 21 UA N841UA 544 EWR ORD
5 2013 1 1 752 2 1025 -4 UA N511UA 477 LGA DEN
6 2013 1 1 917 2 1206 -5 B6 N568JB 41 JFK MCO
7 2013 1 1 932 2 1219 -6 VX N641VA 251 JFK LAS
8 2013 1 1 1028 2 1350 11 UA N76508 1004 LGA IAH
9 2013 1 1 1042 2 1325 -1 B6 N529JB 31 JFK MCO
10 2013 1 1 1231 2 1523 -6 UA N402UA 428 EWR FLL
.. ... ... ... ... ... ... ... ... ... ... ... ...
Variables not shown: air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>.
dplyr導論提供了許多dplyr包中函數的使用案例。以下案例演示的是航班延誤信息的數據可視化:
delay <- flights_tbl %>%
group_by(tailnum) %>%
summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%
filter(count > 20, dist < 2000, !is.na(delay)) %>%
collect
# 繪圖
library(ggplot2)
ggplot(delay, aes(dist, delay)) +
geom_point(aes(size = count), alpha = 1/2) +
geom_smooth() +
scale_size_area(max_size = 2)
窗口函數
支持dplyr的窗口函數。如下所示:
batting_tbl %>%
select(playerID, yearID, teamID, G, AB:H) %>%
arrange(playerID, yearID, teamID) %>%
group_by(playerID) %>%
filter(min_rank(desc(H)) <= 2 & H > 0)
Source: query [?? x 7]
Database: spark connection master=local app=sparklyr local=TRUE
Groups: playerID
playerID yearID teamID G AB R H
<chr> <int> <chr> <int> <int> <int> <int>
1 anderal01 1941 PIT 70 223 32 48
2 anderal01 1942 PIT 54 166 24 45
3 balesco01 2008 WAS 15 15 1 3
4 balesco01 2009 WAS 7 8 0 1
5 bandoch01 1986 CLE 92 254 28 68
6 bandoch01 1984 CLE 75 220 38 64
7 bedelho01 1962 ML1 58 138 15 27
8 bedelho01 1968 PHI 9 7 0 1
9 biittla01 1977 CHN 138 493 74 147
10 biittla01 1975 MON 121 346 34 109
.. ... ... ... ... ... ... ...
更多dplyr在Spark中的用法參考這里。
調用MLlib
利用sparklyr包中的MLlib函數可以實現在Spark集群中調用機器學習算法。
這里,我們使用ml_linear_regression函數來擬合一個線性回歸模型。數據為內置的mtcars
數據集,我們想看看能否通過汽車的重量(wt
)和發動機的氣缸數(cyl
)來預測汽車的油耗(mpg
)。我們假設mpg
跟這兩個變量之間的關系是線性的。
# 將mtcar數據集復制到spark
mtcars_tbl <- copy_to(sc, mtcars)
# 先對數據做變換,然后將數據集分割為訓練集和測試集
partitions <- mtcars_tbl %>%
filter(hp >= 100) %>%
mutate(cyl8 = cyl == 8) %>%
sdf_partition(training = 0.5, test = 0.5, seed = 1099)
# 對訓練數據集做模型擬合
fit <- partitions$training %>%
ml_linear_regression(response = "mpg", features = c("wt", "cyl"))
Call:
mpg ~ wt + cyl
Coefficients:
(Intercept) wt cyl
33.499452 -2.818463 -0.923187
對spark得到的線性回歸模型,使用summary()
函數可以查看模型的擬合效果以及每個預測指標的統計意義。
summary(fit)
Call:
mpg ~ wt + cyl
Residuals:
Min 1Q Median 3Q Max
-1.752 -1.134 -0.499 1.296 2.282
Coefficients:
Estimate Std. Error t value Pr(>|t|)
(Intercept) 33.49945 3.62256 9.2475 0.0002485 ***
wt -2.81846 0.96619 -2.9171 0.0331257 *
cyl -0.92319 0.54639 -1.6896 0.1518998
---
Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
R-Squared: 0.8274
Root Mean Squared Error: 1.422
Spark機器學習提供常用機器學習算法的實現和特征變換。更多信息請參考這里。
RStudio IDE
RStudio的最新預覽版集成支持Spark和sparklyr包。包含以下工具:
- 創建和管理Spark連接
- 瀏覽表格數據和Spark DataFrames的所有列
- 可以預覽Spark DataFrames的前1000行
一旦成功安裝完sparklyr包,我們可以在IDE中可以看到一個新的Spark窗口。該窗口包含一個New Connection對話框,用於連接本地或者遠程的Spark。如下所示:
Sparklyr
包的官方網站提供了詳盡的學習文檔,感興趣的讀者可以自行了解:
本文由雪晴數據網負責翻譯整理,原文參考sparklyr — R interface for Apache Spark。轉載本譯文請注明鏈接http://www.xueqing.tv/cms/article/232