Spark中的鍵值對操作


1.PairRDD介紹
    Spark為包含鍵值對類型的RDD提供了一些專有的操作。這些RDD被稱為PairRDD。PairRDD提供了並行操作各個鍵或跨節點重新進行數據分組的操作接口。例如,PairRDD提供了reduceByKey()方法,可以分別規約每個鍵對應的數據,還有join()方法,可以把兩個RDD中鍵相同的元素組合在一起,合並為一個RDD。
2.創建Pair RDD
    程序示例:對一個英語單詞組成的文本行,提取其中的第一個單詞作為key,將整個句子作為value,建立 PairRDD
List<String> list=new ArrayList<String>();
list.add("this is a test");
list.add("how are you?");
list.add("do you love me?");
list.add("can you tell me?");
JavaRDD<String> lines=sc.parallelize(list);
JavaPairRDD<String,String> map =lines.mapToPair(
new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String s) throws Exception {
return new Tuple2<String, String>(s.split(" ")[0],s);
//獲取第一個單詞作為key,svalue
}
}
);
3.PairRDD的轉化操作
    PairRDD可以使用所有標准RDD上可用的轉化操作。傳遞函數的規則也適用於PairRDD。由於PairRDD中包含二元組,所以需要傳遞的函數應當操作而元素而不是獨立的元素。
                                       PairRDD的相關轉化操作如下表所示
針對兩個PairRDD的轉化操作 rdd={(1,2),(3,4),(3,6)} other={(3,9)}
函數名 目的 示例 結果
substractByKey 刪掉RDD中鍵與other RDD
中的鍵相同的元素
rdd.subtractByKey(other) {(1,2)}
join 對兩個RDD進行內連接
rdd.join(other) {(3,(4,9)),(3,(6,9))}
rightOuterJoin 對兩個RDD進行連接操作,右外連接 rdd.rightOuterJoin(other) {(3,(4,9)),(3,(6,9))}
leftOuterJoin 對兩個RDD進行連接操作,左外連接 rdd.rightOuterJoin(other) {(1,(2,None)),(3,(4,9)),(3,(6,9))}
cogroup 將兩個RDD中擁有相同鍵的數據分組 rdd.cogroup(other) {1,([2],[]),(3,[4,6],[9])}
程序實例:
針對2 中程序生成的PairRDD,刪選掉長度超過20個字符的行。
JavaPairRDD<String,String> result=map.filter(
new Function<Tuple2<String, String>, Boolean>() {
public Boolean call(Tuple2<String, String> value) throws Exception {
return value._2().length()<20;
}
}

);
for(Tuple2 tuple:result.collect()){
System.out.println(tuple._1()+": "+tuple._2());
4.聚合操作
    RDD上有fold(),combine(),reduce()等行動操作,pair RDD上則有相應的針對鍵的轉化操作。
    (1)reduceByKey()與reduce()操作類似,它們都接收一個函數,並使用該函數對值進行合並。reduceByKey()會為數據集中的每個鍵進行並行的規約操作,每個規約操作會將鍵相同的值合並起來。reduceBykey()最終返回一個由各鍵規約出來的結果值組成的新的RDD。
程序示例:用reduceByKey實現單詞計數
strLine.add("how are you");
strLine.add("I am ok");
strLine.add("do you love me");
JavaRDD<String> input=sc.parallelize(strLine);
JavaRDD<String> words=input.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split(" "));
}
}
);
JavaPairRDD<String,Integer> result=words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2(s, 1);
}
}
).reduceByKey(
new org.apache.spark.api.java.function.Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}
) ;
    (2)foldByKey()與fold()操作類似,他們都使用一個與RDD和合並函數中的數據類型相同的零值作為初始值。與fold()一樣,foldByKey()操作所使用的合並函數對零值與另一個元素進行合並,結果仍為該元素。
    程序示例:求對應key的value之和
List<Tuple2<Integer,Integer>> list=new ArrayList<Tuple2<Integer, Integer>>();
list.add(new Tuple2<Integer,Integer>(1,1));
list.add(new Tuple2<Integer, Integer>(1,3));
list.add(new Tuple2<Integer, Integer>(2,2));
list.add(new Tuple2<Integer, Integer>(2,8));
JavaPairRDD<Integer,Integer> map=sc.parallelizePairs(list);
JavaPairRDD<Integer,Integer> results=map.foldByKey(0, new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
for(Tuple2<Integer,Integer> tuple:results.collect())
System.out.println(tuple._1()+"->"+tuple._2());
結果:
1->4
2->10  
(3)
    combineByKey()是最為常用的基於鍵進行聚合的函數。大多數基於鍵聚合的函數都是用它實現的。和aggregate()一樣,combineByKey()可以讓用戶返回與輸入數據類型不同的返回值。combineByKey()會遍歷分區中的所有元素,因此,每個元素的鍵要么還么有遇到過,要么就和之前的某個元素的鍵相同。如果這是一個新的元素,combineByKey()會使用一個叫做 createCombiner()的函數來創建那個鍵對應的累加器的初始值。需要注意的是,這一過程會在每個分區中第一次出現每個鍵時發生,而不是在整個RDD中第一次出現一個鍵時發生。
    如果這是一個處理當前分區之前就已經遇到的鍵,它會使用mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合並。
    由於每個分區都是獨立處理的,因此對於同一個鍵可以有多個累加器。如果有兩個或者更多的分區都有對應一個鍵的累加器,就需要使用用戶提供的mergeCombiners()方法將各個分區的結果進行合並。
     以下程序示例使用combineBykey()求每個鍵對應的平均值。
public class AvgCount implements Serializable{
private int total_;
private int num_;
public AvgCount(int total,int num){
total_=total;
num_=num;
}
public float avg(){
return total_/(float) num_;
}//createCombiner()
static Function<Integer,AvgCount> createAcc =new Function<Integer,AvgCount>(){
public AvgCount call(Integer x){
return new AvgCount(x,1);
}
}
;//mergeValue()
static Function2<AvgCount,Integer,AvgCount> addAndCount=new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) throws Exception {
a.
total_+=x;
a.num_+=1;
return a;
}
}
; //mmergeCombiners()
static Function2<AvgCount,AvgCount,AvgCount> combine=new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) throws Exception {
a.
total_+=b.total_;
a.num_+=b.num_;
return a;
}
}
;
public static void main(String args[]){
AvgCount initial =
new AvgCount(0,0);
SparkConf conf = new SparkConf().setMaster("local").setAppName("my app");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<Integer,Integer>> list=new ArrayList<Tuple2<Integer, Integer>>();
list.add(new Tuple2<Integer,Integer>(1,1));
list.add(new Tuple2<Integer, Integer>(1,3));
list.add(new Tuple2<Integer, Integer>(2,2));
list.add(new Tuple2<Integer, Integer>(2,8));
JavaPairRDD<Integer,Integer> nums=sc.parallelizePairs(list);
JavaPairRDD<Integer,AvgCount> avgCounts=nums.combineByKey(createAcc,addAndCount,combine);
Map<Integer,AvgCount> countMap= avgCounts.collectAsMap();
for(Map.Entry<Integer,AvgCount> entry:countMap.entrySet())
System.
out.println(entry.getKey()+": "+entry.getValue().avg());
}
}
結果:
2: 5.0
1: 2.0
成功求出每個key對應value對應的平均值
*(4)並行度調優
    每個RDD都有固定數目的分區,分區數決定了在RDD上執行操作時的並行度。
    在執行聚合或者分組操作時,可以要求Spark使用給定的分區數。Spark始終嘗試根據集群的大小推斷出一個有意義的默認值,但是你可以通過對並行度進行調優來獲得更好的性能表現。
    在Java中,combineByKey()函數和reduceByKey()函數的最后一個可選的參數用於指定分區的數目,即numPartitions,使用如下:
JavaPairRDD<Integer,AvgCount> avgCounts=nums.combineByKey(createAcc,addAndCount,combine,10);
5.數據分組
(1)groupByKey()
    groupByKey()會使用RDD中的鍵來對數據進行分組。對於一個由類型K的鍵和類型V的值組成的RDD,鎖的到的結果RDD類型會是[K,Iterable[v]]。
    以下是程序示例,對PairRDD調用groupByKey()函數之后,會返回 JavaPairRDD<Integer,Iterable<Integer>>類型的結果,也就是所有同一個Key的value都可以調用Iterator進行遍歷。
List<Tuple2<Integer,Integer>> list1=new ArrayList<Tuple2<Integer, Integer>>();
list1.add(new Tuple2<Integer,Integer>(1,1));
list1.add(new Tuple2<Integer, Integer>(2,2));
list1.add(new Tuple2<Integer, Integer>(1,3));
list1.add(new Tuple2<Integer, Integer>(2,4));
JavaPairRDD<Integer,Integer> nums1=sc.parallelizePairs(list1);
JavaPairRDD<Integer,Iterable<Integer>>results =nums1.groupByKey();
//接下來遍歷輸出results,注意其中關於Iterable遍歷的處理
for(Tuple2<Integer,Iterable<Integer>> tuple :results.collect()){
System.out.print(tuple._1()+": ");
Iterator<Integer> it= tuple._2().iterator();
while(it.hasNext()){
System.out.print(it.next()+" ");
}
System.out.println();
}
輸出結果:
1: 1 3 
2: 2 4 
(2)cogroup()
    除了對單個RDD的數據進行分組,還可以使用cogroup()函數對對個共享同一個鍵的RDD進行分組。對兩個鍵的類型均為K而值得類型分別為V和W的RDD進行cogroup()時,得到結果的RDD類型為[(K,(Iterable[V],Iterable[W]))]。如果其中一個RDD對於另一個RDD中存在的某個鍵沒有對應的記錄,那么對應的迭代器則為空。
舉例:
    List<Tuple2<Integer,Integer>> list1=new ArrayList<Tuple2<Integer, Integer>>();
List<Tuple2<Integer,Integer>> list2=new ArrayList<Tuple2<Integer, Integer>>();

list1.add(new Tuple2<Integer,Integer>(1,1));
list1.add(new Tuple2<Integer, Integer>(2,2));
list1.add(new Tuple2<Integer, Integer>(1,3));
list1.add(new Tuple2<Integer, Integer>(2,4));
list1.add(new Tuple2<Integer, Integer>(3,4));

list2.add(new Tuple2<Integer,Integer>(1,1));
list2.add(new Tuple2<Integer, Integer>(1,3));
list2.add(new Tuple2<Integer, Integer>(2,3));

JavaPairRDD<Integer,Integer> nums1=sc.parallelizePairs(list1);
JavaPairRDD<Integer,Integer> nums2=sc.parallelizePairs(list2);
JavaPairRDD<Integer,Tuple2<Iterable<Integer>,Iterable<Integer>>> results=nums1.cogroup(nums2);
for(Tuple2<Integer,Tuple2<Iterable<Integer>,Iterable<Integer>>> tuple:results.collect()){
System.out.print(tuple._1()+" [ ");
Iterator it1=tuple._2()._1().iterator();
while(it1.hasNext()){
System.out.print(it1.next()+" ");
}
System.out.print("] [ ");
Iterator it2=tuple._2()._2().iterator();
while(it2.hasNext()){
System.out.print(it2.next()+" ");
}
System.out.print("] \n");
}
}
輸出:
1 [ 1 3 ] [ 1 3 ] 
3 [ 4 ] [ ] 
2 [ 2 4 ] [ 3 ] 

6.數據排序
在Java中以字符串順序對正數進行自定義排序
(1)對RDD進行排序:
JavaRDD<Integer> nums=sc.parallelize(Arrays.asList(1,5,3,2,6,3));
JavaRDD<Integer> results =nums.sortBy(new Function<Integer, Object>() {
public Object call(Integer v1) throws Exception {
return v1;
}
},false,1);
for(Integer a:results.collect())
System.out.println(a);
(2)對PairRDD,按key的值進行排序
ist<Tuple2<Integer, Integer>> list1 = new ArrayList<Tuple2<Integer, Integer>>();
list1.add(new Tuple2<Integer, Integer>(1, 1));
list1.add(new Tuple2<Integer, Integer>(2, 2));
list1.add(new Tuple2<Integer, Integer>(1, 3));
list1.add(new Tuple2<Integer, Integer>(2, 4));
list1.add(new Tuple2<Integer, Integer>(3, 4));
JavaPairRDD<Integer, Integer> nums1 = sc.parallelizePairs(list1);
class comp implements Comparator<Integer>, Serializable {

public int compare(Integer a, Integer b) {
return a.compareTo(b);
}
};
JavaPairRDD<Integer,Integer> results=nums1.sortByKey(new comp());
for(Tuple2<Integer,Integer> tuple: results.collect()){
System.out.println(tuple._1()+": "+tuple._2());
}
7.數據分區
(1)創建數據分區
    在分布式程序中,通信的代價很大,控制數據分布以獲得最少的網絡傳輸可以極大地提升整體性能。Spark程序可以通過控制RDD分區的方式來減少通信消耗。只有當數據集多次在諸如連接這種基於鍵的操作中,分區才會有作用
    Spark中所有的鍵值對RDD都可以進行分區。系統會根據一個針對鍵的函數對元素進行分組。Spark可以確保同一組的鍵出現在一個節點上。
    舉個簡單的例子,應用如下:內存中保存着很大的用戶信息表,由(UserID,UserInfo[])組成的RDD,UserInfo是用戶所訂閱的所有主題列表。該應用會周期性地將這張表和一個小文件進行組合,這個小文件中存這過去5分鍾發生的時間,其實就是一系列(UserId,LinkInfo)RDD,其中LinkInfo是用戶訪問的鏈接的主題。我們需要對用戶訪問其未訂閱主題的頁面情況進行統計。我們可以使用Spark的join()操作進行組合操作。將兩者根據UserId連接之后,過濾出不在UserInfo[]中的LinkInfo,就是用戶訪問其未訂閱主題的情況。
List<Tuple2<String,Iterable<String>>> list1=new ArrayList<Tuple2<String, Iterable<String>>>();
list1.add(new Tuple2<String, Iterable<String>>("zhou",Arrays.asList("it","math")));
list1.add(new Tuple2<String, Iterable<String>>("gan",Arrays.asList("money","book")));
JavaPairRDD<
String,Iterable<String>> userData=sc.parallelizePairs(list1);
List<Tuple2<String,String>> list2=new ArrayList<Tuple2<String, String>>();
list2.add(new Tuple2<String, String>("zhou","it") );
list2.add(new Tuple2<String,String>("zhou","stock"));
list2.add(new Tuple2<String, String>("gan","money"));
list2.add(new Tuple2<String, String>("gan","book"));
JavaPairRDD<String,String> events=sc.parallelizePairs(list2);
JavaPairRDD<String, Tuple2<Iterable<String>, String>> joined = userData.join(events);
long a=joined.filter(
new Function<Tuple2<String, Tuple2<Iterable<String>, String>>, Boolean>() {
public Boolean call(Tuple2<String, Tuple2<Iterable<String>, String>> tuple) throws Exception {
boolean has = false;
Iterable<String> user=tuple._2()._1();
String link=tuple._2()._2();
for (String s : user) {
if (s.compareTo(link) == 0) {
has =
true;
break;
}
}
//保留不在用戶訂閱表中的RDD
return !has;
}
}

).count()
;
System.out.println(a);
輸出:1
    這段代碼可以正確運行,但是效率不高。因為每5分鍾就要進行一次join()操作,而我們對數據集如何分區卻一無所知。默認情況下,連接操作會將兩個數據集中的所有鍵的哈希值都求出來,將該哈希值相同的記錄通過網絡傳到同一台機器上,然后在那台機器上對所有鍵相同的記錄進行連接操作。因為userData表比每5分鍾出現的訪問日志表events要大很多,所以要浪費時間進行額外的工作:在每次調用時都對userDAta表進行哈希值計算和跨節點數據混洗,雖然這些數據從來不會變化。
    要解決此問題:在程序開始的時候,對userData表進行partitionBy()轉化操作,將這張表轉化為哈希分區。可以通過向patitionBy傳遞一個spark.HashPartitioner對象來實現該操作。
    Java自定義分區方式:
List<Tuple2<String,Iterable<String>>> list1=new ArrayList<Tuple2<String, Iterable<String>>>();
list1.add(new Tuple2<String, Iterable<String>>("zhou",Arrays.asList("it","math")));
list1.add(new Tuple2<String, Iterable<String>>("gan",Arrays.asList("money","book")));
JavaPairRDD<String,Iterable<String>> userData=sc.parallelizePairs(list1);//請注意,partitionBy是轉化操作
userData=userData.partitionBy(new HashPartitioner(100)).persist(StorageLevel.MEMORY_AND_DISK());
    這樣以后在調用join()時,Spark就知道了了該RDD是根據鍵的哈希值來分區的,這樣在調用join()時,Spark就會利用這一點,只會對events進行數據混洗操作,將events中特定userId的記錄發送到userData的對應分區所在的那台機器上。這樣,需要網絡傳輸的數據就大大見笑了,程序運行的速度也顯著提高。
    請注意,我們還對userData 這個RDD進行了持久化操作默認情況下,每一個由轉化操作得到的RDD都會在每次執行啟動操作時重新計算生成,將userData持久化之后,就能保證userData能夠在訪問時被快速獲取。
    *進一步解釋數據分區帶來的好處:
    如果沒有將partitionBy()轉化操作的結果進行持久化,那么后面每次用到這個RDD時都會重復對數據進行分區操作。不進行持久化會導致整個RDD譜系圖重新求值。那樣的話,partitionBy()帶來的好處就會抵消,導致重復對數據進行分區以及跨節點的混洗,和沒有指定分區方式時發生的情況是十分相似的。
(2)獲取數據分區的方式
接(1)中程序:
Optional<Partitioner> partitioner = userData.partitioner();
System.out.println(partitioner.get());
System.out.println(partitioner.isPresent());
    在Java中,通過調用partitioner()方法,可以獲取一個Optional<Partitioner>對象,這是Scala中用來存放可能存在的對象的容器類。對該對象調用isPresent()方法可以檢查其中是否有值,調用get可以獲取partitionBy()的參數->1個partitioner對象。
(3)從分區中獲益的操作
    Spark中的很多操作都引入了根據鍵跨結點進行混洗的過程。所有這些操作都會從數據分區中獲益。能夠從數據分區中獲益的操作有:groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),以及lockup()。
    對於像reduceByKey()這樣只作用於單個RDD的操作,運行在未分區的RDD的時候或導致每個鍵所有對應值都在每台機器上進行本地計算,只需要把本地最終歸約出的結果值從各工作節點傳回主節點,所以原本的網絡開銷就不太大。而對於諸如cogroup()和join()這樣的二元操作,預先進行數據分區會導致其中至少一個RDD(使用已知分區器的那個RDD)不發生數據混洗。如果兩個RDD使用同樣的分區方式,並且它們還緩存在同樣的機器上(比如一個RDD是通過mapValues()從另一個RDD中創建出來的,這兩個RDD就會擁有相同的鍵和分區方式),或者其中一個RDD還沒有計算出來,那么跨節點數據混洗就不會發生了。
(4)影響分區方式的操作
    所有會為生成的結果RDD設好分區方式的操作:cogroup(),groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),partitionBy(),sort(),mapValues()(如果父RDD有分區方式的話),filter()(如果父RDD有分區方式的話)。其他所有操作生成的結果都不會存在特定的分區方式。
注意:     
    對於二元操作,輸出數據的分區方式取決於父RDD的分區方式。默認情況下,結果會采用哈希分區,分區的數量和操作的並行度是一樣的。如果其中一個父RDD已經設置過分區方式,那么結果就會采用那種分區方式;如果兩個父RDD都設置過分區方式,結果RDD會采用第一個RDD的分區方式。
8.示例程序-PageRank
     PageRank算法是一種從RDD分區中獲益的更復雜的算法,我們以它為例進行分析。PageRank算法用來根據外部文檔指向一個文檔的鏈接,對集合中每個文檔的重要程度賦一個度量值。該算法可以用於對網頁進行排序,當然,也可以用於排序科技文章或社交網絡中有影響的用戶。
    算法會維護兩個數據集,一個由(pageID,linklist[])組成,包含每個頁面的鏈接到的頁面的列表;另一個由(pageID,rank)元素組成,包含每個頁面的當前排序值。它按以下步驟進行計算:
     ① 將每個頁面的排序值初始化為1.0
          ②在每次迭代中,向每個有直接鏈接的頁面,發送一個值為rank(p)/numNeighbors(p)(出鏈數目)   的貢獻量
        ③將每個頁面的排序值設置為0.15+0.85*contributionsReceived
           最后兩步會重復幾個循環,在此過程中,算法會逐漸收斂於每個頁面的實際PageRank值。在實際操作中,收斂通常需要進行十個迭代。
下面用Spark來實現PageRank算法:
public class main {
private static class Sum implements Function2<Double, Double, Double> {
public Double call(Double a, Double b) {
return a + b;
}
}
public static void main(String args[]){
SparkConf conf =new SparkConf();
conf.setAppName("my spark app");
conf.setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
JavaRDD<String> inputs= sc.textFile("C:\\url.txt");
/*
#以下是url的內容:
www.baidu.com www.hao123.com
www.baidu.com www.2345.com
www.baidu.com www.zhouyang.com
www.hao123.com www.baidu.com
www.hao123.com www.zhouyang.com
www.zhouyang.com www.baidu.com
*/
JavaPairRDD<String, Iterable<String>> links = inputs.mapToPair(
new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String s) throws Exception {
String[] parts = s.split(" ");
return new Tuple2<String, String>(parts[0], parts[1]);
}
}
).distinct().groupByKey().cache();
JavaPairRDD<String, Double> ranks = links.mapValues(
new Function<Iterable<String>, Double>() {
public Double call(Iterable<String> v1) throws Exception {
return 1.0;
}
}
);
JavaPairRDD<String, Tuple2<Iterable<String>, Double>> join = links.join(ranks);

for(int current=0;current<10;current++){
// Calculates URL contributions to the rank of other URLs.
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
.flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
int urlCount = Iterables.size(s._1());
List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
for (String n : s._1()) {
results.add(new Tuple2<String, Double>(n, s._2() / urlCount));
}
return results;
}
});

// Re-calculates URL ranks based on neighbor contributions.
ranks = contribs.reduceByKey(new Sum()).mapValues(new Function<Double, Double>() {
public Double call(Double sum) {
return 0.15 + sum * 0.85;
}
});

}
// Collects all URL ranks and dump them to console.
List<Tuple2<String, Double>> output = ranks.collect();
for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1() + " has rank: " + tuple._2() + ".");
}
}
}

9.Java設置自定義分區方式
    Spark允許你通過自定義Partitioner對象來控制RDD的分區方式,這樣可以讓你利用領域知識進一步減少通信消耗。
    舉個例子,假設我們要在一個網頁的集合上運行前一屆中的PageRank算法。在這里,每個頁面的ID是頁面的URL。當我們使用簡單的哈希函數進行分區時,擁有相似的URL的頁面比如 http://www.baidu.com/news 與 http://www.baidu.com/map 可能被分在完全不同的節點上。但是我們知道,同一個域名下的網頁更有可能相互連接。由於PageRank需要在每次迭代中從每個頁面向它所有相鄰的頁面發送一條消息,因襲把這些頁面分組在同一個分區中會更好。可以使用自定義的分區器來實現僅根據域名而不是整個URL進行分區。
    要實現先自定義Partitioner,需要繼承Partitioner類並實現其下述方法:
     public int numPartitions()
         返回創建的分區數量
          public int getPartition(Object key)
         返回給定鍵的分區編號
          public boolean equals(Object obj)
    Spark需要這個方法來檢查分區器對象是否與其他分區器實例相同,這樣Spark才能判斷兩個RDD的分區方式是否相同。

class myPartitioner extends Partitioner{

int num;//分區數目

public myPartitioner(int num){//構造方法,初始化num
this.num=num;
}
@Override
public int numPartitions() {//返回分區數目
return num;
}

@Override
public int getPartition(Object key) {
String url =(String)key;
String domain="";
try {
domain=new URL(url).getHost();//獲取域名
} catch (MalformedURLException e) {
e.printStackTrace();
}
int code =domain.hashCode()%num;//獲取該域名對應的hash
if(code<0){//getPartition()方法只能返回非負數,對負數進行處理
code+=num;
}
return code;
}

@Override
public boolean equals(Object obj) {
if(obj instanceof myPartitioner){//如果objmyPartitioner的實例
return ((myPartitioner) obj).num==num;//看分區數是否相同
}
else//否則直接返回false
return false;
}
}
    































免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM