Spark ML機器學習


Spark提供了常用機器學習算法的實現, 封裝於spark.mlspark.mllib中.

spark.mllib是基於RDD的機器學習庫, spark.ml是基於DataFrame的機器學習庫.

相對於RDD, DataFrame擁有更豐富的操作API, 可以進行更靈活的操作. 目前, spark.mllib已經進入維護狀態, 不再添加新特性.

本文將重點介紹pyspark.ml, 測試環境為Spark 2.1, Python API.

首先介紹pyspark.ml中的幾個基類:

  • ML DataSet: 即為pyspark.sql.DataFrame作為數據集使用

  • pyspark.ml.Transformer: 代表將數據集轉換到另一個數據集的算法

  • pyspark.ml.Estimator: 代表根據數據和參數創建模型的算法,包含方法

    • fit(dataset, params): 根據訓練數據集和參數進行訓練, 返回訓練好的模型對象
  • pyspark.ml.Model: 代表訓練好的模型的基類, 通常由Estimator.fit()創建. 包含的方法有:

    • transform(df): 將輸入數據集代入模型變換為輸出數據集
    • save(path): 保存訓練好的模型
    • load(path): 從文件中加載模型
  • pyspark.ml.Pipeline: 用於將多個步驟組合為管道進行處理, 可以建立線性管道和有向無環圖管道.

pyspark.ml下將不同算法封裝到不同的包中:

  • pyspark.ml.linalg 線性代數工具包. 包括:
    • Vector
    • DenseVector
    • SparseVector
    • Matrix
    • DenseMatrix
    • SparseMatrix
  • pyspark.ml.feature特征和預處理算法包. 包括:
    • Tokenizer
    • Normalizer
    • StopWordsRemover
    • PCA
    • NGram
    • Word2Vec
  • pyspark.ml.classification分類算法包. 包括:
    • LogisticRegression
    • DecisionTreeClassifier
    • RandomForestClassifier
    • NaiveBayes
    • MultilayerPerceptronClassifier
    • OneVsRest
  • pyspark.ml.clustering 聚類算法包. 包括:
    • KMeans
    • LDA
  • pyspark.ml.regression回歸算法包. 包括:
    • LinearRegression
    • GeneralizedLinearRegression
    • DecisionTreeRegressor
    • RandomForestRegressor
  • pyspark.ml.recommendation推薦系統算法包. 包括:
    • ALS
  • pyspark.ml.tuning 校驗工具包
  • pyspark.ml.evaluation 評估工具包

pyspark.ml中的算法大多數為Estimator的派生類. 大多數算法類均擁有對應的Model類.

classification.NaiveBayesclassification.NaiveBayesModel. 算法類的fit方法可以生成對應的Model類.

應用示例

pyspark.ml使用了統一風格的接口,這里只展示部分算法.

首先用NaiveBayes分類器做一個二分類:

>>> from pyspark.sql import Row
>>> from pyspark.ml.linalg import Vectors
>>> df = spark.createDataFrame([
...     Row(label=0.0, weight=0.1, features=Vectors.dense([0.0, 0.0])),
...     Row(label=0.0, weight=0.5, features=Vectors.dense([0.0, 1.0])),
...     Row(label=1.0, weight=1.0, features=Vectors.dense([1.0, 0.0]))])
>>> nb = NaiveBayes(smoothing=1.0, modelType="multinomial", weightCol="weight")
>>> model = nb.fit(df)  # 構造模型
>>> test0 = sc.parallelize([Row(features=Vectors.dense([1.0, 0.0]))]).toDF()
>>> result = model.transform(test0).head()  # 預測
>>> result.prediction
1.0
>>> result.probability
DenseVector([0.32..., 0.67...])
>>> result.rawPrediction
DenseVector([-1.72..., -0.99...])

model.transform將輸入的一行(Row)作為一個樣本,產生一行輸出. 這里我們只輸入了一個測試樣本, 所以直接使用head()取出唯一一行輸出.

使用LogisticRegression和OneVsRest做多分類:

>>> from pyspark.sql import Row
>>> from pyspark.ml.linalg import Vectors
>>> df = sc.parallelize([
...     Row(label=0.0, features=Vectors.dense(1.0, 0.8)),
...     Row(label=1.0, features=Vectors.sparse(2, [], [])),
...     Row(label=2.0, features=Vectors.dense(0.5, 0.5))]).toDF()
>>> lr = LogisticRegression(maxIter=5, regParam=0.01)
>>> ovr = OneVsRest(classifier=lr)
>>> model = ovr.fit(df)
>>> # 進行預測
>>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 0.0))]).toDF()
>>> model.transform(test0).head().prediction
1.0
>>> test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0], [1.0]))]).toDF()
>>> model.transform(test1).head().prediction
0.0
>>> test2 = sc.parallelize([Row(features=Vectors.dense(0.5, 0.4))]).toDF()
>>> model.transform(test2).head().prediction
2.0

使用PCA進行降維:

>>> from pyspark.ml.linalg import Vectors
>>> data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
...     (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
...     (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
>>> df = spark.createDataFrame(data,["features"])
>>> pca = PCA(k=2, inputCol="features", outputCol="pca_features")
>>> model = pca.fit(df)
>>> model.transform(df).head().pca_features
DenseVector([1.648..., -4.013...])

EstimatorTransformer均為PipelineStage的派生類,pipeline由一系列Stage組成.調用pipeline對象的fit方法, 將會依次執行Stage並生成一個最終模型.

>>>from pyspark.ml import Pipeline
>>>from pyspark.ml.classification import LogisticRegression
>>>from pyspark.ml.feature import HashingTF, Tokenizer
>>> data = [
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0) ]
>>> df = spark.createDataFrame(data, ["id", "text", "label"])
>>> # build pipeline
>>> tokenizer = Tokenizer(inputCol="text", outputCol="words")
>>> hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
>>> lr = LogisticRegression(maxIter=10, regParam=0.001)
>>> pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
>>> # train
>>> model = pipeline.fit(df)
>>> data2 = [
       (4, "spark i j k"),
       (5, "l m n"),
       (6, "spark hadoop spark"),
       (7, "apache hadoop")
]
>>> test = spark.createDataFrame(data2, ["id", "text"])
>>> result = model.transform(test)
>>> result = result.select("id", "text", "probability", "prediction")
>>> result.collect()
[Row(id=4, text=u'spark i j k', probability=DenseVector([0.1596, 0.8404]), prediction=1.0), 
Row(id=5, text=u'l m n', probability=DenseVector([0.8378, 0.1622]), prediction=0.0), 
Row(id=6, text=u'spark hadoop spark', probability=DenseVector([0.0693, 0.9307]), prediction=1.0), 
Row(id=7, text=u'apache hadoop', probability=DenseVector([0.9822, 0.0178]), prediction=0.0)]

本文示例來源於官方文檔


更多內容請參考:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM