由於Scala才剛剛開始學習,還是對python更為熟悉,因此在這記錄一下自己的學習過程,主要內容來自於spark的官方幫助文檔,這一節的地址為:
http://spark.apache.org/docs/latest/quick-start.html
文章主要是翻譯了文檔的內容,但也在里邊加入了一些自己在實際操作中遇到的問題及解決的方案,和一些補充的小知識,一起學習。
環境:Ubuntu 16.04 LTS,Spark 2.0.1, Hadoop 2.7.3, Python 3.5.2,
利用spark shell進行交互式分析
1. 基礎
首先打開spark與python交互的API
$ cd /usr/local/spark
$ ./bin/pyspark
Spark最重要的一個概念就是RDD(Resilient Distributed Dataset),彈性分布式數據集。RDD可以利用Hadoop的InputFormats創建,或者從其他RDD轉換。
這里,作為入門,我們利用spark安裝后文件夾中自帶的README.md(此文件位置為/usr/local/spark/README.md)文件作為例子,學習如何創建一個新的RDD。
創建新的RDD:
>>> textFile = sc.textFile(“README.md”)
RDD支持兩種類型的操作,actions和transformations:
actions: 在數據集上運行計算后返回值
transformations: 轉換, 從現有數據集創建一個新的數據集
RDD可以有執行一系列的動作(actions),這些動作可以返回值(values),轉換(transformations),或者指向新的RDD的指針。下邊學習RDD的一些簡單的動作:
>>> textFile.count() # 計數,返回RDD中items的個數,這里就是README.md的總行# 數 99 >>> textFile.first() # RDD中的第一個item,這里就是文件README.md的第一行 u'# Apache Spark'
注意:如果之前是從/usr/local/spark啟動pyspark,然后讀取README.md文件的,如果執行count語句,會出現以下錯誤:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/spark/README.md
這是因為在使用相對路徑時,系統默認是從hdfs://localhost:9000/目錄下讀取README.md文件的,但是README.md文件並不在這一目錄下,所以sc.textFile()必須使用絕對路徑,此時代碼修改為:
>>> textFile = sc.textFile(“file:///usr/local/spark/README.md”)
99
下邊嘗試使用一個轉換(transformation)。例如,使用filter這一轉換返回一個新的RDD,這些RDD中的items都含有“Spark”字符串。
>>> linesWithSpark = textFile.filter(lambda line: “Spark” in line)
我們還可以將actions和transformation鏈接起來:
>>> textFile.filter(lambda line: “Spark” in line).count() # 有多好行含有“Spark”這一字符串 19
2. 更多的RDD操作
利用RDD的動作和轉換能夠完成很多復雜的計算。例如,我們希望找到含有最后單詞的一句話:
>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a>b) else b) 22
這個語句中,map函數將len(line.split())這一語句在所有line上執行,返回每個line所含有的單詞個數,也就是將line都map到一個整數值,然后創建一個新的RDD。然后調用reduce,找到最大值。map和reduce函數里的參數是python中的匿名函數(lambda),事實上,我們這里也可以傳遞python中更頂層的函數。比如,我們先定義一個比較大小的函數,這樣我們的代碼會更容易理解:
>>> def max(a, b): . . . if a > b: . . . return a . . . else: . . . return b . . . >>> textFile.map(lambda line: len(line.split())).reduce(max) 22
Hadoop掀起了MapReduce的熱潮。在spark中,能夠更加容易的實現MapReduce
>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
上述語句中,利用flatMap, map和reduceByKey三個轉換,計算文件README.md中每個單詞出現的個數,並返回一個新的RDD,每個item的格式為(string, int),即單詞和對應的出現次數。其中,
flatMap(func):與map相似,但是每個輸入的item能夠被map到0個或者更多的輸出items上,也就是說func的返回值應當是一個Seq,而不是一個單獨的item,上述語句中,匿名函數返回的就是一句話中所含的每個單詞
reduceByKey(func):可以作用於使用“鍵-值”(K, V)形式存儲的數據集上並返回一組新的數據集(K, V),其中,每個鍵的值為聚合使用func操作的結果,這里相當於python中字典的含義。上述語句中,相當於當某個單詞出現一次時,就在這個單詞的出現次數上加1,每個單詞就是一個Key,reducByKey中的匿名函數計算單詞的出現次數。
要收集上述語句的計算結果,可以使用collect這一動作:
>>> wordCounts.collect() [(u'when', 1), (u'R,', 1), (u'including', 3), (u'computation', 1), ...]
3. 緩存Caching
Spark也支持將數據集存入集群范圍的內存緩存中。這對於需要進行重復訪問的數據非常有用,比如我們需要在一個小的數據集中執行查詢操作,或者需要執行一個迭代算法(例如PageRank)。下面,利用之前命令中得到的linesWithSpark數據集,演示緩存這一操作過程:
>>> linesWithSpark.cache() PythonRDD[26] at RDD at PythonRDD.scala:48 >>> linesWithSpark.count() 19 >>> linesWithSpark.count() 19
利用Spark去緩存一個100行的文件可能並沒什么意義。但是有趣的是,這一系列的操作可以用於非常大的數據集上,甚至含有成千上萬的節點的數據集。
4. 自含式應用程序(self-contained applications)
假設我們希望利用Spark API寫一個自含式應用程序,我們可以利用Scala,Java或者Python完成。
下邊,簡單介紹一下怎樣利用Python API (PySpark)寫一個應用程序,命名為SimpleApp.py.
在spark所在目錄下輸入:
./bin/spark-submit --master local[4] SimpleApp.py
輸出為:
Lines with a: 61, Lines with b: 27
此外,Spark自帶很多例子,可以在spark目錄下輸入下列指令查看:
# For Scala and Java, use run-example: ./bin/run-example SparkPi # For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py # For R examples, use spark-submit directly: ./bin/spark-submit examples/src/main/r/dataframe.R