此前用自己實現的隨機森林算法,應用在titanic生還者預測的數據集上。事實上,有很多開源的算法包供我們使用。無論是本地的機器學習算法包sklearn 還是分布式的spark mllib,都是非常不錯的選擇。
Spark是目前比較流行的分布式計算解決方案,同時支持集群模式和本地單機模式。由於其通過scala語言開發,原生支持scala,同時由於python在科學計算等領域的廣泛應用,Spark也提供了python的接口。
Spark的常用操作詳見官方文檔:
http://spark.apache.org/docs/latest/programming-guide.html
在終端下面鍵入如下命令,切換到spark的目錄,進入相應的環境:
cd $SPARK_HOME
cd ./bin
./pyspark
可以看到,出現了python 的版本號以及spark的logo

此時,仍然是輸入一句,運行一句並輸出。可以事先編輯好腳本保存為filename然后:
./spark-submit filename
下面給出詳細的代碼:
- import pandas as pd
- import numpy as np
- from pyspark.mllib.regression import LabeledPoint
- from pyspark.mllib.tree import RandomForest
- #將類別數量大於2的類別型變量進行重新編碼,並把數據集變成labeledPoint格式
- #df=pd.read_csv('/home/kim/t.txt',index_col=0)
- #for col in ['Pclass','embrk']:
- # values=df[col].drop_duplicates()
- # for v in values:
- # col_name=col+str(v)
- # df[col_name]=(df[col]==v)
- # df[col_name]=df[col_name].apply(lambda x:int(x))
- #df=df.drop(['Pclass','embrk'],axis=1)
- #df.to_csv('train_data')
- #讀入數據集變成彈性分布式數據集RDD ,由於是有監督學習,需要轉換為模型輸入的格式LabeledPoint
- rdd=pyspark.SparkContext.textFile('/home/kim/train')
- train=rdd.map(lambda x:x.split(',')[1])
- train=train.map(lambda line:LabeledPoint(line[1],line[2:]))
- #模型訓練
- model=RandomForest.trainClassifier\
- (train, numClasses=2, categoricalFeaturesInfo={},numTrees=1000,\
- featureSubsetStrategy="auto",impurity='gini', maxDepth=4, maxBins=32)
- #包含LabeledPoint對象的RDD,應用features方法返回其輸入變量的值,label方法返回其真實類別
- data_p=train.map(lambda lp:lp.features)
- v=train.map(lambda lp:lp.label)
- prediction=model.predict(data_p)
- vp=v.zip(prediction)
- #最后輸出模型在訓練集上的正確率
- MSE=vp.map(lambda x:abs(x[0]-x[1]).sum())/vp.count()
- print("MEAN SQURE ERROR: "+str(MSE))
import pandas as pd
import numpy as np
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest
#將類別數量大於2的類別型變量進行重新編碼,並把數據集變成labeledPoint格式
#df=pd.read_csv('/home/kim/t.txt',index_col=0)
#for col in ['Pclass','embrk']:
# values=df[col].drop_duplicates()
# for v in values:
# col_name=col+str(v)
# df[col_name]=(df[col]==v)
# df[col_name]=df[col_name].apply(lambda x:int(x))
#df=df.drop(['Pclass','embrk'],axis=1)
#df.to_csv('train_data')
#讀入數據集變成彈性分布式數據集RDD ,由於是有監督學習,需要轉換為模型輸入的格式LabeledPoint
rdd=pyspark.SparkContext.textFile('/home/kim/train')
train=rdd.map(lambda x:x.split(',')[1])
train=train.map(lambda line:LabeledPoint(line[1],line[2:]))
#模型訓練
model=RandomForest.trainClassifier\
(train, numClasses=2, categoricalFeaturesInfo={},numTrees=1000,\
featureSubsetStrategy="auto",impurity='gini', maxDepth=4, maxBins=32)
#包含LabeledPoint對象的RDD,應用features方法返回其輸入變量的值,label方法返回其真實類別
data_p=train.map(lambda lp:lp.features)
v=train.map(lambda lp:lp.label)
prediction=model.predict(data_p)
vp=v.zip(prediction)
#最后輸出模型在訓練集上的正確率
MSE=vp.map(lambda x:abs(x[0]-x[1]).sum())/vp.count()
print("MEAN SQURE ERROR: "+str(MSE))
后面可以多加測試,例如:
使用更大規模的數據集;
將數據集划分為訓練集測試集,在訓練集上建模在測試集上評估模型性能;
使用mllib里面的其他算法並比較效果,等等
歡迎大家與我交流!
