Spark GraphX圖形數據分析


Spark GraphX圖形數據分析

圖(Graph)的基本概念

  • 圖是由頂點集合(vertex)及頂點間的關系集合(邊edge)組成的一種網狀數據結構
  • 圖數據很好的表達了數據之間的關系
  • 處理的是有向圖

圖的術語-4

  • 出度:指從當前頂點指向其他頂點的邊的數量
  • 入度:其他頂點指向當前頂點的邊的數量

圖的經典表示法(了解)

  • 鄰接矩陣
1、對於每條邊,矩陣中相應單元格值為1
2、對於每個循環,矩陣中相應單元格值為2,方便在行或列上求得頂點度數

 

Spark GraphX 簡介

  • GraphX特點
1)基於內存實現了數據的復用與快速讀取
2)通過彈性分布式屬性圖(Property Graph)統一了圖視圖與表視圖
3)與Spark Streaming、Spark SQL和Spark MLlib等無縫銜接

GraphX核心抽象

  • 彈性分布式屬性圖
  • 頂點和邊都是帶屬性的有向多重圖

頂點要帶邊,兩個邊構成編號

  • 一份物理存儲,兩種視圖(table view,Graph view)

GraphX API

Graph[VD,ED]

VD:頂點的數據類型(二元組)
var rdd=sc.makeRDD(List((1L,"A"),(2L,"B")))
ED:邊的數據類型
方法一:spark API
var spark=SparkSession.builder().master("local[2]")
    .appName("hello").getOrCreate();
  val sc=spark.sparkContext
  val verticesRDD=sc.makeRDD(List((1L,1),(2L,2),(3L,3)))
  val edgesRDD=sc.makeRDD(List(Edge(1L,2L,1),Edge(2L,3L,2)))
  val graph=Graph(verticesRDD,edgesRDD)
  graph.vertices.foreach(println(_))
  graph.edges.foreach(println(_))
方法二:spark上運行
import org.apache.spark.graphx.{Edge,Graph}
var spark=SparkSession.builder().master("local[2]")
    .appName("hello").getOrCreate();
  val sc=spark.sparkContext
  val verticesRDD=sc.makeRDD(List((1L,1),(2L,2),(3L,3)))
  val edgesRDD=sc.makeRDD(List(Edge(1L,2L,1),Edge(2L,3L,2)))
  val graph=Graph(verticesRDD,edgesRDD)
  graph.vertices.collect
  graph.edges.collect
  graph.triplets.collect

圖的算子-1

屬性算子

  • 類似於RDD的map操作

對頂點進行遍歷,傳給你的頂點類型,生成新的頂點

def mapVertices[VD2](map: (VertexId, VD) => VD2)
def mapEdges[ED2](map: Edge[ED] => ED2)

結構算子

  • reverse、subgraph
scala> graph1.reverse.triplets.collect
scala> graph1.subgraph(vpred=(id,attr)=>attr._2<30).triplets.collect

圖的算子-3

join算子:從外部的RDDs加載數據,修改頂點屬性

class Graph[VD, ED] {

def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]

def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)

: Graph[VD2, ED]

}

PageRank in GraphX

  • PageRank (PR)算法

     

  •  

  • 激活(active)和未激活(inactive)

    • 相當於紅色為未激活,綠色為激活

    !!!傳過來的值和當前的值做對比,小(接受),大(本身去除)

    1)首先我們默認將2里面(-1)作為舊值

    2)我們將2值傳送給7(-1)因為2<7:得出2(7)

    3)我們將7值傳送給3(-1)因為7>3:得出3(3)

    initialMsg:在“superstep 0”之前發送至頂點的初始消息
    maxIterations:將要執行的最大迭代次數
    activeDirection:發送消息方向(默認是出邊方向:EdgeDirection.Out)
    vprog:用戶定義函數,用於頂點接收消息
    sendMsg:用戶定義的函數,用於確定下一個迭代發送的消息及發往何處
    mergeMsg:用戶定義的函數,在vprog前,合並到達頂點的多個消息
    

     --------------------------------------------------------------------------------------------------------------

 def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexID, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]

 -----------------------------------------------------------------------------------------------------------------------

實例一:
入度:多個b對象指向a
case class User(name:String,ilike:Int,follow:Int)
import org.apache.spark.graphx._
//原始圖
var points=sc.makeRDD(Array((1L,"zs"),(2L,"ls")))
var edges = sc.makeRDD(Array(Edge(2L,1L,1)))
var graph=Graph(points,edges)
//改變點的信息的結構
var newGraph=graph.mapVertices(
(id,name)=>User(name,0,0))
newGraph.inDegrees.collect//這是一張表
//將新節點表和入度表聯合outerjoin
val nnGraph=newGraph.outerJoinVertices(newGraph.inDegrees)((id,lf,rf)=>User(lf.name,lf.ilike,rf.getOrElse(0)))
scala> nnGraph.pageRank(0.01).triplets.collect

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM