1. Graphx概念
針對某些領域,如社交網絡、語言建模等,graph-parallel系統可以高效地執行復雜的圖形算法,比一般的data-parallel系統更快。
Graphx是將graph-parallel的data-parallel統一到一個系統中。允許用戶將數據當成一個圖或一個集合RDD,而簡化數據移動或復雜操作。
2. 屬性圖
屬性圖為有向多重圖,帶有鏈接到每個頂點和邊的用戶定義的對象。有向多重圖多個並行的邊共享相同源和目的地頂點。每個頂點由一個唯一的64位長的標識符(VertexId)作為key,頂點擁有相同的源和目的頂點標識符。
屬性圖通過vertex(VD)和edge(ED)類型參數化,分別與每個頂點和邊相關聯的對象的類型。某些情況下,相同圖形中希望頂點擁有不同屬性類型,可通過繼承實現。

class VertexProperty() case class UserProperty(val name: String) extends VertexProperty case class ProductProperty(val name: String, val price: Double) extends VertexProperty var grapg: Graph[VertexProperty, String] = null
與RDD類似,屬性圖是不可變、分布式、容錯的。圖中的值或結構變化需要生成新的圖實現。注意:原始圖中的大部分可以在新圖中重用,以減少固有功能數據結構成本。
邏輯上,屬性圖對應一對類型化的集合RDD,包含了每一個頂點和邊屬性。
class Graph[VD, ED]{ val vertices: VertexRDD[VD] val edges: EdgeRDD[ED] }
VertexRDD[VD]和EdgeRDD[ED]分別繼承於RDD[(VertexID, VD)]和RDD[Edge[ED]]。
Graph也包含一個三元組視圖,三元組視圖邏輯上將頂點和邊的屬性保存為一個RDD[EdgeTriplet[VD, ED]],EdgeTriplet可通過下圖理解。
EdgeTriplet繼承於Edge類,並加入srcAttr和dstAttr成員,分別包含源和目標的屬性。
例:

import org.apache.spark.graphx.{Edge, Graph, VertexId} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} class GraphTest1 { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("GraphTest1")) // 創建頂點信息 val users: RDD[(VertexId, (String, String))] = sc.parallelize( Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))) ) // 創建圖的Edge類,Edge類具有srcId和dstId分別對應與源和目標點的標識符,次明早attr成員存儲邊屬性 val relationships: RDD[Edge[String]] = sc.parallelize( Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")) ) // 定義一個默認用戶 val defaultUser = ("John Doe", "Missing") // 基於Graph對象構造初始化圖 val graph = Graph(users, relationships, defaultUser) // 統計用戶為postdoc的總數 // graph.vertices返回VertexRDD[(String, String)],繼承於RDD[(VertexID, (String, String))] graph.vertices.filter{case (id, (name, pos)) => pos == "postdoc"}.count // 統計src > dst的邊總數 // graph.edges返回Edge[String]對象的EdgeRDD // graph.edges.filter(e => e.srcId > e.dstId).count graph.edges.filter{case Edge(srcId, dstId, attr) => srcId > dstId}.count() // graph.triplets包含的屬性有Array(((3,(rxin,student)),(7,(jgonzal,postdoc)),collab)) val facts = graph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + triplet.dstAttr._1 ) facts.collect().foreach(println) sc.stop() } }
3. 圖操作符
(1) 屬性操作
屬性圖包含操作如下,每個操作都產生一個新圖,包含用戶自定義map操作修改后的頂點或邊的屬性。
a. mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
b. mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
c. mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
注意:每種情況下圖結構均不受影響,如上操作的一個重要特征是允許所得圖形重用原有圖形的結構索引ndices。
例:

import org.apache.spark.graphx.{Edge, Graph, VertexId, GraphOps} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} class GraphTest1 { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("GraphTest1")) // 創建頂點信息 val users: RDD[(VertexId, (String, String))] = sc.parallelize( Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))) ) // 創建圖的Edge類,Edge類具有srcId和dstId分別對應與源和目標點的標識符,次明早attr成員存儲邊屬性 val relationships: RDD[Edge[String]] = sc.parallelize( Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")) ) // 定義一個默認用戶 val defaultUser = ("John Doe", "Missing") // 基於Graph對象構造初始化圖 val graph = Graph(users, relationships, defaultUser) // 統計用戶為postdoc的總數 // graph.vertices返回VertexRDD[(String, String)],繼承於RDD[(VertexID, (String, String))] graph.vertices.filter{case (id, (name, pos)) => pos == "postdoc"}.count // 統計src > dst的邊總數 // graph.edges返回Edge[String]對象的EdgeRDD // graph.edges.filter(e => e.srcId > e.dstId).count graph.edges.filter{case Edge(srcId, dstId, attr) => srcId > dstId}.count() // graph.triplets包含的屬性有Array(((3,(rxin,student)),(7,(jgonzal,postdoc)),collab)) val facts = graph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + triplet.dstAttr._1 ) facts.collect().foreach(println) // 指定新圖,頂點屬性為出度 val inputGraph: Graph[Int, String] = graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0)) // Construct a graph where each edge contains the weight and each vertex is the initial PageRank val outputGraph:Graph[Double, Double] = inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((_id, _) => 1.0) sc.stop() } }
(2) 結構性操作
圖中基本的結構性操作包含:
a. reverse: Graph[VD, ED]:返回新圖,圖的邊的方向都是反轉,可用於計算反轉的PageRank
b. subgraph(epred: EdgeTriplet[VD,ED] => Boolean, vpred: (VertexId, VD) => Boolean): Graph[VD, ED]:利用頂點和邊的predicates,返回的圖僅僅包含滿足頂點predicates的頂點,滿足邊predicates的邊以及滿足頂點predicates的連接頂點(connect vertices)。應用場景:
獲取感興趣的 頂點和邊組成的圖或者清除斷開鏈接后的圖。
例:

val validGraph = graph.subgraph(vpred = (id, attr) => !attr._2.equals("Missing"))
validGraph.vertices.collect.foreach(println)
validGraph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of "+triplet.dstAttr._1).collect
c. mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]:構建子圖,包含輸入圖中的頂點和邊。可與subgraph結合,基於另一個相關圖的特征去約束一個圖。
例:利用缺失頂點的圖運行連通體,返回有效子圖

val ccGraph = graph.connectedComponents() // No longer contains missing field val validCCGraph = ccGraph.mask(validGraph) // Restrict the answer to the valid subgraph
d. groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]:合並圖中的並行邊(如頂點對之間重復的邊),降低圖的大小
(3) 連接操作
用於將外部數據加入到圖中。
a. joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]:將輸入RDD和頂點相結合,返回一個新的帶有頂點特征的圖。
注意:對於給定頂點,RDD中有超過1個匹配值時,則僅使用其中一個。建議使用如下方法,保證RDD的唯一性。

val nonUniqueCosts: RDD[(VertexId, Double)] val uniqueCosts: VertexRDD[Double] = graph.vertices.aggregateUsingIndex(nonUniqueCosts, (a, b) => a + b) val joinedGraph = graph.joinVertices(uniqueCosts)((id, oldCost, extraCost) => oldCost + extraCost)
b. outerJoinVertices(mapFunc: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]:與joinVertices類似,因為不是所有頂點在RDD中擁有匹配的值,map函數需要一個Option類型