一.分區策略
GraphX采用頂點分割的方式進行分布式圖分區。GraphX不會沿着邊划分圖形,而是沿着頂點划分圖形,這可以減少通信和存儲的開銷。從邏輯上講,這對應於為機器分配邊並允許頂點跨越多台機器。分配邊的方法取決於分區策略PartitionStrategy並且對各種啟發式方法進行了一些折中。用戶可以使用Graph.partitionBy運算符重新划分圖【可以使用不同分區策略】。默認的分區策略是使用圖形構造中提供的邊的初始分區。但是,用戶可以輕松切換到GraphX中包含的2D分區或其他啟發式方法。
一旦對邊進行了划分,高效圖並行計算的關鍵挑戰就是將頂點屬性和邊有效結合。由於現實世界中的圖通常具有比頂點更多的邊,因此我們將頂點屬性移到邊上。由於並非所有分區都包含與所有頂點相鄰的邊,因此我們在內部維護一個路由表,該路由表在實現諸如triplets操作所需要的連接時,標示在哪里廣播頂點aggregateMessages。
二.測試數據
1.users.txt
2.followers.txt
3.數據可視化
三.PageRank網頁排名
1.簡介
使用PageRank測量圖中每個頂點的重要性,假設從邊u到v表示的認可度x。例如,如果一個Twitter用戶被許多其他用戶關注,則該用戶將獲得很高的排名。GraphX帶有PageRank的靜態和動態實現,作為PageRank對象上的方法。靜態PageRant運行固定的迭代次數,而動態PageRank運行直到排名收斂【變化小於指定的閾值】。GraphOps運行直接方法調用這些算法。
2.代碼實現
1 package graphx 2 3 import org.apache.log4j.{Level, Logger} 4 import org.apache.spark.graphx.GraphLoader 5 import org.apache.spark.sql.SparkSession 6 7 /** 8 * Created by Administrator on 2019/11/27. 9 */ 10 object PageRank { 11 Logger.getLogger("org").setLevel(Level.WARN) 12 def main(args: Array[String]) { 13 val spark = SparkSession.builder() 14 .master("local[2]") 15 .appName(s"${this.getClass.getSimpleName}") 16 .getOrCreate() 17 val sc = spark.sparkContext 18 val graph = GraphLoader.edgeListFile(sc, "D:\\software\\spark-2.4.4\\data\\graphx\\followers.txt") 19 // 調用PageRank圖計算算法 20 val ranks = graph.pageRank(0.0001).vertices 21 // join 22 val users = sc.textFile("D:\\software\\spark-2.4.4\\data\\graphx\\users.txt").map(line => { 23 val fields = line.split(",") 24 (fields(0).toLong, fields(1)) 25 }) 26 // join 27 val ranksByUsername = users.join(ranks).map{ 28 case (id, (username, rank)) => (username, rank) 29 } 30 // print 31 ranksByUsername.foreach(println) 32 } 33 }
3.執行結果
四.ConnectedComponents連通體算法
1.簡介
連通體算法實現把圖划分為多個子圖【不進行節點切分】,清除孤島子圖【只要一個節點的子圖】。其使用子圖中編號最小的頂點ID標記子圖。
2.代碼實現
1 package graphx 2 3 import org.apache.log4j.{Level, Logger} 4 import org.apache.spark.graphx.GraphLoader 5 import org.apache.spark.sql.SparkSession 6 7 /** 8 * Created by Administrator on 2019/11/27. 9 */ 10 object ConnectedComponents { 11 Logger.getLogger("org").setLevel(Level.WARN) 12 def main(args: Array[String]) { 13 val spark = SparkSession.builder() 14 .master("local[2]") 15 .appName(s"${this.getClass.getSimpleName}") 16 .getOrCreate() 17 val sc = spark.sparkContext 18 val graph = GraphLoader.edgeListFile(sc, "D:\\software\\spark-2.4.4\\data\\graphx\\followers.txt") 19 // 調用connectedComponents連通體算法 20 val cc = graph.connectedComponents().vertices 21 // join 22 val users = sc.textFile("D:\\software\\spark-2.4.4\\data\\graphx\\users.txt").map(line => { 23 val fields = line.split(",") 24 (fields(0).toLong, fields(1)) 25 }) 26 // join 27 val ranksByUsername = users.join(cc).map { 28 case (id, (username, rank)) => (username, rank) 29 } 30 val count = ranksByUsername.count().toInt 31 // print 32 ranksByUsername.map(_.swap).takeOrdered(count).foreach(println) 33 } 34 }
3.執行結果
五.TriangleCount三角計數算法
1.簡介
當頂點有兩個相鄰的頂點且它們之間存在邊時,該頂點是三角形的一部分。GraphX在TriangleCount對象中實現三角計數算法,該算法通過確定經過每個頂點的三角形的數量,從而提供聚類的度量。注意,TriangleCount要求邊定義必須為規范方向【srcId < dstId】,並且必須使用Graph.partitionBy對圖進行分區。
2.代碼實現
1 package graphx 2 3 import org.apache.log4j.{Level, Logger} 4 import org.apache.spark.graphx.{PartitionStrategy, GraphLoader} 5 import org.apache.spark.sql.SparkSession 6 7 /** 8 * Created by Administrator on 2019/11/27. 9 */ 10 object TriangleCount { 11 Logger.getLogger("org").setLevel(Level.WARN) 12 def main(args: Array[String]) { 13 val spark = SparkSession.builder() 14 .master("local[2]") 15 .appName(s"${this.getClass.getSimpleName}") 16 .getOrCreate() 17 val sc = spark.sparkContext 18 val graph = GraphLoader.edgeListFile(sc, "D:\\software\\spark-2.4.4\\data\\graphx\\followers.txt", true) 19 .partitionBy(PartitionStrategy.RandomVertexCut) 20 21 // 調用triangleCount三角計數算法 22 val triCounts = graph.triangleCount().vertices 23 // map 24 val users = sc.textFile("D:\\software\\spark-2.4.4\\data\\graphx\\users.txt").map(line => { 25 val fields = line.split(",") 26 (fields(0).toLong, fields(1)) 27 }) 28 // join 29 val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) => 30 (username, tc) 31 } 32 val count = triCountByUsername.count().toInt 33 // print 34 triCountByUsername.map(_.swap).takeOrdered(count).foreach(println) 35 } 36 }
3.執行結果