kmeans聚類相信大家都已經很熟悉了。在Python里我們用kmeans通常調用Sklearn包(當然自己寫也很簡單)。那么在Spark里能不能也直接使用sklean包呢?目前來說直接使用有點困難,不過我看到spark-packages里已經有了,但還沒有發布。不過沒關系,PySpark里有ml包,除了ml包,還可以使用MLlib,這個在后期會寫,也很方便。
首先來看一下Spark自帶的例子:
1 from pyspark.mllib.linalg import Vectors 2 from pyspark.ml.clustering import KMeans 3 from pyspark.sql import SQLContext 4 from pyspark.mllib.linalg import Vectors 5 #導入數據
6 data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),(Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)] 7 df = sqlContext.createDataFrame(data, ["features"]) 8 #kmeans模型
9 kmeans = KMeans(k=2, seed=1) 10 model = kmeans.fit(df) 11 #簇心數量
12 centers = model.clusterCenters() 13 len(centers) 14 #2
15 #訓練模型
16 transformed = model.transform(df).select("features", "prediction") 17 rows = transformed.collect() 18 rows[0].prediction == rows[1].prediction 19 #True
20 rows[2].prediction == rows[3].prediction 21 # True
這個例子很簡單,導入的數據是四個稠密向量(可以自己在二維向量里畫一下),設定了兩個簇心,最后驗證預測的結果是否正確,顯示為True,證明預測正確。算法中具體的參數可以參考API中的說明。然而實際生產中我們的數據集不可能以這樣的方式一條條寫進去,一般是讀取文件,關於怎么讀取文件,可以具體看我的這篇博文。這里我們采用iris數據集(不要問我為什么又是iris數據集,因為真的太方便了)來給大家講解一下。
我的數據集是csv格式的,而Spark又不能直接讀取csv格式的數據,這里我們有兩個方式,一是我提到的這篇博文里有寫怎么讀取csv文件,二是安裝spark-csv包(在這里下載),github地址在這里。按照步驟安裝可以了。這里友情提示一下大家,github的安裝方法是:
$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0
如果報錯了,可以把 --packages 換成 --jars,如果還是不行,在加一個 common-csv.jars包放到lib下面就可以了。我因為這個耽誤了不少時間,不過具體問題也得具體分析。
安裝好這個包以后,就可以讀取數據了
1 from pyspark.sql import SQLContext 2 sqlContext = SQLContext(sc) 3 data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('iris.csv') 4 data.show()
讀取數據以后,我們來看一下數據集:
1 +------+------------+-----------+------------+-----------+-------+ 2 |row.id|Sepal.Length|Sepal.Width|Petal.Length|Petal.Width|Species| 3 +------+------------+-----------+------------+-----------+-------+ 4 | 1| 5.1| 3.5| 1.4| 0.2| 0| 5 | 2| 4.9| 3.0| 1.4| 0.2| 0| 6 | 3| 4.7| 3.2| 1.3| 0.2| 0| 7 | 4| 4.6| 3.1| 1.5| 0.2| 0| 8 | 5| 5.0| 3.6| 1.4| 0.2| 0| 9 | 6| 5.4| 3.9| 1.7| 0.4| 0| 10 | 7| 4.6| 3.4| 1.4| 0.3| 0| 11 | 8| 5.0| 3.4| 1.5| 0.2| 0| 12 | 9| 4.4| 2.9| 1.4| 0.2| 0| 13 | 10| 4.9| 3.1| 1.5| 0.1| 0| 14 | 11| 5.4| 3.7| 1.5| 0.2| 0| 15 | 12| 4.8| 3.4| 1.6| 0.2| 0| 16 | 13| 4.8| 3.0| 1.4| 0.1| 0| 17 | 14| 4.3| 3.0| 1.1| 0.1| 0| 18 | 15| 5.8| 4.0| 1.2| 0.2| 0| 19 | 16| 5.7| 4.4| 1.5| 0.4| 0| 20 | 17| 5.4| 3.9| 1.3| 0.4| 0| 21 | 18| 5.1| 3.5| 1.4| 0.3| 0| 22 | 19| 5.7| 3.8| 1.7| 0.3| 0| 23 | 20| 5.1| 3.8| 1.5| 0.3| 0| 24 +------+------------+-----------+------------+-----------+-------+
25 only showing top 20 rows
第二步:提取特征
我們在上一步導入的數據中label是String類型的,但在Spark中要變成數值型才能計算,不然就會報錯。可以利用StringIndexer功能將字符串轉化為數值型
1 from pyspark.ml.feature import StringIndexer 2
3 feature = StringIndexer(inputCol="Species", outputCol="targetlabel") 4 target = feature.fit(data).transform(data) 5 target.show()
targetlabel這一列就是Species轉化成數值型的結果
1 +------+------------+-----------+------------+-----------+-------+-----------+
2 |row.id|Sepal.Length|Sepal.Width|Petal.Length|Petal.Width|Species|targetlabel|
3 +------+------------+-----------+------------+-----------+-------+-----------+
4 | 1| 5.1| 3.5| 1.4| 0.2| 0| 0.0|
5 | 2| 4.9| 3.0| 1.4| 0.2| 0| 0.0|
6 | 3| 4.7| 3.2| 1.3| 0.2| 0| 0.0|
7 | 4| 4.6| 3.1| 1.5| 0.2| 0| 0.0|
8 | 5| 5.0| 3.6| 1.4| 0.2| 0| 0.0|
9 | 6| 5.4| 3.9| 1.7| 0.4| 0| 0.0|
10 | 7| 4.6| 3.4| 1.4| 0.3| 0| 0.0|
11 | 8| 5.0| 3.4| 1.5| 0.2| 0| 0.0|
12 | 9| 4.4| 2.9| 1.4| 0.2| 0| 0.0|
13 | 10| 4.9| 3.1| 1.5| 0.1| 0| 0.0|
14 | 11| 5.4| 3.7| 1.5| 0.2| 0| 0.0|
15 | 12| 4.8| 3.4| 1.6| 0.2| 0| 0.0|
16 | 13| 4.8| 3.0| 1.4| 0.1| 0| 0.0|
17 | 14| 4.3| 3.0| 1.1| 0.1| 0| 0.0|
18 | 15| 5.8| 4.0| 1.2| 0.2| 0| 0.0|
19 | 16| 5.7| 4.4| 1.5| 0.4| 0| 0.0|
20 | 17| 5.4| 3.9| 1.3| 0.4| 0| 0.0|
21 | 18| 5.1| 3.5| 1.4| 0.3| 0| 0.0|
22 | 19| 5.7| 3.8| 1.7| 0.3| 0| 0.0|
23 | 20| 5.1| 3.8| 1.5| 0.3| 0| 0.0|
24 +------+------------+-----------+------------+-----------+-------+-----------+
25 only showing top 20 rows
最后一步:模型訓練和驗證
1 from pyspark.sql import Row 2 from pyspark.ml.clustering import KMeans 3 from pyspark.mllib.linalg import Vectors 4 5 #把數據格式轉化成稠密向量 6 def transData(row): 7 return Row(label=row["targetlabel"], 8 features=Vectors.dense([row["Sepal.Length"], 9 row["Sepal.Width"], 10 row["Petal.Length"], 11 row["Petal.Width"]])) 12 13 #轉化成Dataframe格式 14 transformed = target.map(transData).toDF() 15 kmeans = KMeans(k=3) 16 model = kmeans.fit(transformed) 17 18 predict_data = model.transform(transformed) 19 20 train_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count() 21 total = predict_data.count() 22 print traing_err, total, float(train_err)/total
到這一步就結束了。總結一下,用pyspark做機器學習時,數據格式要轉成需要的格式,不然很容易出錯。下周寫pyspark在機器學習中如何做分類。
