Pregel是個強大的基於圖的迭代算法,也是Spark中的一個迭代應用aggregateMessage的典型案例,用它可以在圖中方便的迭代計算,如最短路徑、關鍵路徑、n度關系等。然而對於之前對圖計算接觸不多的童鞋來說,這個api還算是一個比較重量組的接口,不太容易理解。
Spark中的Pregel定義如下:
def pregel[A: ClassTag]( initialMsg: A, maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either)( vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg) }
各個參數的意義詳細解釋如下:
initialMsg: 初始化消息,這個初始消息會被用來初始化圖中的每個節點的屬性,在pregel進行調用時,會首先在圖上使用mapVertices來根據initialMsg的值更新每個節點的值,至於如何更新,則由vprog參數而定,vprog函數就接收了initialMsg消息做為參數來更新對應節點的值
maxIterations: 最大迭代次數
activeDirection: 表示邊的活躍方向,什么是活躍方向呢,首先要解釋一下活躍消息與活躍頂點的概念,活躍節點是指在某一輪迭代中,pregel會以sendMsg和mergeMsg為參數來調用graph的aggregateMessage方法后收到消息的節點,活躍消息就是這輪迭代中所有被收成功收到的消息。這樣一來,有的邊的src節點是活躍節點,有的dst節點是活躍節點,而有的邊兩端節點都是活躍節點。如果activeDirection參數指定為“EdgeDirection.Out”,則在下一輪迭代時,只有接收消息的出邊(src—>dst)才會執行sendMsg函數,也就是說,sendMsg回調函數會過濾掉”dst—>src”的edgeTriplet上下文參數
vprog: 節點變換函數,在初始時,以及每輪迭代后,pregel會根據上一輪使用的msg和這里的vprod函數在圖上調用joinVertices方法變化每個收到消息的節點,注意這個函數除初始時外,都是僅在接收到消息的節點上運行,這一點可以從源碼中看到,源碼中用的是joinVertices(message)(vprog),因此,沒有收到消息的節點在join之后就濾掉了
sendMsg: 消息發送函數,該函數的運行參數是一個代表邊的上下文,pregel在調用aggregateMessages時,會將EdgeContext轉換成EdgeTriplet對象(ctx.toEdgeTriplet)來使用,用戶需要通過Iterator[(VertexId,A)]指定發送哪些消息,發給那些節點,發送的內容是什么,因為在一條邊上可以發送多個消息,如sendToDst,如sendToSrc,所以這里是個Iterator,每一個元素是一個tuple,其中的vertexId表示要接收此消息的節點的id,它只能是該邊上的srcId或dstId,而A就是要發送的內容,因此如果是需要由src發送一條消息A給dst,則有:Iterator((dstId,A)),如果什么消息也不發送,則可以返回一個空的Iterator:Iterator.empty
mergeMsg: 鄰居節點收到多條消息時的合並邏輯,注意它區別於vprog函數,mergeMsg僅能合並消息內容,但合並后並不會更新到節點中去,而vprog函數可以根據收到的消息(就是mergeMsg產生的結果)更新節點屬性。
以上是本人使用PregelApi后的理解,更多詳細討論,請參考:《pregel 與 spark graphX 的 pregel api》