大數據之Pregel圖計算原理


大數據之圖計算相關原理

最近在看大數據圖計算相關的論文,故將圖計算的原理簡單梳理一遍,參考資料為<<大數據技術原理與應用>>

一、圖計算簡介

1.1. 圖結構:

圖形數據結構與一般數據結構不同,它必須要反映數據所對應元素之間的幾何關系和拓撲關系。圖形數據結構通常是指由若干個圖形數據元素按一定關系所組成的有序集,一般稱為表。關系的鏈接實現是指圖形數據中的每一個數據項的存放是無規則的,其間的連接是通過數據元素中指示連接單元的指針來實現的。許多大數據都是以大規模圖或網絡的形式呈現,非圖結構的大數據,也常常會被轉換為圖模型后進行分析。

1.2. 圖計算:

“圖計算”是以“圖論”為基礎的對現實世界的一種“圖”結構的抽象表達,以及在這種數據結構上的計算模式。在圖計算中,基本的數據結構通常包含三部分,即頂點、邊及邊上的權重。其中頂點表達的是客觀世界中的實體,邊是實體之間的相互聯系,權重往往是量化關系輕重的數值表達。

二、BSP模型解析

針對大型圖的計算,主要是基於BSP模型實現的並行圖處理系統。

BSP: 整體同步並行計算模型(Bulk Synchronous Parallel Computing Model ),又稱“大同步”模型,是由美國哈佛大學的 L.G.Valiant教授在 1992 年作為一種並行計算模型提出的。BSP模型迭代執行每一個超步直到滿足終止條件或者達到一定的超步數強制終止。每一個超步由本地計算,全局通信和路障同步三個階段組成:

  • 本地計算;計算發生在每一個參與計算的節點上,每個節點只使用存儲在本地存儲器上的數據,該些數據是在程序啟動時加載數據階段或上一個超級步的全局通信時放在本地存儲器上的數據。每台處理器上的計算是獨立的,從這個意義上看節點間的執行是異步的。

  • 全局通信:某個處理器向其他處理器發出通信請求,每個處理器發送或等待消息。消息在節點之間進行交換。這種交換是單方面的以點對點的方式進行的。處理器通常需要輸入隊列及輸出隊列存儲信息。

  • 路障同步:當一個任務執行完本地計算到達同步點時,它會等待其他所有的任務完成計算。本超步中所有的數據通信在本次同步后生效,並提供給下一個超步使用。當所有的通信結束后,確保每個處理器均執行完當前超步中所有的計算和通信,並且通信過程中各處理器間的數據交換均已完成,然后進入到下一輪迭代。

BSP

三、Pregel模型解析

Pregel:谷歌公司推出的一種基於BSP模型實現的並行圖處理系統。為了解決大型圖的分布式計算問題,Pregel搭建了一套可擴展的、有容錯機制的平台,該平台提供了一套非常靈活的接口,可以描述各種各樣的圖計算。主要用於求解最短路徑、網頁排序等問題。

3.1. 有向圖 :

Pregel計算模型以有向圖作為輸入,有向圖的每個頂點都有一個頂點ID,每個頂點都有一個自定義值,每條有向邊都和其源頂點關聯,並記錄了其目標頂點ID,每條邊上有一個數值作為權重。

在每個超步S中,圖中的所有頂點都會並行執行相同的compute函數.

3.2.消息通信:

每個頂點可以接收前一個超步(S-1)中發送給它的消息,修改其自身及其出射邊的狀態,並發送消息給其他頂點,甚至是修改整個圖的拓撲結構。在這種計算模式中,邊上面不會運行相應的計算,只有頂點才會執行compute函數進行相應計算。

3.3.計算過程:

Pregel的計算模型為BSP模型,是由一系列被稱為“超步”的迭代組成的:

  • 在每個超步中,每個頂點上面都會並行執行compute函數,該函數描述了一個頂點V在一個超步S中需要執行的操作。

  • Compute函數會讀取前一個超步(S-1)中其他頂點發送給頂點V的消息,執行相應計算后,修改頂點V及其出射邊的狀態,然后沿着頂點V的出射邊發送消息給其他頂點,而且,一個消息可能經過多條邊的傳遞后被發送到任意已知ID的目標頂點上去。

  • 這些消息將會在下一個超步(S+1)中被目標頂點接收,然后像上述過程一樣開始下一個超步(S+1)的迭代過程。

在Pregel計算過程中,一個算法什么時候可以結束,是由所有頂點的狀態決定的:

  • 在第0個超步,所有頂點處於活躍狀態,都會參與該超步的計算過程。

  • 當一個頂點不需要繼續執行進一步的計算時,就會把自己的狀態設置為“停機”狀態。

  • 一旦一個頂點進入非活躍狀態,后續超步中就不會再在該頂點上執行計算,除非其他頂點給該頂點發送消息把它再次激活。

  • 當一個處於非活躍狀態的頂點收到來自其他頂點的消息時,Pregel計算框架必須根據條件判斷來決定是否將其顯式喚醒進入活躍狀態。

  • 當圖中所有的頂點都已經標識其自身達到“非活躍(inactive)”狀態,並且沒有消息在傳送的時候,算法就可以停止運行。

四、Pregel代碼探討

4.1. C++ API:

Pregel已經采用C++預先定義好了一個Vertex基類,在Vertex類中,定義了三個值類型參數,分別表示頂點、邊和消息。編寫Pregel程序時,需要繼承Vertex類,並且覆寫Vertex類的虛函數Compute。

template <typename VertexValue, typename EdgeValue, typename MessageValue>  
class Vertex {
 public:
   //Compute為虛函數,需要用戶在其子類中實現
virtual void Compute(MessageIterator* msgs) = 0;
   //表示頂點的id值
const string& vertex_id() const;
   //表示迭代次數,即超步數
int64 superstep() const;
   //獲取頂點的數值,面向對象的封裝特性
const VertexValue& GetValue();
   //改變頂點的數值
VertexValue* MutableValue();
   //獲取頂點出邊的迭代器,用於后續消息傳遞
OutEdgeIterator GetOutEdgeIterator();
   //發送消息函數
void SendMessageTo(const string& dest_vertex,const MessageValue& message);
   //頂點進入“停機”狀體
void VoteToHalt();
  };

Pregel執行計算過程時,每個超步中都會並行調用每個頂點上定義的Compute()函數,允許Compute()方法查詢當前頂點及其邊的信息,以及發送消息到其他的頂點:

  • Compute()方法可以調用GetValue()方法來獲取當前頂點的值。

  • 調用MutableValue()方法來修改當前頂點的值。

  • 通過由出射邊的迭代器OutEdgeIterator提供的方法來查看、修改出射邊對應的值。

  • 被修改頂點的狀態是能夠立即被看見的,但對於其他頂點而言是不可見的。

  • 不同頂點並發進行的數據訪問是不存在競爭關系的。

整個過程中,唯一需要在超步之間持久化的頂點級狀態,是頂點和其對應的邊所關聯的值,因而,Pregel計算框架所需要管理的圖狀態就只包括頂點和邊所關聯的值,這種做法大大簡化了計算流程,同時,也有利於圖的分布和故障恢復。

4.2. 消息傳遞機制:

頂點之間的通訊是借助於消息傳遞機制來實現的,每條消息都包含了消息值和需要到達的目標頂點ID:

  • 在一個超步S中,一個頂點可以發送任意數量的消息,這些消息將在下一個超步(S+1)中被其他頂點接收。

  • 在超步(S+1)中,當Pregel計算框架在頂點V上執行用戶自定義的Compute()方法時,所有在前一個超步S中發送給頂點V的消息,都可以通過一個迭代器來訪問到。

  • 迭代器不能保證消息的順序,不過可以保證消息一定會被傳送並且不會被重復傳送。

  • 一個頂點V通過與之關聯的出射邊向外發送消息,消息要到達的目標頂點並不一定是與頂點V相鄰的頂點。

  • 一個消息可以連續經過多條連通的邊到達某個與頂點V不相鄰的頂點U,U可以從接收的消息中獲取到與其不相鄰的頂點V的ID。

4.3. Combiner:

在大數據分布式並行處理框架MapReduce中,每一個map都會產生大量的本地輸出,Combiner的作用就是對map輸出的結果先做一次合並,以減少的map和reduce節點中的數據傳輸量,Combiner的存在就是提高當前網絡IO傳輸的性能,也是MapReduce的一種優化手段。Pregel計算框架中Combiner的作用與之類似:

  • Pregel計算框架在消息發出去之前,Combiner可以將發往同一個頂點的多個整型值進行求和得到一個值,只需向外發送這個“求和結果”,從而實現了由多個消息合並成一個消息,大大減少了傳輸和緩存的開銷。

  • 在默認情況下,Pregel計算框架並不會開啟Combiner功能,因為,通常很難找到一種對所有頂點的Compute()函數都合適的Combiner。

  • 當用戶打算開啟Combiner功能時,可以繼承Combiner類並覆寫虛函數Combine()。

  • Pregel計算框架無法保證哪些消息會被合並,也無法保證消息傳遞給 Combine()的順序和合並操作執行的順序,因此通常只對那些滿足交換律和結合律的操作才可以去開啟Combiner功能。

4.4. Aggregator:

Aggregator提供了一種全局通信、監控和數據查看的機制

在超步S中,每個頂點都可以向Aggregator提供一個數據,Pregel計算框架會對這些值進行聚合操作產生一個值,在下個超步(S+1)中,圖中的所有頂點都可以看見這個值:

  • Aggregator的聚合功能可以執行最大值、最小值、求和操作,比如,可以定義一個“Sum” Aggregator來統計每個頂點的出射邊數量,最后相加可以得到整個圖的邊的數量。

  • Aggregator還可以實現全局協同的功能,比如,可以設計“and” Aggregator來決定在某個超步中Compute()函數是否執行某些邏輯分支,只有當“and” Aggregator顯示所有頂點都滿足了某條件時,才去執行這些邏輯分支。

4.5. 拓撲改變:

Pregel計算框架允許用戶在自定義函數Compute()中定義操作,修改圖的拓撲結構,比如在圖中增加(或刪除)邊或頂點:

  • 對於全局拓撲改變,Pregel采用了惰性協調機制,在改變請求發出時,Pregel不會對這些操作進行協調,只有當這些改變請求的消息到達目標頂點並被執行時,Pregel才會對這些操作進行協調,這樣,所有針對某個頂點V的拓撲修改操作所引發的沖突,都會由V自己來處理。

  • 對於本地的局部拓撲改變,不會引發沖突,頂點或邊的本地增減能夠立即生效,很大程度上簡化了分布式編程。

4.6. 輸入和輸出:

在Pregel計算框架中,圖的保存格式多種多樣,包括文本文件、關系數據庫或鍵值數據庫等,在Pregel中,“從輸入文件生成得到圖結構”和“執行圖計算”這兩個過程是分離的,從而不會限制輸入文件的格式。同時,Pregel也可以通過多種方式輸出結果。

五、圖計算模型體系結構

Pregel計算框架采用分布式主從架構,主節點Master管理者多個Worker從節點,圖結構數據拆成多個分區並發送到各個Worker節點上進行運算。Master只負責協調多個Worker執行任務,系統不會把圖的任何分區分配給它。Worker借助於名稱服務系統可以定位到Master的位置,並向Master發送自己的注冊信息。

5.1. Pregel的執行過程:

Pregel計算框架中,一個大型圖數據會被划分成多個分區,每個分區都包含了一部分頂點以及以其為起點的邊:

  • 一個頂點應該被分配到哪個分區上,是由一個函數決定的,系統默認函數為hash(ID) mod N,其中,N為所有分區總數,ID是這個頂點的標識符。

  • 無論在哪台機器上,都可以簡單根據頂點ID判斷出該頂點屬於哪個分區,即使該頂點可能已經不存在了。

在理想的情況下,一個Pregel用戶程序的執行過程如下:

  1. 選擇集群中的多台機器執行圖計算任務,每台機器上運行用戶程序的一個副本,其中,有一台機器會被選為Master,其他機器作為Worker。

  2. Master把一個圖分成多個分區,並把分區分配到多個Worker。一個Worker會領到一個或多個分區,每個Worker知道所有其他Worker所分配到的分區情況。每個Worker負責維護分配給自己的那些分區的狀態(頂點及邊的增刪),對分配給自己的分區中的頂點執行Compute()函數,向外發送消息,並管理接收到的消息。

  3. Master會把用戶輸入划分成多個部分,通常是基於文件邊界進行划分。划分后,每個部分都是一系列記錄的集合,每條記錄都包含一定數量的頂點和邊。

  4. Master會為每個Worker分配用戶輸入的一部分。如果一個Worker從輸入內容中加載到的頂點,剛好是自己所分配到的分區中的頂點,就會立即更新相應的數據結構。否則,該Worker會根據加載到的頂點的ID,把它發送到其所屬的分區所在的Worker上。

  5. 當所有的輸入都被加載后,圖中的所有頂點都會被標記為“活躍”狀態。

  6. Master向每個Worker發送指令,Worker收到指令后,開始運行一個超步。Worker會為自己管轄的每個分區分配一個線程,對於分區中的每個頂點,Worker會把來自上一個超步的、發給該頂點的消息傳遞給它,並調用處於“活躍”狀態的頂點上的Compute()函數,在執行計算過程中,頂點可以對外發送消息,但是,所有消息的發送工作必須在本超步結束之前完成。

  7. 當所有這些工作都完成以后,Worker會通知Master,並把自己在下一個超步還處於“活躍”狀態的頂點的數量報告給Master。上述步驟會被不斷重復,直到所有頂點都不再活躍並且系統中不會有任何消息在傳輸,這時,執行過程才會結束。

  8. 計算過程結束后,Master會給所有的Worker發送指令,通知每個Worker對自己的計算結果進行持久化存儲。

5.2. Master節點:

Master主要負責協調各個Worker執行任務,每個Worker會借助於名稱服務系統定位到Master的位置,並向Master發送自己的注冊信息,Master會為每個Worker分配一個唯一的ID:

  • Master維護着關於當前處於“有效”狀態的所有Worker的各種信息,包括每個Worker的ID和地址信息,以及每個Worker被分配到的分區信息。

  • 雖然在集群中只有一個Master,但是,它仍然能夠承擔起一個大規模圖計算的協調任務,這是因為Master中保存這些信息的數據結構的大小,只與分區的數量有關,而與頂點和邊的數量無關。

一個大規模圖計算任務會被Master分解到多個Worker去執行,在每個超步開始時,Master都會向所有處於“有效”狀態的Worker發送相同的指令,然后等待這些Worker的回應:

  • 如果在指定時間內收不到某個Worker的反饋,Master就認為這個Worker失效。

  • 如果參與任務執行的多個Worker中的任意一個發生了故障失效,Master就會進入恢復模式。

  • 在每個超步中,圖計算的各種工作,比如輸入、輸出、計算、保存和從檢查點中恢復,都會在路障之前結束。

  • 如果路障同步成功,說明一個超步順利結束,Master就會進入下一個處理階段,圖計算進入下一個超步的執行。

Master在內部運行了一個HTTP服務器來顯示圖計算過程的各種信息,用戶可以通過網頁隨時監控圖計算執行過程各個細節:

  • 圖的大小。

  • 處於活躍狀態的頂點數量。

  • 在當前超步的時間信息和消息流量。

  • 所有用戶自定義Aggregator的值。

5.3 Worker:

在每個Worker中,它所管轄的分區的狀態信息是保存在內存中的。分區中的頂點的狀態信息包括:

  • 頂點的當前值。

  • 以該頂點為起點的出射邊列表,每條出射邊包含了目標頂點ID和邊的值。

  • 消息隊列,包含了所有接收到的、發送給該頂點的消息。

  • 標志位,用來標記頂點是否處於活躍狀態。

在每個超步中,Worker會對自己所管轄的分區中的每個頂點進行遍歷,並調用頂點上的Compute()函數,在調用時,會把以下三個參數傳遞進去:

  • 該頂點的當前值

  • 一個接收到的消息的迭代器

  • 一個出射邊的迭代器

  • 一個出射邊的迭代器

在Pregel中,為了獲得更好的性能,“標志位”和輸入消息隊列是分開保存的,對於每個頂點而言,Pregel只保存一份頂點值和邊值,但是,會保存兩份“標志位”和輸入消息隊列,分別用於當前超步和下一個超步:

  • 在超步S中,當一個Worker在進行頂點處理時,用於當前超步的消息會被處理,同時,它在處理過程中還會接收到來自其他Worker的消息,這些消息會在下一個超步S+1中被處理,因此,需要兩個消息隊列用於存放作用於當前超步S的消息和作用於下一個超步S+1的消息。

  • 如果一個頂點V在超步S接收到消息,表示V將會在下一個超步S+1中(而不是當前超步S中)處於“活躍”狀態。

當一個Worker上的一個頂點V需要發送消息到其他頂點U時,該Worker會首先判斷目標頂點U是否位於自己機器上:

  • 如果目標頂點U在自己的機器上,就直接把消息放入到與目標頂點U對應的輸入消息隊列中。

  • 如果發現目標頂點U在遠程機器上,這個消息就會被暫時緩存到本地,當緩存中的消息數目達到一個事先設定的閾值時,這些緩存消息會被批量異步發送出去,傳輸到目標頂點所在的Worker上。

  • 如果存在用戶自定義的Combiner操作,那么當消息被加入到輸出隊列或者到達輸入隊列時,就可以對消息執行合並操作,這樣可以節省存儲空間和網絡傳輸開銷。

5.4 容錯性:

Pregel采用檢查點機制來實現容錯。在每個超步的開始,Master會通知所有的Worker把自己管轄的分區的狀態(包括頂點值、邊值以及接收到的消息),寫入到持久化存儲設備:

  • Master會周期性地向每個Worker發送ping消息(心跳機制),Worker收到ping消息后會給Master發送反饋消息。如果Master在指定時間間隔內沒有收到某個Worker的反饋消息,就會把該Worker標記為“失效”。同樣地,如果一個Worker在指定的時間間隔內沒有收到來自Master的ping消息,該Worker也會停止工作。

  • 每個Worker上都保存了一個或多個分區的狀態信息,當一個Worker發生故障時,它所負責維護的分區的當前狀態信息就會丟失。Master監測到一個Worker發生故障“失效”后,會把失效Worker所分配到的分區,重新分配到其他處於正常工作狀態的Worker集合上,然后這些分區會從最近的超步S開始時寫出的檢查點中,重新加載狀態信息。

5.5 Aggregator:

每個用戶自定義的Aggregator都會采用聚合函數對一個值集合進行聚合計算得到一個全局值:

  • 每個Worker都保存了一個Aggregator類的對象實例集,其中的每個實例都是由類型名稱和實例名稱來標識的。

  • 在執行圖計算過程的某個超步S中,每個Worker會利用一個Aggregator對當前本地分區中包含的所有頂點的值進行歸約,得到一個本地的局部歸約值。

  • 在超步S結束時,所有Worker會將所有包含局部歸約值的Aggregator的值進行最后的匯總,得到全局值,然后提交給Master。

  • 在下一個超步S+1開始時,Master就會將Aggregator的全局值發送給每個Worker,即當前S超步中所有的Worker都擁有超步S-1結束后的全局結果值。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM