分類(Classification)
下面的例子說明了怎樣導入LIBSVM 數據文件,解析成RDD[LabeledPoint],然后使用決策樹進行分類。GINI不純度作為不純度衡量標准並且樹的最大深度設置為5。最后計算了測試錯誤率從而評估算法的准確性。
from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.tree import DecisionTree, DecisionTreeModel from pyspark.mllib.util import MLUtils # Load and parse the data file into an RDD of LabeledPoint. data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt') # Split the data into training and test sets (30% held out for testing) (trainingData, testData) = data.randomSplit([0.7, 0.3]) # Train a DecisionTree model. # Empty categoricalFeaturesInfo indicates all features are continuous. model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={}, impurity='gini', maxDepth=5, maxBins=32) # Evaluate model on test instances and compute test error predictions = model.predict(testData.map(lambda x: x.features)) labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions) testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count()) print('Test Error = ' + str(testErr)) print('Learned classification tree model:') print(model.toDebugString()) # Save and load model model.save(sc, "myModelPath") sameModel = DecisionTreeModel.load(sc, "myModelPath")
以下代碼展示了如何載入一個LIBSVM數據文件,解析成一個LabeledPointRDD,然后使用決策樹,使用Gini不純度作為不純度衡量指標,最大樹深度是5.測試誤差用來計算算法准確率。
# -*- coding:utf-8 -*-
"""
測試決策樹
"""
import os
import sys
import logging
from pyspark.mllib.tree import DecisionTree,DecisionTreeModel
from pyspark.mllib.util import MLUtils
# Path for spark source folder
os.environ['SPARK_HOME']="D:\javaPackages\spark-1.6.0-bin-hadoop2.6"
# Append pyspark to Python Path
sys.path.append("D:\javaPackages\spark-1.6.0-bin-hadoop2.6\python")
sys.path.append("D:\javaPackages\spark-1.6.0-bin-hadoop2.6\python\lib\py4j-0.9-src.zip")
from pyspark import SparkContext
from pyspark import SparkConf
conf = SparkConf()
conf.set("YARN_CONF_DIR ", "D:\javaPackages\hadoop_conf_dir\yarn-conf")
conf.set("spark.driver.memory", "2g")
#conf.set("spark.executor.memory", "1g")
#conf.set("spark.python.worker.memory", "1g")
conf.setMaster("yarn-client")
conf.setAppName("TestDecisionTree")
logger = logging.getLogger('pyspark')
sc = SparkContext(conf=conf)
mylog = []
#載入和解析數據文件為 LabeledPoint RDDdata = MLUtils.loadLibSVMFile(sc,"/home/xiatao/machine_learing/")
#將數據拆分成訓練集合測試集
(trainingData,testData) = data.randomSplit([0.7,0.3])
##訓練決策樹模型
#空的 categoricalFeauresInfo 代表了所有的特征都是連續的
model = DecisionTree.trainClassifier(trainingData, numClasses=2,categoricalFeaturesInfo={},impurity='gini',maxDepth=5,maxBins=32)
# 在測試實例上評估模型並計算測試誤差
predictions = model.predict(testData.map(lambda x:x.features))
labelsAndPoint = testData.map(lambda lp:lp.label).zip(predictions)
testMSE = labelsAndPoint.map(lambda (v,p):(v-p)**2).sum()/float(testData.count())
mylog.append("測試誤差是")
mylog.append(testMSE)
#存儲模型
model.save(sc,"/home/xiatao/machine_learing/")
sc.parallelize(mylog).saveAsTextFile("/home/xiatao/machine_learing/log")
sameModel = DecisionTreeModel.load(sc,"/home/xiatao/machine_learing/")