Spark作為一種開源集群計算環境,具有分布式的快速數據處理能力。而Spark中的Mllib定義了各種各樣用於機器學習的數據結構以及算法。Python具有Spark的API。需要注意的是,Spark中,所有數據的處理都是基於RDD的。
首先舉一個聚類方面的詳細應用例子Kmeans:
下面代碼是一些基本步驟,包括外部數據,RDD預處理,訓練模型,預測。
#coding:utf-8 from numpy import array from math import sqrt from pyspark import SparkContext from pyspark.mllib.clustering import KMeans, KMeansModel if __name__ == "__main__": sc = SparkContext(appName="KMeansExample",master='local') # SparkContext # 讀取並處理數據 data = sc.textFile("./kmeans_data.txt") print data.collect() parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')])) # 訓練數據 print parsedData.collect() clusters = KMeans.train(parsedData, k=2, maxIterations=10, runs=10, initializationMode="random") #求方差之和 def error(point): center = clusters.centers[clusters.predict(point)] return sqrt(sum([x**2 for x in (point - center)])) WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y) print("Within Set Sum of Squared Error = " + str(WSSSE)) #聚類結果 def sort(point): return clusters.predict(point) clusters_result = parsedData.map(sort) # Save and load model # $example off$ print '聚類結果:' print clusters_result.collect() sc.stop()
可以看到在利用Spark進行機器學習時,我調用了一個外部的開源包numpy,並利用了數組作為數據結構。而在Mllib中其實已經定義了各種用於機器學習的數據結構,下面簡單介紹兩種在分類和回歸分析中可以用到的DS。
稀疏向量(SparseVector):稀疏向量是指向量元素中有許多值是0的向量。
其初始化與簡單操作如下:
# coding:utf-8 from pyspark.mllib.linalg import * v0 = SparseVector(4, [1, 2], [2, 3.0]) # 稀疏向量,第一個參數為維度,第二個參數是非0維度的下標的集合,第三個參數是非0維度的值的集合 v1 = SparseVector(4,{1: 3, 2: 4}) # 第一個參數是維度,第二個參數是下標和維度組成的字典 print v0.dot(v1) # 計算點積 print v0.size # 向量維度 print v0.norm(0) # 返回維度0的值 print v0.toArray() # 轉化為array print v0.squared_distance(v1) # 歐式距離
spark中的稀疏向量可以利用list或者dict進行初始化。
向量標簽(Labeled point):向量標簽就是在向量和標簽的組合,分類和回歸中,標簽可以作為分類中的類別,也可以作為回歸中的實際值。
from pyspark.mllib.regression import LabeledPoint data = [ LabeledPoint(1.0, [1.0, 1.0]), LabeledPoint(4.0, [1.0, 3.0]), LabeledPoint(8.0, [2.0, 3.0]), LabeledPoint(10.0, [3.0, 4.0])] print data[0].features print data[0].label
下面是mllib中用於回歸分析的一些基本實現(線性回歸,嶺回歸):
# coding:UTF-8 from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.regression import LinearRegressionWithSGD from pyspark.context import SparkContext # ----------------線性回歸-------------- import numpy as np sc = SparkContext(master='local',appName='Regression') data = [ LabeledPoint(1.0, [1.0, 1.0]), LabeledPoint(2.0, [1.0, 1.4]), LabeledPoint(4.0, [2.0, 1.9]), LabeledPoint(6.0, [3.0, 4.0])] # 訓練集 lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=100, initialWeights=np.array([1.0,1.0])) print lrm.predict(np.array([2.0,1.0])) # 利用訓練出的回歸模型進行預測 import os, tempfile from pyspark.mllib.regression import LinearRegressionModel from pyspark.mllib.linalg import SparseVector path = tempfile.mkdtemp() lrm.save(sc, path) # 將模型保存至外存 sameModel = LinearRegressionModel.load(sc, path) # 讀取模型 print sameModel.predict(SparseVector(2, {0: 100.0, 1: 150})) # 利用稀疏向量作為數據結構,返回單個預測值 test_set = [] for i in range(100): for j in range(100): test_set.append(SparseVector(2, {0: i,1: j})) print sameModel.predict(sc.parallelize(test_set)).collect() # 預測多值,返回一個RDD數據集 print sameModel.weights # 返回參數 # -----------------嶺回歸------------------ from pyspark.mllib.regression import RidgeRegressionWithSGD data = [ LabeledPoint(1.0, [1.0, 1.0]), LabeledPoint(4.0, [1.0, 3.0]), LabeledPoint(8.0, [2.0, 3.0]), LabeledPoint(10.0, [3.0, 4.0])] train_set = sc.parallelize(data) rrm = RidgeRegressionWithSGD.train(train_set, iterations=100, initialWeights=np.array([1.0,1.0])) test_set = [] for i in range(100): for j in range(100): test_set.append(np.array([i, j])) print rrm.predict(sc.parallelize(test_set)).collect() print rrm.weights
上述代碼只是讓大家弄懂一下簡單的操作,對於數據的預處理沒有在RDD的基礎上做。
下面是一些分類算法的基本實現:
# coding:utf-8 from pyspark import SparkContext from pyspark.mllib.regression import LabeledPoint print '-------邏輯回歸-------' from pyspark.mllib.classification import LogisticRegressionWithSGD sc = SparkContext(appName="LRWSGD", master='local') dataset = [] for i in range(100): for j in range(100): dataset.append([i,j]) dataset = sc.parallelize(dataset) # 並行化數據,轉化為RDD data =[LabeledPoint(0.0, [0.0, 100.0]),LabeledPoint(1.0, [100.0, 0.0]),] lrm = LogisticRegressionWithSGD.train(sc.parallelize(data), iterations=10) # 第二個參數是迭代次數 print lrm.predict(dataset).collect() lrm.clearThreshold() print lrm.predict([0.0, 1.0]) # ---------------------------------------------------------- from pyspark.mllib.linalg import SparseVector from numpy import array sparse_data = [ LabeledPoint(0.0, SparseVector(2, {0: 0.0, 1: 0.0})), LabeledPoint(1.0, SparseVector(2, {1: 1.0})), LabeledPoint(0.0, SparseVector(2, {0: 1.0})), LabeledPoint(1.0, SparseVector(2, {1: 2.0})) ] train = sc.parallelize(sparse_data) lrm = LogisticRegressionWithSGD.train(train, iterations=10) print lrm.predict(array([0.0, 1.0])) # 對單個數組進行預測 print lrm.predict(SparseVector(2, {1: 1.0})) # 對單個稀疏向量進行預測 print '------svm-------' from pyspark.mllib.classification import SVMWithSGD svm = SVMWithSGD.train(train,iterations=10) print svm.predict(SparseVector(2, {1: 1.0})) print '------bayes------' from pyspark.mllib.classification import NaiveBayes nb = NaiveBayes.train(train) print nb.predict(SparseVector(2, {1: 1.0}))
版權都是我所有的,(*^__^*) 哈哈哈~