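0 Configuration
By default, the Table and SQL API is preconfigured to produce accurate results with acceptable performance.
Depending on the requirements of a table program, it might be necessary to adjust certain parameters for optimization. For example, unbounded streaming programs may need to ensure that the required state size is capped (see Streaming Concepts).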
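1 Overview
In every TableEnvironment, the TableConfig offers options for configuring the current session.
For common or important configuration options, the TableConfig provides getter and setter methods with detailed inline documentation.
For more advanced configuration, users can directly access the underlying key-value options. The following sections list all available options that can be used to adjust Flink Table and SQL API programs.
Attention: Because options are read at different points in time when performing operations, it is recommended to set configuration options as early as possible after instantiating a TableEnvironment.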
    // instantiate table environment
    val tEnv: TableEnvironment = ...

    // access flink configuration
    val configuration = tEnv.getConfig().getConfiguration()

    // set low-level key-value options
    configuration.setString("table.exec.mini-batch.enabled", "true")
    configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
    configuration.setString("table.exec.mini-batch.size", "5000")
Attention: Currently, key-value options are only supported by the Blink planner.
2 Execution Options
The following options can be used to tune the performance of query execution.
Key | Default | Type | Description |
---|---|---|---|
table.exec.async-lookup.buffer-capacity (Batch, Streaming) | 100 | Integer | The maximum number of async I/O operations that the async lookup join can trigger. |
table.exec.async-lookup.timeout (Batch, Streaming) | "3 min" | String | The timeout for an asynchronous operation to complete. |
table.exec.disabled-operators (Batch) | (none) | String | Mainly for testing. A comma-separated list of operator names, where each name represents a kind of disabled operator. Operators that can be disabled include "NestedLoopJoin", "ShuffleHashJoin", "BroadcastHashJoin", "SortMergeJoin", "HashAgg", "SortAgg". By default no operator is disabled. |
table.exec.mini-batch.allow-latency (Streaming) | "-1 ms" | String | The maximum latency that MiniBatch may use to buffer input records. MiniBatch is an optimization that buffers input records to reduce state access. MiniBatch is triggered at the allowed latency interval and when the maximum number of buffered records is reached. NOTE: If table.exec.mini-batch.enabled is set to true, this value must be greater than zero. |
table.exec.mini-batch.enabled (Streaming) | false | Boolean | Specifies whether to enable the MiniBatch optimization. MiniBatch is an optimization that buffers input records to reduce state access. It is disabled by default; set this option to true to enable it. NOTE: If mini-batch is enabled, 'table.exec.mini-batch.allow-latency' and 'table.exec.mini-batch.size' must be set. |
table.exec.mini-batch.size (Streaming) | -1 | Long | The maximum number of input records that can be buffered for MiniBatch. MiniBatch is an optimization that buffers input records to reduce state access. MiniBatch is triggered at the allowed latency interval and when the maximum number of buffered records is reached. NOTE: MiniBatch currently only works for non-windowed aggregations. If table.exec.mini-batch.enabled is set to true, this value must be positive. |
table.exec.resource.default-parallelism (Batch, Streaming) | -1 | Integer | Sets the default parallelism for all operators (such as aggregate, join, filter). This option has a higher priority than the parallelism of StreamExecutionEnvironment and overrides it. A value of -1 indicates that no default parallelism is set, in which case the parallelism of StreamExecutionEnvironment is used. |
table.exec.shuffle-mode (Batch) | "ALL_EDGES_BLOCKING" | String | Sets exec shuffle mode. Accepted values are: |
table.exec.sink.not-null-enforcer (Batch, Streaming) | ERROR | Enum (possible values: ERROR, DROP) | The NOT NULL column constraint on a table enforces that null values can't be inserted into the table. Flink supports 'error' (default) and 'drop' enforcement behavior. By default, Flink checks values and throws a runtime exception when null values are written into NOT NULL columns. Users can change the behavior to 'drop' to silently drop such records without throwing an exception. |
table.exec.sort.async-merge-enabled (Batch) | true | Boolean | Whether to asynchronously merge sorted spill files. |
table.exec.sort.default-limit (Batch) | -1 | Integer | Default limit when the user doesn't set a limit after ORDER BY. -1 indicates that this configuration is ignored. |
table.exec.sort.max-num-file-handles (Batch) | 128 | Integer | The maximal fan-in for external merge sort. It limits the number of file handles per operator. If it is too small, it may cause intermediate merging. If it is too large, too many files are open at the same time, which consumes memory and leads to random reads. |
table.exec.source.idle-timeout (Streaming) | "-1 ms" | String | When a source does not receive any elements for the timeout duration, it is marked as temporarily idle. This allows downstream tasks to advance their watermarks without waiting for watermarks from this source while it is idle. |
table.exec.spill-compression.block-size (Batch) | "64 kb" | String | The memory size used for compression when spilling data. The larger the memory, the higher the compression ratio, but the more memory the job consumes. |
table.exec.spill-compression.enabled (Batch) | true | Boolean | Whether to compress spilled data. Currently, compression of spilled data is only supported for the sort, hash-agg, and hash-join operators. |
table.exec.window-agg.buffer-size-limit (Batch) | 100000 | Integer | Sets the buffer size limit for window elements used in the group window agg operator. |
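The MiniBatch options above describe buffering input records and flushing them when either the size limit or the allowed latency is reached. The following plain Scala sketch is a toy model of that idea, not Flink's actual operator: the names `MiniBatchSketch`, `Record`, and `aggregate` are hypothetical, the latency trigger is approximated by comparing record timestamps, and a plain `Map` stands in for keyed state.

```scala
object MiniBatchSketch {
  // One input record: arrival time in milliseconds, grouping key, value to sum.
  final case class Record(timeMs: Long, key: String, value: Long)

  // Hypothetical toy model: sums values per key and returns
  // (final sums per key, number of state updates performed).
  def aggregate(records: Seq[Record], batchSize: Int, allowLatencyMs: Long): (Map[String, Long], Int) = {
    var state = Map.empty[String, Long]   // stands in for keyed state
    var buffer = Map.empty[String, Long]  // pre-aggregated mini-batch
    var buffered = 0                      // records in the current batch
    var batchStartMs = 0L                 // arrival time of the first buffered record
    var updates = 0

    // Write one pre-aggregated value per buffered key into the state.
    def flush(): Unit = {
      buffer.foreach { case (k, v) =>
        state = state.updated(k, state.getOrElse(k, 0L) + v)
        updates += 1
      }
      buffer = Map.empty
      buffered = 0
    }

    records.foreach { r =>
      // Latency trigger (simplified): the open batch is at least allowLatencyMs old.
      if (buffered > 0 && r.timeMs - batchStartMs >= allowLatencyMs) flush()
      if (buffered == 0) batchStartMs = r.timeMs
      buffer = buffer.updated(r.key, buffer.getOrElse(r.key, 0L) + r.value)
      buffered += 1
      // Size trigger: the maximum number of buffered records is reached.
      if (buffered >= batchSize) flush()
    }
    flush() // emit whatever is still buffered at the end of the input
    (state, updates)
  }
}
```

Calling `MiniBatchSketch.aggregate(records, 1, 5000L)` flushes after every record and so models processing without MiniBatch; a larger `batchSize` yields the same sums with fewer state updates whenever a batch contains repeated keys.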
3 Optimizer Options
The following options can be used to adjust the behavior of the query optimizer to obtain a better execution plan.
Key | Default | Type | Description |
---|---|---|---|
table.optimizer.agg-phase-strategy (Batch, Streaming) | "AUTO" | String | Strategy for the aggregate phase. Only AUTO, TWO_PHASE, or ONE_PHASE can be set. AUTO: No special enforcer for the aggregate stage; whether a two-stage or one-stage aggregate is chosen depends on cost. TWO_PHASE: Enforces a two-stage aggregate, which has localAggregate and globalAggregate. Note that if an aggregate call does not support being optimized into two phases, a one-stage aggregate is still used. ONE_PHASE: Enforces a one-stage aggregate, which only has CompleteGlobalAggregate. |
table.optimizer.distinct-agg.split.bucket-num (Streaming) | 1024 | Integer | Configures the number of buckets when splitting distinct aggregation. The number is used in the first-level aggregation to calculate a bucket key 'hash_code(distinct_key) % BUCKET_NUM', which is used as an additional group key after splitting. |
table.optimizer.distinct-agg.split.enabled (Streaming) | false | Boolean | Tells the optimizer whether to split distinct aggregation (e.g. COUNT(DISTINCT col), SUM(DISTINCT col)) into two levels. The first aggregation is shuffled by an additional key, which is calculated from the hash code of distinct_key and the number of buckets. This optimization is very useful when there is data skew in distinct aggregation, and it allows the job to scale up. Default is false. |
table.optimizer.join-reorder-enabled (Batch, Streaming) | false | Boolean | Enables join reorder in the optimizer. Disabled by default. |
table.optimizer.join.broadcast-threshold (Batch) | 1048576 | Long | Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. Setting this value to -1 disables broadcasting. |
table.optimizer.reuse-source-enabled (Batch, Streaming) | true | Boolean | When true, the optimizer tries to find duplicated table sources and reuse them. This works only when table.optimizer.reuse-sub-plan-enabled is true. |
table.optimizer.reuse-sub-plan-enabled (Batch, Streaming) | true | Boolean | When true, the optimizer tries to find duplicated sub-plans and reuse them. |
table.optimizer.source.predicate-pushdown-enabled (Batch, Streaming) | true | Boolean | When true, the optimizer pushes down predicates into the FilterableTableSource. Default value is true. |
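The split distinct aggregation described for `table.optimizer.distinct-agg.split.enabled` and `table.optimizer.distinct-agg.split.bucket-num` can be illustrated outside Flink. The sketch below is plain Scala over an in-memory collection and only mirrors the two-level idea; `DistinctAggSplitSketch`, `bucketOf`, and `countDistinctSplit` are hypothetical names, and the use of `Math.floorMod` to keep the bucket key non-negative is a choice of this sketch, not something the source states about Flink.

```scala
object DistinctAggSplitSketch {
  // Bucket key in the spirit of 'hash_code(distinct_key) % BUCKET_NUM'.
  // floorMod keeps the result in [0, bucketNum); this is an assumption of the sketch.
  def bucketOf(distinctKey: String, bucketNum: Int): Int =
    Math.floorMod(distinctKey.hashCode, bucketNum)

  // rows are (groupKey, distinctKey) pairs.
  // Returns COUNT(DISTINCT distinctKey) per groupKey, computed in two levels.
  def countDistinctSplit(rows: Seq[(String, String)], bucketNum: Int): Map[String, Long] = {
    // First level: the bucket is an additional group key, so a hot groupKey
    // is spread over up to bucketNum partial aggregations.
    val partial: Map[(String, Int), Long] =
      rows
        .groupBy { case (g, d) => (g, bucketOf(d, bucketNum)) }
        .map { case (k, vs) => k -> vs.map(_._2).distinct.size.toLong }

    // Second level: group by the original key only and sum the partial counts.
    // Each distinct value falls into exactly one bucket, so the sum is exact.
    partial
      .groupBy { case ((g, _), _) => g }
      .map { case (g, parts) => g -> parts.values.sum }
  }
}
```

Because a given distinct key always maps to the same bucket, the result does not depend on `bucketNum`; the bucket count only changes how the first-level work is spread.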
4 Planner Options
The following options can be used to adjust the behavior of the planner.
Key | Default | Type | Description |
---|---|---|---|
table.dynamic-table-options.enabled (Batch, Streaming) | false | Boolean | Enables or disables the OPTIONS hint used to specify table options dynamically. If disabled, an exception is thrown if any OPTIONS hint is specified. |
table.sql-dialect (Batch, Streaming) | "default" | String | The SQL dialect defines how to parse a SQL query. A different SQL dialect may support different SQL grammar. Currently supported dialects are: default and hive. |
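As a rough illustration of how the planner options above might be set, the sketch below reuses the key-value access pattern from the overview. It assumes the Flink Table API dependencies for a version matching this page are on the classpath; the object name `PlannerOptionsSketch`, the table name `orders`, and the connector option inside the hint are hypothetical and only show the shape of an OPTIONS hint. The query string is built but not executed.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object PlannerOptionsSketch {
  def main(args: Array[String]): Unit = {
    // Instantiate a table environment in streaming mode.
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)

    // Set the options early, right after creating the environment.
    val configuration = tEnv.getConfig().getConfiguration()
    configuration.setString("table.dynamic-table-options.enabled", "true")
    configuration.setString("table.sql-dialect", "default")

    // With the OPTIONS hint enabled, a query may override table options inline.
    // `orders` and the option shown are placeholders (assumptions of this sketch).
    val query =
      "SELECT * FROM orders /*+ OPTIONS('scan.startup.mode'='earliest-offset') */"
    println(query)
  }
}
```

Leaving `table.dynamic-table-options.enabled` at its default of false would, per the table above, cause an exception once a query containing such a hint is actually submitted.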