Spark-GraphxAPI學習筆記


  • 圖的集合視圖
graph包含三個基本的類集合視圖:
 val vertices: VertexRDD[VD]
 val edges: EdgeRDD[ED]
 val triplets: RDD[EdgeTriplet[VD, ED]],即可理解為:RDD(srcId,srcAttr,dstId,dstAttr,attr)
在對graph的某個視圖作map/filter操作時,可以使用case表達式來匹配對應的元素,如:graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
  • 圖信息接口
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  • 緩存方法
 def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
 def cache(): Graph[VD, ED]
 def unpersistVertices(blocking: Boolean = true): Graph[VD, ED],當在一個圖上頻繁修改頂點值而不重用邊信息時,可以用此方法對頂點去緩存以提高GC性能
  • 節點與邊的變換操作
  def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])  : Graph[VD, ED2]
 
  • 修改圖結構操作
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexID, VD) => Boolean = ((v, d) => true)) : Graph[VD, ED],按條件生成子圖
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED],生成的結果圖的頂點和邊同時存在於原來的兩個圖中
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED],把重復邊進行reduce操作,注意此操作之前,應當在圖上調用partitionBy方法
  
  • 圖join操作
  def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)])
      (mapFunc: (VertexID, VD, Option[U]) => VD2) : Graph[VD2, ED]
注: joinVertices操作實際上是根據給定的另一個圖(原圖的每個頂點id至多對應此圖的的一個頂點id)把原圖中的頂點的屬性值根據指定的mapFunc函數進行修改,返回一個新圖,新圖的頂點類型不變,如果圖中的某個頂點id在另一個圖中不存在,則保留原值
而outerJoinVertices操作和joinVertices類似,只不過,當圖中某個頂點id在另一個圖中不存在時,則使用None值 
 
  • 在鄰邊上聚合信息
 def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
 def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]
 def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]
 注:聚合信息的核心方法是:aggregateMessages,其操作的本質是sendMsg和mergeMsg,具體而言,是依次在圖的每條邊(edgeTriplets)上根據sendMsg函數的要求,把該邊上A端的節點信息發送給B端,如:把src節點信息發到dst節點信息,或者把dst節點信息發送到src節點上,然后在B端調用merge函數將可能收到的多個msg合並成一個msg.
tripletFields字段指定要操作哪些字段,如果僅操作部分字段的話,通過此參數進行限定可提高性能。aggregateMessages方法最終返回一個新的頂點集:VertexRDD,這個新的頂點集中每個vertex節點上包含上聚合后的信息。
collectNeighborIds與collectNeighbors函數就是對aggregateMessages的簡單封裝以實現聚合相鄰節點id和相鄰節點的功能



  • VertexRDD與RDD有一個明顯的區別是,VertexRDD的key不重復,而RDD的key可以重復
  • aggregateUsingIndex函數的作用類似於reduceByKey,如vertexRdd1.aggregateUsingIndex(rdd2,_+_),作用是利用vertexRdd1的索引結果對rdd2進行聚合,在rdd2中對vertexRdd1中出現的id對應的屬性值做聚合操作,很像reduceByKey,得到的結果是一個VertexRDD,這個結果與vertexRdd1進行join等操作時就會很快,因為他們具有相同的索引結構

 

  • PageRank算法:

  graph.pageRank(tolorence,reset),用於計算類似於網頁排名的各種經典問題,tolorence參數用於指定可容忍的收斂度,畢竟無窮迭代下去是耗時也意義不大的,reset參數用於設定終止點和陷阱問題的概率,防止迭代結果傾斜或終止到一個節點的事情發生,所以這個參數不能傳0,詳情參考: 算法解析

 

  •   連通體算法:

    graph.connectedComponents() 返回一個新圖,新圖的頂點屬性被替換成了該頂點所在的連通體的id,這個id是此連通體中所有節點中id最小的那個節點的id

    例如,我要計算一個圖中連通圖的個數: graph.connectedComponents.vertices.map(e => (e._2, 1L)).reduceByKey(_ + _).sortBy(e => e._2, ascending = false).count

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM