由於本人文字表達能力不足,還是多多以代碼形式表述,首先展示測試代碼,然后解釋:
package com.txq.spark.test
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, SparkException, graphx}
import scala.reflect.ClassTag
/**
* spark GraphX 測試
* @authorTongXueQiang
*/
object test {
System.setProperty("hadoop.home.dir","D://hadoop-2.6.2");
val conf = new SparkConf().setMaster("local").setAppName("testRDDMethod");
val sc = new SparkContext(conf);
def main(args: Array[String]): Unit = {
/*
val rdd = sc.textFile("hdfs://spark:9000/user/spark/data/SogouQ.sample");//搜狗搜索日志解析
val rdd1 = rdd.map(_.split("\t")).map(line=>line(3)).map(_.split(" "));
println("共有"+rdd1.count+"行");
val rdd2 = rdd1.filter(_(0).toInt == 1).filter(_(1).toInt == 1);
println("搜索結果和點擊率均排第一的共有"+rdd2.count+"行");
val users:RDD[(VertexId,(String,String))] = sc.parallelize(Array((3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","prof")),(2L,("istoica","prof"))));
val relationships:RDD[Edge[String]] = sc.parallelize(Array(Edge(3L,7L,"collab"),Edge(5L,3L,"advisor"),Edge(2L,5L,"colleague"),Edge(5L,7L,"pi")));
val defaultUser = ("jone","Missing");
val graph = Graph(users,relationships,defaultUser);
val result = graph.vertices.filter{case(id,(name,pos)) => pos == "prof"}.count();
println("職位名稱為prof的個數有:" + result + "個");
println(graph.edges.filter(e => e.srcId > e.dstId).count());
graph.triplets.collect().foreach(println)
graph.edges.collect().foreach(println)*/
/*
val graph:Graph[Double,Int] = GraphGenerators.logNormalGraph(sc,numVertices = 100).mapVertices((id,_) => id.toDouble)
println(graph);
println(graph.vertices)*/
/*
val oderFollowers:VertexRDD[(Int,Double)] = graph.mapReduceTriplets[(Int,Double)](
triplet =>{
if(triplet.srcAttr > triplet.dstAttr){
Iterator((triplet.dstId,(1,triplet.srcAttr)));
} else {
Iterator.empty
}
},
(a,b) =>(a._1 + b._1,a._2 + b._2)
)
val avgAgeOfolderFollower:VertexRDD[Double] = oderFollowers.mapValues((id,value) => {
value match{
case (count,totalAge) => totalAge / count
}
})
avgAgeOfolderFollower.collect().foreach(println)*/
//收集鄰居節點,后面有自定義方法
//collectNeighborIds(EdgeDirection.In,graph).foreach(line => {print(line._1+":"); for (elem <- line._2) {print(elem + " ")};println;});
//以Google的網頁鏈接文件(后面由下載地址)為例,演示pregel方法,找出從v0網站出發,得到經過的步數最少的鏈接網站,類似於附近地圖最短路徑算法
val graph:Graph[Double,Double] = GraphLoader.edgeListFile(sc,"hdfs://spark/user/spark/data/web-Google.txt",numEdgePartitions = 4).mapVertices((id,_) => id.toDouble).mapEdges(edge => edge.attr.toDouble);
val sourceId:VertexId = 0;//定義源網頁Id
val g:Graph[Double,Double] = graph.mapVertices((id,attr) => if(id == 0) 0.0 else Double.PositiveInfinity)
//pregel底層調用GraphOps的mapReduceTriplets方法,一會兒解釋源代碼
val result = pregel[Double,Double,Double](g,Double.PositiveInfinity)(
(id,vd,newVd) => math.min(vd,newVd),//這個方法的作用是更新節點VertexId的屬性值為新值,以利於innerJoin操作
triplets => {//map函數
if(triplets.srcAttr + triplets.attr < triplets.dstAttr){
Iterator((triplets.dstId,triplets.srcAttr + triplets.attr))
} else {
Iterator.empty
}
},
(a,b) => math.min(a,b)//reduce函數
)
//輸出結果,注意pregel返回的是更新VertexId屬性的graph,而不是VertexRDD[(VertexId,VD)]
print("最短節點:"+result.vertices.filter(_._1 != 0).reduce(min));//注意過濾掉源節點
}
//找出路徑最短的點
def min(a:(VertexId,Double),b:(VertexId,Double)):(VertexId,Double) = {
if(a._2 < b._2) a else b
}
/**
* 自定義收集VertexId的neighborIds
* @author TongXueQiang
*/
def collectNeighborIds[T,U](edgeDirection:EdgeDirection,graph:Graph[T,U])(implicit m:scala.reflect.ClassTag[T],n:scala.reflect.ClassTag[U]):VertexRDD[Array[VertexId]] = {
val nbrs = graph.mapReduceTriplets[Array[VertexId]](
//map函數
edgeTriplets => {
val msgTosrc = (edgeTriplets.srcId,Array(edgeTriplets.dstId));
val msgTodst = (edgeTriplets.dstId,Array(edgeTriplets.srcId));
edgeDirection match {
case EdgeDirection.Either =>Iterator(msgTosrc,msgTodst)
case EdgeDirection.Out => Iterator(msgTosrc)
case EdgeDirection.In => Iterator(msgTodst)
case EdgeDirection.Both => throw new SparkException("It doesn't make sense to collect neighbors without a " + "direction.(EdgeDirection.Both is not supported.use EdgeDirection.Either instead.)")
}
},_ ++ _)//reduce函數
nbrs
}
/**
* 自定義pregel函數
* @param graph 圖
* @param initialMsg 返回的vertexId屬性
* @param maxInterations 迭代次數
* @param activeDirection 邊的方向
* @param vprog 更新節點屬性的函數,以利於innerJoin操作
* @param sendMsg map函數,返回Iterator[A],一般A為Tuple2,其中id為接受消息方
* @param mergeMsg reduce函數,一般為合並,或者取最小、最大值……操作
* @tparam A 想要得到的VertexId屬性
* @tparam VD graph中vertices的屬性
* @tparam ED graph中的edge屬性
* @return 返回更新后的graph
*/
def pregel[A:ClassTag,VD:ClassTag,ED:ClassTag](graph:Graph[VD,ED],initialMsg:A,maxInterations:Int = Int.MaxValue,activeDirection:EdgeDirection = EdgeDirection.Either)(
vprog:(VertexId,VD,A) => VD,
sendMsg:EdgeTriplet[VD,ED] =>Iterator[(VertexId,A)],
mergeMsg:(A,A) => A)
: Graph[VD,ED] = {
Pregel0(graph,initialMsg,maxInterations,activeDirection)(vprog,sendMsg,mergeMsg)//調用apply方法
}
//此為節點內連接函數,返回VertexRDD
def innerJoin[U:ClassTag,VD:ClassTag](table:RDD[(VertexId,U)])(mapFunc:(VertexId,VD,U) => VertexRDD[(VertexId,VD)]) = {
val uf = (id: VertexId, data: VD, o: Option[U]) => {
o match {
case Some(u) => mapFunc(id, data, u)
case None => data
}
}
}
//測試Option[T] def test():Unit = {
val map = Map("a" -> "1","b" -> "2","c" -> "3");
def show(value:Option[String]):String = {
value match{
case Some(x) => x
case None => "no value found!"
}
}
println(show(map.get("a")) == "1");
}
}
下面重點研究Pregel,為了方便,自己重新定義了一個Pregel0
package com.txq.spark.test
import org.apache.spark.Logging
import org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, Graph, VertexId}
import scala.reflect.ClassTag
/**
* 自定義Pregel object,處理思路:
*/
object Pregel0 extends Logging {
def apply[VD:ClassTag,ED:ClassTag,A:ClassTag]
(graph:Graph[VD,ED],
initialMsg:A,
maxIterations:Int = Int.MaxValue,
activeDirection:EdgeDirection = EdgeDirection.Either)
(vprog:(VertexId,VD,A) => VD,
sendMsg:EdgeTriplet[VD,ED] => Iterator[(VertexId,A)],
mergeMsg:(A,A) => A)
: Graph[VD,ED] =
{
//①對vertices進行更新操作
var g = graph.mapVertices((vid,vdata) => vprog(vid,vdata,initialMsg)).cache();
//②compute the messages,注意調用的是mapReduceTriplets方法,源代碼:
def mapReduceTriplets[A](
map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduce: (A, A) => A),
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None )
: VertexRDD[A]
var messages = g.mapReduceTriplets(sendMsg,mergeMsg);
print("messages:"+messages.take(10).mkString("\n"))
var activeMessages = messages.count();
//LOAD
var prevG:Graph[VD,ED] = null
var i = 0;
while(activeMessages > 0 && i < maxIterations){
//③Receive the messages.Vertices that didn't get any message do not appear in newVerts.
//內聯操作,返回的結果是VertexRDD,可以參看后面的調試信息
val newVerts = g.vertices.innerJoin(messages)(vprog).cache();
print("newVerts:"+newVerts.take(10).mkString("\n"))
//④update the graph with the new vertices.
prevG = g;//先把舊的graph備份,以利於后面的graph更新和unpersist掉舊的graph
//④外聯操作,返回整個更新的graph
g = g.outerJoinVertices(newVerts){(vid,old,newOpt) => newOpt.getOrElse(old)}//getOrElse方法,意味,如果newOpt存在,返回newOpt,不存在返回old
print(g.vertices.take(10).mkString("\n"))
g.cache();//新的graph cache起來,下一次迭代使用
val oldMessages = messages;//備份,同prevG = g操作一樣
//Send new messages.Vertices that didn't get any message do not appear in newVerts.so
//don't send messages.We must cache messages.so it can be materialized on the next line.
//allowing us to uncache the previous iteration.
//⑤下一次迭代要發送的新的messages,先cache起來
messages = g.mapReduceTriplets(sendMsg,mergeMsg,Some((newVerts,activeDirection))).cache()
print("下一次迭代要發送的messages:"+messages.take(10).mkString("\n"))
activeMessages = messages.count();//⑥
print("下一次迭代要發送的messages的個數:"+ activeMessages)//如果activeMessages==0,迭代結束
logInfo("Pregel finished iteration" + i);
//原來,舊的message和graph不可用了,unpersist掉
oldMessages.unpersist(blocking= false);
newVerts.unpersist(blocking=false)//unpersist之后,就不可用了
prevG.unpersistVertices(blocking=false)
prevG.edges.unpersist(blocking=false)
i += 1;
}
g//返回最后的graph
}
}
輸出的調試信息:(距離v0節點最近的節點)
第一次跌代:
messages:(11342,1.0)
(824020,1.0)
(867923,1.0)
(891835,1.0)
newVerts:(11342,1.0)
(824020,1.0)
(867923,1.0)
(891835,1.0)
下一次迭代要發送的messages:(302284,2.0)
(760842,2.0)
(417728,2.0)
(322178,2.0)
(387543,2.0)
(846213,2.0)
(857527,2.0)
(856657,2.0)
(695578,2.0)
(27469,2.0)
下一次迭代要發送的messages的個數:29
下一次迭代要發送的messages:(754862,3.0)
(672426,3.0)
(320258,3.0)
(143557,3.0)
(789355,3.0)
(596104,3.0)
(118398,3.0)
(30115,3.0)
下一次迭代要發送的messages的個數:141
依次不斷類推,直到activeMessages = 0跌代結束。
上面需要cache的有:graph,messages,newVertis.spark中的創建RDD和transformation操作都是lazy的,存儲的只是內存地址,並非真正創建對象,當進行action時,需要從頭至尾運行一遍,所以cache之后,重復利用RDD,再次進行action時,速度會大大提升。unpersist之后,就不能用了,所以需要把舊的備份。
一般情況使用mapReduceTriplets可以解決很多問題,為什么Spark GraphX會提供Pregel API?主要是為了更方便地去做迭代操作。因為在GraphX里面,Graph這張圖並沒有自動cache,而是手動cache。但是為了每次迭代更快,需要手動去做cache,每次迭代完就需要把沒用的刪除掉而把有用的保留,這比較難以控制。因為Graph中的點和邊是分開進行Cache的,而Pregel能夠幫助我們。例如,PangeRank就非常適合用Pregel來做。
web-Google.txt.gz文件下載地址:http://snap.stanford.edu/data/web-Google.html
佟氏出品,必屬精品!專注spark GraphX、數據挖掘、機器學習的源代碼和算法,扎扎實實,寫好每一行代碼!
