Python+Spark2.0+hadoop學習筆記——Spark ML Pipeline機器學習流程


情況一:二元分類

這部分使用的數據集是判斷網頁是暫時的還是長青的。因為涉及到了文本的信息,所以需要進行文本的數字化和向量化。

在這部分中,機器學習分為三個部分,第一部分是建立機器學習流程pipeline,第二部分是訓練,第三部分是預測。

在建立機器學習流程pipeline中包含4個階段,如下所示:

StringIndexer:將文字的分類特征轉換為數字。

OneHotEncoder:將一個數字的分類特征字段轉為多個字段。

VectorAssembler:將所有的特征字段整合成一個Vector字段。

DesionTreeClassifier:進行訓練並且產生模型。

訓練過程是指“訓練數據DataFrame”使用pipeline.fit()進行訓練,然后產生pipelineModel模型。

預測過程是指“新數據DataFrame”使用pipelineModel.transform()進行預測,預測完成后會產生“預測結果DataFrame”。

這部分先使用DecisionTree Classifier Model進行預測,代碼如下:

global Path
if sc.master[0:5]=="local":
Path="file:/home/jorlinlee/pythonwork/PythonProject/"
else:
Path="hdfs://master:9000/user/jorlinlee"

創建row_dr DataFrame

row_df=sqlContext.read.format("csv").option("header","true").option("delimiter","\t").load(Path+"data/train.tsv")

編寫DataFrames UDF 用戶自定義函數(將"?"轉換為"0")

from pyspark.sql.functions import udf

def replace_question(x):

return("0" if x=="?" else x)

replace_question=udf(replace_question)

將string數據類型轉換為double數據類型

from pyspark.sql.functions import col

import pyspark.sql.types

df=row_df.select(

['url','alchemy_category'] +

[replace_question(col(colimn)).cast("double").alias(column)

for column in row_df.columns[4:]])

將數據分成train_df與test_df

train_df,test_df=df.randomSplit([0.7,0.3])
train_df.cache()
test_df.cache()

使用StringIndexer:將字符串分類特征字段轉換為數值

from pyspark.ml.feature import StringIndexer

categoryIndexer=StringIndexer(inputCol='alchemy_category',outputCol="alchemy_category_Index")

categoryTransformer=categoryIndexer.fit(df)

df1=categoryTransformer.transform(train_df)

使用OneHotEncoder:將一個數值的分類特征字段轉換為多個字段的Vector

from pyspark.ml.feature import OneHotEncoder

encoder=OneHotEncoder(dropLast=false,inputCol='alchemy_category_Index',outputCol="alchemy_category_IndexVec")

df2=encoder.transform(df1)

使用VectorAssembler:將多個特征字段整合成一個特征的Vector

from pyspark.ml.feature import VectorAssembler

assemblerInputs=['alchemy_category_IndexVec'] + row_df.columns[4:-1]

assembler=VectorAssembler(inputCols=assemblerInputs,outputCol="feature")

df3=assembler.transform(df2)

使用DecisionTreeClassifier二元分類

from pyspark.ml.classification import DecisionTreeClassifier

dt=DecisionTreeClassifier(labelCol="label",featuresCol="features",impurity="gini",maxDepth=10,maxBins=14)

dt_model=dt.fit(df3)

進行預測

df4=dt_model.transform(df3)

以上是分解過程,下面使用pipeline流程組件

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer,OneHotEncoder,VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

建立pipeline

stringIndexer=StringIndexer(inputCol='alchemy_category',
outputCol="alchemy_category_Index")
encoder=OneHotEncoder(dropLast=False,
inputCol='alchemy_category_Index',
outputCol="alchemy_category_IndexVec")
assemblerInputs=['alchemy_category_IndexVec']+row_df.columns[4:-1]
assembler=VectorAssembler(inputCols=assemblerInputs,outputCol="feature")
dt=DecisionTreeClassifier(labelCol="label",featuresCol="feature",impurity="gini",maxDepth=10,maxBins=14)
pipeline=Pipeline(stages=[stringIndexer,encoder,assembler,dt])

查看pipeline階段

pipeline.getStages()

使用pipeline進行數據處理與訓練

pipelineModel=pipeline.fit(train_df)

查看訓練完成后的決策樹模型(第3個階段會產生決策樹模型)

pipelineModel.stages[3]

使用pipelineModel.transform進行預測

predicted=pipelineModel.transform(test_df)

評估模型的准確性(使用AUC)

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator=BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction",
labelCol="label",
metricName="areaUnderROC")

auc=evaluator.evaluate(predicted)

auc

結果是:0.617

提出改進方案:

方案一:

使用TrainValidation進行訓練驗證找出最佳模型

from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit

設置訓練驗證的參數

paramGrid=ParamGridBuilder().addGrid(dt.impurity,["gini","entropy"]).addGrid(dt.maxDepth,[5,10,15]).addGrid(dt.maxBins,[10,15,20]).build()

創建TrainValidationSplit

tvs=TrainValidationSplit(estimator=dt,evaluator=evaluator,estimatorParamMaps=paramGrid,trainRatio=0.8)

建立tvs_pipeline

tvs_pipeline=Pipeline(stages=[stringIndexer,encoder,assembler,tvs])

使用tvs_pipeline流程進行訓練驗證

tvs_pipelineModel=tvs_pipeline.fit(train_df)

評估最佳模型AUC

predictions=tvs_pipelineModel.transform(test_df)

auc=evaluator.evaluate(predictions)

auc

結果:0.656

方案二:使用crossValidation交叉驗證找出最佳模型

from pyspark.ml.tuning import CrossValidator

建立交叉驗證的CrossValidator(與之前的paramGrid有聯系)

cv=CrossValidator(estimator=dt,evaluator=evaluator,estimatorParamMaps=paramGrid,numFolds=3)

建立交叉驗證的cv_pipeline

cv_pipeline=Pipeline(stages=[stringIndexer,encoder,assembler,cv])

訓練模型

cv_pipelineModel=cv_pipeline.fit(train_df)

評估最佳模型AUC

predictions=cv_pipelineModel.transform(test_df)

auc=evaluator.evaluate(predictions)

auc

結果:0.658

方案三:使用隨機森林RandomForestClassifier分類器

from pyspark.ml.classification import RandomForestClassifier

建立隨機森林分類模型

rf=RandomForestClassifier(labelCol="label",featuresCol="feature",numTrees=10)

建立隨機森林分類pipeline

rfpipeline=Pipeline(stages=[stringIndexer,encoder,assembler,rf])

對隨機森林模型進行訓練

rfpipelineModel=rfpipeline.fit(train_df)

使用模型進行預測

rfpredicted=rfpipelineModel.transform(test_df)

auc=evaluator.evaluate(rfpredicted)

auc

結果:0.738

使用RandomForestClassifier TrainValidation找出最佳模型

from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit

paramGrid=ParamGridBuilder().addGrid(rf.impurity,['gini','entropy']).addGrid(rf.maxDepth,[5,10,15]).addGrid(rf.maxBins,[10,15,20]).addGrid(rf.numTrees,[10,20,30]).build()

rftvs=TrainValidationSplit(estimator=rf,evaluator=evaluator,estimatorParamMaps=paramGrid,trainRatio=0.8)

rftvs_pipeline=Pipeline(stages=[stringIndexer,encoder,assembler,rftvs])

rftvs_pipelineModel=rftvs_pipeline.fit(train_df)

rftvspredcitions=rftvs_pipelineMode.transform(test_df)

auc=evaluator.evaluate(rftvspredictions)

auc

結果是:0.760

使用crossValidation找出最佳模型

from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

rfcv=CrossValidator(estimator=rf,evaluator=evaluator,estimatorParamMaps=paramGrid,numFolds=3)

rfcv_pipeline=Pipeline(stages=[stringIndexer,encoder,assembler,rfcv])

rfcv_pipelineModel=rfcv_pipeline.fit(train_df)

rfcvpredictions=rfcv_pipelineModel.transform(test_df)

auc=evaluator.evaluate(rfcvpredictions)

auc

結果:0.762

情況二:多元分類

這部分使用的數據是森林覆蓋樹種數據。數據中沒有涉及到文本數據,因此在建立pipeline時使用VectorAssembler和相應的機器學習模型就好了。

讀取數據

global path
if sc.master[0:5]=="local":
Path="file:/home/jorlinlee/pythonwork/PythonProject/"
else:
Path="hdfs://master:9000/user/jorlinlee/"

rawData=sc.textFile(Path+"data/covtype.data")
lines=rawData.map(lambda x:x.split(","))

因為這份數據最后需要轉換成DataFrame形式,但是這份數據沒有字段名,因此需要人工增加

from pyspark.sql.types import StringType, StructField, StructType

fields=[StructField("f"+str(i), StringType(), True) for i in range(fieldnum)]

schema=StructType(fields)

構造DataFrame

covtype_df=spark.createDataFrame(lines,schema)

將string格式轉換為double

from pyspark.sql.functions import col
covtype_df=covtype_df.select([col(column).cast("double").alias(column) for column in covtype_df.columns])

創建特征字段List

featuresCols=covtype_df.columns[:54]

創建label字段

covtype_df=covtype_df.withColumn("label",covtype_df["f54"] - 1).drop("f54")

將數據分成train_df與test_df

train_df,test_df=covtype_df.randomSplit([0.7,0.3])

train_df.cache()

test_df.cache()

導入模塊

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

建立pipeline

vectorAssembler=VectorAssembler(inputCols=featuresCols,outputCol="features")
dt=DecisionTreeClassifier(labelCol="label",featuresCol="features",maxDepth=5,maxBins=20)
dt_pipeline=Pipeline(stages=[vectorAssembler,dt])

使用pipeline進行訓練

pipelineModel=dt_pipeline.fit(train_df)

使用pipelinModel進行預測

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator=MulticlassClassificationEvaluator(labelCol="label",predictionCol="prediction",metricName="accuracy")

predicted=pipelineModel.transform(test_df)

accuracy=evaluator.evaluate(predictions)

accuracy

結果:0.703

使用TrainValidation進行訓練驗證找出最佳模型

from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit

paramGrid=ParamGridBuilder().addGrid(dt.impurity,["gini","entropy"]).addGrid(dt.maxDepth,[10,15,25]).addGrid(dt.maxBins,[30,40,50]).build()

tvs=TrainValidationSplit(estimator=dt,evaluator=evaluator,estimatorParamMaps=paramGrid,trainRatio=0.8)

tvs_pipeline=Pipeline(stages=[vectorAssembler,tvs])

tvs_pipelineModel=tvs_pipeline.fit(train_df)

predictions=tvs_pipelineModel.transform(test_df)

accuracy=evaluator.evaluate(predictions)
accuracy

結果:0.930

情況三:回歸分析

這部分使用的是共享單車預測的數據。這部分在建立pipeline時使用VectorAssembler(將所有的特征字段整合成vector)、VectorIndexer(將不重復數值的數量小於等於maxCategories參數值所對應的字段視為分類字段,否則視為數值字段)和機器學習模型。

代碼如下:

導入數據

global Path
if sc.master[0:5]=="local":
Path="file:/home/jorlinlee/pythonwork/PythonProject/"
else:
Path="hdfs://master:9000/user/jorlinlee/"

hour_df=spark.read.format('csv').option("header",'true').load(Path+"data/hour.csv")

去掉不重要的字段

hour_df=hour_df.drop("instant").drop("dteday").drop("yr").drop("casual").drop("registered")

數據類型變換

from pyspark.sql.functions import col

hour_df=hour_df.select([col(column).cast("double").alias(column) for column in hour_df.columns])

建立pipeline流程

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer,VectorIndexer,VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor

featuresCols=hour_df.columns[:-1]

vectorAssembler=VectorAssembler(inputCols=featuresCols,outputCol="aFeatures")vectorIndexer=VectorIndexer(inputCol="aFeatures",outputCol="features",maxCategories=24)

dt=DecisionTreeRegressor(labelCol="cnt",featuresCol="features")

dt_pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,dt])

分割數據

train_df,test_df=hour_df.randomSplit([0.7,0.3])

train_df.cache()

test_df.cache()

使用pipeline進行數據處理與訓練

dt_pipelineModel=dt_pipeline.fit(train_df)

predicted_df=dt_pipelineModel.transform(test_df)

評估模型的准確率

from pyspark.ml.evaluation import RegressionEvaluator

evaluator=RegressionEvaluator(labelCol='cnt',predictionCol='prediction',metricName="rmse")

predicted_df=dt_pipelineModel.transform(test_df)

rmse=evaluator.evaluate(predicted_df)

rmse

結果:95.617

使用TrainValidation進行訓練驗證找出最佳模型

from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit

paramGrid=ParamGridBuilder().addGrid(dt.maxDepth,[5,10,15,25]).addGrid(dt.maxBins,[25,35,45,50]).build()

tvs=TrainValidationSplit(estimator=dt,evaluator=evaluator,estimatorParamMaps=paramGrid,trainRatio=0.8)

tvs_pipeline=Pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,tvs])

tvs_pipelineModel=tvs_pipeline.fit(train_df)

predictions=tvs_pipelineModel.transform(test_df)

rmse=evaluator.evaluate(predictions)

rmse

結果:78.285

使用crossValidation進行交叉驗證找出最佳模型

from pyspark.ml.tuning import CrossValidator
from pyspark.ml import Pipeline

cv=CrossValidator(estimator=dt,evaluator=evaluator,estimatorParamMaps=paramGrid,numFolds=3)

cv_pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,cv])

cv_pipelineModel=cv_pipeline.fit(train_df)

predictions=cv_pipelineModel.transform(test_df)

rmse=evaluator.evaluate(predictions)

rmse

結果:78.457

使用GBT Regression(梯度提升樹,一次只產生一棵決策樹,再根據前一個決策樹的結果決定如何產生下一個決策樹)

from pyspark.ml.regression import GBTRegressor

gbt=GBTRegressor(labelCol='cnt',featuresCol='features')

gbt_pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,gbt])

gbt_pipelineModel=gbt_pipeline.fit(train_df)

predicted_df=gbt_pipelineModel.transform(test_df)

rmse=evaluator.evaluate(predicted_df)

rmse

結果:75.699

使用GBT Regression CrossValidation找出最佳模型

from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

paramGrid=ParamGridBuilder().addGrid(gbt.maxDepth,[5,10]).addGrid(gbt.maxBins,[25,40]).addGrid(gbt.maxIter,[10,50]).build()

cv=CrossValidator(estimator=gbt,evaluator=evaluator,estimatorParamMaps=paramGrid,numFolds=3)

cv_pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,cv])

cv_pipelineModel=cv_pipeline.fit(train_df)

predicted_df=cv_pipelineModel.transform(test_df)

evaluator=RegressionEvaluator(labelCol='cnt',predictionCol='prediction',metricName="rmse")

rmse=evaluator.evaluate(predicted_df)

rmse

結果:70.732


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM