利用機器學習模型對PySpark流數據進行預測


作者|LAKSHAY ARORA
編譯|VK
來源|Analytics Vidhya

概述

  • 流數據是機器學習領域的一個新興概念

  • 學習如何使用機器學習模型(如logistic回歸)使用PySpark對流數據進行預測

  • 我們將介紹流數據和Spark流的基礎知識,然后深入到實現部分

介紹

想象一下,每秒有超過8500條微博被發送,900多張照片被上傳到Instagram上,超過4200個Skype電話被打,超過78000個谷歌搜索發生,超過200萬封電子郵件被發送(根據互聯網實時統計)。

我們正在以前所未有的速度和規模生成數據。在數據科學領域工作真是太好了!但是,隨着大量數據的出現,同樣面臨着復雜的挑戰。

主要是,我們如何收集這種規模的數據?我們如何確保我們的機器學習管道在數據生成和收集后繼續產生結果?這些都是業界面臨的重大挑戰,也是為什么流式數據的概念在各組織中越來越受到重視的原因。

增加處理流式數據的能力將大大提高你當前的數據科學能力。這是業界急需的技能,如果你能掌握它,它將幫助你獲得下一個數據科學的角色。

因此,在本文中,我們將了解什么是流數據,了解Spark流的基本原理,然后研究一個與行業相關的數據集,以使用Spark實現流數據。

目錄

  1. 什么是流數據?

  2. Spark流基礎

    1. 離散流

    2. 緩存

    3. 檢查點

  3. 流數據中的共享變量

    1. 累加器變量

    2. 廣播變量

  4. 利用PySpark對流數據進行情感分析

什么是流數據?

我們看到了上面的社交媒體數據——我們正在處理的數據令人難以置信。你能想象存儲所有這些數據需要什么嗎?這是一個復雜的過程!因此,在我們深入討論本文的Spark方面之前,讓我們花點時間了解流式數據到底是什么。

流數據沒有離散的開始或結束。這些數據是每秒從數千個數據源生成的,需要盡快進行處理和分析。相當多的流數據需要實時處理,比如Google搜索結果。

我們知道,一些結論在事件發生后更具價值,它們往往會隨着時間而失去價值。舉個體育賽事的例子——我們希望看到即時分析、即時統計得出的結論,以便在那一刻真正享受比賽,對吧?

Spark流基礎

Spark流是Spark API的擴展,它支持對實時數據流進行可伸縮和容錯的流處理。

在跳到實現部分之前,讓我們先了解Spark流的不同組件。

離散流

離散流或數據流代表一個連續的數據流。這里,數據流要么直接從任何源接收,要么在我們對原始數據做了一些處理之后接收。

構建流應用程序的第一步是定義我們從數據源收集數據的批處理時間。如果批處理時間為2秒,則數據將每2秒收集一次並存儲在RDD中。而這些RDD的連續序列鏈是一個不可變的離散流,Spark可以將其作為一個分布式數據集使用。

想想一個典型的數據科學項目。在數據預處理階段,我們需要對變量進行轉換,包括將分類變量轉換為數值變量、刪除異常值等。Spark維護我們在任何數據上定義的所有轉換的歷史。因此,無論何時發生任何錯誤,它都可以追溯轉換的路徑並重新生成計算結果。

我們希望Spark應用程序運行24小時 x 7,並且無論何時出現任何故障,我們都希望它盡快恢復。但是,Spark在處理大規模數據時,出現任何錯誤時需要重新計算所有轉換。你可以想象,這非常昂貴。

緩存

以下是應對這一挑戰的一種方法。我們可以臨時存儲計算(緩存)的結果,以維護在數據上定義的轉換的結果。這樣,當出現任何錯誤時,我們不必一次又一次地重新計算這些轉換。

數據流允許我們將流數據保存在內存中。當我們要計算同一數據上的多個操作時,這很有幫助。

檢查點(Checkpointing)

當我們正確使用緩存時,它非常有用,但它需要大量內存。並不是每個人都有數百台擁有128GB內存的機器來緩存所有東西。

這就引入了檢查點的概念。

檢查點是保存轉換數據幀結果的另一種技術。它將運行中的應用程序的狀態不時地保存在任何可靠的存儲器(如HDFS)上。但是,它比緩存速度慢,靈活性低。

當我們有流數據時,我們可以使用檢查點。轉換結果取決於以前的轉換結果,需要保留才能使用它。我們還檢查元數據信息,比如用於創建流數據的配置和一組DStream(離散流)操作的結果等等。

流數據中的共享變量

有時我們需要為Spark應用程序定義map、reduce或filter等函數,這些函數必須在多個集群上執行。此函數中使用的變量將復制到每個計算機(集群)。

在這里,每個集群有一個不同的執行器,我們需要一些東西,可以給我們這些變量之間的關系。

例如,假設我們的Spark應用程序運行在100個不同的集群上,捕獲來自不同國家的人發布的Instagram圖片。我們需要一個在他們的帖子中提到的特定標簽的計數。

現在,每個集群的執行器將計算該集群上存在的數據的結果。但是我們需要一些東西來幫助這些集群進行通信,這樣我們就可以得到聚合的結果。在Spark中,我們有一些共享變量可以幫助我們克服這個問題

累加器變量

用例,比如錯誤發生的次數、空白日志的次數、我們從某個特定國家收到請求的次數,所有這些都可以使用累加器來解決。

每個集群上的執行器將數據發送回驅動程序進程,以更新累加器變量的值。累加器僅適用於關聯和交換的操作。例如,sum和maximum有效,而mean無效。

廣播變量

當我們處理位置數據時,比如城市名稱和郵政編碼的映射,這些都是固定變量。現在,如果任何集群上的特定轉換每次都需要此類數據,我們不需要向驅動程序發送請求,因為這太昂貴了。

相反,我們可以在每個集群上存儲此數據的副本。這些類型的變量稱為廣播變量。

廣播變量允許程序員在每台機器上緩存一個只讀變量。通常,Spark會使用有效的廣播算法自動分配廣播變量,但如果我們有多個階段需要相同數據的任務,我們也可以定義它們。

利用PySpark對流數據進行情感分析

是時候啟動你最喜歡的IDE了!讓我們在本節中進行寫代碼,並以實際的方式理解流數據。

在本節中,我們將使用真實的數據集。我們的目標是在推特上發現仇恨言論。為了簡單起見,如果推特帶有種族主義或性別歧視情緒,我們說它包含仇恨言論。

因此,任務是將種族主義或性別歧視的推文與其他推文進行分類。我們將使用Tweets和label的訓練樣本,其中label'1'表示Tweet是種族主義/性別歧視,label'0'表示其他。

為什么這個項目與流處理相關?因為社交媒體平台以評論和狀態更新的形式接收海量流媒體數據。這個項目將幫助我們限制公開發布的內容。

你可以在這里更詳細地查看問題陳述-練習問題:Twitter情感分析(https://datahack.analyticsvidhya.com/contest/practice-problem-twitter-sentiment-analysis/?utm_source=blog&utm_medium=streaming-data-pyspark-machine-learning-model)。我們開始吧!

設置項目工作流

  1. 模型構建:我們將建立一個邏輯回歸模型管道來分類tweet是否包含仇恨言論。在這里,我們的重點不是建立一個非常精確的分類模型,而是查看如何使用任何模型並返回流數據的結果

  2. 初始化Spark流上下文:一旦構建了模型,我們就需要定義從中獲取流數據的主機名和端口號

  3. 流數據:接下來,我們將從定義的端口添加netcat服務器的tweets,Spark API將在指定的持續時間后接收數據

  4. 預測並返回結果:一旦我們收到tweet文本,我們將數據傳遞到我們創建的機器學習管道中,並從模型返回預測的情緒

下面是我們工作流程的一個簡潔說明:

建立Logistic回歸模型的數據訓練

我們在映射到標簽的CSV文件中有關於Tweets的數據。我們將使用logistic回歸模型來預測tweet是否包含仇恨言論。如果是,那么我們的模型將預測標簽為1(否則為0)。

你可以在這里下載數據集和代碼(https://github.com/lakshay-arora/PySpark/tree/master/spark_streaming)。

首先,我們需要定義CSV文件的模式,否則,Spark將把每列的數據類型視為字符串。我們讀取數據並檢查:

# 導入所需庫
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row

# 初始化spark session
sc = SparkContext(appName="PySparkShell")
spark = SparkSession(sc)
    
# 定義方案
my_schema = tp.StructType([
  tp.StructField(name= 'id',          dataType= tp.IntegerType(),  nullable= True),
  tp.StructField(name= 'label',       dataType= tp.IntegerType(),  nullable= True),
  tp.StructField(name= 'tweet',       dataType= tp.StringType(),   nullable= True)
])
    
  
# 讀取數據集
my_data = spark.read.csv('twitter_sentiments.csv',
                         schema=my_schema,
                         header=True)

# 查看數據
my_data.show(5)

# 輸出方案
my_data.printSchema()

定義機器學習管道

現在我們已經在Spark數據幀中有了數據,我們需要定義轉換數據的不同階段,然后使用它從我們的模型中獲取預測的標簽。

在第一階段中,我們將使用RegexTokenizer 將Tweet文本轉換為單詞列表。然后,我們將從單詞列表中刪除停用詞並創建單詞向量。在最后階段,我們將使用這些詞向量建立一個邏輯回歸模型,並得到預測情緒。

請記住,我們的重點不是建立一個非常精確的分類模型,而是看看如何在預測模型中獲得流數據的結果。

# 定義階段1:標記tweet文本 
stage_1 = RegexTokenizer(inputCol= 'tweet' , outputCol= 'tokens', pattern= '\\W')
# 定義階段2:刪除停用字
stage_2 = StopWordsRemover(inputCol= 'tokens', outputCol= 'filtered_words')
# 定義階段3:創建大小為100的詞向量
stage_3 = Word2Vec(inputCol= 'filtered_words', outputCol= 'vector', vectorSize= 100)
# 定義階段4:邏輯回歸模型
model = LogisticRegression(featuresCol= 'vector', labelCol= 'label')

設置我們的機器學習管道

讓我們在Pipeline對象中添加stages變量,然后按順序執行這些轉換。將管道與訓練數據集匹配,現在,每當我們有新的Tweet時,我們只需要將其傳遞到管道對象並轉換數據以獲得預測:

# 設置管道
pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, model])

#擬合模型
pipelineFit = pipeline.fit(my_data)

流數據和返回的結果

假設我們每秒收到數百條評論,我們希望通過阻止發布包含仇恨言論的評論的用戶來保持平台的干凈。所以,每當我們收到新的文本,我們就會把它傳遞到管道中,得到預測的情緒。

我們將定義一個函數 get_prediction,它將刪除空白語句並創建一個數據框,其中每行包含一條推特。

因此,初始化Spark流上下文並定義3秒的批處理持續時間。這意味着我們將對每3秒收到的數據進行預測:

#定義一個函數來計算情感
def get_prediction(tweet_text):
	try:
    # 過濾得到長度大於0的tweets
		tweet_text = tweet_text.filter(lambda x: len(x) > 0)
    # 創建一個列名為“tweet”的數據框,每行將包含一條tweet
		rowRdd = tweet_text.map(lambda w: Row(tweet=w))
    # 創建spark數據框
		wordsDataFrame = spark.createDataFrame(rowRdd)
    # 利用管道對數據進行轉換,得到預測的情緒
		pipelineFit.transform(wordsDataFrame).select('tweet','prediction').show()
	except : 
		print('No data')
    
# 初始化流上下文
ssc = StreamingContext(sc, batchDuration= 3)

# 創建一個將連接到hostname:port的數據流,如localhost:9991
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

# 用一個關鍵字“tweet_APP”分割tweet文本,這樣我們就可以從一條tweet中識別出一組單詞
words = lines.flatMap(lambda line : line.split('TWEET_APP'))

# 獲取收到的推文的預期情緒
words.foreachRDD(get_prediction)

#開始計算
ssc.start()             

# 等待結束
ssc.awaitTermination()  

在一個終端上運行程序並使用Netcat(一個實用工具,可用於將數據發送到定義的主機名和端口號)。可以使用以下命令啟動TCP連接:

nc -lk port_number

最后,在第二個終端中鍵入文本,你將在另一個終端中實時獲得預測:

視頻演示地址:https://cdn.analyticsvidhya.com/wp-content/uploads/2019/12/final_twitter_sentiment.mp4?_=1

結尾

流數據在未來幾年會增加的越來越多,所以你應該開始熟悉這個話題。記住,數據科學不僅僅是建立模型,還有一個完整的管道需要處理。

本文介紹了Spark流的基本原理以及如何在真實數據集上實現它。我鼓勵你使用另一個數據集或收集實時數據並實現我們剛剛介紹的內容(你也可以嘗試其他模型)。

原文鏈接:https://www.analyticsvidhya.com/blog/2019/12/streaming-data-pyspark-machine-learning-model/

歡迎關注磐創AI博客站:
http://panchuang.net/

sklearn機器學習中文官方文檔:
http://sklearn123.com/

歡迎關注磐創博客資源匯總站:
http://docs.panchuang.net/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM