PySpark初級教程——第一步大數據分析(附代碼實現)


概述

  • 數據正以前所未有的速度與日俱增
  • 如何存儲、處理和使用這些數據來進行機器學習?spark正可以應對這些問題
  • 了解Spark是什么,它是如何工作的,以及涉及的不同組件是什么

簡介

我們正在以前所未有的速度生成數據。老實說,我跟不上世界各地里產生的巨大數據量!我敢肯定你已經了解過當今時代數據的產量。McKinsey, Gartner, IBM,等公司都給出了他們公司的數據。

這里有一些令人難以置信的數字供你參考。有超過5億條推文、900億封電子郵件、6500萬條WhatsApp消息,以上這些都是在一天之內發送的!Facebook在24小時內能生成4PB的數據。這是難以置信的!

當然,這也帶來了挑戰。一個數據科學團隊如何捕獲這么多的數據?你如何處理它並從中建立機器學習模型?如果你是一名數據科學家或數據工程師,這些都是令人興奮的問題。

Spark正能應對這些問題。Spark是用Scala編寫的,它提供了Scala、JAVA、Python和R的接口. PySpark一起工作的API。PySpark是用Python編寫的Python API用來支持Spark的。

處理大數據的一種傳統方式是使用像Hadoop這樣的分布式框架,但這些框架需要在硬盤上執行大量的讀寫操作。事實上時間和速度都非常昂貴。計算能力同樣是一個重要的障礙。

PySpark以一種高效且易於理解的方式處理這一問題。因此,在本文中,我們將開始學習有關它的所有內容。我們將了解什么是Spark,如何在你的機器上安裝它,然后我們將深入研究不同的Spark組件。本文附有代碼。

目錄

  1. Spark是什么?
  2. 在你的計算機上安裝Apache Spark
  3. 什么是Spark應用程序?
  4. 什么是Spark會話?
  5. Spark的分區
  6. 轉換
  7. 惰性計算
  8. Spark中的數據類型

Spark是什么?

Apache Spark是一個開源的分布式集群計算框架,用於快速處理、查詢和分析大數據。

它是當今企業中最有效的數據處理框架。使用Spark的成本很高,因為它需要大量的內存進行計算,但它仍然是數據科學家和大數據工程師的最愛。在本文中,你將看到為什么會出現這種情況。

通常依賴於Map-Reduce的框架的組織現在正在轉向Apache Spark框架。Spark執行內存計算,比Hadoop等Map Reduce框架快100倍。Spark在數據科學家中很受歡迎,因為它將數據分布和緩存放入了內存中,並且幫助他們優化大數據上的機器學習算法。

我建議查看Spark的官方頁面,了解更多細節。它有大量的文檔,是Spark很好參考教程:https://spark.apache.org/

在你的計算機上安裝Apache Spark

1. 下載Apache Spark

安裝Spark的一個簡單方法是通過pip。但是,根據Spark的官方文檔,這不是推薦的方法,因為Spark的Python包並不打算取代所有其他情況。

在實現基本功能時,你很可能會遇到很多錯誤。它只適用於與現有集群(獨立的Spark、YARN或Mesos)進行交互。

因此,第一步是從這里下載Apache Spark的最新版本。解壓並移動壓縮文件:

tar xzvf spark-2.4.4-bin-hadoop2.7.tgz 
mv spark-2.4.4-bin-hadoop2.7 spark
sudo mv spark/ /usr/lib/

2. 安裝JAVA

確保在系統中安裝了JAVA。我強烈推薦JAVA 8,因為眾所周知,Spark2在JAVA 9和其他方面存在問題:

sudo apt install default-jre
sudo apt install openjdk-8-jdk

3.安裝Scala構建工具(SBT)

當你處理一個包含很少源代碼文件的小型項目時,手動編譯它們會更容易。但是,如果你正在處理一個包含數百個源代碼文件的大型項目呢?在這種情況下,你需要使用構建工具。

SBT是Scala構建工具的縮寫,它管理你的Spark項目以及你在代碼中使用的庫的依賴關系。

請記住,如果你使用的是PySpark,就不需要安裝它。但是如果你使用JAVA或Scala構建Spark應用程序,那么你需要在你的機器上安裝SBT。運行以下命令安裝SBT:

echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo apt-key add
sudo apt-get update
sudo apt-get install sbt

4. 配置SPARK

接下來,打開Spark的配置目錄,復制默認的Spark環境模板。它已經以spark-env.sh.template的形式出現了。使用編輯器打開:

cd /usr/lib/spark/conf/ 
cp spark-env.sh.template spark-env.sh 
sudo gedit spark-env.sh

現在,在文件spark-env.sh中。添加JAVA_HOME,並將內存限制SPARK_WORKER_MEMORY進行賦值。這里,我把它分配為4GB:

## 添加變量
JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
SPARK_WORKER_MEMORY=4g

5. 設置Spark環境變量

使用下面的命令打開並編輯bashrc文件。這個bashrc文件是一個腳本,每當你開始一個新的終端會話就會執行:

## 打開bashrc
sudo gedit ~/bashrc

文件中添加以下環境變量:

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 
export SBT_HOME=/usr/share/sbt/bin/sbt-launch.jar 
export SPARK_HOME=/usr/lib/spark
export PATH=$PATH:$JAVA_HOME/bin
export PATH=$PATH:$SBT_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
export PYSPARK_PYTHON=python3
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH

現在,更新bashrc文件。這將在更新腳本的情況下重新啟動終端會話:

source ~/.bashrc

現在,在終端中輸入pyspark,它將在默認瀏覽器中打開Jupyter和一個自動初始化變量名為sc的Spark環境(它是Spark服務的入口點):

什么是Spark應用程序?

Spark應用程序是Spark上下文的一個實例。它由一個驅動進程和一組執行程序進程組成。

驅動進程負責維護關於Spark應用程序的信息、響應代碼、分發和調度執行器中的工作。驅動進程是非常重要的,它是Spark應用程序的核心,並在應用程序的生命周期內維護所有相關信息。

執行器負責實際執行驅動程序分配給他們的工作。因此,每個執行器只負責兩件事:

  • 執行由驅動程序分配給它的任務
  • 將執行程序上的計算狀態報告回驅動程序節點

什么是Spark會話?

我們知道一個驅動進程控制着Spark應用程序。驅動程序進程將自己作為一個稱為Spark會話的對象提供給用戶。

Spark會話實例可以使用Spark在集群中執行用戶自定義操作。在Scala和Python中,當你啟動控制台時,Spark會話變量就是可用的:

Spark的分區

分區意味着完整的數據不會出現在一個地方。它被分成多個塊,這些塊被放置在不同的節點上。

如果只有一個分區,即使有數千個執行器,Spark的並行度也只有一個。另外,如果有多個分區,但只有一個執行器,Spark的並行度仍然只有一個,因為只有一個計算資源。

在Spark中,較低級別的api允許我們定義分區的數量。

讓我們舉一個簡單的例子來理解分區是如何幫助我們獲得更快的結果的。我們將在10到1000之間創建一個包含2000萬個隨機數的列表,並對大於200的數字進行計數。

讓我們看看我們能多快做到這只一個分區:

from random import randint 

# 創建一個隨機數字的列表在10到1000之間
my_large_list = [randint(10,1000) for x in range(0,20000000)]

# 創建一個分區的列表
my_large_list_one_partition = sc.parallelize(my_large_list,numSlices=1)

# 檢查分區數量
print(my_large_list_one_partition.getNumPartitions())
# >> 1

# 篩選數量大於等於200的數字
my_large_list_one_partition = my_large_list_one_partition.filter(lambda x : x >= 200)

# 在jupyter中運行代碼 
# 執行以下命令來計算時間
%%time

# 列表中元素的數量
print(my_large_list_one_partition.count())
# >> 16162207

one_partition_f

使用一個分區時,花了34.5毫秒來篩選數字:

現在,讓我們將分區的數量增加到5和檢查執行時間:

# 創建五個分區
my_large_list_with_five_partition = sc.parallelize(my_large_list, numSlices=5)

# 篩選數量大於等於200的數字
my_large_list_with_five_partition = my_large_list_with_five_partition.filter(lambda x : x >= 200)

%%time 

# 列表中元素的數量
print(my_large_list_with_five_partition.count())
# >> 16162207

使用5個分區時,花了11.1毫秒來篩選數字:

轉換

在Spark中,數據結構是不可變的。這意味着一旦創建它們就不能更改。但是如果我們不能改變它,我們該如何使用它呢?

因此,為了進行更改,我們需要指示Spark如何修改數據。這些指令稱為轉換。

回想一下我們在上面看到的例子。我們要求Spark過濾大於200的數字——這本質上是一種轉換。Spark有兩種類型的轉換:

  • 窄轉換:在窄轉換中,計算單個分區結果所需的所有元素都位於父RDD的單個分區中。例如,如果希望過濾小於100的數字,可以在每個分區上分別執行此操作。轉換后的新分區僅依賴於一個分區來計算結果
  • 寬轉換:在寬轉換中,計算單個分區的結果所需的所有元素可能位於父RDD的多個分區中。例如,如果你想計算數字個數,那么你的轉換依賴於所有的分區來計算最終的結果

惰性計算

假設你有一個包含數百萬行的非常大的數據文件。你需要通過一些操作來進行分析,比如映射、過濾、隨機分割,甚至是最基本的加減法。

現在,對於大型數據集,即使是一個基本的轉換也需要執行數百萬個操作。

在處理大數據時,優化這些操作至關重要,Spark以一種非常有創意的方式處理它。你所需要做的就是告訴Spark你想要對數據集進行哪些轉換,Spark將維護一系列轉換。當你向Spark請求結果時,它將找出最佳路徑並執行所需的轉換並給出結果。

現在,讓我們舉個例子。你有一個1gb的文本文件,並創建了10個分區。你還執行了一些轉換,最后要求查看第一行。在這種情況下,Spark將只從第一個分區讀取文件,在不需要讀取整個文件的情況下提供結果。

讓我們舉幾個實際的例子來看看Spark是如何執行惰性計算的。在第一步中,我們創建了一個包含1000萬個數字的列表,並創建了一個包含3個分區的RDD:

# 創建一個樣本列表
my_list = [i for i in range(1,10000000)]

# 並行處理數據
rdd_0 = sc.parallelize(my_list,3)

rdd_0

接下來,我們將執行一個非常基本的轉換,比如每個數字加4。請注意,Spark此時還沒有啟動任何轉換。它只記錄了一系列RDD運算圖形式的轉換。你可以看到,使用函數toDebugString查看RDD運算圖:


# 每個數增加4
rdd_1 = rdd_0.map(lambda x : x 4)

# RDD對象
print(rdd_1)

#獲取RDD運算圖
print(rdd_1.toDebugString())

我們可以看到,PythonRDD[1]與ParallelCollectionRDD[0]是連接的。現在,讓我們繼續添加轉換,將列表的所有元素加20。

你可能會認為直接增加24會先增加4后增加20一步更好。但是在這一步之后檢查RDD運算圖:

# 每個數增加20
rdd_2 = rdd_1.map(lambda x : x 20)

# RDD 對象
print(rdd_2)

#獲取RDD運算圖
print(rdd_2.toDebugString())

我們可以看到,它自動跳過了冗余步驟,並將在單個步驟中添加24。因此,Spark會自動定義執行操作的最佳路徑,並且只在需要時執行轉換。

讓我們再舉一個例子來理解惰性計算過程。

假設我們有一個文本文件,並創建了一個包含4個分區的RDD。現在,我們定義一些轉換,如將文本數據轉換為小寫、將單詞分割、為單詞添加一些前綴等。

但是,當我們執行一個動作,比如獲取轉換數據的第一個元素時,這種情況下不需要查看完整的數據來執行請求的結果,所以Spark只在第一個分區上執行轉換

# 創建一個文本文件的RDD,分區數量= 4
my_text_file = sc.textFile('tokens_spark.txt',minPartitions=4)

# RDD對象
print(my_text_file)

# 轉換小寫
my_text_file = my_text_file.map(lambda x : x.lower())

# 更新RDD對象
print(my_text_file)

print(my_text_file.toDebugString())

在這里,我們把單詞小寫,取得每個單詞的前兩個字符。

# 分割單詞
my_text_file = my_text_file.map(lambda x : x[:2])

# RDD對象
print(my_text_file)

print(my_text_file.toDebugString())

# 在所有的轉換后得到第一個元素
print(my_text_file.first())

我們創建了4個分區的文本文件。但是根據我們需要的結果,不需要在所有分區上讀取和執行轉換,因此Spack只在第一個分區執行。

如果我們想計算出現了多少個單詞呢?這種情況下我們需要讀取所有的分區:

print(my_text_file.countApproxDistinct())

Spark MLlib的數據類型

MLlib是Spark的可擴展機器學習庫。它包括一些常用的機器學習算法,如回歸、分類、降維,以及一些對數據執行基本統計操作的工具。

在本文中,我們將詳細討論MLlib提供的一些數據類型。在以后的文章中,我們將討論諸如特征提取和構建機器學習管道之類的主題。

局部向量

MLlib支持兩種類型的本地向量:稠密和稀疏。當大多數數字為零時使用稀疏向量。要創建一個稀疏向量,你需要提供向量的長度——非零值的索引,這些值應該嚴格遞增且非零值。

from pyspark.mllib.linalg import Vectors

## 稠密向量
print(Vectors.dense([1,2,3,4,5,6,0]))
# >> DenseVector([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 0.0])

### 稠密向量
### Vectors.sparse( length, index_of_non_zero_values, non_zero_values)
### 索引應該嚴格遞增且非零值

print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]))
# >> SparseVector(10, {0: 1.0, 1: 5.0, 2: 3.0, 4: 5.0, 5: 7.0})

print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]).toArray())
# >> array([1., 5., 3., 0., 5., 7., 0., 0., 0., 0.]) 

標簽點

標簽點(Labeled Point)是一個局部向量,其中每個向量都有一個標簽。這可以用在監督學習中,你有一些目標的特征與這些特征對應的標簽。

from pyspark.mllib.regression import LabeledPoint

# 設置一個標簽與一個稠密向量
point_1 = LabeledPoint(1,Vectors.dense([1,2,3,4,5]))

# 特征 
print(point_1.features)

# 標簽
print(point_1.label)

局部矩陣

局部矩陣存儲在一台機器上。MLlib同時支持稠密矩陣和稀疏矩陣。在稀疏矩陣中,非零項值按列為主順序存儲在壓縮的稀疏列格式(CSC格式)中。

# 導入矩陣
from pyspark.mllib.linalg import Matrices

# 創建一個3行2列的稠密矩陣
matrix_1 = Matrices.dense(3, 2, [1,2,3,4,5,6])

print(matrix_1)
# >> DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], False)

print(matrix_1.toArray())
""" >> array([[1., 4.], [2., 5.], [3., 6.]]) """

# 創建一個稀疏矩陣
matrix_2 = Matrices.sparse(3, 3, [0, 1, 2, 3], [0, 0, 2], [9, 6, 8])

print(matrix_2)
# SparseMatrix(3, 3, [0, 1, 2, 3], [0, 0, 2], [9.0, 6.0, 8.0], False)

print(matrix_2.toArray())
""" >> array([[9., 6., 0.], [0., 0., 0.], [0., 0., 8.]]) """

分布式矩陣

分布式矩陣存儲在一個或多個rds中。選擇合適的分布式矩陣格式是非常重要的。目前已經實現了四種類型的分布式矩陣:

  • 行矩陣
    • 每一行都是一個局部向量。可以在多個分區上存儲行
    • 像隨機森林這樣的算法可以使用行矩陣來實現,因為該算法將行划分為多個樹。一棵樹的結果不依賴於其他樹。因此,我們可以利用分布式架構,對大數據的隨機森林等算法進行並行處理
# 分布式數據類型——行矩陣
from pyspark.mllib.linalg.distributed import RowMatrix

# 創建RDD
rows = sc.parallelize([[1,2,3], [4,5,6], [7,8,9], [10,11,12]])

# 創建一個分布式行矩陣
row_matrix = RowMatrix(rows)


print(row_matrix)
# >> <pyspark.mllib.linalg.distributed.RowMatrix at 0x7f425884d7f0> 

print(row_matrix.numRows())
# >> 4

print(row_matrix.numCols())
# >> 3
  • 索引行矩陣
    • 它類似於行矩陣,其中行以有序的方式存儲在多個分區中。為每行分配一個索引值。它用於序列很重要的算法,比如時間序列數據
    • 它可以從IndexedRow的RDD創建
# 索引行矩陣

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

#創建RDD
indexed_rows = sc.parallelize([
    IndexedRow(0, [0,1,2]),
    IndexedRow(1, [1,2,3]),
    IndexedRow(2, [3,4,5]),
    IndexedRow(3, [4,2,3]),
    IndexedRow(4, [2,2,5]),
    IndexedRow(5, [4,5,5])
])

# 創建IndexedRowMatrix
indexed_rows_matrix = IndexedRowMatrix(indexed_rows)

print(indexed_rows_matrix.numRows())
# >> 6

print(indexed_rows_matrix.numCols())
# >> 3
  • 坐標矩陣
    • 可以從MatrixEntry的RDD創建坐標矩陣
    • 只有當矩陣的維數都很大時,我們才使用坐標矩陣
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

# 用MatrixEntry創建
matrix_entries = sc.parallelize([MatrixEntry(0, 5, 2), MatrixEntry(1, 1, 1), MatrixEntry(1, 5, 4)])

# 創建坐標矩陣
c_matrix = CoordinateMatrix(matrix_entries)

# 列數
print(c_matrix.numCols())
# >> 6

# 行數
print(c_matrix.numRows())
# >> 2
  • 塊矩陣
    • 在一個塊矩陣中,我們可以在不同的機器上存儲一個大矩陣的不同子矩陣
    • 我們需要指定塊的尺寸。就像下面的例子,我們有3X3,對於每一個方塊,我們可以通過提供坐標來指定一個矩陣
# 導入庫
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix

# 創建子矩陣塊的RDD
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 3, [1, 2, 1, 2, 1, 2, 1, 2, 1])),
                         ((1, 1), Matrices.dense(3, 3, [3, 4, 5, 3, 4, 5, 3, 4, 5])),
                         ((2, 0), Matrices.dense(3, 3, [1, 1, 1, 1, 1, 1, 1, 1, 1]))])

# 從子矩陣塊的RDD中創建矩陣塊,大小為3X3
b_matrix = BlockMatrix(blocks, 3, 3) 

#每一塊的列數
print(b_matrix.colsPerBlock)
# >> 3

#每一塊的行數
print(b_matrix.rowsPerBlock)
# >> 3

# 把塊矩陣轉換為局部矩陣
local_mat = b_matrix.toLocalMatrix()

# 打印局部矩陣
print(local_mat.toArray())
""" >> array([[1., 2., 1., 0., 0., 0.], [2., 1., 2., 0., 0., 0.], [1., 2., 1., 0., 0., 0.], [0., 0., 0., 3., 3., 3.], [0., 0., 0., 4., 4., 4.], [0., 0., 0., 5., 5., 5.], [1., 1., 1., 0., 0., 0.], [1., 1., 1., 0., 0., 0.], [1., 1., 1., 0., 0., 0.]]) """

結尾

今天我們已經講了很多了。Spark是數據科學中最迷人的語言之一,我覺得至少應該熟悉它。

這只是我們PySpark學習旅程的開始!我計划在本系列中涵蓋更多的內容,包括不同機器學習任務的多篇文章。

在即將發表的PySpark文章中,我們將看到如何進行特征提取、創建機器學習管道和構建模型。

歡迎關注磐創博客資源匯總站:
http://docs.panchuang.net/

歡迎關注PyTorch官方中文教程站:
http://pytorch.panchuang.net/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM