從"UDF不應有狀態" 切入來剖析Flink SQL代碼生成
0x00 摘要
"Flink SQL UDF不應有狀態" 這個技術細節可能有些朋友已經知道了。但是為什么不應該有狀態呢?這個恐怕大家就不甚清楚了。本文就帶你一起從這個問題點入手,看看Flink SQL究竟是怎么處理UDF,怎么生成對應的SQL代碼。
0x01 概述結論
先說結論,后續一步步給大家詳述問題過程。
1. 問題結論
結論是:Flink內部針對UDF生成了java代碼,但是這些java代碼針對SQL做了優化,導致在某種情況下,可能 會對 "在SQL中本應只調用一次" 的UDF 重復調用。
- 我們在寫SQL時候,經常會在SQL中只寫一次UDF,我們認為運行時候也應該只調用一次UDF。
- 對於SQL,Flink是內部解析處理之后,把SQL語句轉化為Flink原生算子來處理。大家可以認為是把SQL翻譯成了java代碼再執行,這些代碼針對 SQL做了優化。
- 對於UDF,Flink也是內部生成java代碼來處理,這些代碼也針對SQL做了優化。
- 在Flink內部生成的這些代碼中,Flink會在某些特定情況下,對 "在SQL中本應只調用一次" 的UDF 重復調用。
- Flink生成的內部代碼,是把"投影運算"和"過濾條件"分別生成,然后拼接在一起。優化后的"投影運算"和"過濾條件"分別調用了UDF,所以拼接之后就會有多個UDF調用。
- 因為實際上編寫時候的一次UDF,優化后可能調用了多次,所以UDF內部就不應該有狀態信息。
比如:
1. myFrequency 這個字段是由 UDF_FRENQUENCY 這個UDF函數 在本步驟生成。
"SELECT word, UDF_FRENQUENCY(frequency) as myFrequency FROM TableWordCount"
2. 按說下面SQL語句就應該直接取出 myFrequency 即可。因為 myFrequency 已經存在了。
"SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0"
但是因為Flink做了一些優化,把 第一個SQL中 UDF_FRENQUENCY 的計算下推到了 第二個SQL。
3. 優化后實際就變成了類似這樣的SQL。
"SELECT word, UDF_FRENQUENCY(frequency) FROM tableFrequency WHERE UDF_FRENQUENCY(frequency) <> 0"
4. 所以UDF_FRENQUENCY就被執行了兩次:在WHERE中執行了一次,在SELECT中又執行了一次。
Flink針對UDF所生成的Java代碼 簡化轉義 版如下,能看出來調用了兩次:
// 原始 SQL "SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0"
java.lang.Long result$12 = UDF_FRENQUENCY(frequency); // 這次 UDF 調用對應 WHERE myFrequency <> 0
if (result$12 != 0) { // 這里說明 myFrequency <> 0,於是可以進行 SELECT
// 這里對應的是 SELECT myFrequency,注意的是,按我們一般的邏輯,應該直接復用result$12,但是這里又調用了 UDF,重新計算了一遍。所以 UDF 才不應該有狀態信息。
java.lang.Long result$9 = UDF_FRENQUENCY(frequency);
long select;
if (result$9 == null) {
select = -1L;
}
else {
select = result$9; // 這里最終 SELECT 了 myFrequency
}
}
2. 問題流程
實際上就是Flink生成SQL代碼的流程,其中涉及到幾個重要的節點舉例如下:
關於具體SQL流程,請參見我之前的文章:[源碼分析] 帶你梳理 Flink SQL / Table API內部執行流程
// NOTE : 執行順序是從上至下, " -----> " 表示生成的實例類型
*
* +-----> "SELECT xxxxx WHERE UDF_FRENQUENCY(frequency) <> 0" (SQL statement)
* |
* |
* +-----> LogicalFilter (RelNode) // Abstract Syntax Tree,未優化的RelNode
* |
* |
* FilterToCalcRule (RelOptRule) // Calcite優化rule
* |
* |
* +-----> LogicalCalc (RelNode) // Optimized Logical Plan,邏輯執行計划
* |
* |
* DataSetCalcRule (RelOptRule) // Flink定制的優化rule,轉化為物理執行計划
* |
* |
* +-----> DataSetCalc (FlinkRelNode) // Physical RelNode,物理執行計划
* |
* |
* DataSetCalc.translateToPlanInternal // 作用是生成Flink算子
* |
* |
* +-----> FlatMapRunner (Operator) // In Flink Task
* |
* |
這里的幾個關鍵點是:
- "WHERE UDF_FRENQUENCY(frequency) <> 0" 這部分SQL對應Calcite的邏輯算子是 LogicalFilter。
- LogicalFilter被轉換為LogicalCalc,經過思考我們可以知道,Filter的Condition條件是需要進行計算才能獲得的,所以需要轉換為Calc。
- DataSetCalc中會生成UDF JAVA代碼,這個java類是:DataSetCalcRule extends RichFlatMapFunction。這點很有意思,Flink認為UDF是一個Flatmap操作。
- 為什么UDF是一個Flatmap操作。因為UDF的輸入實際是一個數據庫記錄Record,這很像集合;輸出的是數目不等的幾部分。這恰恰是Flatmap的思想所在。
關於FlatMap,請參見我之前的文章:[源碼分析] 從FlatMap用法到Flink的內部實現
我們后文中主要就是排查SQL生成流程中哪里出現了這個"UDF多次調用的問題點"。
0x02 實例代碼
以下是我們的示例程序,后續就講解這個程序的生成代碼。
1. UDF函數
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class myUdf extends ScalarFunction {
private Long current = 0L;
private static final Logger LOGGER = LoggerFactory.getLogger(myUdf.class);
public Long eval(Long a) throws Exception {
if(current == 0L) {
current = a;
} else {
current += 1;
}
LOGGER.error("The current is : " + current );
return current;
}
}
2. 測試代碼
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
object TestUdf {
def main(args: Array[String]): Unit = {
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)
val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
tEnv.registerFunction("UDF_FRENQUENCY", new myUdf())
// register the DataSet as a view "WordCount"
tEnv.createTemporaryView("TableWordCount", input, 'word, 'frequency)
val tableFrequency = tEnv.sqlQuery("SELECT word, UDF_FRENQUENCY(frequency) as myFrequency FROM TableWordCount")
tEnv.registerTable("TableFrequency", tableFrequency)
// run a SQL query on the Table and retrieve the result as a new Table
val table = tEnv.sqlQuery("SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0")
table.toDataSet[WC].print()
}
case class WC(word: String, frequency: Long)
}
3. 輸出結果
// 輸出如下,能看到本來應該是調用三次,結果現在調用了六次
11:15:05,409 ERROR mytestpackage.myUdf - The current is : 1
11:15:05,409 ERROR mytestpackage.myUdf - The current is : 2
11:15:05,425 ERROR mytestpackage.myUdf - The current is : 3
11:15:05,425 ERROR mytestpackage.myUdf - The current is : 4
11:15:05,426 ERROR mytestpackage.myUdf - The current is : 5
11:15:05,426 ERROR mytestpackage.myUdf - The current is : 6
0x03 Flink SQL UDF轉換流程
1. LogicalFilter
這里是 " myFrequency <> 0" 被轉換為 LogicalFilter。具體是SqlToRelConverter函數中會將SQL語句轉換為RelNode。
具體在SqlToRelConverter (org.apache.calcite.sql2rel)完成,其打印內容摘要如下:
filter = {LogicalFilter@4844} "LogicalFilter#2"
variablesSet = {RegularImmutableSet@4817} size = 0
condition = {RexCall@4816} "<>($1, 0)"
input = {LogicalProject@4737} "LogicalProject#1"
desc = "LogicalFilter#2"
rowType = null
digest = "LogicalFilter#2"
cluster = {RelOptCluster@4765}
id = 2
traitSet = {RelTraitSet@4845} size = 1
展開查看調用棧
create:107, LogicalFilter (org.apache.calcite.rel.logical)
createFilter:333, RelFactories$FilterFactoryImpl (org.apache.calcite.rel.core)
convertWhere:993, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelectImpl:649, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelect:627, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQueryRecursive:3181, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQuery:563, SqlToRelConverter (org.apache.calcite.sql2rel)
rel:150, FlinkPlannerImpl (org.apache.flink.table.calcite)
rel:135, FlinkPlannerImpl (org.apache.flink.table.calcite)
toQueryOperation:490, SqlToOperationConverter (org.apache.flink.table.sqlexec)
convertSqlQuery:315, SqlToOperationConverter (org.apache.flink.table.sqlexec)
convert:155, SqlToOperationConverter (org.apache.flink.table.sqlexec)
parse:66, ParserImpl (org.apache.flink.table.planner)
sqlQuery:457, TableEnvImpl (org.apache.flink.table.api.internal)
main:55, TestUdf$ (mytestpackage)
main:-1, TestUdf (mytestpackage)
2. FilterToCalcRule
這里Flink發現了FilterToCalcRule 這個rule適合對Filter進行切換。
我們思考下可知,Filter的Condition條件是需要進行計算才能獲得的,所以需要轉換為Calc。
具體源碼在 VolcanoPlanner.findBestExp (org.apache.calcite.plan.volcano)
call = {VolcanoRuleMatch@5576} "rule [FilterToCalcRule] rels [rel#35:LogicalFilter.NONE(input=RelSubset#34,condition=<>($1, 0))]"
targetSet = {RelSet@5581}
targetSubset = null
digest = "rule [FilterToCalcRule] rels [rel#35:LogicalFilter.NONE(input=RelSubset#34,condition=<>($1, 0))]"
cachedImportance = 0.891
volcanoPlanner = {VolcanoPlanner@5526}
generatedRelList = null
id = 45
operand0 = {RelOptRuleOperand@5579}
nodeInputs = {RegularImmutableBiMap@5530} size = 0
rule = {FilterToCalcRule@5575} "FilterToCalcRule"
rels = {RelNode[1]@5582}
planner = {VolcanoPlanner@5526}
parents = null
展開查看調用棧
onMatch:65, FilterToCalcRule (org.apache.calcite.rel.rules)
onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
runVolcanoPlanner:280, Optimizer (org.apache.flink.table.plan)
optimizeLogicalPlan:199, Optimizer (org.apache.flink.table.plan)
optimize:56, BatchOptimizer (org.apache.flink.table.plan)
translate:280, BatchTableEnvImpl (org.apache.flink.table.api.internal)
toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
main:57, TestUdf$ (mytestpackage)
main:-1, TestUdf (mytestpackage)
3. LogicalCalc
因為上述的FilterToCalcRule,所以生成了 LogicalCalc。我們也可以看到這里就是包含了UDF_FRENQUENCY。
calc = {LogicalCalc@5632} "LogicalCalc#60"
program = {RexProgram@5631} "(expr#0..1=[{inputs}], expr#2=[UDF_FRENQUENCY($t1)], expr#3=[0:BIGINT], expr#4=[<>($t2, $t3)], proj#0..1=[{exprs}], $condition=[$t4])"
input = {RelSubset@5605} "rel#32:Subset#0.LOGICAL"
desc = "LogicalCalc#60"
rowType = {RelRecordType@5629} "RecordType(VARCHAR(65536) word, BIGINT frequency)"
digest = "LogicalCalc#60"
cluster = {RelOptCluster@5596}
id = 60
traitSet = {RelTraitSet@5597} size = 1
4. DataSetCalc
經過轉換,最后得到了physical RelNode,即物理執行計划 DataSetCalc。
具體源碼在 VolcanoPlanner.findBestExp (org.apache.calcite.plan.volcano)。
// 這里給出了執行函數,運行內容和調用棧
ConverterRule.onMatch(RelOptRuleCall call) {
RelNode rel = call.rel(0);
if (rel.getTraitSet().contains(this.inTrait)) {
RelNode converted = this.convert(rel);
if (converted != null) {
call.transformTo(converted);
}
}
}
// 轉換后的 DataSetCalc 內容如下
converted = {DataSetCalc@5560} "Calc(where: (<>(UDF_FRENQUENCY(frequency), 0:BIGINT)), select: (word, UDF_FRENQUENCY(frequency) AS myFrequency))"
cluster = {RelOptCluster@5562}
rowRelDataType = {RelRecordType@5565} "RecordType(VARCHAR(65536) word, BIGINT myFrequency)"
calcProgram = {RexProgram@5566} "(expr#0..1=[{inputs}], expr#2=[UDF_FRENQUENCY($t1)], expr#3=[0:BIGINT], expr#4=[<>($t2, $t3)], word=[$t0], myFrequency=[$t2], $condition=[$t4])"
ruleDescription = "DataSetCalcRule"
program = {RexProgram@5566} "(expr#0..1=[{inputs}], expr#2=[UDF_FRENQUENCY($t1)], expr#3=[0:BIGINT], expr#4=[<>($t2, $t3)], word=[$t0], myFrequency=[$t2], $condition=[$t4])"
input = {RelSubset@5564} "rel#71:Subset#5.DATASET"
desc = "DataSetCalc#72"
rowType = {RelRecordType@5565} "RecordType(VARCHAR(65536) word, BIGINT myFrequency)"
digest = "DataSetCalc#72"
AbstractRelNode.cluster = {RelOptCluster@5562}
id = 72
traitSet = {RelTraitSet@5563} size = 1
展開查看調用棧
init:52, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
convert:40, DataSetCalcRule (org.apache.flink.table.plan.rules.dataSet)
onMatch:144, ConverterRule (org.apache.calcite.rel.convert)
onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
runVolcanoPlanner:280, Optimizer (org.apache.flink.table.plan)
optimizePhysicalPlan:209, Optimizer (org.apache.flink.table.plan)
optimize:57, BatchOptimizer (org.apache.flink.table.plan)
translate:280, BatchTableEnvImpl (org.apache.flink.table.api.internal)
toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
main:57, TestUdf$ (mytestpackage)
main:-1, TestUdf (mytestpackage)
5. generateFunction (問題點所在)
在DataSetCalc中,會最后生成UDF對應的JAVA代碼。
class DataSetCalc {
override def translateToPlan(
tableEnv: BatchTableEnvImpl,
queryConfig: BatchQueryConfig): DataSet[Row] = {
......
// 這里生成了UDF對應的JAVA代碼
val genFunction = generateFunction(
generator,
ruleDescription,
new RowSchema(getRowType),
projection,
condition,
config,
classOf[FlatMapFunction[Row, Row]])
// 這里生成了FlatMapRunner
val runner = new FlatMapRunner(genFunction.name, genFunction.code, returnType)
inputDS.flatMap(runner).name(calcOpName(calcProgram, getExpressionString))
}
}
展開查看調用棧
translateToPlan:90, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
translate:306, BatchTableEnvImpl (org.apache.flink.table.api.internal)
translate:281, BatchTableEnvImpl (org.apache.flink.table.api.internal)
toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
main:57, TestUdf$ (mytestpackage)
main:-1, TestUdf (mytestpackage)
真正生成代碼的位置如下,能看出來生成代碼是FlatMapFunction。而本文的問題點就出現在這里。
// 下面能看出,針對不同的SQL子句,Flink會進行不同的轉化
trait CommonCalc {
private[flink] def generateFunction[T <: Function](
generator: FunctionCodeGenerator,
ruleDescription: String,
returnSchema: RowSchema,
calcProjection: Seq[RexNode],
calcCondition: Option[RexNode],
config: TableConfig,
functionClass: Class[T]):
GeneratedFunction[T, Row] = {
// 生成過濾條件,就是 SELEC。filterCondition實際上已經生成包含了調用UDF的代碼,下面會給出其內容
val projection = generator.generateResultExpression(
returnSchema.typeInfo,
returnSchema.fieldNames,
calcProjection)
// only projection
val body = if (calcCondition.isEmpty) {
s"""
|${projection.code}
|${generator.collectorTerm}.collect(${projection.resultTerm});
|""".stripMargin
}
else {
// 生成過濾條件,就是 WHERE。filterCondition實際上已經生成包含了調用UDF的代碼,下面會給出其內容
val filterCondition = generator.generateExpression(calcCondition.get)
// only filter
if (projection == null) {
s"""
|${filterCondition.code}
|if (${filterCondition.resultTerm}) {
| ${generator.collectorTerm}.collect(${generator.input1Term});
|}
|""".stripMargin
}
// both filter and projection
else {
// 本例中,會進入到這里。把 filterCondition 和 projection 代碼拼接起來。這下子就有了兩個 UDF 的調用。
s"""
|${filterCondition.code}
|if (${filterCondition.resultTerm}) {
| ${projection.code}
| ${generator.collectorTerm}.collect(${projection.resultTerm});
|}
|""".stripMargin
}
}
// body 是filterCondition 和 projection 代碼的拼接,分別都有 UDF 的調用,現在就有了兩個UDF調用了,也就是我們問題所在。
generator.generateFunction(
ruleDescription,
functionClass,
body,
returnSchema.typeInfo)
}
}
// 此函數輸入中,calcCondition就是我們SQL的過濾條件
calcCondition = {Some@5663} "Some(<>(UDF_FRENQUENCY($1), 0))"
// 此函數輸入中,calcProjection就是我們SQL的投影運算條件
calcProjection = {ArrayBuffer@5662} "ArrayBuffer" size = 2
0 = {RexInputRef@7344} "$0"
1 = {RexCall@7345} "UDF_FRENQUENCY($1)"
// 生成過濾條件,就是 WHERE 對應的代碼。filterCondition實際上已經生成包含了調用UDF的代碼
filterCondition = {GeneratedExpression@5749} "GeneratedExpression(result$16,isNull$17,\n\n\n\njava.lang.Long result$12 = function_spendreport$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$14 = result$12 == null;\nlong result$13;\nif (isNull$14) {\n result$13 = -1L;\n}\nelse {\n result$13 = result$12;\n}\n\n\n\nlong result$15 = 0L;\n\nboolean isNull$17 = isNull$14 || false;\nboolean result$16;\nif (isNull$17) {\n result$16 = false;\n}\nelse {\n result$16 = result$13 != result$15;\n}\n,Boolean,false)"
// 生成投影運算,就是 SELECT 對應的代碼。projection也包含了調用UDF的代碼
projection = {GeneratedExpression@5738} "GeneratedExpression(out,false,\n\nif (isNull$6) {\n out.setField(0, null);\n}\nelse {\n out.setField(0, result$5);\n}\n\n\n\n\n\njava.lang.Long result$9 = function_spendreport$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$11 = result$9 == null;\nlong result$10;\nif (isNull$11) {\n result$10 = -1L;\n}\nelse {\n result$10 = result$9;\n}\n\n\nif (isNull$11) {\n out.setField(1, null);\n}\nelse {\n out.setField(1, result$10);\n}\n,Row(word: String, myFrequency: Long),false)"
// 具體這個類其實是 DataSetCalcRule extends RichFlatMapFunction
name = "DataSetCalcRule"
// 生成的類
clazz = {Class@5773} "interface org.apache.flink.api.common.functions.FlatMapFunction"
// 生成類的部分代碼,這里對應的是UDF的業務內容
bodyCode = "\n\n\n\n\njava.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$14 = result$12 == null;\nlong result$13;\nif (isNull$14) {\n result$13 = -1L;\n}\nelse {\n result$13 = result$12;\n}\n\n\n\nlong result$15 = 0L;\n\nboolean isNull$17 = isNull$14 || false;\nboolean result$16;\nif (isNull$17) {\n result$16 = false;\n}\nelse {\n result$16 = result$13 != result$15;\n}\n\nif (result$16) {\n \n\nif (isNull$6) {\n out.setField(0, null);\n}\nelse {\n out.setField(0, result$5);\n}\n\n\n\n\n\njava.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$11 = result$9 == null;\nlong result$10;\nif (isNull$11) {\n result$10 = -1L;\n}\nelse {\n result$10 = result$9;\n}\n\n\nif (isNull$11) {\n out.setField(1, null);\n}\nelse {\n out.setField(1, result$10);\n}\n\n c.collect(out);\n}\n"
展開查看調用棧
generateFunction:94, FunctionCodeGenerator (org.apache.flink.table.codegen)
generateFunction:79, CommonCalc$class (org.apache.flink.table.plan.nodes)
generateFunction:45, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
translateToPlan:105, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
translate:306, BatchTableEnvImpl (org.apache.flink.table.api.internal)
translate:281, BatchTableEnvImpl (org.apache.flink.table.api.internal)
toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
main:57, TestUdf$ (mytestpackage)
main:-1, TestUdf (mytestpackage)
6. FlatMapRunner
從定義能夠看出來,FlatMapRunner繼承了RichFlatMapFunction,說明 Flink認為UDF就是一個Flatmap操作。
package org.apache.flink.table.runtime
class FlatMapRunner(
name: String,
code: String,
@transient var returnType: TypeInformation[Row])
extends RichFlatMapFunction[Row, Row] ... {
private var function: FlatMapFunction[Row, Row] = _
...
override def flatMap(in: Row, out: Collector[Row]): Unit =
function.flatMap(in, out)
...
}
0x04 UDF生成的代碼
1. 縮減版
這里是生成的代碼縮減版,能看出具體問題點,myUdf函數被執行了兩次。
function_mytestpackage\(myUdf\)c45b0e23278f15e8f7d075abac9a121b 這個就是 myUdf 轉換之后的函數。
// 原始 SQL "SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0"
java.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
isNull$8 ? null : (java.lang.Long) result$7); // 這次 UDF 調用對應 WHERE myFrequency <> 0
boolean isNull$14 = result$12 == null;
boolean isNull$17 = isNull$14 || false;
boolean result$16;
if (isNull$17) {
result$16 = false;
}
else {
result$16 = result$13 != result$15;
}
if (result$16) { // 這里說明 myFrequency <> 0,所以可以進入
java.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
isNull$8 ? null : (java.lang.Long) result$7); // 這里對應的是 SELECT myFrequency,注意的是,這里又調用了 UDF,重新計算了一遍,所以 UDF 才不應該有狀態信息。
boolean isNull$11 = result$9 == null;
long result$10;
if (isNull$11) {
result$10 = -1L;
}
else {
result$10 = result$9; // 這里才進行SELECT myFrequency,但是這時候 UDF 已經被計算兩次了
}
}
2. 完整版
以下是生成的代碼,因為是自動生成,所以看起來會有點費勁,不過好在已經是最后一步了。
public class DataSetCalcRule$18 extends org.apache.flink.api.common.functions.RichFlatMapFunction {
final mytestpackage.myUdf function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b;
final org.apache.flink.types.Row out =
new org.apache.flink.types.Row(2);
private org.apache.flink.types.Row in1;
public DataSetCalcRule$18() throws Exception {
function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b = (mytestpackage.myUdf)
org.apache.flink.table.utils.EncodingUtils.decodeStringToObject(
"rO0ABXNyABFzcGVuZHJlcG9ydC5teVVkZmGYnDRF7Hj4AgABTAAHY3VycmVudHQAEExqYXZhL2xhbmcvTG9uZzt4cgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuU2NhbGFyRnVuY3Rpb25uLPkGQbqbDAIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9u14hb_NiViUACAAB4cHNyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAAAAAAAAA",
org.apache.flink.table.functions.UserDefinedFunction.class);
}
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
}
@Override
public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {
in1 = (org.apache.flink.types.Row) _in1;
boolean isNull$6 = (java.lang.String) in1.getField(0) == null;
java.lang.String result$5;
if (isNull$6) {
result$5 = "";
}
else {
result$5 = (java.lang.String) (java.lang.String) in1.getField(0);
}
boolean isNull$8 = (java.lang.Long) in1.getField(1) == null;
long result$7;
if (isNull$8) {
result$7 = -1L;
}
else {
result$7 = (java.lang.Long) in1.getField(1);
}
java.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
isNull$8 ? null : (java.lang.Long) result$7);
boolean isNull$14 = result$12 == null;
long result$13;
if (isNull$14) {
result$13 = -1L;
}
else {
result$13 = result$12;
}
long result$15 = 0L;
boolean isNull$17 = isNull$14 || false;
boolean result$16;
if (isNull$17) {
result$16 = false;
}
else {
result$16 = result$13 != result$15;
}
if (result$16) {
if (isNull$6) {
out.setField(0, null);
}
else {
out.setField(0, result$5);
}
java.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
isNull$8 ? null : (java.lang.Long) result$7);
boolean isNull$11 = result$9 == null;
long result$10;
if (isNull$11) {
result$10 = -1L;
}
else {
result$10 = result$9;
}
if (isNull$11) {
out.setField(1, null);
}
else {
out.setField(1, result$10);
}
c.collect(out);
}
}
@Override
public void close() throws Exception {
function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.close();
}
}
0x05 總結
至此,我們把Flink SQL如何生成JAVA代碼的流程大致走了一遍。
Flink生成的內部代碼,是把"投影運算"和"過濾條件"分別生成,然后拼接在一起。
即使原始SQL中只有一次UDF調用,但是如果SELECT和WHERE都間接用到了UDF,那么最終"投影運算"和"過濾條件"就會分別調用了UDF,所以拼接之后就會有多個UDF調用。
這就是 "UDF不應該有內部歷史狀態" 的最終原因。我們在實際開發過程中一定要注意這個問題。