1.Example
使用Spark MLlib中決策樹分類器API,訓練出一個決策樹模型,使用Python開發。
"""
Decision Tree Classification Example.
"""
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
if __name__ == "__main__":
sc = SparkContext(appName="PythonDecisionTreeClassificationExample")
# 加載和解析數據文件為RDD
dataPath = "/home/zhb/Desktop/work/DecisionTreeShareProject/app/sample_libsvm_data.txt"
print(dataPath)
data = MLUtils.loadLibSVMFile(sc,dataPath)
# 將數據集分割為訓練數據集和測試數據集
(trainingData,testData) = data.randomSplit([0.7,0.3])
print("train data count: " + str(trainingData.count()))
print("test data count : " + str(testData.count()))
# 訓練決策樹分類器
# categoricalFeaturesInfo 為空,表示所有的特征均為連續值
model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
impurity='gini', maxDepth=5, maxBins=32)
# 測試數據集上預測
predictions = model.predict(testData.map(lambda x: x.features))
# 打包真實值與預測值
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
# 統計預測錯誤的樣本的頻率
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Decision Tree Test Error = %5.3f%%'%(testErr*100))
print("Decision Tree Learned classifiction tree model : ")
print(model.toDebugString())
# 保存和加載訓練好的模型
modelPath = "/home/zhb/Desktop/work/DecisionTreeShareProject/app/myDecisionTreeClassificationModel"
model.save(sc, modelPath)
sameModel = DecisionTreeModel.load(sc, modelPath)
2.決策樹源碼分析
決策樹分類器API為DecisionTree.trainClassifier,進入源碼分析。
源碼文件所在路徑為,spark-1.6/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala。
@Since("1.1.0")
def trainClassifier(
input: RDD[LabeledPoint],
numClasses: Int,
categoricalFeaturesInfo: Map[Int, Int],
impurity: String,
maxDepth: Int,
maxBins: Int): DecisionTreeModel = {
val impurityType = Impurities.fromString(impurity)
train(input, Classification, impurityType, maxDepth, numClasses, maxBins, Sort,
categoricalFeaturesInfo)
}
訓練出一個分類器,然后調用了train方法。
@Since("1.0.0")
def train(
input: RDD[LabeledPoint],
algo: Algo,
impurity: Impurity,
maxDepth: Int,
numClasses: Int,
maxBins: Int,
quantileCalculationStrategy: QuantileStrategy,
categoricalFeaturesInfo: Map[Int, Int]): DecisionTreeModel = {
val strategy = new Strategy(algo, impurity, maxDepth, numClasses, maxBins,
quantileCalculationStrategy, categoricalFeaturesInfo)
new DecisionTree(strategy).run(input)
}
train方法首先將模型類型(分類或者回歸)、信息增益指標、決策樹深度、分類數目、最大切分箱子數等參數封裝為Strategy,然后新建一個DecisionTree對象,並調用run方法。
@Since("1.0.0")
class DecisionTree private[spark] (private val strategy: Strategy, private val seed: Int)
extends Serializable with Logging {
/**
* @param strategy The configuration parameters for the tree algorithm which specify the type
* of decision tree (classification or regression), feature type (continuous,
* categorical), depth of the tree, quantile calculation strategy, etc.
*/
@Since("1.0.0")
def this(strategy: Strategy) = this(strategy, seed = 0)
strategy.assertValid()
/**
* Method to train a decision tree model over an RDD
*
* @param input Training data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
* @return DecisionTreeModel that can be used for prediction.
*/
@Since("1.2.0")
def run(input: RDD[LabeledPoint]): DecisionTreeModel = {
val rf = new RandomForest(strategy, numTrees = 1, featureSubsetStrategy = "all", seed = seed)
val rfModel = rf.run(input)
rfModel.trees(0)
}
}
run方法中首先新建一個RandomForest對象,將strategy、決策樹數目設置為1,子集選擇策略為"all"傳遞給RandomForest對象,然后調用RandomForest中的run方法,最后返回隨機森林模型中的第一棵決策樹。
也就是,決策樹模型使用了隨機森林模型進行訓練,將決策樹數目設置為1,然后將隨機森林模型中的第一棵決策樹作為結果,返回作為決策樹訓練模型。
3.隨機森林源碼分析
隨機森林的源碼文件所在路徑為,spark-1.6/mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala。
private class RandomForest (
private val strategy: Strategy,
private val numTrees: Int,
featureSubsetStrategy: String,
private val seed: Int)
extends Serializable with Logging {
strategy.assertValid()
require(numTrees > 0, s"RandomForest requires numTrees > 0, but was given numTrees = $numTrees.")
require(RandomForest.supportedFeatureSubsetStrategies.contains(featureSubsetStrategy)
|| Try(featureSubsetStrategy.toInt).filter(_ > 0).isSuccess
|| Try(featureSubsetStrategy.toDouble).filter(_ > 0).filter(_ <= 1.0).isSuccess,
s"RandomForest given invalid featureSubsetStrategy: $featureSubsetStrategy." +
s" Supported values: ${NewRFParams.supportedFeatureSubsetStrategies.mkString(", ")}," +
s" (0.0-1.0], [1-n].")
/**
* Method to train a decision tree model over an RDD
*
* @param input Training data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
* @return RandomForestModel that can be used for prediction.
*/
def run(input: RDD[LabeledPoint]): RandomForestModel = {
val trees: Array[NewDTModel] = NewRandomForest.run(input.map(_.asML), strategy, numTrees,
featureSubsetStrategy, seed.toLong, None)
new RandomForestModel(strategy.algo, trees.map(_.toOld))
}
}
在該文件開頭,通過"import org.apache.spark.ml.tree.impl.{RandomForest => NewRandomForest}"將ml中的RandomForest引入,重新命名為NewRandomForest。
在RandomForest.run方法中,首先新建NewRandomForest模型,並調用該類的run方法,然后將生成的trees作為新建RandomForestModel的入參。
NewRandomForest,源碼文件所在路徑為,spark-1.6/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala。
由於涉及代碼量較大,因此無法將代碼展開,run方法主要有如下調用。
run方法
--->1. val metadata = DecisionTreeMetadata.buildMetadata(retaggedInput, strategy, numTrees,featureSubsetStrategy) # 對輸入數據建立元數據
--->2. val splits = findSplits(retaggedInput, metadata, seed) # 對元數據中的特征進行切分
--->2.1 計算采樣率,對輸入樣本進行采樣
--->2.2 findSplitsBySorting(sampledInput, metadata, continuousFeatures) # 對采樣后的樣本中的特征進行切分
--->2.2.1 val thresholds = findSplitsForContinuousFeature(samples, metadata, idx) # 針對連續型特征
--->2.2.2 val categories = extractMultiClassCategories(splitIndex + 1, featureArity) # 針對分類型特征,且特征無序
--->2.2.3 Array.empty[Split] # 針對分類型特征,且特征有序,訓練時直接構造即可
--->3. val treeInput = TreePoint.convertToTreeRDD(retaggedInput, splits, metadata) # 將輸入數據轉換為樹形數據
--->3.1 input.map { x => TreePoint.labeledPointToTreePoint(x, thresholds, featureArity) # 將LabeledPoint數據轉換為TreePoint數據
--->3.2 arr(featureIndex) = findBin(featureIndex, labeledPoint, featureArity(featureIndex), thresholds(featureIndex)) # 在(labeledPoint,feature)中找出一個離散值
--->4. val baggedInput = BaggedPoint.convertToBaggedRDD(treeInput, strategy.subsamplingRate, numTrees,withReplacement, seed) # 對輸入數據進行采樣
--->4.1 convertToBaggedRDDSamplingWithReplacement(input, subsamplingRate, numSubsamples, seed) #有放回采樣
--->4.2 convertToBaggedRDDWithoutSampling(input) # 樣本數為1,采樣率為100%
--->4.3 convertToBaggedRDDSamplingWithoutReplacement(input, subsamplingRate, numSubsamples, seed) # 無放回采樣
--->5. val (nodesForGroup, treeToNodeToIndexInfo) = RandomForest.selectNodesToSplit(nodeQueue, maxMemoryUsage,metadata, rng) # 取得每棵樹所有需要切分的結點
--->5.1 val featureSubset: Option[Array[Int]] = if (metadata.subsamplingFeatures) { Some(SamplingUtils.reservoirSampleAndCount(Range(0, metadata.numFeatures).iterator, metadata.numFeaturesPerNode, rng.nextLong())._1)} # 如果需要子采樣,選擇特征子集
--->5.2 val nodeMemUsage = RandomForest.aggregateSizeForNode(metadata, featureSubset) * 8L # 計算添加這個結點之后,是否有足夠的內存
--->6. RandomForest.findBestSplits(baggedInput, metadata, topNodes, nodesForGroup, treeToNodeToIndexInfo, splits, nodeQueue, timer, nodeIdCache) # 找出最優切分點
--->6.1 val (split: Split, stats: ImpurityStats) = binsToBestSplit(aggStats, splits, featuresForNode, nodes(nodeIndex)) #找出每個結點最好的切分
--->7. new DecisionTreeClassificationModel(uid, rootNode.toNode, numFeatures, strategy.getNumClasses) # 返回決策樹分類模型
4.Reference
spark mllib中的隨機森林算法,實現源碼以及使用介紹