本文翻譯自官網:Connect to External Systems https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html
注:本文對應代碼段為多種格式,影響文章篇幅,所以只選取其中一種類似列入,全部內容見官網對應頁面
Flink 的 Table API 和 SQL 程序可以連接到其他外部系統,以讀取和寫入批處理表和流式表。表源提供對存儲在外部系統(例如數據庫,鍵值存儲,消息隊列或文件系統)中的數據的訪問。表接收器將表發送到外部存儲系統。根據源和接收器的類型,它們支持不同的格式,例如 CSV,Parquet或ORC。
本頁介紹如何聲明內置表源/表接收器以及如何在 Flink中注冊它們。注冊源或接收器后,可以通過 Table API 和 SQL 語句對其進行訪問。
注意如果要實現自己的定制表源或接收器,請查看用戶定義的源和接收器頁面。
依賴
下表列出了所有可用的連接器和格式。它們的相互兼容性在表連接器和表格式的相應部分中進行了標記。下表提供了使用構建自動化工具(例如Maven或SBT)和帶有SQL JAR捆綁包的SQL Client的兩個項目的依賴項信息。
連接器
| Name | Version | Maven dependency | SQL Client JAR |
|---|---|---|---|
| Filesystem | Built-in | Built-in | |
| Elasticsearch | 6 | flink-connector-elasticsearch6 |
Download |
| Apache Kafka | 0.8 | flink-connector-kafka-0.8 |
Not available |
| Apache Kafka | 0.9 | flink-connector-kafka-0.9 |
Download |
| Apache Kafka | 0.10 | flink-connector-kafka-0.10 |
Download |
| Apache Kafka | 0.11 | flink-connector-kafka-0.11 |
Download |
| Apache Kafka | 0.11+ (universal) |
flink-connector-kafka |
Download |
| HBase | 1.4.3 | flink-hbase |
Download |
| JDBC | flink-jdbc |
Download |
格式
| Name | Maven dependency | SQL Client JAR |
|---|---|---|
| Old CSV (for files) | Built-in | Built-in |
| CSV (for Kafka) | flink-csv |
Download |
| JSON | flink-json |
Download |
| Apache Avro | flink-avro |
Download |
總覽
從Flink 1.6開始,與外部系統的連接聲明與實際實現分開了。
可以指定連接
- 以編程方式使用org.apache.flink.table.descriptors下的Table&SQL API的Descriptor 。
- 通過用於SQL客戶端的YAML配置文件聲明。
這不僅可以更好地統一API和SQL Client,還可以在自定義實現的情況下更好地擴展而不更改實際聲明。
每個聲明都類似於SQL CREATE TABLE語句。 可以定義表的名稱,表的結構,連接器以及用於連接到外部系統的數據格式。
連接器描述了存儲表數據的外部系統。 可以在此處聲明諸如Apacha Kafka之類的存儲系統或常規文件系統。 連接器可能已經提供了帶有字段和結構的固定格式。
某些系統支持不同的數據格式。 例如,存儲在Kafka或文件中的表可以使用CSV,JSON或Avro對行進行編碼。 數據庫連接器可能需要此處的表結構。 每個連接器都記錄了存儲系統是否需要格式的定義。 不同的系統還需要不同類型的格式(例如,面向列的格式與面向行的格式)。 該文檔說明了哪些格式類型和連接器兼容。
表結構定義了公開給SQL查詢的表的結構。 它描述了源如何將數據格式映射到表模式,反之亦然。 該模式可以訪問由連接器或格式定義的字段。 它可以使用一個或多個字段來提取或插入時間屬性。 如果輸入字段沒有確定性的字段順序,則該結構將明確定義列名稱,其順序和來源。
隨后的部分將更詳細地介紹每個定義部分(連接器,格式和結構)。 以下示例顯示如何傳遞它們:
Java、Scala 定義 tableEnvironment .connect(...) .withFormat(...) .withSchema(...) .inAppendMode() .registerTableSource("MyTable")
表格的類型(源,接收器或兩者)決定了表格的注冊方式。 對於兩種表類型,表源和表接收器都以相同的名稱注冊。 從邏輯上講,這意味着我們可以像讀取常規DBMS中的表一樣讀取和寫入該表。
對於流查詢,更新模式聲明了如何在動態表和存儲系統之間進行通信以進行連續查詢。
以下代碼顯示了如何連接到Kafka以讀取Avro記錄的完整示例。
Java、Scala 定義
tableEnvironment // declare the external system to connect to .connect( new Kafka() .version("0.10") .topic("test-input") .startFromEarliest() .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") ) // declare a format for this system .withFormat( new Avro() .avroSchema( "{" + " \"namespace\": \"org.myorganization\"," + " \"type\": \"record\"," + " \"name\": \"UserMessage\"," + " \"fields\": [" + " {\"name\": \"timestamp\", \"type\": \"string\"}," + " {\"name\": \"user\", \"type\": \"long\"}," + " {\"name\": \"message\", \"type\": [\"string\", \"null\"]}" + " ]" + "}" ) ) // declare the schema of the table .withSchema( new Schema() .field("rowtime", Types.SQL_TIMESTAMP) .rowtime(new Rowtime() .timestampsFromField("timestamp") .watermarksPeriodicBounded(60000) ) .field("user", Types.LONG) .field("message", Types.STRING) ) // specify the update-mode for streaming tables .inAppendMode() // register as source, sink, or both and under a name .registerTableSource("MyUserTable");
兩種方式都將所需的連接屬性轉換為標准化的基於字符串的鍵值對。 所謂的表工廠根據鍵值對創建已配置的表源,表接收器和相應的格式。 搜索完全匹配的表工廠時,會考慮到所有可通過Java服務提供商接口(SPI)找到的表工廠。
如果找不到給定屬性的工廠或多個工廠匹配,則將引發異常,並提供有關考慮的工廠和支持的屬性的其他信息。
表結構
表結構定義列的名稱和類型,類似於SQL CREATE TABLE語句的列定義。 此外,可以指定如何將列與表數據編碼格式的字段進行映射。 如果列名應與輸入/輸出格式不同,則字段的來源可能很重要。 例如,一列user_name應該引用JSON格式的$$-user-name字段。 此外,需要使用該結構將類型從外部系統映射到Flink的表示形式。 如果是表接收器,則可確保僅將具有有效結構的數據寫入外部系統。
以下示例顯示了一個沒有時間屬性的簡單架構,並且輸入/輸出到表列的一對一字段映射。
Java、Scala 定義
.withSchema( new Schema() .field("MyField1", Types.INT) // required: specify the fields of the table (in this order) .field("MyField2", Types.STRING) .field("MyField3", Types.BOOLEAN) )
對於每個字段,除列的名稱和類型外,還可以聲明以下屬性:
Java、Scala 定義
.withSchema( new Schema() .field("MyField1", Types.SQL_TIMESTAMP) .proctime() // optional: declares this field as a processing-time attribute .field("MyField2", Types.SQL_TIMESTAMP) .rowtime(...) // optional: declares this field as a event-time attribute .field("MyField3", Types.BOOLEAN) .from("mf3") // optional: original field in the input that is referenced/aliased by this field )
使用無界流表時,時間屬性至關重要。 因此,處理時間和事件時間(也稱為“行時間”)屬性都可以定義為架構的一部分。
有關Flink中時間處理(尤其是事件時間)的更多信息,我們建議使用常規事件時間部分。
行時間屬性
為了控制表的事件時間行為,Flink提供了預定義的時間戳提取器和水印策略。
支持以下時間戳提取器:
Java、Scala
// Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute. .rowtime( new Rowtime() .timestampsFromField("ts_field") // required: original field name in the input ) // Converts the assigned timestamps from a DataStream API record into the rowtime attribute // and thus preserves the assigned timestamps from the source. // This requires a source that assigns timestamps (e.g., Kafka 0.10+). .rowtime( new Rowtime() .timestampsFromSource() ) // Sets a custom timestamp extractor to be used for the rowtime attribute. // The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`. .rowtime( new Rowtime() .timestampsFromExtractor(...) )
支持以下水印策略:
Java、Scala
// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum // observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp // are not late. .rowtime( new Rowtime() .watermarksPeriodicAscending() ) // Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval. // Emits watermarks which are the maximum observed timestamp minus the specified delay. .rowtime( new Rowtime() .watermarksPeriodicBounded(2000) // delay in milliseconds ) // Sets a built-in watermark strategy which indicates the watermarks should be preserved from the // underlying DataStream API and thus preserves the assigned watermarks from the source. .rowtime( new Rowtime() .watermarksFromSource() )
確保始終聲明時間戳和水印。 觸發基於時間的操作需要水印。
類型字符串
由於類型信息僅在編程語言中可用,因此支持在YAML文件中定義以下類型字符串:
VARCHAR BOOLEAN TINYINT SMALLINT INT BIGINT FLOAT DOUBLE DECIMAL DATE TIME TIMESTAMP MAP<fieldtype, fieldtype> # generic map; e.g. MAP<VARCHAR, INT> that is mapped to Flink's MapTypeInfo MULTISET<fieldtype> # multiset; e.g. MULTISET<VARCHAR> that is mapped to Flink's MultisetTypeInfo PRIMITIVE_ARRAY<fieldtype> # primitive array; e.g. PRIMITIVE_ARRAY<INT> that is mapped to Flink's PrimitiveArrayTypeInfo OBJECT_ARRAY<fieldtype> # object array; e.g. OBJECT_ARRAY<POJO(org.mycompany.MyPojoClass)> that is mapped to # Flink's ObjectArrayTypeInfo ROW<fieldtype, ...> # unnamed row; e.g. ROW<VARCHAR, INT> that is mapped to Flink's RowTypeInfo # with indexed fields names f0, f1, ... ROW<fieldname fieldtype, ...> # named row; e.g., ROW<myField VARCHAR, myOtherField INT> that # is mapped to Flink's RowTypeInfo POJO<class> # e.g., POJO<org.mycompany.MyPojoClass> that is mapped to Flink's PojoTypeInfo ANY<class> # e.g., ANY<org.mycompany.MyClass> that is mapped to Flink's GenericTypeInfo ANY<class, serialized> # used for type information that is not supported by Flink's Table & SQL API
更新模式
對於流查詢,需要聲明如何在動態表和外部連接器之間執行轉換。 更新模式指定應與外部系統交換的消息類型:
Append Mode: 在追加模式下,動態表和外部連接器僅交換INSERT消息。
Retract Mode: 在撤回模式下,動態表和外部連接器交換ADD和RETRACT消息。 INSERT更改被編碼為ADD消息,DELETE更改為RETRACT消息,UPDATE更改為更新(先前)行的RETRACT消息,而ADD消息被更新(新)行的ADD消息。 在此模式下,與upsert模式相反,不得定義密鑰。 但是,每個更新都包含兩個效率較低的消息。
Upsert Mode: 在upsert模式下,動態表和外部連接器交換UPSERT和DELETE消息。 此模式需要一個(可能是復合的)唯一密鑰,通過該密鑰可以傳播更新。 外部連接器需要了解唯一鍵屬性,才能正確應用消息。 INSERT和UPDATE更改被編碼為UPSERT消息。 DELETE更改為DELETE消息。 與撤回流的主要區別在於UPDATE更改使用單個消息進行編碼,因此效率更高。
注意:每個連接器的文檔都說明了支持哪些更新模式。
Java、Scala .connect(...) .inAppendMode() // otherwise: inUpsertMode() or inRetractMode()
另請參閱常規流概念文檔。
表連接器
Flink提供了一組用於連接到外部系統的連接器。
請注意,並非所有連接器都可以批量和流式使用。 此外,並非每個流連接器都支持每種流模式。 因此,每個連接器都有相應的標記。 格式標簽表示連接器需要某種類型的格式。
文件系統連接器
Source: Batch, Source: Streaming, Append Mode, Sink: Batch, Sink: Streaming, Append Mode Format: CSV-only
文件系統連接器允許從本地或分布式文件系統讀取和寫入。 文件系統可以定義為:
Java、Scala .connect( new FileSystem() .path("file:///path/to/whatever") // required: path to a file or directory )
文件系統連接器本身包含在Flink中,不需要其他依賴項。 需要指定一種相應的格式,以便在文件系統中讀取和寫入行。
注意:確保包括Flink File System特定的依賴項。
注意文件系統源和流接收器僅是實驗性的。 將來,我們將支持實際的流傳輸用例,即目錄監視和存儲桶輸出。
Kafka連接器
Source: Streaming Append Mode, Sink: Streaming Append Mode, Format: Serialization, Schema Format: Deserialization Schema
Kafka連接器允許在Apache Kafka主題之間進行讀寫。 可以定義如下:
.connect( new Kafka() .version("0.11") // required: valid connector versions are // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("...") // required: topic name from which the table is read // optional: connector specific properties .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") .property("group.id", "testGroup") // optional: select a startup mode for Kafka offsets .startFromEarliest() .startFromLatest() .startFromSpecificOffsets(...) // optional: output partitioning from Flink's partitions into Kafka's partitions .sinkPartitionerFixed() // each Flink partition ends up in at-most one Kafka partition (default) .sinkPartitionerRoundRobin() // a Flink partition is distributed to Kafka partitions round-robin .sinkPartitionerCustom(MyCustom.class) // use a custom FlinkKafkaPartitioner subclass )
Specify the start reading position: 默認情況下,Kafka源將從Zookeeper或Kafka代理中的已提交組偏移中開始讀取數據。 您可以指定其他起始位置,這些位置與“ Kafka Consumers起始位置配置”部分中的配置相對應。
Flink-Kafka Sink Partitioning: 默認情況下,Kafka接收器最多可以寫入與其自身並行性一樣多的分區(每個並行的接收器實例都寫入一個分區)。 為了將寫入內容分配到更多分區或控制行到分區的路由,可以提供自定義接收器分區程序。 循環分區器對於避免不平衡分區很有用。 但是,這將導致所有Flink實例與所有Kafka代理之間的大量網絡連接。
Consistency guarantees: 默認情況下,如果在啟用檢查點的情況下執行查詢,則Kafka接收器會將具有至少一次保證的數據提取到Kafka主題中。
Kafka 0.10+ Timestamps: 從Kafka 0.10開始,Kafka消息具有時間戳作為元數據,用於指定何時將記錄寫入Kafka主題。 通過選擇時間戳,可以將這些時間戳用於rowtime屬性:分別是YAML中的from-source和Java / Scala中的timestampsFromSource()。
Kafka 0.11+ Versioning: 從Flink 1.7開始,Kafka連接器定義獨立於硬編碼的Kafka版本。 將通用連接器版本用作Flink Kafka連接器的通配符,該連接器與所有版本從0.11開始的Kafka兼容。
Elasticsearch連接器
Sink: Streaming Append Mode, Sink: Streaming Upsert Mode, Format: JSON-only
Elasticsearch連接器允許寫入Elasticsearch搜索引擎的索引。
連接器可以在upsert模式下運行,以使用查詢定義的密鑰與外部系統交換UPSERT / DELETE消息。
對於 appen-only 查詢,連接器還可以在追加模式下操作,以僅與外部系統交換INSERT消息。 如果查詢未定義任何鍵,則Elasticsearch自動生成一個鍵。
連接器可以定義如下:
.connect( new Elasticsearch() .version("6") // required: valid connector versions are "6" .host("localhost", 9200, "http") // required: one or more Elasticsearch hosts to connect to .index("MyUsers") // required: Elasticsearch index .documentType("user") // required: Elasticsearch document type .keyDelimiter("$") // optional: delimiter for composite keys ("_" by default) // e.g., "$" would result in IDs "KEY1$KEY2$KEY3" .keyNullLiteral("n/a") // optional: representation for null fields in keys ("null" by default) // optional: failure handling strategy in case a request to Elasticsearch fails (fail by default) .failureHandlerFail() // optional: throws an exception if a request fails and causes a job failure .failureHandlerIgnore() // or ignores failures and drops the request .failureHandlerRetryRejected() // or re-adds requests that have failed due to queue capacity saturation .failureHandlerCustom(...) // or custom failure handling with a ActionRequestFailureHandler subclass // optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency .disableFlushOnCheckpoint() // optional: disables flushing on checkpoint (see notes below!) .bulkFlushMaxActions(42) // optional: maximum number of actions to buffer for each bulk request .bulkFlushMaxSize("42 mb") // optional: maximum size of buffered actions in bytes per bulk request // (only MB granularity is supported) .bulkFlushInterval(60000L) // optional: bulk flush interval (in milliseconds) .bulkFlushBackoffConstant() // optional: use a constant backoff type .bulkFlushBackoffExponential() // or use an exponential backoff type .bulkFlushBackoffMaxRetries(3) // optional: maximum number of retries .bulkFlushBackoffDelay(30000L) // optional: delay between each backoff attempt (in milliseconds) // optional: connection properties to be used during REST communication to Elasticsearch .connectionMaxRetryTimeout(3) // optional: maximum timeout (in milliseconds) between retries .connectionPathPrefix("/v1") // optional: prefix string to be added to every REST communication )
Bulk flushing: 有關可選的刷新參數的特征的更多信息,請參見相應的低級文檔。
Disabling flushing on checkpoint: 禁用后,接收器將不等待Elasticsearch在檢查點上確認所有阻塞的操作請求。 因此,接收器不會為動作請求的至少一次傳遞提供任何有力的保證。
Key extraction: Flink自動從查詢中提取有效鍵。 例如,查詢SELECT a,b,c FROM t GROUP BY a,b定義了字段a和b的組合鍵。 Elasticsearch連接器通過使用關鍵字定界符按查詢中定義的順序連接所有關鍵字字段,為每一行生成一個文檔ID字符串。 可以定義鍵字段的空文字的自定義表示形式。
注意:JSON格式定義了如何為外部系統編碼文檔,因此,必須將其添加為依賴項。
HBase連接器
Source: Batch, Sink: Batch, Sink: Streaming Append Mode, Sink: Streaming Upsert Mode, Temporal Join: Sync Mode
HBase連接器允許讀取和寫入HBase群集。
連接器可以在upsert模式下運行,以使用查詢定義的密鑰與外部系統交換UPSERT / DELETE消息。
對於 append-only 查詢,連接器還可以在追加模式下操作,以僅與外部系統交換INSERT消息。
連接器可以定義如下:
connector: type: hbase version: "1.4.3" # required: currently only support "1.4.3" table-name: "hbase_table_name" # required: HBase table name zookeeper: quorum: "localhost:2181" # required: HBase Zookeeper quorum configuration znode.parent: "/test" # optional: the root dir in Zookeeper for HBase cluster. # The default value is "/hbase". write.buffer-flush: max-size: "10mb" # optional: writing option, determines how many size in memory of buffered # rows to insert per round trip. This can help performance on writing to JDBC # database. The default value is "2mb". max-rows: 1000 # optional: writing option, determines how many rows to insert per round trip. # This can help performance on writing to JDBC database. No default value, # i.e. the default flushing is not depends on the number of buffered rows. interval: "2s" # optional: writing option, sets a flush interval flushing buffered requesting # if the interval passes, in milliseconds. Default value is "0s", which means # no asynchronous flush thread will be scheduled.
Columns: HBase表中的所有列系列必須聲明為ROW類型,字段名稱映射到列 family 名稱,而嵌套的字段名稱映射到列 qualifier 名稱。 無需在結構中聲明所有族和限定符,用戶可以聲明必要的內容。 除ROW type字段外,原子類型的唯一一個字段(例如STRING,BIGINT)將被識別為表的行鍵。 行鍵字段的名稱沒有任何限制。
Temporary join: 針對HBase的查找聯接不使用任何緩存; 始終總是通過HBase客戶端直接查詢數據。
Java/Scala/Python API: Java/Scala/Python APIs 還不支持
JDBC連接器
Source: Batch, Sink: Batch, Sink: Streaming Append Mode, Sink: Streaming Upsert Mode, Temporal Join: Sync Mode
JDBC連接器允許讀取和寫入JDBC客戶端。
連接器可以在upsert模式下運行,以使用查詢定義的密鑰與外部系統交換UPSERT / DELETE消息。
對於 append-only 查詢,連接器還可以在追加模式下操作,以僅與外部系統交換INSERT消息。
要使用JDBC連接器,需要選擇一個實際的驅動程序來使用。 當前支持以下驅動程序:
支持的驅動:
| Name | Group Id | Artifact Id | JAR |
|---|---|---|---|
| MySQL | mysql | mysql-connector-java | Download |
| PostgreSQL | org.postgresql | postgresql | Download |
| Derby | org.apache.derby | derby | Download |
連接器可以定義如下:
connector: type: jdbc url: "jdbc:mysql://localhost:3306/flink-test" # required: JDBC DB url table: "jdbc_table_name" # required: jdbc table name driver: "com.mysql.jdbc.Driver" # optional: the class name of the JDBC driver to use to connect to this URL. # If not set, it will automatically be derived from the URL. username: "name" # optional: jdbc user name and password password: "password" read: # scan options, optional, used when reading from table partition: # These options must all be specified if any of them is specified. In addition, partition.num must be specified. They # describe how to partition the table when reading in parallel from multiple tasks. partition.column must be a numeric, # date, or timestamp column from the table in question. Notice that lowerBound and upperBound are just used to decide # the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. # This option applies only to reading. column: "column_name" # optional, name of the column used for partitioning the input. num: 50 # optional, the number of partitions. lower-bound: 500 # optional, the smallest value of the first partition. upper-bound: 1000 # optional, the largest value of the last partition. fetch-size: 100 # optional, Gives the reader a hint as to the number of rows that should be fetched # from the database when reading per round trip. If the value specified is zero, then # the hint is ignored. The default value is zero. lookup: # lookup options, optional, used in temporary join cache: max-rows: 5000 # optional, max number of rows of lookup cache, over this value, the oldest rows will # be eliminated. "cache.max-rows" and "cache.ttl" options must all be specified if any # of them is specified. Cache is not enabled as default. ttl: "10s" # optional, the max time to live for each rows in lookup cache, over this time, the oldest rows # will be expired. "cache.max-rows" and "cache.ttl" options must all be specified if any of # them is specified. Cache is not enabled as default. max-retries: 3 # optional, max retry times if lookup database failed write: # sink options, optional, used when writing into table flush: max-rows: 5000 # optional, flush max size (includes all append, upsert and delete records), # over this number of records, will flush data. The default value is "5000". interval: "2s" # optional, flush interval mills, over this time, asynchronous threads will flush data. # The default value is "0s", which means no asynchronous flush thread will be scheduled. max-retries: 3 # optional, max retry times if writing records to database failed.
Upsert sink: Flink自動從查詢中提取有效鍵。 例如,查詢SELECT a,b,c FROM t GROUP BY a,b定義了字段a和b的組合鍵。 如果將JDBC表用作upsert接收器,請確保查詢的鍵是基礎數據庫的唯一鍵集或主鍵之一。 這樣可以保證輸出結果符合預期。
Temporary Join: JDBC連接器可以在臨時聯接中用作查找源。 當前,僅支持同步查找模式。 如果指定了查找緩存選項(connector.lookup.cache.max-rows和connector.lookup.cache.ttl),則必須全部指定它們。 查找緩存用於通過首先查詢緩存而不是將所有請求發送到遠程數據庫來提高臨時連接JDBC連接器的性能。 但是,如果來自緩存,則返回的值可能不是最新的。 因此,這是吞吐量和正確性之間的平衡。
Writing: 默認情況下,connector.write.flush.interval為0s,connector.write.flush.max-rows為5000,這意味着對於低流量查詢,緩沖的輸出行可能不會長時間刷新到數據庫。 因此,建議設置間隔配置。
表格格式
Flink提供了一組表格式,可與表連接器一起使用。
格式標簽表示與連接器匹配的格式類型。
CSV格式
Format: Serialization Schema, Format: Deserialization Schema
CSV格式旨在符合Internet工程任務組(IETF)提出的RFC-4180(“逗號分隔值(CSV)文件的通用格式和MIME類型”)。
該格式允許讀取和寫入與給定格式模式對應的CSV數據。 格式結構可以定義為Flink類型,也可以從所需的表結構派生。
如果格式模式等於表結構,則也可以自動派生該結構。 這僅允許定義一次結構信息。 格式的名稱,類型和字段的順序由表的結構確定。 如果時間屬性的來源不是字段,則將忽略它們。 表模式中的from定義被解釋為以該格式重命名的字段。
CSV格式可以如下使用:
.withFormat( new Csv() // required: define the schema either by using type information .schema(Type.ROW(...)) // or use the table's schema .deriveSchema() .fieldDelimiter(';') // optional: field delimiter character (',' by default) .lineDelimiter("\r\n") // optional: line delimiter ("\n" by default; // otherwise "\r" or "\r\n" are allowed) .quoteCharacter('\'') // optional: quote character for enclosing field values ('"' by default) .allowComments() // optional: ignores comment lines that start with '#' (disabled by default); // if enabled, make sure to also ignore parse errors to allow empty rows .ignoreParseErrors() // optional: skip fields and rows with parse errors instead of failing; // fields are set to null in case of errors .arrayElementDelimiter("|") // optional: the array element delimiter string for separating // array and row element values (";" by default) .escapeCharacter('\\') // optional: escape character for escaping values (disabled by default) .nullLiteral("n/a") // optional: null literal string that is interpreted as a // null value (disabled by default) )
下表列出了可以讀取和寫入的受支持類型:
| Supported Flink SQL Types |
|---|
ROW |
VARCHAR |
ARRAY[_] |
INT |
BIGINT |
FLOAT |
DOUBLE |
BOOLEAN |
DATE |
TIME |
TIMESTAMP |
DECIMAL |
NULL (unsupported yet) |
Numeric types: 值應該是數字,但字面量“ null”也可以理解。 空字符串被視為null。 值也被修剪(開頭/結尾隨空白)。 數字是使用Java的valueOf語義解析的。 其他非數字字符串可能會導致解析異常。
String and time types: 值未修剪。 文字“ null”也可以理解。 時間類型必須根據Java SQL時間格式進行格式化,且精度為毫秒。 例如:日期為2018-01-01,時間為20:43:59,時間戳為2018-01-01 20:43:59.999。
Boolean type: 值應為布爾值(“ true”,“ false”)字符串或“ null”。 空字符串被解釋為false。 值被修剪(開頭/結尾隨空白)。 其他值導致異常。
Nested types: 使用數組元素定界符可以為一級嵌套支持數組和行類型。
Primitive byte arrays: 基本字節數組以Base64編碼表示形式處理。
Line endings: 對於行末未引號的字符串字段,即使對於基於行的連接器(如Kafka)也應忽略行尾。
Escaping and quoting: 下表顯示了使用*進行轉義和使用'進行引用的轉義和引用如何影響字符串的解析的示例:
| CSV Field | Parsed String |
|---|---|
123*'4** |
123'4* |
'123''4**' |
123'4* |
'a;b*'c' |
a;b'c |
'a;b''c' |
a;b'c |
確保將CSV格式添加為依賴項。
JSON格式
Format: Serialization Schema, Format: Deserialization Schema
JSON格式允許讀取和寫入與給定格式結構相對應的JSON數據。 格式結構可以定義為Flink類型,JSON 結構或從所需的表結構派生。 Flink類型啟用了更類似於SQL的定義並映射到相應的SQL數據類型。 JSON 格式允許更復雜和嵌套的結構。
如果格式結構等於表結構,則也可以自動派生該結構。 這僅允許定義一次結構信息。 格式的名稱,類型和字段的順序由表的結構確定。 如果時間屬性的來源不是字段,則將忽略它們。 表結構中的from定義被解釋為以該格式重命名的字段。
JSON格式可以如下使用:
.withFormat( new Json() .failOnMissingField(true) // optional: flag whether to fail if a field is missing or not, false by default // required: define the schema either by using type information which parses numbers to corresponding types .schema(Type.ROW(...)) // or by using a JSON schema which parses to DECIMAL and TIMESTAMP .jsonSchema( "{" + " type: 'object'," + " properties: {" + " lon: {" + " type: 'number'" + " }," + " rideTime: {" + " type: 'string'," + " format: 'date-time'" + " }" + " }" + "}" ) // or use the table's schema .deriveSchema() )
下表顯示了JSON模式類型到Flink SQL類型的映射:
| JSON schema | Flink SQL |
|---|---|
object |
ROW |
boolean |
BOOLEAN |
array |
ARRAY[_] |
number |
DECIMAL |
integer |
DECIMAL |
string |
VARCHAR |
string with format: date-time |
TIMESTAMP |
string with format: date |
DATE |
string with format: time |
TIME |
string with encoding: base64 |
ARRAY[TINYINT] |
null |
NULL (unsupported yet) |
當前,Flink僅支持JSON模式規范draft-07的子集。 尚不支持聯合類型(以及allOf,anyOf和not)。 僅支持oneOf和類型數組用於指定可為空性。
支持鏈接到文檔中通用定義的簡單引用,如以下更復雜的示例所示:
{ "definitions": { "address": { "type": "object", "properties": { "street_address": { "type": "string" }, "city": { "type": "string" }, "state": { "type": "string" } }, "required": [ "street_address", "city", "state" ] } }, "type": "object", "properties": { "billing_address": { "$ref": "#/definitions/address" }, "shipping_address": { "$ref": "#/definitions/address" }, "optional_address": { "oneOf": [ { "type": "null" }, { "$ref": "#/definitions/address" } ] } } }
Missing Field Handling: 默認情況下,缺少的JSON字段設置為null。 您可以啟用嚴格的JSON解析,如果缺少字段,則將取消源(和查詢)。
確保將JSON格式添加為依賴項。
Apache Avro格式
Format: Serialization Schema, Format: Deserialization Schema
Apache Avro格式允許讀取和寫入與給定格式模式相對應的Avro數據。 格式結構可以定義為Avro特定記錄的完全限定的類名,也可以定義為Avro架構字符串。 如果使用了類名,則在運行時該類必須在類路徑中可用。
Avro格式可以如下使用:
.withFormat( new Avro() // required: define the schema either by using an Avro specific record class .recordClass(User.class) // or by using an Avro schema .avroSchema( "{" + " \"type\": \"record\"," + " \"name\": \"test\"," + " \"fields\" : [" + " {\"name\": \"a\", \"type\": \"long\"}," + " {\"name\": \"b\", \"type\": \"string\"}" + " ]" + "}" ) )
Avro類型映射到相應的SQL數據類型。 僅支持聯合類型用於指定可為空性,否則它們將轉換為ANY類型。 下表顯示了映射:
| Avro schema | Flink SQL |
|---|---|
record |
ROW |
enum |
VARCHAR |
array |
ARRAY[_] |
map |
MAP[VARCHAR, _] |
union |
non-null type or ANY |
fixed |
ARRAY[TINYINT] |
string |
VARCHAR |
bytes |
ARRAY[TINYINT] |
int |
INT |
long |
BIGINT |
float |
FLOAT |
double |
DOUBLE |
boolean |
BOOLEAN |
int with logicalType: date |
DATE |
int with logicalType: time-millis |
TIME |
int with logicalType: time-micros |
INT |
long with logicalType: timestamp-millis |
TIMESTAMP |
long with logicalType: timestamp-micros |
BIGINT |
bytes with logicalType: decimal |
DECIMAL |
fixed with logicalType: decimal |
DECIMAL |
null |
NULL (unsupported yet) |
Avro使用Joda-Time表示特定記錄類中的邏輯日期和時間類型。 Joda-Time依賴性不屬於Flink的發行版。 因此,在運行時,請確保Joda-Time和特定的記錄類在您的類路徑中。 通過模式字符串指定的Avro格式不需要顯示Joda-Time。
確保添加Apache Avro依賴項。
舊的CSV格式
注意:僅用於原型制作!
舊的CSV格式允許使用文件系統連接器讀取和寫入以逗號分隔的行。
此格式描述了Flink的非標准CSV表源/接收器。 將來,該格式將被適當的RFC兼容版本取代。 寫入Kafka時,請使用符合RFC的CSV格式。 現在,將舊版本用於流/批處理文件系統操作。
.withFormat( new OldCsv() .field("field1", Types.STRING) // required: ordered format fields .field("field2", Types.TIMESTAMP) .fieldDelimiter(",") // optional: string delimiter "," by default .lineDelimiter("\n") // optional: string delimiter "\n" by default .quoteCharacter('"') // optional: single character for string values, empty by default .commentPrefix('#') // optional: string to indicate comments, empty by default .ignoreFirstLine() // optional: ignore the first line, by default it is not skipped .ignoreParseErrors() // optional: skip records with parse error instead of failing by default )
舊的CSV格式包含在Flink中,不需要其他依賴項。
注意:目前,用於寫入行的舊CSV格式受到限制。 僅支持將自定義字段定界符作為可選參數。
更多TableSources和TableSinks
下表的源和接收器尚未遷移(或尚未完全遷移)到新的統一接口。
這些是Flink隨附的其他TableSources:
| Class name | Maven dependency | Batch? | Streaming? | Description |
OrcTableSource |
flink-orc |
Y | N | A TableSource for ORC files. |
These are the additional TableSinks which are provided with Flink:
| Class name | Maven dependency | Batch? | Streaming? | Description |
CsvTableSink |
flink-table |
Y | Append | A simple sink for CSV files. |
JDBCAppendTableSink |
flink-jdbc |
Y | Append | Writes a Table to a JDBC table. |
CassandraAppendTableSink |
flink-connector-cassandra |
N | Append | Writes a Table to a Cassandra table. |
OrcTableSource
OrcTableSource讀取ORC文件。 ORC是用於結構化數據的文件格式,並以壓縮的列表示形式存儲數據。 ORC具有很高的存儲效率,並支持投影和濾鏡下推。
創建一個OrcTableSource,如下所示:
// create Hadoop Configuration Configuration config = new Configuration(); OrcTableSource orcTableSource = OrcTableSource.builder() // path to ORC file(s). NOTE: By default, directories are recursively scanned. .path("file:///path/to/data") // schema of ORC files .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>") // Hadoop configuration .withConfiguration(config) // build OrcTableSource .build();
注意:OrcTableSource還不支持ORC的聯合類型。
CsvTableSink
CsvTableSink發出一個表到一個或多個CSV文件。
接收器僅支持 append-only 流表。 它不能用於發出連續更新的表。 有關詳細信息,請參見表到流轉換的文檔。 發出流表時,行至少寫入一次(如果啟用了檢查點),並且CsvTableSink不會將輸出文件拆分為存儲區文件,而是連續寫入相同的文件。
CsvTableSink sink = new CsvTableSink( path, // output path "|", // optional: delimit files by '|' 1, // optional: write to a single file WriteMode.OVERWRITE); // optional: override existing files tableEnv.registerTableSink( "csvOutputTable", // specify table schema new String[]{"f0", "f1"}, new TypeInformation[]{Types.STRING, Types.INT}, sink); Table table = ... table.insertInto("csvOutputTable");
JDBCAppendTableSink
JDBCAppendTableSink發出到JDBC連接的表。 接收器僅支持 append-only 流表。 它不能用於發出連續更新的表。 有關詳細信息,請參見表到流轉換的文檔。
JDBCAppendTableSink將每個Table行至少插入一次到數據庫表中(如果啟用了檢查點)。 但是,您可以使用REPLACE或INSERT OVERWRITE指定插入查詢,以執行對數據庫的向上寫入。
要使用JDBC接收器,必須將JDBC連接器依賴項(flink-jdbc)添加到項目中。 然后,您可以使用JDBCAppendSinkBuilder創建接收器:
JDBCAppendTableSink sink = JDBCAppendTableSink.builder() .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") .setDBUrl("jdbc:derby:memory:ebookshop") .setQuery("INSERT INTO books (id) VALUES (?)") .setParameterTypes(INT_TYPE_INFO) .build(); tableEnv.registerTableSink( "jdbcOutputTable", // specify table schema new String[]{"id"}, new TypeInformation[]{Types.INT}, sink); Table table = ... table.insertInto("jdbcOutputTable");
與使用JDBCOutputFormat相似,您必須顯式指定JDBC驅動程序的名稱,JDBC URL,要執行的查詢以及JDBC表的字段類型。
CassandraAppendTableSink
CassandraAppendTableSink向Cassandra表發出一個表。 接收器僅支持append 流表。 它不能用於發出連續更新的表。 有關詳細信息,請參見表到流轉換的文檔。
如果啟用了檢查點,則CassandraAppendTableSink將所有行至少插入一次到Cassandra表中。 但是,您可以將查詢指定為upsert查詢。
要使用CassandraAppendTableSink,必須將Cassandra連接器依賴項(flink-connector-cassandra)添加到項目中。 以下示例顯示了如何使用CassandraAppendTableSink。
ClusterBuilder builder = ... // configure Cassandra cluster connection CassandraAppendTableSink sink = new CassandraAppendTableSink( builder, // the query must match the schema of the table "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)"); tableEnv.registerTableSink( "cassandraOutputTable", // specify table schema new String[]{"id", "name", "value"}, new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE}, sink); Table table = ... table.insertInto(cassandraOutputTable);
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

