Spark ML Pipeline Study Notes



 
Spark Pipeline
The Pipeline API is a high-level API built on DataFrames that makes it easy for users to construct and tune machine learning pipelines.
It lets multiple machine learning algorithms run in sequence, so data can be processed efficiently.
 
DataFrame: the ML dataset type, borrowed from Spark SQL. It can hold a variety of data types, such as text, feature vectors, labels, and predictions.
 
Transformer: an algorithm that converts one DataFrame into another DataFrame, by implementing the transform() method.
Estimator: an algorithm that converts a DataFrame into a Transformer, by implementing the fit() method.
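
A minimal sketch of the two contracts (text_df, train_df, and test_df are hypothetical placeholder DataFrames, not defined in the original):

    from pyspark.ml.feature import Tokenizer
    from pyspark.ml.classification import LogisticRegression

    # A Transformer maps one DataFrame to another via transform().
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    words_df = tokenizer.transform(text_df)      # DataFrame -> DataFrame

    # An Estimator learns from a DataFrame via fit() and returns a Transformer (a Model).
    lr = LogisticRegression(maxIter=10)
    lr_model = lr.fit(train_df)                  # DataFrame -> Transformer
    predictions = lr_model.transform(test_df)    # the fitted Model is itself a Transformer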
 
Pipeline: chains multiple Transformers and Estimators together into a specific ML workflow.
Parameter: Transformers and Estimators share a common API for specifying parameters.
 
Transformers and Estimators are the stages of a Pipeline.
A Pipeline is a workflow formed by a sequence of stages, run in the order in which they are declared.
 
Transformer.transform() and Estimator.fit() are both stateless.
Every Transformer and Estimator instance has a unique ID, which is very useful when specifying parameters.
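
For example (a minimal sketch; the uid strings are generated per instance and will differ from run to run):

    from pyspark.ml.classification import LogisticRegression

    lr1 = LogisticRegression()
    lr2 = LogisticRegression()
    print(lr1.uid)  # e.g. LogisticRegression_4f2a... -- unique to this instance
    print(lr2.uid)  # a different uid, so parameters for lr1 and lr2 never collide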
 
[Figure: the data flow of a linear Pipeline]
 
The Pipeline created above is linear: each stage depends on the output of the previous one.
If the data flow instead forms a directed acyclic graph (DAG), you can build a non-linear Pipeline; a sketch follows below.
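
In that case the stages list must simply be a topological ordering of the DAG. A minimal sketch, assuming an input DataFrame with hypothetical "text", "category", and "label" columns (the stages and column names here are illustrative, not from the original):

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import Tokenizer, HashingTF, StringIndexer, VectorAssembler
    from pyspark.ml.classification import LogisticRegression

    # Two independent feature branches (text -> hashed term frequencies,
    # category -> numeric index) merge in VectorAssembler before the classifier,
    # so the data flow is a DAG rather than a straight line.
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    hashingTF = HashingTF(inputCol="words", outputCol="textFeatures")
    indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
    assembler = VectorAssembler(inputCols=["textFeatures", "categoryIndex"],
                                outputCol="features")
    lr = LogisticRegression(maxIter=10)

    # The stages are listed in a topological order of the DAG.
    pipeline = Pipeline(stages=[tokenizer, hashingTF, indexer, assembler, lr])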
 
Runtime checking: because a Pipeline can operate on DataFrames with varied column types, compile-time type checking is not possible.
Instead, the Pipeline and PipelineModel perform runtime checking, using the DataFrame schema, i.e. the description of the data types of the DataFrame's columns.
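
That schema can be inspected directly; a minimal sketch, assuming an active SparkSession named spark:

    from pyspark.ml.linalg import Vectors

    df = spark.createDataFrame(
        [(1.0, Vectors.dense([0.0, 1.1, 0.1]))], ["label", "features"])
    df.printSchema()
    # Prints (approximately) the column names and types the Pipeline checks at runtime:
    # root
    #  |-- label: double (nullable = true)
    #  |-- features: vector (nullable = true)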
 
Unique Pipeline stages: the stages of a Pipeline should all be distinct instances, each with its own unique ID.
 
A Param is a named parameter with self-contained documentation.
A ParamMap is a set of (parameter, value) pairs.
 
There are two main ways to pass parameters to an algorithm:
1. Set parameters on the instance: if lr is a LogisticRegression instance, calling lr.setMaxIter(10) means lr.fit() will use at most 10 iterations.
2. Pass a ParamMap to fit() or transform(): every parameter in the map overrides any value previously set through the setter methods.
 
It is often worthwhile to save a Pipeline to disk for later use.
As of Spark 1.6, model import/export functions were added to the Pipeline API.
Most Transformers and some ML Models support I/O.
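
A minimal sketch of saving and loading (the paths are placeholders, and pipeline / model stand for an unfit Pipeline and a fitted PipelineModel such as the ones built in the examples below):

    from pyspark.ml import Pipeline, PipelineModel

    # Save an unfit Pipeline (its stages and parameters) and load it back.
    pipeline.write().overwrite().save("/tmp/unfit-lr-pipeline")
    same_pipeline = Pipeline.load("/tmp/unfit-lr-pipeline")

    # Save a fitted PipelineModel and load it back for later scoring.
    model.write().overwrite().save("/tmp/fitted-lr-model")
    same_model = PipelineModel.load("/tmp/fitted-lr-model")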
 
Below is an example that exercises these basic components:
 
 
    # Import Vectors and the LogisticRegression model.
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.classification import LogisticRegression

    # Prepare training data from a list of (label, features) tuples.
    training = spark.createDataFrame([
        (1.0, Vectors.dense([0.0, 1.1, 0.1])),
        (0.0, Vectors.dense([2.0, 1.0, -1.0])),
        (0.0, Vectors.dense([2.0, 1.3, 1.0])),
        (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

    # Create a LogisticRegression instance. This instance is an Estimator.
    lr = LogisticRegression(maxIter=10, regParam=0.01)
    # Print out the parameters, documentation, and any default values.
    print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

    # Learn a LogisticRegression model. This uses the parameters stored in lr.
    model1 = lr.fit(training)

    # Since model1 is a Model (i.e., a Transformer produced by an Estimator),
    # we can view the parameters it used during fit().
    # This prints the (name: value) parameter pairs, where the names are unique
    # IDs for this LogisticRegression instance.
    print("Model 1 was fit using parameters: ")
    print(model1.extractParamMap())

    # We may alternatively specify parameters using a Python dictionary as a paramMap.
    paramMap = {lr.maxIter: 20}
    paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
    paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # Specify multiple Params.

    # You can combine paramMaps, which are Python dictionaries.
    paramMap2 = {lr.probabilityCol: "myProbability"}  # Change the output column name.
    paramMapCombined = paramMap.copy()
    paramMapCombined.update(paramMap2)

    # Now learn a new model using the paramMapCombined parameters.
    # paramMapCombined overrides all parameters set earlier via lr.set* methods.
    model2 = lr.fit(training, paramMapCombined)
    print("Model 2 was fit using parameters: ")
    print(model2.extractParamMap())

    # Prepare test data.
    test = spark.createDataFrame([
        (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
        (0.0, Vectors.dense([3.0, 2.0, -0.1])),
        (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

    # Make predictions on the test data using the Transformer.transform() method.
    # LogisticRegression.transform will only use the 'features' column.
    # Note that model2.transform() outputs a "myProbability" column instead of the
    # usual 'probability' column because we renamed the lr.probabilityCol parameter above.
    prediction = model2.transform(test)
    # Print the selected columns from the prediction DataFrame.
    selected = prediction.select("features", "label", "myProbability", "prediction")
    for row in selected.collect():
        print(row)
 
 
Below is a full Pipeline example:
 
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import HashingTF, Tokenizer

    # Prepare training documents from a list of (id, text, label) tuples.
    training = spark.createDataFrame([
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0)], ["id", "text", "label"])

    # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    lr = LogisticRegression(maxIter=10, regParam=0.01)
    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

    # Fit the pipeline to the training documents.
    model = pipeline.fit(training)

    # Prepare test documents, which are unlabeled (id, text) tuples.
    test = spark.createDataFrame([
        (4, "spark i j k"),
        (5, "l m n"),
        (6, "mapreduce spark"),
        (7, "apache hadoop")], ["id", "text"])

    # Make predictions on the test documents and print the columns of interest.
    prediction = model.transform(test)
    selected = prediction.select("id", "text", "prediction")
    for row in selected.collect():
        print(row)





