歡迎轉載,轉載請注明出處,徽滬一郎。
概要
圖的並行化處理一直是一個非常熱門的話題,這里頭的重點有兩個,一是如何將圖的算法並行化,二是找到一個合適的並行化處理框架。Spark作為一個非常優秀的並行處理框架,將一些並行化的算法移到其上面就成了一個很自然的事情。
Graphx是一些圖的常用算法在Spark上的並行化實現,同時提供了豐富的API接口。本文就Graphx的代碼架構及pagerank在graphx中的具體實現做一個初步的學習。
Google為什么贏得了搜索引擎大戰
當Google還在起步的時候,在搜索引擎領域,Yahoo正如日中天,紅的發紫。顯然,在Google面前的是一堵讓人幾乎沒有任何希望的牆。
但世事難料,現在“外事問谷歌”成了不爭的事實,Yahoo應也陪客了。
這種轉換到底是如何形成的了,有一個因素是這樣的,那就是Google發明了顯著提高搜索准確率的PageRank算法。如果說PageRank算法的提出讓谷歌牢牢站穩了搜索引擎大戰的腳跟,這是毫不誇張的。
搜索引擎有幾個要考慮的關鍵因素(個人觀點而已)。
- 要想吸引用戶,就必須要有出色的搜索准確率
- 有了用戶,才能做廣告投放,提高廣告投放的針對性就可以盈利
上述兩個方面都有非常優秀的算法。
廢話少述,回到正題。PageRank算法是圖論的一個具體應用,ok, 轉到圖論。
圖論簡介
圖的組成
離散數學中非常重要的一個部分就是圖論,下面是一個無向連通圖
頂點(vertex)
上圖中的A,B,C,D,E稱為圖的頂點。
邊
頂點與頂點之間的連線稱之為邊。
圖的數學表示
讀大學的時候,一直沒有想明白為什么要學勞什子的線性代數。直到這兩天看《數學之美》一書時,才發覺,線性代數在一些計算機應用領域,那簡直就是不可或缺啊。
我們比較容易理解的平面幾何和立體幾何(一個是二維,一個是三維),而線性代數解決的其實是一個高維問題,由於無法直覺的感受到,所以很難。如果想比較通俗的理解一下數學為什么有這么多的分支及其內在關聯,強烈推薦讀一下《數學橋 對高等數學的一次觀賞之旅》。
在數學中,用什么來表示圖呢,答案就是線性代數里面的矩陣,想想看,圖的關聯矩陣,圖的鄰接矩陣。總之就是矩陣啦,線性代數一下子有用了。下面是一個具體的例子。
圖的並行化處理
剛才說到圖可以用矩陣來表示,圖的並行化問題在某種程度上就被轉化為矩陣運算的並行化問題。
那么以矩陣的乘法為例,看看其是否可以並行化處理。
以矩陣 A X B 為例,說明並行化處理過程。
將上述的矩陣A和B划分為四個部分,如下圖所示
首次對齊之后
子矩陣相乘
相乘之后,A的子矩陣左移,B的子矩陣上移
計算結果合並
圖的並行化處理框架,從Pregel說起
上一節的重點有兩點
- 圖用矩陣來表示,對圖的運算就是矩陣的運算
- 矩陣乘法運算可以並行化,動態演示其並行化的原理
你說ok,我明白了。哪有沒有一種合適的並行化處理框架可以用來進行圖的計算呢,那你肯定想到了MapReduce。
MapReduce盡管也是一個不錯的並行化處理框架,但在圖計算方面,有許多缺點,主要是計算的中間過程需要存儲到硬盤,效率很低。
Google針對圖的並行處理,專門提出了一個了不起的框架Pregel。其執行時的動態視圖如下所示。
Pregel有如下優點
- 級聯可擴性好 scalability
- 容錯性強
- 能夠很好的表示各種圖的常用算法
Pregel的計算模型
計算模型如下圖所示,重要的有三個
- 作用於每個頂點的處理邏輯 vertexProgram
- 消息發送,用於相鄰節點間的通訊 sendMessage
- 消息合並邏輯 messageCombining
Pregel在Spark中的實現
非常感謝你能堅持看到現在,這篇博客內容很多,有點難。我想還是上一幅圖將其內在邏輯整一下再繼續說下去。
該圖要表示的意思是這樣的,Graphx利用了Spark這樣了一個並行處理框架來實現了圖上的一些可並行化執行的算法。
本篇博客要表達的意思就是上面加紅的這句話,請諸位看官仔細理解。
- 算法是否能夠並行化與Spark本身無關
- 算法並行化與否的本身,需要通過數學來證明
- 已經證明的可並行化算法,利用Spark來實現會是一個錯的選擇,因為Graphx支持pregel的圖計算模型
Graphx中的重要概念
Graph
毫無疑問,圖本身是graphx中一個非常重要的概念。
成員變量
graph中重要的成員變量分別為
- vertices
- edges
- triplets
為什么要引入triplets呢,主要是和Pregel這個計算模型相關,在triplets中,同時記錄着edge和vertex. 具體代碼就不羅列了。
成員函數
函數分成幾大類
- 對所有頂點或邊的操作,但不改變圖結構本身,如mapEdges, mapVertices
- 子圖,類似於集合操作中的filter subGraph
- 圖的分割,即paritition操作,這個對於Spark計算來說,很關鍵,正是因為有了不同的Partition,才有了並行處理的可能, 不同的PartitionStrategy,其收益不同。最容易想到的就是利用Hash來將整個圖分成多個區域。
- outerJoinVertices 頂點的外連接操作
圖的運算和操作 GraphOps
圖的常用算法是集中抽象到GraphOps這個類中,在Graph里作了隱式轉換,將Graph轉換為GraphOps
implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag]
(g: Graph[VD, ED]): GraphOps[VD, ED] = g.ops
支持的操作如下
- collectNeighborIds
- collectNeighbors
- collectEdges
- joinVertices
- filter
- pickRandomVertex
- pregel
- pageRank
- staticPageRank
- connectedComponents
- triangleCount
- stronglyConnectedComponents
RDD
RDD是Spark體系的核心,那么Graphx中引入了哪些新的RDD呢,有倆,分別為
- VertexRDD
- EdgeRDD
較之EdgeRdd,VertexRDD更為重要,其上的操作也很多,主要集中於Vertex之上屬性的合並,說到合並就不得不扯到關系代數和集合論,所以在VertexRdd中能看到許多類似於sql中的術語,如
- leftJoin
- innerJoin
至於leftJoin, innerJoin, outerJoin的區別,建議谷歌一下,不再贅述。
Graphx場景分析
圖的存儲和加載
在進行數學計算的時候,圖用線性代數中的矩陣來表示,那么如何進行存儲呢?
學數據結構的時候,老師肯定說過好多的辦法,不再啰嗦了。
不過在大數據的環境下,如果圖很巨大,表示頂點和邊的數據不足以放在一個文件中怎么辦? 用HDFS
加載的時候,一台機器的內存不足以容下怎么辦? 延遲加載,在真正需要數據時,將數據分發到不同機器中,采用級聯方式。
一般來說,我們會將所有與頂點相關的內容保存在一個文件中vertexFile,所有與邊相關的信息保存在另一個文件中edgeFile。
生成某一個具體的圖時,用edge就可以表示圖中頂點的關聯關系,同時圖的結構也表示出來了。
GraphLoader
graphLoader是graphx中專門用於圖的加載和生成,最重要的函數就是edgeListFile,定義如下。
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
minEdgePartitions: Int = 1,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
: Graph[Int, Int] =
{
val startTime = System.currentTimeMillis
// Parse the edge data table directly into edge partitions
val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[Int, Int]
iter.foreach { line =>
if (!line.isEmpty && line(0) != '#') {
val lineArray = line.split("\\s+")
if (lineArray.length < 2) {
logWarning("Invalid line: " + line)
}
val srcId = lineArray(0).toLong
val dstId = lineArray(1).toLong
if (canonicalOrientation && srcId > dstId) {
builder.add(dstId, srcId, 1)
} else {
builder.add(srcId, dstId, 1)
}
}
}
Iterator((pid, builder.toEdgePartition))
}.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
edges.count()
logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
vertexStorageLevel = vertexStorageLevel)
} // end of edgeListFile
應用舉例之PageRank
什么是PageRank
pageRank的核心思想
”在互聯網上,如果一個網頁被很多其它網頁所鏈接,說明它受到普遍的承認和依賴,那么它的排名就很高。“ (摘自數學之美第10章)
你說這也太簡單了吧,不是跟沒說一個樣嗎,怎么用數學來表示呢?
呵呵,起初我也這么想的,后來多看了幾遍之后,明白了一點點。分析步驟用文字表述如下,
- 網頁和網頁之間的關系用圖來表示
- 網頁A和網頁B之間的連接關系表示任意一個用戶從網頁A到轉到網頁B的可能性(概率)
- 所有網頁的排名用一維向量來B來表示
所有網頁之間的連接用矩陣A來表示,所有網頁排名用B來表示。
pageRank如何進行並行化
好了,上面的數學闡述說明了“網頁排名的計算可以最終抽象為矩陣相乘”,而在開始的時候已經證明過矩陣相乘可以並行化處理。
理論研究結束了,接下來的就是工程實現了,借用Pregel模型,PageRank中定義的各主要函數分別如下。
vertexProgram
def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
val (oldPR, lastDelta) = attr
val newPR = oldPR + (1.0 - resetProb) * msgSum
(newPR, newPR - oldPR)
}
sendMessage
def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
if (edge.srcAttr._2 > tol) {
Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
} else {
Iterator.empty
}
}
messageCombiner
def messageCombiner(a: Double, b: Double): Double = a + b
一點點啟示
通過pageRank這個例子,我們能夠搞清楚如何將平素學習的數學理論用以解決實際問題。
“學習的東西總是有價值的,至於用的上用不上,全靠造化了”
完整代碼
// Connect to the Spark cluster
val sc = new SparkContext("spark://master.amplab.org", "research")
// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("graphx/data/users.txt")
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
case (uid, deg, Some(attrList)) => attrList
// Some users may not have attributes so we set them as empty
case (uid, deg, None) => Array.empty[String]
}
// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)
// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
case (uid, attrList, Some(pr)) => (pr, attrList.toList)
case (uid, attrList, None) => (0.0, attrList.toList)
}
println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
小結
本篇講來講去就在強調一個問題,Spark是一個分布式並行計算框架。能不能用Spark,其實大體取決於問題的數學模型本身,如果可以並行化處理,則用之,切不可削足適履。
另一個用張圖來總結一下提到的數學知識吧。
再一次強烈推薦《數學橋》
參考資料
- 《數學之美》
- 《數學橋 高等數學的觀賞之旅》
- 《大數據》