Spark Sreaming與MLlib機器學習
本來這篇是准備5.15更的,但是上周一直在忙簽證和工作的事,沒時間就推遲了,現在終於有時間來寫寫Learning Spark最后一部分內容了。
第10-11 章主要講的是Spark Streaming 和MLlib方面的內容。我們知道Spark在離線處理數據上的性能很好,那么它在實時數據上的表現怎么樣呢?在實際生產中,我們經常需要即使處理收到的數據,比如實時機器學習模型的應用,自動異常的檢測,實時追蹤頁面訪問統計的應用等。Spark Streaming可以很好的解決上述類似的問題。
了解Spark Streaming ,只需要掌握以下幾點即可:
- DStream
- 概念:離散化流(discretized stream),是隨時間推移的數據。由每個時間區間的RDD組成的序列。DStream可以從Flume、Kafka或者HDFS等多個輸入源創建。
- 操作:轉換和輸出,支持RDD相關的操作,增加了“滑動窗口”等於時間相關的操作。
下面以一張圖來說明Spark Streaming的工作流程:
從上圖中也可以看到,Spark Streaming把流式計算當做一系列連續的小規模批處理來對待。它從各種輸入源讀取數據,並把數據分組為小的批次,新的批次按均勻的時間間隔創建出來。在每個時間區間開始的時候,一個新的批次就創建出來,在該區間內收到的數據都會被添加到這個批次中去。在時間區間結束時,批次停止增長。
轉化操作
- 無狀態轉化操作:把簡單的RDDtransformation分別應用到每個批次上,每個批次的處理不依賴於之前的批次的數據。包括map()、filter()、reduceBykey()等。
- 有狀態轉化操作:需要使用之前批次的數據或者中間結果來計算當前批次的數據。包括基於滑動窗口的轉化操作,和追蹤狀態變化的轉化操作(updateStateByKey())
無狀態轉化操作
有狀態轉化操作
Windows機制(一圖盛千言)
上圖應該很容易看懂,下面舉個實例(JAVA寫的):
UpdateStateByKey()轉化操作
主要用於訪問狀態變量,用於鍵值對形式的DStream。首先會給定一個由(鍵,事件)對構成的DStream,並傳遞一個指定如何個人劇新的事件更新每個鍵對應狀態的函數,它可以構建出一個新的DStream,為(鍵,狀態)。通俗點說,加入我們想知道一個用戶最近訪問的10個頁面是什么,可以把鍵設置為用戶ID,然后UpdateStateByKey()就可以跟蹤每個用戶最近訪問的10個頁面,這個列表就是“狀態”對象。具體的要怎么操作呢,UpdateStateByKey()提供了一個update(events,oldState)函數,用於接收與某鍵相關的時間以及該鍵之前對應的狀態,然后返回這個鍵對應的新狀態。
- events:是在當前批次中收到的時間列表()可能為空。
- oldState:是一個可選的狀態對象,存放在Option內;如果一個鍵沒有之前的狀態,可以為空。
- newState:由函數返回,也以Option形式存在。如果返回一個空的Option,表示想要刪除該狀態。
UpdateStateByKey()的結果是一個新的DStream,內部的RDD序列由每個時間區間對應的(鍵,狀態)對組成。
接下來講一下輸入源
- 核心數據源:文件流,包括文本格式和任意hadoop的輸入格式
- 附加數據源:kafka和flume比較常用,下面會講一下kafka的輸入
- 多數據源與集群規模
Kafka的具體操作如下:
基於MLlib的機器學習
一般我們常用的算法都是單機跑的,但是想要在集群上運行,不能把這些算法直接拿過來用。一是數據格式不同,單機上我們一般是離散型或者連續型的數據,數據類型一般為array、list、dataframe比較多,以txt、csv等格式存儲,但是在spark上,數據是以RDD的形式存在的,如何把ndarray等轉化為RDD是一個問題;此外,就算我們把數據轉化成RDD格式,算法也會不一樣。舉個例子,你現在有一堆數據,存儲為RDD格式,然后設置了分區,每個分區存儲一些數據准備來跑算法,可以把每個分區看做是一個單機跑的程序,但是所有分區跑完以后呢?怎么把結果綜合起來?直接求平均值?還是別的方式?所以說,在集群上跑的算法必須是專門寫的分布式算法。而且有些算法是不能分布式的跑。Mllib中也只包含能夠在集群上運行良好的並行算法。
MLlib的數據類型
- Vector:向量(mllib.linalg.Vectors)支持dense和sparse(稠密向量和稀疏向量)。區別在與前者的沒一個數值都會存儲下來,后者只存儲非零數值以節約空間。
- LabeledPoint:(mllib.regression)表示帶標簽的數據點,包含一個特征向量與一個標簽,注意,標簽要轉化成浮點型的,通過StringIndexer轉化。
- Rating:(mllib.recommendation),用戶對一個產品的評分,用於產品推薦
- 各種Model類:每個Model都是訓練算法的結果,一般都有一個predict()方法可以用來對新的數據點或者數據點組成的RDD應用該模型進行預測
一般來說,大多數算法直接操作由Vector、LabledPoint或Rating組成的RDD,通常我們從外部數據讀取數據后需要進行轉化操作構建RDD。具體的聚類和分類算法原理不多講了,可以自己去看MLlib的在線文檔里去看。下面舉個實例----垃圾郵件分類的運行過程:
步驟:
1.將數據轉化為字符串RDD
2.特征提取,把文本數據轉化為數值特征,返回一個向量RDD
3.在訓練集上跑模型,用分類算法
4.在測試系上評估效果
具體代碼:
1 from pyspark.mllib.regression import LabeledPoint 2 from pyspark.mllib.feature import HashingTF 3 from pyspark.mllib.calssification import LogisticRegressionWithSGD 4 5 spam = sc.textFile("spam.txt") 6 normal = sc.textFile("normal.txt") 7 8 #創建一個HashingTF實例來把郵件文本映射為包含10000個特征的向量 9 tf = HashingTF(numFeatures = 10000) 10 #各郵件都被切分為單詞,每個單詞背映射為一個特征 11 spamFeatures = spam.map(lambda email: tf.transform(email.split(" "))) 12 normalFeatures = normal.map(lambda email: tf.transform(email.split(" "))) 13 14 #創建LabeledPoint數據集分別存放陽性(垃圾郵件)和陰性(正常郵件)的例子 15 positiveExamples = spamFeatures.map(lambda features: LabeledPoint(1,features)) 16 negativeExamples = normalFeatures.map(lambda features: LabeledPoint(0,features)) 17 trainingData = positiveExamples.union(negativeExamples) 18 trainingData.cache#因為邏輯回歸是迭代算法,所以緩存數據RDD 19 20 #使用SGD算法運行邏輯回歸 21 model = LogisticRegressionWithSGD.train(trainingData) 22 23 #以陽性(垃圾郵件)和陰性(正常郵件)的例子分別進行測試 24 posTest = tf.transform("O M G GET cheap stuff by sending money to...".split(" ")) 25 negTest = tf.transform("Hi Dad, I stared studying Spark the other ...".split(" ")) 26 print "Prediction for positive test examples: %g" %model.predict(posTest) 27 print "Prediction for negative test examples: %g" %model.predict(negTest)
這個例子很簡單,講的也很有限,建議大家根據自己的需求,直接看MLlib的官方文檔,關於聚類,分類講的都很詳細。