利用pyspark pandas_udf 加速機器學習任務


實驗是最能定義數據科學家日常生活的詞。為了為給定的問題構建一個合適的機器學習模型,數據科學家需要訓練多個模型。此過程包括諸如尋找模型的最佳超參數、使用 K 折交叉驗證模型,有時甚至訓練具有多個輸出的模型等任務。前面提到的所有這些任務都很耗時,但對於模型開發的成功來說卻極為重要。在這篇博文中,我們將展示如何應用 PySpark Pandas UDF(一個用於在 Spark 集群上分發 Python 函數的框架)來提高數據科學家的日常工作效率。

PySpark 如何實現 Pandas UDF(用戶定義函數)?

顧名思義,PySpark Pandas UDF 是一種使用 Pandas DataFrame 在 PySpark 中實現用戶定義函數 (UDF) 的方法。PySpark API 文檔給出的定義如下:

“Pandas UDF 是用戶定義的函數,由 Spark 執行,使用 Arrow 傳輸數據,Pandas 執行數據,允許向量化操作。Pandas UDF 是使用pandas_udf
作為裝飾器或包裝函數來定義的,不需要額外的配置。Pandas UDF 通常表現為常規的 PySpark 函數 API。”

在這篇文章中,我們將探索PandasUDFType.GROUPED_MAP
,或者在 PySpark 的最新版本中,也稱為pyspark.sql.GroupedData.applyInPandas
. 主要思想很簡單,Pandas UDF 分組數據允許在數據集的每一組中進行操作。由於 spark 中的分組操作是跨集群節點計算的,因此我們可以以允許在不同節點計算不同模型的方式操作我們的數據集。是的,我的兄弟們……永遠不要低估一個groupBy
.

配置

在進入應用 Pandas UDF 的細節之前,讓我們用一些模塊、全局變量和常用函數設置環境。

第一步是導入將在這個小實驗中使用的所有模塊。

import pandas as pd
from catboost import CatBoostClassifier
from itertools import product
from pyspark.sql import DataFrame
from pyspark.sql import functions as sf
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import (
    DoubleType, FloatType, IntegerType, StringType, StructField, StructType
)
from sklearn.datasets import make_multilabel_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

並設置一些將要多次使用的全局變量。

N_FEATURES = 20
N_CLASSES = 10

本文探索的每項任務的一個常見步驟是機器學習模型的訓練和評估。此步驟封裝在以下函數中,該函數根據 CatBoost 模型的准確度得分來訓練和評估該模型。

def train_and_evaluate_model(X_train, y_train, X_test, y_test, kwargs={}):

    # split data
    X_train, X_eval, y_train, y_eval = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

    # create model
    model = CatBoostClassifier(
        nan_mode='Min',
        random_seed=42,
        boosting_type='Plain',
        bootstrap_type='Bernoulli',
        rsm=0.1,
        loss_function='Logloss',
        use_best_model=True,
        early_stopping_rounds=100,
        **kwargs
    )

    # fit model
    model.fit(X_train.values, y_train.values, eval_set=(X_eval, y_eval))

    # evaluate model
    accuracy = accuracy_score(model.predict(X_test), y_test)

    return accuracy

為了訓練和測試我們的 CatBoost 模型,我們還需要一些數據。因此,讓我們使用 scikit-learn 的make_multilabel_classification
函數創建我們的數據集,並從中構建我們的 PySpark DataFrame。

X, y = make_multilabel_classification(
    n_samples=10000,
    n_features=N_FEATURES,
    n_classes=N_CLASSES,
    random_state=42
)
pdf = pd.DataFrame(X)
for i in range(N_CLASSES):
    pdf[f'y_{i}'] = y[:, i]
df = spark.createDataFrame(pdf)
print(f'number of rows in the dataset: {df.count()}')
number of rows in the dataset: 10000
df.limit(5).toPandas()
  0 1 2 3 4 5 6 7 8 9 ... y_0 y_1 y_2 y_3 y_4 y_5 y_6 y_7 y_8 y_9
0 2.0 2.0 0.0 1.0 3.0 5.0 0.0 3.0 4.0 1.0 ... 0 1 1 0 0 0 0 1 0 0
1 4.0 3.0 2.0 2.0 0.0 4.0 1.0 2.0 0.0 3.0 ... 0 0 0 0 0 0 0 1 1 1
2 2.0 2.0 3.0 0.0 0.0 0.0 0.0 6.0 0.0 3.0 ... 0 0 0 0 0 0 0 0 1 0
3 0.0 1.0 4.0 4.0 2.0 0.0 2.0 1.0 3.0 2.0 ... 0 0 0 0 0 0 0 0 0 0
4 0.0 0.0 7.0 2.0 1.0 0.0 1.0 2.0 1.0 2.0 ... 0 0 0 0 0 0 0 0 0 1

5 行 × 30 列

最后,為了更高效的 Spark 計算,我們將啟用基於 arrow 的列式數據傳輸。

spark.conf.set('spark.sql.execution.arrow.enabled', 'true')

分布式網格搜索

在機器學習中,超參數是其值用於控制模型架構及其學習過程的參數。通常在訓練模型時,您需要優化這些超參數,但是,盡管 ML 能夠找到最佳內部參數和決策閾值,但超參數是手動設置的。

如果搜索空間包含太多可能性,您將需要花費大量時間進行測試以找到超參數的最佳組合。加速此任務的一種方法是將搜索過程分布在 Spark 集群的節點上。

這種方法產生的一個問題是:“好吧,但我使用的算法尚未在 Spark 上實現,我如何在這些限制下分配這個過程?” 別擔心!這是我們在這里要回答的問題!

首先,我們必須定義超參數搜索空間。為此,我們將創建一個輔助 PySpark DataFrame,其中每一行都是一組唯一的超參數。

values_range = list(
    product(
        [200, 210, 220, 230, 240, 250, 260, 270, 280, 290],
        [3, 4, 5, 6, 7],
        [0.02, 0.07, 0.1, 0.15, 0.2],
        ['MinEntropy', 'Uniform', 'UniformAndQuantiles', 'GreedyLogSum'],
        [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
        [0.5, 0.6, 0.7, 0.8],
    )
)

schema = StructType(
    [
        StructField('iterations', IntegerType(), True),
        StructField('depth', IntegerType(), True),
        StructField('learning_rate', DoubleType(), True),
        StructField('feature_border_type', StringType(), True),
        StructField('l2_leaf_reg', FloatType(), True),
        StructField('subsample', FloatType(), True)
    ]
)

df_grid = spark.createDataFrame(data=values_range, schema=schema)
df_grid = df_grid.withColumn('replication_id', sf.monotonically_increasing_id())
df_grid.limit(5).toPandas()
  iterations depth learning_rate feature_border_type l2_leaf_reg subsample replication_ID
0 200 4 0.1 Uniform 2.0 0.5 171798691840
1 200 4 0.1 Uniform 2.0 0.6 171798691841
2 200 4 0.1 Uniform 2.0 0.7 171798691842
3 200 4 0.1 Uniform 2.0 0.8 171798691843
4 200 4 0.1 Uniform 3.0 0.5 171798691844
print(f'number of different hyperparameter combinations: {df_grid.count()}')
number of different hyperparameter combinations: 24000

對於每個超參數行,我們想要復制我們的數據,以便我們以后可以單獨處理每個超參數集。

df_replicated = df.crossJoin(df_grid)
print(f'number of rows in the replicated dataset: {df_replicated.count()}')
number of rows in the replicated dataset: 240000000

最后一步是指定每個 Spark 節點將如何處理數據。為此,我們定義了run_model
函數。它從輸入 Spark DataFrame 中提取超參數和數據,然后訓練和評估模型,返回其結果。

# declare the schema for the output of our function
schema = StructType(
    [
        StructField('replication_id', IntegerType(),True),
        StructField('accuracy', FloatType(),True),
        StructField("iterations", IntegerType(), True),
        StructField("depth", IntegerType(), True),
        StructField("learning_rate", DoubleType(), True),
        StructField("feature_border_type", StringType(), True),
        StructField("l2_leaf_reg", FloatType(), True),
        StructField("subsample", FloatType(), True)
     ]
)

# decorate our function with pandas_udf decorator
@pandas_udf(schema, sf.PandasUDFType.GROUPED_MAP)
def hyperparameter_search(pdf):

    # get hyperparameter values
    kwargs = {
        'iterations': pdf.iterations.values[0],
        'depth': pdf.depth.values[0],
        'learning_rate': pdf.learning_rate.values[0],
        'feature_border_type': pdf.feature_border_type.values[0],
        'l2_leaf_reg': pdf.l2_leaf_reg.values[0],
        'subsample': pdf.subsample.values[0]
    }

    # get data and label
    X = pdf[[str(i) for i in range(N_FEATURES)]]
    y = pdf['y_0']

    # split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # get accuracy
    accuracy = train_and_evaluate_model(X_train, y_train, X_test, y_test, kwargs)

    # return results as pandas DF
    kwargs.update({
        'replication_id': pdf.replication_id.values[0],
        'accuracy': accuracy
    })
    results = pd.DataFrame([kwargs])

    return results

我們現在可以按 對 Spark 數據幀進行分組replication_id
並應用該run_model
函數。這樣,每個超參數組合都將用於在分布式系統中訓練不同的模型。

results = df_replicated.groupby('replication_id').apply(hyperparameter_search)
%%time

results.sort('accuracy', ascending=False).limit(5).toPandas()
CPU times: user 11.6 s, sys: 13.5 s, total: 25.1 s
Wall time: 29min 10s
  replication_id accuracy iterations depth learning_rate feature_border_type l2_leaf_reg subsample
0 24 0.9145 210 7 0.20 Uniform 6.0 0.6
1 22 0.9125 250 3 0.20 Uniform 2.0 0.5
2 13 0.9125 230 6 0.15 MinEntropy 3.0 0.7
3 11 0.9125 290 3 0.20 Uniform 5.0 0.7
4 7 0.9125 220 3 0.10 MinEntropy 6.0 0.5

通過這種分布式方法,我們能夠在 29 分鍾內運行 24000 個超參數組合。

分布式 K Fold 交叉驗證

有了最優的超參數集,另一個重要的任務是對模型進行 K-Fold 交叉驗證,以防止(或最小化)過擬合的不良影響。在這個實驗中添加的折疊越多,你的模型就越健壯。然而,你將不得不花更多的時間來訓練每個折疊的模型。同樣,避免時間陷阱的一種方法是使用 Spark 並在 Spark 集群的單個節點上計算每個折疊。

我們以與分布網格搜索的方式非常相似的方式執行此操作,不同之處在於我們根據折疊數復制我們的數據集。所以如果我們的交叉驗證使用 8 折,我們的數據集將被復制 8 次。

在這里,我們的第一步是定義我們想要交叉驗證模型的折疊次數。

N_FOLDS = 8

在此之后,我們定義了一些代碼來根據上面定義的折疊數隨機拆分我們的數據集。

proportion = 1 / N_FOLDS
splits = df.randomSplit([proportion] * N_FOLDS, 42)
df_folds = splits[0].withColumn('fold', sf.lit(0))
for i in range(1, N_FOLDS):
    df_folds = df_folds.union(
        splits[i].withColumn('fold', sf.lit(i))
    )

拆分后,我們將數據集復制 K 次。

df_numbers = spark.createDataFrame(
    pd.DataFrame(list(range(N_FOLDS)),columns=['replication_id'])
)
df_numbers.toPandas()
  replication_id
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
df_replicated = df_folds.crossJoin(df_numbers)
print(f'number of rows in the replicated dataset: {df_replicated.count()}')
number of rows in the replicated dataset: 80000

與網格搜索方法相比,我們還有另一個不同之處。在下面的函數中,我們根據 replication_id
和 fold_id
定義訓練和測試數據集。如果replication_id
等於fold_id
,我們將該折疊設置為測試折疊,而其余折疊用作訓練集。

# declare the schema for the output of our function
schema = StructType(
    [
        StructField('replication_id', IntegerType(), True),
        StructField('accuracy', FloatType(), True)
    ]
)

# decorate our function with pandas_udf decorator
@pandas_udf(schema, sf.PandasUDFType.GROUPED_MAP)
def cross_validation(pdf):

    # get repliaction id
    replication_id = pdf.replication_id.values[0]

    # get data and label
    columns = [str(i) for i in range(N_FEATURES)]
    X_train = pdf[pdf.fold != replication_id][columns]
    X_test = pdf[pdf.fold == replication_id][columns]
    y_train = pdf[pdf.fold != replication_id]['y_0']
    y_test = pdf[pdf.fold == replication_id]['y_0']

    # get accuracy
    accuracy = train_and_evaluate_model(X_train, y_train, X_test, y_test)

    # return results as pandas DF
    results = pd.DataFrame([{
        'replication_id': replication_id,
        'accuracy': accuracy
    }])

    # save the model (if you want to retrieve it later)

    return results

使用此方法可能需要考慮的一件事是如何保存每個經過訓練的模型,因為每個模型都在不同的節點中進行訓練。為此,根據您的雲提供商,您可以使用一些開發的 Python 庫將文件從集群節點直接傳輸到雲存儲桶(如 Google Cloud Storage 或 Amazon S3)。但是,如果您只對交叉驗證模型的性能感興趣,那么上面的函數就足夠了。

results = df_replicated.groupby('replication_id').apply(cross_validation)
%%time

results.sort('accuracy', ascending=False).toPandas()
CPU times: user 1.03 s, sys: 1.24 s, total: 2.27 s
Wall time: 35.9 s
  replication_id accuracy
0 4 0.900715
1 5 0.895292
2 3 0.893720
3 2 0.893601
4 1 0.891801
5 7 0.890048
6 0 0.883293
7 6 0.882946

在這個實驗中,我們僅在 35 秒內評估了 8 個折疊(集群的每個節點中一個)。最佳折疊(編號 4)的准確度得分為 0.900。

分布式多輸出模型

遵循相同的理念,我們可以利用 PySpark Pandas UDF 來分發多輸出模型的訓練。對於這個任務,我們有一組特征和一組標簽,我們必須用相同的訓練數據為每個標簽訓練一個模型。

一些軟件包scikit-learn
已經實現了這種隨機森林算法的方法。CatBoost
還可以選擇多輸出訓練。然而,與單輸出 API 相比,這些實現具有有限的超參數和損失函數選項。考慮到這一點,Pandas UDF 是一次自動訓練多個模型的替代方案,它使用任何其他機器學習庫通常為單輸出模型訓練提供的所有選項。

由於我們的數據集有多個標簽列,這次的方法是以一種可以復制每個特定標簽的數據的方式來旋轉我們的數據。因此,我們創建一列來映射每個標簽,並將所有標簽附加到一個標簽列中,如下所示:

features = [f'{i}' for i in range(N_FEATURES)]
targets = [f'y_{i}' for i in range(N_CLASSES)]

df_multipe_output = df.select(
    *features,
     sf.lit(targets[0]).alias('y_group'),
     sf.col(targets[0]).alias('Y')
)
for target in targets[1:]:
    df_multipe_output = df_multipe_output.union(
        df.select(
            *features,
            sf.lit(target).alias('y_group'),
            sf.col(target).alias('Y')
        )
    )
print(f'number of rows in the dataset: {df_multipe_output.count()}')
number of rows in the dataset: 100000
df_multipe_output.limit(5).toPandas()
  0 1 2 3 4 5 6 7 8 9 ... 12 13 14 15 16 17 18 19 y_group y
0 1.0 3.0 9.0 1.0 6.0 0.0 5.0 0.0 4.0 1.0 ... 2.0 1.0 3.0 1.0 1.0 1.0 2.0 3.0 y_0 0
1 1.0 4.0 2.0 1.0 4.0 2.0 1.0 2.0 0.0 1.0 ... 3.0 2.0 5.0 2.0 2.0 3.0 3.0 3.0 y_0 1
2 2.0 6.0 3.0 6.0 0.0 5.0 4.0 3.0 2.0 4.0 ... 2.0 1.0 3.0 0.0 5.0 4.0 3.0 1.0 y_0 0
3 3.0 2.0 0.0 1.0 5.0 3.0 0.0 3.0 2.0 3.0 ... 3.0 1.0 0.0 6.0 1.0 0.0 3.0 1.0 y_0 1
4 6.0 3.0 0.0 0.0 3.0 6.0 0.0 2.0 3.0 0.0 ... 4.0 3.0 6.0 7.0 0.0 5.0 6.0 3.0 y_0 0

5 行 × 22 列

定義了我們的 spark 多輸出數據集后,我們准備定義執行模型訓練的函數。

# declare the schema for the output of our function
schema = StructType(
    [
        StructField('y_group', StringType(), True),
        StructField('accuracy', FloatType(), True)
    ]
)

# decorate our function with pandas_udf decorator
@pandas_udf(schema, sf.PandasUDFType.GROUPED_MAP)
def multi_models(pdf):

    # get group
    y_group = pdf.y_group.values[0]

    # get data and label
    X = pdf.drop(['Y', 'y_group'], axis=1)
    y = pdf['Y']

    # split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # get accuracy
    accuracy = train_and_evaluate_model(X_train, y_train, X_test, y_test)

    # return results as pandas DF
    results = pd.DataFrame([{
        'y_group': y_group,
        'accuracy': accuracy
    }])

    return results

一切設置好后,就可以在y_group
列上調用 groupBy 方法來分發每個模型的訓練。

results = df_multipe_output.groupby('y_group').apply(multi_models).orderBy('accuracy')
%%time

results.sort('accuracy', ascending=False).limit(5).toPandas()
CPU times: user 193 ms, sys: 195 ms, total: 388 ms
Wall time: 9.24 s
  y_group accuracy
0 y_6 0.9740
1 y_4 0.9330
2 y_5 0.9325
3 y_8 0.8990
4 y_0 0.8910

結論

在這篇文章中,我們展示了一些示例,說明如何使用 PySpark Pandas UDF 來分發涉及機器學習模型訓練的流程。展示的一些方法可用於節省時間或進行更大規模的實驗,否則會占用過多內存或成本過高。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM