Flink 1.11 brought a lot of SQL improvements, and one of the most important is the much more complete Hive integration: Hive is no longer just a persistent Catalog, you can now use native Flink SQL to stream data into Hive.
This article follows the official "Streaming Writing" example (https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-writing) to stream data into Hive (coincidentally, a reader recently asked why the official example did not write successfully for them).
As the official docs explain: the Hive Streaming Sink re-uses the Filesystem Streaming Sink to integrate Hadoop OutputFormat/RecordWriter for streaming writes. Hadoop RecordWriters are bulk-encoded formats, and bulk formats roll files on every checkpoint.
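Because bulk formats only roll (and therefore commit) files on checkpoints, checkpointing has to be enabled in the job. Below is a minimal sketch of the Table API environment setup I assume throughout this post (Flink 1.11 with the Blink planner; the 60-second checkpoint interval is just an illustrative choice, not a recommendation):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// files are only rolled and committed to Hive on checkpoints, so checkpointing must be on
env.enableCheckpointing(60 * 1000L);   // illustrative interval: 60 s

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
```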
Environment:
Flink 1.11.2
Hive 2.3.6
Hadoop 2.7
sqlSubmit, my open-source Flink SQL submission program (it submits SQL through the Table API; code is on GitHub: https://github.com/springMoon/sqlSubmit)
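Since the job is submitted through the Table API rather than the sql-client (which typically gets its catalogs from the YAML configuration), the HiveCatalog has to be registered in code so that the Hive table below actually lands in the Hive metastore. A rough sketch, continuing from the environment above; the catalog name, database, and hive conf path are placeholders for my setup:

```java
import org.apache.flink.table.catalog.hive.HiveCatalog;

// placeholder values, adjust to your environment
String catalogName     = "myhive";
String defaultDatabase = "flink";            // the database that will hold hive_table
String hiveConfDir     = "/opt/hive/conf";   // directory containing hive-site.xml
String hiveVersion     = "2.3.6";

HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, hiveVersion);
tableEnv.registerCatalog(catalogName, hiveCatalog);
tableEnv.useCatalog(catalogName);   // make it the current catalog so the DDL below goes to Hive
```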
The SQL from the official docs:
```sql
SET table.sql-dialect=hive;   -- switch to the hive dialect, otherwise the hive table cannot be created
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',   -- partition time extractor pattern
  'sink.partition-commit.trigger'='partition-time',               -- partition commit trigger
  'sink.partition-commit.delay'='1 h',                            -- commit delay
  'sink.partition-commit.policy.kind'='metastore,success-file'    -- commit policy
);

SET table.sql-dialect=default;  -- switch back to the default dialect
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);   -- kafka table connector properties

-- streaming sql, insert into hive table; the last two fields are the partition values
INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;

-- batch sql, select with partition pruning
SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';
```
The official example is meant to be run in the sql-client; here I use the Table API, so things differ slightly. First, the complete SQL:
```sql
drop table if exists user_log;
CREATE TABLE user_log (
  user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior VARCHAR
  ,ts TIMESTAMP(3)
  ,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka'
  ,'connector.version' = 'universal'
  ,'connector.topic' = 'user_behavior'
  ,'connector.properties.zookeeper.connect' = 'venn:2181'
  ,'connector.properties.bootstrap.servers' = 'venn:9092'
  ,'connector.properties.group.id' = 'user_log'
  ,'connector.startup-mode' = 'group-offsets'
  ,'connector.sink-partitioner' = 'fixed'
  ,'format.type' = 'json'
);

-- set table.sql-dialect=hive;  -- the dialect is switched in code instead (see below)
-- hive sink
drop table if exists hive_table;
CREATE TABLE hive_table (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 min',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

-- streaming sql, insert into hive table
INSERT INTO TABLE hive_table
SELECT user_id, item_id, category_id, behavior, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH')
FROM user_log;
```
It is almost identical to the official docs; the only difference is how the SQL dialect is specified. With the Table API it looks like this:
```java
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
```
Flink dialect docs: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_dialect.html#table-api
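Putting it together, the DDL and INSERT from above can be submitted roughly like this; the String variables holding the statements are placeholders, and the hive-dialect switch only wraps the Hive DDL:

```java
import org.apache.flink.table.api.SqlDialect;

// create the Kafka source table with the default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql(createUserLogDdl);      // CREATE TABLE user_log ...

// switch to the hive dialect only for the Hive table DDL
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(createHiveTableDdl);    // CREATE TABLE hive_table ... STORED AS parquet ...

// switch back and start the streaming insert
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql(insertIntoHiveSql);     // INSERT INTO hive_table SELECT ... FROM user_log
```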
Notes:
partition.time-extractor.timestamp-pattern specifies the pattern the partition time extractor uses to build a timestamp from the partition values.
sink.partition-commit.trigger specifies the partition commit trigger; it can be set to "process-time" or "partition-time" (processing time or partition time).
For example, with three partition levels of day, hour and minute:
partition.time-extractor.timestamp-pattern = $dt $hr:$ms:00
and the partition values in the SELECT would be: DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') (see the sketch below).
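As a concrete but hypothetical sketch of such a minute-level table (assuming the minute partition column is named ms, so it matches the $ms placeholder), submitted through the Table API:

```java
// hypothetical minute-level table; the partition column names must match the
// placeholders ($dt, $hr, $ms) in the timestamp pattern
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(
    "CREATE TABLE hive_table_minute (" +
    "  user_id STRING," +
    "  order_amount DOUBLE" +
    ") PARTITIONED BY (dt STRING, hr STRING, ms STRING) STORED AS parquet TBLPROPERTIES (" +
    "  'partition.time-extractor.timestamp-pattern'='$dt $hr:$ms:00'," +
    "  'sink.partition-commit.trigger'='partition-time'," +
    "  'sink.partition-commit.delay'='1 min'," +
    "  'sink.partition-commit.policy.kind'='metastore,success-file'" +
    ")");
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

// the streaming insert then supplies three partition values per row:
// DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
```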
The data in the Hive table:
```
hive> select * from hive_table limit 10;
OK
107025  1007359 4391936 pv      2017-11-26      01
334124  3904483 2520771 cart    2017-11-26      01
475192  3856358 2465336 pv      2017-11-26      01
475192  3856358 2465336 pv      2017-11-26      01
864482  3398512 1639158 pv      2017-11-26      01
987980  3225231 2578647 pv      2017-11-26      01
987980  3225231 2578647 pv      2017-11-26      01
563592  3377194 2131531 pv      2017-11-26      01
939115  241366  4756105 pv      2017-11-26      01
939115  241366  4756105 pv      2017-11-26      01
Time taken: 0.112 seconds, Fetched: 10 row(s)
```
The files in the Hive table's corresponding HDFS directory:
```
[venn@venn ~]$ hadoop fs -ls /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/
Found 7 items
-rw-r--r--   1 venn supergroup          0 2020-09-24 17:04 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/.part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-5.inprogress.42f75ffc-8c4d-4009-a00a-93482a96a2b8
-rw-r--r--   1 venn supergroup          0 2020-09-24 17:02 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/_SUCCESS
-rw-r--r--   1 venn supergroup       7190 2020-09-24 16:56 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-0
-rw-r--r--   1 venn supergroup       3766 2020-09-24 16:58 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-1
-rw-r--r--   1 venn supergroup       3653 2020-09-24 17:00 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-2
-rw-r--r--   1 venn supergroup       3996 2020-09-24 17:02 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-3
-rw-r--r--   1 venn supergroup       3719 2020-09-24 17:04 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-4
```
The .part-xxx file (with the leading dot) is the file currently being written; the files below it have already been committed.
Working through the official example went fairly smoothly, but I still ran into a few issues:
- 1. jar dependency: writing to Hive requires hadoop-mapreduce-client-core-2.7.7.jar
- 2. Units supported by sink.partition-commit.delay: DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | minutes), SECONDS: (s | sec | secs | second | seconds), MILLISECONDS: (ms | milli | millis | millisecond | milliseconds), MICROSECONDS: (µs | micro | micros | microsecond | microseconds), NANOSECONDS: (ns | nano | nanos | nanosecond | nanoseconds)
- 3. Hive date/time columns: currently only TIMESTAMP(9) is supported, while Flink's TIMESTAMP has precision 3 or 6, so writing a timestamp column fails (I gave up on the date type; Hive can read dates stored as String anyway):
```java
java.time.format.DateTimeParseException: Text '2017-11-26-01 00:00:00' could not be parsed, unparsed text found at index 10
```
- 4. Web UI metrics: the "Records Sent" count from the source task to the sink task matches the number of checkpoints, because data is only committed to HDFS on checkpoints; this number appears to reflect some kind of signal/commit message rather than the real record count. (As the docs quoted above note, without checkpoints the data is still written to HDFS, but it stays in the in-progress state, in files starting with ".", which Hive treats as hidden files and therefore cannot query.)
For reference, the jars in my flink lib directory:
```
flink-connector-hbase_2.11-1.11.2.jar
flink-connector-hive_2.11-1.11.2.jar
flink-connector-kafka_2.11-1.11.2.jar
flink-connector-kafka-base_2.11-1.11.2.jar
flink-csv-1.11.2.jar
flink-dist_2.11-1.11.2.jar
flink-json-1.11.2.jar
flink-shaded-zookeeper-3.4.14.jar
flink-table_2.11-1.11.2.jar
flink-table-blink_2.11-1.11.2.jar
hadoop-mapreduce-client-core-2.7.7.jar
hbase-client-2.1.4.jar
hbase-common-2.1.4.jar
hbase-protocol-2.1.4.jar
hbase-protocol-shaded-2.1.4.jar
hbase-shaded-miscellaneous-2.1.0.jar
hbase-shaded-netty-2.1.0.jar
hbase-shaded-protobuf-2.1.0.jar
hive-exec-2.3.6.jar
htrace-core4-4.2.0-incubating.jar
kafka-clients-2.2.0.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar
metrics-core-3.2.1.jar
```
You are welcome to follow the Flink菜鳥 WeChat official account, which publishes Flink development posts from time to time.

