PySpark理解wordcount.py


在本文中, 我們借由深入剖析wordcount.py, 來揭開Spark內部各種概念的面紗。我們再次回顧wordcount.py代碼來回答如下問題

  1. 對於大多數語言的Hello Word示例,都有main()函數, wordcount.py的main函數,或者說調用Spark的main() 在哪里

  2. 數據的讀入,各個RDD數據如何轉換

  3. map與flatMap的工作機制,以及區別

  4. reduceByKey的作用

WordCount.py 的代碼如下:

 1 from __future__ import print_function
 2 
 3 import sys
 4 from operator import add
 5 
 6 # SparkSession:是一個對Spark的編程入口,取代了原本的SQLContext與HiveContext,方便調用Dataset和DataFrame API
 7 # SparkSession可用於創建DataFrame,將DataFrame注冊為表,在表上執行SQL,緩存表和讀取parquet文件。
 8 from pyspark.sql import SparkSession
 9 
10 
11 if __name__ == "__main__":
12 
13     # Python 常用的簡單參數傳入
14     if len(sys.argv) != 2:
15         print("Usage: wordcount <file>", file=sys.stderr)
16         exit(-1)
17         
18     # appName 為 Spark 應用設定一個應用名,改名會顯示在 Spark Web UI 上
19     # 假如SparkSession 已經存在就取得已存在的SparkSession,否則創建一個新的。
20     spark = SparkSession\
21         .builder\
22         .appName("PythonWordCount")\
23         .getOrCreate()
24         
25     # 讀取傳入的文件內容,並寫入一個新的RDD實例lines中,此條語句所做工作有些多,不適合初學者,可以截成兩條語句以便理解。
26     # map是一種轉換函數,將原來RDD的每個數據項通過map中的用戶自定義函數f映射轉變為一個新的元素。原始RDD中的數據項與新RDD中的數據項是一一對應的關系。
27     lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
28    
29     # flatMap與map類似,但每個元素輸入項都可以被映射到0個或多個的輸出項,最終將結果”扁平化“后輸出 
30     counts = lines.flatMap(lambda x: x.split(' ')) \
31                   .map(lambda x: (x, 1)) \
32                   .reduceByKey(add)
33                 
34     # collect() 在驅動程序中將數據集的所有元素作為數組返回。 這在返回足夠小的數據子集的過濾器或其他操作之后通常是有用的。由於collect 是將整個RDD匯聚到一台機子上,所以通常需要預估返回數據集的大小以免溢出。             
35     output = counts.collect()
36     
37     for (word, count) in output:
38         print("%s: %i" % (word, count))
39 
40     spark.stop()

Spark 入口 SparkSession

Spark2.0中引入了SparkSession的概念,它為用戶提供了一個統一的切入點來使用Spark的各項功能,這邊不妨對照Http Session, 在此Spark就在充當Web service的角色,程序調用Spark功能的時候需要先建立一個Session。因此看到getOrCreate()就很容易理解了, 表明可以視情況新建session或利用已有的session。

1     spark = SparkSession\
2         .builder\
3         .appName("PythonWordCount")\
4         .getOrCreate()

既然將Spark 想象成一個Web server, 也就意味着可能用多個訪問在進行,為了便於監控管理, 對應用命名一個恰當的名稱是個好辦法。Web UI並不是本文的重點,有興趣的同學可以參考  Spark Application’s Web Console

加載數據

在建立SparkSession之后, 就是讀入數據並寫入到Dateset中。

1  lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])

為了更好的分解執行過程,是時候借助PySpark了, PySpark是python調用Spark的 API,它可以啟動一個交互式Python Shell。為了方便腳本調試,暫時切換到Linux執行

 1 # pyspark
 2 Python 2.7.6 (default, Jun 22 2015, 17:58:13) 
 3 [GCC 4.8.2] on linux2
 4 Type "help", "copyright", "credits" or "license" for more information.
 5 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
 6 Setting default log level to "WARN".
 7 To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
 8 17/02/23 08:30:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 9 17/02/23 08:30:31 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
10 17/02/23 08:30:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
11 17/02/23 08:30:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
12 Welcome to
13       ____              __
14      / __/__  ___ _____/ /__
15     _\ \/ _ \/ _ `/ __/  '_/
16    /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
17       /_/
18 
19 Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
20 SparkSession available as 'spark'.
21 >>> ds = spark.read.text('/home/spark2.1/spark/examples/src/main/python/a.txt')
22 >>> type(ds)
23 <class 'pyspark.sql.dataframe.DataFrame'>
24 >>> print ds
25 DataFrame[value: string]
26 >>> lines = ds.rdd

交互式Shell的好處是可以方便的查看變量內容和類型。此刻文件a.txt已經加載到lines中,它是RDD(Resilient Distributed Datasets)彈性分布式數據集的實例。

RDD操作

RDD在內存中的結構可以參考論文, 理解RDD有兩點比較重要:

一是RDD一種只讀、只能由已存在的RDD變換而來的共享內存,然后將所有數據都加載到內存中,方便進行多次重用。

二是RDD的數據默認情況下存放在集群中不同節點的內存中,本身提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDD partition,因為節點故障,導致數據丟了,那么RDD會自動通過自己的數據來源重新計算該partition。

為了探究RDD內部的數據內容,可以利用collect()函數, 它能夠以數組的形式,返回RDD數據集的所有元素。

1 >>> lines = ds.rdd
2 >>> for i in lines.collect():
3 ...     print i
4 ... 
5 Row(value=u'These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.')

lines存儲的是Row object類型,而我們希望的是對String類型進行處理,所以需要利用map api進一步轉換RDD

1 >>> lines_map = lines.map(lambda x: x[0])
2 >>> for i in lines_map.collect():
3 ...     print i
4 ... 
5 These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.

為了統計每個單詞的出現頻率,需要對每個單詞分別統計,那么第一步需要將上面的字符串以空格作為分隔符將單詞提取出來,並為每個詞設置一個計數器。比如 These出現次數是1, 我們期望的數據結構是['There', 1]。但是如何將包含字符串的RDD轉換成元素為類似 ['There', 1] 的RDD呢?

 1 >>> flat_map = lines_map.flatMap(lambda x: x.split(' '))
 2 >>> rdd_map = flat_map.map(lambda x: [x, 1])
 3 >>> for i in rdd_map.collect():
 4 ...     print i
 5 ... 
 6 [u'These', 1]
 7 [u'examples', 1]
 8 [u'give', 1]
 9 [u'a', 1]
10 [u'quick', 1]

下圖簡要的講述了flatMap 和 map的轉換過程。

不難看出,map api只是為所有出現的單詞初始化了計數器為1,並沒有統計相同詞,接下來這個任務由reduceByKey()來完成。在rdd_map 中,所有的詞被視為一個key,而key相同的value則執行reduceByKey內的算子操作,因為統計相同key是累加操作,所以可以直接add操作。
 1 >>> from operator import add
 2 >>> add_map = rdd_map.reduceByKey(add)
 3 >>> for i in add_map.collect():
 4 ...     print i
 5 ... 
 6 (u'a', 1)
 7 (u'on', 1)
 8 (u'of', 2)
 9 (u'arbitrary', 1)
10 (u'quick', 1)
11 (u'the', 2)
12 (u'or', 1)
13 
14 >>> print rdd_map.count()
15 26
16 >>> print add_map.count()
17 23

根據a.txt 的內容,可知只有 of 和 the 兩個單詞出現了兩次,符合預期。

總結

以上的分解步驟,可以幫我們理解RDD的操作,需要提示的是,RDD將操作分為兩類:transformation與action。無論執行了多少次transformation操作,RDD都不會真正執行運算,只有當action操作被執行時,運算才會觸發。也就是說,上面所有的RDD都是通過collect()觸發的, 那么如果將上述的transformation放入一條簡練語句中, 則展現為原始wordcount.py的書寫形式。

1 counts = lines.flatMap(lambda x: x.split(' ')) \
2                   .map(lambda x: (x, 1)) \
3                   .reduceByKey(add)

而真正的action 則是由collect()完成。

1 output = counts.collect()

至此,已經完成了對wordcount.py的深入剖析

轉自:https://www.jianshu.com/p/067907b23546?winzoom=1


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM