本文翻譯自官網:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html
TableSource 提供訪問存儲在外部系統(數據庫、key-value 存款,消息隊列)或文件中的數據的權限。TableSource 在 TableEnvironment 中注冊后,就可以在 Table API 或 SQL 查詢中訪問了。
TableSink 將表發送到外部存儲系統,例如數據庫,key-value 存儲,消息隊列,或文件系統(使用不同的編碼(格式),e.g.: CSV,Parquet,or ORC)。
TableFactory 允許將與外部系統的連接的聲明與實際實現分開。TableFactory 從標准化的基於字符串的屬性創建 table sources 和 sinks 的已配置實例。可以使用描述符或 SQL客戶端的YAML配置文件以編程方式生成屬性。
請查看通用 concepts and API 頁面,以詳細了解如何注冊TableSource以及如何通過TableSink發出表。 有關如何使用工廠的示例,請參見 內置的源,接收器和格式頁面。
- Define a TableSource 定義TableSource
- Defining a BatchTableSource 定義 BatchTableSource
- Defining a StreamTableSource 定義 StreamTableSource
- Defining a TableSource with Time Attributes 定義帶時間屬性的TableSource
- Defining a TableSource with Projection Push-Down 定義投影下推定義TableSource
- Defining a TableSource with Filter Push-Down 定義過濾下推的TableSource
- Defining a TableSource for Lookups 定義用於查找的TableSource
- Define a TableSink 定義TableSink
- Define a TableFactory
- Use a TableFactory in the SQL Client 在SQL 客戶端中使用TableFactory
- Use a TableFactory in the Table & SQL AP 在Table & SQL API 中使用TableFactory
Define a TableSource
TableSource是一個通用接口,可讓Table API和SQL查詢訪問存儲在外部系統中的數據。 它提供了表的 schema 以及與該表的 schema 映射到行的記錄。 根據是在流查詢還是批處理查詢中使用TableSource,記錄是作為 DataSet 或 DataStream產生。
如果在流查詢中使用TableSource,則必須實現StreamTableSource接口;如果在批處理查詢中使用TableSource,則必須實現BatchTableSource接口。 TableSource還可以同時實現兩個接口,並且可以在流查詢和批處理查詢中使用。
StreamTableSource和BatchTableSource擴展了定義以下方法的基本接口TableSource:
TableSource[T] {
def getTableSchema: TableSchema
def getReturnType: TypeInformation[T]
def explainSource: String
}
- getTableSchema():返回產生的表的 schema,即表的字段名稱和類型。 字段類型是使用Flink的DataType定義的(請參見Table API類型和SQL類型)。 請注意,返回的TableSchema不應包含反映物理TableSource schema 的計算列。
- getReturnType():返回DataStream(StreamTableSource)或DataSet(BatchTableSource)的物理類型以及TableSource生成的記錄。
- describeSource():返回描述TableSource的字符串。 此方法是可選的,僅用於顯示的目的。
TableSource接口將邏輯表 schema 與返回的DataStream或DataSet的物理類型分開。 因此,表 schema 的所有字段(getTableSchema())必須映射到具有相應物理返回類型(getReturnType())類型的字段。 默認情況下,此映射是基於字段名稱完成的。 例如,一個TableSource用兩個字段[name:String,size:Integer]定義一個表 schema,它需要一個TypeInformation,其中至少有兩個字段分別稱為name和size,類型分別為String和Integer。 這可能是PojoTypeInfo或RowTypeInfo,它們具有兩個名為name和size且具有匹配類型的字段。
但是,某些類型(例如Tuple或CaseClass類型)確實支持自定義字段名稱。 如果TableSource返回具有固定字段名稱類型的DataStream或DataSet,則它可以實現DefinedFieldMapping接口以將表 schema 中的字段名稱映射到物理返回類型的字段名稱。
Defining a BatchTableSource
BatchTableSource接口擴展了TableSource接口並定義了另一個方法:
BatchTableSource[T] extends TableSource[T] { def getDataSet(execEnv: ExecutionEnvironment): DataSet[T] }
- getDataSet(execEnv):返回帶有表數據的DataSet。 DataSet的類型必須與TableSource.getReturnType()方法定義的返回類型相同。 可以使用DataSet API的常規數據源創建DataSet。 通常,BatchTableSource是通過包裝InputFormat或批處理連接器來實現的。
Defining a StreamTableSource
StreamTableSource接口擴展了TableSource接口並定義了另一個方法:
StreamTableSource[T] extends TableSource[T] { def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] }
- getDataStream(execEnv):返回帶有表數據的DataStream。 DataStream的類型必須與TableSource.getReturnType()方法定義的返回類型相同。 通過使用DataStream API的常規數據源可以創建DataStream。 通常,通過包裝SourceFunction或流連接器來實現StreamTableSource。
Defining a TableSource with Time Attributes
流表API和SQL查詢的基於時間的操作(例如窗口聚合或 joins )需要顯式指定的時間屬性。
TableSource在其表 schema 中將時間屬性定義為Types.SQL_TIMESTAMP類型的字段。 與模式中的所有常規字段相反,時間屬性不得與表源的返回類型中的物理字段匹配。 相反,TableSource通過實現某個接口來定義時間屬性。
Defining a Processing Time Attribute
處理時間屬性通常在流查詢中使用。 處理時間屬性返回訪問該屬性的 operator 的當前 wall-clock 時間。 TableSource通過實現DefinedProctimeAttribute接口來定義處理時間屬性。 該接口如下所示:
DefinedProctimeAttribute {
def getProctimeAttribute: String
}
- getProctimeAttribute():返回處理時間屬性的名稱。 指定的屬性必須在表 schema 中定義為Types.SQL_TIMESTAMP類型,並且可以在基於時間的操作中使用。 DefinedProctimeAttribute table source 通過返回null來定義無處理時間屬性。
注意:StreamTableSource和BatchTableSource都可以實現DefinedProctimeAttribute並定義處理時間屬性。 如果是BatchTableSource,則在表掃描期間,使用當前時間戳初始化處理時間字段。
Defining a Rowtime Attribute
行時間屬性是TIMESTAMP類型的屬性,在流查詢和批處理查詢中以統一的方式處理。
可以通過指定SQL_TIMESTAMP類型的表 schema 字段聲明為rowtime屬性
- 字段名稱
- 一個TimestampExtractor,用於計算屬性的實際值(通常從一個或多個其他字段)
- 一個WatermarkStrategy,它指定如何為rowtime屬性生成水印。
TableSource通過實現DefinedRowtimeAttributes接口來定義行時間屬性。 該接口如下所示:
DefinedRowtimeAttributes {
def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
}
- getRowtimeAttributeDescriptors():返回RowtimeAttributeDescriptor的列表。 RowtimeAttributeDescriptor描述具有以下屬性的行時間屬性:
- attributeName:表 schema 中的行時間屬性的名稱。 必須使用Types.SQL_TIMESTAMP類型定義該字段。
- timestampExtractor:時間戳提取器從具有返回類型的記錄中提取時間戳。 例如,它可以將Long字段轉換為時間戳,或者解析String編碼的時間戳。 Flink帶有一組內置的TimestampExtractor實現,用於常見用例。 也可以提供自定義實現。
- watermarkStrategy:水印策略定義了如何為rowtime屬性生成水印。 Flink隨附了一組針對常見用例的內置WatermarkStrategy實現。 也可以提供自定義實現。
注意:盡管getRowtimeAttributeDescriptors()方法返回描述符列表,但目前僅支持單個rowtime屬性。 我們計划將來刪除此限制,並支持具有多個rowtime屬性的表。
注意:StreamTableSource和BatchTableSource都可以實現DefinedRowtimeAttributes並定義行時間屬性。 無論哪種情況,都使用TimestampExtractor提取rowtime字段。 因此,實現StreamTableSource和BatchTableSource並定義rowtime屬性的TableSource為流查詢和批處理查詢提供了完全相同的數據。
Provided Timestamp Extractors
Flink為常見用例提供TimestampExtractor實現。
當前可以使用以下TimestampExtractor實現:
- ExistingField(fieldName):從現有的LONG,SQL_TIMESTAMP或時間戳格式的STRING字段中提取rowtime屬性的值。 這樣的字符串的一個示例是“ 2018-05-28 12:34:56.000”。
- StreamRecordTimestamp():從DataStream StreamRecord的時間戳中提取rowtime屬性的值。 請注意,此TimestampExtractor不適用於batch table sources。
可以通過實現相應的接口來定義自定義TimestampExtractor。
Provided Watermark Strategies
Flink為常見用例提供WatermarkStrategy實現。
當前有以下WatermarkStrategy實現:
- AscendingTimestamps:遞增時間戳的水印策略。 時間戳不正確的記錄將被視為late。
- BoundedOutOfOrderTimestamps(delay):一種針對指定時間延遲最多亂序的時間戳的水印策略。
- PreserveWatermarks():一種策略,指定應從基礎DataStream中保留水印。
可以通過實現相應的接口來定義自定義的WatermarkStrategy。
Defining a TableSource with Projection Push-Down
TableSource通過實現ProjectableTableSource接口來支持投影下推。 該接口定義了一個方法:
ProjectableTableSource[T] {
def projectFields(fields: Array[Int]): TableSource[T]
}
- projectFields(fields):返回具有調整后的物理返回類型的TableSource的副本。 fields參數提供TableSource必須提供的字段的索引。 索引與物理返回類型的TypeInformation有關,而不與邏輯表 schame 有關。 復制的TableSource必須調整其返回類型以及返回的DataStream或DataSet。 復制的TableSource的TableSchema不得更改,即它必須與原始TableSource相同。 如果TableSource實現了DefinedFieldMapping接口,則必須將字段映射調整為新的返回類型。
注意:為了使Flink可以將投影下推 table source 與其原始形式區分開,必須重寫explainSource方法以包括有關投影字段的信息。
ProjectableTableSource為項目平面字段添加了支持。 如果TableSource定義了具有嵌套模式的表,則可以實現NestedFieldsProjectableTableSource以將投影擴展到嵌套字段。 NestedFieldsProjectableTableSource的定義如下:
NestedFieldsProjectableTableSource[T] {
def projectNestedFields(fields: Array[Int], nestedFields: Array[Array[String]]): TableSource[T]
}
- projectNestedField(fields,nestedFields):返回具有調整后的物理返回類型的TableSource的副本。 物理返回類型的字段可以刪除或重新排序,但不得更改其類型。 該方法的協定與ProjectableTableSource.projectFields()方法的協定基本上相同。 另外,nestedFields參數包含字段列表中每個字段索引的查詢到的所有嵌套字段的路徑列表。 所有其他嵌套字段都不需要在TableSource生成的記錄中讀取,解析和設置。
請注意,不得更改投影字段的類型,但未使用的字段可以設置為null或默認值。
Defining a TableSource with Filter Push-Down
FilterableTableSource接口增加了對將過濾器下推到TableSource的支持。 擴展此接口的TableSource能夠過濾記錄,以使返回的DataStream或DataSet返回較少的記錄。
該接口如下所示:
FilterableTableSource[T] {
def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]
def isFilterPushedDown: Boolean
}
- applyPredicate(predicates):返回具有添加的謂詞的TableSource的副本。 謂詞參數是“提供”給TableSource的連接謂詞的可變列表。 TableSource接受通過從列表中刪除謂詞來評估謂詞。 列表中剩余的謂詞將由后續的過濾器運算符評估。
- isFilterPushedDown():如果之前調用了applyPredicate()方法,則返回true。 因此,對於從applyPredicate()調用返回的所有TableSource實例,isFilterPushedDown()必須返回true。
注意:為了使Flink能夠將過濾器下推 table source 與其原始形式區分開來,必須重寫explainSource方法以包括有關下推式過濾器的信息。
Defining a TableSource for Lookups
注意:這是一項實驗功能。 將來的版本中可能會更改接口。 僅Blink planner支持。
LookupableTableSource接口增加了對通過查找方式通過鍵列訪問表的支持。 當用於與維表聯接以豐富某些信息時,這非常有用。 如果要在查找模式下使用TableSource,則應在時態表聯接語法中使用源。
該接口如下所示:
LookupableTableSource[T] extends TableSource[T] { def getLookupFunction(lookupKeys: Array[String]): TableFunction[T] def getAsyncLookupFunction(lookupKeys: Array[String]): AsyncTableFunction[T] def isAsyncEnabled: Boolean }
- getLookupFunction(lookupkeys):返回一個TableFunction,該函數用於通過查找鍵查找匹配的行。 lookupkeys是聯接相等條件下的LookupableTableSource的字段名稱。 返回的TableFunction的eval方法參數應按照定義的lookupkeys的順序排列。 建議在varargs中定義參數(例如,eval(Object ... lookupkeys)以匹配所有情況。 TableFunction的返回類型必須與TableSource.getReturnType()方法定義的返回類型相同。
- getAsyncLookupFunction(lookupkeys):可選。 與getLookupFunction類似,但AsyncLookupFunction異步查找匹配的行。 AsyncLookupFunction的基礎將通過Async I / O調用。 返回的AsyncTableFunction的eval方法的第一個參數應該定義為java.util.concurrent.CompletableFuture以異步收集結果(例如eval(CompletableFuture <Collection <String >> result,Object ... lookupkeys))。 如果TableSource不支持異步查找,則此方法的實現可能會引發異常。
- isAsyncEnabled():如果啟用了異步查找,則返回true。 如果isAsyncEnabled返回true,則需要實現getAsyncLookupFunction(lookupkeys)。
Define a TableSink
TableSink指定如何將表發送到外部系統或位置。 該接口是通用的,因此它可以支持不同的存儲位置和格式。 批處理表和流式表有不同的表接收器。
通用接口如下所示:
TableSink[T] { def getOutputType: TypeInformation<T> def getFieldNames: Array[String] def getFieldTypes: Array[TypeInformation] def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation]): TableSink[T] }
調用TableSink#configure方法將表的 schema(字段名稱和類型)傳遞給TableSink。 該方法必須返回TableSink的新實例,該實例被配置為發出提供的Table模式。 請注意,提供的TableSchema不應包含計算列以反映物理TableSink的模式。
BatchTableSink
定義一個外部TableSink以發出一個批處理表。
該接口如下所示:
BatchTableSink[T] extends TableSink[T] { def emitDataSet(dataSet: DataSet[T]): Unit }
AppendStreamTableSink
定義一個外部TableSink來發出僅具有插入更改的流表。
該接口如下所示:
AppendStreamTableSink[T] extends TableSink[T] { def emitDataStream(dataStream: DataStream[T]): Unit }
如果還通過更新或刪除更改來修改表,則將引發TableException。
RetractStreamTableSink
定義一個外部TableSink以發出具有插入,更新和刪除更改的流表。
該接口如下所示:
RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] { def getRecordType: TypeInformation[T] def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit }
該表將被轉換為 accumulate 和撤消消息流,這些消息被編碼為Java Tuple2。 第一個字段是指示消息類型的布爾標志(true表示插入,false表示刪除)。 第二個字段保存請求的類型的記錄。
UpsertStreamTableSink
定義一個外部TableSink以發出具有插入,更新和刪除更改的流表。
該接口如下所示:
UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] { def setKeyFields(keys: Array[String]): Unit def setIsAppendOnly(isAppendOnly: Boolean): Unit def getRecordType: TypeInformation[T] def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit }
該表必須具有唯一的鍵字段(原子的或復合的)或 append-only 。 如果表沒有唯一鍵並且不是append-only ,則將引發TableException。 該表的唯一鍵由UpsertStreamTableSink#setKeyFields()方法配置。
該表將轉換為upsert和delete消息流,它們被編碼為Java Tuple2。 第一個字段是指示消息類型的布爾標志。 第二個字段保存請求的類型T的記錄。
具有 true 布爾值字段的消息是已配置密鑰的upsert消息。 帶有 false 標志的消息是已配置密鑰的刪除消息。 如果表是 append-only,則所有消息都將具有true標志,並且必須將其解釋為插入。
Define a TableFactory
TableFactory允許從基於字符串的屬性創建與表相關的不同實例。 調用所有可用的工廠以匹配給定的屬性集和相應的工廠類。
工廠利用 Java’s Service Provider Interfaces(SPI)進行發現。 這意味着每個依賴項和JAR文件都應在 META_INF/services 資源目錄中包含一個文件org.apache.flink.table.factories.TableFactory,該文件列出了它提供的所有可用表工廠。
每個表工廠都需要實現以下接口:
package org.apache.flink.table.factories trait TableFactory { def requiredContext(): util.Map[String, String] def supportedProperties(): util.List[String] }
- requiredContext():指定已為此工廠實現的上下文。 該框架保證僅在滿足指定的屬性和值集的情況下才與此工廠匹配。 典型的屬性可能是connector.type,format.type或update-mode。 屬性鍵(例如connector.property-version和format.property-version)保留用於將來的向后兼容情況。
- supportedProperties():此工廠可以處理的屬性鍵的列表。 此方法將用於驗證。 如果傳遞了該工廠無法處理的屬性,則將引發異常。 該列表不得包含上下文指定的鍵。
為了創建特定實例,工廠類可以實現org.apache.flink.table.factories中提供的一個或多個接口:
BatchTableSourceFactory: Creates a batch table source.
BatchTableSinkFactory: Creates a batch table sink.
StreamTableSourceFactory: Creates a stream table source.
StreamTableSinkFactory: Creates a stream table sink.
DeserializationSchemaFactory: Creates a deserialization schema format.
SerializationSchemaFactory: Creates a serialization schema format.
工廠的發現分為多個階段:
Discover all available factories. 發現所有可用的工廠。 Filter by factory class (e.g., StreamTableSourceFactory). 按工廠分類過濾 Filter by matching context. 通過匹配上下文進行過濾。 Filter by supported properties. 按支持的屬性過濾。 Verify that exactly one factory matches, otherwise throw an AmbiguousTableFactoryException or NoMatchingTableFactoryException. 驗證一個工廠是否完全匹配,否則拋出AmbiguousTableFactoryException或NoMatchingTableFactoryException。
以下示例說明如何為自定義流源提供附加的connector.debug屬性標志以進行參數化。
import java.util import org.apache.flink.table.sources.StreamTableSource import org.apache.flink.types.Row class MySystemTableSourceFactory extends StreamTableSourceFactory[Row] { override def requiredContext(): util.Map[String, String] = { val context = new util.HashMap[String, String]() context.put("update-mode", "append") context.put("connector.type", "my-system") context } override def supportedProperties(): util.List[String] = { val properties = new util.ArrayList[String]() properties.add("connector.debug") properties } override def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[Row] = { val isDebug = java.lang.Boolean.valueOf(properties.get("connector.debug")) # additional validation of the passed properties can also happen here new MySystemAppendTableSource(isDebug) } }
Use a TableFactory in the SQL Client
在SQL Client環境文件中,先前提供的工廠可以聲明為:
tables: - name: MySystemTable type: source update-mode: append connector: type: my-system debug: true
將YAML文件轉換為扁平化的字符串屬性,並使用描述與外部系統的連接的那些屬性來調用表工廠:
update-mode=append connector.type=my-system connector.debug=true
注意:屬性例如table.#.name或tables.#.type是SQL Client的特定屬性,不會傳遞給任何工廠。 根據執行環境的不同,type屬性決定是否需要發現BatchTableSourceFactory / StreamTableSourceFactory(對於 source),BatchTableSinkFactory / StreamTableSinkFactory(對於 sink)還是兩者都被發現。
Use a TableFactory in the Table & SQL API
對於使用說明性Scaladoc / Javadoc的類型安全的編程方法,Table&SQL API在org.apache.flink.table.descriptor中提供了描述符,這些描述符可轉換為基於字符串的屬性。 請參閱 sources,sink 和 format 的內置描述符作為參考。
可以通過擴展ConnectorDescriptor類來定義自定義描述符。
import org.apache.flink.table.descriptors.ConnectorDescriptor import java.util.HashMap import java.util.Map /** * Connector to MySystem with debug mode. */ class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system", 1, false) { override protected def toConnectorProperties(): Map[String, String] = { val properties = new HashMap[String, String] properties.put("connector.debug", isDebug.toString) properties } } The descriptor can then be used to create a table with the table environment. val tableEnv: StreamTableEnvironment = // ... tableEnv .connect(new MySystemConnector(isDebug = true)) .withSchema(...) .inAppendMode() .createTemporaryTable("MySystemTable")
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文