I. Installing and Deploying Flink 1.13
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is designed to run in all common cluster environments, performing computations at in-memory speed and at any scale.

1. Prepare the tarball
flink-1.13.1-bin-scala_2.12.tgz
2. Extract it
tar -zxvf flink-1.13.1-bin-scala_2.12.tgz
3. Add the dependency jars (Hadoop, Kafka connector, Hudi bundle, Hive) to Flink's lib directory
flink-shaded-hadoop-2-uber-2.8.0-10.0.jar
flink-sql-connector-kafka_2.12-1.13.1.jar
hudi-flink-bundle_2.12-0.10.1.jar
hive-exec-2.3.9.jar
4. Start the HDFS cluster
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
5. Start the local Flink cluster
/flink/bin/start-cluster.sh
You should see two processes: TaskManagerRunner and StandaloneSessionClusterEntrypoint.
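A quick way to confirm is jps, which ships with the JDK and lists local JVM processes:
jps
It should list TaskManagerRunner and StandaloneSessionClusterEntrypoint alongside Jps itself.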
To stop the cluster:
/flink/bin/stop-cluster.sh
6. Open the Flink Web UI (by default at http://localhost:8081)

7. Run the official example
Read text data from a file, run a WordCount word-frequency aggregation, and print the result to the console.
/flink/bin/flink run /flink/examples/batch/WordCount.jar
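Without arguments the job falls back to built-in sample data and prints to stdout. The bundled example also accepts --input and --output flags to run against your own file; the HDFS paths below are assumptions for illustration:
/flink/bin/flink run /flink/examples/batch/WordCount.jar --input hdfs://localhost:9000/tmp/words.txt --output hdfs://localhost:9000/tmp/wordcount-out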
II. Integrating Flink with Hudi
To integrate Flink with Hudi, it is essentially enough to put the bundle jar, hudi-flink-bundle_2.12-0.10.1.jar, on the Flink application's CLASSPATH.
When the Flink SQL Connector uses Hudi as a source or sink, there are two ways to get the jar onto the CLASSPATH:
Option 1: pass the jar with the [-j xx.jar] argument when launching the Flink SQL Client
flink/bin/sql-client.sh embedded -j …./hudi-flink-bundle_2.12-0.10.1.jar
Option 2: put the jar directly into the lib directory of the Flink installation ($FLINK_HOME/lib)
Then edit conf/flink-conf.yaml:
jobmanager.rpc.address: localhost
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 4
classloader.check-leaked-classloader: false
classloader.resolve-order: parent-first
execution.checkpointing.interval: 3000
state.checkpoints.dir: hdfs://localhost:9000/flink-checkpoints
state.savepoints.dir: hdfs://localhost:9000/flink-savepoints
state.backend.incremental: true
Because Flink needs to connect to the HDFS file system, set the HADOOP_CLASSPATH environment variable before starting the cluster, as shown below.
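A minimal sketch, assuming the hadoop binary is on the PATH (this is the standard way the Flink documentation wires up Hadoop):
export HADOOP_CLASSPATH=`hadoop classpath`
/flink/bin/start-cluster.sh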
III. Starting the Flink SQL CLI
sql-client.sh embedded shell
Set the result display mode to tableau: set execution.result-mode=tableau;
Set the checkpoint interval: set execution.checkpointing.interval=3sec;
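Note that Flink 1.13 also exposes the result mode under a newer option key; if the CLI rejects the short form above, this variant from the 1.13 SQL Client configuration should work:
set sql-client.execution.result-mode=tableau;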

IV. Usage
1. Create a table, test_flink_hudi_mor, whose data is stored in a Hudi table backed by HDFS, with table type MOR
CREATE TABLE test_flink_hudi_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH(
  'connector' = 'hudi',
  'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'table.type' = 'MERGE_ON_READ'
);
connector: the table connector to use
path: the data storage path
write.tasks: the number of tasks Flink uses when writing into the Hudi table
compaction.tasks: the number of compaction tasks used when writing into Hudi
table.type: the Hudi table type
Flink SQL> desc test_flink_hudi_mor;
+-----------+--------------+------+-----+--------+-----------+
|      name |         type | null | key | extras | watermark |
+-----------+--------------+------+-----+--------+-----------+
|      uuid |  VARCHAR(20) | true |     |        |           |
|      name |  VARCHAR(10) | true |     |        |           |
|       age |          INT | true |     |        |           |
|        ts | TIMESTAMP(3) | true |     |        |           |
| partition |  VARCHAR(20) | true |     |        |           |
+-----------+--------------+------+-----+--------+-----------+
5 rows in set
2. Insert data
INSERT INTO test_flink_hudi_mor VALUES
  ('id1','FZ',29,TIMESTAMP '1993-04-09 00:00:01','par1');

INSERT INTO test_flink_hudi_mor VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
Repeating the INSERT performs an update: the row for id1 changes from ('id1','FZ',29,TIMESTAMP '1993-04-09 00:00:01','par1') to ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'), as the query below confirms.
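A quick sanity check, a minimal query against the table created above with nothing else assumed:
select * from test_flink_hudi_mor where uuid = 'id1';
Only the Danny row should come back, because the Hudi connector upserts on the record key, which defaults to the uuid field.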
Because this is a MOR table, the data first lands in log files and has not yet been compacted into parquet files, as shown below:

V. Streaming Query
1. Create a table, test_flink_hudi_mor_2, that reads in streaming fashion and maps onto the earlier table test_flink_hudi_mor
CREATE TABLE test_flink_hudi_mor_2(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH(
  'connector' = 'hudi',
  'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',
  'table.type' = 'MERGE_ON_READ',
  'read.tasks' = '1',
  'read.streaming.enabled' = 'true',
  'read.streaming.start-commit' = '20220307211200',
  'read.streaming.check-interval' = '4'
);
Setting read.streaming.enabled to true indicates that the table data is read in streaming fashion.
read.streaming.check-interval sets the interval at which the source polls for new commits to 4 s.
table.type sets the table type to MERGE_ON_READ. To kick off the streaming read itself, see the query sketched below.
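With read.streaming.enabled set, a plain SELECT on the mapping table keeps running and emits rows from each new commit; a minimal sketch, assuming nothing beyond the tables defined above:
select * from test_flink_hudi_mor_2;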

2. Open a new terminal, start another Flink SQL CLI, recreate the table test_flink_hudi_mor there, and insert new rows in batch mode
CREATE TABLE test_flink_hudi_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH(
  'connector' = 'hudi',
  'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'table.type' = 'MERGE_ON_READ'
);

INSERT INTO test_flink_hudi_mor VALUES ('id9','FZ',29,TIMESTAMP '1993-04-09 00:00:01','par5');
INSERT INTO test_flink_hudi_mor VALUES ('id10','DX',28,TIMESTAMP '1994-06-02 00:00:01','par5');
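Back in the first terminal, the streaming query on test_flink_hudi_mor_2 should pick up these commits within the configured 4-second check interval and print the id9 and id10 rows.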
