Flink UDF--Table Functions&Aggregation Functions


1.Table Functions 表函數

   與標量函數相似之處是輸入可以0,1,或者多個參數,但是不同之處可以輸出任意數目的行數。返回的行也可以包含一個或者多個列。

   為了自定義表函數,需要繼承TableFunction,實現一個或者多個evaluation方法。表函數的行為定義在這些evaluation方法內部,函數名為eval並且必須是public。TableFunction可以重載多個eval方法。Evaluation方法的輸入參數類型,決定着表函數的輸入類型。Evaluation方法也支持變參,例如:eval(String... strs)。返回表的類型取決於TableFunction的基本類型。Evaluation方法使用collect(T)發射輸出rows。

   在Table API中,表函數在scala語言中使用方法如下:.join(Expression) 或者 .leftOuterJoin(Expression),在java語言中使用方法如下:.join(String) 或者.leftOuterJoin(String)。

  • Join操作算子會使用表函數(操作算子右邊的表)產生的所有行進行(cross) join 外部表(操作算子左邊的表)的每一行。
  • leftOuterJoin操作算子會使用表函數(操作算子右邊的表)產生的所有行進行(cross) join 外部表(操作算子左邊的表)的每一行,並且在表函數返回一個空表的情況下會保留所有的outer rows。

在sql語法中稍微有點區別:

  • cross join用法是LATERAL TABLE(<TableFunction>)。
  • LEFT JOIN用法是在join條件中加入ON TRUE。

下面的例子講的是如何使用表值函數。

// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).

public class Split extends TableFunction<Tuple2<String, Integer>> {

  private String separator = " ";
  public Split(String separator) {
      this.separator = separator;
  }
  public void eval(String str) {
      for (String s : str.split(separator)) {
          // use collect(...) to emit a row
          collect(new Tuple2<String, Integer>(s, s.length()));
      }
  }
}
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ...         // table schema: [a: String]
// Register the function.
tableEnv.registerFunction("split", new Split("#"));
// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.join("split(a) as (word, length)").select("a, word, length");

myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");

// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");

 

   需要注意的是PROJO類型不需要一個確定的字段順序。意味着你不能使用as修改表函數返回的pojo的字段的名字。

   默認情況下TableFunction返回值類型是由flink類型抽取工具決定。對於基礎類型及簡單的POJOS是足夠的,但是更復雜的類型,自定義類型,組合類型,會報錯。這種情況下,返回值類型的TypeInformation,需要手動指定,方法是重載TableFunction#getResultType()。

下面的例子,我們通過復寫TableFunction#getResultType()方法使得表返回類型是RowTypeInfo(String, Integer)

public class CustomTypeSplit extends TableFunction<Row> {
  public void eval(String str) {
      for (String s : str.split(" ")) {
          Row row = new Row(2);
          row.setField(0, s);
          row.setField(1, s.length);
          collect(row);
      }
  }
  @Override
  public TypeInformation<Row> getResultType() {
      return Types.ROW(Types.STRING(), Types.INT());
  }
}

 

2.Aggregation Functions 聚合函數

   用戶自定義聚合函數聚合一張表(一行或者多行,一行有一個或者多個屬性)為一個標量的值。

   聚合函數需要繼承AggregateFunction。聚合函數工作方式如下:

  • 首先,需要一個accumulator,這個是保存聚合中間結果的數據結構。調用AggregateFunction函數的createAccumulator()方法來創建一個空accumulator.

  • 隨后,每個輸入行都會調用accumulate()方法來更新accumulator。一旦所有的行被處理了,getValue()方法就會被調用,計算和返回最終的結果。

對於每個AggregateFunction,下面三個方法都是比不可少的:

createAccumulator()

accumulate()

getValue()

 

   flink的類型抽取機制不能識別復雜的數據類型,比如,數據類型不是基礎類型或者簡單的pojos類型。所以,類似於ScalarFunction 和TableFunction,AggregateFunction提供了方法去指定返回結果類型的TypeInformation,用的是AggregateFunction#getResultType()。Accumulator類型用的是AggregateFunction#getAccumulatorType()。

   除了上面的方法,還有一些可選的方法。有些方法是讓系統更加高效的執行查詢,另外的一些在特定的場景下是必須的。

   例如,merge()方法在會話組窗口(session group window)上下文中是必須的。當一行數據是被視為跟兩個會話窗口相關的時候,兩個會話窗口的accumulators需要被join

AggregateFunction的下面幾個方法,根據使用場景的不同需要被實現:

  • retract():在bounded OVER窗口的聚合方法中是需要實現的。
  • merge():在很多 batch 聚合和會話窗口聚合是必須的。
  • resetAccumulator(): 在大多數batch聚合是必須的。

AggregateFunction的所有方法都是需要被聲明為public,而不是static。定義聚合函數需要實現org.apache.flink.table.functions.AggregateFunction同時需要實現一個或者多個accumulate方法。該方法可以被重載為不同的數據類型,並且支持變參。

   為了計算加權平均值,累加器需要存儲已累積的所有數據的加權和及計數。在例子中定義一個WeightedAvgAccum類作為accumulator。盡管,retract(), merge(), 和resetAccumulator()方法在很多聚合類型是不需要的,這里也給出了例子。

 
/**
* Accumulator for WeightedAvg.
*/
public static class WeightedAvgAccum {
  public long sum = 0;
  public int count = 0;
}
/**
* Weighted Average user-defined aggregate function.
*/
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
  @Override public WeightedAvgAccum createAccumulator() {
    return new WeightedAvgAccum();
  }
  @Override public Long getValue(WeightedAvgAccum acc) {
      if (acc.count == 0) {
          return null;
      } else {
          return acc.sum / acc.count;
      }
  }
  public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
      acc.sum += iValue * iWeight;
      acc.count += iWeight;
  }

//其他方法
public void retract(WeightedAvgAccum acc, long iValue, int iWeight) { acc.sum -= iValue * iWeight; acc.count -= iWeight; } public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) { Iterator<WeightedAvgAccum> iter = it.iterator(); while (iter.hasNext()) { WeightedAvgAccum a = iter.next(); acc.count += a.count; acc.sum += a.sum; } } public void resetAccumulator(WeightedAvgAccum acc) { acc.count = 0; acc.sum = 0L; } }

//register function StreamTableEnvironment tEnv = ... tEnv.registerFunction("wAvg", new WeightedAvg()); //use function tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");

 

3.udf的最佳實踐經驗


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM