flink Reduce、GroupReduce、GroupCombine筆記


1、reduce操作,在分組的dataset上使用,也可以在不分組的dataset上使用

應用於分組DataSet的Reduce轉換使用用戶定義的reduce函數將每個組減少為單個元素。對於每組輸入元素,reduce函數連續地將元素對組合成一個元素,直到每個組只剩下一個元素。

注意,對於ReduceFunction,返回對象的key字段應與輸入值匹配。這是因為reduce是可隱式組合(combine)的,並且從combine運算符發出的對象在傳遞給reduce運算符時再次按key分組。

1.1 使用key表達式的dataset進行reduce

key表達式指定DataSet的每個元素的一個或多個字段。每個key表達式都是公共字段的名稱或getter方法。用點被用於向下鑽取對象。key表達式“*”選擇所有字段。以下代碼顯示如何使用key表達式對POJO DataSet進行分組,並使用reduce函數對其進行規約。



// some ordinary POJO
public class WC {
  public String word;
  public int count;
  // [...]
}

// ReduceFunction that sums Integer attributes of a POJO
public class WordCounter implements ReduceFunction<WC> {
  @Override
  public WC reduce(WC in1, WC in2) {
    return new WC(in1.word, in1.count + in2.count);
  }
}

// [...]
DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words
                         // DataSet grouping on field "word"
                         .groupBy("word")
                         // apply ReduceFunction on grouped DataSet
                         .reduce(new WordCounter());

1.2 使用KeySelector函數的dataset上進行reduce

key選擇器函數從DataSet的每個元素中提取鍵值。提取的key用於對DataSet進行分組。以下代碼顯示如何使用鍵選擇器函數對POJO DataSet進行分組,並使用reduce函數對其進行規約操作。


// some ordinary POJO
public class WC {
  public String word;
  public int count;
  // [...]
}

// ReduceFunction that sums Integer attributes of a POJO
public class WordCounter implements ReduceFunction<WC> {
  @Override
  public WC reduce(WC in1, WC in2) {
    return new WC(in1.word, in1.count + in2.count);
  }
}

// [...]
DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words
                         // DataSet grouping on field "word"
                         .groupBy(new SelectWord())
                         // apply ReduceFunction on grouped DataSet
                         .reduce(new WordCounter());

public class SelectWord implements KeySelector<WC, String> {
  @Override
  public String getKey(Word w) {
    return w.word;
  }
}

1.3 在Tuple元組上應用的reduce,可以使用數字來指明字段位置,類似索引

字段位置鍵指定一個或多個字段用於分組


DataSet<Tuple3<String, Integer, Double>> tuples = // [...]
DataSet<Tuple3<String, Integer, Double>> reducedTuples = tuples
                                         // group DataSet on first and second field of Tuple
                                         .groupBy(0, 1)
                                         // apply ReduceFunction on grouped DataSet
                                         .reduce(new MyTupleReducer());

1.4 在整個數據集上應用reduce

Reduce轉換可以將用戶定義的reduce函數應用於DataSet的所有元素。 reduce函數隨后將元素對組合成一個元素,直到只剩下一個元素。

使用Reduce轉換規約完整的DataSet意味着最終的Reduce操作不能並行完成。但是,reduce函數可以自動組合,因此Reduce轉換不會限制大多數用例的可伸縮性

以下代碼顯示如何對Integer DataSet的所有元素求和:


// ReduceFunction that sums Integers
public class IntSummer implements ReduceFunction<Integer> {
  @Override
  public Integer reduce(Integer num1, Integer num2) {
    return num1 + num2;
  }
}

// [...]
DataSet<Integer> intNumbers = // [...]
DataSet<Integer> sum = intNumbers.reduce(new IntSummer());

2、分組reduce,即GroupReduce

應用於分組DataSet的GroupReduce調用用戶定義的group-reduce函數轉換每個分組。
這與Reduce的區別在於用戶定義的函數會立即獲得整個組。在組的所有元素上使用Iterable調用該函數,並且可以返回任意數量的結果元素。

2.1 GroupReduce對於分組的鍵於redeuce相同

以下代碼顯示如何從Integer分組的DataSet中刪除重復的字符串。


public class DistinctReduce
         implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {

  @Override
  public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {

    Set<String> uniqStrings = new HashSet<String>();
    Integer key = null;

    // add all strings of the group to the set
    for (Tuple2<Integer, String> t : in) {
      key = t.f0;
      uniqStrings.add(t.f1);
    }

    // emit all unique strings.
    for (String s : uniqStrings) {
      out.collect(new Tuple2<Integer, String>(key, s));
    }
  }
}

// [...]
DataSet<Tuple2<Integer, String>> input = // [...]
DataSet<Tuple2<Integer, String>> output = input
                           .groupBy(0)            // group DataSet by the first tuple field
                           .reduceGroup(new DistinctReduce());  // apply GroupReduceFunction

2.2 將GroupReduce應用於排序分組的數據集

group-reduce函數使用Iterable訪問組的元素。Iterable可以按指定的順序分發組的元素(可選)。在許多情況下,這可以幫助降低用戶定義的組減少功能的復雜性並提高其效率。

下面的代碼顯示了如何刪除由Integer分組並按String排序的DataSet中的重復字符串的另一個示例。


// GroupReduceFunction that removes consecutive identical elements
public class DistinctReduce
         implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {

  @Override
  public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
    Integer key = null;
    String comp = null;

    for (Tuple2<Integer, String> t : in) {
      key = t.f0;
      String next = t.f1;

      // check if strings are different
      if (com == null || !next.equals(comp)) {
        out.collect(new Tuple2<Integer, String>(key, next));
        comp = next;
      }
    }
  }
}

// [...]
DataSet<Tuple2<Integer, String>> input = // [...]
DataSet<Double> output = input
                         .groupBy(0)                         // group DataSet by first field
                         .sortGroup(1, Order.ASCENDING)      // sort groups on second tuple field
                         .reduceGroup(new DistinctReduce());

3、可組合的GroupReduce功能

與reduce函數相比,group-reduce函數不是可隱式組合的。為了使group-reduce函數可組合,它必須實現GroupCombineFunction接口。

要點:GroupCombineFunction接口的通用輸入和輸出類型必須等於GroupReduceFunction的通用輸入類型,如以下示例所示:


// Combinable GroupReduceFunction that computes a sum.
public class MyCombinableGroupReducer implements
  GroupReduceFunction<Tuple2<String, Integer>, String>,
  GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
{
  @Override
  public void reduce(Iterable<Tuple2<String, Integer>> in,
                     Collector<String> out) {

    String key = null;
    int sum = 0;

    for (Tuple2<String, Integer> curr : in) {
      key = curr.f0;
      sum += curr.f1;
    }
    // concat key and sum and emit
    out.collect(key + "-" + sum);
  }

  @Override
  public void combine(Iterable<Tuple2<String, Integer>> in,
                      Collector<Tuple2<String, Integer>> out) {
    String key = null;
    int sum = 0;

    for (Tuple2<String, Integer> curr : in) {
      key = curr.f0;
      sum += curr.f1;
    }
    // emit tuple with key and sum
    out.collect(new Tuple2<>(key, sum));
  }
}

4、GroupCombine 分組連接

GroupCombine轉換是可組合GroupReduceFunction中組合步驟的通用形式。它在某種意義上被概括為允許將輸入類型I組合到任意輸出類型O.
相反,GroupReduce中的組合步驟僅允許從輸入類型I到輸出類型I的組合。這是因為reduce步驟中,GroupReduceFunction期望輸入類型為I. 在一些應用中,期望在執行附加變換(例如,減小數據大小)之前將DataSet組合成中間格式。這可以通過CombineGroup轉換能以非常低的成本實現。 注意:分組數據集上的GroupCombine在內存中使用貪婪策略執行,該策略可能不會一次處理所有數據,而是以多個步驟處理。
它也可以在各個分區上執行,而無需像GroupReduce轉換那樣進行數據交換。這可能會導致輸出的是部分結果,
所以GroupCombine是不能替代GroupReduce操作的,盡管它們的操作內容可能看起來都一樣。 以下示例演示了如何將CombineGroup轉換用於備用WordCount實現。

DataSet<String> input = [..] // The words received as input

DataSet<Tuple2<String, Integer>> combinedWords = input
  .groupBy(0) // group identical words
  .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>() {

    public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine
        String key = null;
        int count = 0;

        for (String word : words) {
            key = word;
            count++;
        }
        // emit tuple with word and count
        out.collect(new Tuple2(key, count));
    }
});

DataSet<Tuple2<String, Integer>> output = combinedWords
  .groupBy(0)                              // group by words again
  .reduceGroup(new GroupReduceFunction() { // group reduce with full data exchange

    public void reduce(Iterable<Tuple2<String, Integer>>, Collector<Tuple2<String, Integer>>) {
        String key = null;
        int count = 0;

        for (Tuple2<String, Integer> word : words) {
            key = word;
            count++;
        }
        // emit tuple with word and count
        out.collect(new Tuple2(key, count));
    }
});

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM