Flink 1.10 SQL Writing to ElasticSearch


Corresponding official documentation page: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector

The Flink SQL Elasticsearch connector supports streaming mode only, and only as a sink:

  Sink: Streaming Append Mode
  Sink: Streaming Upsert Mode
  Format: JSON-only

Note: this is all that Flink provides out of the box; you can also implement your own.

The Elasticsearch connector can operate in upsert mode, exchanging UPSERT/DELETE messages with the external system using a key defined by the query.

For append-only queries, the connector can also operate in append mode, exchanging only INSERT messages with the external system. If the query defines no key, Elasticsearch generates a document ID automatically.
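To make the two modes concrete, here is a minimal sketch (the table names are hypothetical):

-- append mode: the query defines no key, so Elasticsearch auto-generates document IDs
INSERT INTO es_sink SELECT user_id, behavior FROM user_log;

-- upsert mode: the GROUP BY clause defines the key (user_id), which becomes the document ID
INSERT INTO es_sink SELECT user_id, CAST(COUNT(*) AS VARCHAR) FROM user_log GROUP BY user_id;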

The DDL definition is as follows:

CREATE TABLE MyUserTable (
  ...
) WITH (
  'connector.type' = 'elasticsearch', -- required: specify this table type is elasticsearch
  
  'connector.version' = '6',          -- required: valid connector versions are "6" and "7"
  
  'connector.hosts' = 'http://host_name:9200;http://host_name:9201',  -- required: one or more Elasticsearch hosts to connect to

  'connector.index' = 'MyUsers',       -- required: Elasticsearch index

  'connector.document-type' = 'user',  -- required: Elasticsearch document type

  'update-mode' = 'append',            -- optional: update mode when used as table sink.           

  'connector.key-delimiter' = '$',     -- optional: delimiter for composite keys ("_" by default)
                                       -- e.g., "$" would result in IDs "KEY1$KEY2$KEY3"

  'connector.key-null-literal' = 'n/a',  -- optional: representation for null fields in keys ("null" by default)

  'connector.failure-handler' = '...',   -- optional: failure handling strategy in case a request to 
                                         -- Elasticsearch fails ("fail" by default).
                                         -- valid strategies are 
                                         -- "fail" (throws an exception if a request fails and
                                         -- thus causes a job failure), 
                                         -- "ignore" (ignores failures and drops the request),
                                         -- "retry-rejected" (re-adds requests that have failed due 
                                         -- to queue capacity saturation), 
                                         -- or "custom" for failure handling with a
                                         -- ActionRequestFailureHandler subclass

  -- optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
  'connector.flush-on-checkpoint' = 'true',   -- optional: whether to flush on checkpoint; setting
                                              -- this to "false" disables it and weakens delivery
                                              -- guarantees ("true" by default)
  'connector.bulk-flush.max-actions' = '42',  -- optional: maximum number of actions to buffer 
                                              -- for each bulk request
  'connector.bulk-flush.max-size' = '42 mb',  -- optional: maximum size of buffered actions in bytes
                                              -- per bulk request
                                              -- (only MB granularity is supported)
  'connector.bulk-flush.interval' = '60000',  -- optional: bulk flush interval (in milliseconds)
  'connector.bulk-flush.back-off.type' = '...',       -- optional: backoff strategy ("disabled" by default)
                                                      -- valid strategies are "disabled", "constant",
                                                      -- or "exponential"
  'connector.bulk-flush.back-off.max-retries' = '3',  -- optional: maximum number of retries
  'connector.bulk-flush.back-off.delay' = '30000',    -- optional: delay between each backoff attempt
                                                      -- (in milliseconds)

  -- optional: connection properties to be used during REST communication to Elasticsearch
  'connector.connection-max-retry-timeout' = '3',     -- optional: maximum timeout (in milliseconds)
                                                      -- between retries
  'connector.connection-path-prefix' = '/v1',         -- optional: prefix string to be added to every
                                                      -- REST communication
                                                      
  'format.type' = '...',   -- required: Elasticsearch connector requires to specify a format,
  ...                      -- currently only 'json' format is supported.
                           -- Please refer to Table Formats section for more details.
)

Flink automatically extracts a valid key from the query. For example, the query SELECT a, b, c FROM t GROUP BY a, b defines a composite key of the fields a and b. The Elasticsearch connector generates a document ID for every row by concatenating all key fields, in the order defined in the query, using the key delimiter. A custom representation for null literals in key fields can also be configured.
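As a concrete sketch (the values here are hypothetical), with 'connector.key-delimiter' = '$' and 'connector.key-null-literal' = 'n/a':

-- query: SELECT a, b, c FROM t GROUP BY a, b  =>  composite key (a, b)
-- row with a = 'u1', b = '2020-04-01'  =>  document ID "u1$2020-04-01"
-- row with a = 'u1', b = NULL          =>  document ID "u1$n/a"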

Regarding the DDL definition from the official docs: I have found that adding at least the following parameters causes a "could not find a suitable TableSinkFactory" error:

'connector.bulk-flush.back-off.max-retries' = '3',
'connector.bulk-flush.back-off.delay' = '10000'

The error is as follows:

Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.

Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSinkFactory
Unsupported property keys:
connector.bulk-flush.back-off.max-retries
connector.bulk-flush.back-off.delay

This error deserves a note, because you run into it all the time when working with Flink SQL. In my experience there are roughly two causes:

  1. The corresponding jar has not been added to the classpath

  2. A property in the WITH clause is wrong

Flink SQL automatically infers which TableSinkFactory to use from the schema and properties in the DDL and from what is available on the classpath.

If the DDL is wrong, or the matching TableSinkFactory is not on the classpath, this error is raised.
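For example (a hypothetical illustration of cause 2), declaring a connector version that does not match the jar on the classpath triggers the same exception:

-- with only flink-connector-elasticsearch7 on the classpath, this DDL fails:
-- no factory supports 'connector.version' = '6'
CREATE TABLE bad_sink (
    user_id VARCHAR
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',    -- mismatch with the ES 7 connector jar
    'connector.hosts' = 'http://venn:9200',
    'connector.index' = 'user_behavior',
    'connector.document-type' = 'user',
    'update-mode' = 'append',
    'format.type' = 'json'
);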
 

OK, let's look at the example:

Add the corresponding dependencies (besides the Elasticsearch connector, the 'json' format needs flink-json on the classpath; the Kafka connector dependency for the source is assumed to already be in place):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>

The SQL is as follows:

-- read json from Kafka, write to Elasticsearch
---sourceTable
CREATE TABLE user_log(
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3)
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'connector.startup-mode' = 'earliest-offset',
    'format.type' = 'json'
);

---sinkTable
CREATE TABLE user_log_sink (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts  VARCHAR
    --ts TIMESTAMP(3)
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '7',
    'connector.hosts' = 'http://venn:9200',
    'connector.index' = 'user_behavior',
    'connector.document-type' = 'user',
    'connector.bulk-flush.interval' = '6000',
    'connector.connection-max-retry-timeout' = '3',
    --'connector.bulk-flush.back-off.max-retries' = '3',   -- commented out: not supported by the
    --'connector.bulk-flush.back-off.delay' = '10000',     -- ES 7 factory, triggers the error above
    --'connector.connection-path-prefix' = '/v1',
    'update-mode' = 'upsert',
    'format.type' = 'json'
);
-- the ES sink is upsert: it can update existing documents, using the GROUP BY key as the ES document ID ... (this SQL is just thrown together for demonstration)
---insert
INSERT INTO user_log_sink
--SELECT user_id, item_id, category_id, behavior, ts
--FROM user_log;
SELECT
  CAST(COUNT(*) AS VARCHAR) AS dt,
  CAST(COUNT(*) AS VARCHAR) AS pv,
  CAST(COUNT(DISTINCT user_id) AS VARCHAR) AS uv,
  MAX(behavior),
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:ss')
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:ss');
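Since the sink is in upsert mode, the GROUP BY key (the formatted timestamp) becomes the Elasticsearch document ID, so the aggregates for each second keep updating the same document as new events arrive. A lower-cardinality variant (hypothetical, aggregating per minute instead) only needs a different format pattern:

INSERT INTO user_log_sink
SELECT
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm') AS dt,      -- per-minute bucket as the first column
  CAST(COUNT(*) AS VARCHAR) AS pv,
  CAST(COUNT(DISTINCT user_id) AS VARCHAR) AS uv,
  MAX(behavior),
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm')
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm');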

Looking at the data written to ES, the aggregates show up as expected.

Done!

Feel free to follow the Flink 菜鳥 WeChat official account, which posts occasional articles on Flink development.

 

