1. 圖論與GraphX
圖論是一個數學學科,研究一組實體(稱為頂點)之間兩兩關系(稱為邊)的特點。通過構建關系圖譜,並對關系進行分析,可以實現更好的投放廣告,推薦關系等。隨着關系圖譜越來越強大,計算量也越來越大,於是不斷有新的並行圖處理框架被開發出來。如谷歌的Pregel、雅虎的 Giraph 和卡內基梅隆大學的 GraphLab。
本章介紹的GraphX是基於Spark 上的一個擴展工具它支持 Pregel、Giraph 和 GraphLab 中的許多圖並行處理任務。雖然有些圖處理並不如這幾個框架快,但是由於它是基於 Spark 的,所以用於數據分析時還是較為方便的。
2. MEDLINE
MEDLINE (Medical Literature Analysis and Retrieval System Online, 醫學文獻在線分析和檢索系統)是一個學術論文數據庫,收錄發表在生命科學和醫學領域期刊上的文獻。MEDLINE 引用量非常大而且更新頻率快,研究人員在所有文獻引用索引上開發了一套全面的語義標簽,稱為 MeSH(Medical Subject Headings)。這些標簽提供了一個有用的框架,使用Mesh,人們就可以在閱讀文獻時知道文獻之間的關系。這里我們會用 Spark 和 GraphX 來獲取、轉化並分析Mesh 術語網絡。
我們的目的是了解文獻引用圖譜的概況和特點,所以需要從多個角度來研究數據集:
1. 數據集中主要主題和它們的伴生關系
2. 找出數據集中的連通組件(connected component),也就是主題之間是否有連通性
3. 圖的度分布:描述了主題的相關度變化,有助於找到與其相關聯最多的主題
4. 兩個圖統計量:聚類系數 和 平均路徑長度
3. 獲取數據
wget ftp://ftp.nlm.nih.gov/nlmdata/sample/medline/*.gz
解壓並檢查數據,然后將數據上傳到 HDFS:
hdfs dfs -mkdir medline
hdfs dfs -put *.xml medline/
樣本中每個條目是一條 MedlineCitation 類型的記錄,該記錄包含文章在生物醫學雜志上的發表信息,包括雜志名稱、發行期號、發行日期、作者姓名、摘要、MeSH 關鍵字集合。此外,MeSH 關鍵字還有一個屬性,用於表示該關鍵字所指概念是不是文章主要的主題
首先我們需要把 xml 格式的 medline 數據讀到 Spark shell 中:
def loadMedline(sc: SparkContext, path: String) = {
@transient val conf = new Configuration()
conf.set(XmlInputFormat.START_TAG_KEY, "<MedlineCitation ")
conf.set(XmlInputFormat.END_TAG_KEY, "</MedlineCitation>")
val in = sc.newAPIHadoopFile(path, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
in.map(line => line._2.toString)
}
val medline_raw = loadMedline(sc, "hdfs:///user/hadoop/medline")
這里的 @transient 注釋表示某個字段不需要被序列化。newAPIHadoopFile 方法:Get an RDD for a Hadoop file with an arbitrary new API InputFormat。方法定義為:
public <K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> RDD<scala.Tuple2<K,V>> newAPIHadoopFile(String path, scala.reflect.ClassTag<K> km, scala.reflect.ClassTag<V> vm, scala.reflect.ClassTag<F> fm)
4. 用 scala XML 工具解析XML 文檔
import scala.xml._
val raw_xml = medline_raw.take(1)(0)
val elem = XML.loadString(raw_xml)
變量 elem 是 scala.xml.Elem 類的實例,Scala 用 scala.xml.Elem 類表示 XML 文檔中的一個節點,該類內置了查詢節點信息和節點內容的函數,如:
scala> elem.label
res3: String = MedlineCitation
scala> elem.attributes
res4: scala.xml.MetaData = Status="MEDLINE" Owner="PIP"
它也提供了查找給定 XML 節點子節點的幾個運算符,其中第一個就是 \,用於根據名稱查詢節點的直接節點:
scala> elem \ "MeshHeadingList"
res5: scala.xml.NodeSeq =
NodeSeq(<MeshHeadingList>
<MeshHeading>
<DescriptorName UI="D001519" MajorTopicYN="N">Behavior</DescriptorName>
</MeshHeading>
<MeshHeading>
<DescriptorName UI="D000013" MajorTopicYN="N">Congenital Abnormalities</DescriptorName>
</MeshHeading>
…
運算符 \ 只對節點的直接節點有效,如果我們運行 elem \ "MeshHeading" 會得到空的 NodeSeq。為了得到給定節點的間接子節點,我們要用運算符 \\:
scala> elem \\ "MeshHeading"
res7: scala.xml.NodeSeq =
NodeSeq(<MeshHeading>
<DescriptorName UI="D001519" MajorTopicYN="N">Behavior</DescriptorName>
</MeshHeading>, <MeshHeading>
<DescriptorName UI="D000013" MajorTopicYN="N">Congenital Abnormalities</DescriptorName>
</MeshHeading>, <MeshHeading>
<DescriptorName UI="D006233" MajorTopicYN="N">Disabled Persons</DescriptorName>
…
可以用 \\ 運算符直接得到 DescriptorName 條目,並通過在每個 NodeSeq 內部元素上調用 text 函數把每個節點內的 MeSH 標簽提取出來:
(elem \\ "DescriptorName").map(_.text)
我們在數據里要提取的條目為 MajorTopic 的屬性,對此我們可以根據 DescriptorName 條目下的 MajorTopicYN 的值來判斷。它表示該 MeSH 標簽是否是所引用的文章的主要標題。只要我們在 XML 標簽屬性前加上“@“符號,就可以用 \ 和 \\ 運算符得到 XML 標簽屬性的值。用此特性我們可以構造一個過濾器,只返回主要屬性的 topic:
def majorTopics(elem : Elem): Seq[String] = {
val dn = elem \\ "DescriptorName"
val mt = dn.filter( n => (n \ "@MajorTopicYN").text == "Y")
mt.map(_.text)
}
scala> majorTopics(elem)
res24: Seq[String] = List(Intellectual Disability, Maternal-Fetal Exchange, Pregnancy Complications)
接下來將代碼應用到集群:
val melem = medline_raw.map(XML.loadString)
val medline = melem.map(majorTopics)
medline.cache
medline.first
res26: Seq[String] = List(Intellectual Disability, Maternal-Fetal Exchange, Pregnancy Complications)
5. 分析 MeSH 主要主題及其伴生關系
在獲取到MeSH 標簽后,我們需要知道數據集中標簽的總體分布情況,為此我們需要計算一些基本統計量,比如記錄條數和主要MeSH 主題出現頻率的直方圖:
scala> medline.count()
res30: Long = 240000
scala> val topics = medline.flatMap(mesh => mesh)
val topicCount = topics.countByValue()
scala> topicCount.size
res32: Int = 14548
scala> topicCount.toSeq.sortBy(_._2).reverse.take(10).foreach(println)
(Research,1649)
(Disease,1349)
(Neoplasms,1123)
(Tuberculosis,1066)
(Public Policy,816)
(Jurisprudence,796)
(Demography,763)
(Population Dynamics,753)
(Economics,690)
(Medicine,682)
以上結果可以對數據給出一個大致的描述,包括一共有多少個主題,最頻繁的主題等。可以看到,我們的數據一共有240000 個文檔,最頻繁出現的 topic(Research)只占了很少一部分(1649/240000 = 0.06%)。對此,我們猜測包含某個主題的文檔的個數的總體分布可能為長尾形態。為了驗證此猜想,可以使用以下命令查看數據分布
scala> topicCount.groupBy(_._2).mapValues(_.size).toSeq.sorted.take(10).foreach(println)
(1,3106)
(2,1699)
(3,1207)
(4,902)
(5,680)
(6,571)
(7,490)
(8,380)
(9,356)
(10,296)
當然我們主要關注的還是 MeSH 的伴生主題。MEDLINE 數據集中每一項都是一個字符串列表,代表每個引用記錄中提及的主題名稱。要得到伴生關系,我們要為這些字符串列表生成一個二元組集合。對此我們可以使用 Scala 集合工具包里的 combinations 方法,它返回的是一個 Iterator,因此並不需要把所有組合都放在內存里,如:
val list = List(1, 2, 3)
val combs = list.combinations(2)
combs.foreach(println)
List(1, 2)
List(1, 3)
List(2, 3)
當用這個函數來生產子列表時,我們要注意所有的列表要按照同樣的方式進行排序,以便之后使用 Spark 對它們進行匯總。這個因為 combinations 函數返回的列表取決於輸入元素的順序,而元素相同但屬虛不同的兩個列表是不等的:
val combs = list.reverse.combinations(2)
combs.foreach(println)
List(3, 2)
List(3, 1)
List(2, 1)
List(3,2) == List(2,3)
res50: Boolean = false
所以在為每條引用記錄生成二元自列表集合時,調用 combinations 之前要確保列表是排好序的:
val topicPairs = medline.flatMap(t => t.sorted.combinations(2))
val cooccurs = topicPairs.map(p => (p,1)).reduceByKey(_+_)
cooccurs.cache
cooccurs.count
res60: Long = 213745
我們數據中一共有 14548 個主題,總共可能有 14548 × 14548 / 2 = 105822152 個無序的伴生二元組。然而伴生二元組計算結果顯示只有 213745 個,只占可能數量的很少一部分。如果我們查看一下數據中最常出現的伴生二元組:
scala> val ord = Ordering.by[(Seq[String], Int), Int](_._2)
scala> cooccurs.top(10)(ord).foreach(println)
(List(Demography, Population Dynamics),288)
(List(Government Regulation, Social Control, Formal),254)
(List(Emigration and Immigration, Population Dynamics),230)
(List(Acquired Immunodeficiency Syndrome, HIV Infections),220)
(List(Antibiotics, Antitubercular, Dermatologic Agents),205)
(List(Analgesia, Anesthesia),183)
(List(Economics, Population Dynamics),181)
(List(Analgesia, Anesthesia and Analgesia),179)
(List(Anesthesia, Anesthesia and Analgesia),177)
(List(Population Dynamics, Population Growth),174)
以上並未提供特別有用的信息,最常見的伴生二元組與最常見的 topic 非常相關。除此之外,也沒有提供什么額外的信息
6. 用 GraphX 來建立一個伴生網絡
在研究伴生網絡時,標准的數據統計工具並不能提供額外的有價值的信息。我們可以了解數據的統計量等,但是無法了解網絡中關系的總體結構。
我們真正想要做的是把伴生網絡當作網絡來分析:把主題當作圖的頂點,把連接兩個主題的引用記錄看成兩個相應頂點之間的邊。這樣我們就可以計算以網絡為中心的統計量。這些網絡統計量呢個幫助我們理解網絡的總體結構並識別出那些有意思的局部離群點,識別出這些離群點之后我們才需要對其做進一步研究。
GraphX 構建與 Spark 之上,它繼承了 Spark 在可擴展性方面的所有特性。這就意味着可以利用 GraphX 對規模極其龐大的圖進行分析,這些任務可以在做個分布式的機器上並行執行。
GraphX 針對圖計算對 RDD 的實現進行了兩項特殊的優化:
1. VertexRDD[VD] 是 RDD[(VertexId, VD)] 的特殊實現,其中 VertexID 類型是 Long 的實例,對每個頂點都是必須的。VD 是頂點關聯的任何類型數據,稱為頂點屬性(vertex attribute)。
2. EdgeRDD[ED] 是 RDD[Edge[ED]] 的特殊實現,其中 Edge 是包含兩個 VertexId 值和一個 ED 類型的邊屬性(edge attribute)。VertexRDD 和 EdgeRDD 在每個數據分區內部均有用於加快連接和屬性更新的索引結構。給定 VertexRDD 及其相應的 EdgeRDD 后,我們就能建立一個Graph類的實例,Graph 類包含了許多圖計算的高校方法。
要建立一個圖,首先要取得用作圖頂點標識符的 Long 型值。由於所有主題都是用字符串標識的,因此我們在創建伴生網絡時需要將每個主題字符串轉換為 64 位的 Long 型值,而且這種方式最好能夠分布式執行。
方法之一就是內置的 hashCode 來對任意 Scala 對象產生一個 32 位整數。就這個例子而言,圖只有 14548 個頂點,是行得通的。但是如果是對於數百萬甚至數千萬個頂點的圖來說,發生哈希沖突的概率會較高。因此我們選用谷歌開發的 Guava 庫中的 Hashing 工具。利用此工具,可以通過 MD5 哈希算法為每個主題生成一個 64 位的唯一標識符:
import com.google.common.hash.Hashing
def hashId(str: String) = {
Hashing.md5().hashString(str).asLong()
}
應用到集群:
val vertices = topics.map(topic => (hashId(topic), topic))
同時,我們需要檢查一下是否有哈希沖突:
val uniqueHashes = vertices.map(_._1).countByValue()
val uniqueTopics = vertices.map(_._2).countByValue()
uniqueHashes.size == uniqueTopics.size
res2: Boolean = true
從結果里看到並沒有哈希沖突
接下來我們要用前一節中得到的伴生頻率計數來生成圖的邊,方法是使用hash 函數將每個主題映射到相應的頂點 ID。在生成邊的時候一個好習慣就是好習慣就是保證左邊的 VertexId(GraphX 稱為 src)要比右邊的 VertexId(GraphX 稱為 dst)小。雖然 GraphX 工具包中大多數算法都不要求 src 和 dst 之間有大小關系,但確實有幾個算法存在這樣的要求。因此最好在一開始就保證大小順序。
import org.apache.spark.graphx._
val edges = cooccurs.map( p => {
val (topics, cnt) = p
val ids = topics.map(hashId).sorted
Edge(ids(0), ids(1), cnt)
})
把頂點和邊都創建好后,就可以創建 Graph 實例了。我們需要將 Graph 緩存起來,這樣便於后續處理時使用:
val topicGraph = Graph(vertices, edges)
topicGraph.cache
上面的 vertices 和 edges 參數都是普通 RDD ,但是Graph API 會自動將輸入的 RDD 轉換為 VertexRDD 和 EdgeRDD,這樣頂點計數也不會將重復的頂點算在計數內:
scala> vertices
val vertices: org.apache.spark.rdd.RDD[(Long, String)]
scala> vertices.count
res0: Long = 280464
scala> topicGraph.vertices
val vertices: org.apache.spark.graphx.VertexRDD[String]
scala> topicGraph.vertices.count
res3: Long = 14548
如果某兩個頂點二元組在 EdgeRDD 中重復出現,Graph API 不會對其進行去重處理,這樣 GraphX 就可以創建多圖(multigraph),也就是相同頂點之間可以用多條不同值的邊。如果圖頂點代表了我許多豐富含義的對象,多圖往往是很有用的。多圖也可以讓我們根據實際情況使用有向邊或無向邊。
7. 理解網絡結構
研究圖時,我們需要計算出一些概要統計量,大概了解數據的結構。Graph 類內置了計算一些概要統計量的 API,結合其他常規 Spark RDD API,我們可以很輕松地了解到圖的結構。
1. 連通組件
最基本的屬性之一就是是否是連通圖。如果圖是非聯通的,那么我們可以將圖划分成一組更小的子圖,這樣就可以分別對每個子圖進行研究。
連通性是圖的基本屬性,可以通過調用 GraphX 的connectedComponents 方法獲取:
scala> val connectedComponentGraph = topicGraph.connectedComponents()
connectedComponentGraph: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,Int] = org.apache.spark.graphx.impl.GraphImpl@67ca9edb
請注意 connectedComponents 方法的返回值,也是一個 Graph 對象,頂點屬性的類型是 VertexId,第二個 Int 類型的值表示每個頂點所屬連通組件的唯一標識符。想得到連通組件的個數和大小,我們可以在VertexRDD 中的每個頂點的 VertexId 上調用 countByValue 這個方法:
def sortedConnectedComponents(
connectedComponents: Graph[VertexId, _]): Seq[(VertexId, Long)] = {
val componentCounts = connectedComponents.vertices.map(_._2).countByValue
componentCounts.toSeq.sortBy(_._2).reverse
}
val componentCounts = sortedConnectedComponents(connectedComponentGraph)
我們可以看看一共有多少個連通組件,以及前10個最大的連通組件:
scala> componentCounts.size
res7: Int = 878
scala> componentCounts.take(10).foreach(println)
(-9222594773437155629,13610)
(-6100368176168802285,5)
(-1043572360995334911,4)
(-8641732605581146616,3)
(-8082131391550700575,3)
(-5453294881507568143,3)
(-6561074051356379043,3)
(-2349070454956926968,3)
(-8186497770675508345,3)
(-858008184178714577,2)
可以看到最大的連通圖包含了超過 90% 的頂點,第二大的連通組件只有5 個頂點。為了弄清楚為什么這些小的連通組件沒有和最大的連通組件連通,我們需要看一下它們的主題。為了查看小組件相關的主題名稱,我們需要將連通組件圖對應的 VertexRDD 和原始概念圖執行 join 操作。
VertexRDD 提供了 innerJoin 轉換,它利用了 GraphX 的內部數據結構,性能比常規Spark 的join轉換要好得多。innerJoin 方法需要我們提供一個函數,該函數的輸入為 VertexID 和兩個 VertexRDD的內部數據,函數的返回值是一個新的VertexRDD,它是innerJoin 方法結果。對應到我們這里的情況,我們想要知道每個連通組件的概念的名稱,因此需要返回一個包含概念名稱和組件ID 的二元組:
val nameCID = topicGraph.vertices.innerJoin(connectedComponentGraph.vertices){
(topicId, name, componentId) => (name, componentId)
}
現在我們查看一下第二大連通組件的主題名稱:
> val c1 = nameCID.filter( x => x._2._2 == componentCounts(1)._1)
> c1.collect.foreach(x => println(x._2._1))
Acetyl-CoA C-Acyltransferase
Racemases and Epimerases
Enoyl-CoA Hydratase
3-Hydroxyacyl CoA Dehydrogenases
Carbon-Carbon Double Bond Isomerases
在找連通組件時,底層使用了以下方式:
- connectedComponents 方法利用 VertexId 作為頂點唯一標識符在圖上執行一些列迭代計算
- 在計算的每個階段,每個頂點把它所收到的最小VertexID廣播到相鄰節點。
- 第一次迭代時,這個最小VertexID就是頂點自身的ID,在隨后的迭代中該最小VertexID通常會被更新掉。
- 每個頂點都記錄它所收到的VertexID的最小值,如果在某一次迭代中,所有頂點的最小VertexID都沒有變化,那么連通組件的計算就完成了,每個頂點都將分配給該頂點的最小VertexID所代表的組件
2. 度的分布
為了更多了解圖的結構信息,我們需要知道每個頂點的度,也就是每個頂點所屬邊的條數。對於一個五環圖,因為每條邊都包含兩個不同的頂點,所以全體頂點的度之和等於邊的條數的兩倍。
GraphX 中我們可以通過在Graph 對象上調用degrees方法得到每個頂點的度。degrees 方法返回一個整數的VertexRDD,其中每個整數代表一個頂點的度。現在我們計算一下圖的度:
> val degrees = topicGraph.degrees.cache()
> degrees.map(_._2).stats
res26: org.apache.spark.util.StatCounter = (count: 13721, mean: 31.155892, stdev: 65.497591, max: 2596.000000, min: 1.000000)
可以看到degree RDD 中條目的個數(13721)比圖里的頂點數(14548)要少,這是由於有頂點並沒有連接邊。這可能是由於MEDLINE 數據中某些引用只有一個主要主題詞,因此有些主題並未與其他主題同時出現。我們可以通過檢查原始RDD medline 來確認我們的推測:
> val sing = medline.filter( _.size == 1)
> sing.count
res31: Long = 44509
> val singTopic = sing.flatMap(topic => topic).distinct()
> singTopic.count
res34: Long = 8243
其中有 8243 個不同主題詞單獨出現在 MEDLINE 數據庫中的 44509 篇文章中。現在我們將已經出現在 topicPairs RDD 出現過的這些主題詞去掉:
> val topic2 = topicPairs.flatMap(p => p)
> singTopic.subtract(topic2).count
res39: Long = 827
這會過濾掉 MEDLINE 數據庫文檔中單獨出現的 827 個主題詞。14548 - 827 = 13721,正好是 degrees 中的條目數。
雖然此圖中度的均值較小,意味着普通頂點只連接到少數幾個其他節點,但是度的最大值卻表明至少有一個節點是高度連接的,它幾乎和圖中五分之一的點均有連接。
我們進一步看看那些度很高的點所對應的概念,這里我們仍有 innerJoin 方法,此方法會返回在兩個 VertexRDD 中均出現的頂點,因此那些沒有與其他概念同時出現的概念將被過濾掉:
def topNamesAndDegrees(degrees: VertexRDD[Int], topicGraph: Graph[String, Int]): Array[(String, Int)] = {
val namesAndDegrees = degrees.innerJoin(topicGraph.vertices) {
(topicId, degree, name) => (name, degree)
}
val ord = Ordering.by[(String, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)
}
然后可以查看到度數較高的主題:
scala> topNamesAndDegrees(degrees, topicGraph).foreach(println)
(Research,2596)
(Disease,1746)
(Neoplasms,1202)
(Blood,914)
(Pharmacology,882)
(Tuberculosis,815)
(Toxicology,694)
(Drug Therapy,678)
(Jurisprudence,661)
(Biomedical Research,633)