【翻譯】Flink Table Api & SQL — 自定義 Source & Sink


本文翻譯自官網: User-defined Sources & Sinks  https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sourceSinks.html

Flink Table Api & SQL 翻譯目錄

TableSource提供對存儲在外部系統(數據庫,鍵值存儲,消息隊列)或文件中的數據的訪問。在TableEnvironment中注冊TableSource后,可以通過Table APISQL查詢對其進行訪問。

TableSink 將表發送到外部存儲系統,例如數據庫,鍵值存儲,消息隊列或文件系統(采用不同的編碼,例如CSV,Parquet或ORC)。

TableFactory允許將與外部系統的連接的聲明與實際實現分開。TableFactory 從標准化的基於字符串的屬性創建表 source 和 sink 的已配置實例。可以使用Descriptor或通過SQL Client的 YAML配置文件以編程方式生成屬性

看一下通用概念和API頁面,詳細了解如何注冊TableSource以及如何通過TableSink發出表有關如何使用工廠的示例請參見內置的源,接收器和格式頁面。

定義 TableSource

TableSource是一個通用接口,使 Table API 和 SQL 查詢可以訪問存儲在外部系統中的數據。它提供了表結構以及與該表結構映射到行的記錄。根據TableSource是在流查詢還是批處理查詢中使用,記錄將生成為DataSetDataStream

如果TableSource在流查詢中使用,則必須實現 StreamTableSource接口,如果在批處理查詢中使用,則必須實現 BatchTableSource接口。TableSource還可以同時實現兩個接口,並且可以在流查詢和批處理查詢中使用。

StreamTableSource 和 BatchTableSource擴展TableSource定義以下方法的基本接口

TableSource[T] {

  def getTableSchema: TableSchema

  def getReturnType: TypeInformation[T]

  def explainSource: String

}
  • getTableSchema():返回表結構,即表的字段的名稱和類型。字段類型是使用Flink定義的TypeInformation(請參見Table API類型SQL類型)。

  • getReturnType():返回DataStreamStreamTableSource)或DataSetBatchTableSource的物理類型以及由產生的記錄TableSource

  • explainSource():返回描述的字符串TableSource此方法是可選的,僅用於顯示目的。

TableSource接口將邏輯表架構與返回的DataStream或DataSet的物理類型分開。 因此,表結構的所有字段(getTableSchema())必須映射到具有相應物理返回類型(getReturnType())類型的字段。 默認情況下,此映射是基於字段名稱完成的。 例如,一個TableSource定義具有兩個字段[name:String,size:Integer]的表結構,它需要TypeInformation至少具有兩個字段,分別名為name和size,類型分別為String和Integer。 這可以是PojoTypeInfo或RowTypeInfo,它們具有兩個名為name和size且具有匹配類型的字段。

但是,某些類型(例如Tuple或CaseClass類型)支持自定義字段名稱。如果 TableSource返回具有固定字段名稱的類型的 DataStream 或 DataSet,則它可以實現DefinedFieldMapping接口以將表結構中的字段名稱映射到物理返回類型的字段名稱。

定義BatchTableSource

BatchTableSource接口擴展了TableSource接口,並定義一個額外的方法:

BatchTableSource[T] extends TableSource[T] {

  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
  • getDataSet(execEnv):返回帶有表數據的DataSet。DataSet 的類型必須與 TableSource.getReturnType() 方法定義的返回類型相同。可以使用DataSet API的常規數據源創建DataSet通常, BatchTableSource是通過包裝 InputFormat或 batch connector 實現的

定義StreamTableSource

StreamTableSource接口擴展了TableSource接口,並定義一個額外的方法:

StreamTableSource[T] extends TableSource[T] {

  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
  • getDataStream(execEnv):返回帶有表格數據的 DataStream 。DataStream 的類型必須與 TableSource.getReturnType() 方法定義的返回類型相同。可以使用DataSet API的常規數據源創建DataSet通常, StreamTableSource是通過包裝 SourceFunctionbatch connector 實現的

使用時間屬性定義TableSource

流 Table API和SQL查詢的基於時間的操作(例如窗口聚合或 join)需要顯式指定的時間屬性。

TableSource在其表結構中將時間屬性定義為Types.SQL_TIMESTAMP類型的字段。 與結構中的所有常規字段相反,時間屬性不得與表 source 的返回類型中的物理字段匹配。 相反,TableSource通過實現某個接口來定義時間屬性。

定義處理時間屬性

處理時間屬性通常用於流查詢中。處理時間屬性返回訪問該屬性的 operator 的當前掛鍾時間。 TableSource通過實現 DefinedProctimeAttribute 接口來定義處理時間屬性接口如下所示:

DefinedProctimeAttribute {

  def getProctimeAttribute: String
}
  • getProctimeAttribute():返回處理時間屬性的名稱。指定的屬性必須 Types.SQL_TIMESTAMP 在表結構中定義為類型,並且可以在基於時間的操作中使用。DefinedProctimeAttribute表 source 無法通過返回null來定義任何處理時間屬性。

注意兩者StreamTableSource 和 BatchTableSource可以實現DefinedProctimeAttribute並定義的處理時間屬性。BatchTableSource表掃描期間,使用當前時間戳初始化處理時間字段的情況。

定義行時間屬性

行時間屬性是類型的屬性,TIMESTAMP在流查詢和批處理查詢中以統一的方式處理。

SQL_TIMESTAMP通過指定以下內容,可以將類型的表模式字段聲明為rowtime屬性:

  • 字段名稱,
  • TimestampExtractor,計算實際值的屬性(通常從一個或多個其他字段)
  • WatermarkStrategy 用於指定如何為rowtime屬性生成水印的。

 TableSource通過實現DefinedRowtimeAttributes接口來定義行時間屬性該接口如下所示:

DefinedRowtimeAttributes {

  def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
}
  • getRowtimeAttributeDescriptors():返回 RowtimeAttributeDescriptor 的列表 RowtimeAttributeDescriptor描述了具有以下屬性的行時間屬性:
    • attributeName:表結構中的 rowtime 屬性的名稱。該字段必須使用 Types.SQL_TIMESTAMP 類型定義 
    • timestampExtractor:時間戳提取器從具有返回類型的記錄中提取時間戳。例如,它可以將Long字段轉換為時間戳,或者解析String編碼的時間戳。Flink帶有一組 TimestampExtractor 針對常見用例的內置實現。也可以提供自定義實現。
    • watermarkStrategy:水印策略定義了如何使用 rowtime 屬性生成水印。Flink帶有一組WatermarkStrategy用於常見用例的內置實現。也可以提供自定義實現。

注意:盡管該 getRowtimeAttributeDescriptors() 方法返回一個描述符列表,但目前僅支持單個rowtime屬性。我們計划將來刪除此限制,並支持具有多個rowtime屬性的表。

注意:兩者,StreamTableSource 和 BatchTableSource,可以實現DefinedRowtimeAttributes並定義rowtime屬性。無論哪種情況,都使用來提取rowtime字段TimestampExtractor。因此,實現StreamTableSource和BatchTableSource並定義rowtime屬性的TableSource為流查詢和批處理查詢提供了完全相同的數據。

提供的時間戳提取器

Flink提供TimestampExtractor了常見用例的實現。

TimestampExtractor當前提供以下實現:

  • ExistingField(fieldName):從現有的LONG,SQL_TIMESTAMP或時間戳格式的STRING字段中提取rowtime屬性的值。 這樣的字符串的一個示例是“ 2018-05-28 12:34:56.000”。
  • StreamRecordTimestamp():從DataStream StreamRecord的時間戳中提取rowtime屬性的值注意,這TimestampExtractor不適用於批處理表 source 。

TimestampExtractor可以通過實現相應的接口來定義定義。

提供的水印策略

Flink提供WatermarkStrategy了常見用例的實現。

WatermarkStrategy當前提供以下實現:

  • AscendingTimestamps:提升時間戳的水印策略。 時間戳不正確的記錄將被視為較晚。
  • BoundedOutOfOrderTimestamps(delay):用於時間戳的水印策略,該時間戳最多按指定的延遲亂序。
  • PreserveWatermarks():指示應從基礎DataStream中保留水印的策略

WatermarkStrategy可以通過實現相應的接口來定義定義。

使用投影下推定義TableSource

 TableSource通過實現ProjectableTableSource接口來支持投影下推該接口定義了一個方法:

ProjectableTableSource[T] {

  def projectFields(fields: Array[Int]): TableSource[T]
}
  • projectFields(fields):返回具有調整后的物理返回類型的TableSource的副本。 fields參數提供TableSource必須提供的字段的索引。 索引與物理返回類型的TypeInformation有關,而不與邏輯表模式有關。 復制的TableSource必須調整其返回類型以及返回的DataStream或DataSet。 復制的TableSource的TableSchema不得更改,即它必須與原始TableSource相同。 如果TableSource實現了DefinedFieldMapping接口,則必須將字段映射調整為新的返回類型

注意為了使Flink可以將投影下推表 source 與其原始形式區分開,必須重寫 explainSource  方法以包括有關投影字段的信息。

ProjectableTableSource增加了對項目平面字段的支持。 如果TableSource定義了具有嵌套模式的表,則可以實現NestedFieldsProjectableTableSource以將投影擴展到嵌套字段。 NestedFieldsProjectableTableSource的定義如下:

NestedFieldsProjectableTableSource[T] {

  def projectNestedFields(fields: Array[Int], nestedFields: Array[Array[String]]): TableSource[T]
}
  • projectNestedField(fields, nestedFields):返回具有調整后的物理返回類型的TableSource的副本。 物理返回類型的字段可以刪除或重新排序,但不得更改其類型。 此方法的約定與ProjectableTableSource.projectFields()方法的約定基本相同。 另外,nestedFields參數包含字段列表中每個字段索引的查詢到的所有嵌套字段的路徑列表。 所有其他嵌套字段都不需要在TableSource生成的記錄中讀取,解析和設置。

請注意,不得更改投影字段的類型,但未使用的字段可以設置為null或默認值。

使用過濾器下推定義TableSource

FilterableTableSource接口添加了對將過濾器下推到TableSource的支持。 擴展此接口的TableSource能夠過濾記錄,以便返回的DataStream或DataSet返回較少的記錄。

該接口如下所示:

FilterableTableSource[T] {

  def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]

  def isFilterPushedDown: Boolean
}
  • applyPredicate(predicates):返回帶有添加謂詞的TableSource的副本。 謂詞參數是“提供”給TableSource的連接謂詞的可變列表。 TableSource接受通過從列表中刪除謂詞來評估謂詞。 列表中剩余的謂詞將由后續的過濾器運算符評估
  • isFilterPushedDown():如果之前調用applyPredicate()方法,則返回true。 因此,對於從applyPredicate()調用返回的所有TableSource實例,isFilterPushedDown()必須返回true。

注意:為了使Flink能夠將過濾器下推表源與其原始形式區分開來,必須重寫explainSource方法以包括有關下推式過濾器的信息

定義用於查找的TableSource

注意這是一項實驗功能。將來的版本中可能會更改 接口。僅 Blink planner 支持。

LookupableTableSource接口增加了對通過查找方式通過鍵列訪問表的支持。 當用於與維表聯接以豐富某些信息時,這非常有用。 如果要在查找模式下使用TableSource,則應在時態表聯接語法中使用源。

該接口如下所示:

LookupableTableSource[T] extends TableSource[T] {

  def getLookupFunction(lookupKeys: Array[String]): TableFunction[T]

  def getAsyncLookupFunction(lookupKeys: Array[String]): AsyncTableFunction[T]

  def isAsyncEnabled: Boolean
}
  • getLookupFunction(lookupkeys):返回一個TableFunction,該函數用於通過查找鍵查找匹配的行。 lookupkeys是聯接相等條件下LookupableTableSource的字段名稱。 返回的TableFunction的eval方法參數應該按照定義的lookupkeys的順序。 建議在varargs中定義參數(例如eval(Object ... lookupkeys)以匹配所有情況。 TableFunction的返回類型必須與TableSource.getReturnType()方法定義的返回類型相同。
  • getAsyncLookupFunction(lookupkeys): 可選的。與getLookupFunction類似,但是AsyncLookupFunction異步查找匹配的行。 AsyncLookupFunction的基礎將通過Async I / O調用。 返回的AsyncTableFunction的eval方法的第一個參數應定義為java.util.concurrent.CompletableFuture以異步收集結果(例如eval(CompletableFuture <Collection <String >> result,Object ... lookupkeys))。 如果TableSource不支持異步查找,則此方法的實現可能引發異常
  • isAsyncEnabled():如果啟用了異步查找,則返回true。 如果isAsyncEnabled返回true,則需要實現getAsyncLookupFunction(lookupkeys)

定義Table Sink

TableSink指定如何將表發送到外部系統或位置。 該接口是通用的,因此它可以支持不同的存儲位置和格式。 批處理表和流式表有不同的表接收器。

接口如下所示:

TableSink[T] {

  def getOutputType: TypeInformation<T>

  def getFieldNames: Array[String]

  def getFieldTypes: Array[TypeInformation]

  def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation]): TableSink[T]
}

TableSink#configure 調用方法可將Table的結構(字段名稱和類型)傳遞給TableSink該方法必須返回TableSink的新實例,該實例被配置為發出提供的Table模式。

BatchTableSink

定義外部TableSink來發出批處理表。

該接口如下所示:

BatchTableSink[T] extends TableSink[T] {

  def emitDataSet(dataSet: DataSet[T]): Unit
}

AppendStreamTableSink

定義一個外部TableSink以發出一個批處理表。

該接口如下所示:

AppendStreamTableSink[T] extends TableSink[T] {

  def emitDataStream(dataStream: DataStream[T]): Unit
}

如果還通過更新或刪除更改來修改表,則將引發TableException。

RetractStreamTableSink

定義一個外部TableSink以發出具有插入,更新和刪除更改的流表。

該接口如下所示:

RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {

  def getRecordType: TypeInformation[T]

  def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
}

該表將被轉換為累積和撤消消息流,這些消息被編碼為Java Tuple2。 第一個字段是指示消息類型的布爾標志(true表示插入,false表示刪除)。 第二個字段保存請求的類型T的記錄。

UpsertStreamTableSink

定義一個外部TableSink以發出具有插入,更新和刪除更改的流表。

該接口如下所示:

UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {

  def setKeyFields(keys: Array[String]): Unit

  def setIsAppendOnly(isAppendOnly: Boolean): Unit

  def getRecordType: TypeInformation[T]

  def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
}

該表必須具有唯一的鍵字段(原子鍵或復合鍵)或僅附加鍵。 如果表沒有唯一鍵並且不是僅追加表,則將引發TableException。 該表的唯一鍵由UpsertStreamTableSink#setKeyFields()方法配置。

該表將轉換為upsert和delete消息流,這些消息被編碼為Java Tuple2。 第一個字段是指示消息類型的布爾標志。 第二個字段保存請求的類型T的記錄。

具有 true 布爾值字段的消息是已配置密鑰的upsert消息。 帶有 false 標志的消息是已配置密鑰的刪除消息。 如果表是僅追加的,則所有消息都將具有true標志,並且必須將其解釋為插入。

定義一個TableFactory

TableFactory允許從基於字符串的屬性中創建與表相關的不同實例。 調用所有可用的工廠以匹配給定的屬性集和相應的工廠類。

工廠利用Java的服務提供商接口(SPI)進行發現。 這意味着每個依賴項和JAR文件都應在META_INF / services資源目錄中包含一個文件org.apache.flink.table.factories.TableFactory,該文件列出了它提供的所有可用表工廠。

每個表工廠都需要實現以下接口:

package org.apache.flink.table.factories;

interface TableFactory {

  Map<String, String> requiredContext();

  List<String> supportedProperties();
}
  • requiredContext():指定已為此工廠實現的上下文。該框架保證僅在滿足指定的屬性和值集的情況下才與此工廠匹配。典型的屬性可能是connector.type,format.type或update-mode。 為將來的向后兼容情況保留了諸如connect.property-version和format.property-version之類的屬性鍵。
  • supportedProperties():此工廠可以處理的屬性鍵的列表。此方法將用於驗證。如果傳遞了該工廠無法處理的屬性,則將引發異常。該列表不得包含上下文指定的鍵。

為了創建特定實例,工廠類可以實現一個或多個接口,該接口提供org.apache.flink.table.factories

  • BatchTableSourceFactory:創建一個批處理表源。
  • BatchTableSinkFactory:創建一個批處理表接收器。
  • StreamTableSourceFactory:創建流表源。
  • StreamTableSinkFactory:創建一個流表接收器。
  • DeserializationSchemaFactory:創建反序列化架構格式。
  • SerializationSchemaFactory:創建序列化架構格式。

工廠的發現分為多個階段:

  • 發現所有可用的工廠。
  • 按工廠類別(例如StreamTableSourceFactory過濾
  • 通過匹配上下文進行過濾。
  • 按支持的屬性過濾。
  • 驗證一個工廠是否完全匹配,否則拋出AmbiguousTableFactoryExceptionNoMatchingTableFactoryException

下面的示例演示如何為自定義流源提供附加的connector.debug屬性標志以進行參數化。

import java.util
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row

class MySystemTableSourceFactory extends StreamTableSourceFactory[Row] {

  override def requiredContext(): util.Map[String, String] = {
    val context = new util.HashMap[String, String]()
    context.put("update-mode", "append")
    context.put("connector.type", "my-system")
    context
  }

  override def supportedProperties(): util.List[String] = {
    val properties = new util.ArrayList[String]()
    properties.add("connector.debug")
    properties
  }

  override def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[Row] = {
    val isDebug = java.lang.Boolean.valueOf(properties.get("connector.debug"))

    # additional validation of the passed properties can also happen here

    new MySystemAppendTableSource(isDebug)
  }
}

在SQL客戶端中使用TableFactory

在SQL Client環境文件中,先前提供的工廠可以聲明為:

tables:
 - name: MySystemTable
   type: source
   update-mode: append
   connector:
     type: my-system
     debug: true

將YAML文件轉換為扁平化的字符串屬性,並使用描述與外部系統的連接的那些屬性來調用表工廠: 

update-mode=append
connector.type=my-system
connector.debug=true

注意:table.#.name或table.#.type之類的屬性是SQL Client的特定屬性,不會傳遞給任何工廠。 根據執行環境,type屬性決定是否需要發現BatchTableSourceFactory / StreamTableSourceFactory(對於source),BatchTableSinkFactory / StreamTableSinkFactory(對於 sink)還是同時發現兩者(對於兩者)

在Table&SQL API中使用TableFactory

 對於使用說明性Scaladoc / Javadoc的類型安全的編程方法,Table&SQL API在org.apache.flink.table.descriptor中提供了描述符,這些描述符可轉換為基於字符串的屬性。 請參閱源,接收器和格式的內置描述符作為參考。

import org.apache.flink.table.descriptors.ConnectorDescriptor
import java.util.HashMap
import java.util.Map

/**
  * Connector to MySystem with debug mode.
  */
class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system", 1, false) {
  
  override protected def toConnectorProperties(): Map[String, String] = {
    val properties = new HashMap[String, String]
    properties.put("connector.debug", isDebug.toString)
    properties
  }
}

然后可以在API中使用描述符,如下所示:

val tableEnv: StreamTableEnvironment = // ...

tableEnv
  .connect(new MySystemConnector(isDebug = true))
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MySystemTable")

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM