周末的任務是更新Learning Spark系列第三篇,以為自己寫不完了,但為了改正拖延症,還是得完成給自己定的任務啊 = =。這三章主要講Spark的運行過程(本地+集群),性能調優以及Spark SQL相關的知識,如果對Spark不熟的同學可以先看看之前總結的兩篇文章:
【原】Learning Spark (Python版) 學習筆記(一)----RDD 基本概念與命令
【原】Learning Spark (Python版) 學習筆記(二)----鍵值對、數據讀取與保存、共享特性
########################################我是正文分割線######################################
第七章主要講了Spark的運行架構以及在集群上的配置,這部分文字比較多,可能會比較枯燥,主要是講整個過程是怎么運行的。首先我們來了解一下Spark在分布式環境中的架構,如圖1 所示
圖1 Spark分布式結構圖
如上圖所示,在Spark集群中有一個節點負責中央協調,調度各個分布式工作節點。這個中央協調點叫“驅動器節點(Driver)”,與之對應的工作節點叫“執行器節點(executor)”。驅動器節點和所有的執行器節點被稱為一個Spark應用(Application)。Spark應用通過一個“集群管理器(Cluster Manager)”的外部服務在集群中的機器上啟動,其中它自帶的集群管理器叫“獨立集群管理器”。
驅動器節點:
作用
- 執行程序中的main()方法的進程,一旦終止,Spark應用也終止了。
職責
- 把用戶程序轉化為任務
- 用戶輸入數據,創建了一系列RDD,再使用Transformation操作生成新的RDD,最后啟動Action操作存儲RDD中的數據,由此構成了一個有向無環圖(DAG)。當Drive啟動時,Spark會執行這些命令,並轉為一系列stage(步驟)來操作。在這些步驟中,包含了多個task(任務),這些task被打包送到集群中,就可以進行分布式的運算了,是不是像流水線上的工人呢~
- 為執行器節點調度任務:
- Driver啟動后,必須在各執行器進程間協調各個任務。執行器進程啟動后會在Driver上注冊自己的節點,這樣Driver就有所有執行器節點的完整記錄了。每個執行器節點代表一個能夠處理任務和存儲RDD數據的進程。Spark會根據當前任務的執行器節點集合,嘗試把所有的任務基於數據所在的位置分配給合適的執行器進程。當我們的任務執行時,執行器進程會把緩存數據存儲起來,而驅動器進程同樣也會跟蹤這些緩存數據的任務,並利用這些位置信息來調度以后的任務,以盡量減少數據的網絡傳輸。
執行器節點:
作用:
- 負責在Spark作業中運行任務,各個任務間相互獨立。Spark啟動應用時,執行器節點就被同時啟動,並一直持續到Spark應用結束。
職責:
- 負責運行組成Spark應用的任務,並將結果返回給驅動器程序。
- 通過自身的塊管理器(Block Manager)為用戶程序中要求緩存的RDD提供內存式存儲。RDD是直接緩存在執行器進程里的,所以可以在運行時充分利用緩存數據提高運算速度。
集群管理器:
在圖一中我們看到,Spark依賴於集群管理器來啟動執行器節點,而在某些特殊情況下,也會依賴集群管理器來啟動驅動器節點。Spark有自帶的獨立集群管理器,也可以運行在其他外部集群管理器上,如YARN和Mesos等。下面講一下兩種比較常見的外部集群管理器:
獨立集群管理器:
1.啟動獨立集群管理器
2.提交應用:spark-submit --master spark://masternode:7077 yourapp
支持兩種部署模式:客戶端模式和集群模式
3.配置資源用量:在多個應用間共享Spark集群時,通過以下兩個設置來對執行器進程分配資源:
3.1 執行器進程內存:可以通過spark-submit中的 --executor-memory 參數來配置。每個應用在每個工作節點上最多擁有一個執行器進程。因此這個這個能夠控制 執行器節點占用工作節點多少內存。默認值是1G。
3.2 占用核心總數的最大值:可以通過spark-submit中的 --total -executorcores 參數來設置。
Hadoop YARN:
1.提交應用:設置指向你的Hadoop配置目錄的環境變量,然后使用spark-submit 向一個特殊的主節點URL提交作業即可。
2.配置資源用量:
- --num -executors :設置執行器節點,默認值為2
- --executor -memory: 設置每個執行器的內存用量
- --executor -cores: 設置每個執行器進程從YARN中占用的核心數目
- --queue:設置隊列名稱,YARN可以將應用調度到多個隊列中。
Apache Mesos:
1.提交應用:spark-submit --master mesos://masternode:5050 your app
2.Mesos調度模式:兩種:
- 細粒度模式:默認模式。一台運行了多個執行器進程的機器可以動態共享CPU資源
- 粗粒度模式:Spark為每個執行器分配固定數量的CPU數目,並且在應用結束前不會釋放該資源,即使執行器進程當前沒有運行任務(多浪費啊 = =)。可以通過spark-submit 傳遞 --conf spark.mesos.coarse=true 來打開粗粒度模式
3.部署模式:僅支持以客戶端的部署模式運行應用,即驅動器程序必須運行提交應用的那台機器上。
4.配置資源用量:
- --executor -memory:設置每個執行器進程的內存
- --total -executor -cores :設置應用占用的核心數(所有執行器節點占用的總數)的最大值。如果不設置該值,Mesos可能會使用急群眾所有可用的核心。
選擇合適的集群管理器:
1.一般情況下,可以直接選擇獨立集群模式,功能全,而且簡單。
2.如果要在使用Spark的同時使用其他應用,可以選擇YARN或Mesos。而且大多數版本的Hadoop中已經預裝好YARN了,非常方便。
3.對於多用戶同事運行交互式shell時,可以選擇Mesos(選擇細粒度模式),這種模式可以將Spark-shell這樣的交互式應用中的不同命令分配到不同的CPU上。
4.任何時候,最好把Spark運行在運行HDFS的節點上,可以快速訪問存儲。
提交應用:
使用spark-submit腳本提交應用,可以根據不同的情況設置成在本地運行和在集群運行等:
- 本地模式:bin/spark-submit (--local) my_script.py
(lcoal可以省略)
- 集群模式:bin/spark-submit --master spark://host:7077 --executor-memory 10g my_script.py
(--master標記要連接的集群的URL)
總結一下Spark在集群上的運行過程:
#########################################我是看累了休息會兒的分割線##############################
前面已經講完了Spark的運行過程,包括本地和集群上的。現在我們來講講Spark的調優與調試。
我們知道,Spark執行一個應用時,由作業、任務和步驟組成。先回顧一下:
任務:Spark的最小工作單位
步驟:由多個任務組成
作業:由一個或多個作業組成
在第一篇中我們也講過,當我們創建轉化(Transformation)RDD時,是執行"Lazy"(惰性)計算的,只有當出現Action操作時才會觸發真正的計算。而Action操作是如何調用Transformation計算的呢?實際上,Spark調度器會創建出用於計算Action操作的RDD物理執行計划,當它從最終被調用Action操作的RDD時,向上回溯所有必需計算的RDD。調度器會訪問RDD的父節點、父節點的父節點,以此類推,遞歸向上生成計算所有必要的祖先RDD的物理計划。
然而,當調度器圖與執行步驟的對應關系並不一定是一對一的。當RDD不需要混洗數據就可以從父節點計算出來,RDD不需要混洗數據就可以從父節點計算出來,或把多個RDD合並到一個步驟中時,調度器就會自動進行進行"流水線執行"(pipeline)。例如下圖中,盡管有很多級父RDD,但從縮進來看,只有兩個步驟,說明物理執行只需要兩個步驟。因為這個執行序列中有幾個連續的篩選和映射操作,所以才會出現流水線執行。
當步驟圖確定下來后,任務就會被創建出來並發給內部的調度器,這些步驟會以特定的順序執行。一個物理步驟會啟動很多任務,每個任務都是在不同的數據分區上做同樣的事情,任務內部的流程是一樣的,如下所示:
1.從數據存儲(輸入RDD)或已有RDD(已緩存的RDD)或數據混洗的輸出中獲取輸入數據
2.執行必要的操作來計算RDD。
3.把輸出寫到一個數據混洗文件中,寫入外部存儲,或是發揮驅動器程序。
總結一下,Spark執行的流程:
- 用戶定義RDD的有向無環圖(DAG):RDD上的操作會創建出新的RDD,並引用它們的父節點,這樣就創建出了一個圖。
- Action操作把有向無環圖強制轉譯為執行計划:Spark調度器提交一個作業來計算所必要的RD,這個作業包含一個或多個步驟,每個步驟就是一些並行執行的計算任務。一個步驟對應有向無環圖中的一個或多個RDD(其中對應多個RDD是在"流水線執行"中發生的)
- 在集群中調度並執行任務:步驟是按順序處理的,任務則獨立啟動來計算RDD的一部分。當作業的最后一個步驟結束時,一個Action操作也執行完了。
Spark調優
到這里我們已經基本了解Spark的內部工作原理了,那么在哪些地方可以進行調優呢?有以下四個方面:
並行度
- 影響性能的兩個方面
- a.並行度過低時,會出現資源限制的情況。此時可以提高並行度來充分利用更多的計算core。
- b.並行度過高時,每個分區產生的間接開銷累計起來會更大。評價並行度是否過高可以看你的任務是不是在瞬間(毫秒級)完成的,或者任務是不是沒有讀寫任何數據。
- 調優方法
- 在數據混洗操作時,對混洗后的RDD設定參數制定並行度
- 對於任何已有的RDD進行重新分區來獲取更多/更少的分區數。重新分區:repartition();減少分區:coalesce(),比repartition()更高效。
序列化格式
當Spark需要通過網絡傳輸數據,或者將數據溢出寫到磁盤上時(默認存儲方式是內存存儲),Spark需要數據序列化為二進制格式。默認情況下,使用Java內建的序列化庫。當然,也支持使用第三方序列化庫Kryo,比Java序列化時間更短,並且有更高壓縮比的二進制表示。但有一點需要注意:Kryo不能序列化全部類型的對象。
內存管理
- RDD存儲(60%)
- 調用persisit()或cahe()方法時,RDD的分區會被存儲到緩存區中。Spark會根據spark.storage.memoryFraction限制用來緩存的內存占整個JVM堆空間的比例大小。超出限制的話,舊的分區會被移出內存。
- 數據混洗與聚合的緩存區(20%)
- 當數據進行數據混洗時,Spark會創造一些中間緩存區來存儲數據混洗的輸出數據。根據spark.shuffle.memoryFraction限定這種緩存區占總內存的比例。
- 用戶的代碼(20%)
- spark可以執行任意代碼,所以用戶的代碼可以申請大量內存,它可以訪問JVM堆空間中除了分配給RDD存儲和數據混洗存儲以外的全部空間。20%是默認情況下的分配比例。不過用戶可以自行調節這個比例來提高性能。
當然,除了調整內存比例,也可以改變內存的存儲順序。我們知道,Spark默認的cache()操作是以Memory_ONLY的存儲等級持久化數據的,也就是說內存優先。如果RDD分區時的空間不夠,舊的分區會直接刪除。(妹的刪數據也不帶打聲招呼的 = =!)當用到這些分區時,又會重新進行計算。所以,如果我們用Memory_AND_DISK的存儲等級調用persist()方法效果會更好。因為當內存滿的時候,放不下的舊分區會被寫入磁盤,再用的時候就從磁盤里讀取回來,這樣比重新計算各分區的消耗要小得多,性能也更穩定(不會動不動報Memory Error了,哈哈)。特別是當RDD從數據庫中讀取數據的話,最好選擇內存+磁盤的存儲等級吧。
硬件供給
影響集群規模的主要這幾個方面:分配給每個執行器節點的內存大小、每個執行器節點占用的核心數、執行器節點總數、以及用來存儲臨時數據的本地磁盤數量(在數據混洗使用Memory_AND_DISK的存儲等級時,更大的磁盤可以提升Spark的性能哦~)。
##################################我是文章快結束的分割線######################################
最后我們來講講Spark SQL,上一篇中我們已經總結了如何使用Spark讀取和保存文件,涉及到了這部分內容,所以這一篇中只會簡要的說明一下:
導入Spark SQL與簡單的查詢示例
1 #初始化Spark SQL
2 #導入Spark SQL
3 from pyspark.sql import HiveContext,Row 4 #當不能引入Hive依賴時
5 from pyspark.sql import SQLContext,Row 6 #創建SQL上下文環境
7 hiveCtx = HiveContext(sc) 8 #基本查詢示例
9 input = hiveCtx.jsonFile(inputFile) 10 #注冊輸入的SchemaRDD(SchemaRDD在Spark 1.3版本后已經改為DataFrame)
11 input.registerTempTable("tweets") 12 #依據retweetCount(轉發計數)選出推文
13 topTweets = hiveCtx.sql("SELECT text,retweetCount FROM tweets ORDER BY retweetCount LIMIT 10")
緩存
以一種列式存儲格式在內存中存儲數據。這些緩存下來的表只會在Driver的生命周期內保留在內存中,退出的話就沒有了。可以通過cache() 和 uncache()命令來緩存表或者刪除已緩存的表。
讀取和存儲數據
Apache Hive
1 #使用Python從Hive中讀取
2 from pyspark.sql import HiveContext 3
4 hiveCtx = HiveContext(sc) 5 rows = hiveCtx.sql("SELECT key,value FROM mytable") 6 keys = rows.map(lambda: row,row[0])
Parquet
1 #Python中的Parquet數據讀取
2 #從一個有name和favoriteAnimal字段的Parquet文件中讀取數據
3 rows = hiveCtx.parquetFile(parquetFile) 4 names = rows.map(lambda row: row.name) 5 print "Everyone"
6 print names.collect() 7
8 #Python中的Parquet數據查詢
9 #這里把Parquet文件注冊為Spark SQL的臨時表來查詢數據
10 #尋找熊貓愛好者
11 tbl = rows.registerTempTable("people") 12 pandaFriends = hiveCtx.sql("SELECT name FROM people WHERE favouriteAnimal = \"panda\"") 13 print "Panda friends"
14 print pandaFriends.map(lambda row:row.name).collect() 15
16 #使用saveAsParquetFile()保存文件
17 pandaFriends.saveAsParqueFile("hdfs://")
JSON
1 #在python中讀取JSON數據 2 input= hiveCtx.jsonFile(inputFile)
使用BeeLine
創建、列舉、查詢Hive表
用戶自定義函數(UDF)
1 #Python版本的字符串長度UDF
2 hiveCtx.registerFuction("strLenPython",lambda x :len(x),IntegerType()) 3 LengthSchemaRDD = hiveCtx.sql("SELECT strLenPython('text') FROM tweets LIMIT 10")
Spark SQL性能
Spark SQL在緩存數據時,使用的是內存式的列式存儲,即Parquet格式,不僅節約了緩存時間,而且盡可能的減少了后續查詢中針對某幾個字段時的數據讀取。
性能調優選項
選項 | 默認值 | 用途 |
spark.sql.codegen | false | 設為True時,Spark SQL會把每條查詢語句在運行時編譯為Java二進制代碼。這可以提高大型查詢的性能,但在小規模查詢時會變慢 |
spark.sql.inMemoryColumnarStorage.compressed | false | 自動對內存中的列式存儲進行壓縮 |
spark.sql.inMemoryColumnarStorage.batchSize | 1000 | 列式緩存時的每個批處理的大小。把這個值調大可能會導致內存不夠的異常 |
spark.sql.parquet.compression.codec | snappy | 選擇不同的壓縮編碼器。可選項包括uncompressed/snappy/gzip/lzo |
到這里,第七章-第九章的內容就全部總結完了,看完之后會對Spark的運行過程,性能調優以及存儲格式等有一個更清晰的概念。下一篇是最后一篇,5.15更新,主要講Spark Streaming和Spark MLlib機器學習的內容。順便也可以和PySpark做機器學習做一下對比:D