寫這個系列是因為最近公司在搞技術分享,學習Spark,我的任務是講PySpark的應用,因為我主要用Python,結合Spark,就講PySpark了。然而我在學習的過程中發現,PySpark很雞肋(至少現在我覺得我不會拿PySpark做開發)。為什么呢?原因如下:
1.PySpark支持的算法太少了。我們看一下PySpark支持的算法:(參考官方文檔)
前面兩個pyspark.sql和pyspark.streaming是對sql和streaming的支持。主要是讀取數據,和streaming處理這種方式(當然這是spark的優勢,要是這也不支持真是見鬼了)。pyspark.ml和pyspark.mllib分別是ml的api和mllib的api,ml的算法真心少啊,而且支持的功能很有限,譬如Lr(邏輯回歸)和GBT目前只支持二分類,不支持多分類。mllib相對好點,支持的算法也多點,雖然昨天發的博文講mlllib的時候說過有的算法不支持分布式,所以才會有限,但是我在想,如果我需要用到A算法,而Ml和Mllib的包里面都沒有,這樣是不是意味着要自己開發分布式算法呢?代價有點大誒,感覺寫這個的時間不如多找找有用的特征,然后上LR,這樣效果說不定更好。因為目前還沒有在實際中用過,所以以上只是我的想法。下面把ml和mllib的所有api列出來,這樣看的更清楚。
圖一 pyspark.ml的api
圖二 pyspark.mllib的api
從上面兩張圖可以看到,mllib的功能比ml強大的不是一點半點啊,那ml這個包的存在還有什么意義呢?不懂(如果有了解的歡迎留言)。雖然有這么多疑問,但是我還是跟大家講了,用的數據依然是iris(其實我真心想換個數據集啊 == ,下次換)。上代碼:
1 from pyspark.sql import SQLContext 2 sqlContext = SQLContext(sc) 3 df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('iris.csv') 4 # Displays the content of the DataFrame to stdout 5 df.show() 6 7 8 from pyspark.ml.feature import StringIndexer 9 indexer = StringIndexer(inputCol="Species", outputCol="labelindex") 10 indexed = indexer.fit(df).transform(df) 11 indexed.show() 12 13 from pyspark.sql import Row 14 from pyspark.mllib.linalg import Vectors 15 from pyspark.ml.classification import NaiveBayes 16 17 # Load and parse the data 18 def parseRow(row): 19 return Row(label=row["labelindex"], 20 features=Vectors.dense([row["Sepal.Length"], 21 row["Sepal.Width"], 22 row["Petal.Length"], 23 row["Petal.Width"]])) 24 25 ## Must convert to dataframe after mapping 26 parsedData = indexed.map(parseRow).toDF() 27 labeled = StringIndexer(inputCol="label", outputCol="labelpoint") 28 data = labeled.fit(parsedData).transform(parsedData) 29 data.show() 30 31 ## 訓練模型 32 #Naive Bayes 33 nb = NaiveBayes(smoothing=1.0, modelType="multinomial") 34 model_NB = nb.fit(data) 35 predict_data= model_NB.transform(data) 36 traing_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count() 37 total = predict_data.count() 38 nb_scores = float(traing_err)/total 39 print traing_err, total, nb_scores 40 #7 150 0.0466666666667 41 42 43 #Logistic Regression########################################################### 44 # Logistic regression. Currently, this class only supports binary classification. 45 from pyspark.ml.classification import LogisticRegression 46 lr = LogisticRegression(maxIter=5, regParam=0.01) 47 model_lr = lr.fit(data) 48 predict_data= model_lr.transform(data) 49 traing_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count() 50 total = predict_data.count() 51 lr_scores = float(traing_err)/total 52 print traing_err, total, float(traing_err)/total 53 54 55 #Decision Tree 56 from pyspark.ml.classification import DecisionTreeClassifier 57 dt = DecisionTreeClassifier(maxDepth=2,labelCol = 'labelpoint') 58 model_DT= dt.fit(data) 59 predict_data= model_DT.transform(data) 60 traing_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count() 61 total = predict_data.count() 62 dt_scores = float(traing_err)/total 63 print traing_err, total, float(traing_err)/total 64 65 66 #GBT########################################################### 67 ## GBT. Currently, this class only supports binary classification. 68 from pyspark.ml.classification import GBTClassifier 69 gbt = GBTClassifier(maxIter=5, maxDepth=2,labelCol="labelpoint") 70 model_gbt = gbt.fit(data) 71 predict_data= model_gbt.transform(data) 72 traing_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count() 73 total = predict_data.count() 74 dt_scores = float(traing_err)/total 75 print traing_err, total, float(traing_err)/total 76 77 78 #Random Forest 79 from pyspark.ml.classification import RandomForestClassifier 80 rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="labelpoint", seed=42) 81 model_rf= rf.fit(data) 82 predict_data= model_rf.transform(data) 83 traing_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count() 84 total = predict_data.count() 85 dt_scores = float(traing_err)/total 86 print traing_err, total, float(traing_err)/total 87 88 #MultilayerPerceptronClassifier########################################################### 89 # Classifier trainer based on the Multilayer Perceptron. Each layer has sigmoid activation function, output layer has softmax. 90 # Number of inputs has to be equal to the size of feature vectors. Number of outputs has to be equal to the total number of labels. 91 from pyspark.ml.classification import MultilayerPerceptronClassifier 92 mlp = MultilayerPerceptronClassifier(maxIter=100, layers=[150, 5, 150], blockSize=1, seed=11) 93 model_mlp= mlp.fit(parsedData) 94 predict_data= model_mlp.transform(parsedData) 95 traing_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count() 96 total = predict_data.count() 97 dt_scores = float(traing_err)/total 98 print traing_err, total, float(traing_err)/total
因為數據集和上次講pyspark聚類應用的數據是一樣的,就不一步步的展示了,但是我這個程序里只有NaiveBayes的效果還行,0.94的正確率,其他的像DecisionTree等,效果真心差,可能參數還需要調。先掌握怎么用再來調參,官方文檔里關於參數的解釋也非常詳細,可以看看。下一次講回歸,我決定不只寫pyspark.ml的應用了,因為實在是圖樣圖naive,想弄清楚pyspark的機器學習算法是怎么運行的,跟普通的算法運行有什么區別,優勢等,再寫個pyspark.mllib,看相同的算法在ml和mllib的包里運行效果有什么差異,如果有,是為什么,去看源碼怎么寫的。此外,我真的想弄清楚這貨在實際生產中到底有用嗎,畢竟還是要落實生產的,我之前想,如果python的sklearn能夠在spark上應用就好了,后來在databricks里面找到了一個包好像是准備把sklearn弄到spark上來,當然算法肯定要重新寫,不過還沒有發布,期待發布的時候。此外,我在知乎上也看到過有人提問說“spark上能用skearn嗎?”(大概是這意思,應該很好搜),里面有個回答好像說可以,不過不是直接用(等我找到了把鏈接放出來)。其實換一種想法,不用spark也行,直接用mapreduce編程序,但是mapreduce慢啊(此處不嚴謹,因為並沒有測試過兩者的性能差異,待補充),在我使用spark的短暫時間內,我個人認為spark的優勢在於數據處理快,它不需要像mapreduce一樣把數據切分成這么多塊計算然后再reduce合並,而是直接將數據導入的時候就指定分區,運行機制不同,尤其是spark streaming的功能,還是很快的,所以這是spark的優勢(鄙人拙見,如有錯誤歡迎指出)。而spark的劣勢也比較明顯,因為它對設備的要求太高了(吃內存啊能不高嗎!),這也是它快的原因,你把數據都放在內存里,取的時間比放在磁盤里當然要快,不過實際上在存儲數據或者輸出結果的時候還是會選擇(memory+disk)的方式,保險嘛。前段時間看的alluxio也是占了內存的優勢。恩,說了很多廢話。下周爭取研究的深一點,不然在公司里講都沒人聽 = =。