利用Spark-mllab進行聚類,分類,回歸分析的代碼實現(python)


      Spark作為一種開源集群計算環境,具有分布式的快速數據處理能力。而Spark中的Mllib定義了各種各樣用於機器學習的數據結構以及算法。Python具有Spark的API。需要注意的是,Spark中,所有數據的處理都是基於RDD的。

首先舉一個聚類方面的詳細應用例子Kmeans:

   下面代碼是一些基本步驟,包括外部數據,RDD預處理,訓練模型,預測。

#coding:utf-8
from numpy import array
from math import sqrt
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel


if __name__ == "__main__":
    sc = SparkContext(appName="KMeansExample",master='local')  # SparkContext


    # 讀取並處理數據
    data = sc.textFile("./kmeans_data.txt")
    print data.collect()
    parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

    # 訓練數據
    print parsedData.collect()
    clusters = KMeans.train(parsedData, k=2, maxIterations=10,
                            runs=10, initializationMode="random")

    #求方差之和
    def error(point):
        center = clusters.centers[clusters.predict(point)]
        return sqrt(sum([x**2 for x in (point - center)]))
    WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)

    print("Within Set Sum of Squared Error = " + str(WSSSE))

    #聚類結果
    def sort(point):
        return clusters.predict(point)
    clusters_result = parsedData.map(sort)
    # Save and load model
    # $example off$
    print '聚類結果:'
    print clusters_result.collect()
    sc.stop()

 

可以看到在利用Spark進行機器學習時,我調用了一個外部的開源包numpy,並利用了數組作為數據結構。而在Mllib中其實已經定義了各種用於機器學習的數據結構,下面簡單介紹兩種在分類和回歸分析中可以用到的DS。

 

稀疏向量(SparseVector):稀疏向量是指向量元素中有許多值是0的向量。

其初始化與簡單操作如下:  

# coding:utf-8
from pyspark.mllib.linalg import *
v0 = SparseVector(4, [1, 2], [2, 3.0])  # 稀疏向量,第一個參數為維度,第二個參數是非0維度的下標的集合,第三個參數是非0維度的值的集合
v1 = SparseVector(4,{1: 3, 2: 4}) # 第一個參數是維度,第二個參數是下標和維度組成的字典
print v0.dot(v1)  # 計算點積
print v0.size  # 向量維度
print v0.norm(0)  # 返回維度0的值
print v0.toArray()  # 轉化為array
print v0.squared_distance(v1)  # 歐式距離

  spark中的稀疏向量可以利用list或者dict進行初始化。

向量標簽(Labeled  point):向量標簽就是在向量和標簽的組合,分類和回歸中,標簽可以作為分類中的類別,也可以作為回歸中的實際值。

from pyspark.mllib.regression import LabeledPoint
data = [
  LabeledPoint(1.0, [1.0, 1.0]),
  LabeledPoint(4.0, [1.0, 3.0]),
  LabeledPoint(8.0, [2.0, 3.0]),
  LabeledPoint(10.0, [3.0, 4.0])]
print data[0].features
print data[0].label

 

下面是mllib中用於回歸分析的一些基本實現(線性回歸,嶺回歸):

# coding:UTF-8
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.context import SparkContext

# ----------------線性回歸--------------

import numpy as np
sc = SparkContext(master='local',appName='Regression')
data = [
  LabeledPoint(1.0, [1.0, 1.0]),
  LabeledPoint(2.0, [1.0, 1.4]),
  LabeledPoint(4.0, [2.0, 1.9]),
  LabeledPoint(6.0, [3.0, 4.0])]  # 訓練集
lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=100, initialWeights=np.array([1.0,1.0]))
print lrm.predict(np.array([2.0,1.0]))  # 利用訓練出的回歸模型進行預測

import os, tempfile
from pyspark.mllib.regression import LinearRegressionModel
from pyspark.mllib.linalg import SparseVector

path = tempfile.mkdtemp()
lrm.save(sc, path)  # 將模型保存至外存
sameModel = LinearRegressionModel.load(sc, path)  # 讀取模型
print sameModel.predict(SparseVector(2, {0: 100.0, 1: 150}))  # 利用稀疏向量作為數據結構,返回單個預測值
test_set = []
for i in range(100):
  for j in range(100):
    test_set.append(SparseVector(2, {0: i,1: j}))
print sameModel.predict(sc.parallelize(test_set)).collect()  # 預測多值,返回一個RDD數據集
print sameModel.weights  # 返回參數


# -----------------嶺回歸------------------

from pyspark.mllib.regression import RidgeRegressionWithSGD
data = [
  LabeledPoint(1.0, [1.0, 1.0]),
  LabeledPoint(4.0, [1.0, 3.0]),
  LabeledPoint(8.0, [2.0, 3.0]),
  LabeledPoint(10.0, [3.0, 4.0])]
train_set = sc.parallelize(data)
rrm = RidgeRegressionWithSGD.train(train_set, iterations=100, initialWeights=np.array([1.0,1.0]))
test_set = []
for i in range(100):
  for j in range(100):
    test_set.append(np.array([i, j]))
print rrm.predict(sc.parallelize(test_set)).collect()
print rrm.weights

 上述代碼只是讓大家弄懂一下簡單的操作,對於數據的預處理沒有在RDD的基礎上做。

 

下面是一些分類算法的基本實現:

# coding:utf-8
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint

print '-------邏輯回歸-------'
from pyspark.mllib.classification import LogisticRegressionWithSGD
sc = SparkContext(appName="LRWSGD", master='local')
dataset = []
for i in range(100):
    for j in range(100):
        dataset.append([i,j])
dataset = sc.parallelize(dataset)  # 並行化數據,轉化為RDD

data =[LabeledPoint(0.0, [0.0, 100.0]),LabeledPoint(1.0, [100.0, 0.0]),]

lrm = LogisticRegressionWithSGD.train(sc.parallelize(data), iterations=10)  # 第二個參數是迭代次數
print lrm.predict(dataset).collect()

lrm.clearThreshold()
print lrm.predict([0.0, 1.0])
# ----------------------------------------------------------
from pyspark.mllib.linalg import SparseVector
from numpy import array
sparse_data = [
    LabeledPoint(0.0, SparseVector(2, {0: 0.0, 1: 0.0})),
    LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
    LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
    LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
]
train = sc.parallelize(sparse_data)
lrm = LogisticRegressionWithSGD.train(train, iterations=10)
print lrm.predict(array([0.0, 1.0]))  # 對單個數組進行預測
print lrm.predict(SparseVector(2, {1: 1.0}))  # 對單個稀疏向量進行預測

print '------svm-------'

from pyspark.mllib.classification import SVMWithSGD
svm = SVMWithSGD.train(train,iterations=10)
print svm.predict(SparseVector(2, {1: 1.0}))

print '------bayes------'
from pyspark.mllib.classification import NaiveBayes
nb = NaiveBayes.train(train)
print nb.predict(SparseVector(2, {1: 1.0}))

 

版權都是我所有的,(*^__^*) 哈哈哈~

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM