Flink SQL 流式寫數據到 Hive


Flink 1.11 版本對SQL的優化是很多的,其中最重要的一點就是 hive 功能的完善,不再只是作為持久化的 Catalog,
而是可以用原生的 Flink SQL 流式的寫數據到入 hive中

本文使用官網 “Streaming Writing” 案例 (https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-writing),

流式寫數據到 Hive (剛好之前有同學咨詢官網的例子不能寫入成功)

The Hive Streaming Sink re-use Filesystem Streaming Sink to integrate Hadoop OutputFormat/RecordWriter to streaming writing. Hadoop RecordWriters are Bulk-encoded Formats, Bulk Formats rolls files on every checkpoint.

Hive Streaming Sink 重用 Filesystem Streaming Sink,集成Hadoop OutputFormat / RecordWriter 流式寫入。 Hadoop RecordWriters是 Bulk-encoded 格式,Bulk 格式在每個 checkpoint 上滾動文件。

環境:
  Flink 1.11.2
  Hive 2.3.6
  Hadoop 2.7
  sqlSubmit,我開源 Flink SQL 提交程序(Table Api 的方式提交 SQL,代碼已提交 Github:https://github.com/springMoon/sqlSubmit)

官網SQL 如下:

SET table.sql-dialect=hive;  -- 要指定 hive 方言,不然 hive 表創建不成功
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',   -- hive 分區提取器
  'sink.partition-commit.trigger'='partition-time',               -- 分區觸發提交
  'sink.partition-commit.delay'='1 h',      -- 提交延遲
  'sink.partition-commit.policy.kind'='metastore,success-file'    -- 提交類型
);

SET table.sql-dialect=default;  -- 換回 default 方言
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);   -- kafka 表的 tblproperties

-- streaming sql, insert into hive table  寫入的 sql, 最后兩個字段是 是寫入分區
INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;

-- batch sql, select with partition pruning
SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';

官網的案例默認是在 sql-client 中執行的,這里是用 Table Api,所以會有點不同,先看下完整的 SQL

drop table if exists user_log;
CREATE TABLE user_log (
  user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior VARCHAR
  ,ts TIMESTAMP(3)
  ,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka'
  ,'connector.version' = 'universal'
  ,'connector.topic' = 'user_behavior'
  ,'connector.properties.zookeeper.connect' = 'venn:2181'
  ,'connector.properties.bootstrap.servers' = 'venn:9092'
  ,'connector.properties.group.id' = 'user_log'
  ,'connector.startup-mode' = 'group-offsets'
  ,'connector.sink-partitioner' = 'fixed'
  ,'format.type' = 'json'
);

-- set table.sql-dialect=hive;
-- kafka sink  
drop table if exists hive_table;
CREATE TABLE hive_table (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 min',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

-- streaming sql, insert into hive table
INSERT INTO TABLE hive_table
SELECT user_id, item_id, category_id, behavior, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH')
FROM user_log;

跟官網基本一樣,唯一的不同是,在指定 sql 方言的時候,Table Api 是這樣的:

tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tabEnv.getConfig().setSqlDialect(SqlDialect.HIVE)

flink 方言官網: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_dialect.html#table-api

注:
partition.time-extractor.timestamp-pattern 指定分區提取器提取時間戳的格式
sink.partition-commit.trigger 觸發分區提交的類型可以指定 "process-time" 和 "partition-time" 處理時間和分區時間

如指定天、小時、分鍾三級分區:
partition.time-extractor.timestamp-pattern = $dt $hr:$ms:00
分區字段則是: DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')

hive 表數據如下:

hive> select * from hive_table limit 10;
OK
107025  1007359 4391936 pv      2017-11-26      01
334124  3904483 2520771 cart    2017-11-26      01
475192  3856358 2465336 pv      2017-11-26      01
475192  3856358 2465336 pv      2017-11-26      01
864482  3398512 1639158 pv      2017-11-26      01
987980  3225231 2578647 pv      2017-11-26      01
987980  3225231 2578647 pv      2017-11-26      01
563592  3377194 2131531 pv      2017-11-26      01
939115  241366  4756105 pv      2017-11-26      01
939115  241366  4756105 pv      2017-11-26      01
Time taken: 0.112 seconds, Fetched: 10 row(s)

hive 表對於目錄文件情況:

[venn@venn ~]$ hadoop fs -ls /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/
Found 7 items
-rw-r--r--   1 venn supergroup          0 2020-09-24 17:04 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/.part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-5.inprogress.42f75ffc-8c4d-4009-a00a-93482a96a2b8
-rw-r--r--   1 venn supergroup          0 2020-09-24 17:02 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/_SUCCESS
-rw-r--r--   1 venn supergroup       7190 2020-09-24 16:56 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-0
-rw-r--r--   1 venn supergroup       3766 2020-09-24 16:58 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-1
-rw-r--r--   1 venn supergroup       3653 2020-09-24 17:00 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-2
-rw-r--r--   1 venn supergroup       3996 2020-09-24 17:02 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-3
-rw-r--r--   1 venn supergroup       3719 2020-09-24 17:04 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-4

.part-xxx 文件就是正在寫的文件,下面幾個就是已經提交的文件

在做官網的案例的過程,還算比較順利,但是也遇到幾個問題:

  • 1、jar 包問題,寫 hive 需要 hadoop-mapreduce-client-core-2.7.7.jar
  • 2、參數 sink.partition-commit.delay 的單位支持: DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | minutes), SECONDS: (s | sec | secs | second | seconds), MILLISECONDS: (ms | milli | millis | millisecond | milliseconds), MICROSECONDS: (µs | micro | micros | microsecond | microseconds), NANOSECONDS: (ns | nano | nanos | nanosecond | nanoseconds)
  • 3、hive 表日期類型字段,目前是只支持 TIMESTAMP(9),但是 flink 的 timestamp 是 3 位與 6 位(放棄 日期類型,反正 String 類型的日期 hive 也可以識別)

```java
java.time.format.DateTimeParseException: Text '2017-11-26-01 00:00:00' could not be parsed, unparsed text found at index 10
```

  • 4、web 頁面 metrics,source 塊 往 Sink 塊寫數 Records Sent 的問題, Records Sent 數對應 checkpoint 次數,因為只會在 checkpoint 的時候才會提交數據到 HDFS,這個消息應該是某個信號數據,而不是真實的數據條數 (上面貼的官網說明有講,如果沒有 checkpoint,數據會寫到 hdfs,但是會出於 inprogress狀態,並且是 "." 開頭的文件,對 hive 來說是隱藏文件,查不到的)

再貼下flink lib:

flink-connector-hbase_2.11-1.11.2.jar       flink-json-1.11.2.jar                   hbase-common-2.1.4.jar                hive-exec-2.3.6.jar                log4j-slf4j-impl-2.12.1.jar
flink-connector-hive_2.11-1.11.2.jar        flink-shaded-zookeeper-3.4.14.jar       hbase-protocol-2.1.4.jar              htrace-core4-4.2.0-incubating.jar  metrics-core-3.2.1.jar
flink-connector-kafka_2.11-1.11.2.jar       flink-table_2.11-1.11.2.jar             hbase-protocol-shaded-2.1.4.jar       kafka-clients-2.2.0.jar
flink-connector-kafka-base_2.11-1.11.2.jar  flink-table-blink_2.11-1.11.2.jar       hbase-shaded-miscellaneous-2.1.0.jar  log4j-1.2-api-2.12.1.jar
flink-csv-1.11.2.jar                        hadoop-mapreduce-client-core-2.7.7.jar  hbase-shaded-netty-2.1.0.jar          log4j-api-2.12.1.jar
flink-dist_2.11-1.11.2.jar                  hbase-client-2.1.4.jar                  hbase-shaded-protobuf-2.1.0.jar       log4j-core-2.12.1.jar

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM