https://spark.rstudio.com/guides/mlib.html
Spark機器學習庫
sparklyr提供了Spark分布式機器學習庫的綁定。特別是,允許你訪問 spark.ml 包提供的機器學習例程。結合 sparklyr的 dplyr 接口,您可以輕松地在 Spark 上創建和調整機器學習工作流,這些工作流完全在 R 中編排。
sparklyr提供了三個可與 Spark 機器學習一起使用的函數系列:
- 用於分析數據的機器學習算法 (ml_*)
- 用於操作單個特征的特征轉換器 (ft_*)
- 用於操作 Spark 數據幀的函數 (sdf_*)
sparklyr的分析工作流可能由以下階段組成。
通過 sparklyr dplyr 接口執行 SQL 查詢
使用 sdf_*和 ft_*系列函數生成新列,或對數據集進行分區從函數ml_*系列中選擇適當的機器學習算法來對數據進行建模
檢查模型擬合的質量,並使用它來使用新數據進行預測。
收集結果,以便在 R 中進行可視化和進一步分析
公式
這些函數采用參數和 ,但也可以是具有主效應的公式(它目前不接受交互作用項),可以使用 ml_*responsefeaturesfeatures-1省略截距項。
以下兩個語句是等效的:
ml_linear_regression(z ~ -1 + x + y)
ml_linear_regression(intercept = FALSE, response = "z", features = c("x", "y"))
選項
可以使用函數中的參數修改 Spark 模型輸出。這是一個專家專用界面,用於調整模型輸出。例如ml_optionsml_*ml_optionsmodel.transform,可用於在執行擬合之前改變 Spark 模型對象。
轉換
模型通常不適合於數據集,而是適合該數據集的某些轉換。Spark 提供了功能轉換器,促進了 Spark DataFrame 中數據的許多常見轉換,並在函數系列中公開了這些轉換。這些例程通常采用一個或多個輸入列,並生成作為這些列的轉換而形成的新輸出列。
例子
我們將使用數據集來檢查一些學習算法和轉換器。Iris數據集測量 3 種不同鳶尾花中 150 朵花的屬性。
library(sparklyr) library(ggplot2) library(dplyr) sc <- spark_connect(master = "local") iris_tbl <- copy_to(sc, iris, "iris", overwrite = TRUE) iris_tbl
(1)K 均值聚類
使用 Spark 的 K 均值聚類將數據集划分為多個組。K 均值將點聚類划分為組K,以便將點到指定聚類中心的平方和最小化。
kmeans_model <- iris_tbl %>%
ml_kmeans(k = 3, features = c("Petal_Length", "Petal_Width"))
kmeans_model

在 R 中運行並收集預測:
predicted <- ml_predict(kmeans_model, iris_tbl) %>% collect() table(predicted$Species, predicted$prediction)

使用收集的數據繪制結果:
predicted %>%
ggplot(aes(Petal_Length, Petal_Width)) +
geom_point(aes(Petal_Width, Petal_Length, col = factor(prediction + 1)),
size = 2, alpha = 0.5
) +
geom_point(
data = kmeans_model$centers, aes(Petal_Width, Petal_Length),
col = scales::muted(c("red", "green", "blue")),
pch = "x", size = 12
) +
scale_color_discrete(
name = "Predicted Cluster",
labels = paste("Cluster", 1:3)
) +
labs(
x = "Petal Length",
y = "Petal Width",
title = "K-Means Clustering",
subtitle = "Use Spark.ML to predict cluster membership with the iris dataset."
)

(2)線性回歸
使用 Spark 的線性回歸對響應變量與一個或多個解釋變量之間的線性關系進行建模。
lm_model <- iris_tbl %>% ml_linear_regression(Petal_Length ~ Petal_Width)
將斜率和截距提取到離散 R 變量中。我們將使用它們來繪制:
spark_slope <- coef(lm_model)[["Petal_Width"]] spark_intercept <- coef(lm_model)[["(Intercept)"]]
iris_tbl %>%
select(Petal_Width, Petal_Length) %>%
collect() %>%
ggplot(aes(Petal_Length, Petal_Width)) +
geom_point(aes(Petal_Width, Petal_Length), size = 2, alpha = 0.5) +
geom_abline(aes(
slope = spark_slope,
intercept = spark_intercept
),
color = "red"
) +
labs(
x = "Petal Width",
y = "Petal Length",
title = "Linear Regression: Petal Length ~ Petal Width",
subtitle = "Use Spark.ML linear regression to predict petal length as a function of petal width."
)

(3)邏輯回歸
使用 Spark 的邏輯回歸來執行邏輯回歸,將二進制結果建模為一個或多個解釋變量的函數。
glm_model <- iris_tbl %>% mutate(is_setosa = ifelse(Species == "setosa", 1, 0)) %>% select_if(is.numeric) %>% ml_logistic_regression(is_setosa ~.)
summary(glm_model)

ml_predict(glm_model, iris_tbl) %>% count(Species, prediction)

(4)PCA
使用 Spark 的主成分分析 (PCA) 執行降維。PCA 是一種統計方法,用於查找旋轉,使得第一個坐標具有可能的最大方差,而每個后續坐標又具有可能的最大方差。
pca_model <- tbl(sc, "iris") %>% select(-Species) %>% ml_pca()
pca_model

(5)隨機森林
使用 Spark 的隨機森林執行回歸或多類分類。
rf_model <- iris_tbl %>%
ml_random_forest(
Species ~ Petal_Length + Petal_Width, type = "classification"
)
用於使用將新模型應用會數據。ml_predict()
rf_predict <- ml_predict(rf_model, iris_tbl)
glimpse(rf_predict)
要了解模型的有效性,請使用將物種與預測進行比較。
rf_predict %>%
count(Species, predicted_label)

(6)FT 字符串索引
使用 和 將字符列轉換為數字列,然后再轉換回來。ft_string_indexer()ft_index_to_string()
ft_string2idx <- iris_tbl %>%
ft_string_indexer("Species", "Species_idx") %>%
ft_index_to_string("Species_idx", "Species_remap") %>%
select(Species, Species_remap, Species_idx)
要查看 中分配給每個值的值,我們可以提取所有物種的聚合,重新映射的物種和索引組合:Species
ft_string2idx %>% group_by_all() %>% summarise(count = n(), .groups = "keep")

(7)斷開與 Spark 的連接
最后,通過斷開 Spark 連接來清理會話:
spark_disconnect(sc)
#決策樹
sc <- spark_connect(master = "local") iris_tbl <- sdf_copy_to(sc, iris, name = "iris_tbl", overwrite = TRUE) partitions <- iris_tbl %>% sdf_random_split(training = 0.7, test = 0.3, seed = 1111) iris_training <- partitions$training iris_test <- partitions$test dt_model <- iris_training %>% ml_decision_tree(Species ~ .) pred <- ml_predict(dt_model, iris_test) ml_multiclass_classification_evaluator(pred)

#梯度提升樹
gbt_model <- iris_training %>% ml_gradient_boosted_trees(Sepal_Length ~ Petal_Length + Petal_Width) pred <- ml_predict(gbt_model, iris_test) ml_regression_evaluator(pred, label_col = "Sepal_Length")

#高斯混合聚類
描述:此類對多元高斯混合模型 (GMM) 執行期望最大化。GMM 表示獨立高斯分布的復合分布,具有相關的"混合"權重,指定每個權重對復合材料的貢獻。給定一組采樣點,此類將最大化 k 個高斯混合的對數似然,迭代直到對數似然數變化小於 ,或者直到達到最大迭代次數。雖然這個過程通常保證收斂,但不能保證找到全局最優。
gmm_model <- ml_gaussian_mixture(iris_tbl, Species ~ .) pred <- sdf_predict(iris_tbl, gmm_model) ml_clustering_evaluator(pred)

#一分為二的K均值聚類
描述:基於Steinbach, Karypis, and Kumar的論文 "A comparison of document clustering techniques "的一分為二的K-means算法,並進行了修改以適應Spark。該算法從一個包含所有點的單一集群開始。迭代地在底層找到可分割的聚類,並使用k-means對每個聚類進行二分,直到總共有k個葉子聚類或沒有葉子聚類是可分割的。同一層次上的集群的平分步驟被分組,以增加並行性。如果對底層所有可分割的集群進行二分會導致超過k個葉子集群,則較大的集群會得到較高的優先權。
library(dplyr) #sc <- spark_connect(master = "local") #iris_tbl <- sdf_copy_to(sc, iris, name = "iris_tbl", overwrite = TRUE) iris_tbl %>% select(-Species) %>% ml_bisecting_kmeans(k = 4, Species ~ .)

#K-means聚類
描述:Bahmani等人提出的支持k-means||初始化的K-means聚類,使用公式接口需要Spark 2.0+.ml_kmeans()。
kmeans_model <- iris_tbl %>%
ml_kmeans(k = 3, features = c("Petal_Length", "Petal_Width"))
kmeans_model
predicted <- ml_predict(kmeans_model, iris_tbl) %>%
collect()
table(predicted$Species, predicted$prediction)
#在驗證集上評估模型
ml_gaussian_mixture(iris_tbl, Species ~ .) %>% #(高斯混合聚類) ml_evaluate(iris_tbl)

ml_bisecting_kmeans(iris_tbl, Species ~ .) %>% #(一分為二的K均值聚類) ml_evaluate(iris_tbl)

ml_kmeans(iris_tbl, Species ~ .) %>% #(K均值聚類) ml_evaluate(iris_tbl)

#聚類評估
描述:聚類結果的評估器。該指標使用歐氏距離的平方計算Silhouette措施。
Silhouette是一個用於驗證聚類內一致性的措施。它的范圍在1和-1之間,其中接近1的值意味着一個聚類中的點與同一聚類中的其他點接近,而與其他聚類的點遠離。
formula <- Species ~ .
kmeans_model <- ml_kmeans(iris_training, formula = formula) b_kmeans_model <- ml_bisecting_kmeans(iris_training, formula = formula) gmm_model <- ml_gaussian_mixture(iris_training, formula = formula)
pred_kmeans <- ml_predict(kmeans_model, iris_test) pred_b_kmeans <- ml_predict(b_kmeans_model, iris_test) pred_gmm <- ml_predict(gmm_model, iris_test)
ml_clustering_evaluator(pred_kmeans) #(K均值聚類)

ml_clustering_evaluator(pred_b_kmeans) #(一分為二的K均值聚類)

ml_clustering_evaluator(pred_gmm) #(高斯混合聚類)


