Spark中的鍵值對操作-scala


1.PairRDD介紹
    Spark為包含鍵值對類型的RDD提供了一些專有的操作。這些RDD被稱為PairRDD。PairRDD提供了並行操作各個鍵或跨節點重新進行數據分組的操作接口。例如,PairRDD提供了reduceByKey()方法,可以分別規約每個鍵對應的數據,還有join()方法,可以把兩個RDD中鍵相同的元素組合在一起,合並為一個RDD。
2.創建Pair RDD
    程序示例:對一個英語單詞組成的文本行,提取其中的第一個單詞作為key,將整個句子作為value,建立 PairRDD
val rdd=sc.parallelize(List("this is a test","how are you","do you love me","can you tell me"));
//獲取第一個單詞作為鍵
val words =rdd.map(x=>(x.split(" ")(0),x));
words.collect().foreach(println);
輸出結果:
(this,this is a test)
(how,how are you)
(do,do you love me)
(can,can you tell me)

3.PairRDD的轉化操作
    PairRDD可以使用所有標准RDD上可用的轉化操作。傳遞函數的規則也適用於PairRDD。由於PairRDD中包含二元組,所以需要傳遞的函數應當操作而元素而不是獨立的元素。
                                       PairRDD的相關轉化操作如下表所示
針對兩個PairRDD的轉化操作 rdd={(1,2),(3,4),(3,6)} other={(3,9)}
函數名 目的 示例 結果
substractByKey 刪掉RDD中鍵與other RDD
中的鍵相同的元素
rdd.subtractByKey(other) {(1,2)}
join 對兩個RDD進行內連接
rdd.join(other) {(3,(4,9)),(3,(6,9))}
rightOuterJoin 對兩個RDD進行連接操作,右外連接 rdd.rightOuterJoin(other) {(3,(4,9)),(3,(6,9))}
leftOuterJoin 對兩個RDD進行連接操作,左外連接 rdd.rightOuterJoin(other) {(1,(2,None)),(3,(4,9)),(3,(6,9))}
cogroup 將兩個RDD中擁有相同鍵的數據分組 rdd.cogroup(other) {1,([2],[]),(3,[4,6],[9])}
程序實例:
針對2 中程序生成的PairRDD,刪選掉長度超過20個字符的行。
val results=words.filter(value => value._2.length()<20);
results.foreach(println)
    RDD上有fold(),combine(),reduce()等行動操作,pair RDD上則有相應的針對鍵的轉化操作。
    (1)reduceByKey()與reduce()操作類似,它們都接收一個函數,並使用該函數對值進行合並。reduceByKey()會為數據集中的每個鍵進行並行的規約操作,每個規約操作會將鍵相同的值合並起來。reduceBykey()最終返回一個由各鍵規約出來的結果值組成的新的RDD。
程序示例:用reduceByKey實現單詞計數
val rdd=sc.parallelize(List("this is a test","how are you","do you love me","can you tell me"));
val words =rdd.flatMap(line => line.split(" "));
val results=words.map(word => (word,1)).reduceByKey( {case(x,y) => x+y});
results.foreach(println)
輸出:
(are,1)
(this,1)
(is,1)
(you,3)
(can,1)
(a,1)
(love,1)
(do,1)
(how,1)
(tell,1)
(me,2)
(test,1)

  (2)foldByKey()與fold()操作類似,他們都使用一個與RDD和合並函數中的數據類型相同的零值作為初始值。與fold()一樣,foldByKey()操作所使用的合並函數對零值與另一個元素進行合並,結果仍為該元素。
    程序示例:求對應key的value之和
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8)));
val results=nums.foldByKey(0)({case(x,y)=>x+y})
results.collect().foreach(println)
結果:
(1,4)
(2,10)
(3)
    combineByKey()是最為常用的基於鍵進行聚合的函數。大多數基於鍵聚合的函數都是用它實現的。和aggregate()一樣,combineByKey()可以讓用戶返回與輸入數據類型不同的返回值。combineByKey()會遍歷分區中的所有元素,因此,每個元素的鍵要么還么有遇到過,要么就和之前的某個元素的鍵相同。如果這是一個新的元素,combineByKey()會使用一個叫做 createCombiner()的函數來創建那個鍵對應的累加器的初始值。需要注意的是,這一過程會在每個分區中第一次出現每個鍵時發生,而不是在整個RDD中第一次出現一個鍵時發生。
    如果這是一個處理當前分區之前就已經遇到的鍵,它會使用mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合並。
    由於每個分區都是獨立處理的,因此對於同一個鍵可以有多個累加器。如果有兩個或者更多的分區都有對應一個鍵的累加器,就需要使用用戶提供的mergeCombiners()方法將各個分區的結果進行合並。
     以下程序示例使用combineBykey()求每個鍵對應的平均值。
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8)));
val results=nums.combineByKey(
(v)=>(v,1),
(acc:(Int,Int),v) =>(acc._1+v,acc._2+1),
(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)
).map{case(key,value)=>(key,value._1/value._2.toFloat)}
results.collectAsMap().map(println)
結果:
(2,5.0)
(1,2.0)
成功求出每個key對應value對應的平均值
*(4)並行度調優
    每個RDD都有固定數目的分區,分區數決定了在RDD上執行操作時的並行度。
    在執行聚合或者分組操作時,可以要求Spark使用給定的分區數。Spark始終嘗試根據集群的大小推斷出一個有意義的默認值,但是你可以通過對並行度進行調優來獲得更好的性能表現。
    在Scala中,combineByKey()函數和reduceByKey()函數的最后一個可選的參數用於指定分區的數目,即numPartitions,使用如下:
val results=nums.reduceByKey({(x,y) =>x+y},2);
5.數據分組
(1)groupByKey()
    groupByKey()會使用RDD中的鍵來對數據進行分組。對於一個由類型K的鍵和類型V的值組成的RDD,得到的RDD類型會是[K,Iterable[v]]。
    以下是程序示例,對PairRDD調用groupByKey()函數之后,返回的RDD類型是RDD [K,Iterable[v]]
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8)));
val group=nums.groupByKey();
val results=group.collect();
for(value <- results){
print(value._1+": ")
for(elem <- value._2)
print(elem+" ")
println()

}
輸出結果:
1: 1 3 
2: 2 8 
(2)cogroup()
    除了對單個RDD的數據進行分組,還可以使用cogroup()函數對對個共享同一個鍵的RDD進行分組。對兩個鍵的類型均為K而值得類型分別為V和W的RDD進行cogroup()時,得到結果的RDD類型為[(K,(Iterable[V],Iterable[W]))]。如果其中一個RDD對於另一個RDD中存在的某個鍵沒有對應的記錄,那么對應的迭代器則為空。
舉例:
val nums1 = sc.parallelize(List(Tuple2(1, 1), Tuple2(2, 2), Tuple2(1, 3),Tuple2(2, 4),Tuple2(3, 4)));
val nums2 = sc.parallelize(List(Tuple2(1,1),Tuple2(1,3),Tuple2(2,3)))
val results=nums1.cogroup(nums2)
for(tuple2 <- results.collect()){
print(tuple2._1+" [ ")
for(it <- tuple2._2._1)
print(it+" ")
print("] [ ")
for(it<-tuple2._2._2)
print(it+" ")
println("]")
}
輸出:
1 [ 1 3 ] [ 1 3 ]
3 [ 4 ] [ ]
2 [ 2 4 ] [ 3 ]
6.數據排序
在Scala中以字符串順序對正數進行自定義排序
(1)對RDD進行排序:
val nums =sc.parallelize(List(12,4,6,8,0,8));
//隱式轉換聲明排序的依據
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(x: Int, y: Int): Int = x.toString().compareTo(y.toString())
}
val results=nums.sortBy(value=>value);
results.collect().foreach(println)
(2)對PairRDD,按key的值進行排序
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(2, 2), Tuple2(1, 3),Tuple2(2, 4),Tuple2(3, 4)));
//隱式轉換聲明排序的依據
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(x: Int, y: Int): Int = x.toString().compareTo(y.toString())
}
val results=nums.sortByKey();
results.collect().foreach(println)
7.數據分區
(1)創建數據分區
    在分布式程序中,通信的代價很大,控制數據分布以獲得最少的網絡傳輸可以極大地提升整體性能。Spark程序可以通過控制RDD分區的方式來減少通信消耗。只有當數據集多次在諸如連接這種基於鍵的操作中,分區才會有作用
    Spark中所有的鍵值對RDD都可以進行分區。系統會根據一個針對鍵的函數對元素進行分組。Spark可以確保同一組的鍵出現在一個節點上。
    舉個簡單的例子,應用如下:內存中保存着很大的用戶信息表,由(UserID,UserInfo[])組成的RDD,UserInfo是用戶所訂閱的所有主題列表。該應用會周期性地將這張表和一個小文件進行組合,這個小文件中存這過去5分鍾發生的時間,其實就是一系列(UserId,LinkInfo)RDD,其中LinkInfo是用戶訪問的鏈接的主題。我們需要對用戶訪問其未訂閱主題的頁面情況進行統計。我們可以使用Spark的join()操作進行組合操作。將兩者根據UserId連接之后,過濾出不在UserInfo[]中的LinkInfo,就是用戶訪問其未訂閱主題的情況。
val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book")))
val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book"))
val userData =sc.parallelize(list1)
val events = sc.parallelize(list2)
val joined=userData.join(events)
val results=joined.filter({
case (id, (info, link)) =>
!info.contains(link)
}
).count()
println(results)
輸出:1
    這段代碼可以正確運行,但是效率不高。因為每5分鍾就要進行一次join()操作,而我們對數據集如何分區卻一無所知。默認情況下,連接操作會將兩個數據集中的所有鍵的哈希值都求出來,將該哈希值相同的記錄通過網絡傳到同一台機器上,然后在那台機器上對所有鍵相同的記錄進行連接操作。因為userData表比每5分鍾出現的訪問日志表events要大很多,所以要浪費時間進行額外的工作:在每次調用時都對userDAta表進行哈希值計算和跨節點數據混洗,雖然這些數據從來不會變化。
    要解決此問題:在程序開始的時候,對userData表進行partitionBy()轉化操作,將這張表轉化為哈希分區。可以通過向patitionBy傳遞一個spark.HashPartitioner對象來實現該操作。
    scala自定義分區方式:
val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book")))
val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book"))
val userData =sc.parallelize(list1).partitionBy(new HashPartitioner(100)).persist(StorageLevel.MEMORY_ONLY)
    這樣以后在調用join()時,Spark就知道了該RDD是根據鍵的哈希值來分區的,這樣在調用join()時,Spark就會利用這一點,只會對events進行數據混洗操作,將events中特定userId的記錄發送到userData的對應分區所在的那台機器上。這樣,需要網絡傳輸的數據就大大減小了,程序運行的速度也顯著提高。
    請注意,我們還對userData 這個RDD進行了持久化操作默認情況下,每一個由轉化操作得到的RDD都會在每次執行啟動操作時重新計算生成,將userData持久化之后,就能保證userData能夠在訪問時被快速獲取。
    *進一步解釋數據分區帶來的好處:
    如果沒有將partitionBy()轉化操作的結果進行持久化,那么后面每次用到這個RDD時都會重復對數據進行分區操作。不進行持久化會導致整個RDD譜系圖重新求值。那樣的話,partitionBy()帶來的好處就會抵消,導致重復對數據進行分區以及跨節點的混洗,和沒有指定分區方式時發生的情況是十分相似的。
(2)獲取數據分區的方式
接(1)中程序:
val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book")))
val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book"))
val userData =sc.parallelize(list1).partitionBy(new HashPartitioner(100)).persist(StorageLevel.MEMORY_ONLY)
println(userData.partitioner)
  RDD的屬性partitioner就是存儲了對應的分區方式
(3)從分區中獲益的操作
    Spark中的很多操作都引入了根據鍵跨結點進行混洗的過程。所有這些操作都會從數據分區中獲益。能夠從數據分區中獲益的操作有:groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),以及lockup()。
    對於像reduceByKey()這樣只作用於單個RDD的操作,運行在未分區的RDD的時候或導致每個鍵所有對應值都在每台機器上進行本地計算,只需要把本地最終歸約出的結果值從各工作節點傳回主節點,所以原本的網絡開銷就不太大。而對於諸如cogroup()和join()這樣的二元操作,預先進行數據分區會導致其中至少一個RDD(使用已知分區器的那個RDD)不發生數據混洗。如果兩個RDD使用同樣的分區方式,並且它們還緩存在同樣的機器上(比如一個RDD是通過mapValues()從另一個RDD中創建出來的,這兩個RDD就會擁有相同的鍵和分區方式),或者其中一個RDD還沒有計算出來,那么跨節點數據混洗就不會發生了。
(4)影響分區方式的操作
    所有會為生成的結果RDD設好分區方式的操作:cogroup(),groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),partitionBy(),sort(),mapValues()(如果父RDD有分區方式的話),filter()(如果父RDD有分區方式的話)。其他所有操作生成的結果都不會存在特定的分區方式。
注意:     
    對於二元操作,輸出數據的分區方式取決於父RDD的分區方式。默認情況下,結果會采用哈希分區,分區的數量和操作的並行度是一樣的。如果其中一個父RDD已經設置過分區方式,那么結果就會采用那種分區方式;如果兩個父RDD都設置過分區方式,結果RDD會采用第一個RDD的分區方式。
8.示例程序-PageRank
     PageRank算法是一種從RDD分區中獲益的更復雜的算法,我們以它為例進行分析。PageRank算法用來根據外部文檔指向一個文檔的鏈接,對集合中每個文檔的重要程度賦一個度量值。該算法可以用於對網頁進行排序,當然,也可以用於排序科技文章或社交網絡中有影響的用戶。
    算法會維護兩個數據集,一個由(pageID,linklist[])組成,包含每個頁面的鏈接到的頁面的列表;另一個由(pageID,rank)元素組成,包含每個頁面的當前排序值。它按以下步驟進行計算:
     ① 將每個頁面的排序值初始化為1.0
          ②在每次迭代中,向每個有直接鏈接的頁面,發送一個值為rank(p)/numNeighbors(p)(出鏈數目)   的貢獻量
        ③將每個頁面的排序值設置為0.15+0.85*contributionsReceived
           最后兩步會重復幾個循環,在此過程中,算法會逐漸收斂於每個頁面的實際PageRank值。在實際操作中,收斂通常需要進行十個迭代。
下面用Scala來實現PageRank算法:
/*
#以下是url的內容:
www.baidu.com www.hao123.com
www.baidu.com www.2345.com
www.baidu.com www.zhouyang.com
www.hao123.com www.baidu.com
www.hao123.com www.zhouyang.com
www.zhouyang.com www.baidu.com
*/
val inputs =sc.textFile("C:\\url.txt")
//url,[urls]
val links =inputs.map(x=>(x.split(" ")(0),x.split(" ")(1)))
.distinct()
.groupByKey()
.cache()
//url,rank
var ranks =links.mapValues(value =>1.0)
for(i<-0 until 10){

val contribs =links.join(ranks).flatMap({
case(pageid,(links,rank))=>
//url Double
links.map(dest=>(dest,rank/links.size))
})
//reduce and add the contribs
ranks=contribs.reduceByKey((x,y)=>x+y).mapValues(v => 0.15+0.85*v)
}
ranks.collect().foreach(println)
結果:
(www.hao123.com,0.3685546839262259)
(www.baidu.com,0.761571325242544)
(www.2345.com,0.3685546839262259)
(www.zhouyang.com,0.5269013026650011)
9.Scala設置自定義分區方式
    Spark允許你通過自定義Partitioner對象來控制RDD的分區方式,這樣可以讓你利用領域知識進一步減少通信消耗。
    舉個例子,假設我們要在一個網頁的集合上運行前一屆中的PageRank算法。在這里,每個頁面的ID是頁面的URL。當我們使用簡單的哈希函數進行分區時,擁有相似的URL的頁面比如 http://www.baidu.com/news 與 http://www.baidu.com/map 可能被分在完全不同的節點上。但是我們知道,同一個域名下的網頁更有可能相互連接。由於PageRank需要在每次迭代中從每個頁面向它所有相鄰的頁面發送一條消息,因襲把這些頁面分組在同一個分區中會更好。可以使用自定義的分區器來實現僅根據域名而不是整個URL進行分區。
    要實現先自定義Partitioner,需要繼承Partitioner類並實現其下述方法:
    override def numPartitions: Int = ???
    返回創建的分區數量
    override def getPartition(key: Any): Int = ???
    返回給定鍵的數量
          override def equals(other:Any):Boolean = ???
    Spark需要這個方法來檢查分區器對象是否與其他分區器實例相同,這樣Spark才能判斷兩個RDD的分區方式是否相同。

class DomainNamePartitioner (numParts:Int) extends Partitioner{
override def numPartitions: Int = numParts
//根據hashCodenumPartitions取余來得到Partition,因為返回的必須是非負數,所以對於hashCode為負的情況做了特殊處理
override def getPartition(key: Any): Int = {
val domain = new URL(key.toString).getHost();
val code=(domain.hashCode%numPartitions)
if(code<0){
code+numPartitions
}else{
code
}
}

override def equals(other:Any):Boolean = other
match {
//這個實例是DomainNamePartitioner的實例,並且numPartitions相同,返回true
case dnp:DomainNamePartitioner =>
dnp.numPartitions==numPartitions
//否則,返回false
case _ => false
}
}
























免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM