一.概述
GraphX是Spark中用於圖形和圖形並行計算的新組件。在較高的層次上,GraphX 通過引入新的Graph抽象來擴展Spark RDD:一個有向多重圖,其屬性附加到每個頂點和邊上。為了支持圖計算,GraphX公開了一組基本的操作符(例如, subgraph,joinVertices和 aggregateMessages),以及所述的優化的變體Pregel API。此外,GraphX包括越來越多的圖形算法和 構建器集合,以簡化圖形分析任務。
二.入門
首先,需要將Spark和GraphX導入項目,如下所示:
import org.apache.spark._ import org.apache.spark.graphx._ // To make some of the examples work we will also need RDD import org.apache.spark.rdd.RDD
如果不使用Spark Shell,則還需要一個SparkContext
。
三.屬性圖
GraphX的屬性曲線圖是一個有向多重圖與連接到每個頂點和邊的用戶定義的對象。有向多重圖是有向圖,其中存在的多個平行邊共享相同的源和目標頂點。支持平行邊的功能簡化了在相同頂點之間可能存在多個關系(例如,同事和朋友)的建模場景。每個頂點均由唯一的 64位長標識符(VertexId
)設置密鑰 。GraphX對頂點標識符沒有施加任何排序約束。同樣,邊具有相應的源和目標頂點標識符。
在頂點(VD
)和邊(ED
)類型上對屬性圖進行了參數化。這些是分別與每個頂點和邊關聯的對象的類型。
當頂點和邊類型是原始數據類型(例如int,double等)時,GraphX可以優化它們的表示形式,方法是將它們存儲在專用數組中,從而減少了內存占用量。
在某些情況下,可能希望在同一圖形中具有不同屬性類型的頂點。這可以通過繼承來實現。例如,要將用戶和產品建模為二部圖,我們可以執行以下操作:
class VertexProperty() case class UserProperty(val name : String) extends VertexProperty case class ProductProperty(val name : String, val price : Double) extends VertexProperty // The graph might then have the type: var graph : Graph[VertexProperty, String] = null
像RDD一樣,屬性圖是不可變的,分布式的和容錯的。圖的值或結構的更改是通過生成具有所需更改的新圖來完成的。注意,原始圖的實質部分(即不受影響的結構,屬性和索引)在新圖中被重用,從而降低了這種固有功能數據結構的成本。使用一系列頂點分區試探法在執行程序之間划分圖。與RDD一樣,發生故障時,可以在不同的計算機上重新創建圖形的每個分區。
邏輯上,屬性圖對應於一對類型化集合(RDD),它們對每個頂點和邊的屬性進行編碼。結果,圖類包含訪問圖的頂點和邊的成員:
class Graph[VD, ED] { val vertices: VertexRDD[VD] val edges: EdgeRDD[ED] }
Graph的類是VertexRDD[VD]
和EdgeRDD[ED]的
延伸,並且分別包含被優化的版本RDD[(VertexId, VD)]
和RDD[Edge[ED]]
。VertexRDD[VD]和
EdgeRDD[ED]
提供圍繞圖形計算,並利用內部優化內置附加功能。
屬性圖示例
假設我們要構造一個由GraphX項目中的各個協作者組成的屬性圖。頂點屬性可能包含用戶名和職業。我們可以用描述協作者之間關系的字符串注釋邊:
結果圖將具有類型簽名:
val userGraph: Graph[(String, String), String]
有多種方法可以從原始文件,RDD甚至是合成生成器構造屬性圖。最通用的方法是使用 Graph對象。例如,以下代碼從RDD集合構造一個圖形:
// Assume the SparkContext has already been constructed val sc: SparkContext // Create an RDD for the vertices val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))) // Create an RDD for edges val relationships: RDD[Edge[String]] = sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"))) // Define a default user in case there are relationship with missing user val defaultUser = ("John Doe", "Missing") // Build the initial Graph val graph = Graph(users, relationships, defaultUser)
在上面的示例中,我們使用了Edge
類。邊具有srcId
和dstId,分別對應於源和目標頂點標識符。另外,Edge
類具有attr
存儲edge屬性的成員。
我們可以分別使用graph.vertices
和graph.edges
成員將圖解構為相應的頂點和邊視圖。
val graph: Graph[(String, String), String] // Constructed from above // Count all users which are postdocs graph.vertices.filter {case (id, (name, pos)) => pos == "postdoc"}.count // Count all the edges where src > dst graph.edges.filter(e => e.srcId > e.dstId).count
請注意,graph.vertices
返回VertexRDD[(String, String)]
擴展了的 RDD[(VertexId, (String, String))]
,因此我們使用scala case
表達式來解構元組。另一方面,graph.edges
返回一個EdgeRDD
包含Edge[String]
對象。我們還可以使用case類類型構造函數,如下所示:
graph.edges.filter {case Edge(src, dst, prop) => src > dst}.count
除了屬性圖的頂點和邊視圖外,GraphX還公開了一個三元組視圖。三元組視圖在邏輯上連接頂點和邊屬性,從而產生一個 RDD[EdgeTriplet[VD, ED]]
包含EdgeTriplet
類的實例。可以用以下SQL表達式表示此連接:
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id
或圖形化展示:
Graph的EdgeTriplet
類擴展Edge
通過添加類srcAttr
和 dstAttr
分別包含源和目的屬性成員。我們可以使用圖形的三元組視圖來呈現描述用戶之間關系的字符串集合。
val graph: Graph[(String, String), String] // Constructed from above // Use the triplets view to create an RDD of facts. val facts: RDD[String] = graph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1) facts.collect.foreach(println(_))
四.圖運算符
和RDD一樣,屬性圖也具有一組基本運算符,例如,map、filter、reduceByKey等等,這些運算符采用用戶定義的函數並生成具有轉換后的特性和結構的新圖。在屬性圖中定義了具有優化實現的核心運算符,並在其中定義了表示為核心運算符組成的便捷運算符。但是,由於使用了Scala隱式轉換,Graph中的成員可以自動應用相應的運算符。例如,我們可以通過以下方法計算每個頂點(在Graph中定義)的度數:
val graph: Graph[(String, String), String] // Use the implicit GraphOps.inDegrees operator val inDegrees: VertexRDD[Int] = graph.inDegrees
Graph常用算子
/** Summary of the functionality in the property graph */ class Graph[VD, ED] { // Information about the Graph =================================================================== val numEdges: Long val numVertices: Long val inDegrees: VertexRDD[Int] val outDegrees: VertexRDD[Int] val degrees: VertexRDD[Int] // Views of the graph as collections ============================================================= val vertices: VertexRDD[VD] val edges: EdgeRDD[ED] val triplets: RDD[EdgeTriplet[VD, ED]] // Functions for caching graphs ================================================================== def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] def cache(): Graph[VD, ED] def unpersistVertices(blocking: Boolean = false): Graph[VD, ED] // Change the partitioning heuristic ============================================================ def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] // Transform vertex and edge attributes ========================================================== def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED] def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2] def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]) : Graph[VD, ED2] // Modify the graph structure ==================================================================== def reverse: Graph[VD, ED] def subgraph( epred: EdgeTriplet[VD,ED] => Boolean = (x => true), vpred: (VertexId, VD) => Boolean = ((v, d) => true)) : Graph[VD, ED] def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] // Join RDDs with the graph ====================================================================== def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED] def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)]) (mapFunc: (VertexId, VD, Option[U]) => VD2) : Graph[VD2, ED] // Aggregate information about adjacent triplets ================================================= def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] def aggregateMessages[Msg: ClassTag]( sendMsg: EdgeContext[VD, ED, Msg] => Unit, mergeMsg: (Msg, Msg) => Msg, tripletFields: TripletFields = TripletFields.All) : VertexRDD[A] // Iterative graph-parallel computation ========================================================== def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)( vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] // Basic graph algorithms ======================================================================== def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] def connectedComponents(): Graph[VertexId, ED] def triangleCount(): Graph[Int, ED] def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED] }
屬性算子
與RDD運算符一樣,屬性圖包含以下內容:
class Graph[VD, ED] { def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED] def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2] def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] }
這些運算符中的每一個都會產生一個新圖,其頂點或邊屬性由用戶定義的map
函數修改。
請注意,在各種情況下,圖形結構均不受影響。這是這些運算符的關鍵功能,它允許生成的圖重用原始圖的結構索引。以下代碼段在邏輯上是等效的,但第一個代碼段不會保留結構索引,也不會從GraphX系統優化中受益:
val newVertices = graph.vertices.map {case (id, attr) => (id, mapUdf(id, attr))} val newGraph = Graph(newVertices, graph.edges)
而是使用
mapVertices
保留索引:
val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
這些運算符通常用於為特定計算初始化圖或投影出不必要的屬性。例如,給定一個以出度作為頂點屬性的圖,我們將其初始化為PageRank:
// Given a graph where the vertex property is the out degree val inputGraph: Graph[Int, String] = graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0)) // Construct a graph where each edge contains the weight // and each vertex is the initial PageRank val outputGraph: Graph[Double, Double] = inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
結構算子
目前,GraphX僅支持一組簡單的常用結構運算符。以下是基本結構運算符的列表。
class Graph[VD, ED] { def reverse: Graph[VD, ED] def subgraph(epred: EdgeTriplet[VD,ED] => Boolean, vpred: (VertexId, VD) => Boolean): Graph[VD, ED] def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] // 交集 def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED] }
該reverse算子
將返回逆轉的所有邊方向上的新圖。例如,在嘗試計算逆PageRank時,這將很有用。由於反向操作不會修改頂點或邊的屬性或更改邊的數量,因此可以有效地實現它,而無需移動或復制數據。
該subgraph
操作需要的頂點和邊的謂詞,並返回包含只有滿足頂點謂詞的頂點和滿足邊謂詞邊的曲線和滿足頂點謂詞連接頂點。subgraph
可以在多種情況下使用該運算符,以將圖形限制在感興趣的頂點和邊或消除斷開的鏈接。例如,在下面的代碼中,我們刪除了斷開的鏈接:
// Create an RDD for the vertices val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")), (4L, ("peter", "student")))) // Create an RDD for edges val relationships: RDD[Edge[String]] = sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague"))) // Define a default user in case there are relationship with missing user val defaultUser = ("John Doe", "Missing") // Build the initial Graph val graph = Graph(users, relationships, defaultUser) // Notice that there is a user 0 (for which we have no information) connected to users // 4 (peter) and 5 (franklin). graph.triplets.map( triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1 ).collect.foreach(println(_)) // Remove missing vertices as well as the edges to connected to them val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing") // The valid subgraph will disconnect users 4 and 5 by removing user 0 validGraph.vertices.collect.foreach(println(_)) validGraph.triplets.map( triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1 ).collect.foreach(println(_))
注意,在以上示例中,僅提供了頂點謂詞。如果不設置頂點或邊謂詞
subgraph
操作默認為true
。
在mask
操作通過返回包含該頂點和邊,它們也在輸入圖形中發現曲線構造一個子圖。可以與subgraph
運算符結合使用, 以基於另一個相關圖形中的屬性來限制圖形。例如,我們可能會使用缺少頂點的圖來運行連接的組件,然后將答案限制為有效的子圖。
// Run Connected Components val ccGraph = graph.connectedComponents() // No longer contains missing field // Remove missing vertices as well as the edges to connected to them val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing") // Restrict the answer to the valid subgraph val validCCGraph = ccGraph.mask(validGraph)
屬性圖的groupEdges
操作在多重圖中合並平行邊(即,頂點對之間的重復邊緣)。在許多數值應用中,可以將平行邊添加 (合並了它們的權重)到單個邊中,從而減小了圖形的大小。
Join操作
在許多情況下,有必要將外部集合(RDD)中的數據與圖形連接起來。例如,我們可能有想要與現有圖形合並的額外用戶屬性,或者可能希望將頂點屬性從一個圖形拉到另一個圖形。這些任務可以使用聯接運算符來完成。下面我們列出了關鍵的聯接運算符:
class Graph[VD, ED] { def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD) : Graph[VD, ED] def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2) : Graph[VD2, ED] }
Graph的joinVertices
運算符與輸入RDD頂點進行連接並返回通過應用用戶定義獲得的頂點屬性的新圖形。RDD中沒有匹配值的頂點保留其原始值。
請注意,如果RDD對於給定的頂點包含多個值,則只會使用一個。因此,建議使用以下命令使輸入RDD唯一,這也將對結果值進行預索引,以大大加快后續連接的速度。
val nonUniqueCosts: RDD[(VertexId, Double)] val uniqueCosts: VertexRDD[Double] = graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b) val joinedGraph = graph.joinVertices(uniqueCosts)( (id, oldCost, extraCost) => oldCost + extraCost)
除了將用戶定義的函數應用於所有頂點並可以更改頂點屬性類型外,其他outerJoinVertices
行為與常規行為類似。由於並非所有頂點在輸入RDD中都可能具有匹配值,因此該函數采用一種類型。例如,我們可以通過使用初始化頂點屬性來為PageRank設置圖形。
val outDegrees: VertexRDD[Int] = graph.outDegrees val degreeGraph = graph.outerJoinVertices(outDegrees){(id, oldAttr, outDegOpt) => outDegOpt match { case Some(outDeg) => outDeg case None => 0 // No outDegree means zero outDegree } }
雖然我們可以同樣地寫f(a)(b)
,f(a,b)
但這意味着對類型的推斷b
將不依賴a
。結果,用戶將需要為用戶定義的函數提供類型注釋:
val joinedGraph = graph.joinVertices(uniqueCosts,
(id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)
鄰里聚集
許多圖形分析任務中的關鍵步驟是聚合有關每個頂點鄰域的信息。例如,我們可能想知道每個用戶擁有的關注者數量或每個用戶的關注者平均年齡。許多迭代圖算法(例如,PageRank,最短路徑和連接的組件)反復聚合相鄰頂點的屬性(例如,當前的PageRank值,到源的最短路徑以及最小的可到達頂點ID)。
為了提高性能,主要聚合運算符從更改
graph.mapReduceTriplets
為graph.AggregateMessages
。
匯總消息(aggregateMessages)
GraphX中的核心聚合操作為aggregateMessages
。該運算符將用戶定義的sendMsg
函數應用於圖形中的每個邊三元組,然后使用該mergeMsg
函數在其目標頂點處聚合這些消息。
class Graph[VD, ED] { def aggregateMessages[Msg: ClassTag]( sendMsg: EdgeContext[VD, ED, Msg] => Unit, mergeMsg: (Msg, Msg) => Msg, tripletFields: TripletFields = TripletFields.All) : VertexRDD[Msg] }
用戶定義的sendMsg
函數采用EdgeContext
,將公開源和目標屬性以及邊屬性和函數(sendToSrc
和sendToDst
),以將消息發送到源和目標節點。sendMsg可以認為是 map-reduce中的map函數。用戶定義的mergeMsg
函數接受兩條發往同一頂點的消息,並產生一條消息。可以認為是map-reduce中的reduce函數。Graph的
aggregateMessages
操作返回一個VertexRDD[Msg]
,包含發往每個頂點的聚合消息(類型的Msg
)。未收到消息的頂點不包含在返回的VertexRDD中。
另外,aggregateMessages
采用一個可選參數 tripletsFields
,該參數指示訪問哪些數據EdgeContext
(即,源頂點屬性,而不是目標頂點屬性)。Graph的可能選項在tripletsFields
中定義,TripletFields
默認值為TripletFields.All
,指示用戶定義的sendMsg
函數可以訪問任何頂點。該tripletFields
參數可用於限制GraphX僅訪問部分頂點, EdgeContext
允許GraphX選擇優化的聯接策略。例如,如果我們正在計算每個用戶的關注者的平均年齡,則僅需要源字段,因此我們可以TripletFields.Src
用來表明我們僅需要源字段。
在GraphX的早期版本中,我們使用字節碼檢查來推斷
TripletFields,
但是我們發現字節碼檢查有些不可靠,而是選擇了更明確的用戶控制。
在以下示例中,我們使用aggregateMessages
運算符來計算每個用戶的追隨者的平均年齡。
package spark2.graphx import org.apache.log4j.{Level, Logger} import org.apache.spark.graphx.{Graph, VertexRDD} import org.apache.spark.graphx.util.GraphGenerators import org.apache.spark.sql.SparkSession object AggregateMessagesExample { Logger.getLogger("org").setLevel(Level.WARN) def main(args: Array[String]): Unit = { // Creates a SparkSession. val spark = SparkSession .builder .appName(s"${this.getClass.getSimpleName}") .master("local[2]") .getOrCreate() val sc = spark.sparkContext // 隨機生成一個圖 val graph: Graph[Double, Int] = GraphGenerators.logNormalGraph(sc, numVertices = 5).mapVertices((id, _) => id.toDouble) graph.triplets.collect.foreach(println) println("------------") // Compute the number of older followers and their total age val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)]( triplet => { // Map Function if (triplet.srcAttr > triplet.dstAttr) { // Send message to destination vertex containing counter and age triplet.sendToDst((1, triplet.srcAttr)) } }, // Add counter and age (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function ) olderFollowers.collect.foreach(println) println("===============") // Divide total age by number of older followers to get average age of older followers val avgAgeOfOlderFollowers: VertexRDD[Double] = olderFollowers.mapValues( (id, value) => value match {case (count, totalAge) => totalAge / count}) // Display the results avgAgeOfOlderFollowers.collect.foreach(println) // $example off$ spark.stop() } }
執行結果: