2. Spark GraphX解析


2.1 存儲模式

  2.1.1 圖存儲模式 

      巨型圖的存儲總體上有邊分割和點分割兩種存儲方式

      1)邊分割(Edge-Cut):每個頂點都存儲一次,但有的邊會被打斷分到兩台機器上。這樣做的好處是節省存儲空間;壞處是對圖進行基於邊的計算時,對於一條兩個頂點被分到不同機器上的邊來說,要跨機器通信傳輸數據,內網通信流量大

      2)點分割(Vertex-Cut):每條邊只存儲一次,都只會出現在一台機器上。鄰居多的點會被復制到多台機器上,增加了存儲開銷,同時會引發數據同步問題。好處是可以大幅減少內網通信量

      雖然兩種方法互有利弊,但現在是點分割占上風,各種分布式圖計算框架都將自己底層的存儲形式變成了點分割

      1)磁盤價格下降,存儲空間不再是問題,而內網的通信資源沒有突破性進展,集群計算時內網帶寬是寶貴的,時間比磁盤更珍貴。這點就類似於常見的空間換時間的策略

      2)在當前的應用場景中,絕大多數網絡都是“無尺度網絡”,遵循冪律分布,不同點的鄰居數量相差非常懸殊。而邊分割會使那些多鄰居的點所相連的邊大多數被分到不同的機器上,這樣的數據分布會使得內網帶寬更加捉襟見肘,於是邊分割存儲方式被漸漸拋棄了

  2.1.2 GraphX存儲模式 

      Graphx借鑒PowerGraph,使用的是Vertex-Cut(點分割)方式存儲圖,用三個RDD存儲圖數據信息

      VertexTable(id, data):id為頂點id,data為頂點屬性

      EdgeTable(pid, src, dst, data):pid為分區id,src為源頂點id,dst為目的頂點id,data為邊屬性

      RoutingTable(id, pid):id為頂點id,pid為分區id

      點分割存儲實現如下圖所示:

      

      GraphX在進行圖分割時,有幾種不同的分區(partition)策略,它通過PartitionStrategy專門定義這些策略。在PartitionStrategy中,總共定義了EdgePartition2D、EdgePartition1D、RandomVertexCut以及CanonicalRandomVertexCut這四種不同的分區策略。下面分別介紹這幾種策略

  2.1.2.1 RandomVertexCut

case object RandomVertexCut extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts:
  PartitionID): PartitionID = {
    math.abs((src, dst).hashCode()) % numParts
  }
}

      這個方法比較簡單,通過取源頂點和目標頂點id的哈希值來將邊分配到不同的分區。這個方法會產生一個隨機的邊分割,兩個頂點之間相同方向的邊會分配到同一個分區

  2.1.2.2 CanonicalRandomVertexCut 

case object CanonicalRandomVertexCut extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts:
PartitionID): PartitionID = {
    if (src < dst) {
      math.abs((src, dst).hashCode()) % numParts
    } else {
      math.abs((dst, src).hashCode()) % numParts
    }
  }
}

      這種分割方法和前一種方法沒有本質的不同。不同的是,哈希值的產生帶有確定的方向(即兩個頂點中較小的id的頂點在前)。兩個頂點之間所有的邊都會分配到同一個分區,而不管方向如何

  2.1.2.3 EdgePartition1D  

case object EdgePartition1D extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts:
  PartitionID): PartitionID = {
    val mixingPrime: VertexId = 1125899906842597L (math.abs(src * mixingPrime) % numParts).toInt
  }
}

      這種方法僅僅根據源頂點id來將邊分配到不同的分區。有相同源頂點的邊會分配到同一分區

  2.1.2.4 EdgePartition2D  

case object EdgePartition2D extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts:
  PartitionID): PartitionID = {
    val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
    val mixingPrime: VertexId = 1125899906842597L
    if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
      // Use old method for perfect squared to ensure we get same results
      val col: PartitionID = (math.abs(src * mixingPrime) %
        ceilSqrtNumParts).toInt
      val row: PartitionID = (math.abs(dst * mixingPrime) %
        ceilSqrtNumParts).toInt
      (col * ceilSqrtNumParts + row) % numParts
    } else {
      // Otherwise use new method
      val cols = ceilSqrtNumParts
      val rows = (numParts + cols - 1) / cols
      val lastColRows = numParts - rows * (cols - 1)
      val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
      val row = (math.abs(dst * mixingPrime) % (
        if (col < cols - 1)
          rows
        else
          lastColRows)).toInt
      col * rows + row
    }
  }
}

      這種分割方法同時使用到了源頂點id和目的頂點id。它使用稀疏邊連接矩陣的2維區分來將邊分配到不同的分區,從而保證頂點的備份數不大於2 * sqrt(numParts)的限制。這里numParts表示分區數。這個方法的實現分兩種情況,即分區數能完全開方和不能完全開方兩種情況。當分區數能完全開方時,采用下面的方法:

val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
(col * ceilSqrtNumParts + row) % numParts

      當分區數不能完全開方時,采用下面的方法。這個方法的最后一列允許擁有不同的行數 

val cols = ceilSqrtNumParts
val rows = (numParts + cols - 1) / cols
//最后一列允許不同的行數
val lastColRows = numParts - rows * (cols - 1)
val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
val row = (math.abs(dst * mixingPrime) % (
  if (col < cols - 1)
    rows
  else
    lastColRows)
  ).toInt
col * rows + row

      下面舉個例子來說明該方法。假設我們有一個擁有12個頂點的圖,要把它切分到9台機器。我們可以用下面的稀疏矩陣來表示:

      

      上面的例子中*表示分配到處理器上的邊。E表示連接頂點v11和v1的邊,它被分配到了處理器P6上。為了獲得邊所在的處理器,我們將矩陣切分為sqrt(numParts) * sqrt(numParts)塊。注意,上圖中與頂點v11相連接的邊只出現在第一列的塊(P0,P3,P6)或者最后一行的塊(P6,P7,P8)中,這保證了V11的副本數不會超過2*sqrt(numParts)份,在上例中即副本不能超過6份。

      在上面的例子中,P0里面存在很多邊,這會造成工作的不均衡。為了提高均衡,我們首先用頂點id乘以一個大的素數,然后再shuffle頂點的位置。乘以一個大的素數本質上不能解決不平衡的問題,只是減少了不平衡的情況發生

2.2 vertices、edges以及triplets 

      vertices、edges 以及 triplets 是 GraphX 中三個非常重要的概念,在前文GraphX介紹中對這三個概念有初步的了解

  2.2.1 vertices

      在GraphX中,vertices對應着名稱為VertexRDD的RDD。這個RDD有頂點id和頂點屬性兩個成員變量。它的源碼如下所示:

abstract class VertexRDD[VD](sc: SparkContext, deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps)

      從源碼中我們可以看到,VertexRDD繼承自RDD[(VertexId, VD)],這里VertexId表示頂點id,VD表示頂點所帶的屬性的類別。這從另一個角度也說明VertexRDD擁有頂點id和頂點屬性

  2.2.2 edges 

      在GraphX中,edges對應着EdgeRDD。這個RDD擁有三個成員變量,分別是源頂點id、目標頂點id以及邊屬性。它的源碼如下所示:

abstract class EdgeRDD[ED](sc: SparkContext, deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps)

      從源碼中可以看到,EdgeRDD繼承自RDD[Edge[ED]],即類型為Edge[ED]的RDD

  2.2.3 triplets 

      在GraphX中,triplets對應着EdgeTriplet。它是一個三元組視圖,這個視圖邏輯上將頂點和邊的屬性保存為一個RDD[EdgeTriplet[VD, ED]]。可以通過下面的Sql表達式表示這個三元視圖的含義:

SELECT
      src.id,
      dst.id,
      src.attr,
      e.attr,
      dst.attr
FROM
      edges AS e
LEFT JOIN vertices AS src,
  vertices AS dst ON e.srcId = src.Id 
AND e.dstId = dst.Id

      同樣,也可以通過下面的圖解的形式來表示它的含義:

      

      EdgeTriplet的源代碼如下所示:

class EdgeTriplet[VD, ED] extends Edge[ED] {
  //源頂點屬性
  var srcAttr: VD = _ // nullValue[VD]
  //目標頂點屬性
  var dstAttr: VD = _ // nullValue[VD]
  protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD, ED] = {
    srcId = other.srcId
    dstId = other.dstId
    attr = other.attr
    this
  }
}

      EdgeTriplet 類繼承自 Edge 類,我們來看看這個父類:

case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (var srcId: VertexId = 0, var dstId: VertexId = 0, var attr: ED = null.asInstanceOf[ED]) extends Serializable

      Edge類中包含源頂點id,目標頂點id以及邊的屬性。所以從源代碼中我們可以知道,triplets既包含了邊屬性也包含了源頂點的id和屬性、目標頂點的id和屬性

2.3 圖的構建 

      GraphX的Graph對象是用戶操作圖的入口。前面的章節我們介紹過,它包含了邊(edges)、頂點(vertices)以及triplets三部分,並且這三部分都包含相應的屬性,可以攜帶額外的信息

  2.3.1 構建圖的方法 

      構建圖的入口方法有兩種,分別是根據邊構建和根據邊的兩個頂點構建

  2.3.1.1 根據邊構建圖(Graph.fromEdges) 

def fromEdges[VD: ClassTag, ED: ClassTag](
  edges: RDD[Edge[ED]],
  defaultValue: VD,
  edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD,
  ED] = {
  GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
}

  2.3.1.2 根據邊的兩個頂點數據構建(Graph.fromEdgeTuples)

def fromEdgeTuples[VD: ClassTag](
    rawEdges: RDD[(VertexId, VertexId)],
    defaultValue: VD,
    uniqueEdges: Option[PartitionStrategy] = None,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD,
    Int] = {
  val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
  val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
  uniqueEdges match {
    case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
    case None => graph
  }
}

      從上面的代碼我們知道,不管是根據邊構建圖還是根據邊的兩個頂點數據構建,最終都是使用GraphImpl來構建的,即調用了GraphImpl的apply方法

  2.3.2 構建圖的過程 

      構建圖的過程很簡單,分為三步,它們分別是構建邊EdgeRDD、構建頂點VertexRDD、生成Graph對象。下面分別介紹這三個步驟

2.3.2.1 構建邊EdgeRDD也分為三步,下圖的例子詳細說明了這些步驟

      

      1 從文件中加載信息,轉換成tuple的形式,即(srcId,dstId)

val rawEdgesRdd: RDD[(Long, Long)] =
  sc.textFile(input).filter(s => s != "0,0").repartition(partitionNum).map {
  case line =>
    val ss = line.split(",") 
    val src = ss(0).toLong 
    val dst = ss(1).toLong 
    if (src < dst)
      (src, dst)
    else
      (dst, src)
}.distinct()

      2 入口,調用Graph.fromEdgeTuples(rawEdgesRdd)

      源數據為分割的兩個點ID,把源數據映射成Edge(srcId, dstId, attr)對象, attr默認為1。這樣元數據就構建成了RDD[Edge[ED]],如下面的代碼

val edges = rawEdges.map(p => Edge(p._1, p._2, 1))

      3 將RDD[Edge[ED]]進一步轉化成EdgeRDDImpl[ED,VD]

      第二步構建完RDD[Edge[ED]]之后,GraphX通過調用GraphImpl的apply方法來構建Graph

val graph = GraphImpl(edges, defaultValue, edgeStorageLevel,
  vertexStorageLevel)
  def apply[VD: ClassTag, ED: ClassTag](
        edges: RDD[Edge[ED]],
        defaultVertexAttr: VD,
        edgeStorageLevel: StorageLevel,
        vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
  fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}

      在apply調用fromEdgeRDD之前,代碼會調用EdgeRDD.fromEdges(edges)將RDD[Edge[ED]]轉化成 EdgeRDDImpl[ED, VD]

def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDDImpl[ED, VD] = {
  val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
    val builder = new EdgePartitionBuilder[ED, VD]
    iter.foreach { e =>
      builder.add(e.srcId, e.dstId, e.attr)
    }
    Iterator((pid, builder.toEdgePartition))
  }
  EdgeRDD.fromEdgePartitions(edgePartitions)
}

      程序遍歷RDD[Edge[ED]]的每個分區,並調用builder.toEdgePartition對分區內的邊作相應的處理

def toEdgePartition: EdgePartition[ED, VD] = {
  val edgeArray = edges.trim().array
  new Sorter(Edge.edgeArraySortDataFormat[ED])
    .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)
  val localSrcIds = new Array[Int](edgeArray.size)
  val localDstIds = new Array[Int](edgeArray.size)
  val data = new Array[ED](edgeArray.size)
  val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
  val global2local = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
  val local2global = new PrimitiveVector[VertexId]
  var vertexAttrs = Array.empty[VD]
  //采用列式存儲的方式,節省了空間
  if (edgeArray.length > 0) {
    index.update(edgeArray(0).srcId, 0)
    var currSrcId: VertexId = edgeArray(0).srcId var currLocalId = -1
    var i = 0
    while (i < edgeArray.size) {
      val srcId = edgeArray(i).srcId
      val dstId = edgeArray(i).dstId
      localSrcIds(i) = global2local.changeValue(srcId,
      { currLocalId += 1; local2global += srcId; currLocalId }, identity) localDstIds(i) = global2local.changeValue(dstId,
      { currLocalId += 1; local2global += dstId; currLocalId }, identity)
      data(i) = edgeArray(i).attr
      //相同頂點 srcId 中第一個出現的 srcId 與其下標
      if (srcId != currSrcId) {
        currSrcId = srcId
        index.update(currSrcId, i)
      }
      i += 1
    }
    vertexAttrs = new Array[VD](currLocalId + 1)
  }
  new EdgePartition(
  localSrcIds, localDstIds, data, index, global2local,
  local2global.trim().array, vertexAttrs, None)
}

      toEdgePartition的第一步就是對邊進行排序

        按照srcId從小到大排序。排序是為了遍歷時順序訪問,加快訪問速度。采用數組而不是Map,是因為數組是連續的內存單元,具有原子性,避免了Map的hash 問題,訪問速度快

      toEdgePartition的第二步就是填充localSrcIds,localDstIds,data,index,global2local,local2global,vertexAttrs

        數組localSrcIds,localDstIds中保存的是通過global2local.changeValue(srcId/dstId)轉換而成的分區本地索引。可以通過localSrcIds、localDstIds數組中保存的索引位從local2global中查到具體的VertexId

        global2local是一個簡單的,key值非負的快速hash map:GraphXPrimitiveKeyOpenHashMap, 保存vertextId和本地索引的映射關系。global2local中包含當前partition所有srcId、dstId與本地索引的映射關系

        data就是當前分區的attr屬性數組

        我們知道相同的srcId可能對應不同的dstId。按照srcId排序之后,相同的srcId會出現多行,如上圖中的index desc部分。index中記錄的是相同srcId中第一個出現的srcId與其下標

        local2global記錄的是所有的VertexId信息的數組。形如: srcId,dstId,srcId,dstId,srcId,dstId,srcId,dstId。其中會包含相同的srcId。即:當前分區所有vertextId的順序實際值

        我們可以通過根據本地下標取VertexId,也可以根據VertexId取本地下標,取相應的屬性

// 根據本地下標取VertexId
localSrcIds/localDstIds -> index -> local2global -> VertexId 
// 根據 VertexId 取本地下標,取屬性
VertexId -> global2local -> index -> data -> attr object

  2.3.2.2 構建頂點VertexRDD  

      緊接着上面構建邊RDD的代碼,方法fromEdgeRDD的實現

private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
        edges: EdgeRDDImpl[ED, VD],
        defaultVertexAttr: VD,
        edgeStorageLevel: StorageLevel,
        vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
  val edgesCached = edges.withTargetStorageLevel(edgeStorageLevel).cache()
  val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size, defaultVertexAttr)
  .withTargetStorageLevel(vertexStorageLevel)
  fromExistingRDDs(vertices, edgesCached)
}

      從上面的代碼我們可以知道,GraphX使用VertexRDD.fromEdges構建頂點VertexRDD,當然我們把邊RDD作為參數傳入

def fromEdges[VD: ClassTag](edges: EdgeRDD[_], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
  //1 創建路由表
  val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions))
  //2 根據路由表生成分區對象vertexPartitions
  val vertexPartitions = routingTables.mapPartitions({ routingTableIter =>
    val routingTable =
      if (routingTableIter.hasNext)
        routingTableIter.next()
      else
        RoutingTablePartition.empty
    Iterator(ShippableVertexPartition(Iterator.empty, routingTable, defaultVal))
  }, preservesPartitioning = true)
  //3 創建 VertexRDDImpl 對象
  new VertexRDDImpl(vertexPartitions)
}

      構建頂點VertexRDD的過程分為三步,如上代碼中的注釋。它的構建過程如下圖所示:

      

      創建路由表

        為了能通過點找到邊,每個點需要保存點到邊的信息,這些信息保存在RoutingTablePartition中

private[graphx] def createRoutingTables(edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
  // 將 edge partition 中的數據轉換成 RoutingTableMessage 類型,
  val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
    Function.tupled(RoutingTablePartition.edgePartitionToMsgs)))
}

        上述程序首先將邊分區中的數據轉換成RoutingTableMessage 類型,即tuple(VertexId,Int)類型

def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _]): Iterator[RoutingTableMessage] = {

  val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
  edgePartition.iterator.foreach { e =>
    map.changeValue(e.srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
    map.changeValue(e.dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
  }
  map.iterator.map { vidAndPosition =>
    val vid = vidAndPosition._1
    val position = vidAndPosition._2
    toMessage(vid, pid, position)
  }
}
  //`30-0`比特位表示邊分區`ID`,`32-31`比特位表示標志位
private def toMessage(vid: VertexId, pid: PartitionID, position: Byte): RoutingTableMessage = {
  val positionUpper2 = position << 30
  val pidLower30 = pid & 0x3FFFFFFF
  (vid, positionUpper2 | pidLower30)
}

        根據代碼,可以知道程序使用int的32-31比特位表示標志位,即01: isSrcId ,10: isDstId。30-0比特位表示邊分區ID。這樣做可以節省內存RoutingTableMessage表達的信息是:頂點id和它相關聯的邊的分區id是放在一起的,所以任何時候,我們都可以通過RoutingTableMessage找到頂點關聯的邊

      根據路由表生成分區對象

private[graphx] def createRoutingTables(
  edges: EdgeRDD[_], vertexPartitioner:
  Partitioner): RDD[RoutingTablePartition] = {
  // 將 edge partition 中的數據轉換成 RoutingTableMessage 類型,
  val numEdgePartitions = edges.partitions.size
  vid2pid.partitionBy(vertexPartitioner).mapPartitions(
  iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)),
  preservesPartitioning = true)
}

      我們將第1步生成的vid2pid按照HashPartitioner重新分區。我們看看RoutingTablePartition.fromMsgs方法

def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage]): RoutingTablePartition = {

  val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
  val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
  val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
  for(msg <- iter){
    val vid = vidFromMessage(msg)
    val pid = pidFromMessage(msg)
    val position = positionFromMessage(msg)
    pid2vid(pid) += vid
    srcFlags(pid) += (position & 0x1) != 0
    dstFlags(pid) += (position & 0x2) != 0
  }
  new RoutingTablePartition(pid2vid.zipWithIndex.map{
    case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid) ), toBitSet(dstFlags(pid)))
  })
}

      該方法從RoutingTableMessage獲取數據,將vid, 邊pid, isSrcId/isDstId重新封裝到pid2vid,srcFlags,dstFlags這三個數據結構中。它們表示當前頂點分區中的點在邊分區的分布。想象一下,重新分區后,新分區中的點可能來自於不同的邊分區,所以一個點要找到邊,就需要先確定邊的分區號pid,然后在確定的邊分區中確定是srcId還是 dstId, 這樣就找到了邊。新分區中保存vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid))這樣的記錄。這里轉換為toBitSet保存是為了節省空間

      根據上文生成的routingTables,重新封裝路由表里的數據結構為ShippableVertexPartition。ShippableVertexPartition會合並相同重復點的屬性attr對象,補全缺失的attr對象

def apply[VD: ClassTag](
  iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD,
  mergeFunc: (VD, VD) => VD): ShippableVertexPartition[VD] =
{
  val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
  // 合並頂點
  iter.foreach{ pair =>
  map.setMerge(pair._1, pair._2, mergeFunc)}
  // 不全缺失的屬性值
  routingTable.iterator.foreach{ vid =>
    map.changeValue(vid, defaultVal, identity)
  }
  new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
  //ShippableVertexPartition 定義
  ShippableVertexPartition[VD: ClassTag](
  val index: VertexIdToIndexMap,
  val values: Array[VD],
  val mask: BitSet,
  val routingTable: RoutingTablePartition)

      map就是映射vertexId->attr,index就是頂點集合,values就是頂點集對應的屬性集,mask指頂點集的BitSet

  2.3.2.3 生成Graph對象 

      使用上述構建的edgeRDD和vertexRDD,使用new GraphImpl(vertices, new ReplicatedVertexView(edges.asInstanceOf[EdgeRDDImpl[ED, VD]]))就可以生成 Graph對象。ReplicatedVertexView是點和邊的視圖,用來管理運送(shipping)頂點屬性到EdgeRDD的分區。當頂點屬性改變時,我們需要運送它們到邊分區來更新保存在邊分區的頂點屬性。注意,在ReplicatedVertexView中不要保存一個對邊的引用,因為在屬性運送等級升級后,這個引用可能會發生改變

class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](var edges: EdgeRDDImpl[ED, VD], var hasSrcId: Boolean = false, var hasDstId: Boolean = false)

2.4 計算模式 

  2.4.1 BSP計算模式 

      目前基於圖的並行計算框架已經有很多,比如來自Google的Pregel、來自Apache開源的圖計算框架Giraph/HAMA以及最為著名的GraphLab,其中Pregel、HAMA 和Giraph都是非常類似的,都是基於BSP(Bulk Synchronous Parallell)模式。Bulk Synchronous Parallell,即整體同步並行

      在BSP中,一次計算過程由一系列全局超步組成,每一個超步由並發計算、通信和同步三個步驟組成。同步完成,標志着這個超步的完成及下一個超步的開始。BSP 模式的准則是批量同步(bulk synchrony),其獨特之處在於超步(superstep)概念的引入。一個BSP程序同時具有水平和垂直兩個方面的結構。從垂直上看,一個BSP程序由一系列串行的超步(superstep)組成,如圖所示:

      

      從水平上看,在一個超步中,所有的進程並行執行局部計算。一個超步可分為三個階段,如圖所示:

      

      本地計算階段,每個處理器只對存儲在本地內存中的數據進行本地計算

      全局通信階段,對任何非本地數據進行操作

      柵欄同步階段,等待所有通信行為的結束

      BSP模型有如下幾個特點:

      1. 將計算划分為一個一個的超步(superstep),有效避免死鎖

      2. 將處理器和路由器分開,強調了計算任務和通信任務的分開,而路由器僅僅完成點到點的消息傳遞,不提供組合、復制和廣播等功能,這樣做既掩蓋具體的互 連網絡拓撲,又簡化了通信協議

      3. 采用障礙同步的方式、以硬件實現的全局同步是可控的粗粒度級,提供了執行緊耦合同步式並行算法的有效方式

  2.4.2 圖操作一覽 

      正如RDDs有基本的操作map, filter和reduceByKey一樣,屬性圖也有基本的集合操作,這些操作采用用戶自定義的函數並產生包含轉換特征和結構的新圖。定義在 Graph中的核心操作是經過優化的實現。表示為核心操作的組合的便捷操作定義在GraphOps中。然而,因為有Scala的隱式轉換,定義在GraphOps中的操作可以作為Graph的成員自動使用。例如,我們可以通過下面的方式計算每個頂點(定義在GraphOps中)的入度

val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator 
val inDegrees: VertexRDD[Int] = graph.inDegrees

      區分核心圖操作和GraphOps的原因是為了在將來支持不同的圖表示。每個圖表示都必須提供核心操作的實現並重用很多定義在GraphOps中的有用操作

      

  2.4.3 操作一覽 

      以下是定義在Graph和GraphOps中(為了簡單起見,表現為圖的成員)的功能的快速瀏覽。注意,某些函數簽名已經簡化(如默認參數和類型的限制已刪除),一些更高級的功能已經被刪除,所以請參閱API文檔了解官方的操作列表

      

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val users: VertexRDD[(String, String)] = VertexRDD[(String, String)](sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),(5L, ("franklin", "prof")), (2L, ("istoica", "prof")))))
val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
val graph = Graph(users, relationships)

/** 圖屬性操作總結 */
class Graph[VD, ED] {
// 圖信息操作
  =============================================================
  獲取邊的數量
  val numEdges: Long
  獲取頂點的數量
  val numVertices: Long
  獲取所有頂點的入度
  val inDegrees: VertexRDD[Int]
  獲取所有頂點的出度
  val outDegrees: VertexRDD[Int]
  獲取所有頂點入度與出度之和
  val degrees: VertexRDD[Int]
  // Views of the graph as collections
  =============================================================
  獲取所有頂點的集合
  val vertices: VertexRDD[VD]
  獲取所有邊的集合
  val edges: EdgeRDD[ED]
  獲取所有 triplets 表示的集合
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs
  ==================================================================
  緩存操作
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  取消緩存
  def unpersist(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic
  ============================================================

  圖重新分區
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // 頂點和邊屬性轉換
  ==========================================================
  def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) =>
    Iterator[ED2])
  : Graph[VD, ED2]
  // 修改圖結構
  ====================================================================
  反轉圖
  def reverse: Graph[VD, ED]
  獲取子圖
  def subgraph(
                epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
                vpred: (VertexID, VD) => Boolean = ((v, d) => true))
  : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph
  ======================================================================
  def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)])
                               (mapFunc: (VertexID, VD, Option[U]) => VD2)
  : Graph[VD2, ED]

  // Aggregate information about adjacent triplets
  =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]
  def aggregateMessages[Msg: ClassTag](
                                        sendMsg: EdgeContext[VD, ED, Msg] => Unit,
                                        mergeMsg: (Msg, Msg) => Msg,
                                        tripletFields: TripletFields = TripletFields.All)
  : VertexRDD[A]

  // Iterative graph-parallel computation
  ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection:
  EdgeDirection)(
                 vprog: (VertexID, VD, A) => VD,
                 sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
                 mergeMsg: (A, A) => A)
  : Graph[VD, ED]
  // Basic graph algorithms
  ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexID, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED]
}

  2.4.4 轉換操作 

      GraphX中的轉換操作主要有mapVertices,mapEdges和mapTriplets三個,它們在Graph文件中定義,在GraphImpl文件中實現。下面分別介紹這三個方法

  2.4.4.1 mapVertices 

      mapVertices用來更新頂點屬性。從圖的構建那章我們知道,頂點屬性保存在邊分區中,所以我們需要改變的是邊分區中的屬性

override def mapVertices[VD2: ClassTag]
(f: (VertexId, VD) => VD2)(implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
  if (eq != null) {
    vertices.cache()
    // 使用方法 f 處理 vertices
    val newVerts = vertices.mapVertexPartitions(_.map(f)).cache() //獲得兩個不同 vertexRDD 的不同
    val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) //更新 ReplicatedVertexView
    val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]] .updateVertices(changedVerts)
    new GraphImpl(newVerts, newReplicatedVertexView) } 
  else {
    GraphImpl(vertices.mapVertexPartitions(_.map(f)), replicatedVertexView.edges)
  }
}

      上面的代碼中,當VD和VD2類型相同時,我們可以重用沒有發生變化的點,否則需要重新創建所有的點。我們分析VD和VD2相同的情況,分四步處理

        1. 使用方法f處理vertices,獲得新的VertexRDD

        2. 使用在VertexRDD中定義的diff方法求出新VertexRDD和源VertexRDD的不同

override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
  val otherPartition = other match {
    case other: VertexRDD[_] if this.partitioner == other.partitioner => other.partitionsRDD
    case _ => VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD }
  val newPartitionsRDD = partitionsRDD.zipPartitions( otherPartition, preservesPartitioning = true
  ) { (thisIter, otherIter) =>
    val thisPart = thisIter.next()
    val otherPart = otherIter.next() 
    Iterator(thisPart.diff(otherPart))
  }
  this.withPartitionsRDD(newPartitionsRDD)
}

      這個方法首先處理新生成的VertexRDD的分區,如果它的分區和源VertexRDD的分區一致,那么直接取出它的partitionsRDD,否則重新分區后取出它的 partitionsRDD。針對新舊兩個VertexRDD的所有分區,調用VertexPartitionBaseOps中的diff方法求得分區的不同

def diff(other: Self[VD]): Self[VD] = {
  //首先判斷
  if (self.index != other.index) {
    diff(createUsingIndex(other.iterator))
  } else {
    val newMask = self.mask & other.mask
    var i = newMask.nextSetBit(0)
    while (i >= 0) {
      if (self.values(i) == other.values(i)) {
        newMask.unset(i)
      }
      i = newMask.nextSetBit(i + 1)
    }
    this.withValues(other.values).withMask(newMask)
  }
}

      該方法隱藏兩個VertexRDD中相同的頂點信息,得到一個新的VertexRDD

        3. 更新ReplicatedVertexView

def updateVertices(updates: VertexRDD[VD]): ReplicatedVertexView[VD, ED] = {
  //生成一個 VertexAttributeBlock
  val shippedVerts = updates.shipVertexAttributes(hasSrcId, hasDstId)
    .setName("ReplicatedVertexView.updateVertices - shippedVerts %s %s (broadcast)".format(hasSrcId, hasDstId))
    .partitionBy(edges.partitioner.get)
  //生成新的邊 RDD
  val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
    (ePartIter, shippedVertsIter) => ePartIter.map {
      case (pid, edgePartition) => (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
    }
  })
  new ReplicatedVertexView(newEdges, hasSrcId, hasDstId)
}

      updateVertices方法返回一個新的ReplicatedVertexView,它更新了邊分區中包含的頂點屬性。我們看看它的實現過程。首先看shipVertexAttributes方法的調用。 調用shipVertexAttributes方法會生成一個VertexAttributeBlock,VertexAttributeBlock包含當前分區的頂點屬性,這些屬性可以在特定的邊分區使用

def shipVertexAttributes(shipSrc: Boolean, shipDst: Boolean): Iterator[(PartitionID, VertexAttributeBlock[VD])] = { Iterator.tabulate(routingTable.numEdgePartitions) { pid =>
  val initialSize = if (shipSrc && shipDst) routingTable.partitionSize(pid) else 64
  val vids = new PrimitiveVector[VertexId](initialSize)
  val attrs = new PrimitiveVector[VD](initialSize)
  var i = 0
    routingTable.foreachWithinEdgePartition(pid, shipSrc, shipDst) { vid =>
      if (isDefined(vid)) {
        vids += vid
        attrs += this(vid)
      }
      i += 1
    }
    //(邊分區 id,VertexAttributeBlock(頂點 id,屬性))
    (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
  }
}

      獲得新的頂點屬性之后,我們就可以調用updateVertices更新邊中頂點的屬性了,如下面代碼所示:

edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator))
//更新 EdgePartition 的屬性
def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = {
  val newVertexAttrs = new Array[VD](vertexAttrs.length)
  System.arraycopy(vertexAttrs, 0, newVertexAttrs, 0, vertexAttrs.length)
  while (iter.hasNext) {
    val kv = iter.next()
    //global2local 獲得頂點的本地 index
    newVertexAttrs(global2local(kv._1)) = kv._2
  }
  new EdgePartition(localSrcIds, localDstIds, data, index, global2local, local2global, newVertexAttrs, activeSet)
}

      例子:將字符串合並

scala> graph.mapVertices((VertexId,VD)=>VD._1+VD._2).vertices.collect
res14: Array[(org.apache.spark.graphx.VertexId, String)] = Array((7,jgonzalpostdoc), (2,istoicaprof), (3,rxinstudent), (5,franklinprof))

  2.4.4.2 mapEdges 

      mapEdges用來更新邊屬性

override def mapEdges[ED2: ClassTag](f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
  val newEdges = replicatedVertexView.edges
    .mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
  new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
}

      相比於mapVertices,mapEdges顯然要簡單得多,它只需要根據方法f生成新的EdgeRDD,然后再初始化即可

      例子:將邊的屬性都加一個前綴

scala> graph.mapEdges(edge=>"name:"+edge.attr).edges.collect
res16: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(3,7,name:collab), Edge(5,3,name:advisor), Edge(2,5,name:colleague), Edge(5,7,name:pi))

  2.4.4.3 mapTriplets 

      mapTriplets用來更新邊屬性

override def mapTriplets[ED2: ClassTag](
  f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2], tripletFields: TripletFields): Graph[VD, ED2] = {
  vertices.cache()
  replicatedVertexView.upgrade(vertices, tripletFields.useSrc, tripletFields.useDst)
  val newEdges = replicatedVertexView.edges.mapEdgePartitions { (pid, part) => part.map(f(pid, part.tripletIterator(tripletFields.useSrc, tripletFields.useDst)))}
  new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges)) 
}

      這段代碼中,replicatedVertexView調用upgrade方法修改當前的ReplicatedVertexView,使調用者可以訪問到指定級別的邊信息(如僅僅可以讀源頂點的屬性)

def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean) {
  //判斷傳遞級別
  val shipSrc = includeSrc && !hasSrcId
  val shipDst = includeDst && !hasDstId
  if (shipSrc || shipDst) {
    val shippedVerts: RDD[(Int, VertexAttributeBlock[VD])] =
      vertices.shipVertexAttributes(shipSrc, shipDst)
        .setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s
  (broadcast)".format(
          includeSrc, includeDst, shipSrc, shipDst))
        .partitionBy(edges.partitioner.get)
    val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
      (ePartIter, shippedVertsIter) => ePartIter.map {
        case (pid, edgePartition) => (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
      }
    })
    edges = newEdges
    hasSrcId = includeSrc
    hasDstId = includeDst
  }

}

      最后,用f處理邊,生成新的RDD,最后用新的數據初始化圖

      例子:邊屬性添加前綴

scala> graph.mapTriplets(tri=>"name:"+tri.attr).triplets.collect
res19: Array[org.apache.spark.graphx.EdgeTriplet[(String, String),String]] = Array(((3,(rxin,student)),(7,(jgonzal,postdoc)),name:collab), ((5,(franklin,prof)),(3,(rxin,student)),name:advisor), ((2,(istoica,prof)),(5,(franklin,prof)),name:colleague), ((5,(franklin,prof)),(7,(jgonzal,postdoc)),name:pi))

  2.4.5  結構操作 

      當前的GraphX僅僅支持一組簡單的常用結構性操作。下面是基本的結構性操作列表

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

      下面分別介紹這四種函數的原理

  2.4.5.1 reverse 

      reverse操作返回一個新的圖,這個圖的邊的方向都是反轉的。例如,這個操作可以用來計算反轉的PageRank。因為反轉操作沒有修改頂點或者邊的屬性或者改變邊的數量,所以我們可以在不移動或者復制數據的情況下有效地實現它

override def reverse: Graph[VD, ED] = {
  new GraphImpl(vertices.reverseRoutingTables(),
  replicatedVertexView.reverse())
}
def reverse(): ReplicatedVertexView[VD, ED] = {
  val newEdges = edges.mapEdgePartitions((pid, part) => part.reverse)
  new ReplicatedVertexView(newEdges, hasDstId, hasSrcId)
}
//EdgePartition中的reverse
def reverse: EdgePartition[ED, VD] = {
  val builder = new ExistingEdgePartitionBuilder[ED, VD]( global2local, local2global, vertexAttrs, activeSet, size)
  var i = 0
    while (i < size) {
      val localSrcId = localSrcIds(i)
      val localDstId = localDstIds(i)
      val srcId = local2global(localSrcId)
      val dstId = local2global(localDstId)
      val attr = data(i)
      //將源頂點和目標頂點換位置
      builder.add(dstId, srcId, localDstId, localSrcId, attr)
      i += 1
    }
    builder.toEdgePartition
}

  2.4.5.2 subgraph  

      subgraph操作利用頂點和邊的判斷式(predicates),返回的圖僅僅包含滿足頂點判斷式的頂點、滿足邊判斷式的邊以及滿足頂點判斷式的triple。subgraph操作可以用於很多場景,如獲取感興趣的頂點和邊組成的圖或者獲取清除斷開連接后的圖

override def subgraph(
    epred: EdgeTriplet[VD, ED] => Boolean = x => true,
    vpred: (VertexId, VD) => Boolean = (a, b) => true):
  Graph[VD, ED] = {
    vertices.cache()
    // 過濾 vertices, 重用 partitioner 和索引
    val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
    // 過濾 triplets
    replicatedVertexView.upgrade(vertices, true, true)
    val newEdges = replicatedVertexView.edges.filter(epred, vpred)
    new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
  }
// 該代碼顯示,subgraph 方法的實現分兩步:先過濾 VertexRDD,然后再過濾 EdgeRDD。如上,過 濾 VertexRDD 比較簡單,我們重點看過濾 EdgeRDD 的過程。
def filter(
    epred: EdgeTriplet[VD, ED] => Boolean,
    vpred: (VertexId, VD) => Boolean): EdgeRDDImpl[ED, VD] = { mapEdgePartitions((pid, part) => part.filter(epred, vpred))
}
//EdgePartition 中的 filter 方法
def filter(
    epred: EdgeTriplet[VD, ED] => Boolean,
    vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = {
  val builder = new ExistingEdgePartitionBuilder[ED, VD](
    global2local, local2global, vertexAttrs, activeSet)
  var i = 0
  while (i < size) {
    // The user sees the EdgeTriplet, so we can't reuse it and must create one per edge.
    val localSrcId = localSrcIds(i)
    val localDstId = localDstIds(i)
    val et = new EdgeTriplet[VD, ED]
    et.srcId = local2global(localSrcId)
    et.dstId = local2global(localDstId)
    et.srcAttr = vertexAttrs(localSrcId)
    et.dstAttr = vertexAttrs(localDstId)
    et.attr = data(i)
    if (vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)) {
      builder.add(et.srcId, et.dstId, localSrcId, localDstId, et.attr)
    }
    i += 1
  }
  builder.toEdgePartition 
}

      因為用戶可以看到EdgeTriplet的信息,所以我們不能重用EdgeTriplet,需要重新創建一個,然后在用epred函數處理

      例子:

scala> graph.subgraph(Triplet => Triplet.attr.startsWith("c"),(VertexId, VD) => VD._2.startsWith("pro"))
res3: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@49db5438
scala> res3.vertices.collect
res4: Array[(org.apache.spark.graphx.VertexId, (String, String))] = Array((2,(istoica,prof)), (5,(franklin,prof)))
scala> res3.edges.collect
res5: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(2,5,colleague))

  2.4.5.3 mask  

      mask操作構造一個子圖,類似於交集,這個子圖包含輸入圖中包含的頂點和邊。它的實現很簡單,頂點和邊均做inner join操作即可。這個操作可以和subgraph操作相結合,基於另外一個相關圖的特征去約束一個圖。只使用ID進行對比,不對比屬性

override def mask[VD2: ClassTag, ED2: ClassTag] ( other: Graph[VD2, ED2]): Graph[VD, ED] = {
  val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
  val newEdges = replicatedVertexView.edges.innerJoin(other.edges) { (src, dst, v, w) => v }
  new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges)) 
}

  2.4.5.4 groupEdges  

      groupEdges操作合並多重圖中的並行邊(如頂點對之間重復的邊),並傳入一個函數來合並兩個邊的屬性。在大量的應用程序中,並行的邊可以合並(它們的權重合並)為一條邊從而降低圖的大小。(兩個邊需要在一個分區內部才行)

override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
  val newEdges = replicatedVertexView.edges.mapEdgePartitions(
    (pid, part) => part.groupEdges(merge))
  new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
}
def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = {
  val builder = new ExistingEdgePartitionBuilder[ED, VD]( global2local, local2global, vertexAttrs, activeSet)
  var currSrcId: VertexId = null.asInstanceOf[VertexId]
  var currDstId: VertexId = null.asInstanceOf[VertexId]
  var currLocalSrcId = -1
  var currLocalDstId = -1
  var currAttr: ED = null.asInstanceOf[ED]
  // 迭代處理所有的邊
  var i = 0
  while (i < size) {
    //如果源頂點和目的頂點都相同
    if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) {
      // 合並屬性
      currAttr = merge(currAttr, data(i))
    } else {
      // This edge starts a new run of edges
      if (i > 0) {
        // 添加到 builder 中
        builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
      }
      // Then start accumulating for a new run
      currSrcId = srcIds(i)
      currDstId = dstIds(i)
      currLocalSrcId = localSrcIds(i)
      currLocalDstId = localDstIds(i)
      currAttr = data(i)
    }
    i += 1
  }
  if (size > 0) {
    builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
  }
  builder.toEdgePartition
}

      在圖構建那章我介紹過,存儲的邊按照源頂點id排過序,所以上面的代碼可以通過一次迭代完成對所有相同邊的處理

  2.4.5.5 應用舉例 

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
    (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
    (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
    Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
    Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " +
  triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1 ).collect.foreach(println(_))
// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)

  2.4.6 頂點關聯操作 

      在許多情況下,有必要將外部數據加入到圖中。例如,我們可能有額外的用戶屬性需要合並到已有的圖中或者我們可能想從一個圖中取出頂點特征加入到另外一個圖中。這些任務可以用join操作完成。主要的join操作如下所示

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD) : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD,
    Option[U]) => VD2) : Graph[VD2, ED]
}

      joinVertices操作join輸入RDD和頂點,返回一個新的帶有頂點特征的圖。這些特征是通過在連接頂點的結果上使用用戶定義的map函數獲得的。沒有匹配的頂點保留其原始值。下面詳細地來分析這兩個函數

  2.4.6.1 joinVertices 

      joinVertices來join相同ID的頂點數據

def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
  : Graph[VD, ED] = {
  val uf = (id: VertexId, data: VD, o: Option[U]) => {
    o match {
      case Some (u) => mapFunc(id, data, u)
      case None => data
    }
  }
  graph.outerJoinVertices(table)(uf)
}

      我們可以看到,joinVertices的實現是通過outerJoinVertices來實現的。這是因為join本來就是outer join的一種特例

      例子:

scala> val join = sc.parallelize(Array((3L, "123")))
  
join: org.apache.spark.rdd.RDD[(Long, String)] = ParallelCollectionRDD[137] at parallelize at <console>:31

scala> graph.joinVertices(join)((VertexId, VD, U) => (VD._1,VD._2 + U))

res33: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@4e5b8728

scala> res33.vertices.collect.foreach(println _) 

(7,(jgonzal,postdoc))
  
(2,(istoica,prof))
  
(3,(rxin,student123))
  
(5,(franklin,prof))

  2.4.6.2 outerJoinVertices  

      跟JOIN類似,只不過table中沒有的頂點默認值為None

override def outerJoinVertices[U: ClassTag, VD2: ClassTag] (other: RDD[(VertexId, U)])
(updateF: (VertexId, VD, Option[U]) => VD2)
(implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
  if (eq != null) {
    vertices.cache()
    // updateF preserves type, so we can use incremental replication
    val newVerts = vertices.leftJoin(other)(updateF).cache()
    val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
    val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
      .updateVertices(changedVerts)
    new GraphImpl(newVerts, newReplicatedVertexView)
  } else {
    // updateF does not preserve type, so we must re-replicate all vertices
    val newVerts = vertices.leftJoin(other)(updateF)
    GraphImpl(newVerts, replicatedVertexView.edges)
  }
}

      通過以上的代碼我們可以看到,如果updateF不改變類型,我們只需要創建改變的頂點即可,否則我們要重新創建所有的頂點。我們討論不改變類型的情況。這種情況分三步

      1. 修改頂點屬性值

        val newVerts = vertices.leftJoin(other)(updateF).cache()

        這一步會用頂點傳入的RDD,然后用updateF作用joinRDD中的所有頂點,改變它們的值

      2. 找到發生改變的頂點

        val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)

      3. 更新newReplicatedVertexView中邊分區中的頂點屬性

val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
.updateVertices(changedVerts)

      例子:

scala> graph.outerJoinVertices(join)((VertexId, VD, U) => (VD._1,VD._2 + U))

res35: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@7c542a14

scala> res35.vertices.collect.foreach(println _)

(7,(jgonzal,postdocNone))

(2,(istoica,profNone))

(3,(rxin,studentSome(123)))

(5,(franklin,profNone))

  2.4.7 聚合操作  

      GraphX中提供的聚合操作有aggregateMessages、collectNeighborIds和collectNeighbors三個,其中aggregateMessages在GraphImpl中實現,collectNeighborIds和collectNeighbors在GraphOps中實現。下面分別介紹這幾個方法

  2.4.7.1 aggregateMessages 

  2.4.7.1.1 aggregateMessage接口 

      aggregateMessages是GraphX最重要的API,用於替換mapReduceTriplets。目前mapReduceTriplets最終也是通過aggregateMessages來實現的。它主要功能是向鄰邊發消息,合並鄰邊收到的消息,返回messageRDD。aggregateMessages的接口如下:

def aggregateMessages[A: ClassTag](
  sendMsg: EdgeContext[VD, ED, A] => Unit,
  mergeMsg: (A, A) => A,
  tripletFields: TripletFields = TripletFields.All)
: VertexRDD[A] = {
  aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
}

      該接口有三個參數,分別為發消息函數,合並消息函數以及發消息的方向

      sending:發消息函數

private def sendMsg(ctx: EdgeContext[KCoreVertex, Int, Map[Int, Int]]): Unit = {
  ctx.sendToDst(Map(ctx.srcAttr.preKCore -> -1, ctx.srcAttr.curKCore -> 1))
  ctx.sendToSrc(Map(ctx.dstAttr.preKCore -> -1, ctx.dstAttr.curKCore -> 1)) 
}

      mergeMsg:合並消息函數

      該函數用於在Map階段每個edge分區中每個點收到的消息合並,並且它還用於reduce階段,合並不同分區的消息。合並vertexId相同的消息

      tripletFields:定義發消息的方向

  2.4.7.1.2 aggregateMessages處理流程 

      aggregateMessages方法分為Map和Reduce兩個階段,下面我們分別就這兩個階段說明

  2.4.7.1.2.1 Map階段 

      從入口函數進入aggregateMessagesWithActiveSet函數,該函數首先使用VertexRDD[VD]更新replicatedVertexView, 只更新其中vertexRDD中attr對象。如構建圖中介紹的,replicatedVertexView是點和邊的視圖,點的屬性有變化,要更新邊中包含的點的attr

replicatedVertexView.upgrade(vertices, tripletFields.useSrc, tripletFields.useDst)
  val view = activeSetOpt match {
  case Some((activeSet, _)) =>
  //返回只包含活躍頂點的
  replicatedVertexView replicatedVertexView.withActiveSet(activeSet)
  case None => replicatedVertexView
}

      程序然后會對replicatedVertexView的edgeRDD做mapPartitions操作,所有的操作都在每個邊分區的迭代中完成,如下面的代碼:

val preAgg = view.edges.partitionsRDD.mapPartitions(_.flatMap {
  case (pid, edgePartition) =>
    // 選擇 scan 方法
    val activeFraction = edgePartition.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
    activeDirectionOpt match {
      case Some(EdgeDirection.Both) =>
        if (activeFraction < 0.8) {
          edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields, EdgeActiveness.Both)
        } else {
          edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields, EdgeActiveness.Both)
        }
      case Some(EdgeDirection.Either) => edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields, EdgeActiveness.Either)
      case Some(EdgeDirection.Out) =>
        if (activeFraction < 0.8) {
          edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields, EdgeActiveness.SrcOnly)
        } else {
          edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields, EdgeActiveness.SrcOnly)
        }
      case Some(EdgeDirection.In) => edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields, EdgeActiveness.DstOnly)
      case _ => // None
      edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields, EdgeActiveness.Neither)
    }
})

      在分區內,根據activeFraction的大小選擇是進入aggregateMessagesEdgeScan還是aggregateMessagesIndexScan處理。aggregateMessagesEdgeScan會順序地掃描所有的邊,而aggregateMessagesIndexScan會先過濾源頂點索引,然后在掃描。我們重點去分析aggregateMessagesEdgeScan

def aggregateMessagesEdgeScan[A: ClassTag](
  sendMsg: EdgeContext[VD, ED, A] => Unit,
  mergeMsg: (A, A) => A,
  tripletFields: TripletFields,
  activeness: EdgeActiveness): Iterator[(VertexId, A)] = {
  var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
  var i = 0
  while (i < size) {
    val localSrcId = localSrcIds(i)
    val srcId = local2global(localSrcId)
    val localDstId = localDstIds(i)
    val dstId = local2global(localDstId)
    val srcAttr = if (tripletFields.useSrc) vertexAttrs(localSrcId) else
  null.asInstanceOf[VD]
    val dstAttr = if (tripletFields.useDst) vertexAttrs(localDstId) else
  null.asInstanceOf[VD]
    ctx.set(srcId, dstId, localSrcId, localDstId, srcAttr, dstAttr, data(i))
    sendMsg(ctx)
    i += 1
  }
}

      該方法由兩步組成,分別是獲得頂點相關信息,以及發送消息

      獲取頂點相關信息

      在前文介紹edge partition時,我們知道它包含localSrcIds,localDstIds,data, index, global2local, local2global, vertexAttrs這幾個重要的數據結構。其中localSrcIds,localDstIds分別表示源頂點、目的頂點在當前分區中的索引。 所以我們可以遍歷localSrcIds,根據其下標去localSrcIds中拿到srcId在全局local2global中的索引,最后拿到srcId。通過vertexAttrs拿到頂點屬性。通過data拿到邊屬性

      發送消息

      發消息前會根據接口中定義的tripletFields,拿到發消息的方向。發消息的過程就是遍歷到一條邊,向localSrcIds/localDstIds中添加數據,如果localSrcIds/localDstIds中已經存在該數據,則執行合並函數mergeMsg

override def sendToSrc(msg: A) {
  send(_localSrcId, msg)
}
override def sendToDst(msg: A) {
  send(_localDstId, msg)
}
@inline private def send(localId: Int, msg: A) {
  if (bitset.get(localId)) {
    aggregates(localId) = mergeMsg(aggregates(localId), msg)
  } else {
    aggregates(localId) = msg
    bitset.set(localId)
  }
}

      每個點之間在發消息的時候是獨立的,即:點單純根據方向,向以相鄰點的以localId為下標的數組中插數據,互相獨立,可以並行運行。Map階段最后返回消息RDD messages: RDD[(VertexId, VD2)]

      Map階段的執行流程如下例所示:

      

  2.4.7.1.2.2 Reduce階段 

      Reduce階段的實現就是調用下面的代碼

vertices.aggregateUsingIndex(preAgg, mergeMsg)
override def aggregateUsingIndex[VD2: ClassTag](
  messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
  val shuffled = messages.partitionBy(this.partitioner.get)
  val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
    thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
  }
  this.withPartitionsRDD[VD2](parts)
}

      上面的代碼通過兩步實現

      1. 對messages重新分區,分區器使用VertexRDD的partitioner。然后使用zipPartitions合並兩個分區

      2. 對等合並attr, 聚合函數使用傳入的mergeMsg函數

def aggregateUsingIndex[VD2: ClassTag](
    iter: Iterator[Product2[VertexId, VD2]],
    reduceFunc: (VD2, VD2) => VD2): Self[VD2] = {
  val newMask = new BitSet(self.capacity)
  val newValues = new Array[VD2](self.capacity)
  iter.foreach { product =>
    val vid = product._1
    val vdata = product._2
    val pos = self.index.getPos(vid)
    if (pos >= 0) {
      if (newMask.get(pos)) {
        newValues(pos) = reduceFunc(newValues(pos), vdata)
      } else { // otherwise just store the new value
        newMask.set(pos)
        newValues(pos) = vdata
      } 
    }
  }
  this.withValues(newValues).withMask(newMask)
}

      根據傳參,我們知道上面的代碼迭代的是messagePartition,並不是每個節點都會收到消息,所以messagePartition集合最小,迭代速度會快。這段代碼表示,我們根據 vetexId從index中取到其下標pos,再根據下標,從values中取到attr,存在attr就用mergeMsg合並attr,不存在就直接賦值

      Reduce階段的過程如下圖所示:

      

  2.4.7.1.3 舉例 

      下面的例子計算比用戶年齡大的追隨者(即followers)的平均年齡

// Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.

val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // Map Function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send message to destination vertex containing counter and age
      triplet.sendToDst(1, triplet.srcAttr) 
    }
  },
  // Add counter and age
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))

  2.4.7.2 collectNeighbors  

      該方法的作用是收集每個頂點的鄰居頂點的頂點和頂點屬性。需要指定方向

def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
  val nbrs = edgeDirection match {
    case EdgeDirection.Either =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => {
          ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
          ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
        },
        (a, b) => a ++ b, TripletFields.All)
    case EdgeDirection.In => g
      raph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),
        (a, b) => a ++ b, TripletFields.Src)
    case EdgeDirection.Out =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))), (a, b) => a ++ b, TripletFields.Dst)
    case EdgeDirection.Both =>
      throw new SparkException("collectEdges does not support
  EdgeDirection.Both. Use" + "EdgeDirection.Either instead.")
  }
  graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
  }
}

      從上面的代碼中,第一步是根據EdgeDirection來確定調用哪個aggregateMessages實現聚合操作。我們用滿足條件EdgeDirection.Either的情況來說明。可以看到 aggregateMessages的方式消息的函數為:

ctx => {
  ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))) 
  ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
},

      這個函數在處理每條邊時都會同時向源頂點和目的頂點發送消息,消息內容分別為(目的頂點 id,目的頂點屬性)、(源頂點 id,源頂點屬性)。為什么會這樣處理呢? 我們知道,每條邊都由兩個頂點組成,對於這個邊,我需要向源頂點發送目的頂點的信息來記錄它們之間的鄰居關系,同理向目的頂點發送源頂點的信息來記錄它們之間的鄰居關系

      Merge函數是一個集合合並操作,它合並同同一個頂點對應的所有目的頂點的信息。如下所示:

(a, b) => a ++ b

      通過aggregateMessages獲得包含鄰居關系信息的VertexRDD后,把它和現有的vertices作join操作,得到每個頂點的鄰居消息

  2.4.7.3 collectNeighborlds 

      該方法的作用是收集每個頂點的鄰居頂點的頂點id。它的實現和collectNeighbors非常相同。需要指定方向

def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
  val nbrs =
    if (edgeDirection == EdgeDirection.Either) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => { ctx.sendToSrc(Array(ctx.dstId));
        ctx.sendToDst(Array(ctx.srcId)) },
        _ ++ _, TripletFields.None)
    } else if (edgeDirection == EdgeDirection.Out) {
        graph.aggregateMessages[Array[VertexId]]( 
          ctx => ctx.sendToSrc(Array(ctx.dstId)), 
          _ ++ _, TripletFields.None)
    } else if (edgeDirection == EdgeDirection.In) { 
        graph.aggregateMessages[Array[VertexId]](
          ctx => ctx.sendToDst(Array(ctx.srcId)),
          _ ++ _, TripletFields.None) 
    } else {
      throw new SparkException("It doesn't make sense to collect neighbor ids without a " +
  "direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
    }
  graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[VertexId])
  }
}

      和collectNeighbors的實現不同的是,aggregateMessages函數中的sendMsg函數只發送頂點Id到源頂點和目的頂點。其它的實現基本一致

ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) }

  2.4.8 緩存操作  

      在Spark中,RDD默認是不緩存的。為了避免重復計算,當需要多次利用它們時,我們必須顯示地緩存它們。GraphX中的圖也有相同的方式。當利用到圖多次時,確保首先訪問Graph.cache()方法。

      在迭代計算中,為了獲得最佳的性能,不緩存可能是必須的。默認情況下,緩存的RDD和圖會一直保留在內存中直到因為內存壓力迫使它們以LRU的順序刪除。對於迭代計算,先前的迭代的中間結果將填充到緩存中。雖然它們最終會被刪除,但是保存在內存中的不需要的數據將會減慢垃圾回收。只有中間結果不需要,不緩存它們是更高效的。然而,因為圖是由多個RDD組成的,正確的不持久化它們是困難的。對於迭代計算,建議使用Pregel API,它可以正確的不持久化中間結果

      GraphX中的緩存操作有cache,persist,unpersist和unpersistVertices。它們的接口分別是:

def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] 
def cache(): Graph[VD, ED]
def unpersist(blocking: Boolean = true): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]

2.5 Pregel API  

      圖本身是遞歸數據結構,頂點的屬性依賴於它們鄰居的屬性,這些鄰居的屬性又依賴於自己鄰居的屬性。所以許多重要的圖算法都是迭代的重新計算每個頂點的屬性,直到滿足某個確定的條件。 一系列的圖並發(graph-parallel)抽象已經被提出來用來表達這些迭代算法。GraphX公開了一個類似Pregel的操作,它是廣泛使用的Pregel和GraphLab抽象的一個融合

      GraphX中實現的這個更高級的Pregel操作是一個約束到圖拓撲的批量同步(bulk-synchronous)並行消息抽象。Pregel操作者執行一系列的超步(super steps),在這些步驟中,頂點從之前的超步中接收進入(inbound)消息的總和,為頂點屬性計算一個新的值,然后在以后的超步中發送消息到鄰居頂點。不像Pregel而更像GraphLab,消息通過邊triplet的一個函數被並行計算,消息的計算既會訪問源頂點特征也會訪問目的頂點特征。在超步中,沒有收到消息的頂點會被跳過。當沒有消息遺留時,Pregel操作停止迭代並返回最終的圖

      注意,與標准的Pregel實現不同的是,GraphX中的頂點僅僅能發送信息給鄰居頂點,並且可以利用用戶自定義的消息函數並行地構造消息。這些限制允許對GraphX進行額外的優化

      下面的代碼是pregel的具體實現:

def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
  (graph: Graph[VD, ED],
  initialMsg: A,
  maxIterations: Int = Int.MaxValue,
  activeDirection: EdgeDirection = EdgeDirection.Either)
  (vprog: (VertexId, VD, A) => VD,
  sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A)
  : Graph[VD, ED] =
{
  var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
  // 計算消息
  var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
  var activeMessages = messages.count()
  // 迭代
  var prevG: Graph[VD, ED] = null
  var i = 0
  while (activeMessages > 0 && i < maxIterations) {
    // 接收消息並更新頂點
    prevG = g
    g = g.joinVertices(messages)(vprog).cache()
    val oldMessages = messages
    // 發送新消息
    messages = g.mapReduceTriplets(
      sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
    activeMessages = messages.count()
    i += 1 
  }
  g
}

  2.5.1 pregel計算模型  

      Pregel計算模型中有三個重要的函數,分別是vertexProgram、sendMessage和messageCombiner

        vertexProgram:用戶定義的頂點運行程序。它作用於每一個頂點,負責接收進來的信息,並計算新的頂點值。

        sendMsg:發送消息

        mergeMsg:合並消息

      具體分析它的實現。根據代碼可以知道,這個實現是一個迭代的過程。在開始迭代之前,先完成一些初始化操作:

var g= graph.mapVertices((vid,vdata) => vprog(vid,vdata, initialMsg)).cache()
// 計算消息
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var activeMessages = messages.count()

      程序首先用vprog函數處理圖中所有的頂點,生成新的圖。然后用生成的圖調用聚合操作(mapReduceTriplets,實際的實現是我們前面章節講到的aggregateMessagesWithActiveSet函數)獲取聚合后的消息。activeMessages指messages這個VertexRDD中的頂點數

      下面就開始迭代操作了。在迭代內部,分為二步:

      1. 接收消息,並更新頂點

g = g.joinVertices(messages)(vprog).cache()
//joinVertices 的定義
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED] = {
  val uf = (id: VertexId, data: VD, o: Option[U]) => { 
    o match {
      case Some(u) => mapFunc(id, data, u)
      case None => data 
    }
  }
  graph.outerJoinVertices(table)(uf) 
}

      這一步實際上是使用outerJoinVertices來更新頂點屬性。outerJoinVertices在關聯操作中有詳細介紹:

      2. 發送新消息

messages = g.mapReduceTriplets(
    sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()    

      注意,在上面的代碼中,mapReduceTriplets多了一個參數Some((oldMessages, activeDirection))。這個參數的作用是:它使我們在發送新的消息時,會忽略掉那些兩端都沒有接收到消息的邊,減少計算量

  2.5.2 pregel實現最短路徑

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// 初始化圖
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => { // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a,b) => math.min(a,b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))

      上面的例子中,Vertex Program函數定義如下:

(id, dist, newDist) => math.min(dist, newDist)

      這個函數的定義顯而易見,當兩個消息來的時候,取它們當中路徑的最小值。同理Merge Message函數也是同樣的含義Send Message函數中,會首先比較triplet.srcAttr + triplet.attr 和triplet.dstAttr,即比較加上邊的屬性后,這個值是否小於目的節點的屬性,如果小於,則發送消息到目的頂點

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM