Hudi-集成Flink(Flink操作hudi表)


一、安裝部署Flink 1.12

Apache Flink是一個框架和分布式處理引擎,用於對無界和有界數據流進行有狀態計算。Flink被設計在所有常見的集群環境中運行,以內存執行速度和任意規模來執行計算。
 

1.准備tar包

flink-1.13.1-bin-scala_2.12.tgz

2.解壓

 tar -zxvf flink-1.13.1-bin-scala_2.12.tgz

3.添加Hadoop依賴jar包,放在flink的lib目錄下

flink-shaded-hadoop-2-uber-2.8.0-10.0.jar
flink-sql-connector-kafka_2.12-1.13.1.jar
hudi-flink-bundle_2.12-0.10.1.jar
hive-exec-2.3.9.jar
 

4.啟動HDFS集群

hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode

5.啟動flink本地集群

/flink/bin/start-cluster.sh
可看到兩個進程:TaskManagerRunner、StandaloneSessionClusterEntrypoint
 
停止命令
/flink/bin/stop-cluster.sh
 

6.Flink Web UI

 

7.執行官方示例

讀取文本文件數據,進行詞頻統計WordCount,將結果打印控制台
/flink/bin/flink run /fline/examples/batch/WordCount.jar
 

二、Flink集成Hudi時,本質將集成jar包:hudi-flink-bundle_2.12-0.10.1.jar,放入Flink應用CLASSPATH下即可。

Flink SQL Connector支持Hudi作為Source和Sink時,兩種方式將jar包放入CLASSPATH路徑:
方式一:運行Flink SQL Client命令時,通過參數【-j xx.jar】指定jar包
flink/bin/sql-client.sh embedded -j …./hudi-flink-bundle_2.12-0.10.1.jar
方式二:將jar包直接放入Flink軟件安裝包lib目錄下【$FLINK_HOME/lib】
 
修改conf/flink-conf.yaml
jobmanager.rpc.address: localhost
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 4
 
classloader.check-leaked-classloader: false
classloader.resolve-order: parent-first
 
execution.checkpointing.interval: 3000
state.checkpoints.dir: hdfs://localhost:9000/flink-checkpoints
state.savepoints.dir: hdfs://localhost:9000/flink-savepoints
state.backend.incremental: true 
 
由於Flink需要連接HDFS文件系統,所以需要設置HADOOP_CLASSPATH環境變量,再啟動集群
 

三、啟動Flink SQL Cli命令行

sql-client.sh embedded shell
 
設置分析結果展示模式為:set execution.result-mode=tableau;
設置檢查點間隔:set execution.checkpointing.interval=3sec;
 

四、使用

1.創建表:test_flink_hudi_mor,數據存儲到hudi表中,底層HDFS存儲,表類型MOR

CREATE TABLE test_flink_hudi_mor(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH(
    'connector' = 'hudi',
    'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',
    'write.tasks' = '1',
    'compaction.tasks' = '1',
    'table.type' = 'MERGE_ON_READ'
);
 
connector:表連接器
path:數據存儲路徑
write.tasks:flink往hudi寫數據時,task數量
compaction.tasks:往hudi寫數據時,做合並的task數量
table.type:hudi表類型
 
Flink SQL> desc test_flink_hudi_mor;
>
+-----------+--------------+------+-----+--------+-----------+
|      name |         type | null | key | extras | watermark |
+-----------+--------------+------+-----+--------+-----------+
|      uuid |  VARCHAR(20) | true |     |        |           |
|      name |  VARCHAR(10) | true |     |        |           |
|       age |          INT | true |     |        |           |
|        ts | TIMESTAMP(3) | true |     |        |           |
| partition |  VARCHAR(20) | true |     |        |           |
+-----------+--------------+------+-----+--------+-----------+
5 rows in set

 

 

2.插入數據

INSERT INTO test_flink_hudi_mor VALUES ('id1','FZ',29, TIMESTAMP '1993-04-09 00:00:01', 'par1' );
 
INSERT INTO test_flink_hudi_mor VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
重復insert,會更新,id1的值由 VALUES ('id1','FZ',29, TIMESTAMP '1993-04-09 00:00:01', 'par1’ ) 改為  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1’)
因為是MOR表,先入log,還未合並成parquet文件,如下圖:
 

四、Streaming query

1.創建表:test_flink_hudi_mor_2, 以流的方式查詢讀取,映射到前面表test_flink_hudi_mor

CREATE TABLE test_flink_hudi_mor_2(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH(
    'connector' = 'hudi',
    'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',   
    'table.type' = 'MERGE_ON_READ',
    'read.tasks' = '1',
    'read.streaming.enabled' = 'true',
    'read.streaming.start-commit' = '20220307211200',
    'read.streaming.check-interval' = '4'
); 
 
read.streaming.enabled設置為true,表名通過streaming的方式讀取表數據
read.streaming.check-interval指定了source監控新的commits的間隔為4s
table.type設置表類型為MERGE_ON_READ
 
 

2.重新開啟terminal啟動flink SQL CLI,重新創建表:test_flink_hudi_mor,采用批batch模式插入一條數據

CREATE TABLE test_flink_hudi_mor(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH(
    'connector' = 'hudi',
    'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',
    'write.tasks' = '1',
    'compaction.tasks' = '1',
    'table.type' = 'MERGE_ON_READ'
);
 
INSERT INTO test_flink_hudi_mor VALUES ('id9','FZ',29, TIMESTAMP '1993-04-09 00:00:01', 'par5' );
INSERT INTO test_flink_hudi_mor VALUES ('id10','DX',28, TIMESTAMP '1994-06-02 00:00:01', 'par5' );
 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM