【原】Learning Spark (Python版) 學習筆記(三)----工作原理、調優與Spark SQL


  周末的任務是更新Learning Spark系列第三篇,以為自己寫不完了,但為了改正拖延症,還是得完成給自己定的任務啊 = =。這三章主要講Spark的運行過程(本地+集群),性能調優以及Spark SQL相關的知識,如果對Spark不熟的同學可以先看看之前總結的兩篇文章:

  【原】Learning Spark (Python版) 學習筆記(一)----RDD 基本概念與命令

  【原】Learning Spark (Python版) 學習筆記(二)----鍵值對、數據讀取與保存、共享特性

 

 

########################################我是正文分割線######################################

 

  第七章主要講了Spark的運行架構以及在集群上的配置,這部分文字比較多,可能會比較枯燥,主要是講整個過程是怎么運行的。首先我們來了解一下Spark在分布式環境中的架構,如圖1 所示

圖1 Spark分布式結構圖

 

  如上圖所示,在Spark集群中有一個節點負責中央協調,調度各個分布式工作節點。這個中央協調點叫“驅動器節點(Driver)”,與之對應的工作節點叫“執行器節點(executor)”。驅動器節點和所有的執行器節點被稱為一個Spark應用(Application)。Spark應用通過一個“集群管理器(Cluster Manager)”的外部服務在集群中的機器上啟動,其中它自帶的集群管理器叫“獨立集群管理器”。

  驅動器節點:

  作用

  • 執行程序中的main()方法的進程,一旦終止,Spark應用也終止了。

  職責

  • 把用戶程序轉化為任務
    • 用戶輸入數據,創建了一系列RDD,再使用Transformation操作生成新的RDD,最后啟動Action操作存儲RDD中的數據,由此構成了一個有向無環圖(DAG)。當Drive啟動時,Spark會執行這些命令,並轉為一系列stage(步驟)來操作。在這些步驟中,包含了多個task(任務),這些task被打包送到集群中,就可以進行分布式的運算了,是不是像流水線上的工人呢~
  • 為執行器節點調度任務:
    • Driver啟動后,必須在各執行器進程間協調各個任務。執行器進程啟動后會在Driver上注冊自己的節點,這樣Driver就有所有執行器節點的完整記錄了。每個執行器節點代表一個能夠處理任務和存儲RDD數據的進程。Spark會根據當前任務的執行器節點集合,嘗試把所有的任務基於數據所在的位置分配給合適的執行器進程。當我們的任務執行時,執行器進程會把緩存數據存儲起來,而驅動器進程同樣也會跟蹤這些緩存數據的任務,並利用這些位置信息來調度以后的任務,以盡量減少數據的網絡傳輸。

  

  執行器節點:

  作用:

  • 負責在Spark作業中運行任務,各個任務間相互獨立。Spark啟動應用時,執行器節點就被同時啟動,並一直持續到Spark應用結束。

  職責:

  • 負責運行組成Spark應用的任務,並將結果返回給驅動器程序。
  • 通過自身的塊管理器(Block Manager)為用戶程序中要求緩存的RDD提供內存式存儲。RDD是直接緩存在執行器進程里的,所以可以在運行時充分利用緩存數據提高運算速度。

 

  集群管理器:

  在圖一中我們看到,Spark依賴於集群管理器來啟動執行器節點,而在某些特殊情況下,也會依賴集群管理器來啟動驅動器節點。Spark有自帶的獨立集群管理器,也可以運行在其他外部集群管理器上,如YARN和Mesos等。下面講一下兩種比較常見的外部集群管理器:

  獨立集群管理器:

1.啟動獨立集群管理器

2.提交應用:spark-submit --master spark://masternode:7077 yourapp

  支持兩種部署模式:客戶端模式和集群模式

3.配置資源用量:在多個應用間共享Spark集群時,通過以下兩個設置來對執行器進程分配資源:

  3.1 執行器進程內存:可以通過spark-submit中的 --executor-memory 參數來配置。每個應用在每個工作節點上最多擁有一個執行器進程。因此這個這個能夠控制         執行器節點占用工作節點多少內存。默認值是1G。

  3.2 占用核心總數的最大值:可以通過spark-submit中的 --total -executorcores 參數來設置。

  

  Hadoop YARN:

1.提交應用:設置指向你的Hadoop配置目錄的環境變量,然后使用spark-submit 向一個特殊的主節點URL提交作業即可。

2.配置資源用量:

  • --num -executors :設置執行器節點,默認值為2
  • --executor -memory: 設置每個執行器的內存用量
  • --executor -cores: 設置每個執行器進程從YARN中占用的核心數目
  • --queue:設置隊列名稱,YARN可以將應用調度到多個隊列中。

 

   Apache Mesos:

1.提交應用:spark-submit --master mesos://masternode:5050 your app

2.Mesos調度模式:兩種:

  • 細粒度模式:默認模式。一台運行了多個執行器進程的機器可以動態共享CPU資源
  • 粗粒度模式:Spark為每個執行器分配固定數量的CPU數目,並且在應用結束前不會釋放該資源,即使執行器進程當前沒有運行任務(多浪費啊  = =)。可以通過spark-submit 傳遞 --conf spark.mesos.coarse=true 來打開粗粒度模式

3.部署模式:僅支持以客戶端的部署模式運行應用,即驅動器程序必須運行提交應用的那台機器上。

4.配置資源用量:

  • --executor -memory:設置每個執行器進程的內存
  • --total -executor -cores :設置應用占用的核心數(所有執行器節點占用的總數)的最大值。如果不設置該值,Mesos可能會使用急群眾所有可用的核心。

 

   選擇合適的集群管理器:

1.一般情況下,可以直接選擇獨立集群模式,功能全,而且簡單。

2.如果要在使用Spark的同時使用其他應用,可以選擇YARN或Mesos。而且大多數版本的Hadoop中已經預裝好YARN了,非常方便。

3.對於多用戶同事運行交互式shell時,可以選擇Mesos(選擇細粒度模式),這種模式可以將Spark-shell這樣的交互式應用中的不同命令分配到不同的CPU上。

4.任何時候,最好把Spark運行在運行HDFS的節點上,可以快速訪問存儲。

 

  提交應用:

  使用spark-submit腳本提交應用,可以根據不同的情況設置成在本地運行和在集群運行等:

  • 本地模式:bin/spark-submit (--local) my_script.py

  (lcoal可以省略)

  • 集群模式:bin/spark-submit --master spark://host:7077 --executor-memory 10g my_script.py

  (--master標記要連接的集群的URL)

 

  

  總結一下Spark在集群上的運行過程:

 

 

 

#########################################我是看累了休息會兒的分割線##############################

 

 

  前面已經講完了Spark的運行過程,包括本地和集群上的。現在我們來講講Spark的調優與調試。

  我們知道,Spark執行一個應用時,由作業、任務和步驟組成。先回顧一下:

任務:Spark的最小工作單位

步驟:由多個任務組成

作業:由一個或多個作業組成

  

  在第一篇中我們也講過,當我們創建轉化(Transformation)RDD時,是執行"Lazy"(惰性)計算的,只有當出現Action操作時才會觸發真正的計算。而Action操作是如何調用Transformation計算的呢?實際上,Spark調度器會創建出用於計算Action操作的RDD物理執行計划,當它從最終被調用Action操作的RDD時,向上回溯所有必需計算的RDD。調度器會訪問RDD的父節點、父節點的父節點,以此類推,遞歸向上生成計算所有必要的祖先RDD的物理計划。

  然而,當調度器圖與執行步驟的對應關系並不一定是一對一的。當RDD不需要混洗數據就可以從父節點計算出來,RDD不需要混洗數據就可以從父節點計算出來,或把多個RDD合並到一個步驟中時,調度器就會自動進行進行"流水線執行"(pipeline)。例如下圖中,盡管有很多級父RDD,但從縮進來看,只有兩個步驟,說明物理執行只需要兩個步驟。因為這個執行序列中有幾個連續的篩選和映射操作,所以才會出現流水線執行。

 

  

 

  當步驟圖確定下來后,任務就會被創建出來並發給內部的調度器,這些步驟會以特定的順序執行。一個物理步驟會啟動很多任務,每個任務都是在不同的數據分區上做同樣的事情,任務內部的流程是一樣的,如下所示:

1.從數據存儲(輸入RDD)或已有RDD(已緩存的RDD)或數據混洗的輸出中獲取輸入數據

2.執行必要的操作來計算RDD。

3.把輸出寫到一個數據混洗文件中,寫入外部存儲,或是發揮驅動器程序。

  

  總結一下Spark執行的流程

  • 用戶定義RDD的有向無環圖(DAG):RDD上的操作會創建出新的RDD,並引用它們的父節點,這樣就創建出了一個圖。
  • Action操作把有向無環圖強制轉譯為執行計划:Spark調度器提交一個作業來計算所必要的RD,這個作業包含一個或多個步驟,每個步驟就是一些並行執行的計算任務。一個步驟對應有向無環圖中的一個或多個RDD(其中對應多個RDD是在"流水線執行"中發生的
  • 在集群中調度並執行任務:步驟是按順序處理的,任務則獨立啟動來計算RDD的一部分。當作業的最后一個步驟結束時,一個Action操作也執行完了。

 

  Spark調優

  到這里我們已經基本了解Spark的內部工作原理了,那么在哪些地方可以進行調優呢?有以下四個方面:

  並行度

  • 影響性能的兩個方面
    • a.並行度過低時,會出現資源限制的情況。此時可以提高並行度來充分利用更多的計算core。
    • b.並行度過高時,每個分區產生的間接開銷累計起來會更大。評價並行度是否過高可以看你的任務是不是在瞬間(毫秒級)完成的,或者任務是不是沒有讀寫任何數據。
  • 調優方法
    • 在數據混洗操作時,對混洗后的RDD設定參數制定並行度
    • 對於任何已有的RDD進行重新分區來獲取更多/更少的分區數。重新分區:repartition();減少分區:coalesce(),比repartition()更高效。

  

  序列化格式

  當Spark需要通過網絡傳輸數據,或者將數據溢出寫到磁盤上時(默認存儲方式是內存存儲),Spark需要數據序列化為二進制格式。默認情況下,使用Java內建的序列化庫。當然,也支持使用第三方序列化庫Kryo,比Java序列化時間更短,並且有更高壓縮比的二進制表示。但有一點需要注意:Kryo不能序列化全部類型的對象。

  

  內存管理

  • RDD存儲(60%)
    • 調用persisit()或cahe()方法時,RDD的分區會被存儲到緩存區中。Spark會根據spark.storage.memoryFraction限制用來緩存的內存占整個JVM堆空間的比例大小。超出限制的話,舊的分區會被移出內存。
  • 數據混洗與聚合的緩存區(20%)
    • 當數據進行數據混洗時,Spark會創造一些中間緩存區來存儲數據混洗的輸出數據。根據spark.shuffle.memoryFraction限定這種緩存區占總內存的比例。
  • 用戶的代碼(20%)
    • spark可以執行任意代碼,所以用戶的代碼可以申請大量內存,它可以訪問JVM堆空間中除了分配給RDD存儲和數據混洗存儲以外的全部空間。20%是默認情況下的分配比例。不過用戶可以自行調節這個比例來提高性能。

  

  當然,除了調整內存比例,也可以改變內存的存儲順序。我們知道,Spark默認的cache()操作是以Memory_ONLY的存儲等級持久化數據的,也就是說內存優先。如果RDD分區時的空間不夠,舊的分區會直接刪除。(妹的刪數據也不帶打聲招呼的 = =!)當用到這些分區時,又會重新進行計算。所以,如果我們用Memory_AND_DISK的存儲等級調用persist()方法效果會更好。因為當內存滿的時候,放不下的舊分區會被寫入磁盤,再用的時候就從磁盤里讀取回來,這樣比重新計算各分區的消耗要小得多,性能也更穩定(不會動不動報Memory Error了,哈哈)。特別是當RDD從數據庫中讀取數據的話,最好選擇內存+磁盤的存儲等級吧。

 

  硬件供給

影響集群規模的主要這幾個方面:分配給每個執行器節點的內存大小、每個執行器節點占用的核心數、執行器節點總數、以及用來存儲臨時數據的本地磁盤數量(在數據混洗使用Memory_AND_DISK的存儲等級時,更大的磁盤可以提升Spark的性能哦~)。

 

 

##################################我是文章快結束的分割線######################################

 

 

  最后我們來講講Spark SQL,上一篇中我們已經總結了如何使用Spark讀取和保存文件,涉及到了這部分內容,所以這一篇中只會簡要的說明一下:

  導入Spark SQL與簡單的查詢示例

 1 #初始化Spark SQL
 2 #導入Spark SQL
 3 from pyspark.sql import HiveContext,Row  4 #當不能引入Hive依賴時
 5 from pyspark.sql import SQLContext,Row  6 #創建SQL上下文環境
 7 hiveCtx = HiveContext(sc)  8 #基本查詢示例
 9 input = hiveCtx.jsonFile(inputFile) 10 #注冊輸入的SchemaRDD(SchemaRDD在Spark 1.3版本后已經改為DataFrame)
11 input.registerTempTable("tweets") 12 #依據retweetCount(轉發計數)選出推文
13 topTweets = hiveCtx.sql("SELECT text,retweetCount FROM tweets ORDER BY retweetCount LIMIT 10")

 

  緩存

以一種列式存儲格式在內存中存儲數據。這些緩存下來的表只會在Driver的生命周期內保留在內存中,退出的話就沒有了。可以通過cache() 和 uncache()命令來緩存表或者刪除已緩存的表。

 

 

  讀取和存儲數據

  Apache Hive

1 #使用Python從Hive中讀取
2 from pyspark.sql import HiveContext 3 
4 hiveCtx = HiveContext(sc) 5 rows = hiveCtx.sql("SELECT key,value FROM mytable") 6 keys = rows.map(lambda: row,row[0])

 

  Parquet

 1 #Python中的Parquet數據讀取
 2 #從一個有name和favoriteAnimal字段的Parquet文件中讀取數據
 3 rows = hiveCtx.parquetFile(parquetFile)  4 names = rows.map(lambda row: row.name)  5 print "Everyone"
 6 print names.collect()  7 
 8 #Python中的Parquet數據查詢
 9 #這里把Parquet文件注冊為Spark SQL的臨時表來查詢數據
10 #尋找熊貓愛好者
11 tbl = rows.registerTempTable("people") 12 pandaFriends = hiveCtx.sql("SELECT name FROM people WHERE favouriteAnimal = \"panda\"") 13 print "Panda friends"
14 print pandaFriends.map(lambda row:row.name).collect() 15 
16 #使用saveAsParquetFile()保存文件
17 pandaFriends.saveAsParqueFile("hdfs://")

 

  JSON

 1 #在python中讀取JSON數據 2 input= hiveCtx.jsonFile(inputFile) 

 

  使用BeeLine

  創建、列舉、查詢Hive表

 

  用戶自定義函數(UDF)

1 #Python版本的字符串長度UDF
2 hiveCtx.registerFuction("strLenPython",lambda x :len(x),IntegerType()) 3 LengthSchemaRDD = hiveCtx.sql("SELECT strLenPython('text') FROM tweets LIMIT 10")

 

  Spark SQL性能

  Spark SQL在緩存數據時,使用的是內存式的列式存儲,即Parquet格式,不僅節約了緩存時間,而且盡可能的減少了后續查詢中針對某幾個字段時的數據讀取。

性能調優選項

選項 默認值 用途
spark.sql.codegen false 設為True時,Spark SQL會把每條查詢語句在運行時編譯為Java二進制代碼。這可以提高大型查詢的性能,但在小規模查詢時會變慢
spark.sql.inMemoryColumnarStorage.compressed false 自動對內存中的列式存儲進行壓縮
spark.sql.inMemoryColumnarStorage.batchSize 1000 列式緩存時的每個批處理的大小。把這個值調大可能會導致內存不夠的異常
spark.sql.parquet.compression.codec snappy
選擇不同的壓縮編碼器。可選項包括uncompressed/snappy/gzip/lzo

 

  到這里,第七章-第九章的內容就全部總結完了,看完之后會對Spark的運行過程,性能調優以及存儲格式等有一個更清晰的概念。下一篇是最后一篇,5.15更新,主要講Spark Streaming和Spark MLlib機器學習的內容。順便也可以和PySpark做機器學習做一下對比:D

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM