Spark GraphX圖計算快速入門


一.概述

GraphX是Spark中用於圖形和圖形並行計算的新組件。在較高的層次上,GraphX 通過引入新的Graph抽象來擴展Spark RDD:一個有向多重圖,其屬性附加到每個頂點和邊上。為了支持圖計算,GraphX公開了一組基本的操作符(例如, subgraphjoinVertices和 aggregateMessages),以及所述的優化的變體Pregel API。此外,GraphX包括越來越多的圖形算法和 構建器集合,以簡化圖形分析任務。

二.入門

首先,需要將Spark和GraphX導入項目,如下所示:

import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD

如果不使用Spark Shell,則還需要一個SparkContext

三.屬性圖

GraphX的屬性曲線圖是一個有向多重圖與連接到每個頂點和邊的用戶定義的對象。有向多重圖是有向圖,其中存在的多個平行邊共享相同的源和目標頂點。支持平行邊的功能簡化了在相同頂點之間可能存在多個關系(例如,同事和朋友)的建模場景。每個頂點均由唯一的 64位長標識符(VertexId)設置密鑰 GraphX對頂點標識符沒有施加任何排序約束。同樣,邊具有相應的源和目標頂點標識符。

在頂點(VD)和邊(ED)類型上對屬性圖進行了參數化這些是分別與每個頂點和邊關聯的對象的類型。

當頂點和邊類型是原始數據類型(例如int,double等)時,GraphX可以優化它們的表示形式,方法是將它們存儲在專用數組中,從而減少了內存占用量。

在某些情況下,可能希望在同一圖形中具有不同屬性類型的頂點。這可以通過繼承來實現。例如,要將用戶和產品建模為二部圖,我們可以執行以下操作:

class VertexProperty()
case class UserProperty(val name : String) extends VertexProperty
case class ProductProperty(val name : String, val price : Double) extends VertexProperty
// The graph might then have the type:
var graph : Graph[VertexProperty, String] = null

像RDD一樣,屬性圖是不可變的,分布式的和容錯的。圖的值或結構的更改是通過生成具有所需更改的新圖來完成的。注意,原始圖的實質部分(即不受影響的結構,屬性和索引)在新圖中被重用,從而降低了這種固有功能數據結構的成本。使用一系列頂點分區試探法在執行程序之間划分圖。與RDD一樣,發生故障時,可以在不同的計算機上重新創建圖形的每個分區。

邏輯上,屬性圖對應於一對類型化集合(RDD),它們對每個頂點和邊的屬性進行編碼。結果,圖類包含訪問圖的頂點和邊的成員:

class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}

Graph的類是VertexRDD[VD]EdgeRDD[ED]的延伸,並且分別包含被優化的版本RDD[(VertexId, VD)]RDD[Edge[ED]]VertexRDD[VD]和EdgeRDD[ED]提供圍繞圖形計算,並利用內部優化內置附加功能。

屬性圖示例

假設我們要構造一個由GraphX項目中的各個協作者組成的屬性圖。頂點屬性可能包含用戶名和職業。我們可以用描述協作者之間關系的字符串注釋邊:

 結果圖將具有類型簽名:

val userGraph: Graph[(String, String), String]

有多種方法可以從原始文件,RDD甚至是合成生成器構造屬性圖最通用的方法是使用 Graph對象例如,以下代碼從RDD集合構造一個圖形:

// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

在上面的示例中,我們使用了Edge類。邊具有srcId和dstId,分別對應於源和目標頂點標識符。另外,Edge類具有attr存儲edge屬性成員。

我們可以分別使用graph.vertices 和graph.edges成員將圖解構為相應的頂點和邊視圖

val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter {case (id, (name, pos)) => pos == "postdoc"}.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count

請注意,graph.vertices返回VertexRDD[(String, String)]擴展了的 RDD[(VertexId, (String, String))],因此我們使用scala case表達式來解構元組。另一方面,graph.edges返回一個EdgeRDD包含Edge[String]對象。我們還可以使用case類類型構造函數,如下所示:

graph.edges.filter {case Edge(src, dst, prop) => src > dst}.count

除了屬性圖的頂點和邊視圖外,GraphX還公開了一個三元組視圖。三元組視圖在邏輯上連接頂點和邊屬性,從而產生一個 RDD[EdgeTriplet[VD, ED]]包含EdgeTriplet類的實例可以用以下SQL表達式表示連接

SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id

或圖形化展示:

 

 GraphEdgeTriplet類擴展Edge通過添加類srcAttr和 dstAttr分別包含源和目的屬性成員。我們可以使用圖形的三元組視圖來呈現描述用戶之間關系的字符串集合。

val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))

 四.圖運算符

和RDD一樣屬性圖也具有一組基本運算符,例如,map、filter、reduceByKey等等,這些運算符采用用戶定義的函數並生成具有轉換后的特性和結構的新圖。在屬性圖中定義了具有優化實現的核心運算符,並在其中定義了表示為核心運算符組成的便捷運算符但是,由於使用了Scala隱式轉換,Graph的成員可以自動應用相應的運算符例如,我們可以通過以下方法計算每個頂點(在Graph中定義的度數

val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees

Graph常用算子

/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = false): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}

屬性算子

與RDD運算符一樣,屬性圖包含以下內容:

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}

這些運算符中的每一個都會產生一個新圖,其頂點或邊屬性由用戶定義的map函數修改

請注意,在各種情況下,圖形結構均不受影響。這是這些運算符的關鍵功能,它允許生成的圖重用原始圖的結構索引。以下代碼段在邏輯上是等效的,但第一個代碼段不會保留結構索引,也不會從GraphX系統優化中受益:

val newVertices = graph.vertices.map {case (id, attr) => (id, mapUdf(id, attr))}
val newGraph = Graph(newVertices, graph.edges)

而是使用mapVertices保留索引:

val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))

這些運算符通常用於為特定計算初始化圖或投影出不必要的屬性。例如,給定一個以出度作為頂點屬性的圖,我們將其初始化為PageRank:

// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
  graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
  inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)

結構算子

目前,GraphX僅支持一組簡單的常用結構運算符。以下是基本結構運算符的列表。

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] // 交集
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

reverse算子將返回逆轉的所有邊方向上的新圖。例如,在嘗試計算逆PageRank時,這將很有用。由於反向操作不會修改頂點或邊的屬性或更改邊的數量,因此可以有效地實現它,而無需移動或復制數據。

subgraph操作需要的頂點和邊的謂詞,並返回包含只有滿足頂點謂詞的頂點和滿足邊謂詞邊的曲線和滿足頂點謂詞連接頂點subgraph 可以在多種情況下使用運算符,以將圖形限制在感興趣的頂點和邊或消除斷開的鏈接。例如,在下面的代碼中,我們刪除了斷開的鏈接:

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))

注意,在以上示例中,僅提供了頂點謂詞。如果不設置頂點或邊謂詞subgraph操作默認為true

mask操作通過返回包含該頂點和邊,它們也在輸入圖形中發現曲線構造一個子圖。可以與subgraph運算符結合使用, 以基於另一個相關圖形中的屬性來限制圖形。例如,我們可能會使用缺少頂點的圖來運行連接的組件,然后將答案限制為有效的子圖。

// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)

屬性圖的groupEdges操作在多重圖中合並平行邊(即,頂點對之間的重復邊緣)。在許多數值應用中,可以將平行邊添加 (合並了它們的權重)到單個邊中,從而減小了圖形的大小。

Join操作

在許多情況下,有必要將外部集合(RDD)中的數據與圖形連接起來。例如,我們可能有想要與現有圖形合並的額外用戶屬性,或者可能希望將頂點屬性從一個圖形拉到另一個圖形。這些任務可以使用聯接運算符來完成下面我們列出了關鍵的聯接運算符:

 

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

Graph的joinVertices運算符與輸入RDD頂點進行連接並返回通過應用用戶定義獲得的頂點屬性的新圖形。RDD中沒有匹配值的頂點保留其原始值。

請注意,如果RDD對於給定的頂點包含多個值,則只會使用一個。因此,建議使用以下命令使輸入RDD唯一,這也將對結果值進行預索引,以大大加快后續連接的速度。

val nonUniqueCosts: RDD[(VertexId, Double)]
val uniqueCosts: VertexRDD[Double] =
  graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
  (id, oldCost, extraCost) => oldCost + extraCost)

除了將用戶定義的函數應用於所有頂點並可以更改頂點屬性類型外,其他outerJoinVertices行為與常規行為類似由於並非所有頂點在輸入RDD中都可能具有匹配值,因此該函數采用一種類型。例如,我們可以通過使用初始化頂點屬性來為PageRank設置圖形

val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees){(id, oldAttr, outDegOpt) =>
  outDegOpt match {
    case Some(outDeg) => outDeg
    case None => 0 // No outDegree means zero outDegree
  }
}

雖然我們可以同樣地寫f(a)(b)f(a,b)但這意味着對類型的推斷b將不依賴a結果,用戶將需要為用戶定義的函數提供類型注釋:

val joinedGraph = graph.joinVertices(uniqueCosts,
  (id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)

鄰里聚集

許多圖形分析任務中的關鍵步驟是聚合有關每個頂點鄰域的信息。例如,我們可能想知道每個用戶擁有的關注者數量或每個用戶的關注者平均年齡。許多迭代圖算法(例如,PageRank,最短路徑和連接的組件)反復聚合相鄰頂點的屬性(例如,當前的PageRank值,到源的最短路徑以及最小的可到達頂點ID)。

為了提高性能,主要聚合運算符從更改 graph.mapReduceTripletsgraph.AggregateMessages

匯總消息(a​​ggregateMessages)

GraphX中的核心聚合操作為aggregateMessages該運算符將用戶定義的sendMsg函數應用於圖形中的每個邊三元組,然后使用該mergeMsg函數在其目標頂點處聚合這些消息。

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}

用戶定義的sendMsg函數采用EdgeContext,將公開源和目標屬性以及邊屬性和函數(sendToSrcsendToDst),以將消息發送到源和目標節點。sendMsg可以認為是 map-reduce中map函數。用戶定義的mergeMsg函數接受兩條發往同一頂點的消息,並產生一條消息。可以認為是map-reduce中reduce函數。Graph的 aggregateMessages操作返回一個VertexRDD[Msg] ,包含發往每個頂點的聚合消息(類型的Msg)。未收到消息的頂點不包含在返回的VertexRDD中

另外,aggregateMessages采用一個可選參數 tripletsFields,該參數指示訪問哪些數據EdgeContext (即,源頂點屬性,而不是目標頂點屬性)。Graph的可能選項在tripletsFields中定義,TripletFields默認值為TripletFields.All,指示用戶定義的sendMsg函數可以訪問任何頂點tripletFields參數可用於限制GraphX僅訪問部分頂點, EdgeContext允許GraphX選擇優化的聯接策略。例如,如果我們正在計算每個用戶的關注者的平均年齡,則僅需要源字段,因此我們可以TripletFields.Src用來表明我們僅需要源字段。

在GraphX的早期版本中,我們使用字節碼檢查來推斷 TripletFields但是我們發現字節碼檢查有些不可靠,而是選擇了更明確的用戶控制。

在以下示例中,我們使用aggregateMessages運算符來計算每個用戶的追隨者的平均年齡。

package spark2.graphx

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.sql.SparkSession

object AggregateMessagesExample {
  Logger.getLogger("org").setLevel(Level.WARN)
  def main(args: Array[String]): Unit = {
    // Creates a SparkSession.
    val spark = SparkSession
      .builder
      .appName(s"${this.getClass.getSimpleName}")
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext

    // 隨機生成一個圖
    val graph: Graph[Double, Int] =
      GraphGenerators.logNormalGraph(sc, numVertices = 5).mapVertices((id, _) => id.toDouble)
    graph.triplets.collect.foreach(println)
    println("------------")
    // Compute the number of older followers and their total age
    val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
      triplet => { // Map Function
        if (triplet.srcAttr > triplet.dstAttr) {
          // Send message to destination vertex containing counter and age
          triplet.sendToDst((1, triplet.srcAttr))
        }
      },
      // Add counter and age
      (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
    )
    olderFollowers.collect.foreach(println)
    println("===============")
    // Divide total age by number of older followers to get average age of older followers
    val avgAgeOfOlderFollowers: VertexRDD[Double] =
      olderFollowers.mapValues( (id, value) =>
        value match {case (count, totalAge) => totalAge / count})
    // Display the results
    avgAgeOfOlderFollowers.collect.foreach(println)
    // $example off$

    spark.stop()
  }
}

執行結果:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM