""" Pipeline Example. """ # $example on$ from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.feature import HashingTF, Tokenizer # $example off$ from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession\ .builder\ .appName("PipelineExample")\ .getOrCreate() # $example on$ # Prepare training documents from a list of (id, text, label) tuples. training = spark.createDataFrame([ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0) ], ["id", "text", "label"]) # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") lr = LogisticRegression(maxIter=10, regParam=0.001) pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) # Fit the pipeline to training documents. model = pipeline.fit(training) # Prepare test documents, which are unlabeled (id, text) tuples. test = spark.createDataFrame([ (4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop") ], ["id", "text"]) # Make predictions on test documents and print columns of interest. prediction = model.transform(test) selected = prediction.select("id", "text", "probability", "prediction") for row in selected.collect(): rid, text, prob, prediction = row print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction)) # $example off$ spark.stop()
""" Decision Tree Classification Example. """ from __future__ import print_function # $example on$ from pyspark.ml import Pipeline from pyspark.ml.classification import DecisionTreeClassifier from pyspark.ml.feature import StringIndexer, VectorIndexer from pyspark.ml.evaluation import MulticlassClassificationEvaluator # $example off$ from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession\ .builder\ .appName("DecisionTreeClassificationExample")\ .getOrCreate() # $example on$ # Load the data stored in LIBSVM format as a DataFrame. data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") # Index labels, adding metadata to the label column. # Fit on whole dataset to include all labels in index. labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data) # Automatically identify categorical features, and index them. # We specify maxCategories so features with > 4 distinct values are treated as continuous. featureIndexer =\ VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data) # Split the data into training and test sets (30% held out for testing) (trainingData, testData) = data.randomSplit([0.7, 0.3]) # Train a DecisionTree model. dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures") # Chain indexers and tree in a Pipeline pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt]) # Train model. This also runs the indexers. model = pipeline.fit(trainingData) # Make predictions. predictions = model.transform(testData) # Select example rows to display. predictions.select("prediction", "indexedLabel", "features").show(5) # Select (prediction, true label) and compute test error evaluator = MulticlassClassificationEvaluator( labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy") accuracy = evaluator.evaluate(predictions) print("Test Error = %g " % (1.0 - accuracy)) treeModel = model.stages[2] # summary only print(treeModel) # $example off$ spark.stop()
管道里的主要概念
MLlib提供標准的接口來使聯合多個算法到單個的管道或者工作流,管道的概念源於scikit-learn項目。
1.數據框:機器學習接口使用來自Spark SQL的數據框形式數據作為數據集,它可以處理多種數據類型。比如,一個數據框可以有不同的列存儲文本、特征向量、標簽值和預測值。
2.轉換器:轉換器是將一個數據框變為另一個數據框的算法。比如,一個機器學習模型就是一個轉換器,它將帶有特征數據框轉為預測值數據框。
3.估計器:估計器是擬合一個數據框來產生轉換器的算法。比如,一個機器學習算法就是一個估計器,它訓練一個數據框產生一個模型。
4.管道:一個管道串起多個轉換器和估計器,明確一個機器學習工作流。
5.參數:管道中的所有轉換器和估計器使用共同的接口來指定參數。
工作原理
管道由一系列有順序的階段指定,每個狀態時轉換器或估計器。每個狀態的運行是有順序的,輸入的數據框通過每個階段進行改變。在轉換器階段,transform()方法被調用於數據框上。對於估計器階段,fit()方法被調用來產生一個轉換器,然后該轉換器的transform()方法被調用在數據框上。
下面的圖說明簡單的文檔處理工作流的運行。