Hadoop是對大數據集進行分布式計算的標准工具,這也是為什么當你穿過機場時能看到”大數據(Big Data)”廣告的原因。它已經成為大數據的操作系統,提供了包括工具和技巧在內的豐富生態系統,允許使用相對便宜的商業硬件集群進行超級計算機級別的計算。2003和2004年,兩個來自Google的觀點使Hadoop成為可能:一個分布式存儲框架(Google文件系統),在Hadoop中被實現為HDFS;一個分布式計算框架(MapReduce)。
這兩個觀點成為過去十年規模分析(scaling analytics)、大規模機器學習(machine learning),以及其他大數據應用出現的主要推動力!但是,從技術角度上講,十年是一段非常長的時間,而且Hadoop還存在很多已知限制,尤其是MapReduce。對MapReduce編程明顯是困難的。對大多數分析,你都必須用很多步驟將Map和Reduce任務串接起來。這造成類SQL的計算或機器學習需要專門的系統來進行。更糟的是,MapReduce要求每個步驟間的數據要序列化到磁盤,這意味着MapReduce作業的I/O成本很高,導致交互分析和迭代算法(iterative algorithms)開銷很大;而事實是,幾乎所有的最優化和機器學習都是迭代的。
為了解決這些問題,Hadoop一直在向一種更為通用的資源管理框架轉變,即YARN(Yet Another Resource Negotiator, 又一個資源協調者)。YARN實現了下一代的MapReduce,但同時也允許應用利用分布式資源而不必采用MapReduce進行計算。通過將集群管理一般化,研究轉到分布式計算的一般化上,來擴展了MapReduce的初衷。
Spark是第一個脫胎於該轉變的快速、通用分布式計算范式,並且很快流行起來。Spark使用函數式編程范式擴展了MapReduce模型以支持更多計算類型,可以涵蓋廣泛的工作流,這些工作流之前被實現為Hadoop之上的特殊系統。Spark使用內存緩存來提升性能,因此進行交互式分析也足夠快速(就如同使用Python解釋器,與集群進行交互一樣)。緩存同時提升了迭代算法的性能,這使得Spark非常適合數據理論任務,特別是機器學習。
本文中,我們將首先討論如何在本地機器上或者EC2的集群上設置Spark進行簡單分析。然后,我們在入門級水平探索Spark,了解Spark是什么以及它如何工作(希望可以激發更多探索)。最后兩節我們開始通過命令行與Spark進行交互,然后演示如何用Python寫Spark應用,並作為Spark作業提交到集群上。
設置Spark
在本機設置和運行Spark非常簡單。你只需要下載一個預構建的包,只要你安裝了Java 6+和Python 2.6+,就可以在Windows、Mac OS X和Linux上運行Spark。確保java程序在PATH環境變量中,或者設置了JAVA_HOME環境變量。類似的,python也要在PATH中。
假設你已經安裝了Java和Python:
- 訪問Spark下載頁
- 選擇Spark最新發布版(本文寫作時是1.2.0),一個預構建的Hadoop 2.4包,直接下載。
現在,如何繼續依賴於你的操作系統,靠你自己去探索了。Windows用戶可以在評論區對如何設置的提示進行評論。
一般,我的建議是按照下面的步驟(在POSIX操作系統上):
1.解壓Spark
~$ tar -xzf spark-1.2.0-bin-hadoop2.4.tgz
2.將解壓目錄移動到有效應用程序目錄中(如Windows上的
~$ mv spark-1.2.0-bin-hadoop2.4 /srv/spark-1.2.0
3.創建指向該Spark版本的符號鏈接到<spark目錄。這樣你可以簡單地下載新/舊版本的Spark,然后修改鏈接來管理Spark版本,而不用更改路徑或環境變量。
~$ ln -s /srv/spark-1.2.0 /srv/spark
4.修改BASH配置,將Spark添加到PATH中,設置SPARK_HOME環境變量。這些小技巧在命令行上會幫到你。在Ubuntu上,只要編輯~/.bash_profile或~/.profile文件,將以下語句添加到文件中:
export SPARK_HOME=/srv/spark
export PATH=$SPARK_HOME/bin:$PATH
5.source這些配置(或者重啟終端)之后,你就可以在本地運行一個pyspark解釋器。執行pyspark命令,你會看到以下結果:
~$ pyspark Python 2.7.8 (default, Dec 2 2014, 12:45:58) [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin Type "help", "copyright", "credits" or "license" for more information. Spark assembly has been built with Hive, including Datanucleus jars on classpath Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties [… snip …] Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ `_/ /__ / .__/\_,_/_/ /_/\_\ version 1.2.0 /_/ Using Python version 2.7.8 (default, Dec 2 2014 12:45:58) SparkContext available as sc. >>>
現在Spark已經安裝完畢,可以在本機以”單機模式“(standalone mode)使用。你可以在本機開發應用並提交Spark作業,這些作業將以多進程/多線程模式運行的,或者,配置該機器作為一個集群的客戶端(不推薦這樣做,因為在Spark作業中,驅動程序(driver)是個很重要的角色,並且應該與集群的其他部分處於相同網絡)。可能除了開發,你在本機使用Spark做得最多的就是利用spark-ec2腳本來配置Amazon雲上的一個EC2 Spark集群了。
簡略Spark輸出
Spark(和PySpark)的執行可以特別詳細,很多INFO日志消息都會打印到屏幕。開發過程中,這些非常惱人,因為可能丟失Python棧跟蹤或者print的輸出。為了減少Spark輸出 – 你可以設置$SPARK_HOME/conf下的log4j。首先,拷貝一份$SPARK_HOME/conf/log4j.properties.template文件,去掉“.template”擴展名。
~$ cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties
編輯新文件,用WARN替換代碼中出現的INFO。你的log4j.properties文件類似:
# Set everything to be logged to the console log4j.rootCategory=WARN, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Settings to quiet third party logs that are too verbose log4j.logger.org.eclipse.jetty=WARN log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN
現在運行PySpark,輸出消息將會更簡略!感謝@genomegeek在一次District Data Labs的研討會中指出這一點。
在Spark中使用IPython Notebook
當搜索有用的Spark小技巧時,我發現了一些文章提到在PySpark中配置IPython notebook。IPython notebook對數據科學家來說是個交互地呈現科學和理論工作的必備工具,它集成了文本和Python代碼。對很多數據科學家,IPython notebook是他們的Python入門,並且使用非常廣泛,所以我想值得在本文中提及。
這里的大部分說明都來改編自IPython notebook: 在PySpark中設置IPython。但是,我們將聚焦在本機以單機模式將IPtyon shell連接到PySpark,而不是在EC2集群。如果你想在一個集群上使用PySpark/IPython,查看並評論下文的說明吧!
- 1.為Spark創建一個iPython notebook配置
~$ ipython profile create spark [ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_config.py' [ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_notebook_config.py' [ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_nbconvert_config.py'
記住配置文件的位置,替換下文各步驟相應的路徑:
2.創建文件$HOME/.ipython/profile_spark/startup/00-pyspark-setup.py,並添加如下代碼:
import os import sys # Configure the environment if 'SPARK_HOME' not in os.environ: os.environ['SPARK_HOME'] = '/srv/spark' # Create a variable for our root path SPARK_HOME = os.environ['SPARK_HOME'] # Add the PySpark/py4j to the Python Path sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build")) sys.path.insert(0, os.path.join(SPARK_HOME, "python"))
3.使用我們剛剛創建的配置來啟動IPython notebook。
~$ ipython notebook --profile spark
4.在notebook中,你應該能看到我們剛剛創建的變量。
print SPARK_HOME
5.在IPython notebook最上面,確保你添加了Spark context。
from pyspark import SparkContext sc = SparkContext( 'local', 'pyspark')
6.使用IPython做個簡單的計算來測試Spark context。
def isprime(n): """ check if integer n is a prime """ # make sure n is a positive integer n = abs(int(n)) # 0 and 1 are not primes if n < 2: return False # 2 is the only even prime number if n == 2: return True # all other even numbers are not primes if not n & 1: return False # range starts with 3 and only needs to go up the square root of n # for all odd numbers for x in range(3, int(n**0.5)+1, 2): if n % x == 0: return False return True # Create an RDD of numbers from 0 to 1,000,000 nums = sc.parallelize(xrange(1000000)) # Compute the number of primes in the RDD print nums.filter(isprime).count()
如果你能得到一個數字而且沒有錯誤發生,那么你的context正確工作了!
編輯提示:上面配置了一個使用PySpark直接調用IPython notebook的IPython context。但是,你也可以使用PySpark按以下方式直接啟動一個notebook: $ IPYTHON_OPTS=”notebook –pylab inline” pyspark
哪個方法好用取決於你使用PySpark和IPython的具體情景。前一個允許你更容易地使用IPython notebook連接到一個集群,因此是我喜歡的方法。
在EC2上使用Spark
在講授使用Hadoop進行分布式計算時,我發現很多可以通過在本地偽分布式節點(pseudo-distributed node)或以單節點模式(single-node mode)講授。但是為了了解真正發生了什么,就需要一個集群。當數據變得龐大,這些書面講授的技能和真實計算需求間經常出現隔膜。如果你肯在學習詳細使用Spark上花錢,我建議你設置一個快速Spark集群做做實驗。 包含5個slave(和1個master)每周大概使用10小時的集群每月大概需要$45.18。
完整的討論可以在Spark文檔中找到:在EC2上運行Spark在你決定購買EC2集群前一定要通讀這篇文檔!我列出了一些關鍵點:
- 通過AWS Console獲取AWS EC2 key對(訪問key和密鑰key)。
- 將key對導出到你的環境中。在shell中敲出以下命令,或者將它們添加到配置中。
export AWS_ACCESS_KEY_ID=myaccesskeyid
export AWS_SECRET_ACCESS_KEY=mysecretaccesskey
注意不同的工具使用不同的環境名稱,確保你用的是Spark腳本所使用的名稱。
3.啟動集群:
~$ cd $SPARK_HOME/ec2
ec2$ ./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> launch <cluster-name>
4.SSH到集群來運行Spark作業。
ec2$ ./spark-ec2 -k <keypair> -i <key-file> login <cluster-name>
5.銷毀集群
ec2$ ./spark-ec2 destroy <cluster-name>.
這些腳本會自動創建一個本地的HDFS集群來添加數據,copy-dir命令可以同步代碼和數據到該集群。但是你最好使用S3來存儲數據,創建使用s3://URI來加載數據的RDDs。
Spark是什么?
既然設置好了Spark,現在我們討論下Spark是什么。Spark是個通用的集群計算框架,通過將大量數據集計算任務分配到多台計算機上,提供高效內存計算。如果你熟悉Hadoop,那么你知道分布式計算框架要解決兩個問題:如何分發數據和如何分發計算。Hadoop使用HDFS來解決分布式數據問題,MapReduce計算范式提供有效的分布式計算。類似的,Spark擁有多種語言的函數式編程API,提供了除map和reduce之外更多的運算符,這些操作是通過一個稱作彈性分布式數據集(resilient distributed datasets, RDDs)的分布式數據框架進行的。
本質上,RDD是種編程抽象,代表可以跨機器進行分割的只讀對象集合。RDD可以從一個繼承結構(lineage)重建(因此可以容錯),通過並行操作訪問,可以讀寫HDFS或S3這樣的分布式存儲,更重要的是,可以緩存到worker節點的內存中進行立即重用。由於RDD可以被緩存在內存中,Spark對迭代應用特別有效,因為這些應用中,數據是在整個算法運算過程中都可以被重用。大多數機器學習和最優化算法都是迭代的,使得Spark對數據科學來說是個非常有效的工具。另外,由於Spark非常快,可以通過類似Python REPL的命令行提示符交互式訪問。
Spark庫本身包含很多應用元素,這些元素可以用到大部分大數據應用中,其中包括對大數據進行類似SQL查詢的支持,機器學習和圖算法,甚至對實時流數據的支持。
核心組件如下:
- Spark Core:包含Spark的基本功能;尤其是定義RDD的API、操作以及這兩者上的動作。其他Spark的庫都是構建在RDD和Spark Core之上的。
- Spark SQL:提供通過Apache Hive的SQL變體Hive查詢語言(HiveQL)與Spark進行交互的API。每個數據庫表被當做一個RDD,Spark SQL查詢被轉換為Spark操作。對熟悉Hive和HiveQL的人,Spark可以拿來就用。
- Spark Streaming:允許對實時數據流進行處理和控制。很多實時數據庫(如Apache Store)可以處理實時數據。Spark Streaming允許程序能夠像普通RDD一樣處理實時數據。
- MLlib:一個常用機器學習算法庫,算法被實現為對RDD的Spark操作。這個庫包含可擴展的學習算法,比如分類、回歸等需要對大量數據集進行迭代的操作。之前可選的大數據機器學習庫Mahout,將會轉到Spark,並在未來實現。
- GraphX:控制圖、並行圖操作和計算的一組算法和工具的集合。GraphX擴展了RDD API,包含控制圖、創建子圖、訪問路徑上所有頂點的操作。
由於這些組件滿足了很多大數據需求,也滿足了很多數據科學任務的算法和計算上的需要,Spark快速流行起來。不僅如此,Spark也提供了使用Scala、Java和Python編寫的API;滿足了不同團體的需求,允許更多數據科學家簡便地采用Spark作為他們的大數據解決方案。
對Spark編程
編寫Spark應用與之前實現在Hadoop上的其他數據流語言類似。代碼寫入一個惰性求值的驅動程序(driver program)中,通過一個動作(action),驅動代碼被分發到集群上,由各個RDD分區上的worker來執行。然后結果會被發送回驅動程序進行聚合或編譯。本質上,驅動程序創建一個或多個RDD,調用操作來轉換RDD,然后調用動作處理被轉換后的RDD。
這些步驟大體如下:
- 定義一個或多個RDD,可以通過獲取存儲在磁盤上的數據(HDFS,Cassandra,HBase,Local Disk),並行化內存中的某些集合,轉換(transform)一個已存在的RDD,或者,緩存或保存。
- 通過傳遞一個閉包(函數)給RDD上的每個元素來調用RDD上的操作。Spark提供了除了Map和Reduce的80多種高級操作。
- 使用結果RDD的動作(action)(如count、collect、save等)。動作將會啟動集群上的計算。
當Spark在一個worker上運行閉包時,閉包中用到的所有變量都會被拷貝到節點上,但是由閉包的局部作用域來維護。Spark提供了兩種類型的共享變量,這些變量可以按照限定的方式被所有worker訪問。廣播變量會被分發給所有worker,但是是只讀的。累加器這種變量,worker可以使用關聯操作來“加”,通常用作計數器。
Spark應用本質上通過轉換和動作來控制RDD。后續文章將會深入討論,但是理解了這個就足以執行下面的例子了。
Spark的執行
簡略描述下Spark的執行。本質上,Spark應用作為獨立的進程運行,由驅動程序中的SparkContext協調。這個context將會連接到一些集群管理者(如YARN),這些管理者分配系統資源。集群上的每個worker由執行者(executor)管理,執行者反過來由SparkContext管理。執行者管理計算、存儲,還有每台機器上的緩存。
重點要記住的是應用代碼由驅動程序發送給執行者,執行者指定context和要運行的任務。執行者與驅動程序通信進行數據分享或者交互。驅動程序是Spark作業的主要參與者,因此需要與集群處於相同的網絡。這與Hadoop代碼不同,Hadoop中你可以在任意位置提交作業給JobTracker,JobTracker處理集群上的執行。
與Spark交互
使用Spark最簡單的方式就是使用交互式命令行提示符。打開PySpark終端,在命令行中打出pyspark。
~$ pyspark
[… snip …]
>>>
PySpark將會自動使用本地Spark配置創建一個SparkContext。你可以通過sc變量來訪問它。我們來創建第一個RDD。
>>> text = sc.textFile("shakespeare.txt") >>> print text shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
textFile方法將莎士比亞全部作品加載到一個RDD命名文本。如果查看了RDD,你就可以看出它是個MappedRDD,文件路徑是相對於當前工作目錄的一個相對路徑(記得傳遞磁盤上正確的shakespear.txt文件路徑)。我們轉換下這個RDD,來進行分布式計算的“hello world”:“字數統計”。
>>> from operator import add >>> def tokenize(text): ... return text.split() ... >>> words = text.flatMap(tokenize) >>> print words PythonRDD[2] at RDD at PythonRDD.scala:43
我們首先導入了add操作符,它是個命名函數,可以作為加法的閉包來使用。我們稍后再使用這個函數。首先我們要做的是把文本拆分為單詞。我們創建了一個tokenize函數,參數是文本片段,返回根據空格拆分的單詞列表。然后我們通過給flatMap操作符傳遞tokenize閉包對textRDD進行變換創建了一個wordsRDD。你會發現,words是個PythonRDD,但是執行本應該立即進行。顯然,我們還沒有把整個莎士比亞數據集拆分為單詞列表。
如果你曾使用MapReduce做過Hadoop版的“字數統計”,你應該知道下一步是將每個單詞映射到一個鍵值對,其中鍵是單詞,值是1,然后使用reducer計算每個鍵的1總數。
首先,我們map一下。
>>> wc = words.map(lambda x: (x,1)) >>> print wc.toDebugString() (2) PythonRDD[3] at RDD at PythonRDD.scala:43 | shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 | shakespeare.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2
我使用了一個匿名函數(用了Python中的lambda關鍵字)而不是命名函數。這行代碼將會把lambda映射到每個單詞。因此,每個x都是一個單詞,每個單詞都會被匿名閉包轉換為元組(word, 1)。為了查看轉換關系,我們使用toDebugString方法來查看PipelinedRDD是怎么被轉換的。可以使用reduceByKey動作進行字數統計,然后把統計結果寫到磁盤。
>>> counts = wc.reduceByKey(add) >>> counts.saveAsTextFile("wc")
一旦我們最終調用了saveAsTextFile動作,這個分布式作業就開始執行了,在作業“跨集群地”(或者你本機的很多進程)運行時,你應該可以看到很多INFO語句。如果退出解釋器,你可以看到當前工作目錄下有個“wc”目錄。
$ ls wc/
_SUCCESS part-00000 part-00001
每個part文件都代表你本機上的進程計算得到的被保持到磁盤上的最終RDD。如果對一個part文件進行head命令,你應該能看到字數統計元組。
$ head wc/part-00000 (u'fawn', 14) (u'Fame.', 1) (u'Fame,', 2) (u'kinghenryviii@7731', 1) (u'othello@36737', 1) (u'loveslabourslost@51678', 1) (u'1kinghenryiv@54228', 1) (u'troilusandcressida@83747', 1) (u'fleeces', 1) (u'midsummersnightsdream@71681', 1)
注意這些鍵沒有像Hadoop一樣被排序(因為Hadoop中Map和Reduce任務中有個必要的打亂和排序階段)。但是,能保證每個單詞在所有文件中只出現一次,因為你使用了reduceByKey操作符。你還可以使用sort操作符確保在寫入到磁盤之前所有的鍵都被排過序。
編寫一個Spark應用
編寫Spark應用與通過交互式控制台使用Spark類似。API是相同的。首先,你需要訪問<SparkContext,它已經由<pyspark自動加載好了。
使用Spark編寫Spark應用的一個基本模板如下:
## Spark Application - execute with spark-submit ## Imports from pyspark import SparkConf, SparkContext ## Module Constants APP_NAME = "My Spark Application" ## Closure Functions ## Main functionality def main(sc): pass if __name__ == "__main__": # Configure Spark conf = SparkConf().setAppName(APP_NAME) conf = conf.setMaster("local[*]") sc = SparkContext(conf=conf) # Execute Main functionality main(sc)
這個模板列出了一個Spark應用所需的東西:導入Python庫,模塊常量,用於調試和Spark UI的可識別的應用名稱,還有作為驅動程序運行的一些主要分析方法學。在ifmain中,我們創建了SparkContext,使用了配置好的context執行main。我們可以簡單地導入驅動代碼到pyspark而不用執行。注意這里Spark配置通過setMaster方法被硬編碼到SparkConf,一般你應該允許這個值通過命令行來設置,所以你能看到這行做了占位符注釋。
使用<sc.stop()或<sys.exit(0)來關閉或退出程序。
## Spark Application - execute with spark-submit ## Imports import csv import matplotlib.pyplot as plt from StringIO import StringIO from datetime import datetime from collections import namedtuple from operator import add, itemgetter from pyspark import SparkConf, SparkContext ## Module Constants APP_NAME = "Flight Delay Analysis" DATE_FMT = "%Y-%m-%d" TIME_FMT = "%H%M" fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep', 'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance') Flight = namedtuple('Flight', fields) ## Closure Functions def parse(row): """ Parses a row and returns a named tuple. """ row[0] = datetime.strptime(row[0], DATE_FMT).date() row[5] = datetime.strptime(row[5], TIME_FMT).time() row[6] = float(row[6]) row[7] = datetime.strptime(row[7], TIME_FMT).time() row[8] = float(row[8]) row[9] = float(row[9]) row[10] = float(row[10]) return Flight(*row[:11]) def split(line): """ Operator function for splitting a line with csv module """ reader = csv.reader(StringIO(line)) return reader.next() def plot(delays): """ Show a bar chart of the total delay per airline """ airlines = [d[0] for d in delays] minutes = [d[1] for d in delays] index = list(xrange(len(airlines))) fig, axe = plt.subplots() bars = axe.barh(index, minutes) # Add the total minutes to the right for idx, air, min in zip(index, airlines, minutes): if min > 0: bars[idx].set_color('#d9230f') axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center') else: bars[idx].set_color('#469408') axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center') # Set the ticks ticks = plt.yticks([idx+ 0.5 for idx in index], airlines) xt = plt.xticks()[0] plt.xticks(xt, [' '] * len(xt)) # minimize chart junk plt.grid(axis = 'x', color ='white', linestyle='-') plt.title('Total Minutes Delayed per Airline') plt.show() ## Main functionality def main(sc): # Load the airlines lookup dictionary airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect()) # Broadcast the lookup dictionary to the cluster airline_lookup = sc.broadcast(airlines) # Read the CSV Data into an RDD flights = sc.textFile("ontime/flights.csv").map(split).map(parse) # Map the total delay to the airline (joined using the broadcast value) delays = flights.map(lambda f: (airline_lookup.value[f.airline], add(f.dep_delay, f.arv_delay))) # Reduce the total delay for the month to the airline delays = delays.reduceByKey(add).collect() delays = sorted(delays, key=itemgetter(1)) # Provide output from the driver for d in delays: print "%0.0f minutes delayed\t%s" % (d[1], d[0]) # Show a bar chart of the delays plot(delays) if __name__ == "__main__": # Configure Spark conf = SparkConf().setMaster("local[*]") conf = conf.setAppName(APP_NAME) sc = SparkContext(conf=conf) # Execute Main functionality main(sc)
使用<spark-submit命令來運行這段代碼(假設你已有ontime目錄,目錄中有兩個CSV文件):
~$ spark-submit app.py
這個Spark作業使用本機作為master,並搜索app.py同目錄下的ontime目錄下的2個CSV文件。最終結果顯示,4月的總延誤時間(單位分鍾),既有早點的(如果你從美國大陸飛往夏威夷或者阿拉斯加),但對大部分大型航空公司都是延誤的。注意,我們在app.py中使用matplotlib直接將結果可視化出來了:
這段代碼做了什么呢?我們特別注意下與Spark最直接相關的main函數。首先,我們加載CSV文件到RDD,然后把split函數映射給它。split函數使用csv模塊解析文本的每一行,並返回代表每行的元組。最后,我們將collect動作傳給RDD,這個動作把數據以Python列表的形式從RDD傳回驅動程序。本例中,airlines.csv是個小型的跳轉表(jump table),可以將航空公司代碼與全名對應起來。我們將轉移表存儲為Python字典,然后使用sc.broadcast廣播給集群上的每個節點。
接着,main函數加載了數據量更大的flights.csv([譯者注]作者筆誤寫成fights.csv,此處更正)。拆分CSV行完成之后,我們將parse函數映射給CSV行,此函數會把日期和時間轉成Python的日期和時間,並對浮點數進行合適的類型轉換。每行作為一個NamedTuple保存,名為Flight,以便高效簡便地使用。
有了Flight對象的RDD,我們映射一個匿名函數,這個函數將RDD轉換為一些列的鍵值對,其中鍵是航空公司的名字,值是到達和出發的延誤時間總和。使用reduceByKey動作和add操作符可以得到每個航空公司的延誤時間總和,然后RDD被傳遞給驅動程序(數據中航空公司的數目相對較少)。最終延誤時間按照升序排列,輸出打印到了控制台,並且使用matplotlib進行了可視化。
這個例子稍長,但是希望能演示出集群和驅動程序之間的相互作用(發送數據進行分析,結果取回給驅動程序),以及Python代碼在Spark應用中的角色。
結論
盡管算不上一個完整的Spark入門,我們希望你能更好地了解Spark是什么,如何使用進行快速、內存分布式計算。至少,你應該能將Spark運行起來,並開始在本機或Amazon EC2上探索數據。你應該可以配置好iPython notebook來運行Spark。
Spark不能解決分布式存儲問題(通常Spark從HDFS中獲取數據),但是它為分布式計算提供了豐富的函數式編程API。這個框架建立在伸縮分布式數據集(RDD)之上。RDD是種編程抽象,代表被分區的對象集合,允許進行分布式操作。RDD有容錯能力(可伸縮的部分),更重要的時,可以存儲到節點上的worker內存里進行立即重用。內存存儲提供了快速和簡單表示的迭代算法,以及實時交互分析。
由於Spark庫提供了Python、Scale、Java編寫的API,以及內建的機器學習、流數據、圖算法、類SQL查詢等模塊;Spark迅速成為當今最重要的分布式計算框架之一。與YARN結合,Spark提供了增量,而不是替代已存在的Hadoop集群,它將成為未來大數據重要的一部分,為數據科學探索鋪設了一條康庄大道。
有用的鏈接
希望你喜歡這篇博文!寫作並不是憑空而來的,以下是一些曾幫助我寫作的有用鏈接;查看這些鏈接,可能對進一步探索Spark有幫助。注意,有些圖書鏈接是推廣鏈接,意味着如果你點擊並購買了這些圖書,你將會支持District Data Labs!
這篇更多是篇入門文章,而不是District Data Labs的典型文章,有些與此入門相關的數據和代碼你可以在這里找到:
Spark論文
Spark與Hadoop一樣,有一些基礎論文,我認為那些需要對大數據集進行分布式計算的嚴謹數據科學家一定要讀。首先是HotOS(“操作系統熱門話題”的簡寫)的一篇研討會論文,簡單易懂地描述了Spark。第二個是偏理論的論文,具體描述了RDD。
- M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica, “Spark: cluster computing with working sets,” in Proceedings of the 2nd USENIX conference on Hot topics in cloud computing, 2010, pp. 10–10.
- M. Zaharia, M. Chowdhury, T. Das, A. Dave, J. Ma, M. McCauley, M. J. Franklin, S. Shenker, and I. Stoica, “Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing,” in Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation, 2012, pp. 2–2.