上篇博客已經初步提到一點線性回歸Linner和KNN的,本篇繼續對機器學習進行深化!!!
-
Python配置 :Py4j模塊、Pyspark模塊
-
Windows 環境變量:Eclipse開發Pyspark
一. 線性回歸
1.什么是回歸?
從大量的函數結果和自變量反推回函數表達式的過程就是回歸。線性回歸是利用數理統計中回歸分析來確定兩種或兩種以上變量間相互依賴的定量關系的一種統計分析方法。
-
一元線性回歸:
只包括一個自變量(x)和一個因變量(y),且二者的關系可用一條直線近似表示,這種回歸分析稱為一元線性回歸分析。公式: y = w0+w1x1
-
多元線性回歸:
如果回歸分析中包括兩個或兩個以上的自變量,且因變量和自變量之間是線性關系,則稱為多元線性回歸分析。公式:Ε(w0+w1x1+...+ wnxn)

代表一系列的真實離散點的y值,
代表一系列得到的直線在對應的x處的y值,一元線性回歸中=。
m代表有m個離散點,前面的 是為了方便求導加上的。
表示的是一系列權重,,….
這個公式也叫做最小二乘法誤差公式。error的值如果是0說明確定的這條直線穿過了所有的點,對於離散分散的一組點,error的值不可能為0。

通過求導的方式,理論上可以確定error的最小值,但是由於是一組數據,無法確定error最小下對應的這一組權重 。
正向的求導不能得到一組權重值,就不能確定線性回歸的公式。那么可以根據數據集來窮舉法反向的試,來確定線性回歸的公式:



2. 梯度下降法調節參數
假設和error的函數關系中,中只有一個維度,那么這個關系其實就是和error的關系,反應圖上就是一維平面關系,如果中有兩個維度和,那么這個反映到圖上就是個三維空間關系,以此類推。

如果中只有一個維度,如下圖:

圖中①,②,③,④,⑤,⑥,⑦都是圖像的切線(斜率),在切點處可以求得對應的error值,如何調節模型得到一組 值可以使得到的error值更小,如圖,沿着①->②->③方向調節和沿着④->⑤->⑥方向調節,可以使error值更小,這種沿着斜率絕對值減少的方向,沿着梯度的負方向每次迭代調節的方法就叫做梯度下降法。
梯度下降是迭代法的一種,可以用於求解最小二乘問題,在求解error函數的最小值時,可以通過梯度下降法來一步步的迭代求解,得到最小化的error函數和模型參數值。
每次按照步長(學習率)來調節值,迭代求出error的最小值,這里的error不可能有一個固定的最小值,只會向着最小值的方向收斂。
判斷模型error誤差值收斂,也就是停止迭代的兩種方式:
-
- 指定一個error值,當迭代處理過程中得到的error值小於指定的值時,停止迭代。
- 設置迭代次數,當迭代次數達到設置的次數時,停止迭代。
迭代周期:從調整參數開始到計算出error值判斷是否大於用戶指定的error是一個迭代周期。
當步長比較小時,迭代次數多,訓練模型的時間比較長,當步長比較大時,有可能迭代的次數也比較多,訓練模型的時間也會相對比較長,需要找到一個合適的步長。
3. 模型過擬合
訓練模型都會將數據集分為兩部分,一般會將0.8比例的數據集作為訓練集,將0.2比例的數據集作為測試集,來訓練模型。模型過擬合就是訓練出來的模型在訓練集上表現很好,但是在測試集上表現較差的一種現象,也就是模型對已有的訓練集數據擬合的非常好(誤差值等於0),對於測試集數據擬合的非常差,模型的泛化能力比較差。
4.如何判斷模型發生過擬合?
訓練出模型后,可以在訓練集中測試下模型的正確率,在測試集中測試下模型的正確率,如果兩者差別很大(測試集正確率小,訓練集正確率大),那么模型就有可能發生了過擬合。
5.Spark Mllib 線性回歸案例
object LinearRegression { def main(args: Array[String]) { // 構建Spark對象 val conf = new SparkConf().setAppName("LinearRegressionWithSGD").setMaster("local") val sc = new SparkContext(conf) Logger.getRootLogger.setLevel(Level.WARN) // sc.setLogLevel("WARN") //讀取樣本數據 val data_path1 = "lpsa.data" val data = sc.textFile(data_path1) val examples = data.map { line => val parts = line.split(',') val y = parts(0) val xs = parts(1) LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))) }.cache() val train2TestData = examples.randomSplit(Array(0.8, 0.2), 1) /* * 迭代次數 * 訓練一個多元線性回歸模型收斂(停止迭代)條件: * 1、error值小於用戶指定的error值 * 2、達到一定的迭代次數 */ val numIterations = 100 //在每次迭代的過程中 梯度下降算法的下降步長大小 0.1 0.2 0.3 0.4 val stepSize = 1 val miniBatchFraction = 1 val lrs = new LinearRegressionWithSGD() //讓訓練出來的模型有w0參數,就是由截距 lrs.setIntercept(true) //設置步長 lrs.optimizer.setStepSize(stepSize) //設置迭代次數 lrs.optimizer.setNumIterations(numIterations) //每一次下山后,是否計算所有樣本的誤差值,1代表所有樣本,默認就是1.0 lrs.optimizer.setMiniBatchFraction(miniBatchFraction) val model = lrs.run(train2TestData(0)) println(model.weights) println(model.intercept) // 對樣本進行測試 val prediction = model.predict(train2TestData(1).map(_.features)) val predictionAndLabel = prediction.zip(train2TestData(1).map(_.label)) val print_predict = predictionAndLabel.take(20) println("prediction" + "\t" + "label") for (i <- 0 to print_predict.length - 1) { println(print_predict(i)._1 + "\t" + print_predict(i)._2) } // 計算測試集平均誤差 val loss = predictionAndLabel.map { case (p, v) => val err = p - v Math.abs(err) }.reduce(_ + _) val error = loss / train2TestData(1).count println(s"Test RMSE = " + error) // 模型保存 val ModelPath = "model" model.save(sc, ModelPath) // val sameModel = LinearRegressionModel.load(sc, ModelPath) sc.stop() }
二.貝葉斯分類算法
1.朴素貝葉斯
朴素貝葉斯(Naive Bayes ,NB)算法是基於貝葉斯定理與特征條件獨立假設的分類方法,該算法是有監督的學習算法,解決的是分類問題,是將一個未知樣本分到幾個預先已知類別的過程。
朴素貝葉斯的思想就是根據某些個先驗概率計算Y變量屬於某個類別的后驗概率,也就是根據先前事件的有關數據估計未來某個事件發生的概率。

2.拉普拉斯估計
拉普拉斯估計本質上是給頻率表中的每個單詞的計數加上一個較小的數,這樣就保證每一類中每個特征發生的概率非零。通常,拉普拉斯估計中加上的數值為1,這樣就保證了每一個特征至少在數據中出現一次。
3.Python實現
import os import sysecs 編碼轉換模塊 import codecs from sklearn.naive_bayes import MultinomialNB from sklearn.feature_extraction.text import CountVectorizer if __name__ == '__main__': # 讀取文本構建語料庫 corpus = [] labels = [] corpus_test = [] labels_test = [] f = codecs.open("i:/ML/sms_spam.txt", "rb") count = 0 while True: line = f.readline() if count == 0: count = count + 1 continue if line: count = count + 1 line = line.split(",") label = line[0] sentence = line[1] corpus.append(sentence) if "ham"==label: labels.append(0) elif "spam"==label: labels.append(1) if count > 5550: corpus_test.append(sentence) if "ham"==label: labels_test.append(0) elif "spam"==label: labels_test.append(1) else: break #CountVectorizer是將文本向量轉換成稀疏表示數值向量(字符頻率向量) vectorizer 將文檔詞塊化 vectorizer=CountVectorizer() fea_train = vectorizer.fit_transform(corpus) print vectorizer.get_feature_names() print fea_train.toarray() vectorizer2=CountVectorizer(vocabulary=vectorizer.vocabulary_) fea_test = vectorizer2.fit_transform(corpus_test) rint vectorizer2.get_feature_names() print fea_test.toarray() #create the Multinomial Naive Bayesian Classifier #alpha = 1 拉普拉斯估計給每個單詞加1 clf = MultinomialNB(alpha = 1) clf.fit(fea_train,labels) # pred = clf.predict(fea_test); for p in pred: if p == 0: print "正常郵件" else: print "垃圾郵件"
4.cala 貝葉斯案例
object Naive_bayes { def main(args: Array[String]) { //1 構建Spark對象 val conf = new SparkConf().setAppName("Naive_bayes").setMaster("local") val sc = new SparkContext(conf) //讀取樣本數據1 val data = sc.textFile("sample_naive_bayes_data.txt") val parsedData = data.map { line => val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' 、').map(_.toDouble))) } //樣本數據划分訓練樣本與測試樣本 val splits = parsedData.randomSplit(Array(0.9, 0.1), seed = 11L) val training = splits(0) val test = splits(1) //新建貝葉斯分類模型模型,並訓練 ,lambda 拉普拉斯估計 val model = NaiveBayes.train(training, lambda = 1.0) //對測試樣本進行測試 val predictionAndLabel = test.map(p => (model.predict(p.features), p.label)) val print_predict = predictionAndLabel.take(10020) println("prediction" + "\t" + "label") for (i <- 0 to print_predict.length - 1) { println(print_predict(i)._1 + "\t" + print_predict(i)._2) } val accuracy = 1.0 * predictionAndLabel .filter(x => x._1 == x._2).count() / test.count() println(accuracy) //保存模型 val ModelPath = "naive_bayes_model" model.save(sc, ModelPath) val sameModel = NaiveBayesModel.load(sc, ModelPath) } }
三.KNN最鄰近算法
K最近鄰(k-Nearest Neighbor,KNN)分類算法,是一個理論上比較成熟的方法,也是最簡單的機器學習算法之一,有監督算法。該方法的思路是:如果一個樣本在特征空間中的k個最相似的樣本中的大多數屬於某一個類別,則該樣本也屬於這個類別。KNN算法由你的鄰居來推斷出你的類別,KNN算法就是用距離來衡量樣本之間的相似度。

如果K = 3,綠色圓點的最近的3個鄰居是2個紅色小三角形和1個藍色小正方形,少數從屬於多數,基於統計的方法,判定綠色的這個待分類點屬於紅色的三角形一類。
如果K = 5,綠色圓點的最近的5個鄰居是2個紅色三角形和3個藍色的正方形,還是少數從屬於多數,基於統計的方法,判定綠色的這個待分類點屬於藍色的正方形一類。
K 值的選擇,距離度量和分類決策規則是該算法的三個基本要素。K值的選擇一般低於樣本數據的平方根,一般是不大於20的整數。距離度量常用的有歐式距離,曼哈頓距離,余弦距離等,一般使用歐氏距離,對於文本分類,常用余弦距離。分類決策就是“少數服從多數”的策略。
1.KNN算法步驟:
1) 對於未知類別的數據(對象,點),計算已知類別數據集中的點到該點的距離。
2) 按照距離由小到大排序
3) 選取與當前點距離最小的K個點
4) 確定前K個點所在類別出現的概率
5) 返回當前K個點出現頻率最高的類別作為當前點預測分類
2.KNN算法復雜度:
KNN 分類的計算復雜度和訓練集中的文檔數目成正比,也就是說,如果訓練集中文檔總數為 n,那么 KNN 的分類時間復雜度為O(n)
3.KNN問題:
該算法在分類時有個主要的不足是,當樣本不平衡時,如一個類的樣本容量很大,而其他類樣本容量很小時,有可能導致當輸入一個新樣本時,該樣本的 K 個鄰居中大容量類的樣本占多數。解決:可以采用權值的方法,根據和該樣本距離的遠近,對近鄰進行加權,距離越小的鄰居權值大,權重一般為距離平方的倒數。
4.KNN數據歸一化:
為了防止某一維度的數據的數值大小對距離計算產生影響,保證多個維度的特征是等權重的,最終結果不能被數值的大小影響,應該將各個維度進行數據的歸一化,把數據歸一化到[0,1]區間上。

5.距離度量
- 歐式距離:也稱歐幾里得距離,在一個N維度的空間里,求兩個點的距離,這個距離肯定是一個大於等於零的數字,那么這個距離需要用兩個點在各自維度上的坐標相減,平方后加和再開方。一維,二維,三維的歐式距離計算方法:一維: 二維: 三維:
- 平方歐式距離:就是歐式距離的平方。
- 曼哈頓距離:相比歐式距離簡單的多,曼哈頓距離只要把兩個點坐標的x坐標相減取絕對值,y坐標相減取絕對值,再加和,

。三維,四維以此類推。
- 余弦距離:也叫余弦相似度,是用向量空間中兩個向量夾角的余弦值作為衡量兩個個體間差異的大小的度量。如果兩個向量的方向一致,即夾角接近零,那么這兩個向量就越相近。要確定兩個向量方向是否一致,要用到余弦定理計算向量的夾角。
- 閔可夫斯基距離:閔式距離不是一種距離,而是一組距離的定義,是對多個距離度量公式的概括性表述。定義:兩個n維變量(可以理解為n維數組,就是有n個元素)a()與b()間的閔可夫斯基距離定義為: 其中p是一個變參數,當p=1時,就是曼哈頓距離,當p=2時,就是歐式距離,當p 就是切比雪夫距離。

- 切比雪夫距離:國際象棋中,國王可以直行、橫行、斜行。國王走一步,可以移動到相鄰的8個方格的任意一個。國王從格子 到格子最少需要多少步?這個距離就是切比雪夫距離。
- 谷本距離:同時考慮余弦距離和歐式距離的測度。
- 加權距離測度:可以指定某一維度的權重比例,從而使某個權重的影響力更大。
6.KNN案例
import numpy as np import operator #matplotata 測試數據集的某行, dataSet 訓練數據集 ,labels 訓練數據集的類別,k k的值 def classify(normData,dataSet,labels,k): dataSetSize = dataSet.shape[0] # print 'dataSetSize 長度 =',dataSetSize #當前點到所有點的坐標差值 diffMat = np.tile(normData, (dataSetSize,1)) - dataSet #對每個坐標差值平方 sqDiffMat = diffMat ** 2 #對於二維數組 sqDiffMat.sum(axis=0)指定對數組b對每列求和,sqDiffMat.sum(axis=1)是對每行求和 sqDistances = sqDiffMat.sum(axis = 1) #歐式距離 最后開方 distance = sqDistances ** 0.5 #argsort() 將x中的元素從小到大排序,提取其對應的index 索引,返回數組 sortedDistIndicies = distance.argsort() # classCount保存的K是魅力類型 V:在K個近鄰中某一個類型的次數 classCount = {} for i in range(k): #獲取對應的下標的類別 voteLabel = labels[sortedDistIndicies[i]] #給相同的類別次數計數 classCount[voteLabel] = classCount.get(voteLabel,0) + 1 sortedClassCount = sorted(classCount.iteritems(),key=operator.itemgetter(1),reverse=True) return sortedClassCount[0][0] def file2matrix(filename): fr = open(filename) # readlines:是一次性將這個文本的內容全部加載到內存中(列表) arrayOflines = fr.readlines() numOfLines = len(arrayOflines) # print "numOfLines = " , numOfLines #numpy.zeros 創建給定類型的矩陣 numOfLines 行 ,3列 returnMat = np.zeros((numOfLines,3)) classLabelVector = [] index = 0 for line in arrayOflines: #去掉一行的頭尾空格 line = line.strip() listFromline = line.split('\t') returnMat[index,:] = listFromline[0:3] classLabelVector.append(int(listFromline[-1])) index += 1 return returnMat,classLabelVector ''' 將訓練集中的數據進行歸一化 歸一化的目的: 訓練集中飛行公里數這一維度中的值是非常大,那么這個緯度值對於最終的計算結果(兩點的距離)影響是非常大, 遠遠超過其他的兩個維度對於最終結果的影響 實際約會姑娘認為這三個特征是同等重要的 下面使用最大最小值歸一化的方式將訓練集中的數據進行歸一化 ''' #將數據歸一化 def autoNorm(dataSet): # dataSet.min(0) 代表的是統計這個矩陣中每一列的最小值 返回值是一個矩陣1*3矩陣 minVals = dataSet.min(0) maxVals = dataSet.max(0) ranges = maxVals - minVals #dataSet.shape[0] 計算行數, shape[1] 計算列數 m = dataSet.shape[0] # normDataSet存儲歸一化后的數據 # normDataSet = np.zeros(np.shape(dataSet)) #np.tile(minVals,(m,1)) 在行的方向上重復 minVals m次 即復制m行,在列的方向上重復munVals 1次,即復制1列 normDataSet = dataSet - np.tile(minVals,(m,1)) normDataSet = normDataSet / np.tile(ranges,(m,1)) return normDataSet,ranges,minVals def datingClassTest(): hoRatio = 0.1 datingDataMat,datingLabels = file2matrix('datingTestSet2.txt') #將數據歸一化 normMat,ranges,minVals = autoNorm(datingDataMat) # m 為行數 = 1000 m = normMat.shape[0] # print 'm =%d 行'%m #取出100行數據測試 numTestVecs = int(m*hoRatio) errorCount = 0.0 for i in range(numTestVecs): #normMat[i,:] 取出數據的第i行,normMat[numTestVecs:m,:]取出數據中的100行到1000行 作為訓練集,datingLabels[numTestVecs:m] 取出數據中100行到1000行的類別,4是K classifierResult = classify(normMat[i,:],normMat[numTestVecs:m,:],datingLabels[numTestVecs:m],4) print '模型預測值: %d ,真實值 : %d' %(classifierResult,datingLabels[i]) if (classifierResult != datingLabels[i]): errorCount += 1.0 errorRate = errorCount / float(numTestVecs) print '正確率 : %f' %(1-errorRate) return 1-errorRate def classifyperson(): resultList = ['沒感覺', '看起來還行','極具魅力'] input_man= [30000,3,0.1] datingDataMat,datingLabels = file2matrix('datingTestSet2.txt') normMat,ranges,minVals = autoNorm(datingDataMat) result = classify((input_man - minVals)/ranges,normMat,datingLabels,3) print '你即將約會的人是:' , resultList[result-1] if __name__ == '__main__': acc = datingClassTest() if(acc > 0.9): classifyperson()
四.K-means聚類算法
機器學習中有兩類的大問題,一個是分類,一個是聚類。分類是監督學習,原始數據有標簽,可以根據原始數據建立模型,確定新來的數據屬於哪一類。聚類是一種無監督學習,聚類是指事先沒有“標簽”,在數據中發現數據對象之間的關系,將數據進行分組,一個分組也叫做“一個簇”, 組內的相似性越大,組間的差別越大,則聚類效果越好,也就是簇內對象有較高的相似度,簇之間的對象相似度比較低,則聚類效果越好。K-means就是一個聚類算法。
K-means算法的思想就是對空間K個點為中心進行聚類,對靠近他們的對象進行歸類,通過迭代的方法,逐次更新聚類中心(質心)的值,直到得到最好的聚類結果。K-means過程:
-
- 1. 首先選擇k個類別的中心點
- 2. 對任意一個樣本,求其到各類中心的距離,將該樣本歸到距離最短的中心所在的類
- 3. 聚好類后,重新計算每個聚類的中心點位置
- 4. 重復2,3步驟迭代,直到k個類中心點的位置不變,或者達到一定的迭代次數,則迭代結束,否則繼續迭代
1. K-means++算法
K-means算法假設聚類為3類,開始選取每個類的中心點的時候是隨機選取,有可能三個點選取的位置非常近,導致后面每次聚類重新求各類中心的迭代次數增加。K-means++在選取第一個聚類中心點的時候也是隨機選取,當選取第二個中心點的時候,距離當前已經選擇的聚類中心點的距離越遠的點會有更高的概率被選中,假設已經選取n個點,當選取第n+1個聚類中心時,距離當前n個聚類中心點越遠的點越會被選中,這種思想是聚類中心的點離的越遠越好,這樣就大大降低的找到最終聚類各個中心點的迭代次數,提高了效率。
2.K-means找中心點和數據點分類例子
import numpy as np將每行數據放入一個數組內列表,返回一個二維列表 def loadDataSet(fileName): 建空列表 dataMat = [] fr = open(fileName) for line in fr.readlines(): #按照制表符切割每行,返回一個列表list curLine = line.strip().split('\t')切分后的每個列表中的元素,以float形式返回,map()內置函數,返回一個list fltLine = map(float,curLine) dataMat.append(fltLine) return dataMat 離 def distEclud(vecA, vecB): return np.sqrt(np.sum(np.power(vecA - vecB, 2))) 3個中心點的位置坐標,返回一個3*2的矩陣 def randCent(dataSet, k):列數,2列 n = np.shape(dataSet)[1] ''' centroids是一個3*2的矩陣,用於存儲三個中心點的坐標 ''' centroids = np.mat(np.zeros((k,n))) for j in range(n): #統計每一列的最小值 minJ = min(dataSet[:,j]) #每列最大值與最小值的差值 rangeJ = float(max(dataSet[:,j]) - minJ) #random.rand(k,1) 產生k*1的數組,里面的數據是0~1的浮點型。 array2 = minJ + rangeJ * np.random.rand(k,1) #轉換成k*1矩陣 賦值給centroids centroids[:,j] = np.mat(array2) return centroids def kMeans(dataSet, k, distMeas=distEclud, createCent=randCent): #計算矩陣所有行數 80 m = np.shape(dataSet)[0] y.mat 將二維數組轉換成矩陣 clusterAssment = np.mat(np.zeros((m,2))) createCent找到K個隨機中心點坐標 centroids = createCent(dataSet, k) # print centroids clusterChanged = True while clusterChanged: clusterChanged = False #遍歷80個數據到每個中心點的距離 for i in range(m): #np.inf float的最大值,無窮大 minDist = np.inf #當前點屬於的類別號 minIndex = -1 #每個樣本點到三個中心點的距離 for j in range(k): 返回兩點距離的值 distJI = distMeas(centroids[j,:],dataSet[i,:]) if distJI < minDist: #當前最小距離的值 minDist = distJI #當前最小值屬於哪個聚類 minIndex = j #有與上次迭代計算的當前點的類別不相同的點 if clusterAssment[i,0] != minIndex: clusterChanged = True #將當前點的類別號和最小距離 賦值給clusterAssment的一行 clusterAssment[i,:] = minIndex,minDist for cent in range(k): ent[:,0].A==censInClust 取出的是對應是當前遍歷cent類別的 所有行數據組成的一個矩陣 ptsInClust = dataSet[np.nonzero(clusterAssment[:,0].A==cent)[0]]心點坐標的位置 centroids[cent,:] = np.mean(ptsInClust, axis=0) #返回 【 當前三個中心點的坐標】 【每個點的類別號,和到當前中心點的最小距離】 return centroids, clusterAssment if __name__ == '__main__': 80*2的矩陣 dataMat = np.mat(loadDataSet('./testSet.txt')) k=3 centroids, clusterAssment = kMeans(dataMat, k, distMeas=distEclud, createCent=randCent) print centroids print clusterAssment
3.使用matplotlib檢驗分類效果
import numpy as np import matplotlib.pyplot as plt from sklearn.cluster import KMeans from sklearn.datasets import make_blobs #建立12*12新的圖像 plt.figure(figsize=(12, 12)) n_samples = 1500 random_state = 170 ''' make_blobs函數是為聚類產生數據集 , 產生一個數據集和相應的標簽 n_samples:表示數據樣本點個數,默認值100 n_features:表示數據的維度,特征,默認值是2 centers:產生數據的中心點,默認值3個 shuffle :洗亂,默認值是True random_state:官網解釋是隨機生成器的種子 ''' #x返回的是向量化的數據點,y返回的是對應數據的類別號 x,y = make_blobs(n_samples=n_samples, random_state=random_state) #使用KMeans去聚類,返回聚好的類別集合,聚合成幾類 y_pred = KMeans(n_clusters=3, random_state=random_state).fit_predict(x) #subplot 繪制多個子圖,221 等價於2,2,1 表示兩行兩列的子圖中的第一個 plt.subplot(221) #scatter 繪制散點圖 plt.scatter(x[:, 0], x[:, 1], c=y_pred) plt.title("kmeans01") transformation = [[ 0.60834549, -0.63667341], [-0.40887718, 0.85253229]] #numpy.dot 矩陣相乘 X_aniso = np.dot(x, transformation) y_pred = KMeans(n_clusters=3, random_state=random_state).fit_predict(X_aniso) plt.subplot(222) plt.scatter(X_aniso[:, 0], X_aniso[:, 1], c=y_pred) plt.title("kmeans02") #vstack 是合並矩陣,將y=0類別的取出500行,y=1類別的取出100行,y=2類別的取出10行 X_filtered = np.vstack((x[y == 0][:500], x[y == 1][:100], x[y == 2][:10])) y_pred = KMeans(n_clusters=3, random_state=random_state).fit_predict(X_filtered) plt.subplot(223) plt.scatter(X_filtered[:, 0], X_filtered[:, 1], c=y_pred) plt.title("kmeans03") dataMat = [] fr = open("testSet.txt","r") for line in fr.readlines(): if line.strip() <> "": curLine = line.strip().split('\t') fltLine = map(float,curLine) dataMat.append(fltLine) dataMat = np.array(dataMat) y_pred = KMeans(n_clusters=4, random_state=random_state).fit_predict(dataMat) plt.subplot(224) plt.scatter(dataMat[:,0], dataMat[:, 1], c=y_pred) plt.title("kmeans04") plt.savefig("./kmeans.png") plt.show()
4.使用SparkMllib訓練K-means模型
object KMeans { def main(args: Array[String]) { //1 構建Spark對象 val conf = new SparkConf().setAppName("KMeans").setMaster("local") val sc = new SparkContext(conf) // 讀取樣本數據1,格式為LIBSVM format val data = sc.textFile("kmeans_data.txt") val parsedData = data.map(s => Vectors.dense(s.split(' ') .map(_.toDouble))).cache() val numClusters = 4 val numIterations = 100 val model = new KMeans(). //設置聚類的類數 setK(numClusters). //設置找中心點最大的迭代次數 setMaxIterations(numIterations). run(parsedData) //四個中心點的坐標 val centers = model.clusterCenters val k = model.k centers.foreach(println) println(k) //保存模型 // model.save(sc, "./Kmeans_model") //加載模型 val sameModel = KMeansModel.load(sc, "./Kmeans_model") println(sameModel.predict(Vectors.dense(1,1,1))) val sqlContext = new SQLContext(sc) sqlContext.read.parquet("./Kmeans_model/data").show() } }
給Kmeans指定中心點坐標:
object KMeans2 { def main(args: Array[String]) { val conf = new SparkConf().setAppName("KMeans2").setMaster("local") val sc = new SparkContext(conf) val rdd = sc.parallelize(List( Vectors.dense(Array(-0.1, 0.0, 0.0)), Vectors.dense(Array(9.0, 9.0, 9.0)), Vectors.dense(Array(3.0, 2.0, 1.0)))) //指定文件 kmeans_data.txt 中的六個點為中心點坐標。 val centroids: Array[Vector] = sc.textFile("kmeans_data.txt") .map(_.split(" ").map(_.toDouble)) .map(Vectors.dense(_)) .collect() val model = new KMeansModel(clusterCenters=centroids) println("聚類個數 = "+model.k) //模型中心點 model.clusterCenters.foreach { println } //預測指定的三條數據 val result = model.predict(rdd) result.collect().foreach(println(_)) } }
---------總結------------
