FlinkSQL寫入hive


配置1:vim flink-conf.yaml
流式寫入hive需要配置檢查點
# Checkpoint settings for the streaming job that writes to Hive.
# State backend: keep state snapshots on a filesystem.
# state.backend: filesystem
state.backend: filesystem
# Keep the externalized checkpoint when the job is cancelled.
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# Take a checkpoint every 60 seconds.
execution.checkpointing.interval: 60s
# Checkpointing semantics (exactly-once).
execution.checkpointing.mode: EXACTLY_ONCE


# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# NOTE(review): this is a local file:// path under /tmp; presumably only
# suitable for a single-machine setup - confirm before using on a cluster.
state.checkpoints.dir: file:///tmp/flink12-checkpoints
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
# NOTE(review): local file:// path as well - same caveat as the checkpoint dir.
state.savepoints.dir: file:///tmp/flink12-savepoints

配置2,使用FlinkSQL-client需要配置
vim  sql-client-defaults.yaml

catalogs:  # replaces the default empty list ([])
# Hive catalog made available to the SQL client under the name "uathive".
  - name: uathive
    type: hive
    # Directory holding the Hive client configuration
    # (presumably hive-site.xml - verify the path on your host).
    hive-conf-dir: /etc/hive/conf
    # Database used by default inside this catalog.
    default-database: temp

 

寫sql作業

SET execution.type=streaming;
SET table.sql-dialect=hive;

-- Streaming job: read change records from Kafka and append them to a
-- partitioned Hive table. The two SET commands above select streaming
-- execution and the Hive dialect, which is required for the Hive DDL below.
--
-- Hive sink table, partitioned by day (dt), hour (hr) and minute (mi).
-- Partitions are committed by processing time, 1 minute after creation.
-- Commit policy 'metastore,success-file' both registers the partition in the
-- Hive metastore (so it is queryable without a manual MSCK REPAIR TABLE) and
-- writes a _SUCCESS marker file into the partition directory.
DROP TABLE IF EXISTS ods_hive_t_area;
CREATE TABLE ods_hive_t_area (
  `id`      INT    COMMENT '代號',
  `name`    STRING COMMENT '姓名',
  `area_id` INT    COMMENT '區域',
  `money`   INT    COMMENT '銷售額'
) PARTITIONED BY (dt STRING, hr STRING, mi STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='1 min',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

SET table.sql-dialect=default;

-- The Kafka source uses Flink's own WITH (...) connector syntax, so the
-- dialect is switched back to the default one just above.
-- Each record carries the row image before and after the change plus the
-- operation code in `op`.
DROP TABLE IF EXISTS ods_source_2hive_kafka_t_area;
CREATE TABLE ods_source_2hive_kafka_t_area (
  `before` ROW(id INT, name STRING, area_id INT, money INT),
  `after`  ROW(id INT, name STRING, area_id INT, money INT),
  op       STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_t_area1',
  'properties.bootstrap.servers' = '10.16.74.34:9092',
  'properties.group.id' = 'ods_t_area1_group2hive',
  -- Allowed values include latest-offset | earliest-offset.
  'scan.startup.mode' = 'earliest-offset',
  -- 'format' applies to the message value (same as 'value.format').
  'format' = 'json',
  -- true: fields/records that fail to parse become NULL instead of failing
  -- the job; false: a parse error aborts reading. JSON format only.
  'json.ignore-parse-errors' = 'true'
);

-- Flatten each change record into the sink schema.
-- For deletes (op = 'd' with no `after` image) only the id is kept, taken
-- from the `before` image; the other data columns are NULL.
-- The three trailing columns feed the dt/hr/mi partition columns, in the
-- order declared in PARTITIONED BY. They are zero-padded so that the
-- '$dt $hr:$mi:00' pattern yields a well-formed timestamp.
INSERT INTO ods_hive_t_area
SELECT
  CASE WHEN op = 'd' AND `after` IS NULL THEN `before`.id ELSE `after`.id END      AS id,
  CASE WHEN op = 'd' AND `after` IS NULL THEN NULL ELSE `after`.name END           AS name,
  CASE WHEN op = 'd' AND `after` IS NULL THEN NULL ELSE `after`.area_id END        AS area_id,
  CASE WHEN op = 'd' AND `after` IS NULL THEN NULL ELSE `after`.money END          AS money,
  DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')                                        AS dt,
  DATE_FORMAT(LOCALTIMESTAMP, 'HH')                                                AS hr,
  DATE_FORMAT(LOCALTIMESTAMP, 'mm')                                                AS mi
FROM ods_source_2hive_kafka_t_area;

遇到的問題:

[hive@m764 lib]$ hadoop fs -ls -R /user/hive/warehouse/temp.db/ods_hive_t_area/
drwxrwxr-x   - hive hive          0 2021-05-06 17:27 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=26
-rw-r--r--   1 hive hive          0 2021-05-06 17:27 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=26/_SUCCESS
-rw-r--r--   1 hive hive       1156 2021-05-06 17:26 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=26/part-f35d61fa-6a8d-4a51-a59f-83c597c6c42c-0-0
drwxrwxr-x   - hive hive          0 2021-05-06 17:29 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=27
-rw-r--r--   1 hive hive          0 2021-05-06 17:29 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=27/_SUCCESS
-rw-r--r--   1 hive hive        541 2021-05-06 17:27 /user/hive/warehouse/temp.db/ods_hive_t_area/mi=27/part-f35d61fa-6a8d-4a51-a59f-83c597c6c42c-0-1

顯示成功寫入hive,有_SUCCESS文件,但是select 不到數據。原因:分區提交策略只配置 success-file 時,Flink 只會寫入 _SUCCESS 文件,不會把新分區註冊到 Hive metastore。

解決:刷新一下元數據

-- Refresh the table's partition metadata in Hive so that the partitions
-- already written to the warehouse directory become queryable.
msck repair table  ods_hive_t_area;

然后可以查到hive中的數據了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM