I.Flink Table API & SQL
used in Flink SQL and Table APIs.
1. Flink內置函數和自定義函數
使用內置函數
自定義函數並使用
2.flink 提供了一個module的概念,使用戶能擴展flink的內置對象
flink內置了CoreModule
/**
* Modules define a set of metadata, including functions, user defined types, operators, rules, etc.
* Metadata from modules are regarded as built-in or system metadata that users can take advantages of.
*/
並且提供了一個hive module,允許用戶在加載了hive module之后使用hive的函數,包括內置函數、自定義hive函數
org.apache.flink.table.module.hive.HiveModule
org.apache.flink.table.functions.hive.conversion
用戶還可以自定義module
Another example is users can load an out-of-shelf Hive module to use Hive built-in functions as Flink built-in functions.
多個module里有重名的函數,則以先加載的函數為准。
源碼中:
Hive UDF 和 GenericUDF 函數會自動轉換成 Flink 中的 ScalarFunction,
GenericUDTF 會被自動轉換成 Flink 中的 TableFunction,
UDAF 和 GenericUDAFResolver2 則轉換成 Flink 聚合函數(AggregateFunction)
關系型數據庫中:database.schema.table
其他
分布式數據庫中:catalog.database.table
這里的自定義函數,主要指在 Flink Table API & SQL 這個層級的自定義函數,注意和Datastream有所區別
1.函數區分
1.從兩個角度來區分函數
從函數的擁有來講: 系統內建函數 和自定義函數 system (or built-in) functions v.s. catalog functions
從生命周期來講: 分為臨時函數和永久函數 temporary functions v.s. persistent functions
2.因此組合的方式:
Temporary system functions
System functions
Temporary catalog functions
Catalog functions
2.如何使用函數:
精確引用 1.10版本及以上使用
模糊引用: 在SQL語句中使用,針對同名的情況,引用的順序是:
Temporary system function
System function
Temporary catalog function, in the current catalog and current database of the session
Catalog function, in the current catalog and current database of the session
3.系統內建函數
Scalar Functions 標量型函數
算數函數 代數函數(= != > isnull in exists between) 邏輯函數(and or )
字符函數 時間函數
條件函數 case when COALESCE IS_DIGIT
函數類型轉換 CAST
Collection函數: ARRAY Map
分組函數 Hash值函數
Aggregate Functions 聚合型函數
max min sum count
COLLECT
ROW_NUMBER DENSE_RANK RANK()
ROW_NUMBER 1 2 3 4
dense_rank 函數在生成序號時是連續的,1 2 2 3 dense稠密
rank 函數生成的序號有可能不連續。 1 2 2 4
Column Functions 列函數 -Column functions are only used in Table API.
withColumns
withoutColumns
II.Flink的UDF
擴展了查詢的表達能力,同時可以把這種表達能力開放出去
基於JVM語言的UDF: Java Scala
1.UDF自定義函數類型
Scalar functions map scalar values to a new scalar value.
Table functions map scalar values to new rows.
Aggregate functions map scalar values of multiple rows to a new scalar value.
Table aggregate functions map scalar values of multiple rows to new rows.
Async table functions are special functions for table sources that perform a lookup.
從與Hive比較角度: UDF UDTF UDAF
Flink自身又有說細分和增加
版本的不同:
UDF UDTF: org.apache.flink.table.types.DataType
UDAF : org.apache.flink.api.common.typeinfo.TypeInformation
aggregate 這部分正在重構,目前是使用TypeInformation,重構后使用DataType
(注意: Flink設計類型信息的有
TypeInformation org.apache.flink.api.common.typeinfo.Types
org.apache.flink.api.common.typeinfo.TypeInformation
Type org.apache.flink.table.api.Types
DataType org.apache.flink.table.types.DataType 1.9版本以后移除了對 TypeInformation 的依賴
)
* @see ScalarFunction org.apache.flink.table.functions.
* @see TableFunction org.apache.flink.table.functions.
* @see AggregateFunction
* @see TableAggregateFunction
* @see AsyncTableFunction
2.UDF編寫和使用
如何編寫
如何調用: both Table API and SQL.
For SQL queries , a function must always be registered under a name.
For Table API , a function can be registered or directly used inline
示例:
0.編寫UDF
// define function logic
public static class SubstringFunction extends ScalarFunction {
public String eval(String s, Integer begin, Integer end) {
return s.substring(begin, end);
}
}
###使用UDF
1.對於SQL來講,需要注冊,然后在SQL中使用
// register function
env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
// call registered function in SQL
env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");
2.對於TableAPI來說,可以直接用,或者注冊后再在Table API中使用
// call function "inline" without registration in Table API
env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));
// register function
env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
// call registered function in Table API
env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));
3.UDF具體說明
Udf提供了open()和close()方法,可以被復寫,功能類似Dataset和DataStream API的RichFunction方法
1.UDF繼承 ScalarFunction 抽象類,主要實現 eval 方法。
輸出一行
org.apache.flink.table.functions
public abstract class ScalarFunction extends UserDefinedFunction {}
注意:返回值類型:
基本的返回值類型 和自定義復雜的返回值類型
復雜的可能要實現方法: getResultType()
2.UDF繼承 TableFunction 抽象類,主要實現 eval 方法。
輸出任意數目的行數。返回的行也可以包含一個或者多個列,通過提供 provide a collect(T) method
org.apache.flink.table.functions
public abstract class TableFunction<T> extends UserDefinedFunction {}
3.Aggregation Functions
The following methods are mandatory for each AggregateFunction:
createAccumulator()
accumulate()
getValue()
Spark SQL的UDAF UserDefinedAggregateFunction
Flink: org.apache.flink.table.functions.AggregateFunction
public abstract class AggregateFunction<T, ACC> extends UserDefinedAggregateFunction<T, ACC> {
<IN, ACC, OUT>
必不可少的: createAccumulator() accumulate() getValue()
The following methods of AggregateFunction are required depending on the use case
merge()方法在會話組窗口(session group window)上下文中是必須的
retract()
resetAccumulator()
Spark中 org.apache.spark.sql.expressions
public abstract class UserDefinedAggregateFunction extends Serializable{}
inputSchema bufferSchema dataType
initialize update merge evaluate
Hive: org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
1)需繼承AbstractGenericUDAFResolver抽象類,重寫方法getEvaluator(TypeInfo[] parameters);
2)內部靜態類需繼承GenericUDAFEvaluator抽象類,重寫方法 init()
實現方法 getNewAggregationBuffer() reset() iterate() terminatePartial() merge() terminate()
4.Table Aggregation Functions
TableAggregateFunction
createAccumulator()
accumulate()
The following methods of TableAggregateFunction are required depending on the use case:
retract() is required for aggregations on bounded OVER windows.
merge() is required for many batch aggregations and session window aggregations.
resetAccumulator() is required for many batch aggregations.
emitValue() is required for batch and window aggregations.
emitUpdateWithRetract
III、Flink module
module 可以使用Hive的內置函數和定義函數
具體待成熟后再研究
參考:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/modules.html#hivemodule
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_functions.html
flink modules詳解之使用hive函數 https://www.jianshu.com/p/95bf94060855