StreamTableEnvironment
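This class holds the components that every stage of SQL processing (parsing, validation, optimization, execution) relies on: the metadata manager CatalogManager, the module manager moduleManager (a module contributes a set of functions, types, and rules), the user-defined function manager FunctionCatalog, the Executor, and the SQL planner Planner.
It is created through StreamTableEnvironmentImpl.create(executionEnvironment, settings, new TableConfig):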
def create(
    executionEnvironment: StreamExecutionEnvironment,
    settings: EnvironmentSettings,
    tableConfig: TableConfig)
  : StreamTableEnvironmentImpl = {
  // metadata: the built-in in-memory catalog and its default database
  val catalogManager = new CatalogManager(
    settings.getBuiltInCatalogName,
    new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
  val moduleManager = new ModuleManager
  // functions are looked up in both the catalogs and the loaded modules
  val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
  val executorProperties = settings.toExecutorProperties
  val executor = lookupExecutor(executorProperties, executionEnvironment)
  // the planner implementation is discovered from the planner properties
  val plannerProperties = settings.toPlannerProperties
  val planner = ComponentFactoryService.find(classOf[PlannerFactory], plannerProperties)
    .create(
      plannerProperties,
      executor,
      tableConfig,
      functionCatalog,
      catalogManager)
  new StreamTableEnvironmentImpl(
    catalogManager,
    moduleManager,
    functionCatalog,
    tableConfig,
    executionEnvironment,
    planner,
    executor,
    settings.isStreamingMode
  )
}
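ComponentFactoryService.find selects the PlannerFactory whose declared context matches the planner properties derived from EnvironmentSettings. The following is a simplified, hypothetical model of that matching step only: `SimpleFactory`, `FactoryLookup`, and the property keys are illustrative names, and the real service discovers factories on the classpath and performs more checks. The sketch requires exactly one match and fails otherwise; treat that as an assumption about the real behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a factory: a name plus the properties it requires.
class SimpleFactory {
    final String name;
    final Map<String, String> requiredContext;

    SimpleFactory(String name, Map<String, String> requiredContext) {
        this.name = name;
        this.requiredContext = requiredContext;
    }

    // True if every required key is present in `properties` with the same value.
    boolean matches(Map<String, String> properties) {
        for (Map.Entry<String, String> e : requiredContext.entrySet()) {
            if (!e.getValue().equals(properties.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }
}

class FactoryLookup {
    // Returns the single matching factory; fails when none or several match.
    static SimpleFactory find(List<SimpleFactory> candidates, Map<String, String> properties) {
        List<SimpleFactory> matches = new ArrayList<>();
        for (SimpleFactory f : candidates) {
            if (f.matches(properties)) {
                matches.add(f);
            }
        }
        if (matches.size() != 1) {
            throw new IllegalStateException(
                "Expected exactly one matching factory, found " + matches.size());
        }
        return matches.get(0);
    }
}
```

With the properties planner=blink and streaming-mode=true only a factory requiring exactly those values matches; leaving out streaming-mode matches nothing and the lookup fails.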
DataType
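Describes a logical type together with a hint about the physical (Java) representation it maps to at runtime.
LogicalType
A logical type is roughly analogous to a data type in standard SQL; each subclass describes one concrete type and its constraints (for example, VarCharType carries a maximum length).
TableSchema
The table structure definition: the name and type of each field.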
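To make the pairing of field names and types concrete, here is a minimal, hypothetical schema model. `SimpleTableSchema` and the `SimpleLogicalType` enum are illustrative stand-ins, not Flink classes; the real TableSchema holds DataTypes rather than a flat enum.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical, flattened logical types (the real LogicalType hierarchy is much richer).
enum SimpleLogicalType { INT, BIGINT, DOUBLE, VARCHAR, TIMESTAMP }

// Simplified schema: parallel arrays of field names and field types.
class SimpleTableSchema {
    private final String[] fieldNames;
    private final SimpleLogicalType[] fieldTypes;

    SimpleTableSchema(String[] fieldNames, SimpleLogicalType[] fieldTypes) {
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException("Number of field names and types must match");
        }
        if (Arrays.stream(fieldNames).distinct().count() != fieldNames.length) {
            throw new IllegalArgumentException("Field names must be unique");
        }
        this.fieldNames = fieldNames.clone();
        this.fieldTypes = fieldTypes.clone();
    }

    int getFieldCount() {
        return fieldNames.length;
    }

    // Looks up a field's type by name; empty if the field does not exist.
    Optional<SimpleLogicalType> getFieldType(String name) {
        for (int i = 0; i < fieldNames.length; i++) {
            if (fieldNames[i].equals(name)) {
                return Optional.of(fieldTypes[i]);
            }
        }
        return Optional.empty();
    }

    @Override
    public String toString() {
        List<String> parts = new ArrayList<>();
        for (int i = 0; i < fieldNames.length; i++) {
            parts.add(fieldNames[i] + " " + fieldTypes[i]);
        }
        return "(" + String.join(", ", parts) + ")";
    }
}
```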
DataStream → Table
override def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = {
  val queryOperation = asQueryOperation(dataStream, Some(fields.toList.asJava))
  createTable(queryOperation)
}
ScalaDataStreamQueryOperation
The QueryOperation that wraps a Scala DataStream; its main fields are:
private final DataStream<E> dataStream;
private final int[] fieldIndices;
private final TableSchema tableSchema;
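The fieldIndices array can be read as a mapping: column i of the resulting table takes field fieldIndices[i] of the stream element. A hypothetical sketch of that mapping on Object[] elements; the class name `FieldIndexProjection` is illustrative, and time attributes are not modeled.

```java
class FieldIndexProjection {
    // Builds a table row from a stream element: column i of the result
    // is copied from element field fieldIndices[i].
    static Object[] project(Object[] element, int[] fieldIndices) {
        Object[] row = new Object[fieldIndices.length];
        for (int i = 0; i < fieldIndices.length; i++) {
            int source = fieldIndices[i];
            if (source < 0 || source >= element.length) {
                throw new IllegalArgumentException("Invalid field index " + source);
            }
            row[i] = element[source];
        }
        return row;
    }
}
```

For example, an element ("a", 1, 2.0) with fieldIndices {2, 0} becomes the row (2.0, "a").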
Table
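The Table class is the core abstraction of the Table/SQL API and defines the data transformation methods, such as filter, groupBy, and join. A Table can be converted into a DataStream or a DataSet through a TableEnvironment.
Its implementation, TableImpl, essentially wraps an operation tree: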
private TableImpl(
        TableEnvironment tableEnvironment,
        QueryOperation operationTree,
        OperationTreeBuilder operationTreeBuilder,
        LookupCallResolver lookupResolver) {
    this.tableEnvironment = tableEnvironment;
    this.operationTree = operationTree;
    this.operationTreeBuilder = operationTreeBuilder;
    this.lookupResolver = lookupResolver;
}
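Because each transformation on a Table produces a new Table around a larger operation tree, the API is immutable and composable. A minimal, hypothetical model of that pattern; `MiniTable` and `OpNode` are illustrative, and in Flink the new nodes are built by OperationTreeBuilder.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical operation-tree node: a description plus its child operations.
class OpNode {
    final String description;
    final List<OpNode> children;

    OpNode(String description, List<OpNode> children) {
        this.description = description;
        this.children = children;
    }

    // Renders the tree with two spaces of indentation per level.
    String explain(String indent) {
        StringBuilder sb = new StringBuilder(indent).append(description).append('\n');
        for (OpNode child : children) {
            sb.append(child.explain(indent + "  "));
        }
        return sb.toString();
    }
}

// Immutable table: every transformation wraps the current tree in a new
// node and returns a new table, leaving the original untouched.
class MiniTable {
    final OpNode operationTree;

    MiniTable(OpNode operationTree) {
        this.operationTree = operationTree;
    }

    static MiniTable scan(String source) {
        return new MiniTable(new OpNode("Scan(" + source + ")", Collections.emptyList()));
    }

    MiniTable filter(String predicate) {
        return new MiniTable(new OpNode("Filter(" + predicate + ")", List.of(operationTree)));
    }

    MiniTable select(String fields) {
        return new MiniTable(new OpNode("Project(" + fields + ")", List.of(operationTree)));
    }
}
```

Calling scan("T1").filter("int > 0").select("string") yields a Project over a Filter over a Scan, while the original scan table is unchanged.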
Registering a table
private void createTemporaryView(UnresolvedIdentifier identifier, Table view) {
    // only tables created by this environment may be registered in it
    if (((TableImpl) view).getTableEnvironment() != this) {
        throw new TableException(
            "Only table API objects that belong to this TableEnvironment can be registered.");
    }
    // wrap the table's operation tree as a catalog view
    CatalogBaseTable tableTable = new QueryOperationCatalogView(view.getQueryOperation());
    // expand the identifier to catalog.database.object using the current catalog and database
    ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
    catalogManager.createTemporaryTable(tableTable, tableIdentifier, false);
}
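The registration path comes down to two steps: qualify the identifier with the current catalog and database, then store the view under the fully qualified name. A hypothetical sketch of both steps, assuming identifiers are plain dot-joined strings; `MiniCatalogManager` is illustrative, while default_catalog and default_database are Flink's default built-in names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of temporary table registration; identifiers are
// dot-joined strings such as "default_catalog.default_database.T1".
class MiniCatalogManager {
    private final String currentCatalog;
    private final String currentDatabase;
    private final Map<String, Object> temporaryTables = new HashMap<>();

    MiniCatalogManager(String currentCatalog, String currentDatabase) {
        this.currentCatalog = currentCatalog;
        this.currentDatabase = currentDatabase;
    }

    // Fills in the missing catalog and/or database from the current ones.
    String qualifyIdentifier(String... parts) {
        switch (parts.length) {
            case 1:
                return currentCatalog + "." + currentDatabase + "." + parts[0];
            case 2:
                return currentCatalog + "." + parts[0] + "." + parts[1];
            case 3:
                return String.join(".", parts);
            default:
                throw new IllegalArgumentException("An identifier has 1 to 3 parts");
        }
    }

    // Registers a table under a qualified name; an existing name is either
    // skipped (ignoreIfExists) or rejected.
    void createTemporaryTable(Object table, String qualifiedName, boolean ignoreIfExists) {
        if (temporaryTables.containsKey(qualifiedName)) {
            if (ignoreIfExists) {
                return;
            }
            throw new IllegalStateException("Temporary table " + qualifiedName + " already exists");
        }
        temporaryTables.put(qualifiedName, table);
    }

    boolean contains(String qualifiedName) {
        return temporaryTables.containsKey(qualifiedName);
    }
}
```

Passing ignoreIfExists = false, as createTemporaryView does, means that in this sketch registering the same name twice fails.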
Expression
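Expression represents a literal, a function call, or a field reference.
ExpressionVisitor
A visitor that converts an Expression into another representation.
IndexedExprToFieldInfo
An ExpressionVisitor implementation that resolves an Expression into a FieldInfo: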
@Override
public FieldInfo visit(UnresolvedReferenceExpression unresolvedReference) {
    String fieldName = unresolvedReference.getName();
    return new FieldInfo(fieldName, index, fromLegacyInfoToDataType(getTypeAt(unresolvedReference)));
}
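The visitor pattern keeps each conversion (here, to FieldInfo) in its own class instead of in the expression types themselves. A hypothetical, stripped-down version of the IndexedExprToFieldInfo idea; all names below are illustrative, and the type is looked up by the expression's position in the field list.

```java
// Hypothetical expression types with a visitor, in the spirit of Expression/ExpressionVisitor.
interface Expr {
    <R> R accept(ExprVisitor<R> visitor);
}

interface ExprVisitor<R> {
    R visitLiteral(Literal literal);

    R visitReference(FieldRef reference);
}

class Literal implements Expr {
    final Object value;

    Literal(Object value) {
        this.value = value;
    }

    public <R> R accept(ExprVisitor<R> visitor) {
        return visitor.visitLiteral(this);
    }
}

class FieldRef implements Expr {
    final String name;

    FieldRef(String name) {
        this.name = name;
    }

    public <R> R accept(ExprVisitor<R> visitor) {
        return visitor.visitReference(this);
    }
}

// Simplified FieldInfo: name, position in the input, and a type name.
class SimpleFieldInfo {
    final String fieldName;
    final int index;
    final String type;

    SimpleFieldInfo(String fieldName, int index, String type) {
        this.fieldName = fieldName;
        this.index = index;
        this.type = type;
    }
}

// Resolves the expression at position `index`: the name comes from the
// reference, the type from the input's type at the same position.
class IndexedToFieldInfo implements ExprVisitor<SimpleFieldInfo> {
    private final String[] inputTypes;
    private final int index;

    IndexedToFieldInfo(String[] inputTypes, int index) {
        if (index < 0 || index >= inputTypes.length) {
            throw new IllegalArgumentException("No input field at position " + index);
        }
        this.inputTypes = inputTypes;
        this.index = index;
    }

    public SimpleFieldInfo visitReference(FieldRef reference) {
        return new SimpleFieldInfo(reference.name, index, inputTypes[index]);
    }

    public SimpleFieldInfo visitLiteral(Literal literal) {
        throw new IllegalArgumentException("Expected a field reference, got literal " + literal.value);
    }
}
```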
Usage: converting Expressions into FieldInfos for a tuple type, either by position or by name:
private static List<FieldInfo> extractFieldInfosFromTupleType(TupleTypeInfoBase<?> inputType, Expression[] exprs) {
    boolean isRefByPos = isReferenceByPosition(inputType, exprs);
    if (isRefByPos) {
        // by position: the idx-th expression describes the idx-th tuple field
        return IntStream.range(0, exprs.length)
            .mapToObj(idx -> exprs[idx].accept(new IndexedExprToFieldInfo(inputType, idx)))
            .collect(Collectors.toList());
    } else {
        // by name: each expression refers to an existing field name
        return extractFieldInfosByNameReference(inputType, exprs);
    }
}
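extractFieldInfosFromTupleType switches between two modes. A simplified sketch, assuming the rule that references are positional when none of the given names exists in the input (so a, b renames f0, f1) and by name otherwise; the real isReferenceByPosition also handles aliases. `FieldMapping` and the name@index output format are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FieldMapping {
    // Positional mode: none of the requested names is an existing input field.
    static boolean isReferenceByPosition(String[] inputNames, String[] requested) {
        List<String> names = Arrays.asList(inputNames);
        return Arrays.stream(requested).noneMatch(names::contains);
    }

    // Resolves each requested name to "name@inputIndex".
    static List<String> resolve(String[] inputNames, String[] requested) {
        boolean byPosition = isReferenceByPosition(inputNames, requested);
        if (byPosition && requested.length > inputNames.length) {
            throw new IllegalArgumentException("More names than input fields");
        }
        List<String> names = Arrays.asList(inputNames);
        List<String> result = new ArrayList<>();
        for (int i = 0; i < requested.length; i++) {
            // positional: rename field i; by name: look up the existing field
            int index = byPosition ? i : names.indexOf(requested[i]);
            if (index < 0) {
                throw new IllegalArgumentException("Unknown field: " + requested[i]);
            }
            result.add(requested[i] + "@" + index);
        }
        return result;
    }
}
```

With input fields (f0, f1, f2), the names (a, b) rename f0 and f1, while (f2, f0) reorders existing fields; a mix such as (f2, x) is treated as by-name and fails on x.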
FieldInfo
private final String fieldName;
private final int index;
private final DataType type;
Row & RowTypeInfo
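A Row represents one record. It can hold any number of columns, and each column may have a different data type. Because Row is not strongly typed, it is paired with RowTypeInfo to obtain the concrete type of each column.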
Row:
/** The array to store actual values. */
private final Object[] fields;
RowTypeInfo:
protected final String[] fieldNames;
protected final TypeInformation<?>[] types;
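A hypothetical model of the Row / RowTypeInfo split: the row only stores Objects, and the separate type information is what says, and lets you check, what each column should hold. `SimpleRow` and `SimpleRowTypeInfo` are illustrative names, with Java classes standing in for TypeInformation.

```java
// Hypothetical untyped row: values only, no type information.
class SimpleRow {
    private final Object[] fields;

    SimpleRow(int arity) {
        this.fields = new Object[arity];
    }

    int getArity() {
        return fields.length;
    }

    Object getField(int pos) {
        return fields[pos];
    }

    void setField(int pos, Object value) {
        fields[pos] = value;
    }
}

// Companion type information: column names and the Java class of each column.
class SimpleRowTypeInfo {
    final String[] fieldNames;
    final Class<?>[] types;

    SimpleRowTypeInfo(String[] fieldNames, Class<?>[] types) {
        if (fieldNames.length != types.length) {
            throw new IllegalArgumentException("Names and types must have the same length");
        }
        this.fieldNames = fieldNames;
        this.types = types;
    }

    // Checks the arity and that each non-null value has the declared type.
    boolean conforms(SimpleRow row) {
        if (row.getArity() != types.length) {
            return false;
        }
        for (int i = 0; i < types.length; i++) {
            Object value = row.getField(i);
            if (value != null && !types[i].isInstance(value)) {
                return false;
            }
        }
        return true;
    }
}
```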
Table → DataStream
override def toAppendStream[T: TypeInformation](table: Table): DataStream[T] = {
  val returnType = createTypeInformation[T]
  // wrap the query in a conversion operation that emits rows in APPEND mode
  val modifyOperation = new OutputConversionModifyOperation(
    table.getQueryOperation,
    TypeConversions.fromLegacyInfoToDataType(returnType),
    OutputConversionModifyOperation.UpdateMode.APPEND)
  toDataStream[T](table, modifyOperation)
}
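UpdateMode.APPEND only makes sense for tables whose results are insert-only. A hypothetical sketch of that contract on an explicit changelog; `ChangeKind`, `ChangeRecord`, and `AppendConversion` are illustrative. Flink detects an updating table during planning rather than per record; this sketch checks each record purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical kinds of changes a table can emit.
enum ChangeKind { INSERT, UPDATE, DELETE }

class ChangeRecord {
    final ChangeKind kind;
    final String value;

    ChangeRecord(ChangeKind kind, String value) {
        this.kind = kind;
        this.value = value;
    }
}

class AppendConversion {
    // Converts a changelog into plain values, refusing anything but inserts.
    static List<String> toAppendOnly(List<ChangeRecord> changes) {
        List<String> out = new ArrayList<>();
        for (ChangeRecord c : changes) {
            if (c.kind != ChangeKind.INSERT) {
                throw new UnsupportedOperationException(
                    "Append mode cannot consume " + c.kind + " changes");
            }
            out.add(c.value);
        }
        return out;
    }
}
```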
Operation
The result of Parser.parse(sql), which returns a List<Operation>. The main kinds are:
- ModifyOperation (DML)
- QueryOperation (DQL)
- CreateOperation & DropOperation (DDL)
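To illustrate how statements map to these categories, here is a toy classifier keyed on the first keyword. It is purely hypothetical (`StatementClassifier`, `OperationKind`); Flink's Parser builds full Operation objects using a Calcite-based SQL parser.

```java
import java.util.Locale;

// Hypothetical coarse categories matching the list above.
enum OperationKind { QUERY, MODIFY, CREATE, DROP }

class StatementClassifier {
    // Classifies a statement by its first keyword (a toy rule, not real parsing).
    static OperationKind classify(String sql) {
        String first = sql.trim().split("\\s+", 2)[0].toUpperCase(Locale.ROOT);
        switch (first) {
            case "SELECT":
            case "WITH":
                return OperationKind.QUERY;
            case "INSERT":
                return OperationKind.MODIFY;
            case "CREATE":
                return OperationKind.CREATE;
            case "DROP":
                return OperationKind.DROP;
            default:
                throw new IllegalArgumentException("Unsupported statement: " + first);
        }
    }
}
```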
FlinkStreamRuleSets
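Defines the rule sets used to optimize the parsed SQL: rules that convert Calcite nodes into Flink logical nodes (e.g. FlinkLogicalTableSourceScanConverter, which produces FlinkLogicalTableSourceScan), rules that convert Flink logical nodes into physical execution nodes (e.g. StreamExecTableSourceScanRule), filter push-down rules such as PushFilterIntoTableSourceScanRule, and so on.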
ConverterRule
/** Converts a relational expression to the target trait(s) of this rule.
 *
 * <p>Returns null if conversion is not possible. */
public abstract RelNode convert(RelNode rel);

public void onMatch(RelOptRuleCall call) {
    RelNode rel = call.rel(0);
    // only convert nodes that carry this rule's input trait
    if (rel.getTraitSet().contains(inTrait)) {
        final RelNode converted = convert(rel);
        if (converted != null) {
            // register the converted node as equivalent to the original
            call.transformTo(converted);
        }
    }
}
FlinkLogicalTableSourceScanConverter, for example, turns a LogicalTableScan in Convention.NONE into a FlinkLogicalTableSourceScan in FlinkConventions.LOGICAL:
class FlinkLogicalTableSourceScanConverter
  extends ConverterRule(
    classOf[LogicalTableScan],
    Convention.NONE,
    FlinkConventions.LOGICAL,
    "FlinkLogicalTableSourceScanConverter") {

  // only fire for scans that read from a TableSource
  override def matches(call: RelOptRuleCall): Boolean = {
    val scan: TableScan = call.rel(0)
    isTableSourceScan(scan)
  }

  def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[TableScan]
    val table = scan.getTable.asInstanceOf[FlinkRelOptTable]
    FlinkLogicalTableSourceScan.create(rel.getCluster, table)
  }
}
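A hypothetical, self-contained model of the ConverterRule flow: the rule only fires on its operand type and input trait, matches() can veto, and convert() emits a node in the output convention, with null meaning no conversion. `Node`, `Conv`, and the rule classes are illustrative; in Calcite the planner fires rules through a RelOptRuleCall rather than by a direct method call.

```java
// Hypothetical conventions standing in for Calcite traits.
enum Conv { NONE, LOGICAL }

// Hypothetical plan node: an operator type plus its convention.
class Node {
    final String type;
    final Conv convention;

    Node(String type, Conv convention) {
        this.type = type;
        this.convention = convention;
    }
}

abstract class SimpleConverterRule {
    final String operandType;
    final Conv inTrait;
    final Conv outTrait;

    SimpleConverterRule(String operandType, Conv inTrait, Conv outTrait) {
        this.operandType = operandType;
        this.inTrait = inTrait;
        this.outTrait = outTrait;
    }

    // Extra veto hook, like ConverterRule#matches.
    boolean matches(Node node) {
        return true;
    }

    // Returns the converted node, or null if conversion is not possible.
    abstract Node convert(Node node);

    // Mirrors onMatch: operand and trait checks, then convert; null means no change.
    Node onMatch(Node node) {
        if (!node.type.equals(operandType) || node.convention != inTrait || !matches(node)) {
            return null;
        }
        return convert(node);
    }
}

// Toy counterpart of FlinkLogicalTableSourceScanConverter.
class ToyTableSourceScanConverter extends SimpleConverterRule {
    ToyTableSourceScanConverter() {
        super("LogicalTableScan", Conv.NONE, Conv.LOGICAL);
    }

    @Override
    Node convert(Node node) {
        return new Node("FlinkLogicalTableSourceScan", outTrait);
    }
}
```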
FlinkLogicalRel
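The base type of Flink's logical RelNodes. Besides the relational logic a plain RelNode can express, it carries Flink-specific information about each relation; for example, FlinkLogicalTableSourceScan holds the TableSource.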
FlinkPhysicalRel
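The base type of physical relational nodes. Its subclasses also implement the ExecNode interface, which translates the physical node into a Transformation.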
ExecNode
/**
  * Internal method, translates this node into a Flink operator.
  *
  * @param planner The [[Planner]] of the translated Table.
  */
protected def translateToPlanInternal(planner: E): Transformation[T]

// translate once and reuse the cached Transformation on later calls
def translateToPlan(planner: E): Transformation[T] = {
  if (transformation == null) {
    transformation = translateToPlanInternal(planner)
  }
  transformation
}
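translateToPlan memoizes its result, so a node that feeds several consumers is translated into a single Transformation rather than one copy per consumer. A minimal, hypothetical sketch of that caching; `CachingExecNode` is illustrative, and a Supplier stands in for translateToPlanInternal.

```java
import java.util.function.Supplier;

// Hypothetical exec node that caches its translation, like translateToPlan.
class CachingExecNode<T> {
    private final Supplier<T> translator;
    private T transformation;
    private int translations = 0;

    CachingExecNode(Supplier<T> translator) {
        this.translator = translator;
    }

    // Translates on the first call only; later calls return the cached result.
    T translateToPlan() {
        if (transformation == null) {
            transformation = translator.get();
            translations++;
        }
        return transformation;
    }

    int translationCount() {
        return translations;
    }
}
```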
Call sequence diagram
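Code generation (gencode)
When an ExecNode is translated into a Transformation, part of its logic is implemented with dynamically generated code. The generated source is kept in instances of GeneratedClass subclasses, shipped to the TaskManagers (TMs), and compiled there with the Janino library. For example, the window aggregation in the example below generates a subclass of NamespaceAggsHandleFunction as its aggregate handler (NamespaceTableAggsHandleFunction plays the same role for table aggregate functions).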
GeneratedClass
public T newInstance(ClassLoader classLoader, Object... args) {
    try {
        return (T) compile(classLoader).getConstructors()[0].newInstance(args);
    } catch (Exception e) {
        throw new RuntimeException(
            "Could not instantiate generated class '" + className + "'", e);
    }
}

/**
 * Compiles the generated code, the compiled class will be cached in the {@link GeneratedClass}.
 */
public Class<T> compile(ClassLoader classLoader) {
    if (compiledClass == null) {
        // cache the compiled class
        compiledClass = CompileUtils.compile(classLoader, className, code);
    }
    return compiledClass;
}
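GeneratedClass compiles lazily and caches the resulting Class, so repeated newInstance calls pay the compilation cost once per GeneratedClass instance. The sketch keeps that structure but replaces Janino with a function that returns an already-loaded class; `SimpleGeneratedClass` and `CountingHandler` are hypothetical. Like the original, newInstance takes getConstructors()[0], which assumes the generated class has exactly one public constructor.

```java
import java.util.function.Function;

// Hypothetical stand-in for GeneratedClass: `compiler` replaces Janino and
// maps a class name to a Class object.
class SimpleGeneratedClass<T> {
    private final String className;
    private final Function<String, Class<?>> compiler;
    private Class<T> compiledClass;
    private int compileCount = 0;

    SimpleGeneratedClass(String className, Function<String, Class<?>> compiler) {
        this.className = className;
        this.compiler = compiler;
    }

    // "Compiles" on first use and caches the class afterwards.
    @SuppressWarnings("unchecked")
    Class<T> compile() {
        if (compiledClass == null) {
            compiledClass = (Class<T>) compiler.apply(className);
            compileCount++;
        }
        return compiledClass;
    }

    // Relies on the class having exactly one public constructor.
    @SuppressWarnings("unchecked")
    T newInstance(Object... args) {
        try {
            return (T) compile().getConstructors()[0].newInstance(args);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not instantiate generated class '" + className + "'", e);
        }
    }

    int compileCount() {
        return compileCount;
    }
}

// Plays the role of the generated code: a single public constructor.
class CountingHandler {
    final int start;

    public CountingHandler(int start) {
        this.start = start;
    }
}
```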
Example
The following query computes sliding-window (HOP) aggregates over T1:
val sql =
  """
    |SELECT
    | `string`,
    | HOP_START(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
    | HOP_ROWTIME(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
    | COUNT(1),
    | SUM(1),
    | COUNT(`int`),
    | COUNT(DISTINCT `float`),
    | concat_distinct_agg(name)
    |FROM T1
    |GROUP BY `string`, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND)
  """.stripMargin
Logical plan:
LogicalProject#3
  LogicalAggregate#2
    LogicalProject#1
      LogicalTableScan#0
Optimized physical plan:
rel#271:StreamExecSink.STREAM_PHYSICAL.any.None: 0.false.Acc(input=StreamExecCalc#269,name=DataStreamTableSink,fields=string, EXPR$1, EXPR$2, EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7)
rel#269:StreamExecCalc.STREAM_PHYSICAL.any.None: 0.false.Acc(input=StreamExecGroupWindowAggregate#267,select=string, w$start AS EXPR$1, w$rowtime AS EXPR$2, EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7)
rel#267:StreamExecGroupWindowAggregate.STREAM_PHYSICAL.any.None: 0.false.Acc(input=StreamExecExchange#265,groupBy=string,window=SlidingGroupWindow('w$, rowtime, 5, 4),properties=w$start, w$end, w$rowtime, w$proctime,select=string, COUNT(*) AS EXPR$3, $SUM0($f2) AS EXPR$4, COUNT(int) AS EXPR$5, COUNT(DISTINCT float) AS EXPR$6, concat_distinct_agg(name) AS EXPR$7, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
rel#265:StreamExecExchange.STREAM_PHYSICAL.hash[0]true.None: -1.true.Acc(input=StreamExecCalc#263,distribution=hash[string])
rel#263:StreamExecCalc.STREAM_PHYSICAL.any.None: -1.true.Acc(input=StreamExecDataStreamScan#257,select=string, rowtime, 1 AS $f2, int, float, name)
rel#257:StreamExecDataStreamScan.STREAM_PHYSICAL.any.None: -1.true.Acc(table=[Unregistered_DataStream_2],fields=rowtime, int, double, float, bigdec, string, name)
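HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND) takes the slide first and the size second, which is why the physical plan shows SlidingGroupWindow('w$, rowtime, 5, 4): a 5 ms size and a 4 ms slide. Since the size exceeds the slide, windows overlap and a row can fall into two windows. A sketch of the window assignment in milliseconds, assuming zero offset and the alignment formula start = ts - (ts + slide) % slide, modeled on Flink's DataStream sliding window assigner; `HopWindows` is illustrative and only non-negative timestamps are handled.

```java
import java.util.ArrayList;
import java.util.List;

class HopWindows {
    // Start of the slide-aligned period containing `timestamp` (offset 0).
    static long alignedStart(long timestamp, long period) {
        return timestamp - (timestamp + period) % period;
    }

    // Returns the [start, end) bounds of every sliding window containing `timestamp`.
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = alignedStart(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

For example, assign(4, 5, 4) yields [4, 9) and [0, 5), while assign(5, 5, 4) yields only [4, 9).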
Code generation:
StreamExecGroupWindowAggregateBase->translateToPlanInternal
StreamExecGroupWindowAggregateBase->createAggsHandler
AggsHandlerCodeGenerator->generateNamespaceAggsHandler
new OneInputTransformation
At job submission, the OneInputTransformation is turned into a OneInputStreamTask. When the task runs, the generated code is compiled as the operator is opened:
Task->run
Task->doRun
StreamTask->invoke
StreamTask->openAllOperators
AggregateWindowOperator->open
WindowOperator->compileGeneratedCode
GeneratedNamespaceAggsHandleFunction->newInstance
SimpleCompiler->cook