sparklyr包:實現Spark與R的接口



日前,Rstudio公司發布了sparklyr包。該包具有以下幾個功能:

  • 實現R與Spark的連接—sparklyr包提供了一個完整的dplyr后端
  • 篩選並聚合Spark數據集,接着在R中實現分析與可視化
  • 利用Spark的MLlib機器學習庫在R中實現分布式機器學習算法
  • 可以創建一個擴展,用於調用Spark API。並為Spark的所有包集提供了一個接口
  • 未來在RStudio IDE中集成支持Spark和sparklyr包

安裝

通過devtools包實現sparklyr包的安裝:

install.packages("devtools")
devtools::install_github("rstudio/sparklyr")

接着,我們需要在本地安裝Spark:

library(sparklyr)
spark_install(version = "1.6.1")

如果用的是RStudio IDE,還需下載最新的預覽版IDE。它包含有實現與Spark交互的若干增強功能(詳情參考RStudio IDE)。

連接Spark

安裝好sparklyr包之后,我們連接本地的Spark,也可以連接遠程的Spark集群。這里,我們使用spark_connect函數來連接本地的Spark:

library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local")

返回的Spark連接(sc)為Spark集群提供了一個遠程的dplyr數據源。更多連接遠程Spark集群的信息參考這里

讀取數據

使用copy_to函數可以實現將R中的數據框導入到Spark。下面我將R自帶的iris數據集,nycflights13包的flights數據集,以及Lahman包的Batting數據集復制到Spark(請確保安裝了這兩個包)。

iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")

使用dplyr的src_tbls函數可以列出所有可用的表(包括預先加載在集群內的表)。

src_tbls(sc)

[1] "batting" "flights" "iris" 

使用dplyr語法

我們利用dplyr語法來對集群內的所有表進行操作,下面是一個簡單的數據篩選案例:

# 篩選出飛機晚點兩分鍾的航班信息
flights_tbl %>% filter(dep_delay == 2)
Source:   query [?? x 16]
Database: spark connection master=local app=sparklyr local=TRUE

    year month   day dep_time dep_delay arr_time arr_delay carrier tailnum flight origin  dest
   <int> <int> <int>    <int>     <dbl>    <int>     <dbl>   <chr>   <chr>  <int>  <chr> <chr>
1   2013     1     1      517         2      830        11      UA  N14228   1545    EWR   IAH
2   2013     1     1      542         2      923        33      AA  N619AA   1141    JFK   MIA
3   2013     1     1      702         2     1058        44      B6  N779JB    671    JFK   LAX
4   2013     1     1      715         2      911        21      UA  N841UA    544    EWR   ORD
5   2013     1     1      752         2     1025        -4      UA  N511UA    477    LGA   DEN
6   2013     1     1      917         2     1206        -5      B6  N568JB     41    JFK   MCO
7   2013     1     1      932         2     1219        -6      VX  N641VA    251    JFK   LAS
8   2013     1     1     1028         2     1350        11      UA  N76508   1004    LGA   IAH
9   2013     1     1     1042         2     1325        -1      B6  N529JB     31    JFK   MCO
10  2013     1     1     1231         2     1523        -6      UA  N402UA    428    EWR   FLL
..   ...   ...   ...      ...       ...      ...       ...     ...     ...    ...    ...   ...
Variables not shown: air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>.

dplyr導論提供了許多dplyr包中函數的使用案例。以下案例演示的是航班延誤信息的數據可視化:

delay <- flights_tbl %>% 
  group_by(tailnum) %>%
  summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%
  filter(count > 20, dist < 2000, !is.na(delay)) %>%
  collect

# 繪圖
library(ggplot2)
ggplot(delay, aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)

窗口函數

支持dplyr的窗口函數。如下所示:

batting_tbl %>%
  select(playerID, yearID, teamID, G, AB:H) %>%
  arrange(playerID, yearID, teamID) %>%
  group_by(playerID) %>%
  filter(min_rank(desc(H)) <= 2 & H > 0)
Source:   query [?? x 7]
Database: spark connection master=local app=sparklyr local=TRUE
Groups: playerID

    playerID yearID teamID     G    AB     R     H
       <chr>  <int>  <chr> <int> <int> <int> <int>
1  anderal01   1941    PIT    70   223    32    48
2  anderal01   1942    PIT    54   166    24    45
3  balesco01   2008    WAS    15    15     1     3
4  balesco01   2009    WAS     7     8     0     1
5  bandoch01   1986    CLE    92   254    28    68
6  bandoch01   1984    CLE    75   220    38    64
7  bedelho01   1962    ML1    58   138    15    27
8  bedelho01   1968    PHI     9     7     0     1
9  biittla01   1977    CHN   138   493    74   147
10 biittla01   1975    MON   121   346    34   109
..       ...    ...    ...   ...   ...   ...   ...

更多dplyr在Spark中的用法參考這里

調用MLlib

利用sparklyr包中的MLlib函數可以實現在Spark集群中調用機器學習算法。

這里,我們使用ml_linear_regression函數來擬合一個線性回歸模型。數據為內置的mtcars數據集,我們想看看能否通過汽車的重量(wt)和發動機的氣缸數(cyl)來預測汽車的油耗(mpg)。我們假設mpg跟這兩個變量之間的關系是線性的。

# 將mtcar數據集復制到spark
mtcars_tbl <- copy_to(sc, mtcars)

# 先對數據做變換,然后將數據集分割為訓練集和測試集
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  mutate(cyl8 = cyl == 8) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)

# 對訓練數據集做模型擬合
fit <- partitions$training %>%
  ml_linear_regression(response = "mpg", features = c("wt", "cyl"))
Call:
mpg ~ wt + cyl

Coefficients:
(Intercept)          wt         cyl 
  33.499452   -2.818463   -0.923187 

對spark得到的線性回歸模型,使用summary()函數可以查看模型的擬合效果以及每個預測指標的統計意義。

summary(fit)
Call:
mpg ~ wt + cyl

Residuals:
   Min     1Q Median     3Q    Max 
-1.752 -1.134 -0.499  1.296  2.282 

Coefficients:
            Estimate Std. Error t value  Pr(>|t|)    
(Intercept) 33.49945    3.62256  9.2475 0.0002485 ***
wt          -2.81846    0.96619 -2.9171 0.0331257 *  
cyl         -0.92319    0.54639 -1.6896 0.1518998    
---
Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1

R-Squared: 0.8274
Root Mean Squared Error: 1.422

Spark機器學習提供常用機器學習算法的實現和特征變換。更多信息請參考這里

RStudio IDE

RStudio的最新預覽版集成支持Spark和sparklyr包。包含以下工具:

  • 創建和管理Spark連接
  • 瀏覽表格數據和Spark DataFrames的所有列
  • 可以預覽Spark DataFrames的前1000行

一旦成功安裝完sparklyr包,我們可以在IDE中可以看到一個新的Spark窗口。該窗口包含一個New Connection對話框,用於連接本地或者遠程的Spark。如下所示:

Sparklyr包的官方網站提供了詳盡的學習文檔,感興趣的讀者可以自行了解:

本文由雪晴數據網負責翻譯整理,原文參考sparklyr — R interface for Apache Spark。轉載本譯文請注明鏈接http://www.xueqing.tv/cms/article/232


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM