情況一:二元分類
這部分使用的數據集是判斷網頁是暫時的還是長青的。因為涉及到了文本的信息,所以需要進行文本的數字化和向量化。
在這部分中,機器學習分為三個部分,第一部分是建立機器學習流程pipeline,第二部分是訓練,第三部分是預測。
在建立機器學習流程pipeline中包含4個階段,如下所示:
StringIndexer:將文字的分類特征轉換為數字。
OneHotEncoder:將一個數字的分類特征字段轉為多個字段。
VectorAssembler:將所有的特征字段整合成一個Vector字段。
DesionTreeClassifier:進行訓練並且產生模型。
訓練過程是指“訓練數據DataFrame”使用pipeline.fit()進行訓練,然后產生pipelineModel模型。
預測過程是指“新數據DataFrame”使用pipelineModel.transform()進行預測,預測完成后會產生“預測結果DataFrame”。
這部分先使用DecisionTree Classifier Model進行預測,代碼如下:
global Path
if sc.master[0:5]=="local":
Path="file:/home/jorlinlee/pythonwork/PythonProject/"
else:
Path="hdfs://master:9000/user/jorlinlee"
創建row_dr DataFrame
row_df=sqlContext.read.format("csv").option("header","true").option("delimiter","\t").load(Path+"data/train.tsv")
編寫DataFrames UDF 用戶自定義函數(將"?"轉換為"0")
from pyspark.sql.functions import udf
def replace_question(x):
return("0" if x=="?" else x)
replace_question=udf(replace_question)
將string數據類型轉換為double數據類型
from pyspark.sql.functions import col
import pyspark.sql.types
df=row_df.select(
['url','alchemy_category'] +
[replace_question(col(colimn)).cast("double").alias(column)
for column in row_df.columns[4:]])
將數據分成train_df與test_df
train_df,test_df=df.randomSplit([0.7,0.3])
train_df.cache()
test_df.cache()
使用StringIndexer:將字符串分類特征字段轉換為數值
from pyspark.ml.feature import StringIndexer
categoryIndexer=StringIndexer(inputCol='alchemy_category',outputCol="alchemy_category_Index")
categoryTransformer=categoryIndexer.fit(df)
df1=categoryTransformer.transform(train_df)
使用OneHotEncoder:將一個數值的分類特征字段轉換為多個字段的Vector
from pyspark.ml.feature import OneHotEncoder
encoder=OneHotEncoder(dropLast=false,inputCol='alchemy_category_Index',outputCol="alchemy_category_IndexVec")
df2=encoder.transform(df1)
使用VectorAssembler:將多個特征字段整合成一個特征的Vector
from pyspark.ml.feature import VectorAssembler
assemblerInputs=['alchemy_category_IndexVec'] + row_df.columns[4:-1]
assembler=VectorAssembler(inputCols=assemblerInputs,outputCol="feature")
df3=assembler.transform(df2)
使用DecisionTreeClassifier二元分類
from pyspark.ml.classification import DecisionTreeClassifier
dt=DecisionTreeClassifier(labelCol="label",featuresCol="features",impurity="gini",maxDepth=10,maxBins=14)
dt_model=dt.fit(df3)
進行預測
df4=dt_model.transform(df3)
以上是分解過程,下面使用pipeline流程組件
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer,OneHotEncoder,VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
建立pipeline
stringIndexer=StringIndexer(inputCol='alchemy_category',
outputCol="alchemy_category_Index")
encoder=OneHotEncoder(dropLast=False,
inputCol='alchemy_category_Index',
outputCol="alchemy_category_IndexVec")
assemblerInputs=['alchemy_category_IndexVec']+row_df.columns[4:-1]
assembler=VectorAssembler(inputCols=assemblerInputs,outputCol="feature")
dt=DecisionTreeClassifier(labelCol="label",featuresCol="feature",impurity="gini",maxDepth=10,maxBins=14)
pipeline=Pipeline(stages=[stringIndexer,encoder,assembler,dt])
查看pipeline階段
pipeline.getStages()
使用pipeline進行數據處理與訓練
pipelineModel=pipeline.fit(train_df)
查看訓練完成后的決策樹模型(第3個階段會產生決策樹模型)
pipelineModel.stages[3]
使用pipelineModel.transform進行預測
predicted=pipelineModel.transform(test_df)
評估模型的准確性(使用AUC)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator=BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction",
labelCol="label",
metricName="areaUnderROC")
auc=evaluator.evaluate(predicted)
auc
結果是:0.617
提出改進方案:
方案一:
使用TrainValidation進行訓練驗證找出最佳模型
from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit
設置訓練驗證的參數
paramGrid=ParamGridBuilder().addGrid(dt.impurity,["gini","entropy"]).addGrid(dt.maxDepth,[5,10,15]).addGrid(dt.maxBins,[10,15,20]).build()
創建TrainValidationSplit
tvs=TrainValidationSplit(estimator=dt,evaluator=evaluator,estimatorParamMaps=paramGrid,trainRatio=0.8)
建立tvs_pipeline
tvs_pipeline=Pipeline(stages=[stringIndexer,encoder,assembler,tvs])
使用tvs_pipeline流程進行訓練驗證
tvs_pipelineModel=tvs_pipeline.fit(train_df)
評估最佳模型AUC
predictions=tvs_pipelineModel.transform(test_df)
auc=evaluator.evaluate(predictions)
auc
結果:0.656
方案二:使用crossValidation交叉驗證找出最佳模型
from pyspark.ml.tuning import CrossValidator
建立交叉驗證的CrossValidator(與之前的paramGrid有聯系)
cv=CrossValidator(estimator=dt,evaluator=evaluator,estimatorParamMaps=paramGrid,numFolds=3)
建立交叉驗證的cv_pipeline
cv_pipeline=Pipeline(stages=[stringIndexer,encoder,assembler,cv])
訓練模型
cv_pipelineModel=cv_pipeline.fit(train_df)
評估最佳模型AUC
predictions=cv_pipelineModel.transform(test_df)
auc=evaluator.evaluate(predictions)
auc
結果:0.658
方案三:使用隨機森林RandomForestClassifier分類器
from pyspark.ml.classification import RandomForestClassifier
建立隨機森林分類模型
rf=RandomForestClassifier(labelCol="label",featuresCol="feature",numTrees=10)
建立隨機森林分類pipeline
rfpipeline=Pipeline(stages=[stringIndexer,encoder,assembler,rf])
對隨機森林模型進行訓練
rfpipelineModel=rfpipeline.fit(train_df)
使用模型進行預測
rfpredicted=rfpipelineModel.transform(test_df)
auc=evaluator.evaluate(rfpredicted)
auc
結果:0.738
使用RandomForestClassifier TrainValidation找出最佳模型
from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit
paramGrid=ParamGridBuilder().addGrid(rf.impurity,['gini','entropy']).addGrid(rf.maxDepth,[5,10,15]).addGrid(rf.maxBins,[10,15,20]).addGrid(rf.numTrees,[10,20,30]).build()
rftvs=TrainValidationSplit(estimator=rf,evaluator=evaluator,estimatorParamMaps=paramGrid,trainRatio=0.8)
rftvs_pipeline=Pipeline(stages=[stringIndexer,encoder,assembler,rftvs])
rftvs_pipelineModel=rftvs_pipeline.fit(train_df)
rftvspredcitions=rftvs_pipelineMode.transform(test_df)
auc=evaluator.evaluate(rftvspredictions)
auc
結果是:0.760
使用crossValidation找出最佳模型
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
rfcv=CrossValidator(estimator=rf,evaluator=evaluator,estimatorParamMaps=paramGrid,numFolds=3)
rfcv_pipeline=Pipeline(stages=[stringIndexer,encoder,assembler,rfcv])
rfcv_pipelineModel=rfcv_pipeline.fit(train_df)
rfcvpredictions=rfcv_pipelineModel.transform(test_df)
auc=evaluator.evaluate(rfcvpredictions)
auc
結果:0.762
情況二:多元分類
這部分使用的數據是森林覆蓋樹種數據。數據中沒有涉及到文本數據,因此在建立pipeline時使用VectorAssembler和相應的機器學習模型就好了。
讀取數據
global path
if sc.master[0:5]=="local":
Path="file:/home/jorlinlee/pythonwork/PythonProject/"
else:
Path="hdfs://master:9000/user/jorlinlee/"
rawData=sc.textFile(Path+"data/covtype.data")
lines=rawData.map(lambda x:x.split(","))
因為這份數據最后需要轉換成DataFrame形式,但是這份數據沒有字段名,因此需要人工增加
from pyspark.sql.types import StringType, StructField, StructType
fields=[StructField("f"+str(i), StringType(), True) for i in range(fieldnum)]
schema=StructType(fields)
構造DataFrame
covtype_df=spark.createDataFrame(lines,schema)
將string格式轉換為double
from pyspark.sql.functions import col
covtype_df=covtype_df.select([col(column).cast("double").alias(column) for column in covtype_df.columns])
創建特征字段List
featuresCols=covtype_df.columns[:54]
創建label字段
covtype_df=covtype_df.withColumn("label",covtype_df["f54"] - 1).drop("f54")
將數據分成train_df與test_df
train_df,test_df=covtype_df.randomSplit([0.7,0.3])
train_df.cache()
test_df.cache()
導入模塊
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
建立pipeline
vectorAssembler=VectorAssembler(inputCols=featuresCols,outputCol="features")
dt=DecisionTreeClassifier(labelCol="label",featuresCol="features",maxDepth=5,maxBins=20)
dt_pipeline=Pipeline(stages=[vectorAssembler,dt])
使用pipeline進行訓練
pipelineModel=dt_pipeline.fit(train_df)
使用pipelinModel進行預測
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator=MulticlassClassificationEvaluator(labelCol="label",predictionCol="prediction",metricName="accuracy")
predicted=pipelineModel.transform(test_df)
accuracy=evaluator.evaluate(predictions)
accuracy
結果:0.703
使用TrainValidation進行訓練驗證找出最佳模型
from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit
paramGrid=ParamGridBuilder().addGrid(dt.impurity,["gini","entropy"]).addGrid(dt.maxDepth,[10,15,25]).addGrid(dt.maxBins,[30,40,50]).build()
tvs=TrainValidationSplit(estimator=dt,evaluator=evaluator,estimatorParamMaps=paramGrid,trainRatio=0.8)
tvs_pipeline=Pipeline(stages=[vectorAssembler,tvs])
tvs_pipelineModel=tvs_pipeline.fit(train_df)
predictions=tvs_pipelineModel.transform(test_df)
accuracy=evaluator.evaluate(predictions)
accuracy
結果:0.930
情況三:回歸分析
這部分使用的是共享單車預測的數據。這部分在建立pipeline時使用VectorAssembler(將所有的特征字段整合成vector)、VectorIndexer(將不重復數值的數量小於等於maxCategories參數值所對應的字段視為分類字段,否則視為數值字段)和機器學習模型。
代碼如下:
導入數據
global Path
if sc.master[0:5]=="local":
Path="file:/home/jorlinlee/pythonwork/PythonProject/"
else:
Path="hdfs://master:9000/user/jorlinlee/"
hour_df=spark.read.format('csv').option("header",'true').load(Path+"data/hour.csv")
去掉不重要的字段
hour_df=hour_df.drop("instant").drop("dteday").drop("yr").drop("casual").drop("registered")
數據類型變換
from pyspark.sql.functions import col
hour_df=hour_df.select([col(column).cast("double").alias(column) for column in hour_df.columns])
建立pipeline流程
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer,VectorIndexer,VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
featuresCols=hour_df.columns[:-1]
vectorAssembler=VectorAssembler(inputCols=featuresCols,outputCol="aFeatures")vectorIndexer=VectorIndexer(inputCol="aFeatures",outputCol="features",maxCategories=24)
dt=DecisionTreeRegressor(labelCol="cnt",featuresCol="features")
dt_pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,dt])
分割數據
train_df,test_df=hour_df.randomSplit([0.7,0.3])
train_df.cache()
test_df.cache()
使用pipeline進行數據處理與訓練
dt_pipelineModel=dt_pipeline.fit(train_df)
predicted_df=dt_pipelineModel.transform(test_df)
評估模型的准確率
from pyspark.ml.evaluation import RegressionEvaluator
evaluator=RegressionEvaluator(labelCol='cnt',predictionCol='prediction',metricName="rmse")
predicted_df=dt_pipelineModel.transform(test_df)
rmse=evaluator.evaluate(predicted_df)
rmse
結果:95.617
使用TrainValidation進行訓練驗證找出最佳模型
from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit
paramGrid=ParamGridBuilder().addGrid(dt.maxDepth,[5,10,15,25]).addGrid(dt.maxBins,[25,35,45,50]).build()
tvs=TrainValidationSplit(estimator=dt,evaluator=evaluator,estimatorParamMaps=paramGrid,trainRatio=0.8)
tvs_pipeline=Pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,tvs])
tvs_pipelineModel=tvs_pipeline.fit(train_df)
predictions=tvs_pipelineModel.transform(test_df)
rmse=evaluator.evaluate(predictions)
rmse
結果:78.285
使用crossValidation進行交叉驗證找出最佳模型
from pyspark.ml.tuning import CrossValidator
from pyspark.ml import Pipeline
cv=CrossValidator(estimator=dt,evaluator=evaluator,estimatorParamMaps=paramGrid,numFolds=3)
cv_pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,cv])
cv_pipelineModel=cv_pipeline.fit(train_df)
predictions=cv_pipelineModel.transform(test_df)
rmse=evaluator.evaluate(predictions)
rmse
結果:78.457
使用GBT Regression(梯度提升樹,一次只產生一棵決策樹,再根據前一個決策樹的結果決定如何產生下一個決策樹)
from pyspark.ml.regression import GBTRegressor
gbt=GBTRegressor(labelCol='cnt',featuresCol='features')
gbt_pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,gbt])
gbt_pipelineModel=gbt_pipeline.fit(train_df)
predicted_df=gbt_pipelineModel.transform(test_df)
rmse=evaluator.evaluate(predicted_df)
rmse
結果:75.699
使用GBT Regression CrossValidation找出最佳模型
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
paramGrid=ParamGridBuilder().addGrid(gbt.maxDepth,[5,10]).addGrid(gbt.maxBins,[25,40]).addGrid(gbt.maxIter,[10,50]).build()
cv=CrossValidator(estimator=gbt,evaluator=evaluator,estimatorParamMaps=paramGrid,numFolds=3)
cv_pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,cv])
cv_pipelineModel=cv_pipeline.fit(train_df)
predicted_df=cv_pipelineModel.transform(test_df)
evaluator=RegressionEvaluator(labelCol='cnt',predictionCol='prediction',metricName="rmse")
rmse=evaluator.evaluate(predicted_df)
rmse
結果:70.732