Spark性能優化之 Tungsten


轉自

 

https://blog.csdn.net/u011564172/article/details/71170176

https://www.aliyun.com/jiaocheng/436079.html

https://blog.csdn.net/sundujing/article/details/51424491

 

 

Tungsten項目概述

Tungsten號稱Spark有史以來最大的改動,其致力於提升Spark程序對內存和CPU的利用率,使性能達到硬件的極限,主要工作包含以下三個方面

  1. Memory Management and Binary Processing: leveraging application semantics to manage memory explicitly and eliminate the overhead of JVM object model and garbage collection。
  2. Cache-aware computation: algorithms and data structures to exploit memory hierarchy。
  3. Code generation: using code generation to exploit modern compilers and CPUs。

大致的內容如下

  1. Memory Management and Binary Processing: off-heap管理內存,降低對象的開銷和消除JVM GC帶來的延時。
  2. Cache-aware computation: 優化存儲,提升CPU L1/ L2/L3緩存命中率。
  3. Code generation: 優化Spark SQL的代碼生成部分,提升CPU利用率。

上述第一點和內存管理相關,是本篇文章關注的重點。

 

 

 

 

    • Spark的下一代引擎-Project Tungsten啟示錄
    • 發布時間:2018-02-28 來源:網絡 上傳者:用戶

      關鍵字: 啟示錄 下一代 Tungsten project 引擎 Spark

      發表文章
    • 摘要:由於公司被收購的原因,之前分享的博客違反了公司的制度,只好將這篇科普類的文章獨立出來放到自己的博客下面與大家交流。2015年我們一直在利用Spark做實時交互式分析系統方面的嘗試,我們在不斷受到`Spark`啟發的同時,也不得不忍受尚處於青春期的`Spark`性格中的叛逆。特別是在不斷優化系統性能過程中,發現我們實際上是在做與`ProjectTungsten`同樣的工作。不知道是該慶幸選對了方向,還是該憂傷重復發明了輪子。尤其是在對比了`ProjectTungsten`與我們


    • 由於公司被收購的原因,之前分享的博客違反了公司的制度,只好將這篇科普類的文章獨立出來放到自己的博客下面與大家交流。 
      2015年我們一直在利用Spark做實時交互式分析系統方面的嘗試,我們在不斷受到`Spark`啟發的同時,也不得不忍受尚處於青春期的`Spark`性格中的叛逆。特別是在不斷優化系統性能過程中,發現我們實際上是在做與`Project Tungsten`同樣的工作。不知道是該慶幸選對了方向,還是該憂傷重復發明了輪子。尤其是在對比了`Project Tungsten`與我們自己的實現,心中五味雜陳。不過也正是由於重復發明輪子的過程,也讓我們對`Project Tungsten`有了自己的理解,所以在這里聊一下`Project Tungsten`背后的黑科技。 
      ## 1. Project Tungsten的野心 
      如果你對ProjectTungsten還比較陌生,建議大家看一下這篇博客《Project Tungsten: Bringing Spark Closer to Bare Metal》。在這篇博客中,作者指出`Project Tungsten`是為了大幅提升`Spark`應用使用`CPU`和`Memory`的效率,讓`Spark`的性能接近硬件的極限。聽着就振奮人心,卻不禁引起大家對於`Spark`性能的疑惑,難道我們看到的比`Hadoop`快幾十到幾百倍的效率還不是性能的極限嗎?其實是遠遠沒到,實際上目前`Spark`享受的福利還僅僅是將數據放在了內存中,相對於很多其他的框架例如`Apache Drill`以及`Lucene`,`Spark`在調優方面做得工作只能說是剛剛開始。說了這么多,我們首先來分析一下`Project Tungsten`在搗鼓些什么: 
      內存管理與二進制處理(`Memory Management and Binary Processing`) 
      Cache-aware計算(`Cache-awareComputation`) 
      代碼生成(`Code Generation`) 
      看着這三個方面,你可能有很多的疑惑,這完全沒有一個主線啊,`DataBricks`真是不按常理出牌!可是如果你曾經試圖利用Java寫一個數據庫,或者其他數據密集型的應用時,你會發現這三個方向幾乎是你必然的選擇。 
      ## 1.1 內存管理與二進制處理 
      首先聊一下內存管理以及二進制處理,在相當多的場景中IO是我們程序永恆的瓶頸,我們總是試圖做批處理,基於列存儲,分區甚至是倒排索引,這一切的努力都是在解決磁盤的IO瓶頸。但是如果數據完全放入了內存之后,我們面臨的新問題是什么呢?CPU不夠用。其實我們在做實時交互式分析系統的時候就發現了這個問題,我們從來沒有如此希望增加CPU的核數,但是集群告訴我們CPU的核數是有限的。那么問題出現在哪兒?其實是我們沒有把CPU的資源用在刀刃上。以普通的`DDR 3 1666MHz`的為例,理論值能夠達到`10GB/s`的讀取能力,實際值大概在`5GB/s`附近,相對於普通機械硬盤`30MB/s`的連續讀取能力,我們可以想象一下,相對於從磁盤讀取,從內存直接讀取數據對CPU的計算能力訴求有多大。如果再考慮多通道的問題,這個數據量將會按倍數增加。面對這么多的數據,我們可憐的CPU要做哪些工作呢?第一個繁重的工作就是**序列化與反序列化**,以Java為例這個過程說白了就是完成一堆的對象和一堆二級制數據之間的相互轉化。那么為什么Spark需要序列化和反序列化呢,原因很簡單就是為了溝通,為了能夠將數據從一個實例搬到另外一個實例,這是不是讓你想到shuffle的過程。第二個繁重的工作是**創建對象**,你可能會反駁JVM new一個對象是多么得有效率,而實際上當數據像潮水般涌來,讓CPU把他們都包裝成一個個的對象寫回到內存中,這個創建的時間就不能忽略了。不僅如此,創建過多的對象帶來的是大量的內存消耗和GC時間,我們都知道GC也是要消耗CPU時間片的。有時候創建了對象還不是一切的終點,我們需要構建一個有效的數據結構,比如`HashMap`或者`HashSet`,構建這些數據結構的時候,大量的比較或者hash值計算都是CPU的天敵。`Spark RDD` 在進行join操作是就是需要傳建一個`HashMap`,所以大家可能會體會到join操作是多么得讓人着急。第三個工作可能是不停地壓縮數據和解壓縮數據,如果選擇了對序列化后的數據進行壓縮編碼處理,一旦遇到這種密集型的計算訴求,CPU就會成為絕對的性能瓶頸。 
      ## 1.2 Cache-aware 
      Cache-aware並不是一個新的概念,在上個世紀90年代的時候就有學者在這些方面做過很多的研究,有興趣的朋友可能看看`Cache-aware`(《An Overview of Cache Optimization Techniques and Cache-Aware Numerical Algorithms》)以及`Cache-oblivious`(《Cache-Oblivious Algorithms and Data Structures》)相關的算法方面的研究。單純從名字就可以看出,這個Cache-aware就是要讓大家牢記CPU是有一級和二級緩存的,CPU在讀取內存數據的時候不是一個字節一個字節地讀取的,而是按照cache line進行讀取,也就是我們印象中CPU依次讀取8個字節。更進一步我們是不是還朦朧的記得現代CPU的向量計算,也就是`SIMD Programming`,有興趣的同學可以看一下《Basics of SIMD Programming》。如果每次CPU都是在隨機的地址讀取數據,由於CPU的頻率比內存的頻率高很多,就會造成CPU長時間處於等待的狀態。那么我們的Spark是不是都在利用這些特性呢?事實上Spark是構建在JVM上的,JVM基本上都不理會以上介紹的這些CPU技能(當然JIT背后的黑魔法有可能會應用上這些特性,筆者確實沒有深入研究過),在一些特殊情況下,比如一個String類型的數據進行排序的時候,實際上首先傳入CPU的是對象的引用,在進行比較是需要重新找到對應的字節,這是一個非常耗費CPU資源的操作,產生的`cache miss`會造成程序慢上數倍。此外由於GC的存在,即使是一開始連續分配的內存,也有可能在隨后GC過程中被徹底打散,造成CPU的隨機訪問內存。如果你需要強行用上CPU這些奧義,那么你就需要打怪升級了。 
      ## 1.3 代碼生成 
      這是一個很有意思的領域,作為一個程序員如果了解了解釋器、執行計划以及代碼生成,你就會發現你下一步想要做的事就是`git clone`一下`MySQL`的源代碼了,高階的同學們就會考慮寫一個自己喜歡的語言了。實際上如果你閱讀了`spark-catalyst`以及`DataFrame`的實現之后,你會發現他們就是在搞一個內存版的基於列數據庫。回到優化這個主題上來,為什么要進行代碼生成,不知道各位有沒有寫過一個sql解析的程序,就是那個典型的`visitor`模式。你會發現在進行了AST語義解析之后你會生成一個簡單的邏輯執行計划,而這個執行計划其實就是一個嵌套了各類操作的方法。實際上這個方法是可以進行直接運行的,但是你會發現這個代碼慢的有點驚人。舉一個簡單的例子,如果用AST表達 $a+b/c+d*e$,你會得到一個三層嵌套的邏輯樹,在完成賦值操作(`ValueNode`)后,首先計算$b/c$(`BinaryNode`)、$d*e$(`BinaryNode`),然后計算$a+(b/c)$(`BinaryNode`),然后再計算$(a+b/c)+d*e$(`BinaryNode`),邏輯看着都費勁,這種pop和push棧的操作肯定也是快不起來。有沒有一種方法能夠生成一個更加高效的執行代碼呢?就是在優化階段生成一個以下的函數: 
      ```java 
      public int fun(int a,int b, int c ,intd,int e){ 
      return a + b / c + d * e; 

      ``` 
      這個代碼里面把那些煩人的BinaryNode嵌套替換成了普通的代碼,然后利用編譯器編譯一下上面的代碼,這時候你就會發現一切都變得美好了。道理就是這么簡單,但是實現過程中可能需要處理SQL協議,執行計划優化等問題,關於SQL解析大家可以看一下《SQL解析過程詳解》。生成動態代碼用什么編譯器會比較快一點,比如大家耳熟能詳的`LLVM(C++)`/`JavaCompiler(Java)`/`Janino(Java)`。不少框架為了統一接口,采用JSON的交互方式,比如`ElasticSearch`,但實際上原理都是一樣的。Spark使用了`Janino`作為代碼生成的默認`Compiler`,其實在編譯器上也沒有太多的選擇余地。 
      介紹完`Project Tungsten`的三個方向之后,你會發現一條清晰的主線,那就是構造一個更加有效地分析引擎,而且這個分析引擎的大部分優化靈感都來自於關系數據庫。與此同時我們體會到Spark的無奈,`Project Tungsten`其實是為了Spark選擇Scala而買單。 
      ### 2. Spark與JVM的緣起緣滅 
      在講`Project Tungsten`之前其實應該聊聊Spark的實現,眾所周知,Spark選擇了Scala作為實現語言。與其說選擇Scala是Spark的敗筆,還不如說是一種妥協。從語言性質上說,Scala是一個非常易於使用的語言,特別是擁有`.map().filter().reduce()`這樣優雅的接口,當你利用Spark寫一個mapreduce程序的時候,你會覺得這才是我想要的語言。但是如果你在閱讀`akka`的源代碼時,你可能就會因為晦澀的代碼而叫苦不迭。因此`Scala`是一個為API設計的語言,與此同時為了能夠復用Java的開源組件,`Scala`又選擇了JVM,於是Spark的調優就與JVM結下了不解之緣。其實選擇JVM並不是一個壞選擇,在多數情況下程序會運行地很好。但是當Spark遇到shuffle以及DataFrame時,這就是JVM不太擅長的領域了。FULL GC的長時間停頓將會嚴重影響一個SQL的執行時間,如果這個系統又是一個分布式的系統,你無法控制讓所有的實例都保持相同的GC頻率,這個時候根據木桶定律,這個SQL的執行時間取決於運行最慢的那個節點。實際上每一次執行過程中都需要面對GC的問題,根據我們的經驗,有時候在進行大規模的數據聚合操作(aggregation)時,GC的時間甚至要比執行的時間多2到3倍。這個時候我們會發現在利用Spark做一個秒級響應的分布式交互分析系統是多么地困難。當然為了解決最短的一塊木板的問題,我們可以采取speculation的方式緩解,然而這種方式沒有觸動一個核心的問題:我們能不能控制GC的回收策略?比如我們需要復用的數據是不需要GC的,臨時生成的對象是不需要放到Old Generation的,我們自己或者是Spark比JVM更加了解數據的特性。事實上JVM沒有提供方便的接口來實現這些,為了能夠稍微控制一下GC的命脈,唯一的救命稻草就是`sun.misc.Unsafe`。據說Oracle正在討論在Java 9 中刪除Unsafe類,不知道Oracle的這種執着會造成大量的框架開發者投向Golang的懷抱。實際上在Project Tungsten之前Spark已經利用Netty改造了網絡層傳輸問題,而Netty用了多少unsafe的代碼,各位同學可以自己研究一下。以上就是JVM與Spark的前世今生,不僅如此,RDD有很多的`transform`操作比如`map()`,`filter()`, `join()`實際上是在重新生成一個容器,可想而知當數據量非常大的時候,JVM要創建多少的對象,同時這些對象又有多少會進入到`Old Generation`。有的同學可能會想到復用對象,那么復用對象是不是最終的解決方法呢?只能說要看情況,比如你只是做一個SQL的查詢操作,復用是一個非常好的思路;如果你是在做一個分類器訓練的時候,也許transform后的數據會成為下面幾步迭代的輸入數據,生成一個新的Array並且`cache()`一下會讓一切變得更快。 
      ##3. Project Tungsten的葵花寶典 
      這一回我們要回到枯燥的代碼上來了,這一章還是盡量多講大道理,少擺代碼,畢竟代碼都在那兒,大家有時間可以自己看一遍。道理講明白了,解決方案也都放在哪里,剩下的事情就變得相對簡單了。我們可以來看一下在過去的一年中`Project Tungsten`針對這些問題修煉了哪些神功。 
      ###3.1 二進制處理 
      ok,我們首先翻開`Project Tungsten`的第一章,欲練此功,必先自宮。這里是不是要拋棄JVM呢?當然如果你有的是時間,並且能夠說服所有的Java程序員接受delete,我覺得放棄JVM是美好的。如果無法放棄,我建議大家首先看一下spark-unsafe,你會發現所有的黑科技都在這個工程中。這個工程還只是一個嬰兒,前后加起來也不到20個類,而且邏輯都非常清晰,封裝一個好用Unsafe工具類Platform,然后抽象了一個MemoryBlock的管理連續內存的類,剩下的就是管理連續內存以及基於連續內存封裝的一系列數據結構。前面已經提到了CPU加載數據是以cache line為單位的,那么什么樣結構的數據是CPU友好類型的呢?毫無疑問就是連續內存,不管是堆上連續內存還是直接連續內存。其實說到這里就非常有意思了,如果需要應用連續內存,我們就需要按照byte來管理我們的數據,仿佛一夜又回到了c的時代。雖然操作二進制數據不太方便,但是帶來的好處是非常明顯的:可以愉快地擁抱序列化和反序列化了,也可以用上CPU一次加載8個字節的技能了(cache-aware算法),甚至可以用上SIMD向量計算了。不僅如此,你會發現壓縮算法比例也會有提升,同時如果用上直接內存(Direct Memory),GC的時間也會跟着減少。剩下來的問題是怎么從連續內存中讀取數據?相信大家對於mysql的表的定義非常熟悉,就是為每一個數據的字段設定一個元數據,這樣是int類型就讀4個字節,是long類型就讀8個字節,是String類型的數據可以先記錄一個地址和偏移量等。這也是為什么你在加載DataFrame數據的時候需要提供一個json格式元數據定義。筆者測試過,利用unsafe讀直接內存的效率與直接操作讀byte[]數組的效率相差無幾,但是利用unsafe的setLong()來寫數據是直接寫byte[]速度的8倍。 
      `Project Tungsten`在內存管理上也是有值得借鑒的地方,比如`MemoryBlock`就是對於連續內存塊的封裝,記錄了字節長度以及引用的位置。一個數據集就可以由`List `來記錄內存`page`,通過`TaskMemoryManager`的`encodePageNumberAndOffset`方法來編碼內存地址,是不是有點類似Linux管理內存的方式呢?利用`MemoryBlock`甚至可以寫一個CPU友好的Map,例如`org.apache.spark.unsafe.map.BytesToBytesMap`。Java中的HashMap實際上是由一個數組和鏈表實現的,然而這種實現將內存數據徹底打散,CPU的運行效率自然無法跟上。`Databricks`想到利用一個連續內存記錄hash值以及內存地址,同時將key和value都記錄到`MemoryBlock`中,這樣構建的Map雖然不具有通用的功能,但是在有些操作比如大量數據`aggregation`操作時,效率是非常出色的,同時加上使用直接內存避開了GC,想想都有點小激動。 
      ###3.2 Cache-aware算法 
      我們可以在`spark-core`的`org.apache.spark.util.collection.unsafe.sort`包找到`ProjectTungsten`在`Cache-aware`方面做出的努力。例如`RecordPointerAndKeyPrefix`和`UnsafeInMemorySorter`這兩個類,`RecordPointerAndKeyPrefix`實際上存儲了一個long類型的引用和一個long類型的`key prefix`,這個類的對象實際上是復用的,一般是從一個連續內存中取出16個字節,前8個字節是`key value`在連續內存中的地址,而后8個自己是自定義的`key prefix`,然后賦值給這個對象。這樣就可以利用`key prefix`做一個初步的比較操作了,而不用再去隨機查詢pointer對應的實際key value再來進行比較。當然如果發生key prefix相同,就需要比較真正的value值了。 
      ```java 
      @Override 
      public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) { 
      final int prefixComparisonResult = prefixComparator.compare(r1.keyPrefix, r2.keyPrefix); 
      if (prefixComparisonResult == 0) { 
      final Object baseObject1 = memoryManager.getPage(r1.recordPointer); 
      final long baseOffset1 = memoryManager.getOffsetInPage(r1.recordPointer) + 4; // skip length 
      final Object baseObject2 = memoryManager.getPage(r2.recordPointer); 
      final long baseOffset2 = memoryManager.getOffsetInPage(r2.recordPointer) + 4; // skip length 
      return recordComparator.compare(baseObject1, baseOffset1, baseObject2, baseOffset2); 
      } else { 
      return prefixComparisonResult; 


      ``` 
      這是一個非常巧妙的設計,對比一下String對象的排序,隨機查詢內存的幾率大大減少,又由於所有的key都是存儲在連續內存中的,可以大大加速排序的過程。循着這個思路,或者一些經典的Cache-aware的算法,利用Unsafe對於連續內存的操作,可以極大地提升系統的性能,相信Spark將會在這方面做更多的努力。 
      ###3.3 Code Generation 
      其實如果拋開解釋器、邏輯執行計划、物理執行計划以及優化器來討論代碼生成是一個管中窺豹的行為,因為能夠用上代碼生成的地方一般都是在構造一個查詢或者分析引擎的時候。當然如果你是在寫一個類似於spring、hibernate或者JUnit的框架,你也會發現代碼生成也是非常有用的工具。其實代碼生成一般都用在代碼不好寫或者改寫復雜的情況下,例如在有一次面試的過程中,就遇到一位候選者利用ASM的動態代碼生成完成自動配置的工作,這是非常有趣的一個方向。說了這么多還是希望大家仔細閱讀一下`spark-catalyst`的所有源代碼。非常有意思的是大名鼎鼎的`Tensorflow`也是同樣的套路,利用`python`構建計算`graph`,然后利用`C++`進行自動求導以及代碼生成的工作,這樣做還有一個好處可以做到訓練過程的分布式。 
      動態代碼生成其實網上資料非常多,介紹`JavaCompiler`、`Janino`或者是`ASM`框架使用和`Benchmark`的文章比比皆是。這里就簡單的說說我們在實際應用過程中的體會,`JavaCompiler`實際上是需要通過一個源代碼的字節流來進行編譯代碼的,生成的`class`文件會存儲到本地磁盤上,然后如果需要使用這個類構建對象或者其中的靜態方法,就需要通過`ClassLoader`來加載字節碼,進而就可以利用這個動態生成的類創建對象了。`Janino`有一個自己的編譯器,同時也可以配置成JavaCompiler,提供的結構非常方便,但是相對於`JavaCompiler`,Janino自身的編譯器編譯出來的代碼運行效率會慢10倍。`ASM`則是通過直接寫字節碼來動態生成,編譯的速度非常快,但是動態代碼的效率與`Janino`的效率差不多,運行速度都比較慢,特別是當這個動態代碼需要運行上億次,差距非常明顯。如果你的代碼只是運行一遍,我覺得ASM極快的編譯速度會讓它成為最好的選擇。 
      ## 4. 總結 
      看了上面的介紹,相信各類看官對於`Project Tungsten`或多或少都有了一定的了解,`Project Tungsten`其實不是一個非常神秘的計划,其實它的存在是選擇JVM的數據引擎必然的優化之路。與其說`Project Tungsten`將鑄就下一次Spark數據引擎的基石,還不如說是長嘆一聲出身的無奈。`Project Tungsten`可以說是一個非常簡單,同時也還沒有成熟的實現方案,對於廣大的Java、Scala程序員來說是寶貴的學習材料和案例。希望這篇博客能夠幫助大家選擇適合自身項目的優化方向,欲知下事如何,且聽下回分解。
    • 以上是

Spark的下一代引擎-Project Tungsten啟示錄

      的內容,更多 

啟示錄 下一代 Tungsten project 引擎 Spark 

      的內容,請您使用右上方搜索功能獲取相關信息。

 

更多詳細內容請請看文章開頭的博客

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM