Flink開發_Flink的SQL和TableAPI的UDF


used in Flink SQL and Table APIs.
 1. Flink內置函數和自定義函數
    使用內置函數
	自定義函數並使用

 2.flink 提供了一個module的概念,使用戶能擴展flink的內置對象
   flink內置了CoreModule
       /**
       * Modules define a set of metadata, including functions, user defined types, operators, rules, etc.
       * Metadata from modules are regarded as built-in or system metadata that users can take advantages of.
       */
   並且提供了一個hive module,允許用戶在加載了hive module之后使用hive的函數,包括內置函數、自定義hive函數
      org.apache.flink.table.module.hive.HiveModule
	  org.apache.flink.table.functions.hive.conversion
   用戶還可以自定義module
    Another example is users can load an out-of-shelf Hive module to use Hive built-in functions as Flink built-in functions.
	多個module里有重名的函數,則以先加載的函數為准。
	源碼中:
	  Hive UDF 和 GenericUDF 函數會自動轉換成 Flink 中的 ScalarFunction,
	  GenericUDTF 會被自動轉換成 Flink 中的 TableFunction,
	  UDAF 和 GenericUDAFResolver2 則轉換成 Flink 聚合函數(AggregateFunction)
 關系型數據庫中:database.schema.table
               其他
 分布式數據庫中:catalog.database.table 
   這里的自定義函數,主要指在 Flink Table API & SQL 這個層級的自定義函數,注意和Datastream有所區別

1.函數區分

1.從兩個角度來區分函數
    從函數的擁有來講: 系統內建函數 和自定義函數 system (or built-in) functions v.s. catalog functions
	從生命周期來講:   分為臨時函數和永久函數    temporary functions           v.s.  persistent functions
2.因此組合的方式:
    Temporary system functions
    System functions
    Temporary catalog functions
    Catalog functions

2.如何使用函數:

精確引用   1.10版本及以上使用
模糊引用: 在SQL語句中使用,針對同名的情況,引用的順序是:
   Temporary system function
   System function
   Temporary catalog function, in the current catalog and current database of the session
   Catalog function, in the current catalog and current database of the session

3.系統內建函數

Scalar Functions 標量型函數
    算數函數  代數函數(= != >  isnull in exists between)  邏輯函數(and or )
	字符函數  時間函數
	條件函數 case when   COALESCE  IS_DIGIT
	函數類型轉換  CAST
	Collection函數: ARRAY  Map 
	分組函數 Hash值函數
Aggregate Functions 聚合型函數
   max min sum count 
   COLLECT
   ROW_NUMBER   DENSE_RANK RANK()
      ROW_NUMBER 1 2 3 4
      dense_rank 函數在生成序號時是連續的,1 2  2 3    dense稠密
      rank      函數生成的序號有可能不連續。 1 2 2 4
Column Functions 列函數 -Column functions are only used in Table API.
   withColumns
   withoutColumns

II.Flink的UDF

 擴展了查詢的表達能力,同時可以把這種表達能力開放出去
  基於JVM語言的UDF: Java Scala

1.UDF自定義函數類型

        Scalar functions      map scalar values                  to a new scalar value.
         Table functions      map scalar values                  to new rows.
     Aggregate functions      map scalar values of multiple rows to a new scalar value.
    Table aggregate functions map scalar values of multiple rows to new rows.
     Async table functions are special functions for table sources that perform a lookup.
	 從與Hive比較角度: UDF UDTF UDAF
	 Flink自身又有說細分和增加
 版本的不同:
      UDF UDTF: org.apache.flink.table.types.DataType        
	  UDAF    : org.apache.flink.api.common.typeinfo.TypeInformation 
	            aggregate 這部分正在重構,目前是使用TypeInformation,重構后使用DataType
(注意: Flink設計類型信息的有
  TypeInformation  org.apache.flink.api.common.typeinfo.Types
                   org.apache.flink.api.common.typeinfo.TypeInformation
  Type             org.apache.flink.table.api.Types
  DataType         org.apache.flink.table.types.DataType  1.9版本以后移除了對 TypeInformation 的依賴
  )
    * @see ScalarFunction      org.apache.flink.table.functions.
    * @see TableFunction       org.apache.flink.table.functions.
    * @see AggregateFunction
    * @see TableAggregateFunction
	* @see AsyncTableFunction

2.UDF編寫和使用

 如何編寫
 如何調用: both Table API and SQL.
   For SQL queries , a function must always be registered under a name. 
   For Table API   , a function can be registered or directly used inline
示例:
0.編寫UDF
   // define function logic
   public static class SubstringFunction extends ScalarFunction {
     public String eval(String s, Integer begin, Integer end) {
       return s.substring(begin, end);
     }
   }
###使用UDF
1.對於SQL來講,需要注冊,然后在SQL中使用
  // register function
  env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
  // call registered function in SQL
  env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");

2.對於TableAPI來說,可以直接用,或者注冊后再在Table API中使用
  // call function "inline" without registration in Table API
  env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));
  
  // register function
  env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
  // call registered function in Table API
  env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));

3.UDF具體說明

   Udf提供了open()和close()方法,可以被復寫,功能類似Dataset和DataStream API的RichFunction方法
 1.UDF繼承 ScalarFunction 抽象類,主要實現 eval 方法。
   輸出一行
   org.apache.flink.table.functions
     public abstract class ScalarFunction extends UserDefinedFunction {}
	 注意:返回值類型: 
	     基本的返回值類型 和自定義復雜的返回值類型
		  復雜的可能要實現方法: getResultType()
 2.UDF繼承 TableFunction 抽象類,主要實現 eval 方法。
    輸出任意數目的行數。返回的行也可以包含一個或者多個列,通過提供 provide a collect(T) method
    org.apache.flink.table.functions  
	 public abstract class TableFunction<T> extends UserDefinedFunction {}
3.Aggregation Functions
   The following methods are mandatory for each AggregateFunction:
     createAccumulator()
     accumulate()
     getValue()
    Spark SQL的UDAF  UserDefinedAggregateFunction
 Flink: org.apache.flink.table.functions.AggregateFunction
	public abstract class AggregateFunction<T, ACC> extends UserDefinedAggregateFunction<T, ACC> {
	    <IN, ACC, OUT>
		必不可少的: createAccumulator() accumulate() getValue()
        The following methods of AggregateFunction are required depending on the use case
	       merge()方法在會話組窗口(session group window)上下文中是必須的
		   retract() 
		   resetAccumulator()
 Spark中 org.apache.spark.sql.expressions
   public abstract class UserDefinedAggregateFunction extends Serializable{}
    inputSchema  bufferSchema  dataType
    initialize  update  merge  evaluate
 Hive: org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
   1)需繼承AbstractGenericUDAFResolver抽象類,重寫方法getEvaluator(TypeInfo[] parameters);
   2)內部靜態類需繼承GenericUDAFEvaluator抽象類,重寫方法 init() 
      實現方法 getNewAggregationBuffer() reset() iterate() terminatePartial() merge() terminate() 

4.Table Aggregation Functions
  TableAggregateFunction
    createAccumulator()
    accumulate()
 The following methods of TableAggregateFunction are required depending on the use case:
     retract() is required for aggregations on bounded OVER windows.
     merge() is required for many batch aggregations and session window aggregations.
     resetAccumulator() is required for many batch aggregations.
     emitValue() is required for batch and window aggregations.
  emitUpdateWithRetract
 module 可以使用Hive的內置函數和定義函數
 具體待成熟后再研究

參考:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/modules.html#hivemodule
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_functions.html
flink modules詳解之使用hive函數  https://www.jianshu.com/p/95bf94060855


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM