0 簡介
自定義函數(UDF)是一種擴展開發機制,可以用來在查詢語句里調用難以用其他方式表達的頻繁使用或自定義的邏輯。
自定義函數可以用 JVM 語言(例如 Java 或 Scala)或 Python 實現,實現者可以在 UDF 中使用任意第三方庫,本文聚焦於使用 JVM 語言開發自定義函數。
1 概述
當前 Flink 有如下幾種函數:
- 標量函數 將標量值轉換成一個新標量值;
- 表值函數 將標量值轉換成新的行數據;
- 聚合函數 將多行數據里的標量值轉換成一個新標量值;
- 表值聚合函數 將多行數據里的標量值轉換成新的行數據;
- 異步表值函數 是異步查詢外部數據系統的特殊函數。
注意 標量和表值函數已經使用了新的基於數據類型的類型系統,聚合函數仍然使用基於 TypeInformation
的舊類型系統。
以下示例展示了如何創建一個基本的標量函數,以及如何在 Table API 和 SQL 里調用這個函數。
函數用於 SQL 查詢前要先經過注冊;而在用於 Table API 時,函數可以先注冊后調用,也可以 內聯 后直接使用。
import org.apache.flink.table.api._ import org.apache.flink.table.functions.ScalarFunction // define function logic class SubstringFunction extends ScalarFunction { def eval(s: String, begin: Integer, end: Integer): String = { s.substring(begin, end) } } val env = TableEnvironment.create(...) // 在 Table API 里不經注冊直接“內聯”調用函數 env.from("MyTable").select(call(classOf[SubstringFunction], $"myField", 5, 12)) // 注冊函數 env.createTemporarySystemFunction("SubstringFunction", classOf[SubstringFunction]) // 在 Table API 里調用注冊好的函數 env.from("MyTable").select(call("SubstringFunction", $"myField", 5, 12)) // 在 SQL 里調用注冊好的函數 env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable")
對於交互式會話,還可以在使用或注冊函數之前對其進行參數化,這樣可以把函數 實例 而不是函數 類 用作臨時函數。
為確保函數實例可應用於集群環境,參數必須是可序列化的。
import org.apache.flink.table.api._ import org.apache.flink.table.functions.ScalarFunction // 定義可參數化的函數邏輯 class SubstringFunction(val endInclusive) extends ScalarFunction { def eval(s: String, begin: Integer, end: Integer): String = { s.substring(endInclusive ? end + 1 : end) } } val env = TableEnvironment.create(...) // 在 Table API 里不經注冊直接“內聯”調用函數 env.from("MyTable").select(call(new SubstringFunction(true), $"myField", 5, 12)) // 注冊函數 env.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(true))
2 開發指南
注意在聚合函數使用新的類型系統前,本節僅適用於標量和表值函數。
所有的自定義函數都遵循一些基本的實現原則。
2.1 函數類
實現類必須繼承自合適的基類之一(例如 org.apache.flink.table.functions.ScalarFunction
)。
該類必須聲明為 public
,而不是 abstract
,並且可以被全局訪問。不允許使用非靜態內部類或匿名類。
為了將自定義函數存儲在持久化的 catalog 中,該類必須具有默認構造器,且在運行時可實例化。
2.2 求值方法
基類提供了一組可以被重寫的方法,例如 open()
、 close()
或 isDeterministic()
。
但是,除了上述方法之外,作用於每條傳入記錄的主要邏輯還必須通過專門的 求值方法 來實現。
根據函數的種類,后台生成的運算符會在運行時調用諸如 eval()
、accumulate()
或 retract()
之類的求值方法。
這些方法必須聲明為 public
,並帶有一組定義明確的參數。
常規的 JVM 方法調用語義是適用的。因此可以:
- 實現重載的方法,例如
eval(Integer)
和eval(LocalDateTime)
; - 使用變長參數,例如
eval(Integer...)
; - 使用對象繼承,例如
eval(Object)
可接受LocalDateTime
和Integer
作為參數; - 也可組合使用,例如
eval(Object...)
可接受所有類型的參數。
以下代碼片段展示了一個重載函數的示例:
import org.apache.flink.table.functions.ScalarFunction import scala.annotation.varargs // 有多個重載求值方法的函數 class SumFunction extends ScalarFunction { def eval(a: Integer, b: Integer): Integer = { a + b } def eval(a: String, b: String): Integer = { Integer.valueOf(a) + Integer.valueOf(b) } @varargs // generate var-args like Java def eval(d: Double*): Integer = { d.sum.toInt } }
2.3 類型推導
Table(類似於 SQL 標准)是一種強類型的 API。因此,函數的參數和返回類型都必須映射到數據類型。
從邏輯角度看,Planner 需要知道數據類型、精度和小數位數;從 JVM 角度來看,Planner 在調用自定義函數時需要知道如何將內部數據結構表示為 JVM 對象。
術語 類型推導 概括了意在驗證輸入值、派生出參數/返回值數據類型的邏輯。
Flink 自定義函數實現了自動的類型推導提取,通過反射從函數的類及其求值方法中派生數據類型。如果這種隱式的反射提取方法不成功,則可以通過使用 @DataTypeHint
和 @FunctionHint
注解相關參數、類或方法來支持提取過程,下面展示了有關如何注解函數的例子。
如果需要更高級的類型推導邏輯,實現者可以在每個自定義函數中顯式重寫 getTypeInference()
方法。但是,建議使用注解方式,因為它可使自定義類型推導邏輯保持在受影響位置附近,而在其他位置則保持默認狀態。
自動類型推導
自動類型推導會檢查函數的類和求值方法,派生出函數參數和結果的數據類型, @DataTypeHint
和 @FunctionHint
注解支持自動類型推導。
有關可以隱式映射到數據類型的類的完整列表,請參閱數據類型。
@DataTypeHint
在許多情況下,需要支持以 內聯 方式自動提取出函數參數、返回值的類型。
以下例子展示了如何使用 @DataTypeHint
,詳情可參考該注解類的文檔。
import org.apache.flink.table.annotation.DataTypeHint import org.apache.flink.table.annotation.InputGroup import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.types.Row import scala.annotation.varargs // function with overloaded evaluation methods class OverloadedFunction extends ScalarFunction { // no hint required def eval(a: Long, b: Long): Long = { a + b } // 定義 decimal 的精度和小數位 @DataTypeHint("DECIMAL(12, 3)") def eval(double a, double b): BigDecimal = { java.lang.BigDecimal.valueOf(a + b) } // 定義嵌套數據類型 @DataTypeHint("ROW<s STRING, t TIMESTAMP(3) WITH LOCAL TIME ZONE>") def eval(Int i): Row = { Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i)) } // 允許任意類型的符入,並輸出定制序列化后的值 @DataTypeHint(value = "RAW", bridgedTo = classOf[java.nio.ByteBuffer]) def eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o): java.nio.ByteBuffer = { MyUtils.serializeToByteBuffer(o) } }
@FunctionHint
有時我們希望一種求值方法可以同時處理多種數據類型,有時又要求對重載的多個求值方法僅聲明一次通用的結果類型。
@FunctionHint
注解可以提供從入參數據類型到結果數據類型的映射,它可以在整個函數類或求值方法上注解輸入、累加器和結果的數據類型。可以在類頂部聲明一個或多個注解,也可以為類的所有求值方法分別聲明一個或多個注解。所有的 hint 參數都是可選的,如果未定義參數,則使用默認的基於反射的類型提取。在函數類頂部定義的 hint 參數被所有求值方法繼承。
以下例子展示了如何使用 @FunctionHint
,詳情可參考該注解類的文檔。
import org.apache.flink.table.annotation.DataTypeHint import org.apache.flink.table.annotation.FunctionHint import org.apache.flink.table.functions.TableFunction import org.apache.flink.types.Row // 為函數類的所有求值方法指定同一個輸出類型 @FunctionHint(output = new DataTypeHint("ROW<s STRING, i INT>")) class OverloadedFunction extends TableFunction[Row] { def eval(a: Int, b: Int): Unit = { collect(Row.of("Sum", Int.box(a + b))) } // overloading of arguments is still possible def eval(): Unit = { collect(Row.of("Empty args", Int.box(-1))) } } // 解耦類型推導與求值方法,類型推導完全取決於 @FunctionHint @FunctionHint( input = Array(new DataTypeHint("INT"), new DataTypeHint("INT")), output = new DataTypeHint("INT") ) @FunctionHint( input = Array(new DataTypeHint("BIGINT"), new DataTypeHint("BIGINT")), output = new DataTypeHint("BIGINT") ) @FunctionHint( input = Array(), output = new DataTypeHint("BOOLEAN") ) class OverloadedFunction extends TableFunction[AnyRef] { // an implementer just needs to make sure that a method exists // that can be called by the JVM @varargs def eval(o: AnyRef*) = { if (o.length == 0) { collect(Boolean.box(false)) } collect(o(0)) } }
定制類型推導
在大多數情況下,@DataTypeHint
和 @FunctionHint
足以構建自定義函數,然而通過重寫 getTypeInference()
定制自動類型推導邏輯,實現者可以創建任意像系統內置函數那樣有用的函數。
以下用 Java 實現的例子展示了定制類型推導的潛力,它根據字符串參數來確定函數的結果類型。該函數帶有兩個字符串參數:第一個參數表示要分析的字符串,第二個參數表示目標類型。
import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.types.Row; public static class LiteralFunction extends ScalarFunction { public Object eval(String s, String type) { switch (type) { case "INT": return Integer.valueOf(s); case "DOUBLE": return Double.valueOf(s); case "STRING": default: return s; } } // 禁用自動的反射式類型推導,使用如下邏輯進行類型推導 @Override public TypeInference getTypeInference(DataTypeFactory typeFactory) { return TypeInference.newBuilder() // 指定輸入參數的類型,必要時參數會被隱式轉換 .typedArguments(DataTypes.STRING(), DataTypes.STRING()) // specify a strategy for the result data type of the function .outputTypeStrategy(callContext -> { if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) { throw callContext.newValidationError("Literal expected for second argument."); } // 基於字符串值返回數據類型 final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING"); switch (literal) { case "INT": return Optional.of(DataTypes.INT().notNull()); case "DOUBLE": return Optional.of(DataTypes.DOUBLE().notNull()); case "STRING": default: return Optional.of(DataTypes.STRING()); } }) .build(); } }
2.4 運行時集成
有時候自定義函數需要獲取一些全局信息,或者在真正被調用之前做一些配置(setup)/清理(clean-up)的工作。自定義函數也提供了 open()
和 close()
方法,你可以重寫這兩個方法做到類似於 DataStream API 中 RichFunction
的功能。
open() 方法在求值方法被調用之前先調用。close() 方法在求值方法調用完之后被調用。
open() 方法提供了一個 FunctionContext,它包含了一些自定義函數被執行時的上下文信息,比如 metric group、分布式文件緩存,或者是全局的作業參數等。
下面的信息可以通過調用 FunctionContext
的對應的方法來獲得:
方法 | 描述 |
---|---|
getMetricGroup() |
執行該函數的 subtask 的 Metric Group。 |
getCachedFile(name) |
分布式文件緩存的本地臨時文件副本。 |
getJobParameter(name, defaultValue) |
跟對應的 key 關聯的全局參數值。 |
下面的例子展示了如何在一個標量函數中通過 FunctionContext 來獲取一個全局的任務參數:
import org.apache.flink.table.api._ import org.apache.flink.table.functions.FunctionContext import org.apache.flink.table.functions.ScalarFunction class HashCodeFunction extends ScalarFunction { private var factor: Int = 0 override def open(context: FunctionContext): Unit = { // 獲取參數 "hashcode_factor" // 如果不存在,則使用默認值 "12" factor = context.getJobParameter("hashcode_factor", "12").toInt } def eval(s: String): Int = { s.hashCode * factor } } val env = TableEnvironment.create(...) // 設置任務參數 env.getConfig.addJobParameter("hashcode_factor", "31") // 注冊函數 env.createTemporarySystemFunction("hashCode", classOf[HashCodeFunction]) // 調用函數 env.sqlQuery("SELECT myField, hashCode(myField) FROM MyTable")