一.概述
GraphX是Spark用於圖形並行計算的新組件。在較高的層次上,GraphX通過引入一個新的Graph抽象來擴展Spark RDD:一個定向的多圖,其屬性附加到每個定點和邊。為了支持圖計算,GraphX公開了一組基本的操作符(子圖,joinVertices和aggregateMessages),以及上述優化的變體API。此外,GraphX包括越來越多的圖形算法和構建器集合,以簡化圖形分析任務。
二..屬性圖
GraphX的屬性曲線圖是一個有向多重圖與連接到每個頂點邊緣的用戶定義對象。其可能有多個平行邊共享相同的源和目標頂點。支持平行邊的能力簡化了在相同頂點之間存在多個關系的建模場景(例如:即使朋友又是同事)。每個頂點都是由唯一的64位長標識符【VertexId】設置秘鑰。GraphX不對頂點標識符施加任何排序約束。同樣,邊具有相應的源和目標頂點標識符。
在頂點【VD】和邊【ED】類型上對屬性圖進行了參數化。這些分別是與每個頂點和邊關聯的對象的類型。
當頂點和邊類型是原始數據類型【例如:int, double等】時,GraphX可以優化它們的表示形式,方法是將它們存儲在專用數組中,從而減少了內存占用量。
在某些情況下,可能希望同一圖形中具有不同屬性類型的頂點。這可以通過繼承來實現。例如,要將用戶和產品建模為二部圖,可以執行以下操作:
與RDD一樣,屬性圖是不可變,分布式和容錯的。通過生成具有所需更改的新圖來完成對圖的值或結構的修改。原始圖形的大部分(未影響的結構,屬性和索引)在新圖中被重用,從而降低了此固有功能數據結構的成本。使用一系列頂點分區試探法在執行程序之間划分圖。與RDD一樣,發生故障時,可以在不同的計算機上重新創建圖形的每個分區。邏輯上,屬性圖對應於一對類型化集合【RDD】,它們對每個頂點和邊的屬性進行編碼。結果,圖類包含訪問圖的頂點和邊的成員的類VertexRDD[VD]和EdgeRDD[ED]的延伸,並且被分別優化為RDD[(VertexId, VD)]和RDD[Edge[ED]]。
分別使用graph.vertices和graph.edges成員將圖解析成相應的頂點和邊視圖。graph.vertices返回一個VertexRDD[(String, String)]擴展的RDD[(VertexId, (String, String))],graph.edges返回一個EdgeRDD,其包含Edge[String]的對象。
除了屬性圖的頂點和邊視圖外,GraphX還公開了一個三元組視圖。三元組視圖在邏輯上將頂點和邊屬性結合起來,從而產生一個RDD[EdgeTriplet[VD, ED]]包含EdgeTriplet類的實例。其圖形化為:
EdgeTriplet類擴Edge通過添加類srcAttr和dstAttr分別包含源和目標屬性成員。可以使用圖形的三元組視圖來呈現描述用戶之間關系的字符串集合。
三.圖運算符
正如RDDS有基本操作map,filter和reduceByKey等一樣,圖也有類似的函數,產生具有轉化特性和結構的新圖。已定義的具有優化實現的核心運算符如下:
這些運算符中的每一個都生成一個新圖形,其頂點或邊緣的屬性由用戶定義的map函數修改。
注意:在每種情況下,圖形的結構都不會改變,這是這些運算符的關鍵特征,它允許結果圖重用原始圖的結構索引!
4.結構運算符
class Graph[VD, ED] {
// Information about the Graph
val numEdges: Long
val numVertices:Long
val inDegrees: VertexRDD[Int]
val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]
// Views of the graph as collections
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD,ED]]
//Functions for caching graphs
def persist(newLevel1:StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]//默認存儲級別為MEMORY_ONLY
def cache(): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
// Change the partitioning heuristic
def partitionBy(partitionStrategy: PartitionStrategy)
// Transform vertex and edge attributes
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2]
// Modify the graph structure def reverse: Graph[VD, ED] def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,vpred: (VertexId, VD) => Boolean): Graph[VD, ED] def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] // 返回當前圖和其它圖的公共子圖 def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
// Join RDDs with the graph
def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, Option[U]))
// Aggregate information about adjacent triplets
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
def aggregateMessages[Msg: ClassTag](sendMsg: EdgeContext[VD, ED, Msg] => Unit, merageMsg: (Msg, Msg) => Msg, tripletFields: TripletFields: TripletFields = TripletFields.All): VertexRDD[A]
//Iterative graph-parallel computation
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDiection)(vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A): Graph[VD, ED]
// Basic graph algorithms
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def connectedComponents(): Graph[VertexId, ED]
def triangleCount(): Graph[Int, ED]
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}
函數reverse將返回逆轉所有邊緣方向的新圖。當嘗試計算反向PageRank時,這可能會很有用。由於反向操作不會修改頂點或邊緣屬性或更改邊數,因此可以有效的實現,而無需數據移動或復制。
在許多情況下,有必要將外部數據(RDD)與圖形關聯起來。
1 class Graph[VD, ED] { 2 def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD) 3 : Graph[VD, ED] 4 def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2) 5 : Graph[VD2, ED] 6 }
內連接(joinVertices)運算符連接輸入RDD並返回通過用戶定義關系關聯的新圖形。RDD中沒有匹配的頂點保留原始值。
外連接(outerJoinVertices)運算符:因為並非所有的頂點在輸入RDD中都具有匹配值,所以在不確定的時候可以采用該類型。
五.相鄰聚集
許多圖形分析任務中的關鍵步驟是匯總有關每個頂點鄰域的信息。例如:我們可能想知道每個用戶擁有的關注者數量或每個用戶的關注者的平均年齡。許多迭代算法【例如:PageRank,最短路徑和連接的組件】反復聚合相鄰頂點的屬性。為了提高性能,主要聚合運算符從graph.mapReduceTriplets更改為graph.AggregateMessage。
六.匯總消息【AggregateMessage】
GraphX中的核心聚合操作為aggregateMessage,該運算符將用戶定義的sendMsg函數應用於圖形的每個邊三元組,然后使用該mergeMsg 函數在其目標頂點處聚合這些消息。
用戶定義的sendMsg函數采用EdgeContext,將公開源和目標屬性以及邊緣屬性和函數,以將消息發送到源和目標屬性。可以將sendMsg視為mapReduce中的map函數。用戶定義的mergeMsg函數接受兩條發往同一頂點的消息,並產生一條消息。可以認為是mapReduce中的reduce函數。使用aggregateMessages返回一個VertexRDD[Msg]對象,該對象包含該聚合消息並將消息發往各個頂點。未收到消息的頂點不包含在返回的VertexRDD中。另外,aggregateMessage采用一個可選參數tripletsFields,該參數指示訪問了哪些數據的EdgeContext,選項在tripletsFields中定義,TripletsFields默認值為TripletsFields.All,指示用戶定義的sendMsg函數可以訪問任何EdgeContext。tripletsFields參數可以設置只通知GraphX的一部分,EdgeContext從而允許GraphX選擇優化的連接策略。例如:如果只需要源頂點數據,可以設置TripletsFields.Src表示我們僅需要源頂點信息。
在GraphX的早期版本中,使用字節碼檢查來推斷TripletsFields,但是我們發現字節碼檢查稍微不靠譜,因而選擇了更明確的用戶控制。
代碼實戰例子參考:https://www.cnblogs.com/yszd/p/11726921.html
七.緩存
Spark默認情況下RDD不保留在內存中。為避免重新計算,可以在多次使用時顯式緩存。GraphX中的圖也是一樣。多次使用圖時,確保先調用Graph.cache()。
在迭代計算中,為了獲得最佳性能,也可能需要取消緩存。默認情況下,緩存的RDD和圖保存在內存中,直到內存壓力迫使它們按照LRU【最近最少使用頁面交換算法】逐漸從內存中移除。對於迭代計算,先前的中間結果將填滿內存。經過它們最終被移除內存,但存儲在內存中的不必要數據將減慢垃圾回收速度。因此,一旦不再需要中間結果,取消緩存中間結果將更加有效。這涉及在每次迭代中實現緩存圖或RDD,取消緩存其他所有數據集,並僅在以后的迭代中使用實現的數據集。但是,由於圖是有多個RDD組成的,因此很難正確地取消持久化。對於迭代計算,建議使用Pregel API,它可以正確地保留中間結果。
八.Pregel API
圖是固有的遞歸數據結構,因為定點的屬性取決於其自身的屬性,又取決於其相鄰節點的屬性。這導致許多圖形算法會迭代重新計算每個頂點的屬性,直到到達指定條件為止。其中,GraphX實現了基於Pregel API的變體。在較高層次上,GraphX中的Pregel運算符是受圖拓撲約束的大量同步並行消息傳遞的抽象。Tregel運算符在一系列超級步驟中執行,其中定點從上一個超級步驟接收入棧消息的總和,計算頂點屬性的新值,然后在下一個超級步驟中將消息發送到相鄰的頂點。與Pregel不同,消息是根據邊三元組並行計算的,並且消息計算可以訪問源頂點和目標頂點屬性。在超級步驟中會跳過未收到消息的頂點。當沒有消息剩余時,Pregel運算符終止迭代並返回最終結果圖。
注意,與標准的Pregel實現不同,GraphX中的頂點只能將消息發送到相鄰的頂點,並且使用用戶定義的消息傳遞功能並行完成消息的構造。這些限制允許在GraphX中進行其它優化。Pregel使用了兩個參數列表,第一個參數列表包含配置參數,包含初始消息,最大迭代次數以及發送消息的邊方向。第二個參數列表包含用戶定義的函數,這些函數用於接收消息【頂點程序vprog】、計算消息【sendMsg】以及組合消息【mergeMsg】。