Spark GraphX圖算法應用【分區策略、PageRank、ConnectedComponents,TriangleCount】


一.分區策略

  

  GraphX采用頂點分割的方式進行分布式圖分區。GraphX不會沿着邊划分圖形,而是沿着頂點划分圖形,這可以減少通信和存儲的開銷。從邏輯上講,這對應於為機器分配邊並允許頂點跨越多台機器。分配邊的方法取決於分區策略PartitionStrategy並且對各種啟發式方法進行了一些折中。用戶可以使用Graph.partitionBy運算符重新划分圖【可以使用不同分區策略】。默認的分區策略是使用圖形構造中提供的邊的初始分區。但是,用戶可以輕松切換到GraphX中包含的2D分區或其他啟發式方法。

  

  一旦對邊進行了划分,高效圖並行計算的關鍵挑戰就是將頂點屬性和邊有效結合。由於現實世界中的圖通常具有比頂點更多的邊,因此我們將頂點屬性移到邊上。由於並非所有分區都包含與所有頂點相鄰的邊,因此我們在內部維護一個路由表,該路由表在實現諸如triplets操作所需要的連接時,標示在哪里廣播頂點aggregateMessages。

二.測試數據

  1.users.txt

    

  2.followers.txt

    

  3.數據可視化

    

三.PageRank網頁排名

  1.簡介

    使用PageRank測量圖中每個頂點的重要性,假設從邊u到v表示的認可度x。例如,如果一個Twitter用戶被許多其他用戶關注,則該用戶將獲得很高的排名。GraphX帶有PageRank的靜態和動態實現,作為PageRank對象上的方法。靜態PageRant運行固定的迭代次數,而動態PageRank運行直到排名收斂【變化小於指定的閾值】。GraphOps運行直接方法調用這些算法。

  2.代碼實現

 1 package graphx
 2 
 3 import org.apache.log4j.{Level, Logger}
 4 import org.apache.spark.graphx.GraphLoader
 5 import org.apache.spark.sql.SparkSession
 6 
 7 /**
 8   * Created by Administrator on 2019/11/27.
 9   */
10 object PageRank {
11   Logger.getLogger("org").setLevel(Level.WARN)
12   def main(args: Array[String]) {
13     val spark = SparkSession.builder()
14         .master("local[2]")
15         .appName(s"${this.getClass.getSimpleName}")
16         .getOrCreate()
17       val sc = spark.sparkContext
18     val graph = GraphLoader.edgeListFile(sc, "D:\\software\\spark-2.4.4\\data\\graphx\\followers.txt")
19     // 調用PageRank圖計算算法
20     val ranks = graph.pageRank(0.0001).vertices
21     // join
22     val users = sc.textFile("D:\\software\\spark-2.4.4\\data\\graphx\\users.txt").map(line => {
23       val fields = line.split(",")
24       (fields(0).toLong, fields(1))
25     })
26     // join
27     val ranksByUsername = users.join(ranks).map{
28       case (id, (username, rank)) => (username, rank)
29     }
30     // print
31     ranksByUsername.foreach(println)
32   }
33 }

  3.執行結果

    

四.ConnectedComponents連通體算法

  1.簡介

    連通體算法實現把圖划分為多個子圖【不進行節點切分】,清除孤島子圖【只要一個節點的子圖】。其使用子圖中編號最小的頂點ID標記子圖。

  2.代碼實現

 1 package graphx
 2 
 3 import org.apache.log4j.{Level, Logger}
 4 import org.apache.spark.graphx.GraphLoader
 5 import org.apache.spark.sql.SparkSession
 6 
 7 /**
 8   * Created by Administrator on 2019/11/27.
 9   */
10 object ConnectedComponents {
11   Logger.getLogger("org").setLevel(Level.WARN)
12   def main(args: Array[String]) {
13     val spark = SparkSession.builder()
14       .master("local[2]")
15       .appName(s"${this.getClass.getSimpleName}")
16       .getOrCreate()
17     val sc = spark.sparkContext
18     val graph = GraphLoader.edgeListFile(sc, "D:\\software\\spark-2.4.4\\data\\graphx\\followers.txt")
19     // 調用connectedComponents連通體算法
20     val cc = graph.connectedComponents().vertices
21     // join
22     val users = sc.textFile("D:\\software\\spark-2.4.4\\data\\graphx\\users.txt").map(line => {
23       val fields = line.split(",")
24       (fields(0).toLong, fields(1))
25     })
26     // join
27     val ranksByUsername = users.join(cc).map {
28       case (id, (username, rank)) => (username, rank)
29     }
30     val count = ranksByUsername.count().toInt
31     // print
32     ranksByUsername.map(_.swap).takeOrdered(count).foreach(println)
33   }
34 }

  3.執行結果

    

五.TriangleCount三角計數算法

  1.簡介  

    當頂點有兩個相鄰的頂點且它們之間存在邊時,該頂點是三角形的一部分。GraphX在TriangleCount對象中實現三角計數算法,該算法通過確定經過每個頂點的三角形的數量,從而提供聚類的度量。注意,TriangleCount要求邊定義必須為規范方向【srcId < dstId】,並且必須使用Graph.partitionBy對圖進行分區。

  2.代碼實現

 1 package graphx
 2 
 3 import org.apache.log4j.{Level, Logger}
 4 import org.apache.spark.graphx.{PartitionStrategy, GraphLoader}
 5 import org.apache.spark.sql.SparkSession
 6 
 7 /**
 8   * Created by Administrator on 2019/11/27.
 9   */
10 object TriangleCount {
11   Logger.getLogger("org").setLevel(Level.WARN)
12   def main(args: Array[String]) {
13     val spark = SparkSession.builder()
14       .master("local[2]")
15       .appName(s"${this.getClass.getSimpleName}")
16       .getOrCreate()
17     val sc = spark.sparkContext
18     val graph = GraphLoader.edgeListFile(sc, "D:\\software\\spark-2.4.4\\data\\graphx\\followers.txt", true)
19       .partitionBy(PartitionStrategy.RandomVertexCut)
20 
21     // 調用triangleCount三角計數算法
22     val triCounts = graph.triangleCount().vertices
23     // map
24     val users = sc.textFile("D:\\software\\spark-2.4.4\\data\\graphx\\users.txt").map(line => {
25       val fields = line.split(",")
26       (fields(0).toLong, fields(1))
27     })
28     // join
29     val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
30       (username, tc)
31     }
32     val count = triCountByUsername.count().toInt
33     // print
34     triCountByUsername.map(_.swap).takeOrdered(count).foreach(println)
35   }
36 }

  3.執行結果

    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM