1、reduce操作,在分組的dataset上使用,也可以在不分組的dataset上使用
應用於分組DataSet的Reduce轉換使用用戶定義的reduce函數將每個組減少為單個元素。對於每組輸入元素,reduce函數連續地將元素對組合成一個元素,直到每個組只剩下一個元素。
注意,對於ReduceFunction,返回對象的key字段應與輸入值匹配。這是因為reduce是可隱式組合(combine)的,並且從combine運算符發出的對象在傳遞給reduce運算符時再次按key分組。
1.1 使用key表達式的dataset進行reduce
key表達式指定DataSet的每個元素的一個或多個字段。每個key表達式都是公共字段的名稱或getter方法。用點被用於向下鑽取對象。key表達式“*”選擇所有字段。以下代碼顯示如何使用key表達式對POJO DataSet進行分組,並使用reduce函數對其進行規約。
// some ordinary POJO
public class WC {
public String word;
public int count;
// [...]
}
// ReduceFunction that sums Integer attributes of a POJO
public class WordCounter implements ReduceFunction<WC> {
@Override
public WC reduce(WC in1, WC in2) {
return new WC(in1.word, in1.count + in2.count);
}
}
// [...]
DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words
// DataSet grouping on field "word"
.groupBy("word")
// apply ReduceFunction on grouped DataSet
.reduce(new WordCounter());
1.2 使用KeySelector函數的dataset上進行reduce
key選擇器函數從DataSet的每個元素中提取鍵值。提取的key用於對DataSet進行分組。以下代碼顯示如何使用鍵選擇器函數對POJO DataSet進行分組,並使用reduce函數對其進行規約操作。 // some ordinary POJO public class WC { public String word; public int count; // [...] } // ReduceFunction that sums Integer attributes of a POJO public class WordCounter implements ReduceFunction<WC> { @Override public WC reduce(WC in1, WC in2) { return new WC(in1.word, in1.count + in2.count); } } // [...] DataSet<WC> words = // [...] DataSet<WC> wordCounts = words // DataSet grouping on field "word" .groupBy(new SelectWord()) // apply ReduceFunction on grouped DataSet .reduce(new WordCounter()); public class SelectWord implements KeySelector<WC, String> { @Override public String getKey(Word w) { return w.word; } }
1.3 在Tuple元組上應用的reduce,可以使用數字來指明字段位置,類似索引
字段位置鍵指定一個或多個字段用於分組 DataSet<Tuple3<String, Integer, Double>> tuples = // [...] DataSet<Tuple3<String, Integer, Double>> reducedTuples = tuples // group DataSet on first and second field of Tuple .groupBy(0, 1) // apply ReduceFunction on grouped DataSet .reduce(new MyTupleReducer());
1.4 在整個數據集上應用reduce
Reduce轉換可以將用戶定義的reduce函數應用於DataSet的所有元素。 reduce函數隨后將元素對組合成一個元素,直到只剩下一個元素。 使用Reduce轉換規約完整的DataSet意味着最終的Reduce操作不能並行完成。但是,reduce函數可以自動組合,因此Reduce轉換不會限制大多數用例的可伸縮性 以下代碼顯示如何對Integer DataSet的所有元素求和: // ReduceFunction that sums Integers public class IntSummer implements ReduceFunction<Integer> { @Override public Integer reduce(Integer num1, Integer num2) { return num1 + num2; } } // [...] DataSet<Integer> intNumbers = // [...] DataSet<Integer> sum = intNumbers.reduce(new IntSummer());
2、分組reduce,即GroupReduce
應用於分組DataSet的GroupReduce調用用戶定義的group-reduce函數轉換每個分組。
這與Reduce的區別在於用戶定義的函數會立即獲得整個組。在組的所有元素上使用Iterable調用該函數,並且可以返回任意數量的結果元素。
2.1 GroupReduce對於分組的鍵於redeuce相同
以下代碼顯示如何從Integer分組的DataSet中刪除重復的字符串。 public class DistinctReduce implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> { @Override public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) { Set<String> uniqStrings = new HashSet<String>(); Integer key = null; // add all strings of the group to the set for (Tuple2<Integer, String> t : in) { key = t.f0; uniqStrings.add(t.f1); } // emit all unique strings. for (String s : uniqStrings) { out.collect(new Tuple2<Integer, String>(key, s)); } } } // [...] DataSet<Tuple2<Integer, String>> input = // [...] DataSet<Tuple2<Integer, String>> output = input .groupBy(0) // group DataSet by the first tuple field .reduceGroup(new DistinctReduce()); // apply GroupReduceFunction
2.2 將GroupReduce應用於排序分組的數據集
group-reduce函數使用Iterable訪問組的元素。Iterable可以按指定的順序分發組的元素(可選)。在許多情況下,這可以幫助降低用戶定義的組減少功能的復雜性並提高其效率。 下面的代碼顯示了如何刪除由Integer分組並按String排序的DataSet中的重復字符串的另一個示例。 // GroupReduceFunction that removes consecutive identical elements public class DistinctReduce implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> { @Override public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) { Integer key = null; String comp = null; for (Tuple2<Integer, String> t : in) { key = t.f0; String next = t.f1; // check if strings are different if (com == null || !next.equals(comp)) { out.collect(new Tuple2<Integer, String>(key, next)); comp = next; } } } } // [...] DataSet<Tuple2<Integer, String>> input = // [...] DataSet<Double> output = input .groupBy(0) // group DataSet by first field .sortGroup(1, Order.ASCENDING) // sort groups on second tuple field .reduceGroup(new DistinctReduce());
3、可組合的GroupReduce功能
與reduce函數相比,group-reduce函數不是可隱式組合的。為了使group-reduce函數可組合,它必須實現GroupCombineFunction接口。 要點:GroupCombineFunction接口的通用輸入和輸出類型必須等於GroupReduceFunction的通用輸入類型,如以下示例所示: // Combinable GroupReduceFunction that computes a sum. public class MyCombinableGroupReducer implements GroupReduceFunction<Tuple2<String, Integer>, String>, GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> { @Override public void reduce(Iterable<Tuple2<String, Integer>> in, Collector<String> out) { String key = null; int sum = 0; for (Tuple2<String, Integer> curr : in) { key = curr.f0; sum += curr.f1; } // concat key and sum and emit out.collect(key + "-" + sum); } @Override public void combine(Iterable<Tuple2<String, Integer>> in, Collector<Tuple2<String, Integer>> out) { String key = null; int sum = 0; for (Tuple2<String, Integer> curr : in) { key = curr.f0; sum += curr.f1; } // emit tuple with key and sum out.collect(new Tuple2<>(key, sum)); } }
4、GroupCombine 分組連接
GroupCombine轉換是可組合GroupReduceFunction中組合步驟的通用形式。它在某種意義上被概括為允許將輸入類型I組合到任意輸出類型O.
相反,GroupReduce中的組合步驟僅允許從輸入類型I到輸出類型I的組合。這是因為reduce步驟中,GroupReduceFunction期望輸入類型為I.
在一些應用中,期望在執行附加變換(例如,減小數據大小)之前將DataSet組合成中間格式。這可以通過CombineGroup轉換能以非常低的成本實現。
注意:分組數據集上的GroupCombine在內存中使用貪婪策略執行,該策略可能不會一次處理所有數據,而是以多個步驟處理。
它也可以在各個分區上執行,而無需像GroupReduce轉換那樣進行數據交換。這可能會導致輸出的是部分結果,
所以GroupCombine是不能替代GroupReduce操作的,盡管它們的操作內容可能看起來都一樣。
以下示例演示了如何將CombineGroup轉換用於備用WordCount實現。
DataSet<String> input = [..] // The words received as input DataSet<Tuple2<String, Integer>> combinedWords = input .groupBy(0) // group identical words .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>() { public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine String key = null; int count = 0; for (String word : words) { key = word; count++; } // emit tuple with word and count out.collect(new Tuple2(key, count)); } }); DataSet<Tuple2<String, Integer>> output = combinedWords .groupBy(0) // group by words again .reduceGroup(new GroupReduceFunction() { // group reduce with full data exchange public void reduce(Iterable<Tuple2<String, Integer>>, Collector<Tuple2<String, Integer>>) { String key = null; int count = 0; for (Tuple2<String, Integer> word : words) { key = word; count++; } // emit tuple with word and count out.collect(new Tuple2(key, count)); } });