The Flink SQL Elasticsearch connector only supports streaming mode, and only as a sink:
Sink: Streaming Append Mode
Sink: Streaming Upsert Mode
Format: JSON-only
Note: these are the only modes Flink ships with; you can also implement your own.
The Elasticsearch connector can operate in upsert mode, exchanging UPSERT/DELETE messages with the external system based on a key defined by the query.
For append-only queries, the connector can also operate in append mode, exchanging only INSERT messages with the external system. If the query defines no key, Elasticsearch generates a document ID automatically.
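As a rough illustration of append mode, here is a minimal sketch; the table and field names (clicks, click_sink, user_id, url) and the host are made up for this example and are not from the connector documentation:

-- hypothetical append-only pipeline: no GROUP BY / key, so only INSERT messages are produced
-- and Elasticsearch auto-generates the document IDs
CREATE TABLE click_sink (
    user_id VARCHAR,
    url     VARCHAR
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '7',
    'connector.hosts' = 'http://localhost:9200',   -- placeholder host
    'connector.index' = 'clicks',
    'connector.document-type' = 'click',
    'update-mode' = 'append',                      -- append mode: INSERT-only stream
    'format.type' = 'json'
);

INSERT INTO click_sink
SELECT user_id, url
FROM clicks;                                       -- assumed append-only source table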
The DDL definition is as follows:
CREATE TABLE MyUserTable (
  ...
) WITH (
  'connector.type' = 'elasticsearch',   -- required: specify this table type is elasticsearch

  'connector.version' = '6',            -- required: valid connector versions are "6"

  'connector.hosts' = 'http://host_name:9092;http://host_name:9093',  -- required: one or more Elasticsearch
                                                                      --   hosts to connect to

  'connector.index' = 'MyUsers',        -- required: Elasticsearch index

  'connector.document-type' = 'user',   -- required: Elasticsearch document type

  'update-mode' = 'append',             -- optional: update mode when used as table sink.

  'connector.key-delimiter' = '$',      -- optional: delimiter for composite keys ("_" by default)
                                        --   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"

  'connector.key-null-literal' = 'n/a', -- optional: representation for null fields in keys ("null" by default)

  'connector.failure-handler' = '...',  -- optional: failure handling strategy in case a request to
                                        --   Elasticsearch fails ("fail" by default).
                                        --   valid strategies are
                                        --   "fail" (throws an exception if a request fails and
                                        --   thus causes a job failure),
                                        --   "ignore" (ignores failures and drops the request),
                                        --   "retry-rejected" (re-adds requests that have failed due
                                        --   to queue capacity saturation),
                                        --   or "custom" for failure handling with a
                                        --   ActionRequestFailureHandler subclass

  -- optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
  'connector.flush-on-checkpoint' = 'true',           -- optional: disables flushing on checkpoint (see notes below!)
                                                      --   ("true" by default)
  'connector.bulk-flush.max-actions' = '42',          -- optional: maximum number of actions to buffer
                                                      --   for each bulk request
  'connector.bulk-flush.max-size' = '42 mb',          -- optional: maximum size of buffered actions in bytes
                                                      --   per bulk request
                                                      --   (only MB granularity is supported)
  'connector.bulk-flush.interval' = '60000',          -- optional: bulk flush interval (in milliseconds)
  'connector.bulk-flush.back-off.type' = '...',       -- optional: backoff strategy ("disabled" by default)
                                                      --   valid strategies are "disabled", "constant",
                                                      --   or "exponential"
  'connector.bulk-flush.back-off.max-retries' = '3',  -- optional: maximum number of retries
  'connector.bulk-flush.back-off.delay' = '30000',    -- optional: delay between each backoff attempt
                                                      --   (in milliseconds)

  -- optional: connection properties to be used during REST communication to Elasticsearch
  'connector.connection-max-retry-timeout' = '3',     -- optional: maximum timeout (in milliseconds)
                                                      --   between retries
  'connector.connection-path-prefix' = '/v1',         -- optional: prefix string to be added to every
                                                      --   REST communication

  'format.type' = '...',                -- required: Elasticsearch connector requires to specify a format,
  ...                                   --   currently only 'json' format is supported.
                                        --   Please refer to Table Formats section for more details.
)
Flink automatically extracts a valid key from the query. For example, the query SELECT a, b, c FROM t GROUP BY a, b defines a composite key of the fields a and b. The Elasticsearch connector generates a document ID for every row by concatenating all key fields, in the order defined in the query, using the key delimiter. A custom representation of the null literal for key fields can also be configured.
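To make the document ID composition concrete, here is a small sketch; the table names (user_log, es_sink) and fields (user_id, dt) are assumptions for illustration, not from the official docs. With 'connector.key-delimiter' = '$' and 'connector.key-null-literal' = 'n/a':

-- assuming a source table user_log and an upsert Elasticsearch sink es_sink defined with the DDL above
INSERT INTO es_sink
SELECT user_id, dt, cast(COUNT(*) as VARCHAR) AS pv
FROM user_log
GROUP BY user_id, dt;
-- key fields: user_id, dt (the GROUP BY fields)
-- document ID: user_id + '$' + dt, e.g. 'alice$2020-04-01'
-- a NULL key field is written with the configured null literal, e.g. 'alice$n/a'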
Even with the DDL definition from the official docs, I have found that adding at least the following parameters triggers a "could not find a suitable TableSinkFactory" error:
'connector.bulk-flush.back-off.max-retries' = '3',
'connector.bulk-flush.back-off.delay' = '10000'
The error is as follows:
Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath.

Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSinkFactory
Unsupported property keys:
connector.bulk-flush.back-off.max-retries
connector.bulk-flush.back-off.delay
A word about this error: it comes up all the time when working with Flink SQL, and in my experience it usually has one of two causes:
1. The corresponding connector jar has not been added.
2. Something in the WITH configuration is wrong (a misspelled or unsupported property); for the error above, see the sketch after this list.
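The error above falls under the second cause: the Elasticsearch 7 sink factory does not accept the two back-off properties, so removing them from the WITH clause lets the factory match again. A minimal sketch (the host and index values here are placeholders):

CREATE TABLE user_log_sink (
    ...
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '7',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'user_behavior',
    'connector.document-type' = 'user',
    'connector.bulk-flush.interval' = '6000',
    -- 'connector.bulk-flush.back-off.max-retries' = '3',   -- removed: unsupported property key
    -- 'connector.bulk-flush.back-off.delay' = '10000',      -- removed: unsupported property key
    'update-mode' = 'upsert',
    'format.type' = 'json'
);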
OK, on to the example.
Add the corresponding dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
The SQL is as follows:
-- read json from Kafka, write to Elasticsearch

---sourceTable
CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3)
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'connector.startup-mode' = 'earliest-offset',
    'format.type' = 'json'
);

---sinkTable
CREATE TABLE user_log_sink (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR
    --ts TIMESTAMP(3)
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '7',
    'connector.hosts' = 'http://venn:9200',
    'connector.index' = 'user_behavior',
    'connector.document-type' = 'user',
    'connector.bulk-flush.interval' = '6000',
    'connector.connection-max-retry-timeout' = '3',
    --'connector.bulk-flush.back-off.max-retries' = '3',  -- not supported by the ES 7 table sink factory (see error above)
    --'connector.bulk-flush.back-off.delay' = '10000',    -- not supported by the ES 7 table sink factory (see error above)
    --'connector.connection-path-prefix' = '/v1',
    'update-mode' = 'upsert',
    'format.type' = 'json'
);

-- the es sink is upsert, so it can update; the group key is used as the es document id ...
-- this SQL is just thrown together as a demo

---insert
INSERT INTO user_log_sink
--SELECT user_id, item_id, category_id, behavior, ts
--FROM user_log;
SELECT
    cast(COUNT(*) as VARCHAR) dt,
    cast(COUNT(*) as VARCHAR) AS pv,
    cast(COUNT(DISTINCT user_id) as VARCHAR) AS uv,
    MAX(behavior),
    DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:s0')
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:s0');
Take a look at the data written to ES:
Done.
Welcome to follow the Flink菜鳥 WeChat official account, which publishes posts on Flink development from time to time.