關系計算問題描述
二度關系是指用戶與用戶通過關注者為橋梁發現到的關注者之間的關系。目前微博通過二度關系實現了潛在用戶的推薦。用戶的一度關系包含了關注、好友兩種類型,二度關系則得到關注的關注、關注的好友、好友的關注、好友的好友四種類型。
如果要為全站億級用戶根據二度關系和四種橋梁類型推薦橋梁權重最高 TopN 個用戶,大致估算了下總關系量在千億級別,按照原有的 Mapreduce 模式計算整個二度關系,需要以橋梁用戶為 Key,把它的關注和粉絲兩個億級的表做 Join,如果活躍用戶按照億計,平均關注量按百計, Join 需要傳輸的數據量為幾百 TB,同時 Mapreduce 在 shuffle 過程中中間結果需要多次排序和落地到 HDFS, 按這么實現內存和帶寬無法滿足,而且在時效上也不能滿足業務需要。
二度關系推薦可抽象成在有向圖中尋找到指定頂點的最短距離為 2 的所有頂點,將滿足上述條件的頂點稱為頂點的二跳鄰居.這是經典的圖問題,使用分布式圖計算模型在算法描述和擴展性上有很大的優勢。
原文網址:https://kknews.cc/tech/jv2mk4l.html
下面我們把二度關系抽象成圖后舉例描述下
如上圖所示,單向箭頭表示關注關系,雙向箭頭表示好友關系,箭頭上的數字表示邊權重,如 A 到 C1 的橋梁權重 =B1(0.5+0.6)+B2(0.7+0.1)=1.9,推薦理由是好友的好友.我們需要將全站千億級有效關注關系按上述模型計算求得 A 的二跳鄰居 C,再去掉 C 中 A 直接關注的,最后將 C 按橋梁權重從高到低取 TopN。
框架選擇
目前業界主流的分布式圖計算框架有 Giraph 和 GraphX。Giraph 是一個迭代的圖計算系統。Giraph 計算的輸入是由點和直連的邊組成的圖.例如,點可以表示人,邊可以表示朋友請求.每個頂點保存一個值,每個邊也保存一個值.輸入不僅取決於圖的拓撲邏輯,也包括定點和邊的初始值.
ADVERTISEMENT
Giraph 由 Yahoo 開源,原型是 Google 的 Pregel,在 2012 年已經成為 Apache 軟件基金會的開源項目,並得到 Facebook 的支持,獲得多方面的改進.
GraphX 是是 Apache 的開源項目 Spark 的重要部分,最早是伯克利 AMPLAB 的分布式圖計算框架項目,后來整合到 Spark 中成為一個核心組件。GraphX 是 Spark 中用於圖和圖並行計算的 API,其實是 GraphLab 和 Pregel 在 Spark(Scala) 上的重寫及優化,跟其他分布式圖計算框架相比, GraphX 最大的優點,在 Spark 之上提供一棧式數據解決方案,可以方便且高效地完成圖計算的一整套流水作業.
ADVERTISEMENT
End-to-end PageRank performance (20 iterations, 3.7B edges)
GraphX 借助 Spark,將圖表示為 RDD,一種分布式的能載入內存的數據集。較之 mapreduce 順序處理數據,鑒於內存具有天然的隨機訪問特性, Spark 的大多數操作都在內存中完成,因此更適合處理圖問題。GraphX 處理端到端的圖迭代問題在運行時間上也快於 Giraph( 見上圖 ),因此我們決定采用 GraphX 做二度關系挖掘和推薦。
基於 GraphX 的二度關系求解
基本概念
屬性圖 : 屬性圖是一個有向多重圖,它帶有連接到每個頂點和邊的用戶定義的對象。有向多重圖中多個並行 (parallel) 的邊共享相同的源和目的地頂點.支持並行邊的能力簡化了建模場景,這個場景中,相同的頂點存在多種關系 ( 例如 likes 和 blocks),每個頂點由一個唯一的 long 型的 VertexID 作為頂點 ID。
ADVERTISEMENT
一個屬性圖 Graph 由兩個 RDD 構成分別是 :VertexRDD[VD] 和 EdgeRDD[ED],分別表示頂點和邊。VD 和 ED 分別表示頂點和邊的屬性類型.他們和 RDD 一樣,屬性圖是不可變的、分布式的、容錯的.其中最關鍵的是不變性.邏輯上,所有圖的轉換和操作都產生了一個新圖;物理上, GraphX 會有一定程度的不變頂點和邊的復用優化,對用戶透明。
下圖是一個屬性圖的例子.
頂點和邊 : 眾所周知,圖結構中最基本的要素是頂點和邊。GraphX 描述的是擁有頂點屬性和邊屬性的有向圖。GraphX 提供頂點( Vertex)、邊( Edge)、邊三元組( EdgeTriplet)三種視圖。GraphX 的各種圖操作也是在這三種視圖上完成的.如下圖所示,頂點包含頂點 ID 和頂點數據( VD);邊包含源頂點 ID( srcId)、目的頂點 ID( dstId)和邊數據( ED).邊三元組是邊的擴展,它在邊的基礎上提供了邊的源頂點數據、目的頂點數據.在許多圖計算操作中,需要將邊數據以及邊所連接的頂點數據一起組成邊三元組,然后在邊三元組上進行操作.
圖的分布式存儲
GraphX 將圖數據以 RDD 分布式地存儲在集群的頂點上,使用頂點 RDD( VertexRDD)、邊 RDD( EdgeRDD)存儲頂點集合和邊集合。頂點 RDD 通過按頂點的 ID 進行哈希分區,將頂點數據以多分區形式分布在集群上.邊 RDD 按指定的分區策略( Partition Strategy)進行分區(默認使用邊的 srcId 進行哈希分區),將邊數據以多分區形式分布在集群.另外,頂點 RDD 中還擁有頂點到邊 RDD 分區的路由信息——路由表.路由表存在頂點 RDD 的分區中,它記錄分區內頂點跟所有邊 RDD 分區的關系.在邊 RDD 需要頂點數據時(如構造邊三元組),頂點 RDD 會根據路由表把頂點數據發送至邊 RDD 分區。如下圖所示,按頂點分割方法將圖分解后得到頂點 RDD、邊 RDD 和路由表。
在圖計算過程中,有些邊的計算需要頂點數據,即需形成邊三元組視圖,如 PageRank 算法生出邊的權值,這需要將頂點的權值發送至出邊所在的邊 RDD 分區。GraphX 依據路由表,從頂點 RDD 中生成與邊 RDD 分區相對應的重復頂點視圖( ReplicatedVertexView),它的作用是作為中間 RDD,將頂點數據傳送至邊 RDD 分區.重復頂點視圖按邊 RDD 分區並攜帶頂點數據的 RDD,如圖下圖所示,重復頂點分區 A 中便攜帶邊 RDD 分區 A 中的所有的頂點,它與邊 RDD 中的頂點是 co-partition(即分區個數相同,且分區方法相同).在圖計算時, GraphX 將重復頂點視圖和邊 RDD 進行拉鏈( zipPartition)操作,即將重復頂點視圖和邊 RDD 的分區一一對應地組合起來,從而將邊與頂點數據連接起來,使邊分區擁有頂點數據.整個形成邊三元組過程,只有在頂點 RDD 形成重復頂點視圖中存在分區間數據移動,拉鏈操作不需要移動頂點數據和邊數據.由於頂點數據一般比邊數據要少的多,而且隨着迭代次數的增加,需要更新的頂點數目也越來越少,重復頂點視圖中攜帶的頂點數據也相應減少,這樣就可以大大減少數據的移動量,加快執行速度。
GraphX 在頂點 RDD 和邊 RDD 的分區中以數組形式存儲頂點數據和邊數據,目的是為了不損失元素訪問性能。GraphX 在分區里建立了眾多索引結構,高效地實現快速訪問頂點數據或邊數據.在迭代過程中,圖的結構不會發生變化,因而頂點 RDD、邊 RDD 以及重復頂點視圖中的索引結構全部可以重用,當由一個圖生成另一個圖時,只須更新頂點 RDD 和 RDD 的數據存儲數組.索引結構的重用,是 GraphX 保持高性能的關鍵,也是相對於原生 RDD 實現圖模型性能能夠大幅提高的主要原因.
求解過程
先構造一個屬性圖,每個頂點的屬性 Attr 為 Map(dstId->distance),初始化為 Map( 該頂點 ID->0) 。然后進行兩次迭代求解二度關系.
第一次迭代 : 遍歷每條邊,將 dst 頂點屬性 dstAttr 中的跳數字段標記為 1 發給 src 頂點, src 收到后合並到頂點屬性 srcAttr 里.
第二次迭代 : 遍歷邊篩選出 dstAttr 里面跳數為 1 的 Key-Value 發給對應的 src 頂點,並將 dstId 加入橋梁頂點,最后聚合這些消息得到所有 2 跳鄰居。
最佳實踐
圖分區
如上所述, Graphx 使用的是 Vertex-Cut( 點分割 ) 方式存儲圖,用三個 RDD 存儲圖數據信息:VertexTable(ID, data): ID 為頂點 ID,data 為頂點屬性
EdgeTable(pid, src, dst, data): pid 為分區 ID,src 為源頂點 ID , dst 為目的頂點 ID, data 為邊屬性
RoutingTable(ID,pid): ID 為頂點 ID, pid 為分區 ID
點分割存儲實現如下圖所示:
用戶可指定不同的分區策略.分區策略會將邊分配到各個邊分區,頂點 Master 分配到各個頂點分區,重復頂點視圖會緩存本地邊關聯點的副本.划分策略的不同會影響到所需要緩存的副本數量,以及每個邊分區分配的邊的均衡程度,需要根據圖的結構特征選取最佳策略.
下圖展示了 GraphX 自帶的 4 種邊分區模式.
考慮到我們具體應用場景,在經過第一次迭代后圖中的橋梁頂點 B 將會收到它關注頂點發來的消息 , 其屬性會變大約 100 倍.在第二次迭代的時候,如果同一個頂點 B 被分到不同的邊分區,在其屬性 update 的時候也會被復制多份到重復頂點視圖,根據我們圖的規模,這個復制量無論是內存和帶寬都扛不住.
按照 dstId 對邊進行分區
我們分區的思路是考慮到消息發送方向是按照粉絲 (src) 方向,盡量把同 dstId 的分到同一分區里面以降低 dstAttr 的副本數.如下圖所示,避免 dst 為超級頂點 ( 關注大量用戶,同時有大量粉絲 ) 在屬性發生變更時被大量復制撐爆內存.
合理設置分區 (partition) 大小
每一次 task 只能處理一個 partition 的數據,這個值太小了會導致每片數據量太大,導致內存壓力,或者諸多 executor 的計算能力無法利用充分;但是如果太大了則會導致分片太多,執行效率降低。推薦分區數是集群 CPU 總核數的 3 到 4 倍。
鄰邊消息聚合使用 aggregateMessages
aggregateMessages 是 Graphx 最重要的 API, 1.2 版本添加的新函數,用於替換 mapReduceTriplets。目前 mapReduceTriplets 最終也是使用兼容的 aggregateMessages。改用 aggregateMessages 后,性能提升 30%。GraphX 的 pregel 也能實現鄰邊消息聚合,在二度關系求解場景下未被采用,因為其終止條件之一是本次收到消息的活躍頂點數 activeMessages 為 0,每次迭代都會調用 activeMessages = messages.count 計算活躍頂點數。但是 count 這個 reduce 方法會產生一個新的 job,非常耗時.根據我們的場景,迭代次數是確定的兩次,因此選用 aggregateMessages,其返回值即是收到消息的活躍頂點 VertexRDD,避免了在迭代過程中使用 count。
aggregateMessages 在邏輯上分為三步:
-
由邊三元組生成消息;
-
向邊三元組的頂點發送消息;
-
頂點聚合收到的消息.
它實現分為 map 階段和 reduce 階段.
phase1.aggregateMessages
map
GraphX 使用頂點 RDD 更新重復頂點視圖.重復頂點視圖與邊 RDD 進行分區拉鏈( zipPartitions)操作,將頂點數據傳往邊 RDD 分區,實現邊三元組視圖。對邊 RDD 進行 map 操作,依據用戶提供的函數為每個邊三元組產生一個消息( Msg),生成以頂點 ID、消息為元素的 RDD,其類型為 RDD[(VertexId, Msg)]。
phase2.aggregateMessages
reduce
reduce 階段首先對 step1 中的消息 RDD 按頂點分區方式進行分區(使用頂點 RDD 的分區函數),分區后的消息 RDD 與該圖的頂點 RDD 元素分布狀況將完全相同.在分區時, GraphX 會使用用戶提供的聚合函數合並相同頂點的消息,最終形成類似頂點 RDD 的消息 RDD。
另外,在使用 aggregateMessages 的時候需要注意參數 tripletFields,這個參數用來指定發送的消息域,默認發所有 (src 屬性 ,dst 屬性和 edge 屬性 )。根據我們的模型和算法,消息發送方向是關注的反方向,數據只需要 dstAttr,因此可將 tripletFields 設置為 TripletFields.Dst。這樣只會復制頂點的 dst 屬性,降低網絡傳輸開銷.
采用 Kryo 序列化
Spark 默認使用的是 Java
Serialization,性能、空間表現都比較差,官方推薦的是 Kryo
Serialization,序列化速度更快,壓縮率也更高.在 Spark UI 上能夠看到序列化占用總時間開銷的比例,采用 Kryo 序列化后 RDD 存儲比 Java 序列化節省大約 9 倍的空間,見下圖。
圖片來源:https://github.com/EsotericSoftware/kryo/wiki/Benchmarks-for-Kryo-version-1.x
一旦啟用 Kryo 序列化機制以后,可帶來如下幾點性能提升 :
-
算子函數中使用到的外部變量,使用 Kryo 以后 : 優化網絡傳輸的性能,可以優化集群中內存的占用和消耗。
-
持久化 RDD,在調用 persist 時需要指定 StorageLevel 為 StorageLevel.MEMORY_ONLY_SER,可優化內存的占用和消耗;持久化 RDD 占用的內存越少, task 執行的時候,創建的對象,就不至於頻繁的占滿內存,頻繁發生 GC。
-
shuffle:在進行 stage 間的 task 的 shuffle 操作時,頂點與頂點之 間的 task 會互相大量通過網絡拉取和傳輸文件,此時,這些數據既然通過網絡傳輸,也是可能要序列化的,這時 Kryo 便可用於提高網絡傳輸的性能,
在使用的時候需要注意 : 如果是我們自己定義的數據類型,需要在 Kryo 中注冊.代碼如下:
val conf = new SparkConf.setMaster(...).setAppName(...)
conf.set("Spark.serializer", "org.Apache.Spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
內存和 shuffle 調優
下面這張圖展示了 Spark on YARN 內存結構
在實際二度關系求解中,每個 stage 以 shuffle 為界,上游 stage 做 map task,每個 map task 將計算結果數據分成多份,每一份對應到下游 stage 的每個 partition 中,並將其臨時寫到磁盤,該過程叫做 shuffle write ;下游 stage 做 reduce task,每個 reduce task 通過網絡拉取上游 stage 中所有 map task 的指定分區結果數據,該過程叫做 shuffle read,最后完成 reduce 的業務邏輯,如下圖所示,
map task 將 R 個 shuffle 文件寫盤 ( 采用 SortShuffleManager),其中 R 為 reduce task 的數目。在寫盤前,會先將數據寫入內存 buffer,當 buffer 滿了才會溢寫到磁盤文件中,Reduce task 會拉取自己需要的數據,如果 Map 和 Reduce 發生在不同的機器上,便會產生網絡傳輸開銷。實踐中,如果 shuffle 時內存比較緊張,就需要適當調整 Spark.shuffle.memoryFraction 參數,這個參數表示 executor 內存中分配給 shuffle read task 進行聚合操作的內存比例,默認值 0.2,可將其調大,避免由於內存不足導致聚合過程中頻繁讀寫磁盤.
其次是將文件壓縮方式設置為 snnapy 壓縮,取代原有的 lzf,可減少 map 階段 io 文件 buffer 的內存使用量
( 400k per file-> 32k per file) conf.set("Spark.io.compression.codec","org.Apache.Spark.io.SnnapyCompressionCodec")
同時注意上圖中的 Spark.storage.memoryFraction,這個參數表示 RDD Cache 所用的內存比例,默認為 0.6。我們使用 Kryo 序列化后, RDD 內存占用縮減到了原來的 1 / 9,因此將這個參數調小為 0.3,騰出更多的內存給 executor 使用。
網絡參數調優
在實際運行中還出現過如下錯誤
Java.util.concurrent.TimeoutException:
Futures timed out after [120 second]
解決
由網絡或者 GC 引起, worker 或 executor 沒有接收到 executor 或 task 的心跳反饋。提高 Spark.network.timeout 的值,根據情況改成 300(5min) 或更高。默認為 120(120s).
小結
本文主要介紹了 Spark GraphX 一些基本原理,以及用於微博二度關系推薦中的一些思考及實踐經驗,經過實際場景運行,基於 GraphX 做的好友的好友的關系推薦,在時效和推薦轉化率上均有更好的效果。
參考文獻
-
http://spark.apache.org/docs/latest/graphx-programming-guide.html
-
GraphX A Resilient Distributed Graph System on Spark annotated, https://amplab.cs.berkeley.edu/wp-content/uploads/2013/05/grades-graphx_with_fonts.pdf
-
Spark Graphx In Action
-
https://endymecy.gitbooks.io/spark-graphx-source-analysis/content/vertex-cut.html
-
https://spark.apache.org/docs/latest/tuning.html
-
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
-
Optimizing Shuffle Performance in Spark, https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project16_report.pdf
-
https://www.iteblog.com/archives/1672
-
http://sharkdtu.com/posts/spark-shuffle.html