第1章 Spark GraphX 概述1.1 什么是 Spark GraphX1.2 彈性分布式屬性圖1.3 運行圖計算程序第2章 Spark GraphX 解析2.1 存儲模式2.1.1 圖存儲模式2.1.2 GraphX 存儲模式2.2 vertices、edges 以及 triplets2.2.1 vertices2.2.2 edges2.2.3 triplets2.3 圖的構建2.3.1 構建圖的方法2.3.2 構建圖的過程2.4 計算模式2.4.1 BSP 計算模式2.4.2 圖操作一覽2.4.3 基本信息操作2.4.4 轉換操作2.4.5 結構操作2.4.6 頂點關聯操作2.4.7 聚合操作2.4.8 緩存操作2.5 Pregel API2.5.1 pregel 計算模型2.5.2 pregel 實現最短路徑2.6 GraphX 實例第3章 圖算法3.1 PageRank 排名算法3.1.1 算法概述3.1.2 從入鏈數量到 PageRank3.1.3 PageRank 算法原理3.1.4 Spark GraphX 實現3.2 廣度優先遍歷(參考)3.3 單源最短路徑(參考)3.4 連通圖(參考)3.5 三角計數(參考)第4章 PageRank 實例
第1章 Spark GraphX 概述
1.1 什么是 Spark GraphX
![]()
Spark GraphX 是一個分布式圖處理框架,它是基於 Spark 平台提供對圖計算和圖挖掘簡潔易用的而豐富的接口,極大的方便了對分布式圖處理的需求。那么什么是圖,都計算些什么?眾所周知社交網絡中人與人之間有很多關系鏈,例如 Twitter、Facebook、微博和微信等,數據中出現網狀結構關系都需要圖計算。
GraphX 是一個新的 Spark API,它用於圖和分布式圖(graph-parallel)的計算。GraphX 通過引入彈性分布式屬性圖(Resilient Distributed Property Graph): 頂點和邊均有屬性的有向多重圖,來擴展Spark RDD。為了支持圖計算,GraphX 開發了一組基本的功能操作以及一個優化過的 Pregel API。另外,GraphX 也包含了一個快速增長的圖算法和圖 builders 的集合,用以簡化圖分析任務。
從社交網絡到語言建模,不斷增長的數據規模以及圖形數據的重要性已經推動了許多新的分布式圖系統的發展。通過限制計算類型以及引入新的技術來切分和分配圖,這些系統可以高效地執行復雜的圖形算法,比一般的分布式數據計算(data-parallel,如 spark、MapReduce)快很多。
![]()
分布式圖(graph-parallel)計算和分布式數據(data-parallel)計算類似,分布式數據計算采用了一種 record-centric(以記錄為中心)的集合視圖,而分布式圖計算采用了一種 vertex-centric(以頂點為中心)的圖視圖。分布式數據計算通過同時處理獨立的數據來獲得並發的目的,分布式圖計算則是通過對圖數據進行分區(即切分)來獲得並發的目的
。更准確的說,分布式圖計算遞歸地定義特征的轉換函數(這種轉換函數作用於鄰居特征),通過並發地執行這些轉換函數來獲得並發的目的。
![]()
分布式圖計算比分布式數據計算更適合圖的處理,但是在典型的圖處理流水線中,它並不能很好地處理所有操作。例如,雖然分布式圖系統可以很好的計算 PageRank 等算法,但是它們不適合從不同的數據源構建圖或者跨過多個圖計算特征。更准確的說,分布式圖系統提供的更窄的計算視圖無法處理那些構建和轉換圖結構以及跨越多個圖的需求。分布式圖系統中無法提供的這些操作需要數據在圖本體之上移動並且需要一個圖層面而不是單獨的頂點或邊層面的計算視圖。例如,我們可能想限制我們的分析到幾個子圖上,然后比較結果。 這不僅需要改變圖結構,還需要跨多個圖計算。
![]()
我們如何處理數據取決於我們的目標,有時同一原始數據可能會處理成許多不同表和圖的視圖,並且圖和表之間經常需要能夠相互移動。如下圖所示:
![]()
所以我們的圖流水線必須通過組合 graph-parallel 和 data- parallel 來實現。但是這種組合必然會導致大量的數據移動以及數據復制,同時這樣的系統也非常復雜。例如,在傳統的圖計算流水線中,在 Table View 視圖下,可能需要 Spark 或者 Hadoop 的支持,在 Graph View 這種視圖下,可能需要 Prege 或者 GraphLab 的支持。也就是把圖和表分在不同的系統中分別處理。不同系統之間數據的移動和通信會成為很大的負擔。
GraphX 項目將 graph-parallel 和 data-parallel 統一到一個系統中,並提供了一個唯一的組合 API。GraphX 允許用戶把數據當做一個圖和一個集合(RDD),而不需要數據移動或者復制。也就是說 GraphX 統一了 Graph View 和 Table View,可以非常輕松的做 pipeline 操作。
1.2 彈性分布式屬性圖
![]()
GraphX 的核心抽象是彈性分布式屬性圖
,它是一個有向多重圖,帶有連接到每個頂點和邊的用戶定義的對象。有向多重圖中多個並行的邊共享相同的源和目的頂點。支持並行邊的能力簡化了建模場景,相同的頂點可能存在多種關系(例如 co-worker 和 friend)。 每個頂點用一個唯一的 64 位長的標識符(VertexID)作為 key。GraphX 並沒有對頂點標識強加任何排序。同樣,邊擁有相應的源和目的頂點標識符。
![]()
屬性圖擴展了 Spark RDD 的抽象,有 Table 和 Graph 兩種視圖,但是只需要一份物理存儲。兩種視圖都有自己獨有的操作符,從而使我們同時獲得了操作的靈活性和執行的高效率。屬性圖以 vertex(VD) 和 edge(ED) 類型作為參數類型,這些類型分別是頂點和邊相關聯的對象的類型。
![]()
在某些情況下,在同樣的圖中,我們可能希望擁有不同屬性類型的頂點
。這可以通過繼承完成。例如,將用戶和產品建模成一個二分圖,我們可以用如下方式:
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
和 RDD 一樣,屬性圖是不可變的、分布式的、容錯的。圖的值或者結構的改變需要生成一個新的圖來實現。注意,原始圖中不受影響的部分都可以在新圖中重用,用來減少存儲的成本。執行者使用一系列頂點分區方法來對圖進行分區。如 RDD 一樣,圖的每個分區可以在發生故障的情況下被重新創建在不同的機器上。
邏輯上,屬性圖對應於一對類型化的集合(RDD),這個集合包含每一個頂點和邊的屬性。因此,圖的類中包含訪問圖中頂點和邊的成員變量。
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
}
VertexRDD[VD] 和 EdgeRDD[ED] 類是 RDD[(VertexID, VD)] 和 RDD[Edge[ED]] 的繼承和優化版本。VertexRDD[VD] 和 EdgeRDD[ED] 都提供了額外的圖計算功能並提供內部優化功能。如下圖所示:
![]()
源碼如下:
abstract class VertexRDD[VD](sc: SparkContext, deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps)
abstract class EdgeRDD[ED](sc: SparkContext, deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps)
GraphX 的底層設計有以下幾個關鍵點:
• 對 Graph 視圖的所有操作,最終都會轉換成其關聯的 Table 視圖的 RDD 操作來完成。這樣對一個圖的計算,最終在邏輯上,等價於一系列 RDD 的轉換過程。因此,Graph 最終具備了 RDD 的3個關鍵特性:Immutable、Distributed和Fault-Tolerant,其中最關鍵的是 Immutable(不變性)。邏輯上,所有圖的轉換和操作都產生了一個新圖;物理上,GraphX 會有一定程度的不變頂點和邊的復用優化,對用戶透明。
• 兩種視圖底層共用的物理數據,由 RDD[Vertex-Partition] 和 RDD[EdgePartition] 這兩個 RDD 組成。點和邊實際都不是以表 Collection[tuple] 的形式存儲的,而是由 VertexPartition/EdgePartition 在內部存儲一個帶索引結構的分片數據塊,以加速不同視圖下的遍歷速度。不變的索引結構在 RDD 轉換過程中是共用的,降低了計算和存儲開銷。
• 圖的分布式存儲采用點分割模式,而且使用 partitionBy 方法,由用戶指定不同的划分策略(PartitionStrategy)。划分策略會將邊分配到各個 EdgePartition,頂點分配到各個 VertexPartition,EdgePartition 也會緩存本地邊關聯點的 Ghost 副本。划分策略的不同會影響到所需要緩存的 Ghost 副本數量,以及每個 EdgePartition 分配的邊的均衡程度,需要根據圖的結構特征選取最佳策略。目前有 EdgePartition2d、EdgePartition1d、RandomVertexCut 和 CanonicalRandomVertexCut 這四種策略。
1.3 運行圖計算程序
假設我們想構造一個包括不同合作者的屬性圖。頂點屬性可能包含用戶名和職業。我們可以用描述合作者之間關系的字符串標注邊緣。

Step1、開始的第一步是引入 Spark 和 GraphX 到你的項目中,如下面所示:
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
Step2、如果你沒有用到 Spark shell,你還將需要 SparkContext。
所得的圖形將具有類型簽名:val userGraph: Graph[(String, String), String]
有很多方式從一個原始文件、RDD 構造一個屬性圖。最一般的方法是利用 Graph object。下面的代碼從 RDD 集合生成屬性圖。
// 創建 SparkConf() 並設置 App 名稱
val conf = new SparkConf().setMaster("local[3]").setAppName("WC")
// 創建 SparkContext,該對象是提交 spark App 的入口
val sc = new SparkContext(conf)
// Create an RDD for the vertices (頂點),這里的頂點屬性是 一個二元組
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges (邊),這里的邊屬性是 String 類型
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
val defaultUser = ("John Doe", "Missing") // 缺省屬性
// Build the initial Graph (圖)
val graph = Graph(users, relationships, defaultUser)
Step3、在上面的例子中,我們用到了 Edge 樣本類。邊有一個 srcId 和 dstId 分別對應於源和目標頂點的標示符。另外,Edge 類有一個 attr 成員用來存儲邊屬性。
我們可以分別用 graph.vertices 和 graph.edges 成員將一個圖解構為相應的頂點和邊。
// Count all users which are postdocs (過濾圖上的所有頂點,統計頂點的屬性的第二個值是 postdoc 的個數)
val verticesCount = graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
println(verticesCount)
// Count all the edges where src > dst (統計圖上滿足邊的 源頂點ID > 目標頂點ID 的個數)
val edgeCount = graph.edges.filter(e => e.srcId > e.dstId).count
println(edgeCount)
注意:graph.vertices 返回一個 VertexRDD[(String, String)],它繼承於 RDD[(VertexID, (String, String))]。所以我們可以用 scala 的 case 表達式解構這個元組。另一方面,graph.edges 返回一個包含 Edge[String] 對象的 EdgeRDD。我們也可以用到 case 類的類型構造器,如下例所示。
graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
Step4、除了屬性圖的頂點和邊視圖,GraphX 也包含了一個三元組視圖,三元視圖邏輯上將頂點和邊的屬性保存為一個 RDD[EdgeTriplet[VD, ED]],它包含 EdgeTriplet 類的實例。可以通過下面的 Sql 表達式表示這個連接。
SELECT
src.id ,
dst.id ,
src.attr ,
e.attr ,
dst.attr
FROM
edges AS e
LEFT JOIN vertices AS src ,
vertices AS dst ON e.srcId = src.Id
AND e.dstId = dst.Id
或者通過下面的圖來表示:

Step5、EdgeTriplet 類繼承於 Edge 類,並且加入了 srcAttr 和 dstAttr 成員,這兩個成員分別包含源和目的的屬性。我們可以用一個三元組視圖渲染字符串集合用來描述用戶之間的關系。
val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] = graph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))
第2章 Spark GraphX 解析
2.1 存儲模式
2.1.1 圖存儲模式
PowerGraph 巨型圖的存儲總體上有
邊分割
和點分割
兩種存儲方式。
1)邊分割(Edge-Cut):每個頂點都存儲一次,但有的邊會被打斷分到兩台機器上。這樣做的好處是節省存儲空間;壞處是對圖進行基於邊的計算時,對於一條兩個頂點被分到不同機器上的邊來說,要跨機器通信傳輸數據,內網通信流量大。
2)點分割(Vertex-Cut):每條邊只存儲一次,都只會出現在一台機器上。鄰居多的點會被復制到多台機器上,增加了存儲開銷,同時會引發數據同步問題。好處是可以大幅減少內網通信量。![]()
雖然兩種方法互有利弊,但現在是點分割占上風,各種分布式圖計算框架都將自己底層的存儲形式變成了點分割。主要原因有以下兩個。
1)磁盤價格下降,存儲空間不再是問題,而內網的通信資源沒有突破性進展,集群計算時內網帶寬是寶貴的,時間比磁盤更珍貴。這點就類似於常見的空間換時間
的策略。
2)在當前的應用場景中,絕大多數網絡都是“無尺度網絡”,遵循冪律分布
,不同點的鄰居數量相差非常懸殊。而邊分割會使那些多鄰居的點所相連的邊大多數被分到不同的機器上,這樣的數據分布會使得內網帶寬更加捉襟見肘,於是邊分割存儲方式被漸漸拋棄了
。
2.1.2 GraphX 存儲模式
Graphx 借鑒 PowerGraph,使用的是 Vertex-Cut(點分割)方式存儲圖,用三個 RDD 存儲圖數據信息:
VertexTable(id, data):id 為頂點 id, data 為頂點屬性
EdgeTable(pid, src, dst, data):pid 為分區 id ,src 為源頂點 id ,dst 為目的頂點 id,data 為邊屬性
RoutingTable(id, pid):id 為頂點 id ,pid 為分區 id
點分割存儲實現如下圖所示:![]()
GraphX 在進行圖分割時,有幾種不同的分區(partition)策略,它通過 PartitionStrategy 專門定義這些策略。在 PartitionStrategy 中,總共定義了 EdgePartition2D、EdgePartition1D、RandomVertexCut 以及 CanonicalRandomVertexCut 這四種不同的分區策略。下面分別介紹這幾種策略。
RandomVertexCut
case object RandomVertexCut extends PartitionStrategy {
override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
math.abs((src, dst).hashCode()) % numParts
}
}
這個方法比較簡單,通過取源頂點和目標頂點 id 的哈希值來將邊分配到不同的分區。這個方法會產生一個隨機的邊分割,兩個頂點之間相同方向的邊會分配到同一個分區。
CanonicalRandomVertexCut
case object CanonicalRandomVertexCut extends PartitionStrategy {
override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
if (src < dst) {
math.abs((src, dst).hashCode()) % numParts
} else {
math.abs((dst, src).hashCode()) % numParts
}
}
}
這種分割方法和前一種方法沒有本質的不同。不同的是,哈希值的產生帶有確定的方向(即兩個頂點中較小 id 的頂點在前)。兩個頂點之間所有的邊都會分配到同一個分區,而不管方向如何。
EdgePartition1D
case object EdgePartition1D extends PartitionStrategy {
override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
val mixingPrime: VertexId = 1125899906842597L
(math.abs(src * mixingPrime) % numParts).toInt
}
}
這種方法僅僅根據源頂點 id 來將邊分配到不同的分區。有相同源頂點的邊會分配到同一分區。
EdgePartition2D
case object EdgePartition2D extends PartitionStrategy {
override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
val mixingPrime: VertexId = 1125899906842597L
if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
// Use old method for perfect squared to ensure we get same results
val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
(col * ceilSqrtNumParts + row) % numParts
} else {
// Otherwise use new method
val cols = ceilSqrtNumParts
val rows = (numParts + cols - 1) / cols
val lastColRows = numParts - rows * (cols - 1)
val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
col * rows + row
}
}
}
這種分割方法同時使用到了源頂點 id 和目的頂點 id。它使用稀疏邊連接矩陣的 2 維區分來將邊分配到不同的分區,從而保證頂點的備份數不大於 2 * sqrt(numParts) 的限制。這里 numParts 表示分區數。 這個方法的實現分兩種情況,即分區數能完全開方和不能完全開方兩種情況。當分區數能完全開方時,采用下面的方法:
val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
(col * ceilSqrtNumParts + row) % numParts
當分區數不能完全開方時,采用下面的方法。這個方法的最后一列允許擁有不同的行數。
val cols = ceilSqrtNumParts
val rows = (numParts + cols - 1) / cols
//最后一列允許不同的行數
val lastColRows = numParts - rows * (cols - 1)
val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
col * rows + row
下面舉個例子來說明該方法。假設我們有一個擁有 12 個頂點的圖,要把它切分到 9 台機器。我們可以用下面的稀疏矩陣來表示:
__________________________________
v0 | P0 * | P1 | P2 * |
v1 | **** | * | |
v2 | ******* | ** | **** |
v3 | ***** | * * | * |
----------------------------------
v4 | P3 * | P4 *** | P5 ** * |
v5 | * * | * | |
v6 | * | ** | **** |
v7 | * * * | * * | * |
----------------------------------
v8 | P6 * | P7 * | P8 * *|
v9 | * | * * | |
v10 | * | ** | * * |
v11 | * <-E | *** | ** |
----------------------------------
上面的例子中 * 表示分配到處理器上的邊。E 表示連接頂點 v11 和 v1 的邊,它被分配到了處理器 P6 上。為了獲得邊所在的處理器,我們將矩陣切分為 sqrt(numParts) * sqrt(numParts) 塊。 注意
:上圖中與頂點 v11 相連接的邊只出現在第一列的塊 (P0,P3,P6) 或者最后一行的塊 (P6,P7,P8) 中,這保證了 V11 的副本數不會超過 2 * sqrt(numParts) 份,在上例中即副本不能超過 6 份。在上面的例子中,P0 里面存在很多邊,這會造成工作的不均衡。為了提高均衡,我們首先用頂點 id 乘以一個大的素數,然后再 shuffle 頂點的位置。乘以一個大的素數本質上不能解決不平衡的問題,只是減少了不平衡的情況發生。
2.2 vertices、edges 以及 triplets
vertices、edges 以及 triplets 是 GraphX 中三個非常重要的概念。我們在前文 GraphX 介紹中對這三個概念有初步的了解。
2.2.1 vertices
在 GraphX 中,vertices 對應着名稱為 VertexRDD 的 RDD。這個 RDD 有頂點 id 和頂點屬性兩個成員變量。它的源碼如下所示:
abstract class VertexRDD[VD](sc: SparkContext, deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps)
從源碼中我們可以看到,VertexRDD 繼承自 RDD[(VertexId, VD)],這里 VertexId 表示頂點 id,VD 表示頂點所帶的屬性的類別。這從另一個角度也說明 VertexRDD 擁有頂點 id 和頂點屬性。
2.2.2 edges
在 GraphX 中,edges 對應着 EdgeRDD。這個 RDD 擁有三個成員變量,分別是源頂點 id、目標頂點 id以及邊屬性。它的源碼如下所示:
abstract class EdgeRDD[ED](sc: SparkContext, deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps)
從源碼中我們可以看到,EdgeRDD 繼承自 RDD[Edge[ED]],即類型為 Edge[ED] 的 RDD。
2.2.3 triplets
在 GraphX 中,triplets 對應着 EdgeTriplet。它是一個三元組視圖,這個視圖邏輯上將頂點和邊的屬性保存為一個 RDD[EdgeTriplet[VD, ED]]。可以通過下面的 Sql 表達式表示這個三元視圖的含義:
SELECT
src.id ,
dst.id ,
src.attr ,
e.attr ,
dst.attr
FROM
edges AS e
LEFT JOIN vertices AS src ,
vertices AS dst ON e.srcId = src.Id
AND e.dstId = dst.Id
同樣,也可以通過下面圖解的形式來表示它的含義:

EdgeTriplet 的源代碼如下所示:
class EdgeTriplet[VD, ED] extends Edge[ED] {
//源頂點屬性
var srcAttr: VD = _ // nullValue[VD]
//目標頂點屬性
var dstAttr: VD = _ // nullValue[VD]
protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD, ED] = {
srcId = other.srcId
dstId = other.dstId
attr = other.attr
this
}
}
EdgeTriplet 類繼承自 Edge 類,我們來看看這個父類:
case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (var srcId: VertexId = 0, var dstId: VertexId = 0, var attr: ED = null.asInstanceOf[ED]) extends Serializable
Edge 類中包含源頂點 id,目標頂點 id 以及邊的屬性。所以從源代碼中我們可以知道,triplets 既包含了邊屬性也包含了源頂點的 id 和屬性、目標頂點的 id 和屬性。
2.3 圖的構建
GraphX 的 Graph 對象是用戶操作圖的入口。前面的章節我們介紹過,它包含了邊(edges)、頂點(vertices)以及 triplets 三部分,並且這三部分都包含相應的屬性,可以攜帶額外的信息。
2.3.1 構建圖的方法
構建圖的入口方法有兩種,分別是根據邊構建和根據邊的兩個頂點構建。
根據邊構建圖(Graph.fromEdges)
def fromEdges[VD: ClassTag, ED: ClassTag](
edges: RDD[Edge[ED]],
defaultValue: VD,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
}
根據邊的兩個頂點數據構建(Graph.fromEdgeTuples)
def fromEdgeTuples[VD: ClassTag](
rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
{
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
uniqueEdges match {
case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
case None => graph
}
}
從上面的代碼我們知道,不管是根據邊構建圖還是根據邊的兩個頂點數據構建,最終都是使用 GraphImpl 來構建的,即調用了 GraphImpl 的 apply 方法。
2.3.2 構建圖的過程
構建圖的過程很簡單,分為三步,它們分別是 構建邊EdgeRDD、構建頂點VertexRDD、生成Graph對象。下面分別介紹這三個步驟:
Step1、構建邊 EdgeRDD
從源代碼看 構建邊EdgeRDD 也分為三步,下圖的例子詳細說明了這些步驟。

• 1 從文件中加載信息,轉換成 tuple 的形式,即 (srcId, dstId)
val rawEdgesRdd: RDD[(Long, Long)] =
sc.textFile(input).filter(s => s != "0,0").repartition(partitionNum).map {
case line =>
val ss = line.split(",")
val src = ss(0).toLong
val dst = ss(1).toLong
if (src < dst)
(src, dst)
else
(dst, src)
}.distinct()
• 2 入口,調用 Graph.fromEdgeTuples(rawEdgesRdd)
源數據為分割的兩個點 ID,把源數據映射成 Edge(srcId, dstId, attr) 對象, attr 默認為 1。這樣元數據就構建成了 RDD[Edge[ED]],如下面的代碼:
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
• 3 將 RDD[Edge[ED]] 進一步轉化成 EdgeRDDImpl[ED, VD]
第二步構建完 RDD[Edge[ED]] 之后,GraphX 通過調用 GraphImpl 的 apply 方法來構建 Graph。
val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
def apply[VD: ClassTag, ED: ClassTag](
edges: RDD[Edge[ED]],
defaultVertexAttr: VD,
edgeStorageLevel: StorageLevel,
vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}
在 apply 調用 fromEdgeRDD 之前,代碼會調用 EdgeRDD.fromEdges(edges) 將 RDD[Edge[ED]] 轉化成 EdgeRDDImpl[ED, VD]。
def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDDImpl[ED, VD] = {
val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[ED, VD]
iter.foreach { e =>
builder.add(e.srcId, e.dstId, e.attr)
}
Iterator((pid, builder.toEdgePartition))
}
EdgeRDD.fromEdgePartitions(edgePartitions)
}
程序遍歷 RDD[Edge[ED]] 的每個分區,並調用 builder.toEdgePartition 對分區內的邊作相應的處理。
def toEdgePartition: EdgePartition[ED, VD] = {
val edgeArray = edges.trim().array
new Sorter(Edge.edgeArraySortDataFormat[ED])
.sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)
val localSrcIds = new Array[Int](edgeArray.size)
val localDstIds = new Array[Int](edgeArray.size)
val data = new Array[ED](edgeArray.size)
val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
val global2local = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
val local2global = new PrimitiveVector[VertexId]
var vertexAttrs = Array.empty[VD]
//采用列式存儲的方式,節省了空間
if (edgeArray.length > 0) {
index.update(edgeArray(0).srcId, 0)
var currSrcId: VertexId = edgeArray(0).srcId
var currLocalId = -1
var i = 0
while (i < edgeArray.size) {
val srcId = edgeArray(i).srcId
val dstId = edgeArray(i).dstId
localSrcIds(i) = global2local.changeValue(srcId,
{ currLocalId += 1; local2global += srcId; currLocalId }, identity)
localDstIds(i) = global2local.changeValue(dstId,
{ currLocalId += 1; local2global += dstId; currLocalId }, identity)
data(i) = edgeArray(i).attr
//相同頂點srcId中第一個出現的srcId與其下標
if (srcId != currSrcId) {
currSrcId = srcId
index.update(currSrcId, i)
}
i += 1
}
vertexAttrs = new Array[VD](currLocalId + 1)
}
new EdgePartition(
localSrcIds, localDstIds, data, index, global2local, local2global.trim().array, vertexAttrs,
None)
}
• toEdgePartition 的第一步就是對邊進行排序。
按照 srcId 從小到大排序。排序是為了遍歷時順序訪問,加快訪問速度。采用數組而不是 Map,是因為數組是連續的內存單元,具有原子性,避免了 Map 的 hash 問題,訪問速度快。
• toEdgePartition 的第二步就是填充 localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs。
數組 localSrcIds, localDstIds 中保存的是通過 global2local.changeValue(srcId/dstId) 轉換而成的分區本地索引。可以通過 localSrcIds、localDstIds 數組中保存的索引位從 local2global 中查到具體的 VertexId。
global2local 是一個簡單的,key 值非負的快速 hash map:GraphXPrimitiveKeyOpenHashMap,保存 vertextId 和本地索引的映射關系。global2local 中包含當前 partition 所有 srcId、dstId 與本地索引的映射關系。
• data 就是當前分區的 attr 屬性數組。
我們知道相同的 srcId 可能對應不同的 dstId。按照 srcId 排序之后,相同的 srcId 會出現多行,如上圖中的 index desc 部分。index 中記錄的是相同 srcId 中第一個出現的 srcId 與其下標。
• local2global 記錄的是所有的 VertexId 信息的數組。形如:srcId, dstId, srcId, dstId, srcId, dstId, srcId, dstId。其中會包含相同的 srcId。即:當前分區所有 vertextId 的順序實際值。
我們可以通過根據本地下標取 VertexId,也可以根據 VertexId 取本地下標,取相應的屬性。
// 根據本地下標取 VertexId
localSrcIds/localDstIds -> index -> local2global -> VertexId
// 根據 VertexId 取本地下標,取屬性
VertexId -> global2local -> index -> data -> attr object
構建頂點 VertexRDD
緊接着上面構建邊 RDD 的代碼,我們看看方法 fromEdgeRDD 的實現。
private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
edges: EdgeRDDImpl[ED, VD],
defaultVertexAttr: VD,
edgeStorageLevel: StorageLevel,
vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
val edgesCached = edges.withTargetStorageLevel(edgeStorageLevel).cache()
val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size, defaultVertexAttr)
.withTargetStorageLevel(vertexStorageLevel)
fromExistingRDDs(vertices, edgesCached)
}
從上面的代碼我們可以知道,GraphX 使用 VertexRDD.fromEdges 構建頂點 VertexRDD,當然我們把邊 RDD 作為參數傳入。
def fromEdges[VD: ClassTag](
edges: EdgeRDD[_], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
// 1 創建路由表
val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions))
// 2 根據路由表生成分區對象vertexPartitions
val vertexPartitions = routingTables.mapPartitions({ routingTableIter =>
val routingTable =
if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
Iterator(ShippableVertexPartition(Iterator.empty, routingTable, defaultVal))
}, preservesPartitioning = true)
// 3 創建VertexRDDImpl對象
new VertexRDDImpl(vertexPartitions)
}
構建頂點 VertexRDD 的過程分為三步,如上代碼中的注釋。它的構建過程如下圖所示:

創建路由表
為了能通過點找到邊,每個點需要保存點到邊的信息,這些信息保存在 RoutingTablePartition 中。
private[graphx] def createRoutingTables(edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
// 將edge partition中的數據轉換成RoutingTableMessage類型,
val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
Function.tupled(RoutingTablePartition.edgePartitionToMsgs)))
}
上述程序首先將邊分區中的數據轉換成 RoutingTableMessage 類型,即 tuple(VertexId,Int) 類型。
def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
: Iterator[RoutingTableMessage] = {
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
edgePartition.iterator.foreach { e =>
map.changeValue(e.srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
map.changeValue(e.dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
}
map.iterator.map { vidAndPosition =>
val vid = vidAndPosition._1
val position = vidAndPosition._2
toMessage(vid, pid, position)
}
}
//`30-0`比特位表示邊分區`ID`,`32-31`比特位表示標志位
private def toMessage(vid: VertexId, pid: PartitionID, position: Byte): RoutingTableMessage = {
val positionUpper2 = position << 30
val pidLower30 = pid & 0x3FFFFFFF
(vid, positionUpper2 | pidLower30)
}
根據代碼,我們可以知道程序使用 int 的 32-31 比特位表示標志位,即 01: isSrcId ,10: isDstId。30-0 比特位表示邊分區 ID。這樣做可以節省內存。RoutingTableMessage 表達的信息是:頂點id和它相關聯的邊的分區id是放在一起的,所以任何時候,我們都可以通過 RoutingTableMessage 找到頂點關聯的邊。
根據路由表生成分區對象
private[graphx] def createRoutingTables(
edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
// 將edge partition中的數據轉換成RoutingTableMessage類型,
val numEdgePartitions = edges.partitions.size
vid2pid.partitionBy(vertexPartitioner).mapPartitions(
iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)),
preservesPartitioning = true)
}
我們將第 1 步生成的 vid2pid 按照 HashPartitioner 重新分區。我們看看 RoutingTablePartition.fromMsgs 方法。
def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
: RoutingTablePartition = {
val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
for (msg <- iter) {
val vid = vidFromMessage(msg)
val pid = pidFromMessage(msg)
val position = positionFromMessage(msg)
pid2vid(pid) += vid
srcFlags(pid) += (position & 0x1) != 0
dstFlags(pid) += (position & 0x2) != 0
}
new RoutingTablePartition(pid2vid.zipWithIndex.map {
case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))
})
}
該方法從 RoutingTableMessage 獲取數據,將 vid, 邊pid, isSrcId/isDstId 重新封裝到 pid2vid,srcFlags,dstFlags 這三個數據結構中。它們表示當前頂點分區中的點在邊分區的分布。想象一下,重新分區后,新分區中的點可能來自於不同的邊分區,所以一個點要找到邊,就需要先確定邊的分區號 pid, 然后在確定的邊分區中確定是 srcId 還是 dstId, 這樣就找到了邊。新分區中保存 vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)) 這樣的記錄。這里轉換為 toBitSet 保存是為了節省空間。
根據上文生成的 routingTables,重新封裝路由表里的數據結構為 ShippableVertexPartition。ShippableVertexPartition 會合並相同重復點的屬性 attr 對象,補全缺失的 attr 對象。
def apply[VD: ClassTag](
iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD,
mergeFunc: (VD, VD) => VD): ShippableVertexPartition[VD] = {
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
// 合並頂點
iter.foreach { pair =>
map.setMerge(pair._1, pair._2, mergeFunc)
}
// 不全缺失的屬性值
routingTable.iterator.foreach { vid =>
map.changeValue(vid, defaultVal, identity)
}
new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
}
//ShippableVertexPartition定義
ShippableVertexPartition[VD: ClassTag](
val index: VertexIdToIndexMap,
val values: Array[VD],
val mask: BitSet,
val routingTable: RoutingTablePartition)
map 就是映射 vertexId->attr,index 就是頂點集合,values 就是頂點集對應的屬性集,mask 指頂點集的 BitSet。
生成 Graph 對象
使用上述構建的 edgeRDD 和 vertexRDD,使用 new GraphImpl(vertices, new ReplicatedVertexView(edges.asInstanceOf[EdgeRDDImpl[ED, VD]])) 就可以生成 Graph 對象。ReplicatedVertexView 是點和邊的視圖,用來管理運送 (shipping) 頂點屬性到 EdgeRDD 的分區。當頂點屬性改變時,我們需要運送它們到邊分區來更新保存在邊分區的頂點屬性。 注意
:在 ReplicatedVertexView 中不要保存一個對邊的引用,因為在屬性運送等級升級后,這個引用可能會發生改變。
class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](var edges: EdgeRDDImpl[ED, VD], var hasSrcId: Boolean = false, var hasDstId: Boolean = false)
2.4 計算模式
2.4.1 BSP 計算模式
目前基於圖的並行計算框架已經有很多,比如來自 Google 的 Pregel、來自 Apache 開源的圖計算框架 Giraph/HAMA 以及最為著名的 GraphLab,其中 Pregel、HAMA 和 Giraph 都是非常類似的,都是基於 BSP(Bulk Synchronous Parallell)模式。 Bulk Synchronous Parallell,即整體同步並行。
在 BSP 中,一次計算過程由一系列全局超步組成,每一個超步由並發計算、通信和同步三個步驟組成。同步完成,標志着這個超步的完成及下一個超步的開始。 BSP 模式的准則是批量同步(bulk synchrony),其獨特之處在於超步(superstep) 概念的引入。一個 BSP 程序同時具有水平和垂直兩個方面的結構。從垂直上看,一個 BSP 程序由一系列串行的超步(superstep) 組成,如圖所示:![]()
從水平上看,在一個超步中,所有的進程並行執行局部計算。一個超步可分為三個階段,如圖所示:
![]()
• 本地計算階段,每個處理器只對存儲在本地內存中的數據進行本地計算。
• 全局通信階段,對任何非本地數據進行操作。
• 柵欄同步階段,等待所有通信行為的結束。
BSP 模型有如下幾個特點:
1、將計算划分為一個一個的超步(superstep),有效避免死鎖。
2、將處理器和路由器分開,強調了計算任務和通信任務的分開,而路由器僅僅完成點到點的消息傳遞,不提供組合、復制和廣播等功能,這樣做既掩蓋具體的互連網絡拓撲,又簡化了通信協議。
3、采用障礙同步的方式、以硬件實現的全局同步是可控的粗粒度級,提供了執行緊耦合同步式並行算法的有效方式
2.4.2 圖操作一覽
正如 RDDs 有基本的操作 map、filter 和 reduceByKey 一樣,屬性圖也有基本的集合操作,這些操作采用用戶自定義的函數並產生包含轉換特征和結構的新圖。定義在 Graph 中的核心操作是經過優化的實現。表示為核心操作的組合的便捷操作定義在 GraphOps 中。然而,因為有 Scala 的隱式轉換,定義在 GraphOps 中的操作可以作為 Graph 的成員自動使用。例如,我們可以通過下面的方式計算每個頂點(定義在 GraphOps 中)的入度。
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees
區分核心圖操作和 GraphOps 的原因是為了在將來支持不同的圖表示。每個圖表示都必須提供核心操作的實現並重用很多定義在 GraphOps 中的有用操作。

2.4.3 基本信息操作
以下是定義在 Graph 和 GraphOps 中(為了簡單起見,表現為圖的成員)的功能的快速瀏覽。注意,某些函數簽名已經簡化(如默認參數和類型的限制已刪除),一些更高級的功能已經被刪除,所以請參閱 API 文檔了解官方的操作列表。

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val users: VertexRDD[(String, String)] = VertexRDD[(String, String)](sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))))
val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
val graph = Graph(users, relationships)
圖屬性操作總結
/** 圖屬性操作總結 */
class Graph[VD, ED] {
// 圖信息操作
// 獲取邊的數量
val numEdges: Long
// 獲取頂點的數量
val numVertices: Long
// 獲取所有頂點的入度
val inDegrees: VertexRDD[Int]
// 獲取所有頂點的出度
val outDegrees: VertexRDD[Int]
// 獲取所有頂點入度與出度之和
val degrees: VertexRDD[Int]
// 獲取所有頂點的集合
val vertices: VertexRDD[VD]
// 獲取所有邊的集合
val edges: EdgeRDD[ED]
// 獲取所有triplets表示的集合
val triplets: RDD[EdgeTriplet[VD, ED]]
// 緩存操作
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def cache(): Graph[VD, ED]
// 取消緩存
def unpersist(blocking: Boolean = true): Graph[VD, ED]
// 圖重新分區
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
// 頂點和邊屬性轉換
def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2]
// 修改圖結構
// 反轉圖
def reverse: Graph[VD, ED]
// 獲取子圖
def subgraph(
epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
vpred: (VertexID, VD) => Boolean = ((v, d) => true))
: Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
// Join RDDs with the graph
def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)])
(mapFunc: (VertexID, VD, Option[U]) => VD2)
: Graph[VD2, ED]
// Aggregate information about adjacent triplets
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[A]
// Iterative graph-parallel computation
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
vprog: (VertexID, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]
// Basic graph algorithms
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def connectedComponents(): Graph[VertexID, ED]
def triangleCount(): Graph[Int, ED]
def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED]
}
2.4.4 轉換操作
GraphX 中的轉換操作主要有 mapVertices、mapEdges 和 mapTriplets 三個,它們在 Graph 文件中定義,在 GraphImpl 文件中實現。下面分別介紹這三個方法。
mapVertices
mapVertices 用來更新頂點屬性。從圖的構建那章我們知道,頂點屬性保存在邊分區中,所以我們需要改變的是邊分區中的屬性。
對當前圖每一個頂點應用提供的 map 函數來修改頂點的屬性,返回一個新的圖。
override def mapVertices[VD2: ClassTag]
(f: (VertexId, VD) => VD2)(implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
if (eq != null) {
vertices.cache()
// 使用方法 f 處理 vertices
val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
// 獲得兩個不同 vertexRDD 的不同
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
// 更新 ReplicatedVertexView
val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
.updateVertices(changedVerts)
new GraphImpl(newVerts, newReplicatedVertexView)
} else {
GraphImpl(vertices.mapVertexPartitions(_.map(f)), replicatedVertexView.edges)
}
}
上面的代碼中,當 VD 和 VD2 類型相同時,我們可以重用沒有發生變化的點,否則需要重新創建所有的點。我們分析 VD 和 VD2 相同的情況,分四步處理。
•(1)使用方法 f 處理 vertices,獲得新的 VertexRDD
•(2)使用在 VertexRDD 中定義的 diff 方法求出新 VertexRDD 和源 VertexRDD 的不同
override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
val otherPartition = other match {
case other: VertexRDD[_] if this.partitioner == other.partitioner =>
other.partitionsRDD
case _ =>
VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD
}
val newPartitionsRDD = partitionsRDD.zipPartitions(
otherPartition, preservesPartitioning = true
) { (thisIter, otherIter) =>
val thisPart = thisIter.next()
val otherPart = otherIter.next()
Iterator(thisPart.diff(otherPart))
}
this.withPartitionsRDD(newPartitionsRDD)
}
這個方法首先處理新生成的 VertexRDD 的分區,如果它的分區和源 VertexRDD 的分區一致,那么直接取出它的 partitionsRDD,否則重新分區后取出它的 partitionsRDD。 針對新舊兩個 VertexRDD 的所有分區,調用 VertexPartitionBaseOps 中的 diff 方法求得分區的不同。
def diff(other: Self[VD]): Self[VD] = {
// 首先判斷
if (self.index != other.index) {
diff(createUsingIndex(other.iterator))
} else {
val newMask = self.mask & other.mask
var i = newMask.nextSetBit(0)
while (i >= 0) {
if (self.values(i) == other.values(i)) {
newMask.unset(i)
}
i = newMask.nextSetBit(i + 1)
}
this.withValues(other.values).withMask(newMask)
}
}
該方法隱藏兩個 VertexRDD 中相同的頂點信息,得到一個新的 VertexRDD。
•(3)更新 ReplicatedVertexView
def updateVertices(updates: VertexRDD[VD]): ReplicatedVertexView[VD, ED] = {
// 生成一個 VertexAttributeBlock
val shippedVerts = updates.shipVertexAttributes(hasSrcId, hasDstId)
.setName("ReplicatedVertexView.updateVertices - shippedVerts %s %s (broadcast)".format(
hasSrcId, hasDstId))
.partitionBy(edges.partitioner.get)
// 生成新的邊 RDD
val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
(ePartIter, shippedVertsIter) => ePartIter.map {
case (pid, edgePartition) =>
(pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
}
})
new ReplicatedVertexView(newEdges, hasSrcId, hasDstId)
}
updateVertices 方法返回一個新的 ReplicatedVertexView,它更新了邊分區中包含的頂點屬性。我們看看它的實現過程。首先看 shipVertexAttributes 方法的調用。調用 shipVertexAttributes 方法會生成一個 VertexAttributeBlock,VertexAttributeBlock 包含當前分區的頂點屬性,這些屬性可以在特定的邊分區使用。
def shipVertexAttributes(
shipSrc: Boolean, shipDst: Boolean): Iterator[(PartitionID, VertexAttributeBlock[VD])] = {
Iterator.tabulate(routingTable.numEdgePartitions) { pid =>
val initialSize = if (shipSrc && shipDst) routingTable.partitionSize(pid) else 64
val vids = new PrimitiveVector[VertexId](initialSize)
val attrs = new PrimitiveVector[VD](initialSize)
var i = 0
routingTable.foreachWithinEdgePartition(pid, shipSrc, shipDst) { vid =>
if (isDefined(vid)) {
vids += vid
attrs += this(vid)
}
i += 1
}
// (邊分區id, VertexAttributeBlock (頂點id, 屬性))
(pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
}
}
獲得新的頂點屬性之后,我們就可以調用 updateVertices 更新邊中頂點的屬性了,如下面代碼所示:
edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator))
//更新EdgePartition的屬性
def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = {
val newVertexAttrs = new Array[VD](vertexAttrs.length)
System.arraycopy(vertexAttrs, 0, newVertexAttrs, 0, vertexAttrs.length)
while (iter.hasNext) {
val kv = iter.next()
//global2local獲得頂點的本地index
newVertexAttrs(global2local(kv._1)) = kv._2
}
new EdgePartition(
localSrcIds, localDstIds, data, index, global2local, local2global, newVertexAttrs,
activeSet)
}
例子:將頂點的2個屬性合並為1個屬性(即將字符串合並)。
scala> graph.vertices.collect.foreach(println _)
(5,(franklin,prof))
(2,(istoica,prof))
(3,(rxin,student))
(7,(jgonzal,postdoc))
scala> graph.mapVertices{ case (vid, (attr1,attr2)) => attr1 + attr2 } 或者 graph.mapVertices((VertexId, VD) => VD._1 + VD._2)
res20: org.apache.spark.graphx.Graph[String,String] = org.apache.spark.graphx.impl.GraphImpl@4c819eab
scala> res20.vertices.collect.foreach(println _)
(5,franklinprof)
(2,istoicaprof)
(3,rxinstudent)
(7,jgonzalpostdoc)
mapEdges
mapEdges 用來更新邊屬性。對當前圖每一條邊應用提供的 map 函數來修改邊的屬性,返回一個新圖。
override def mapEdges[ED2: ClassTag](f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
val newEdges = replicatedVertexView.edges
.mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
}
相比於 mapVertices,mapEdges 顯然要簡單得多,它只需要根據方法 f 生成新的 EdgeRDD,然后再初始化即可。
例子:將邊的屬性都加一個前綴。
scala> graph.edges.collect.foreach(println _)
Edge(3,7,collab)
Edge(5,3,advisor)
Edge(2,5,colleague)
Edge(5,7,pi)
scala> graph.mapEdges(edge => "name:" + edge.attr)
res29: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@72da828b
scala> res29.edges.collect.foreach(println _)
Edge(3,7,name:collab)
Edge(5,3,name:advisor)
Edge(2,5,name:colleague)
Edge(5,7,name:pi)
mapTriplets
mapTriplets 用來更新邊屬性。
override def mapTriplets[ED2: ClassTag](
f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2],
tripletFields: TripletFields): Graph[VD, ED2] = {
vertices.cache()
replicatedVertexView.upgrade(vertices, tripletFields.useSrc, tripletFields.useDst)
val newEdges = replicatedVertexView.edges.mapEdgePartitions { (pid, part) =>
part.map(f(pid, part.tripletIterator(tripletFields.useSrc, tripletFields.useDst)))
}
new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
}
這段代碼中,replicatedVertexView 調用 upgrade 方法修改當前的 ReplicatedVertexView,使調用者可以訪問到指定級別的邊信息(如僅僅可以讀源頂點的屬性)。
def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean) {
// 判斷傳遞級別
val shipSrc = includeSrc && !hasSrcId
val shipDst = includeDst && !hasDstId
if (shipSrc || shipDst) {
val shippedVerts: RDD[(Int, VertexAttributeBlock[VD])] =
vertices.shipVertexAttributes(shipSrc, shipDst)
.setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s (broadcast)".format(
includeSrc, includeDst, shipSrc, shipDst))
.partitionBy(edges.partitioner.get)
val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
(ePartIter, shippedVertsIter) => ePartIter.map {
case (pid, edgePartition) =>
(pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
}
})
edges = newEdges
hasSrcId = includeSrc
hasDstId = includeDst
}
}
最后,用 f 處理邊,生成新的 RDD,最后用新的數據初始化圖。
例子:邊屬性添加前綴。
scala> graph.edges.collect.foreach(println _)
Edge(3,7,collab)
Edge(5,3,advisor)
Edge(2,5,colleague)
Edge(5,7,pi)
scala> graph.mapTriplets(tri => "name:" + tri.attr).triplets
res37: org.apache.spark.rdd.RDD[org.apache.spark.graphx.EdgeTriplet[(String, String),String]] = MapPartitionsRDD[80] at mapPartitions at GraphImpl.scala:48
scala> graph.mapTriplets(tri => "name:" + tri.attr).triplets.collect
res39: Array[org.apache.spark.graphx.EdgeTriplet[(String, String),String]] = Array(((3,(rxin,student)),(7,(jgonzal,postdoc)),name:collab), ((5,(franklin,prof)),(3,(rxin,student)),name:advisor), ((2,(istoica,prof)),(5,(franklin,prof)),name:colleague), ((5,(franklin,prof)),(7,(jgonzal,postdoc)),name:pi))
scala> graph.mapTriplets(tri => "name:" + tri.attr).triplets.collect.foreach(println _)
((3,(rxin,student)),(7,(jgonzal,postdoc)),name:collab)
((5,(franklin,prof)),(3,(rxin,student)),name:advisor)
((2,(istoica,prof)),(5,(franklin,prof)),name:colleague)
((5,(franklin,prof)),(7,(jgonzal,postdoc)),name:pi)
2.4.5 結構操作
當前的 GraphX 僅僅支持一組簡單的常用結構性操作。下面是基本的結構性操作列表。
class Graph[VD, ED] {
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
下面分別介紹這四種函數的原理:
reverse
reverse 操作返回一個新的圖,這個圖的邊的方向都是反轉的。例如,這個操作可以用來計算反轉的 PageRank。因為反轉操作沒有修改頂點或者邊的屬性或者改變邊的數量,所以我們可以在不移動或者復制數據的情況下有效地實現它。
override def reverse: Graph[VD, ED] = {
new GraphImpl(vertices.reverseRoutingTables(), replicatedVertexView.reverse())
}
def reverse(): ReplicatedVertexView[VD, ED] = {
val newEdges = edges.mapEdgePartitions((pid, part) => part.reverse)
new ReplicatedVertexView(newEdges, hasDstId, hasSrcId)
}
// EdgePartition 中的 reverse
def reverse: EdgePartition[ED, VD] = {
val builder = new ExistingEdgePartitionBuilder[ED, VD](
global2local, local2global, vertexAttrs, activeSet, size)
var i = 0
while (i < size) {
val localSrcId = localSrcIds(i)
val localDstId = localDstIds(i)
val srcId = local2global(localSrcId)
val dstId = local2global(localDstId)
val attr = data(i)
// 將源頂點和目標頂點換位置
builder.add(dstId, srcId, localDstId, localSrcId, attr)
i += 1
}
builder.toEdgePartition
}
例子:圖的入度和出度轉換。
subgraph
subgraph 操作利用頂點和邊的判斷式(predicates),返回的圖僅僅包含滿足頂點判斷式的頂點、滿足邊判斷式的邊以及滿足頂點判斷式的 triple。subgraph 操作可以用於很多場景,如獲取 感興趣的頂點和邊組成的圖或者獲取清除斷開連接后的圖。
override def subgraph(
epred: EdgeTriplet[VD, ED] => Boolean = x => true,
vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
vertices.cache()
// 過濾 vertices,重用 partitione和索引
val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
// 過濾 triplets
replicatedVertexView.upgrade(vertices, true, true)
val newEdges = replicatedVertexView.edges.filter(epred, vpred)
new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
}
// 該代碼顯示,subgraph 方法的實現分兩步:先過濾 VertexRDD,然后再過濾 EdgeRDD。如上,過濾 VertexRDD 比較簡單,我們重點看過濾 EdgeRDD 的過程。
def filter(
epred: EdgeTriplet[VD, ED] => Boolean,
vpred: (VertexId, VD) => Boolean): EdgeRDDImpl[ED, VD] = {
mapEdgePartitions((pid, part) => part.filter(epred, vpred))
}
// EdgePartition 中的 filter 方法
def filter(
epred: EdgeTriplet[VD, ED] => Boolean,
vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = {
val builder = new ExistingEdgePartitionBuilder[ED, VD](
global2local, local2global, vertexAttrs, activeSet)
var i = 0
while (i < size) {
// The user sees the EdgeTriplet, so we can't reuse it and must create one per edge.
val localSrcId = localSrcIds(i)
val localDstId = localDstIds(i)
val et = new EdgeTriplet[VD, ED]
et.srcId = local2global(localSrcId)
et.dstId = local2global(localDstId)
et.srcAttr = vertexAttrs(localSrcId)
et.dstAttr = vertexAttrs(localDstId)
et.attr = data(i)
if (vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)) {
builder.add(et.srcId, et.dstId, localSrcId, localDstId, et.attr)
}
i += 1
}
builder.toEdgePartition
}
因為用戶可以看到 EdgeTriplet 的信息,所以我們不能重用 EdgeTriplet,需要重新創建一個,然后在用 epred 函數處理。
例子:
scala> graph.subgraph(Triplet => Triplet.attr.startsWith("c"), (VertexId, VD) => VD._2.startsWith("pro"))
res3: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@49db5438
scala> res3.vertices.collect
res4: Array[(org.apache.spark.graphx.VertexId, (String, String))] = Array((2,(istoica,prof)), (5,(franklin,prof)))
scala> res3.edges.collect
res5: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(2,5,colleague))
mask
mask 操作構造一個子圖,類似於交集,這個子圖包含輸入圖中包含的頂點和邊。它的實現很簡單,頂點和邊均做 inner join 操作即可。這個操作可以和 subgraph 操作相結合,基於另外一個相關圖的特征去約束一個圖。
override def mask[VD2: ClassTag, ED2: ClassTag] (
other: Graph[VD2, ED2]): Graph[VD, ED] = {
val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
val newEdges = replicatedVertexView.edges.innerJoin(other.edges) { (src, dst, v, w) => v }
new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
}
groupEdges
groupEdges 操作合並多重圖中的並行邊(如頂點對之間重復的邊),並傳入一個函數來合並兩個邊的屬性。在大量的應用程序中,並行的邊可以合並(它們的權重合並)為一條邊從而降低圖的大小。
override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
val newEdges = replicatedVertexView.edges.mapEdgePartitions(
(pid, part) => part.groupEdges(merge))
new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
}
def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = {
val builder = new ExistingEdgePartitionBuilder[ED, VD](
global2local, local2global, vertexAttrs, activeSet)
var currSrcId: VertexId = null.asInstanceOf[VertexId]
var currDstId: VertexId = null.asInstanceOf[VertexId]
var currLocalSrcId = -1
var currLocalDstId = -1
var currAttr: ED = null.asInstanceOf[ED]
// 迭代處理所有的邊
var i = 0
while (i < size) {
// 如果源頂點和目的頂點都相同
if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) {
// 合並屬性
currAttr = merge(currAttr, data(i))
} else {
// This edge starts a new run of edges
if (i > 0) {
// 添加到 builder 中
builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
}
// Then start accumulating for a new run
currSrcId = srcIds(i)
currDstId = dstIds(i)
currLocalSrcId = localSrcIds(i)
currLocalDstId = localDstIds(i)
currAttr = data(i)
}
i += 1
}
if (size > 0) {
builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
}
builder.toEdgePartition
}
在圖構建那章我們說明過,存儲的邊按照源頂點 id 排過序
,所以上面的代碼可以通過一次迭代完成對所有相同邊的處理。
應用舉例
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)
2.4.6 頂點關聯操作
在許多情況下,有必要將外部數據加入到圖中。例如,我們可能有額外的用戶屬性需要合並到已有的圖中或者我們可能想從一個圖中取出頂點特征加入到另外一個圖中。這些任務可以用 join 操作完成。主要的 join 操作如下所示。
class Graph[VD, ED] {
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
: Graph[VD, ED]
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
}
joinVertices 操作 join 輸入 RDD 和頂點,返回一個新的帶有頂點特征的圖。這些特征是通過在連接頂點的結果上使用用戶定義的 map 函數獲得的。沒有匹配的頂點保留其原始值。下面詳細地來分析這兩個函數。
joinVertices
joinVertices 來 join 相同 ID 的頂點數據。
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
: Graph[VD, ED] = {
val uf = (id: VertexId, data: VD, o: Option[U]) => {
o match {
case Some(u) => mapFunc(id, data, u)
case None => data
}
}
graph.outerJoinVertices(table)(uf)
}
我們可以看到,joinVertices 的實現是通過 outerJoinVertices 來實現的。這是因為 join 本來就是 outer join 的一種特例。
例子:
scala> graph.vertices.collect.foreach(println _)
(5,(franklin,prof))
(2,(istoica,prof))
(3,(rxin,student))
(7,(jgonzal,postdoc))
scala> val join = sc.parallelize(Array((3L, "123")))
join: org.apache.spark.rdd.RDD[(Long, String)] = ParallelCollectionRDD[137] at parallelize at <console>:31
scala> graph.joinVertices(join)((VertexId, VD, U) => (VD._1, VD._2 + U))
res33: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@4e5b8728
scala> res33.vertices.collect.foreach(println _)
(7,(jgonzal,postdoc))
(2,(istoica,prof))
(3,(rxin,student123))
(5,(franklin,prof))
outerJoinVertices
跟 JOIN 類似,只不過 table 中沒有的頂點默認值為 None。
override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
(other: RDD[(VertexId, U)])
(updateF: (VertexId, VD, Option[U]) => VD2)
(implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
if (eq != null) {
vertices.cache()
// updateF preserves type, so we can use incremental replication
val newVerts = vertices.leftJoin(other)(updateF).cache()
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
.updateVertices(changedVerts)
new GraphImpl(newVerts, newReplicatedVertexView)
} else {
// updateF does not preserve type, so we must re-replicate all vertices
val newVerts = vertices.leftJoin(other)(updateF)
GraphImpl(newVerts, replicatedVertexView.edges)
}
}
通過以上的代碼我們可以看到,如果 updateF 不改變類型,我們只需要創建改變的頂點即可,否則我們要重新創建所有的頂點。我們討論不改變類型的情況。 這種情況分三步。
•(1)修改頂點屬性值
val newVerts = vertices.leftJoin(other)(updateF).cache()
這一步會用頂點 RDD join 傳入的 RDD,然后用 updateF 作用 joinRDD 中的所有頂點,改變它們的值。
•(2)找到發生改變的頂點
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
•(3)更新 newReplicatedVertexView 中邊分區中的頂點屬性。
val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]].updateVertices(changedVerts)
例子:
scala> graph.vertices.collect.foreach(println _)
(5,(franklin,prof))
(2,(istoica,prof))
(3,(rxin,student))
(7,(jgonzal,postdoc))
scala> graph.outerJoinVertices(join)((VertexId, VD, U) => (VD._1, VD._2 + U))
res35: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@7c542a14
scala> res35.vertices.collect.foreach(println _)
(7,(jgonzal,postdocNone))
(2,(istoica,profNone))
(3,(rxin,studentSome(123)))
(5,(franklin,profNone))
2.4.7 聚合操作
GraphX 中提供的聚合操作有 aggregateMessages、collectNeighborIds 和 collectNeighbors 三個,其中 aggregateMessages 在 GraphImpl 中實現,collectNeighborIds 和 collectNeighbors 在 GraphOps 中實現。下面分別介紹這幾個方法。
aggregateMessages
aggregateMessages 接口:aggregateMessages 是 GraphX 最重要的 API,用於替換 mapReduceTriplets。目前 mapReduceTriplets 最終也是通過 aggregateMessages 來實現的。它主要功能是向鄰邊發消息,合並鄰邊收到的消息,返回 messageRDD。
aggregateMessages 的接口如下:
def aggregateMessages[A: ClassTag](
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[A] = {
aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
}
該接口有三個參數,分別為發消息函數、合並消息函數以及發消息的方向。
• sendMsg: 發消息函數
private def sendMsg(ctx: EdgeContext[KCoreVertex, Int, Map[Int, Int]]): Unit = {
ctx.sendToDst(Map(ctx.srcAttr.preKCore -> -1, ctx.srcAttr.curKCore -> 1))
ctx.sendToSrc(Map(ctx.dstAttr.preKCore -> -1, ctx.dstAttr.curKCore -> 1))
}
• mergeMsg:合並消息函數
該函數用於在 Map 階段每個 edge 分區中每個點收到的消息合並,並且它還用於 reduce 階段,合並不同分區的消息。合並 vertexId 相同的消息。
• tripletFields:定義發消息的方向
aggregateMessages 處理流程:aggregateMessages 方法分為 Map 和 Reduce 兩個階段,下面我們分別就這兩個階段說明。
Map 階段
從入口函數進入 aggregateMessagesWithActiveSet 函數,該函數首先使用 VertexRDD[VD] 更新 replicatedVertexView, 只更新其中 vertexRDD 中 attr 對象。如構建圖中介紹的,replicatedVertexView 是點和邊的視圖,點的屬性有變化,要更新邊中包含的點的 attr。
replicatedVertexView.upgrade(vertices, tripletFields.useSrc, tripletFields.useDst)
val view = activeSetOpt match {
case Some((activeSet, _)) =>
// 返回只包含活躍頂點的 replicatedVertexView
replicatedVertexView.withActiveSet(activeSet)
case None =>
replicatedVertexView
}
程序然后會對 replicatedVertexView 的 edgeRDD 做 mapPartitions 操作,所有的操作都在每個邊分區的迭代中完成,如下面的代碼:
val preAgg = view.edges.partitionsRDD.mapPartitions(_.flatMap {
case (pid, edgePartition) =>
// 選擇 scan 方法
val activeFraction = edgePartition.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
activeDirectionOpt match {
case Some(EdgeDirection.Both) =>
if (activeFraction < 0.8) {
edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
EdgeActiveness.Both)
} else {
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
EdgeActiveness.Both)
}
case Some(EdgeDirection.Either) =>
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
EdgeActiveness.Either)
case Some(EdgeDirection.Out) =>
if (activeFraction < 0.8) {
edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
EdgeActiveness.SrcOnly)
} else {
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
EdgeActiveness.SrcOnly)
}
case Some(EdgeDirection.In) =>
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
EdgeActiveness.DstOnly)
case _ => // None
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
EdgeActiveness.Neither)
}
})
在分區內,根據 activeFraction 的大小選擇是進入 aggregateMessagesEdgeScan 還是 aggregateMessagesIndexScan 處理。aggregateMessagesEdgeScan 會順序地掃描所有的邊, 而 aggregateMessagesIndexScan 會先過濾源頂點索引,然后再掃描。我們重點去分析 aggregateMessagesEdgeScan。
def aggregateMessagesEdgeScan[A: ClassTag](
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A,
tripletFields: TripletFields,
activeness: EdgeActiveness): Iterator[(VertexId, A)] = {
var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
var i = 0
while (i < size) {
val localSrcId = localSrcIds(i)
val srcId = local2global(localSrcId)
val localDstId = localDstIds(i)
val dstId = local2global(localDstId)
val srcAttr = if (tripletFields.useSrc) vertexAttrs(localSrcId) else null.asInstanceOf[VD]
val dstAttr = if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]
ctx.set(srcId, dstId, localSrcId, localDstId, srcAttr, dstAttr, data(i))
sendMsg(ctx)
i += 1
}
}
該方法由兩步組成,分別是獲得頂點相關信息,以及發送消息。
• 獲取頂點相關信息
在前文介紹 edge partitio n時,我們知道它包含 localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs 這幾個重要的數據結構。其中 localSrcIds, localDstIds 分別表示源頂點、目的頂點在當前分區中的索引。所以我們可以遍歷 localSrcIds,根據其下標去 localSrcIds 中拿到 srcId 在全局 local2global 中的索引,最后拿到 srcId。通過 vertexAttrs 拿到頂點屬性。通過 data 拿到邊屬性。
• 發送消息
發消息前會根據接口中定義的 tripletFields,拿到發消息的方向。發消息的過程就是遍歷到一條邊,向 localSrcIds/localDstIds 中添加數據,如果 localSrcIds/localDstIds 中已經存在該數據,則執行合並函數 mergeMsg。
override def sendToSrc(msg: A) {
send(_localSrcId, msg)
}
override def sendToDst(msg: A) {
send(_localDstId, msg)
}
@inline private def send(localId: Int, msg: A) {
if (bitset.get(localId)) {
aggregates(localId) = mergeMsg(aggregates(localId), msg)
} else {
aggregates(localId) = msg
bitset.set(localId)
}
}
每個點之間在發消息的時候是獨立的,即:點單純根據方向,向以相鄰點的以 localId 為下標的數組中插數據,互相獨立,可以並行運行。Map 階段最后返回消息 RDD messages: RDD[(VertexId, VD2)]
Map 階段的執行流程如下例所示:

Reduce 階段
Reduce 階段的實現就是調用下面的代碼:
vertices.aggregateUsingIndex(preAgg, mergeMsg)
override def aggregateUsingIndex[VD2: ClassTag](
messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
val shuffled = messages.partitionBy(this.partitioner.get)
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
}
this.withPartitionsRDD[VD2](parts)
}
面的代碼通過兩步實現。
• (1)對 messages 重新分區,分區器使用 VertexRDD 的 partitioner。然后使用 zipPartitions 合並兩個分區。
• (2)對等合並 attr, 聚合函數使用傳入的 mergeMs g函數。
def aggregateUsingIndex[VD2: ClassTag](
iter: Iterator[Product2[VertexId, VD2]],
reduceFunc: (VD2, VD2) => VD2): Self[VD2] = {
val newMask = new BitSet(self.capacity)
val newValues = new Array[VD2](self.capacity)
iter.foreach { product =>
val vid = product._1
val vdata = product._2
val pos = self.index.getPos(vid)
if (pos >= 0) {
if (newMask.get(pos)) {
newValues(pos) = reduceFunc(newValues(pos), vdata)
} else { // otherwise just store the new value
newMask.set(pos)
newValues(pos) = vdata
}
}
}
this.withValues(newValues).withMask(newMask)
}
根據傳參,我們知道上面的代碼迭代的是 messagePartition,並不是每個節點都會收到消息,所以 messagePartition 集合最小,迭代速度會快。
這段代碼表示,我們根據 vetexId 從 index 中取到其下標 pos,再根據下標,從 values 中取到 attr,存在 attr 就用 mergeMsg 合並 attr,不存在就直接賦值。
Reduce 階段的過程如下圖所示:

舉例
下面的例子計算比用戶年齡大的追隨者(即 followers)的平均年齡。
// Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
triplet.sendToDst(1, triplet.srcAttr)
}
},
// Add counter and age
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))
collectNeighbors
該方法的作用是收集每個頂點的鄰居頂點的頂點 id 和頂點屬性。需要指定方向。
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
val nbrs = edgeDirection match {
case EdgeDirection.Either =>
graph.aggregateMessages[Array[(VertexId, VD)]](
ctx => {
ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
},
(a, b) => a ++ b, TripletFields.All)
case EdgeDirection.In =>
graph.aggregateMessages[Array[(VertexId, VD)]](
ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),
(a, b) => a ++ b, TripletFields.Src)
case EdgeDirection.Out =>
graph.aggregateMessages[Array[(VertexId, VD)]](
ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))),
(a, b) => a ++ b, TripletFields.Dst)
case EdgeDirection.Both =>
throw new SparkException("collectEdges does not support EdgeDirection.Both. Use" +
"EdgeDirection.Either instead.")
}
graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) =>
nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
}
}
從上面的代碼中,第一步是根據 EdgeDirection 來確定調用哪個 aggregateMessages 實現聚合操作。我們用滿足條件 EdgeDirection.Either 的情況來說明。可以看到 aggregateMessages 的方式消息的函數為:
ctx => {
ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
},
這個函數在處理每條邊時都會同時向源頂點和目的頂點發送消息,消息內容分別為(目的頂點 id,目的頂點屬性)、(源頂點 id,源頂點屬性)。為什么會這樣處理呢? 我們知道,每條邊都由兩個頂點組成,對於這個邊,我需要向源頂點發送目的頂點的信息來記錄它們之間的鄰居關系,同理向目的頂點發送源頂點的信息來記錄它們之間的鄰居關系。
Merge 函數是一個集合合並操作,它合並同同一個頂點對應的所有目的頂點的信息。如下所示:
(a, b) => a ++ b
通過 aggregateMessages 獲得包含鄰居關系信息的 VertexRDD 后,把它和現有的 vertices 作 join 操作,得到每個頂點的鄰居消息。
collectNeighborIds
該方法的作用是收集每個頂點的鄰居頂點的頂點 id。它的實現和 collectNeighbors 非常相同。需要指定方向。
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
val nbrs =
if (edgeDirection == EdgeDirection.Either) {
graph.aggregateMessages[Array[VertexId]](
ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) },
_ ++ _, TripletFields.None)
} else if (edgeDirection == EdgeDirection.Out) {
graph.aggregateMessages[Array[VertexId]](
ctx => ctx.sendToSrc(Array(ctx.dstId)),
_ ++ _, TripletFields.None)
} else if (edgeDirection == EdgeDirection.In) {
graph.aggregateMessages[Array[VertexId]](
ctx => ctx.sendToDst(Array(ctx.srcId)),
_ ++ _, TripletFields.None)
} else {
throw new SparkException("It doesn't make sense to collect neighbor ids without a " +
"direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
}
graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
nbrsOpt.getOrElse(Array.empty[VertexId])
}
}
和 collectNeighbors 的實現不同的是,aggregateMessages 函數中的 sendMsg 函數只發送頂點Id到源頂點和目的頂點。其它的實現基本一致。
ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) }
2.4.8 緩存操作
在 Spark 中,RDD 默認是不緩存的。為了避免重復計算,當需要多次利用它們時,我們必須顯示地緩存它們。GraphX 中的圖也有相同的方式。當利用到圖多次時,確保首先訪問 Graph.cache() 方法。
在迭代計算中,為了獲得最佳的性能,不緩存可能是必須的。默認情況下,緩存的 RDD 和圖會一直保留在內存中直到因為內存壓力迫使它們以LRU的順序刪除。對於迭代計算,先前的迭代的中間結果將填充到緩存 中。雖然它們最終會被刪除,但是保存在內存中的不需要的數據將會減慢垃圾回收。只有中間結果不需要,不緩存它們是更高效的。然而,因為圖是由多個 RDD 組成的,正確的不持久化它們是困難的。對於迭代計算,我們建議使用 Pregel API,它可以正確的不持久化中間結果。
GraphX 中的緩存操作有 cache, persist, unpersist 和 unpersistVertices。它們的接口分別是:
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def cache(): Graph[VD, ED]
def unpersist(blocking: Boolean = true): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
2.5 Pregel API
圖本身是遞歸數據結構,頂點的屬性依賴於它們鄰居的屬性,這些鄰居的屬性又依賴於自己鄰居的屬性。所以許多重要的圖算法都是迭代的重新計算每個頂點的屬性,直到滿足某個確定的條件。一系列的圖並發(graph-parallel)抽象已經被提出來用來表達這些迭代算法。GraphX 公開了一個類似 Pregel 的操作,它是廣泛使用的 Pregel 和 GraphLab 抽象的一個融合。
GraphX 中實現的這個更高級的 Pregel 操作是一個約束到圖拓撲的批量同步(bulk-synchronous)並行消息抽象。Pregel 操作者執行一系列的超步(super steps),在這些步驟中,頂點從之前的超步中接收進入 (inbound) 消息的總和,為頂點屬性計算一個新的值,然后在以后的超步中發送消息到鄰居頂點。不像 Pregel 而更像 GraphLab,消息通過邊 triplet 的一個函數被並行計算,消息的計算既會訪問源頂點特征也會訪問目的頂點特征。在超步中,沒有收到消息的頂點會被跳過。當沒有消息遺留時,Pregel 操作停止迭代並返回最終的圖。
注意
:與標准的 Pregel 實現不同的是,GraphX 中的頂點僅僅能發送信息給鄰居頂點,並且可以利用用戶自定義的消息函數並行地構造消息。這些限制允許對 GraphX 進行額外的優化。
下面的代碼是 pregel 的具體實現。
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED],
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// 計算消息
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var activeMessages = messages.count()
// 迭代
var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// 接收消息並更新頂點
prevG = g
g = g.joinVertices(messages)(vprog).cache()
val oldMessages = messages
// 發送新消息
messages = g.mapReduceTriplets(
sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
activeMessages = messages.count()
i += 1
}
g
}
2.5.1 pregel 計算模型
Pregel 計算模型中有三個重要的函數,分別是 vertexProgram、sendMessage 和 messageCombiner。
• vertexProgram:用戶定義的頂點運行程序。它作用於每一個頂點,負責接收進來的信息,並計算新的頂點值。
• sendMsg:發送消息。
• mergeMsg:合並消息。
我們具體分析它的實現。根據代碼可以知道,這個實現是一個迭代的過程。在開始迭代之前,先完成一些初始化操作:
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// 計算消息
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var activeMessages = messages.count()
程序首先用 vprog 函數處理圖中所有的頂點,生成新的圖。然后用生成的圖調用聚合操作(mapReduceTriplets,實際的實現是我們前面章節講到的 aggregateMessagesWithActiveSet 函數)獲取聚合后的消息。 activeMessages 指 messages 這個 VertexRDD中的頂點數。
下面就開始迭代操作了。在迭代內部,分為二步。
• (1)接收消息,並更新頂點
g = g.joinVertices(messages)(vprog).cache()
// joinVertices 的定義
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
: Graph[VD, ED] = {
val uf = (id: VertexId, data: VD, o: Option[U]) => {
o match {
case Some(u) => mapFunc(id, data, u)
case None => data
}
}
graph.outerJoinVertices(table)(uf)
}
這一步實際上是使用 outerJoinVertices 來更新頂點屬性。outerJoinVertices 在關聯操作中有詳細介紹。
• (2)發送新消息
messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
注意
:在上面的代碼中,mapReduceTriplets 多了一個參數 Some((oldMessages, activeDirection))。這個參數的作用是:它使我們在發送新的消息時,會忽略掉那些兩端都沒有接收到消息的邊,減少計算量。
2.5.2 pregel 實現最短路徑
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// 初始化圖
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a,b) => math.min(a,b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
上面的例子中,Vertex Program 函數定義如下:
(id, dist, newDist) => math.min(dist, newDist)
這個函數的定義顯而易見,當兩個消息來的時候,取它們當中路徑的最小值。同理 Merge Message 函數也是同樣的含義。
Send Message 函數中,會首先比較 triplet.srcAttr + triplet.attr 和 triplet.dstAttr,即比較加上邊的屬性后,這個值是否小於目的節點的屬性,如果小於,則發送消息到目的頂點。
2.6 GraphX 實例
下圖中有6個人,每個人有名字和年齡,這些人根據社會關系形成 8 條邊,每條邊有其屬性。在以下例子演示中將構建頂點、邊和圖,打印圖的屬性、轉換操作、結構操作、連接操作、聚合操作,並結合實際要求進行演示。

程序代碼如下:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.{Edge, _}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Practice extends App {
// 屏蔽日志
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//設定一個 SparkConf
val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local[4]")
val sc = new SparkContext(conf)
// 初始化頂點集合
val vertexArray = Array(
(1L, ("Alice", 28)),
(2L, ("Bob", 27)),
(3L, ("Charlie", 65)),
(4L, ("David", 42)),
(5L, ("Ed", 55)),
(6L, ("Fran", 50))
)
// 創建頂點的 RDD 表示
val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
// 初始化邊的集合
val edgeArray = Array(
Edge(2L, 1L, 7),
Edge(2L, 4L, 2),
Edge(3L, 2L, 4),
Edge(3L, 6L, 3),
Edge(4L, 1L, 1),
Edge(2L, 5L, 2),
Edge(5L, 3L, 8),
Edge(5L, 6L, 3)
)
// 創建邊的 RDD 表示
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
// 創建一個圖
val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
//*************************** 圖的屬性 ****************************************
println("屬性演示")
println("**********************************************************")
println("找出圖中年齡大於30的頂點:")
graph.vertices.filter { case (id, (name, age)) => age > 30 }.collect.foreach {
case (id, (name, age)) => println(s"$name is $age")
}
println
println("找出圖中屬性大於 5 的邊:")
graph.edges.filter(e => e.attr > 5).collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
println
// triplets 操作,((srcId, srcAttr), (dstId, dstAttr), attr)
println("列出邊屬性 >5 的 tripltes:")
for (triplet <- graph.triplets.filter(t => t.attr > 5).collect) {
println(s"${triplet.srcAttr._1} likes ${triplet.dstAttr._1}")
}
println
// degrees 操作
println("找出圖中最大的出度、入度、度數:")
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}
println("max of outDegrees:" + graph.outDegrees.reduce(max) + " max of inDegrees:" + graph.inDegrees.reduce(max) + " max of Degrees:" + graph.degrees.reduce(max))
println
//*************************** 轉換操作 ****************************************
println("轉換操作")
println("**********************************************************")
println("頂點的轉換操作,頂點age + 10:")
graph.mapVertices { case (id, (name, age)) => (id, (name, age + 10)) }.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))
println
println("邊的轉換操作,邊的屬性*2:")
graph.mapEdges(e => e.attr * 2).edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
println
println("三元組的轉換操作,邊的屬性為端點的age相加:")
graph.mapTriplets(tri => tri.srcAttr._2 * tri.dstAttr._2).triplets.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
println
//*************************** 結構操作 ****************************************
println("結構操作")
println("**********************************************************")
println("頂點年紀 >30 的子圖:")
val subGraph = graph.subgraph(vpred = (id, vd) => vd._2 >= 30)
println("子圖所有頂點:")
subGraph.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))
println
println("子圖所有邊:")
subGraph.edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
println
println("反轉整個圖:")
val reverseGraph = graph.reverse
println("子圖所有頂點:")
reverseGraph.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))
println
println("子圖所有邊:")
reverseGraph.edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
println
//*************************** 連接操作 ****************************************
println("連接操作")
println("**********************************************************")
val inDegrees: VertexRDD[Int] = graph.inDegrees
case class User(name: String, age: Int, inDeg: Int, outDeg: Int)
// 創建一個新圖,頂類點VD的數據型為 User,並從 graph 做類型轉換
val initialUserGraph: Graph[User, Int] = graph.mapVertices { case (id, (name, age)) => User(name, age, 0, 0) }
// initialUserGraph 與 inDegrees、outDegrees(RDD)進行連接,並修改 initialUserGraph 中 inDeg 值、outDeg 值
val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees) {
case (id, u, inDegOpt) => User(u.name, u.age, inDegOpt.getOrElse(0), u.outDeg)
}.outerJoinVertices(initialUserGraph.outDegrees) {
case (id, u, outDegOpt) => User(u.name, u.age, u.inDeg, outDegOpt.getOrElse(0))
}
println("連接圖的屬性:")
userGraph.vertices.collect.foreach(v => println(s"${v._2.name} inDeg: ${v._2.inDeg} outDeg: ${v._2.outDeg}"))
println
println("出度和入讀相同的人員:")
userGraph.vertices.filter {
case (id, u) => u.inDeg == u.outDeg
}.collect.foreach {
case (id, property) => println(property.name)
}
println
//*************************** 聚合操作 ****************************************
println("聚合操作")
println("**********************************************************")
println("collectNeighbors:獲取當前節點source節點的id和屬性")
graph.collectNeighbors(EdgeDirection.In).collect.foreach(v => {
println(s"id: ${v._1}");
for (arr <- v._2) {
println(s" ${arr._1} (name: ${arr._2._1} age: ${arr._2._2})")
}
})
println("aggregateMessages版本:")
graph.aggregateMessages[Array[(VertexId, (String, Int))]](ctx => ctx.sendToDst(Array((ctx.srcId.toLong, (ctx.srcAttr._1, ctx.srcAttr._2)))), _ ++ _).collect.foreach(v => {
println(s"id: ${v._1}");
for (arr <- v._2) {
println(s" ${arr._1} (name: ${arr._2._1} age: ${arr._2._2})")
}
})
println("聚合操作")
println("**********************************************************")
println("找出年紀最大的追求者:")
val oldestFollower: VertexRDD[(String, Int)] = userGraph.aggregateMessages[(String, Int)](
// 將源頂點的屬性發送給目標頂點,map 過程
ctx => ctx.sendToDst((ctx.srcAttr.name, ctx.srcAttr.age)),
// 得到最大追求者,reduce 過程
(a, b) => if (a._2 > b._2) a else b
)
userGraph.vertices.leftJoin(oldestFollower) { (id, user, optOldestFollower) =>
optOldestFollower match {
case None => s"${user.name} does not have any followers."
case Some((name, age)) => s"${name} is the oldest follower of ${user.name}."
}
}.collect.foreach { case (id, str) => println(str) }
println
//*************************** 實用操作 ****************************************
println("聚合操作")
println("**********************************************************")
val sourceId: VertexId = 5L // 定義源點
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
initialGraph.triplets.collect().foreach(println)
println("找出5到各頂點的最短距離:")
val sssp = initialGraph.pregel(Double.PositiveInfinity, Int.MaxValue, EdgeDirection.Out)(
(id, dist, newDist) => {
println("||||" + id);
math.min(dist, newDist)
},
triplet => { // 計算權重
println(">>>>" + triplet.srcId)
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
// 發送成功
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
// 發送不成功
Iterator.empty
}
},
(a, b) => math.min(a, b) // 當前節點所有輸入的最短距離
)
sssp.triplets.collect().foreach(println)
println(sssp.vertices.collect.mkString("\n"))
sc.stop()
}
運行結果截圖:
運行結果如下:
屬性演示
**********************************************************
找出圖中年齡大於30的頂點:
David is 42
Ed is 55
Fran is 50
Charlie is 65
找出圖中屬性大於 5 的邊:
2 to 1 att 7
5 to 3 att 8
列出邊屬性 >5 的 tripltes:
Bob likes Alice
Ed likes Charlie
找出圖中最大的出度、入度、度數:
max of outDegrees:(2,3) max of inDegrees:(6,2) max of Degrees:(2,4)
轉換操作
**********************************************************
頂點的轉換操作,頂點age + 10:
4 is (David,52)
1 is (Alice,38)
5 is (Ed,65)
6 is (Fran,60)
2 is (Bob,37)
3 is (Charlie,75)
邊的轉換操作,邊的屬性*2:
2 to 1 att 14
2 to 4 att 4
3 to 2 att 8
3 to 6 att 6
2 to 5 att 4
4 to 1 att 2
5 to 3 att 16
5 to 6 att 6
三元組的轉換操作,邊的屬性為端點的age相加:
2 to 1 att 756
2 to 4 att 1134
3 to 2 att 1755
3 to 6 att 3250
2 to 5 att 1485
4 to 1 att 1176
5 to 3 att 3575
5 to 6 att 2750
結構操作
**********************************************************
頂點年紀 >30 的子圖:
子圖所有頂點:
David is 42
Ed is 55
Fran is 50
Charlie is 65
子圖所有邊:
3 to 6 att 3
5 to 3 att 8
5 to 6 att 3
反轉整個圖:
子圖所有頂點:
David is 42
Alice is 28
Ed is 55
Fran is 50
Bob is 27
Charlie is 65
子圖所有邊:
1 to 2 att 7
4 to 2 att 2
2 to 3 att 4
6 to 3 att 3
1 to 4 att 1
5 to 2 att 2
3 to 5 att 8
6 to 5 att 3
連接操作
**********************************************************
連接圖的屬性:
David inDeg: 1 outDeg: 1
Alice inDeg: 2 outDeg: 0
Ed inDeg: 1 outDeg: 2
Fran inDeg: 2 outDeg: 0
Bob inDeg: 1 outDeg: 3
Charlie inDeg: 1 outDeg: 2
出度和入讀相同的人員:
David
聚合操作
**********************************************************
collectNeighbors:獲取當前節點source節點的id和屬性
id: 4
2 (name: Bob age: 27)
id: 1
2 (name: Bob age: 27)
4 (name: David age: 42)
id: 5
2 (name: Bob age: 27)
id: 6
3 (name: Charlie age: 65)
5 (name: Ed age: 55)
id: 2
3 (name: Charlie age: 65)
id: 3
5 (name: Ed age: 55)
aggregateMessages版本:
id: 4
2 (name: Bob age: 27)
id: 1
2 (name: Bob age: 27)
4 (name: David age: 42)
id: 5
2 (name: Bob age: 27)
id: 6
3 (name: Charlie age: 65)
5 (name: Ed age: 55)
id: 2
3 (name: Charlie age: 65)
id: 3
5 (name: Ed age: 55)
聚合操作
**********************************************************
找出年紀最大的追求者:
Bob is the oldest follower of David.
David is the oldest follower of Alice.
Bob is the oldest follower of Ed.
Charlie is the oldest follower of Fran.
Charlie is the oldest follower of Bob.
Ed is the oldest follower of Charlie.
聚合操作
**********************************************************
((2,Infinity),(1,Infinity),7)
((2,Infinity),(4,Infinity),2)
((3,Infinity),(2,Infinity),4)
((3,Infinity),(6,Infinity),3)
((2,Infinity),(5,0.0),2)
((4,Infinity),(1,Infinity),1)
((5,0.0),(3,Infinity),8)
((5,0.0),(6,Infinity),3)
找出5到各頂點的最短距離:
||||6
||||1
||||3
||||4
||||5
||||2
>>>>5
>>>>2
>>>>3
>>>>2
>>>>4
>>>>5
>>>>3
>>>>2
||||3
||||6
>>>>3
>>>>3
||||2
>>>>2
>>>>2
>>>>2
||||1
||||4
>>>>4
||||1
((2,12.0),(1,15.0),7)
((2,12.0),(4,14.0),2)
((3,8.0),(2,12.0),4)
((3,8.0),(6,3.0),3)
((2,12.0),(5,0.0),2)
((4,14.0),(1,15.0),1)
((5,0.0),(3,8.0),8)
((5,0.0),(6,3.0),3)
(4,14.0)
(1,15.0)
(5,0.0)
(6,3.0)
(2,12.0)
(3,8.0)
第3章 圖算法
3.1 PageRank 排名算法
3.1.1 算法概述
PageRank,即
網頁排名
,又稱網頁級別
、Google 左側排名
或佩奇排名
。是Google創始人拉里·佩奇和謝爾蓋·布林於 1997 年構建早期的搜索系統原型時提出的鏈接分析算法,在揉合了諸如 Title 標識和 Keywords 標識等所有其它因素之后,Google 通過 PageRank 來調整結果,使那些更具“等級/重要性”的網頁在搜索結果中令網站排名獲得提升,從而提高搜索結果的相關性和質量。
3.1.2 從入鏈數量到 PageRank
PageRank 的計算基於以下兩個基本假設:
• 數量假設:在 Web 圖模型中,如果一個頁面節點接收到的其他網頁指向的入鏈數量越多,那么這個頁面越重要。
• 質量假設:指向頁面 A 的入鏈質量不同,質量高的頁面會通過鏈接向其他頁面傳遞更多的權重。所以越是質量高的頁面指向頁面 A,則頁面 A 越重要。
利用以上兩個假設,PageRank 算法剛開始賦予每個網頁相同的重要性得分,通過迭代遞歸
計算來更新每個頁面節點的 PageRank 得分,直到得分穩定為止。 PageRank 計算得出的結果是網頁的重要性評價,這和用戶輸入的查詢是沒有任何關系的,即算法是主題無關的
。
3.1.3 PageRank 算法原理
PageRank 的計算充分利用了兩個假設:數量假設和質量假設。步驟如下:
• 1)在初始階段:網頁通過鏈接關系構建起 Web 圖,每個頁面設置相同的 PageRank 值,通過若干輪的計算,會得到每個頁面所獲得的最終 PageRank 值。隨着每一輪的計算進行,網頁當前的 PageRank 值會不斷得到更新。
• 2)在一輪中更新頁面 PageRank 得分的計算方法:在一輪更新頁面 PageRank 得分的計算中,每個頁面將其當前的 PageRank 值平均分配到本頁面包含的出鏈上,這樣每個鏈接即獲得了相應的權值。而每個頁面將所有指向本頁面的入鏈所傳入的權值求和,即可得到新的 PageRank 得分。當每個頁面都獲得了更新后的 PageRank 值,就完成了一輪 PageRank 計算。
基本思想
如果網頁 T 存在一個指向網頁 A 的連接,則表明 T 的所有者認為 A 比較重要,從而把 T 的一部分重要性得分賦予 A。這個重要性得分值為:PR(T)/L(T)
其中 PR(T) 為 T 的 PageRank 值,L(T) 為 T 的出鏈數。
則 A 的 PageRank 值為一系列類似於 T 的頁面重要性得分值的累加。
即一個頁面的得票數由所有鏈向它的頁面的重要性來決定
,到一個頁面的超鏈接相當於對該頁投一票。一個頁面的 PageRank 是由所有鏈向它的頁面(鏈入頁面)的重要性經過遞歸算法得到的。一個有較多鏈入的頁面會有較高的等級,相反如果一個頁面沒有任何鏈入頁面,那么它沒有等級。
我們設向量 B 為第一、第二、…、第 N 個網頁的網頁排名

矩陣 A 代表網頁之間的權重輸出關系,其中 amn 代表第 m 個網頁向第 n 個網頁的輸出權重。

輸出權重計算較為簡單:假設 m 一共有 10 個出鏈,指向 n 的一共有2個,那么 m 向 n 輸出的權重就為 2/10。
現在問題變為:A 是已知的,我們要通過計算得到 B。
假設 Bi 是第 i 次迭代的結果,那么

初始假設所有網頁的排名都是 1/N (N為網頁總數量),即

通過上述迭代計算,最終 Bi 會收斂,即 Bi 無限趨近於 B,此時 B = B × A。
具體示例
假設有網頁 A、B、C、D,它們之間的鏈接關系如下圖所示

計算 B1 如下:

不斷迭代,計算結果如下:
第 1次迭代: 0.125, 0.333, 0.083, 0.458
第 2次迭代: 0.042, 0.500, 0.042, 0.417
第 3次迭代: 0.021, 0.431, 0.014, 0.535
第 4次迭代: 0.007, 0.542, 0.007, 0.444
第 5次迭代: 0.003, 0.447, 0.002, 0.547
第 6次迭代: 0.001, 0.549, 0.001, 0.449
第 7次迭代: 0.001, 0.449, 0.000, 0.550
第 8次迭代: 0.000, 0.550, 0.000, 0.450
第 9次迭代: 0.000, 0.450, 0.000, 0.550
第10次迭代: 0.000, 0.550, 0.000, 0.450
... ...
我們可以發現,A 和 C 的權重變為 0,而 B 和 D 的權重也趨於在 0.5 附近擺動。從圖中也可以觀察出:A 和 C 之間有互相鏈接,但它們又把權重輸出給了 B 和 D,而 B 和 D之間互相鏈接,並不向 A 或 C 輸出任何權重,所以久而久之權重就都轉移到 B 和 D 了。
PageRank 的改進
上面是最簡單正常的情況,考慮一下兩種特殊情況:

第一種情況是,B 存在導向自己的鏈接,迭代計算過程是:
第 1次迭代: 0.125, 0.583, 0.083, 0.208
第 2次迭代: 0.042, 0.833, 0.042, 0.083
第 3次迭代: 0.021, 0.931, 0.014, 0.035
第 4次迭代: 0.007, 0.972, 0.007, 0.014
第 5次迭代: 0.003, 0.988, 0.002, 0.006
第 6次迭代: 0.001, 0.995, 0.001, 0.002
第 7次迭代: 0.001, 0.998, 0.000, 0.001
第 8次迭代: 0.000, 0.999, 0.000, 0.000
第 9次迭代: 0.000, 1.000, 0.000, 0.000
第10次迭代: 0.000, 1.000, 0.000, 0.000
... ...
我們發現最終 B 權重變為1,其它所有網頁的權重都變為了0。
第二種情況是 B 是孤立於其它網頁的,既沒有入鏈也沒有出鏈,迭代計算過程是:
第 1次迭代: 0.125, 0.000, 0.125, 0.250
第 2次迭代: 0.063, 0.000, 0.063, 0.125
第 3次迭代: 0.031, 0.000, 0.031, 0.063
第 4次迭代: 0.016, 0.000, 0.016, 0.031
第 5次迭代: 0.008, 0.000, 0.008, 0.016
第 6次迭代: 0.004, 0.000, 0.004, 0.008
第 7次迭代: 0.002, 0.000, 0.002, 0.004
第 8次迭代: 0.001, 0.000, 0.001, 0.002
第 9次迭代: 0.000, 0.000, 0.000, 0.001
第10次迭代: 0.000, 0.000, 0.000, 0.000
... ...
我們發現所有網頁權重都變為了0。
出現這種情況是因為上面的數學模型出現了問題,該模型認為上網者從一個網頁瀏覽下一個網頁都是通過頁面的超鏈接。想象一下正常的上網情景,其實我們在看完一個網頁后,可能直接在瀏覽器輸入一個網址,而不通過上一個頁面的超鏈接。
我們假設每個網頁被用戶通過直接訪問方式的概率是相等的,即 1/N,N 為網頁總數,設矩陣 e 如下:

設用戶通過頁面超鏈接瀏覽下一網頁的概率為 α,則直接訪問的方式瀏覽下一個網頁的概率為 1 - α,改進上一節的迭代公式為:

通常情況下設 α 為0.8,上一節”具體示例”的計算變為如下:

迭代過程如下:
第 1次迭代: 0.150, 0.317, 0.117, 0.417
第 2次迭代: 0.097, 0.423, 0.090, 0.390
第 3次迭代: 0.086, 0.388, 0.076, 0.450
第 4次迭代: 0.080, 0.433, 0.073, 0.413
第 5次迭代: 0.079, 0.402, 0.071, 0.447
第 6次迭代: 0.079, 0.429, 0.071, 0.421
第 7次迭代: 0.078, 0.408, 0.071, 0.443
第 8次迭代: 0.078, 0.425, 0.071, 0.426
第 9次迭代: 0.078, 0.412, 0.071, 0.439
第10次迭代: 0.078, 0.422, 0.071, 0.428
第11次迭代: 0.078, 0.414, 0.071, 0.437
第12次迭代: 0.078, 0.421, 0.071, 0.430
第13次迭代: 0.078, 0.415, 0.071, 0.436
第14次迭代: 0.078, 0.419, 0.071, 0.431
第15次迭代: 0.078, 0.416, 0.071, 0.435
第16次迭代: 0.078, 0.419, 0.071, 0.432
第17次迭代: 0.078, 0.416, 0.071, 0.434
第18次迭代: 0.078, 0.418, 0.071, 0.432
第19次迭代: 0.078, 0.417, 0.071, 0.434
第20次迭代: 0.078, 0.418, 0.071, 0.433
... ...
修正 PageRank 計算公式
由於存在一些出鏈為 0,也就是那些不鏈接任何其他網頁的網,也稱為孤立網頁,使得很多網頁能被訪問到。因此需要對 PageRank 公式進行修正,即在簡單公式的基礎上增加了阻尼系數(damping factor)q, q 一般取值 q=0.85。
其意義是,在任意時刻,用戶到達某頁面后並繼續向后瀏覽的概率。1- q= 0.15 就是用戶停止點擊,隨機跳到新 URL 的概率)的算法被用到了所有頁面上,估算頁面可能被上網者放入書簽的概率。
最后,即所有這些被換算為一個百分比再乘上一個系數 q。由於下面的算法,沒有頁面的 PageRank 會是 0。所以,Google 通過數學系統給了每個頁面一個最小值。

這個公式就是 S. Brin 和 L. Page 在《The Anatomy of a Large- scale Hypertextual Web Search Engine Computer Networks and ISDN Systems 》定義的公式。
所以一個頁面的 PageRank 是由其他頁面的 PageRank 計算得到。Google 不斷的重復計算每個頁面的 PageRank。如果給每個頁面一個隨機 PageRank 值(非0),那么經過不斷的重復計算,這些頁面的 PR 值會趨向於正常和穩定。這就是搜索引擎使用它的原因。
首先求完整的公式

3.1.4 Spark GraphX 實現
import org.apache.spark.graphx.GraphLoader
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))
3.2 廣度優先遍歷(參考)
val graph = GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt")
val root: VertexId = 1
val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else
Double.PositiveInfinity)
val vprog = { (id: VertexId, attr: Double, msg: Double) => math.min(attr,msg) }
val sendMessage = { (triplet: EdgeTriplet[Double, Int]) =>
var iter:Iterator[(VertexId, Double)] = Iterator.empty
val isSrcMarked = triplet.srcAttr != Double.PositiveInfinity
val isDstMarked = triplet.dstAttr != Double.PositiveInfinity
if(!(isSrcMarked && isDstMarked)){
if(isSrcMarked){
iter = Iterator((triplet.dstId,triplet.srcAttr+1))
}else{
iter = Iterator((triplet.srcId,triplet.dstAttr+1))
}
}
iter
}
val reduceMessage = { (a: Double, b: Double) => math.min(a,b) }
val bfs = initialGraph.pregel(Double.PositiveInfinity, 20)(vprog, sendMessage, reduceMessage)
println(bfs.vertices.collect.mkString("\n"))
3.3 單源最短路徑(參考)
import scala.reflect.ClassTag
import org.apache.spark.graphx._
/**
* Computes shortest paths to the given set of landmark vertices, returning a graph where each
* vertex attribute is a map containing the shortest-path distance to each reachable landmark.
*/
object ShortestPaths {
/** Stores a map from the vertex id of a landmark to the distance to that landmark. */
type SPMap = Map[VertexId, Int]
private def makeMap(x: (VertexId, Int)*) = Map(x: _*)
private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap =
(spmap1.keySet ++ spmap2.keySet).map {
k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
}.toMap
/**
* Computes shortest paths to the given set of landmark vertices.
*
* @tparam ED the edge attribute type (not used in the computation)
*
* @param graph the graph for which to compute the shortest paths
* @param landmarks the list of landmark vertex ids. Shortest paths will be computed to each
* landmark.
*
* @return a graph where each vertex attribute is a map containing the shortest-path distance to
* each reachable landmark vertex.
*/
def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
val spGraph = graph.mapVertices { (vid, attr) =>
if (landmarks.contains(vid)) makeMap(vid -> 0) else makeMap()
}
val initialMessage = makeMap()
def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
addMaps(attr, msg)
}
def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
val newAttr = incrementMap(edge.dstAttr)
if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
else Iterator.empty
}
Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
}
}
3.4 連通圖(參考)
import scala.reflect.ClassTag
import org.apache.spark.graphx._
/** Connected components algorithm. */
object ConnectedComponents {
/**
* Compute the connected component membership of each vertex and return a graph with the vertex
* value containing the lowest vertex id in the connected component containing that vertex.
*
* @tparam VD the vertex attribute type (discarded in the computation)
* @tparam ED the edge attribute type (preserved in the computation)
* @param graph the graph for which to compute the connected components
* @param maxIterations the maximum number of iterations to run for
* @return a graph with vertex attributes containing the smallest vertex in each
* connected component
*/
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
maxIterations: Int): Graph[VertexId, ED] = {
require(maxIterations > 0, s"Maximum of iterations must be greater than 0," +
s" but got ${maxIterations}")
val ccGraph = graph.mapVertices { case (vid, _) => vid }
def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
if (edge.srcAttr < edge.dstAttr) {
Iterator((edge.dstId, edge.srcAttr))
} else if (edge.srcAttr > edge.dstAttr) {
Iterator((edge.srcId, edge.dstAttr))
} else {
Iterator.empty
}
}
val initialMessage = Long.MaxValue
val pregelGraph = Pregel(ccGraph, initialMessage,
maxIterations, EdgeDirection.Either)(
vprog = (id, attr, msg) => math.min(attr, msg),
sendMsg = sendMessage,
mergeMsg = (a, b) => math.min(a, b))
ccGraph.unpersist()
pregelGraph
} // end of connectedComponents
/**
* Compute the connected component membership of each vertex and return a graph with the vertex
* value containing the lowest vertex id in the connected component containing that vertex.
*
* @tparam VD the vertex attribute type (discarded in the computation)
* @tparam ED the edge attribute type (preserved in the computation)
* @param graph the graph for which to compute the connected components
* @return a graph with vertex attributes containing the smallest vertex in each
* connected component
*/
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
run(graph, Int.MaxValue)
}
}
3.5 三角計數(參考)
import scala.reflect.ClassTag
import org.apache.spark.graphx._
/**
* Compute the number of triangles passing through each vertex.
*
* The algorithm is relatively straightforward and can be computed in three steps:
*
* <ul>
* <li> Compute the set of neighbors for each vertex</li>
* <li> For each edge compute the intersection of the sets and send the count to both vertices.</li>
* <li> Compute the sum at each vertex and divide by two since each triangle is counted twice.</li>
* </ul>
*
* There are two implementations. The default `TriangleCount.run` implementation first removes
* self cycles and canonicalizes the graph to ensure that the following conditions hold:
* <ul>
* <li> There are no self edges</li>
* <li> All edges are oriented src > dst</li>
* <li> There are no duplicate edges</li>
* </ul>
* However, the canonicalization procedure is costly as it requires repartitioning the graph.
* If the input data is already in "canonical form" with self cycles removed then the
* `TriangleCount.runPreCanonicalized` should be used instead.
*
* {{{
* val canonicalGraph = graph.mapEdges(e => 1).removeSelfEdges().canonicalizeEdges()
* val counts = TriangleCount.runPreCanonicalized(canonicalGraph).vertices
* }}}
*
*/
object TriangleCount {
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
// Transform the edge data something cheap to shuffle and then canonicalize
val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().convertToCanonicalEdges()
// Get the triangle counts
val counters = runPreCanonicalized(canonicalGraph).vertices
// Join them bath with the original graph
graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
optCounter.getOrElse(0)
}
}
def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
// Construct set representations of the neighborhoods
val nbrSets: VertexRDD[VertexSet] =
graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
val set = new VertexSet(nbrs.length)
var i = 0
while (i < nbrs.length) {
// prevent self cycle
if (nbrs(i) != vid) {
set.add(nbrs(i))
}
i += 1
}
set
}
// join the sets with the graph
val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
(vid, _, optSet) => optSet.getOrElse(null)
}
// Edge function computes intersection of smaller vertex with larger vertex
def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
(ctx.srcAttr, ctx.dstAttr)
} else {
(ctx.dstAttr, ctx.srcAttr)
}
val iter = smallSet.iterator
var counter: Int = 0
while (iter.hasNext) {
val vid = iter.next()
if (vid != ctx.srcId && vid != ctx.dstId && largeSet.contains(vid)) {
counter += 1
}
}
ctx.sendToSrc(counter)
ctx.sendToDst(counter)
}
// compute the intersection along edges
val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
// Merge counters with the graph and divide by two since each triangle is counted twice
graph.outerJoinVertices(counters) { (_, _, optCounter: Option[Int]) =>
val dblCount = optCounter.getOrElse(0)
// This algorithm double counts each triangle so the final count should be even
require(dblCount % 2 == 0, "Triangle count resulted in an invalid number of triangles.")
dblCount / 2
}
}
}
第4章 PageRank 實例
采用的數據是 wiki 數據中含有 Berkeley 標題的網頁之間連接關系,數據為兩個文件:graphx-wiki-vertices.txt 和 graphx-wiki-edges.txt,可以分別用於圖計算的頂點和邊。
Step1、上傳數據
[atguigu@hadoop102 hadoop-2.7.2]$ pwd
/opt/module/hadoop-2.7.2
[atguigu@hadoop102 hadoop-2.7.2]$ bin/hdfs dfs -put /opt/software/graphx-wiki-edges.txt /
[atguigu@hadoop102 hadoop-2.7.2]$ bin/hdfs dfs -put /opt/software/graphx-wiki-vertices.txt /
Step2、RDD 加載數據轉換 Edges
scala> val erdd = sc.textFile("hdfs://hadoop102:9000/graphx-wiki-edges.txt")
erdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/graphx-wiki-edges.txt MapPartitionsRDD[88] at textFile at <console>:26
scala> val edges = erdd.map(x => { val para = x.split("\t"); Edge(para(0).trim.toLong, para(1).trim.toLong,0) })
edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Int]] = MapPartitionsRDD[89] at map at <console>:28
Step3、RDD 加載數據轉換 vertices
scala> val vrdd = sc.textFile("hdfs://hadoop102:9000/graphx-wiki-vertices.txt")
vrdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/graphx-wiki-vertices.txt MapPartitionsRDD[91] at textFile at <console>:26
scala> val vertices = vrdd.map(x => { val para = x.split("\t"); (para(0).trim.toLong, para(1).trim) })
vertices: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[92] at map at <console>:28
Step4、構建 Graph
scala> val graph = Graph(vertices, edges)
graph: org.apache.spark.graphx.Graph[String,Int] = org.apache.spark.graphx.impl.GraphImpl@a0ff17d
Step5、運行配置 RageRank
scala> val prGraph = graph.pageRank(0.001).cache()
prGraph: org.apache.spark.graphx.Graph[Double,Double] = org.apache.spark.graphx.impl.GraphImpl@45e9b508
Step6、輸出 RageRank 結果
scala> val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {(v, title, rank) => (rank.getOrElse(0.0), title)}
titleAndPrGraph: org.apache.spark.graphx.Graph[(Double, String),Int] = org.apache.spark.graphx.impl.GraphImpl@6bb0284d
scala> titleAndPrGraph.vertices.top(10) { Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1) }.foreach(t => println(t._2._2 + ": " + t._2._1))
University of California, Berkeley: 1321.1117543121227
Berkeley, California: 664.8841977233989
Uc berkeley: 162.5013274339786
Berkeley Software Distribution: 90.47860388486127
Lawrence Berkeley National Laboratory: 81.90404939642022
George Berkeley: 81.85226118458043
Busby Berkeley: 47.87199821801991
Berkeley Hills: 44.76406979519929
Xander Berkeley: 30.32407534728813
Berkeley County, South Carolina: 28.908336483710315
示例代碼如下:
package com.atguigu.graphx
import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}
object PageRank extends App {
// 屏蔽日志
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// 設定一個 SparkConf
val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local[4]")
val sc = new SparkContext(conf)
val erdd = sc.textFile("D:\\learn\\JetBrains\\workspace_idea\\spark\\doc\\graphx-wiki-edges.txt")
val edges = erdd.map(x => {
val para = x.split("\t");
Edge(para(0).trim.toLong, para(1).trim.toLong, 0)
})
val vrdd = sc.textFile("D:\\learn\\JetBrains\\workspace_idea\\spark\\doc\\graphx-wiki-vertices.txt")
val vertices = vrdd.map(x => {
val para = x.split("\t");
(para(0).trim.toLong, para(1).trim)
})
val graph = Graph(vertices, edges)
println("**********************************************************")
println("PageRank 計算,獲取最有價值的數據")
println("**********************************************************")
val prGraph = graph.pageRank(0.001).cache()
val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) { (v, title, rank) => (rank.getOrElse(0.0), title) }
titleAndPrGraph.vertices.top(10) {
Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
}.foreach(t => println(t._2._2 + ": " + t._2._1))
sc.stop()
}
輸出結果如下:
**********************************************************
PageRank 計算,獲取最有價值的數據
**********************************************************
University of California, Berkeley: 1321.1117543121227
Berkeley, California: 664.8841977233989
Uc berkeley: 162.5013274339786
Berkeley Software Distribution: 90.47860388486127
Lawrence Berkeley National Laboratory: 81.90404939642022
George Berkeley: 81.85226118458043
Busby Berkeley: 47.87199821801991
Berkeley Hills: 44.76406979519929
Xander Berkeley: 30.32407534728813
Berkeley County, South Carolina: 28.908336483710315