陰陽大論之ForkJoin&MapReduce


陰陽大論之ForkJoin&MapReduce

目錄

ForkJoin


定義

ForkJoin是Java7提供的原生多線程並行處理框架,其基本思想是將大任務分割成小任務,最后將小任務聚合起來得到結果。

  • 工作竊取模式:當執行新的任務時他可以將其拆分成更小的任務執行,並將小任務加到線程隊列中,然后再從一個隨即線程中偷一個並把它加入自己的隊列中。

優缺點

  • 優點:

    • 方便的利用多核平台的計算能力實現並發任務的拆分,極大的簡化了編寫並發程序的瑣碎工作。
    • 對於該模式下的應用,不再需要處理並行事務【同步、同信、死鎖、data race…。
  • 缺點:

    • 需要注意,如果拆分對象過多,短時間內將內存撐滿。等待線程的CPU資源釋放了,但線程對象等待時不會被垃圾回收機制回收;

實現原理

ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責存放程序提交給ForkJoinPool的任務,而ForkJoinWorkerThread數組負責執行這些任務

ForkJoinTask

  • fork方法--將任務划分成子任務的fork操作

    • 決定了ForkJoinTask 的異步執行,憑借這個方法可以創建新的任務
    • 調用該方法時,程序會調用push( ) 【把當前任務放在ForkJoinPool.WorkQueue[ ] 中】異步執行這個任務,然后立即返回結果。
  • join方法--等待這些子任務結束的join操作

    • 負責計算完成后返回結果,因此允許一個任務等待另一個任務執行完成。
    • 主要作用:阻塞當前線程並等待獲取結果。

MapReduce


定義

  • Mapper負責“分”,即把復雜的任務分解為若干個“簡單的任務”來處理。“簡單的任務”包含三層含義:

    • 一是數據或計算的規模相對原任務要大大縮小;
    • 二是就近計算原則,即任務會分配到存放着所需數據的節點上進行計算;
    • 三是這些小任務可以並行計算,彼此間幾乎沒有依賴關系。
  • Reducer負責對map階段的結果進行匯總。

運行機制

  1. 大規模數據集包括分布式存儲和分布式計算兩個核心環節,MapReduce的輸入輸出都需要借助分布式文件系統進行存儲,這些文件被分布存儲到不同的節點上
  2. 一個大的MapReduce作業,首先會被拆分成許多個Map認為在多台機器上並行執行,Map任務通常運行在slave節點上,這樣,計算的數據就可以放在一起運行,不需要額外的傳輸開銷,當Map任務結束后,這些結果會被分發到多個Reduce中並行執行,具有相同key的會被發送到一個Reduce,Reduce會把匯總后的結果存儲到HDFS中
  3. Map任務之間不會進行通信,Reduce任務之間也不會有任何信息交換,用戶不能顯式的從一台機器向另一台機器發送信息,所有的數據交換都是通過MapReduce框架去實現
  4. Map的輸入文件,Reduce的輸出都是保存在HDFS上,Map的輸出則保存在本地存儲中

shuffle流程概括

在Map端的shuffle過程是對Map的結果進行分區、排序、分割,然后將屬於同一划分(分區)的輸出合並在一起並寫在磁盤上,最終得到一個分區有序的文件,分區有序的含義是map輸出的鍵值對按分區進行排列,具有相同partition值的鍵值對存儲在一起,每個分區里面的鍵值對又按key值進行升序排列(默認)

Map shuffle

  1. 分區partition。在將map()函數處理后得到的(key,value)對寫入到緩沖區之前,需要先進行分區操作,這樣就能把map任務處理的結果發送給指定的reducer去執行,從而達到負載均衡,避免數據傾斜。

  2. 寫入環形內存緩沖區。 因為頻繁的磁盤I/O操作會嚴重的降低效率,因此“中間結果”不會立馬寫入磁盤,而是優先存儲到map節點的“環形內存緩沖區”,並做一些預排序以提高效率,當寫入的數據量達到預先設置的闕值后便會執行一次I/O操作將數據寫入到磁盤。每個map任務都會分配一個環形內存緩沖區,用於存儲map任務輸出的鍵值對(默認大小100MB,mapreduce.task.io.sort.mb調整)以及對應的partition,被緩沖的(key,value)對已經被序列化(為了寫入磁盤)。

  3. 執行溢出寫(排序sort--->合並combiner--->生成溢出寫文件)。一旦緩沖區內容達到閾值(mapreduce.map.io.sort.spill.percent,默認0.80,或者80%),就會會鎖定這80%的內存,並在每個分區中對其中的鍵值對按鍵進行sort排序,具體是將數據按照partition和key兩個關鍵字進行排序,排序結果為緩沖區內的數據按照partition為單位聚集在一起,同一個partition內的數據按照key有序。排序完成后會創建一個溢出寫文件(臨時文件),然后開啟一個后台線程把這部分數據以一個臨時文件的方式溢出寫(spill)到本地磁盤中(如果客戶端自定義了Combiner(相當於map階段的reduce),則會在分區排序后到溢寫出前自動調用combiner,將相同的key的value相加,這樣的好處就是減少溢寫到磁盤的數據量。這個過程叫“合並”)。剩余的20%的內存在此期間可以繼續寫入map輸出的鍵值對。溢出寫過程按輪詢方式將緩沖區中的內容寫到mapreduce.cluster.local.dir屬性指定的目錄中。

  4. 合並Combiner

    • 1..當為作業設置Combiner類后,緩存溢出線程將緩存存放到磁盤時,就會調用;
    • 2.緩存溢出的數量超過mapreduce.map.combine.minspills(默認3)時,在緩存溢出文件合並的時候會調用
  5. 歸並merge。當一個map task處理的數據很大,以至於超過緩沖區內存時,就會生成多個spill文件。此時就需要對同一個map任務產生的多個spill文件進行歸並生成最終的一個已分區且已排序的大文件。配置屬性mapreduce.task.io.sort.factor控制着一次最多能合並多少流,默認值是10。這個過程包括排序和合並(可選),歸並得到的文件內鍵值對有可能擁有相同的key,這個過程如果client設置過Combiner,也會合並相同的key值的鍵值對(根據上面提到的combine的調用時機可知)。

  6. 壓縮。寫磁盤時壓縮map端的輸出,因為這樣會讓寫磁盤的速度更快,節約磁盤空間,並減少傳給reducer的數據量。默認情況下,輸出是不壓縮的(將mapreduce.map.output.compress設置為true即可啟動)

Reduce shuffle

  1. educe任務通過HTTP向各個Map任務拖取它所需要的數據。Map任務成功完成后,會通知父TaskTracker狀態已經更新,TaskTracker進而通知JobTracker(這些通知在心跳機制中進行)。所以,對於指定作業來說,JobTracker能記錄Map輸出和TaskTracker的映射關系。Reduce會定期向JobTracker獲取Map的輸出位置,一旦拿到輸出位置,Reduce任務就會從此輸出對應的TaskTracker上復制輸出到本地,而不會等到所有的Map任務結束。
  2. Copy過來的數據會先放入內存緩沖區中,如果內存緩沖區中能放得下這次數據的話就直接把數據寫到內存中,即內存到內存merge。Reduce要向每個Map去拖取數據,在內存中每個Map對應一塊數據,當內存緩存區中存儲的Map數據占用空間達到一定程度的時候,開始啟動內存中merge,把內存中的數據merge輸出到磁盤上一個文件中,即內存到磁盤merge
  3. 在將buffer中多個map輸出合並寫入磁盤之前,如果設置了Combiner,則會化簡壓縮合並的map輸出。Reduce的內存緩沖區可通過mapred.job.shuffle.input.buffer.percent配置,默認是JVM的heap size的70%。內存到磁盤merge的啟動門限可以通過mapred.job.shuffle.merge.percent配置,默認是66%。當屬於該reducer的map輸出全部拷貝完成,則會在reducer上生成多個文件(如果拖取的所有map數據總量都沒有內存緩沖區,則數據就只存在於內存中),這時開始執行合並操作,即磁盤到磁盤merge,Map的輸出數據已經是有序的,Merge進行一次合並排序,所謂Reduce端的sort過程就是這個合並的過程。一般Reduce是一邊copy一邊sort,即copy和sort兩個階段是重疊而不是完全分開的。最終Reduce shuffle過程會輸出一個整體有序的數據塊。

參考1


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM