1.1 Hudi是什么
Apache Hudi(Hadoop Upserts Deletes and Incrementals,簡稱Hudi,發音為Hoodie)由UBer開源,它以極低的延遲將數據快速攝取到HDFS或雲存儲(S3)中,其最主要的特點是支持記錄(Record)級別的插入更新(Upsert)和刪除,同時還提供增量查詢的支持。
本質上,Hudi並非是一種全新的文件格式,相反,它僅僅是充分利用了開源的列格式(Parquet)和行格式(Avro)的文件作為數據的存儲形式,並在數據寫入的同時生成特定的索引,進而可以提供更快的查詢性能。
Hudi自身無法完成對數據的讀寫操作,它強依賴於外部的Spark、Flink、Presto和Impala等計算引擎才可以使用,目前尤其對Spark依賴嚴重(在0.7.0中新增了Flink支持)。
1.2 Hudi的應用場景
1.2.1 近實時攝取
數據源系統 | Hudi的攝取方式 | 目標存儲系統 |
---|---|---|
RDBMS | 使用Upsert加載數據,通過讀取BinLog或Sqoop增量數據寫入到HDFS上的Hudi表,可以避免低效的全量加載。 | HDFS |
NoSQL | 使用HBase表來維護Hudi數據的索引,但實際數據依舊是位於HDFS中。 | HBase+HDFS |
MQ | Hudi在處理Kafka這種數據源時可以強制使用最小文件大小來改善NameNode負載,避免了頻繁創建小文件的問題。 | HDFS |
1.2.2 近實時分析
SQL on Hadoop解決方案(如Presto和Spark SQL)具有出色的性能,一般可以在**幾秒鍾內完成查詢**。而Hudi是一個可以提供面向實時分析更有效的替代方案,並支持對存儲在HDFS中更大規模數據集的近實時查詢。此外,它是一個非常輕量級的庫,無需額外的組件依賴即可使用,並不會增加操作開銷。
1.2.3 增量處理管道
1.2.4 HDFS數據分發
一個常見的用例是先在Hadoop體系中進行處理數據,然后再分發回面向在線服務的存儲系統,以供應用程序使用。在這種用例中一般都會引入諸如Kafka之類的隊列系統來防止目標存儲系統被壓垮。但如果不使用Kafka的情況下,僅將每次運行的Spark管道更新插入的輸出轉換為Hudi數據集,也可以有效地解決這個問題,然后以增量方式獲取數據(就像讀取Kafka topic一樣)寫入服務存儲層。
1.3 Hudi的核心概念
1.3.1 時間軸(Timeline)
。
-
時間軸(Timeline)的實現類(位於hudi-common-0.6.0.jar中):
注意:由於hudi-spark-bundle.jar和hudi-hadoop-mr-bundle.jar屬於Uber類型的jar包,已經將hudi-common-0.6.0.jar的所有class打包進去了。時間軸相關的實現類位於org.apache.hudi.common.table.timeline包下。
最頂層的接口約定類為:HoodieTimeline。
默認使用的時間軸類:HoodieDefaultTimeline繼承自HoodieTimeline。
活動時間軸類為:HoodieActiveTimeline(此類維護最近12小時內的時間,可配置)。
存檔時間軸類為:HoodieArchivedTimeline(超出12小時的時間在此類中維護,可配置)。
-
時間軸(Timeline)的核心組件:
組件名稱 | 組件說明 |
---|---|
Instant action | 在時間軸上執行的操作 COMMITS(一次提交表示將一組記錄原子寫入到數據集中) CLEANS(刪除數據集中不再需要的舊文件版本的后台活動) DELTA_COMMIT(增量提交,將一批記錄原子寫入到MOR表) COMPACTION(比如更新從基於行的日志文件變成列格式) ROLLBACK(指提交/增量提交不成功且已回滾並刪除在寫入時產生的文件) SAVEPOINT(在發生失敗時將數據還原到時間軸的某個即時時間) 實現類:org.apache.hudi.common.table.timeline.HoodieTimeline |
Instant time | 是一個時間戳(格式為20190117010349)且單調增加。 實現類:org.apache.hudi.common.table.timeline.HoodieInstant |
State | 即時的狀態包括: REQUESTED(已調度但尚未啟動) INFLIGHT(正在執行操作) COMPLETED(操作完成) INVALID(操作失敗) 實現類:org.apache.hudi.common.table.timeline.HoodieInstant.State |
1.3.2 文件組織形式
Hudi將DFS上的數據集組織到基本路徑(HoodieWriteConfig.BASE_PATH_PROP)下的目錄結構中。數據集分為多個分區(DataSourceOptions.PARTITIONPATH_FIELD_OPT_KEY),這些分區與Hive表非常相似,是包含該分區的數據文件的文件夾。
在每個分區內,文件被組織為文件組,由文件id充當唯一標識。 每個文件組包含多個文件切片,其中每個切片包含在某個即時時間的提交/壓縮生成的基本列文件(.parquet)以及一組日志文件(.log),該文件包含自生成基本文件以來對基本文件的插入/更新。 Hudi采用MVCC設計,其中壓縮操作將日志和基本文件合並以產生新的文件切片,而清理操作則將未使用的/較舊的文件片刪除以回收DFS上的空間。
1.3.3 索引機制(4類6種)
Hudi通過索引機制提供高效的Upsert操作,該機制會將一個RecordKey+PartitionPath組合的方式作為唯一標識映射到一個文件ID,而且,這個唯一標識和文件組/文件ID之間的映射自記錄被寫入文件組開始就不會再改變。Hudi內置了4類(6個)索引實現,均是繼承自頂層的抽象類HoodieIndex而來,如下:
索引類型 | 實現類 | 索引規則 |
---|---|---|
Simple | HoodieSimpleIndex | 簡單索引,基於RecordKey+PartitionPath組合的方式作為索引,僅在特定分區內查找數據。 |
HoodieGlobalSimpleIndex | 簡單的全局索引,基於RecordKey+PartitionPath組合的方式作為索引,在全部分區中查找數據。 | |
Memory | InMemoryHashIndex | 由內存中維護的HashMap來支持的Hoodie Index實現,基於RecordKey+PartitionPath組合的方式作為索引。 |
HBase | HBaseIndex | 在HBase中維護Hoodie Index。 存儲Index的HBase表名稱使用HoodieHBaseIndexConfig.HBASE_TABLENAME_PROP指定, Row_Key是RecordKey, 列簇是HBaseIndex.SYSTEM_COLUMN_FAMILY(默認為_s), 還包括3個默認的列,即: HBaseIndex.COMMIT_TS_COLUMN(默認為commit_ts)、 HBaseIndex.FILE_NAME_COLUMN(默認為file_name)、 HBaseIndex.PARTITION_PATH_COLUMN(默認為partition_path)。 |
Bloom | HoodieBloomIndex | 基於布隆過濾器實現的索引機制,僅在特定分區內查找。每個Parquet文件在其元數據中都包含其row_key的Bloom篩選器。 |
HoodieGlobalBloomIndex | 基於布隆過濾器實現的全局索引機制,會在所有分區中查找。它會先獲取所有分區(只加載帶有.hoodie_partition_metadata文件的分區),然后再加載各分區內的最新文件。 |
注意:
-
全局索引:指在全表的所有分區范圍下強制要求鍵保持唯一,即確保對給定的鍵有且只有一個對應的記錄。全局索引提供了更強的保證,也使得更刪的消耗隨着表的大小增加而增加(O(表的大小)),更適用於是小表。
-
非全局索引:僅在表的某一個分區內強制要求鍵保持唯一,它依靠寫入器為同一個記錄的更刪提供一致的分區路徑,但由此同時大幅提高了效率,因為索引查詢復雜度成了O(更刪的記錄數量)且可以很好地應對寫入量的擴展。
1.3.4 查詢視圖(3類)
-
讀優化視圖 : 直接查詢基本文件(數據集的最新快照),其實就是列式文件(Parquet)。並保證與非Hudi列式數據集相比,具有相同的列式查詢性能。
-
增量視圖 : 僅查詢新寫入數據集的文件,需要指定一個Commit/Compaction的即時時間(位於Timeline上的某個Instant)作為條件,來查詢此條件之后的新數據。
-
實時快照視圖 : 查詢某個增量提交操作中數據集的最新快照,會先進行動態合並最新的基本文件(Parquet)和增量文件(Avro)來提供近實時數據集(通常會存在幾分鍾的延遲)。
權衡 | 讀優化視圖 | 實時快照視圖 | 增量視圖 |
---|---|---|---|
數據延遲 | 更高 | 更低 | 更高 |
查詢延遲 | 更低(原始列式性能) | 更低(查詢會先合並列式+行增量) | 更低 |
1.4 Hudi支持的存儲類型
1.4.1 寫時復制(Copy on Write,COW)表
COW表主要使用列式文件格式(Parquet)存儲數據,在寫入數據過程中,執行同步合並,更新數據版本並重寫數據文件,類似RDBMS中的B-Tree更新。
1) 更新:在更新記錄時,Hudi會先找到包含更新數據的文件,然后再使用更新值(最新的數據)重寫該文件,包含其他記錄的文件保持不變。當突然有大量寫操作時會導致重寫大量文件,從而導致極大的I/O開銷。
2)讀取:在讀取數據集時,通過讀取最新的數據文件來獲取最新的更新,此存儲類型適用於少量寫入和大量讀取的場景。
1.4.2 讀時合並(Merge On Read,MOR)表
MOR表是COW表的升級版,它使用列式(parquet)與行式(avro)文件混合的方式存儲數據。在更新記錄時,類似NoSQL中的LSM-Tree更新。
1) 更新:在更新記錄時,僅更新到增量文件(Avro)中,然后進行異步(或同步)的compaction,最后創建列式文件(parquet)的新版本。此存儲類型適合頻繁寫的工作負載,因為新記錄是以追加的模式寫入增量文件中。
2) 讀取:在讀取數據集時,需要先將增量文件與舊文件進行合並,然后生成列式文件成功后,再進行查詢。
1.4.3 COW和MOR的對比
權衡 | 寫時復制COW | 讀時合並MOR |
---|---|---|
數據延遲 | 更高 | 更低 |
更新代價(I/O) | 更高(重寫整個parquet文件) | 更低(追加到增量日志) |
Parquet文件大小 | 更小(高更新代價(I/O)) | 更大(低更新代價) |
寫放大 | 更高 | 更低(取決於壓縮策略) |
適用場景 | 寫少讀多 | 寫多讀少 |
1.4.4 COW和MOR支持的視圖
存儲類型 | 支持的視圖 | 不支持的視圖 | |
---|---|---|---|
COW | 讀優化 + 增量 + 實時視圖 | 無 | |
MOR | 讀優化 + 近實時 | 增量(如果使用增量視圖查詢時會提示:Incremental view not implemented yet, for merge-on-read tables) |
|
1.5 安裝Hudi
1.5.1 自行編譯(待驗證)
// 下載hudi的源碼發行版 wget -P /tmp/ http://archive.apache.org/dist/hudi/0.6.0/hudi-0.6.0.src.tgz // 解壓hudi到/usr/hdp/current/下 tar -zxf /tmp/hudi-0.6.0.src.tgz -C /usr/hdp/current/ // 編譯hudi(確保服務器上有$SCALA_HOME) mvn -DskipTests clean package
// 下載Hudi on Spark的Jar包(放入$SPARK_HOME/jars/下) wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark-bundle_2.11/0.6.0/hudi-spark-bundle_2.11-0.6.0.jar // 下載Hudi on MapReduce的Jar包(放入$HADOOP_HOME/share/hadoop/mapreduce/下) wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-hadoop-mr-bundle/0.6.0/hudi-hadoop-mr-bundle-0.6.0.jar // 創建軟連接(在$HIVE_HOME/lib/下) ln -s $SPARK_HOME/jars/ $HIVE_HOME/lib/ ln -s $HADOOP_HOME/share/hadoop/mapreduce/hudi-hadoop-mr-bundle-0.6.0.jar $HIVE_HOME/lib/
1.6 Hudi的SparkSQL使用
Hudi支持對文件系統(HDFS、LocalFS)和Hive的讀寫操作,以下分別使用COW和MOR存儲類型來操作文件系統和Hive的案例。
1.6.1 文件系統操作
1.6.1.1 基於COW表的LocalFS/HDFS使用
package com.mengyao.hudi import com.mengyao.Configured import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.model.EmptyHoodieRecordPayload import org.apache.hudi.config.HoodieIndexConfig._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.index.HoodieIndex import org.apache.spark.SparkConf import org.apache.spark.sql.functions._ import org.apache.spark.sql.{DataFrame, SparkSession} import scala.collection.JavaConverters._ /** * Spark on Hudi(COW表) to HDFS/LocalFS * @ClassName Demo1 * @Description * @Created by: MengYao * @Date: 2021-01-11 10:10:53 * @Version V1.0 */ object Demo1 { private val APP_NAME = Demo1.getClass.getSimpleName private val MASTER = "local[2]" val SOURCE = "hudi" val insertData = Array[TemperatureBean]( new TemperatureBean("4301",1,28.6,"2019-12-07 12:35:33"), new TemperatureBean("4312",0,31.4,"2019-12-07 12:25:03"), new TemperatureBean("4302",1,30.1,"2019-12-07 12:32:17"), new TemperatureBean("4305",3,31.5,"2019-12-07 12:33:11"), new TemperatureBean("4310",2,29.9,"2019-12-07 12:34:42") ) val updateData = Array[TemperatureBean]( new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器發生修改,溫度值由29.9->30.4,時間由12:34:42->12:35:42 ) val deleteData = Array[TemperatureBean]( new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器要被刪除,必須與最新的數據保持一致(如果字段值不同時無法刪除) ) def main(args: Array[String]): Unit = { System.setProperty(Configured.HADOOP_HOME_DIR, Configured.getHadoopHome) // 創建SparkConf val conf = new SparkConf() .set("spark.master", MASTER) .set("spark.app.name", APP_NAME) .setAll(Configured.sparkConf().asScala) // 創建SparkSession val spark = SparkSession.builder() .config(conf) .getOrCreate() // 關閉日志 spark.sparkContext.setLogLevel("OFF") // 導入隱式轉換 import spark.implicits._ import DemoUtils._ // 類似Hive中的DB(basePath的schema決定了最終要操作的文件系統,如果是file:則為LocalFS,如果是hdfs:則為HDFS) val basePath = "file:/D:/tmp" // 類似Hive中的Table val tableName = "tbl_temperature_cow1" // 數據所在的路徑 val path = s"$basePath/$tableName" // 插入數據 insert(spark.createDataFrame(insertData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path) // 修改數據 // update(spark.createDataFrame(updateData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path) // 刪除數據 // delete(spark.createDataFrame(deleteData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path) // 【查詢方式1:默認為快照(基於行或列獲取最新視圖)查詢 // query(spark.read.format(SOURCE).load(s"$path/*/*").orderBy($"deviceId".asc)) // query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_SNAPSHOT_OPT_VAL)).load(s"$path/*/*").withColumn("queryType", lit("查詢方式為:快照(默認)")).orderBy($"deviceId".asc)) // 【查詢方式2:讀時優化 // query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)).load(s"$path/*/*").withColumn("queryType", lit("查詢方式為:讀時優化")).orderBy($"deviceId".asc)) // 【查詢方式3:增量查詢 // 先取出最近一次提交的時間 // val commitTime: String = spark.read.format(SOURCE).load(s"$path/*/*").dropDuplicates("_hoodie_commit_time").select($"_hoodie_commit_time".as("commitTime")).orderBy($"commitTime".desc).first().getAs(0) // 再查詢最近提交時間之后的數據 // query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_INCREMENTAL_OPT_VAL,Option((commitTime.toLong-2).toString))).load(s"$path/*/*").withColumn("queryType", lit("查詢方式為:增量查詢")).orderBy($"deviceId".asc).toDF()) spark.close() } /** * 新增數據 * @param df 數據集 * @param tableName Hudi表 * @param primaryKey 主鍵列名 * @param partitionField 分區列名 * @param changeDateField 變更時間列名 * @param path 數據的存儲路徑 */ def insert(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = { df.write.format(SOURCE) .options(Map( // 要操作的表 TABLE_NAME->tableName, // 操作的表類型(默認COW) TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL, // 執行insert操作 OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL, // 設置主鍵列 RECORDKEY_FIELD_OPT_KEY->primaryKey, // 設置分區列,類似Hive的表分區概念 PARTITIONPATH_FIELD_OPT_KEY->partitionField, // 設置數據更新時間列,該字段數值大的數據會覆蓋小的,必須是數值類型 PRECOMBINE_FIELD_OPT_KEY->changeDateField, // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器 INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name, // 執行insert操作的shuffle並行度 INSERT_PARALLELISM->"2" )) // 如果數據存在會覆蓋 .mode(Overwrite) .save(path) } /** * 修改數據 * @param df 數據集 * @param tableName Hudi表 * @param primaryKey 主鍵列名 * @param partitionField 分區列名 * @param changeDateField 變更時間列名 * @param path 數據的存儲路徑 */ def update(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = { df.write.format(SOURCE) .options(Map( // 要操作的表 TABLE_NAME->tableName, // 操作的表類型(默認COW) TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL, // 執行upsert操作 OPERATION_OPT_KEY->UPSERT_OPERATION_OPT_VAL, // 設置主鍵列 RECORDKEY_FIELD_OPT_KEY->primaryKey, // 設置分區列,類似Hive的表分區概念 PARTITIONPATH_FIELD_OPT_KEY->partitionField, // 設置數據更新時間列,該字段數值大的數據會覆蓋小的 PRECOMBINE_FIELD_OPT_KEY->changeDateField, // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器 INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name, // 執行upsert操作的shuffle並行度 UPSERT_PARALLELISM-> "2" )) // 如果數據存在會覆蓋 .mode(Append) .save(path) } /** * 刪除數據 * @param df 數據集 * @param tableName Hudi表 * @param primaryKey 主鍵列名 * @param partitionField 分區列名 * @param changeDateField 變更時間列名 * @param path 數據的存儲路徑 */ def delete(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = { df.write.format(SOURCE) .options(Map( // 要操作的表 TABLE_NAME->tableName, // 操作的表類型(默認COW) TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL, // 執行delete操作 OPERATION_OPT_KEY->DELETE_OPERATION_OPT_VAL, // 設置主鍵列 RECORDKEY_FIELD_OPT_KEY->primaryKey, // 設置分區列,類似Hive的表分區概念 PARTITIONPATH_FIELD_OPT_KEY->partitionField, // 設置數據更新時間列,該字段數值大的數據會覆蓋小的 PRECOMBINE_FIELD_OPT_KEY->changeDateField, // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器 INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name, // 執行delete操作的shuffle並行度 DELETE_PARALLELISM->"2", // 刪除策略有軟刪除(保留主鍵且其余字段為null)和硬刪除(從數據集中徹底刪除)兩種,此處為硬刪除 PAYLOAD_CLASS_OPT_KEY->classOf[EmptyHoodieRecordPayload].getName )) // 如果數據存在會覆蓋 .mode(Append) .save(path) } /** * 查詢類型 * <br>Hoodie具有3種查詢模式:</br> * <br>1、默認是快照模式(Snapshot mode,根據行和列數據獲取最新視圖)</br> * <br>2、增量模式(incremental mode,查詢從某個commit時間片之后的數據)</br> * <br>3、讀時優化模式(Read Optimized mode,根據列數據獲取最新視圖)</br> * @param queryType * @param queryTime * @return */ def buildQuery(queryType: String, queryTime: Option[String]=Option.empty) = Map( queryType match { // 如果是讀時優化模式(read_optimized,根據列數據獲取最新視圖) case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => QUERY_TYPE_OPT_KEY->QUERY_TYPE_READ_OPTIMIZED_OPT_VAL // 如果是增量模式(incremental mode,查詢從某個時間片之后的新數據) case QUERY_TYPE_INCREMENTAL_OPT_VAL => QUERY_TYPE_OPT_KEY->QUERY_TYPE_INCREMENTAL_OPT_VAL // 默認使用快照模式查詢(snapshot mode,根據行和列數據獲取最新視圖) case _ => QUERY_TYPE_OPT_KEY->QUERY_TYPE_SNAPSHOT_OPT_VAL }, if(queryTime.nonEmpty) BEGIN_INSTANTTIME_OPT_KEY->queryTime.get else BEGIN_INSTANTTIME_OPT_KEY->"0" ) }
1.6.1.2 基於MOR表的LocalFS/HDFS使用
package com.mengyao.hudi import com.mengyao.Configured import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.model.EmptyHoodieRecordPayload import org.apache.hudi.config.HoodieIndexConfig.INDEX_TYPE_PROP import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.index.HoodieIndex import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.SaveMode._ import org.apache.spark.SparkConf import org.apache.spark.sql.functions.lit import scala.collection.JavaConverters._ /** * Spark on Hudi(MOR表) to HDFS/LocalFS * @ClassName Demo1 * @Description * @Created by: MengYao * @Date: 2021-01-11 10:10:53 * @Version V1.0 */ object Demo2 { private val APP_NAME: String = Demo1.getClass.getSimpleName private val MASTER: String = "local[2]" val SOURCE: String = "hudi" val insertData = Array[TemperatureBean]( new TemperatureBean("4301",1,28.6,"2019-12-07 12:35:33"), new TemperatureBean("4312",0,31.4,"2019-12-07 12:25:03"), new TemperatureBean("4302",1,30.1,"2019-12-07 12:32:17"), new TemperatureBean("4305",3,31.5,"2019-12-07 12:33:11"), new TemperatureBean("4310",2,29.9,"2019-12-07 12:34:42") ) val updateData = Array[TemperatureBean]( new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器發生修改,溫度值由29.9->30.4,時間由12:34:42->12:35:42 ) val deleteData = Array[TemperatureBean]( new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器要被刪除,必須與最新的數據保持一致(如果字段值不同時無法刪除) ) def main(args: Array[String]): Unit = { System.setProperty(Configured.HADOOP_HOME_DIR, Configured.getHadoopHome) // 創建SparkConf val conf = new SparkConf() .set("spark.master", MASTER) .set("spark.app.name", APP_NAME) .setAll(Configured.sparkConf().asScala) // 創建SparkSession val spark = SparkSession.builder() .config(conf) .getOrCreate() // 關閉日志 spark.sparkContext.setLogLevel("OFF") // 導入隱式轉換 import spark.implicits._ import DemoUtils._ // 類似Hive中的DB(basePath的schema決定了最終要操作的文件系統,如果是file:則為LocalFS,如果是hdfs:則為HDFS) val basePath = "file:/D:/tmp" // 類似Hive中的Table val tableName = "tbl_temperature_mor" // 數據所在的路徑 val path = s"$basePath/$tableName" // 插入數據 // insert(spark.createDataFrame(insertData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path) // 修改數據 // update(spark.createDataFrame(updateData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path) // 刪除數據 // delete(spark.createDataFrame(deleteData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path) // 【查詢方式1:默認為快照(基於行或列獲取最新視圖)查詢 query(spark.read.format(SOURCE).load(s"$path/*/*").orderBy($"deviceId".asc)) // query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_SNAPSHOT_OPT_VAL)).load(s"$path/*/*").withColumn("queryType", lit("查詢方式為:快照(默認)")).orderBy($"deviceId".asc)) // 【查詢方式2:讀時優化 query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)).load(s"$path/*/*").withColumn("queryType", lit("查詢方式為:讀時優化")).orderBy($"deviceId".asc)) // 【查詢方式3:增量查詢(不支持) spark.close() } /** * 新增數據 * @param df 數據集 * @param tableName Hudi表 * @param primaryKey 主鍵列名 * @param partitionField 分區列名 * @param changeDateField 變更時間列名 * @param path 數據的存儲路徑 */ def insert(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = { df.write.format(SOURCE) .options(Map( // 要操作的表 TABLE_NAME->tableName, // 操作的表類型(使用MOR) TABLE_TYPE_OPT_KEY -> MOR_TABLE_TYPE_OPT_VAL, // 執行insert操作 OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL, // 設置主鍵列 RECORDKEY_FIELD_OPT_KEY->primaryKey, // 設置分區列,類似Hive的表分區概念 PARTITIONPATH_FIELD_OPT_KEY->partitionField, // 設置數據更新時間列,該字段數值大的數據會覆蓋小的,必須是數值類型 PRECOMBINE_FIELD_OPT_KEY->changeDateField, // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器 INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name, // 執行insert操作的shuffle並行度 INSERT_PARALLELISM->"2" )) // 如果數據存在會覆蓋 .mode(Overwrite) .save(path) } /** * 修改數據 * @param df 數據集 * @param tableName Hudi表 * @param primaryKey 主鍵列名 * @param partitionField 分區列名 * @param changeDateField 變更時間列名 * @param path 數據的存儲路徑 */ def update(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = { df.write.format(SOURCE) .options(Map( // 要操作的表 TABLE_NAME->tableName, // 操作的表類型(使用MOR) TABLE_TYPE_OPT_KEY -> MOR_TABLE_TYPE_OPT_VAL, // 執行upsert操作 OPERATION_OPT_KEY->UPSERT_OPERATION_OPT_VAL, // 設置主鍵列 RECORDKEY_FIELD_OPT_KEY->primaryKey, // 設置分區列,類似Hive的表分區概念 PARTITIONPATH_FIELD_OPT_KEY->partitionField, // 設置數據更新時間列,該字段數值大的數據會覆蓋小的 PRECOMBINE_FIELD_OPT_KEY->changeDateField, // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器 INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name, // 執行upsert操作的shuffle並行度 UPSERT_PARALLELISM-> "2" )) // 如果數據存在會覆蓋 .mode(Append) .save(path) } /** * 刪除數據 * @param df 數據集 * @param tableName Hudi表 * @param primaryKey 主鍵列名 * @param partitionField 分區列名 * @param changeDateField 變更時間列名 * @param path 數據的存儲路徑 */ def delete(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = { df.write.format(SOURCE) .options(Map( // 要操作的表 TABLE_NAME->tableName, // 操作的表類型(使用MOR) TABLE_TYPE_OPT_KEY -> MOR_TABLE_TYPE_OPT_VAL, // 執行delete操作 OPERATION_OPT_KEY->DELETE_OPERATION_OPT_VAL, // 設置主鍵列 RECORDKEY_FIELD_OPT_KEY->primaryKey, // 設置分區列,類似Hive的表分區概念 PARTITIONPATH_FIELD_OPT_KEY->partitionField, // 設置數據更新時間列,該字段數值大的數據會覆蓋小的 PRECOMBINE_FIELD_OPT_KEY->changeDateField, // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器 INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name, // 執行delete操作的shuffle並行度 DELETE_PARALLELISM->"2", // 刪除策略有軟刪除(保留主鍵且其余字段為null)和硬刪除(從數據集中徹底刪除)兩種,此處為硬刪除 PAYLOAD_CLASS_OPT_KEY->classOf[EmptyHoodieRecordPayload].getName )) // 如果數據存在會覆蓋 .mode(Append) .save(path) } /** * 查詢類型 * <br>Hoodie具有3種查詢模式:</br> * <br>1、默認是快照模式(Snapshot mode,根據行和列數據獲取最新視圖)</br> * <br>2、增量模式(incremental mode,查詢從某個commit時間片之后的數據)</br> * <br>3、讀時優化模式(Read Optimized mode,根據列數據獲取最新視圖)</br> * @param queryType * @param queryTime * @return */ def buildQuery(queryType: String, queryTime: Option[String]=Option.empty): Map[String, String] = { Map( queryType match { // 如果是讀時優化模式(read_optimized,根據列數據獲取最新視圖) case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => QUERY_TYPE_OPT_KEY->QUERY_TYPE_READ_OPTIMIZED_OPT_VAL // 如果是增量模式(incremental mode,查詢從某個時間片之后的新數據) case QUERY_TYPE_INCREMENTAL_OPT_VAL => QUERY_TYPE_OPT_KEY->QUERY_TYPE_INCREMENTAL_OPT_VAL // 默認使用快照模式查詢(snapshot mode,根據行和列數據獲取最新視圖) case _ => QUERY_TYPE_OPT_KEY->QUERY_TYPE_SNAPSHOT_OPT_VAL }, if(queryTime.nonEmpty) BEGIN_INSTANTTIME_OPT_KEY->queryTime.get else BEGIN_INSTANTTIME_OPT_KEY->"0" ) } }
1.6.2 Hive操作
1.6.2.1 基於COW表的Hive使用
package com.mengyao.hudi import com.mengyao.Configured import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.model.EmptyHoodieRecordPayload import org.apache.hudi.config.HoodieIndexConfig._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.index.HoodieIndex import org.apache.spark.SparkConf import org.apache.spark.sql.SaveMode._ import org.apache.spark.sql.{DataFrame, SparkSession} import scala.collection.JavaConverters._ import scala.collection.mutable._ /** * Spark on Hudi(COW表) to Hive * @ClassName Demo3 * @Description * @Created by: MengYao * @Date: 2021-01-23 16:13:23 * @Version V1.0 */ class Demo3(db:String, table:String, partition:String, url:String, user:String="hive", password:String="hive") { private val hiveCfg = hiveConfig(db,table,partition, url, user, password) private val SOURCE = "hudi" /** * 新增數據 * @param df 數據集 * @param tableName Hudi表 * @param primaryKey 主鍵列名 * @param partitionField 分區列名 * @param changeDateField 變更時間列名 * @param path 數據的存儲路徑 */ def insert(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = { df.write.format(SOURCE) .options(Map( // 要操作的表 TABLE_NAME->tableName, // 操作的表類型(默認COW) TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL, // 執行insert操作 OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL, // 設置主鍵列 RECORDKEY_FIELD_OPT_KEY->primaryKey, // 設置分區列,類似Hive的表分區概念 PARTITIONPATH_FIELD_OPT_KEY->partitionField, // 設置數據更新時間列,該字段數值大的數據會覆蓋小的,必須是數值類型 PRECOMBINE_FIELD_OPT_KEY->changeDateField, // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器 INDEX_TYPE_PROP-> HoodieIndex.IndexType.GLOBAL_BLOOM.name, // 執行insert操作的shuffle並行度 INSERT_PARALLELISM->"2" ) ++= hiveCfg ) // 如果數據存在會覆蓋 .mode(Overwrite) .save(path) } /** * 修改數據 * @param df 數據集 * @param tableName Hudi表 * @param primaryKey 主鍵列名 * @param partitionField 分區列名 * @param changeDateField 變更時間列名 * @param path 數據的存儲路徑 */ def update(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = { df.write.format(SOURCE) .options(Map( // 要操作的表 TABLE_NAME->tableName, // 操作的表類型(默認COW) TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL, // 執行upsert操作 OPERATION_OPT_KEY->UPSERT_OPERATION_OPT_VAL, // 設置主鍵列 RECORDKEY_FIELD_OPT_KEY->primaryKey, // 設置分區列,類似Hive的表分區概念 PARTITIONPATH_FIELD_OPT_KEY->partitionField, // 設置數據更新時間列,該字段數值大的數據會覆蓋小的 PRECOMBINE_FIELD_OPT_KEY->changeDateField, // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器 INDEX_TYPE_PROP-> HoodieIndex.IndexType.GLOBAL_BLOOM.name, // 執行upsert操作的shuffle並行度 UPSERT_PARALLELISM->"2" ) ++= hiveCfg ) // 如果數據存在會覆蓋 .mode(Append) .save(path) } /** * 刪除數據 * @param df 數據集 * @param tableName Hudi表 * @param primaryKey 主鍵列名 * @param partitionField 分區列名 * @param changeDateField 變更時間列名 * @param path 數據的存儲路徑 */ def delete(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = { df.write.format(SOURCE) .options(Map( // 要操作的表 TABLE_NAME->tableName, // 操作的表類型(默認COW) TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL, // 執行delete操作 OPERATION_OPT_KEY->DELETE_OPERATION_OPT_VAL, // 設置主鍵列 RECORDKEY_FIELD_OPT_KEY->primaryKey, // 設置分區列,類似Hive的表分區概念 PARTITIONPATH_FIELD_OPT_KEY->partitionField, // 設置數據更新時間列,該字段數值大的數據會覆蓋小的 PRECOMBINE_FIELD_OPT_KEY->changeDateField, // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器 INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name, // 執行delete操作的shuffle並行度 DELETE_PARALLELISM->"2", // 刪除策略有軟刪除(保留主鍵且其余字段為null)和硬刪除(從數據集中徹底刪除)兩種,此處為硬刪除 PAYLOAD_CLASS_OPT_KEY->classOf[EmptyHoodieRecordPayload].getName ) ++= hiveCfg ) // 如果數據存在會覆蓋 .mode(Append) .save(path) } private def hiveConfig(db:String, table:String, partition:String,url:String,user:String="hive", password:String="hive"): Map[String, String] = { scala.collection.mutable.Map( // 設置jdbc 連接同步 HIVE_URL_OPT_KEY -> url, // 設置訪問Hive的用戶名 HIVE_USER_OPT_KEY -> user, // 設置訪問Hive的密碼 HIVE_PASS_OPT_KEY -> password, // 設置Hive數據庫名稱 HIVE_DATABASE_OPT_KEY -> db, // 設置Hive表名稱 HIVE_TABLE_OPT_KEY->table, // 設置要同步的分區列名 HIVE_PARTITION_FIELDS_OPT_KEY->partition, // 設置數據集注冊並同步到hive HIVE_SYNC_ENABLED_OPT_KEY -> "true", // 設置當分區變更時,當前數據的分區目錄是否變更 BLOOM_INDEX_UPDATE_PARTITION_PATH -> "true", // HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) } } object Demo3 { private val APP_NAME = classOf[Demo3].getClass.getSimpleName val insertData = Array[TemperatureBean]( new TemperatureBean("4301",1,28.6,"2019-12-07 12:35:33"), new TemperatureBean("4312",0,31.4,"2019-12-07 12:25:03"), new TemperatureBean("4302",1,30.1,"2019-12-07 12:32:17"), new TemperatureBean("4305",3,31.5,"2019-12-07 12:33:11"), new TemperatureBean("4310",2,29.9,"2019-12-07 12:34:42") ) val updateData = Array[TemperatureBean]( new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器發生修改,溫度值由29.9->30.4,時間由12:34:42->12:35:42 ) val deleteData = Array[TemperatureBean]( new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器要被刪除,必須與最新的數據保持一致(如果字段值不同時無法刪除) ) def main(args: Array[String]): Unit = { // 類似Hive中的DB(basePath的schema決定了最終要操作的文件系統,如果是file:則為LocalFS,如果是hdfs:則為HDFS) val basePath = "hdfs://node01:9820/apps/demo/hudi/hudi-hive-cow/" // Hive中的Table val tableName = "tbl_hudi_temperature_cow" // 數據所在的路徑 val path = s"$basePath/$tableName" // 創建SparkConf val conf = new SparkConf() .set("spark.app.name", APP_NAME) .setAll(Configured.sparkConf().asScala) // 創建SparkSession val spark = SparkSession.builder() .config(conf) .getOrCreate() // 關閉日志 spark.sparkContext.setLogLevel("DEBUG") // 導入隱式轉換 import spark.implicits._ // 創建Demo3實例 val demo = new Demo3("test_db", tableName,"deviceType","jdbc:hive2://node01:10000","root","123456") // 插入數據 // demo.insert(spark.createDataFrame(insertData.toBuffer.asJava, classOf[TemperatureBean]),tableName, "deviceId", "deviceType", "cdt", path) // 修改數據 // demo.update(spark.createDataFrame(updateData.toBuffer.asJava, classOf[TemperatureBean]),tableName, "deviceId", "deviceType", "cdt", path) // 刪除數據 // demo.delete(spark.createDataFrame(deleteData.toBuffer.asJava, classOf[TemperatureBean]),tableName, "deviceId", "deviceType", "cdt", path) spark.stop() } }
首先,在Demo3類的main方法中,僅將demo.insert的代碼取消注釋,確保demo.update和demo.delete的代碼是被注釋掉的。然后打包並上傳到服務器。
然后,在服務器中執行如下腳本:
spark-submit --verbose --class com.mengyao.hudi.Demo3 --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --executor-cores 1 --queue default demo.jar
在yarn中,看到作業執行成功時:
再去查看hive中test_db庫下是否多了一張叫做tbl_hudi_temperature_cow的表
查看詳細的建表信息
首先,在Demo3類的main方法中,僅將demo.update的代碼取消注釋,確保demo.insert和demo.delete的代碼是被注釋掉的。然后打包並上傳到服務器。
然后,在服務器中執行如下腳本:
spark-submit --verbose --class com.mengyao.hudi.Demo3 --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --executor-cores 1 --queue default demo.jar
在yarn中,看到作業執行成功時
再去Hive中查詢數據(發現deviceid為4310的數據的溫度變成了30.4度)
為了看出更新的效果,本次修改與上次插入的數據對比圖如下:
c) 測試刪除數據
首先,在Demo3類的main方法中,僅將demo.delete的代碼取消注釋,確保demo.insert和demo.update的代碼是被注釋掉的。然后打包並上傳到服務器。
然后,在服務器中執行如下腳本:
spark-submit --verbose --class com.mengyao.hudi.Demo3 --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --executor-cores 1 --queue default demo.jar
在yarn中,看到作業執行成功時
再去Hive中查詢數據(查看deviceid為4310的數據是否已經被成功刪除)
為了看出更新的效果,本次刪除與上次修改以及上上次插入的數據對比圖如下:
1.6.2.2 基於MOR表的Hive使用
package com.mengyao.hudi import com.mengyao.Configured import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.model.EmptyHoodieRecordPayload import org.apache.hudi.config.HoodieIndexConfig._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.index.HoodieIndex import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.SaveMode._ import scala.collection.JavaConverters._ import scala.collection.mutable._ /** * Spark on Hudi(MOR表) to Hive * @ClassName Demo4 * @Description * @Created by: MengYao * @Date: 2021-01-23 16:13:23 * @Version V1.0 */ class Demo4(db:String, table:String, partition:String, url:String, user:String="hive", password:String="hive") { private val hiveCfg = hiveConfig(db,table,partition, url, user, password) private val SOURCE = "hudi" /** * 新增數據 * @param df 數據集 * @param tableName Hudi表 * @param primaryKey 主鍵列名 * @param partitionField 分區列名 * @param changeDateField 變更時間列名 * @param path 數據的存儲路徑 */ def insert(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = { df.write.format(SOURCE) .options(Map( // 要操作的表 TABLE_NAME->tableName, // 操作的表類型(默認COW) TABLE_TYPE_OPT_KEY -> MOR_TABLE_TYPE_OPT_VAL, // 執行insert操作 OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL, // 設置主鍵列 RECORDKEY_FIELD_OPT_KEY->primaryKey, // 設置分區列,類似Hive的表分區概念 PARTITIONPATH_FIELD_OPT_KEY->partitionField, // 設置數據更新時間列,該字段數值大的數據會覆蓋小的,必須是數值類型 PRECOMBINE_FIELD_OPT_KEY->changeDateField, // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器 INDEX_TYPE_PROP-> HoodieIndex.IndexType.GLOBAL_BLOOM.name, // 執行insert操作的shuffle並行度 INSERT_PARALLELISM->"2" ) ++= hiveCfg ) // 如果數據存在會覆蓋 .mode(Overwrite) .save(path) } /** * 修改數據 * @param df 數據集 * @param tableName Hudi表 * @param primaryKey 主鍵列名 * @param partitionField 分區列名 * @param changeDateField 變更時間列名 * @param path 數據的存儲路徑 */ def update(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = { df.write.format(SOURCE) .options(Map( // 要操作的表 TABLE_NAME->tableName, // 操作的表類型(默認COW) TABLE_TYPE_OPT_KEY -> MOR_TABLE_TYPE_OPT_VAL, // 執行upsert操作 OPERATION_OPT_KEY->UPSERT_OPERATION_OPT_VAL, // 設置主鍵列 RECORDKEY_FIELD_OPT_KEY->primaryKey, // 設置分區列,類似Hive的表分區概念 PARTITIONPATH_FIELD_OPT_KEY->partitionField, // 設置數據更新時間列,該字段數值大的數據會覆蓋小的 PRECOMBINE_FIELD_OPT_KEY->changeDateField, // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器 INDEX_TYPE_PROP-> HoodieIndex.IndexType.GLOBAL_BLOOM.name, // 執行upsert操作的shuffle並行度 UPSERT_PARALLELISM->"2" ) ++= hiveCfg ) // 如果數據存在會覆蓋 .mode(Append) .save(path) } /** * 刪除數據 * @param df 數據集 * @param tableName Hudi表 * @param primaryKey 主鍵列名 * @param partitionField 分區列名 * @param changeDateField 變更時間列名 * @param path 數據的存儲路徑 */ def delete(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = { df.write.format(SOURCE) .options(Map( // 要操作的表 TABLE_NAME->tableName, // 操作的表類型(默認COW) TABLE_TYPE_OPT_KEY->MOR_TABLE_TYPE_OPT_VAL, // 執行delete操作 OPERATION_OPT_KEY->DELETE_OPERATION_OPT_VAL, // 設置主鍵列 RECORDKEY_FIELD_OPT_KEY->primaryKey, // 設置分區列,類似Hive的表分區概念 PARTITIONPATH_FIELD_OPT_KEY->partitionField, // 設置數據更新時間列,該字段數值大的數據會覆蓋小的 PRECOMBINE_FIELD_OPT_KEY->changeDateField, // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器 INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name, // 執行delete操作的shuffle並行度 DELETE_PARALLELISM->"2", // 刪除策略有軟刪除(保留主鍵且其余字段為null)和硬刪除(從數據集中徹底刪除)兩種,此處為硬刪除 PAYLOAD_CLASS_OPT_KEY->classOf[EmptyHoodieRecordPayload].getName ) ++= hiveCfg ) // 如果數據存在會覆蓋 .mode(Append) .save(path) } private def hiveConfig(db:String, table:String, partition:String,url:String,user:String="hive", password:String="hive"): Map[String, String] = { scala.collection.mutable.Map( // 設置jdbc 連接同步 HIVE_URL_OPT_KEY -> url, // 設置訪問Hive的用戶名 HIVE_USER_OPT_KEY -> user, // 設置訪問Hive的密碼 HIVE_PASS_OPT_KEY -> password, // 設置Hive數據庫名稱 HIVE_DATABASE_OPT_KEY -> db, // 設置Hive表名稱 HIVE_TABLE_OPT_KEY->table, // 設置要同步的分區列名 HIVE_PARTITION_FIELDS_OPT_KEY->partition, // 設置數據集注冊並同步到hive HIVE_SYNC_ENABLED_OPT_KEY -> "true", // 設置當分區變更時,當前數據的分區目錄是否變更 BLOOM_INDEX_UPDATE_PARTITION_PATH -> "true", // HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) } } object Demo4 { private val APP_NAME = classOf[Demo3].getClass.getSimpleName val insertData = Array[TemperatureBean]( new TemperatureBean("4301",1,28.6,"2019-12-07 12:35:33"), new TemperatureBean("4312",0,31.4,"2019-12-07 12:25:03"), new TemperatureBean("4302",1,30.1,"2019-12-07 12:32:17"), new TemperatureBean("4305",3,31.5,"2019-12-07 12:33:11"), new TemperatureBean("4310",2,29.9,"2019-12-07 12:34:42") ) val updateData = Array[TemperatureBean]( new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器發生修改,溫度值由29.9->30.4,時間由12:34:42->12:35:42 ) val deleteData = Array[TemperatureBean]( new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器要被刪除,必須與最新的數據保持一致(如果字段值不同時無法刪除) ) def main(args: Array[String]): Unit = { // 類似Hive中的DB(basePath的schema決定了最終要操作的文件系統,如果是file:則為LocalFS,如果是hdfs:則為HDFS) val basePath = "hdfs://node01:9820/apps/demo/hudi/hudi-hive-mor/" // Hive中的Table val tableName = "tbl_hudi_temperature_mor1" // 數據所在的路徑 val path = s"$basePath/$tableName" // 創建SparkConf val conf = new SparkConf() .set("spark.app.name", APP_NAME) .setAll(Configured.sparkConf().asScala) // 創建SparkSession val spark = SparkSession.builder() .config(conf) .getOrCreate() // 關閉日志 spark.sparkContext.setLogLevel("DEBUG") // 導入隱式轉換 import spark.implicits._ // 創建Demo4實例 val demo = new Demo4("test_db", tableName,"deviceType","jdbc:hive2://node01:10000","root","123456") // 插入數據 // demo.insert(spark.createDataFrame(insertData.toBuffer.asJava, classOf[TemperatureBean]),tableName, "deviceId", "deviceType", "cdt", path) // 修改數據 // demo.update(spark.createDataFrame(updateData.toBuffer.asJava, classOf[TemperatureBean]),tableName, "deviceId", "deviceType", "cdt", path) // 刪除數據 demo.delete(spark.createDataFrame(deleteData.toBuffer.asJava, classOf[TemperatureBean]),tableName, "deviceId", "deviceType", "cdt", path) spark.stop() } }
首先,在Demo4類的main方法中,僅將demo.insert的代碼取消注釋,確保demo.update和demo.delete的代碼是被注釋掉的。然后打包並上傳到服務器。
然后,在服務器中執行如下腳本:
spark-submit --verbose --class com.mengyao.hudi.Demo4 --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --executor-cores 1 --queue default demo.jar
在yarn中,看到作業執行成功時
再去查看hive中test_db庫下是否多了2張表,分別是:
核心實現類 | 說明 | |
---|---|---|
tbl_hudi_temperature_mor1_ro表 | org.apache.hudi.hadoop.HoodieParquetInputFormat | _ro表的意思是Read Optimized,此表的查詢性能最好。 |
tbl_hudi_temperature_mor1_rt表 | org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat |
查看tbl_hudi_temperature_mor1_ro表的詳細建表信息
查看tbl_hudi_temperature_mor1_rt表的詳細建表信息
b) 測試修改數據
首先,在Demo4類的main方法中,僅將demo.update的代碼取消注釋,確保demo.insert和demo.delete的代碼是被注釋掉的。然后打包並上傳到服務器。
然后,在服務器中執行如下腳本:
spark-submit --verbose --class com.mengyao.hudi.Demo4 --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --executor-cores 1 --queue default demo.jar
在yarn中,看到作業執行成功時
c) 測試刪除數據
首先,在Demo4類的main方法中,僅將demo.delete的代碼取消注釋,確保demo.insert和demo.update的代碼是被注釋掉的。然后打包並上傳到服務器。
然后,在服務器中執行如下腳本:
spark-submit --verbose --class com.mengyao.hudi.Demo4 --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --executor-cores 1 --queue default demo.jar
在yarn中,看到作業執行成功時
1.7 Hudi的WriteClient使用
package com.mengyao.hudi; import com.mengyao.Configured; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.collection.JavaConverters; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.HashMap; /** * WriteClient模式是直接使用RDD級別api進行Hudi編程 * Application需要使用HoodieWriteConfig對象,並將其傳遞給HoodieWriteClient構造函數。 HoodieWriteConfig可以使用以下構建器模式構建。 * @ClassName WriteClientMain * @Description * @Created by: MengYao * @Date: 2021-01-26 10:40:29 * @Version V1.0 */ public class WriteClientMain { private static final String APP_NAME = WriteClientMain.class.getSimpleName(); private static final String MASTER = "local[2]"; private static final String SOURCE = "hudi"; public static void main(String[] args) { // 創建SparkConf SparkConf conf = new SparkConf() .setAll(JavaConverters.mapAsScalaMapConverter(Configured.sparkConf()).asScala()); // 創建SparkContext JavaSparkContext jsc = new JavaSparkContext(MASTER, APP_NAME, conf); // 創建Hudi的WriteConfig HoodieWriteConfig hudiCfg = HoodieWriteConfig.newBuilder() .forTable("tableName") .withSchema("avroSchema") .withPath("basePath") .withProps(new HashMap()) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .build(); // 創建Hudi的WriteClient HoodieWriteClient hudiWriteCli = new HoodieWriteClient<OverwriteWithLatestAvroPayload>(jsc, hudiCfg); // 1、執行新增操作 JavaRDD<HoodieRecord> insertData = jsc.parallelize(Arrays.asList()); String insertInstantTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()); JavaRDD<WriteStatus> insertStatus = hudiWriteCli.insert(insertData, insertInstantTime); // 【注意:為了便於理解,以下所有判斷Hudi操作數據的狀態不進行額外的方法封裝】 if (insertStatus.filter(ws->ws.hasErrors()).count()>0) {// 當提交后返回的狀態中包含error時 hudiWriteCli.rollback(insertInstantTime);// 從時間線(insertInstantTime)中回滾,插入失敗 } else { hudiWriteCli.commit(insertInstantTime, insertStatus);// 否則提交時間線(insertInstantTime)中的數據,到此,插入完成 } // 2、也可以使用批量加載的方式新增數據 String builkInsertInstantTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()); JavaRDD<WriteStatus> bulkInsertStatus = hudiWriteCli.bulkInsert(insertData, builkInsertInstantTime); if (bulkInsertStatus.filter(ws->ws.hasErrors()).count()>0) {// 當提交后返回的狀態中包含error時 hudiWriteCli.rollback(builkInsertInstantTime);// 從時間線(builkInsertInstantTime)中回滾,批量插入失敗 } else { hudiWriteCli.commit(builkInsertInstantTime, bulkInsertStatus);// 否則提交時間線(builkInsertInstantTime)中的數據,到此,批量插入完成 } // 3、執行修改or新增操作 JavaRDD<HoodieRecord> updateData = jsc.parallelize(Arrays.asList()); String updateInstantTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()); JavaRDD<WriteStatus> updateStatus = hudiWriteCli.upsert(updateData, updateInstantTime); if (updateStatus.filter(ws->ws.hasErrors()).count()>0) {// 當提交后返回的狀態中包含error時 hudiWriteCli.rollback(updateInstantTime);// 從時間線(updateInstantTime)中回滾,修改失敗 } else { hudiWriteCli.commit(updateInstantTime, updateStatus);// 否則提交時間線(updateInstantTime)中的數據,到此,修改完成 } // 4、執行刪除操作 JavaRDD<HoodieRecord> deleteData = jsc.parallelize(Arrays.asList()); String deleteInstantTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()); JavaRDD<WriteStatus> deleteStatus = hudiWriteCli.delete(deleteData, deleteInstantTime); if (deleteStatus.filter(ws->ws.hasErrors()).count()>0) {// 當提交后返回的狀態中包含error時 hudiWriteCli.rollback(deleteInstantTime);// 從時間線(deleteInstantTime)中回滾,刪除失敗 } else { hudiWriteCli.commit(deleteInstantTime, deleteStatus);// 否則提交時間線(deleteInstantTime)中的數據,到此,刪除完成 } // 退出WriteClient hudiWriteCli.close(); // 退出SparkContext jsc.stop(); } }
1.8 如何使用索引
1.8.1 使用SparkSQL的數據源配置
在SparkSQL的數據源配置中,面向讀寫的通用配置參數均通過options或option來指定,可用的功能包括: 定義鍵和分區、選擇寫操作、指定如何合並記錄或選擇要讀取的視圖類型。
df.write.format("hudi") .options(Map( // 要操作的表 TABLE_NAME->tableName, // 操作的表類型(默認COW) TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL, /** * 執行insert/upsert/delete操作,默認是upsert * OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL, * BULK_INSERT_OPERATION_OPT_VAL, * UPSERT_OPERATION_OPT_VAL, * DELETE_OPERATION_OPT_VAL, */ OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL, // 設置主鍵列 RECORDKEY_FIELD_OPT_KEY->primaryKey, // 設置分區列,類似Hive的表分區概念 PARTITIONPATH_FIELD_OPT_KEY->partitionField, // 設置數據更新時間列,該字段數值大的數據會覆蓋小的,必須是數值類型 PRECOMBINE_FIELD_OPT_KEY->changeDateField, /** * 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器 * INDEX_TYPE_PROP -> HoodieIndex.IndexType.SIMPLE.name, * HoodieIndex.IndexType.GLOBAL_SIMPLE.name, * HoodieIndex.IndexType.INMEMORY.name, * HoodieIndex.IndexType.HBASE.name, * HoodieIndex.IndexType.BLOOM.name, * HoodieIndex.IndexType.GLOBAL_SIMPLE.name, */ INDEX_TYPE_PROP -> HoodieIndex.IndexType.BLOOM.name, // 執行insert操作的shuffle並行度 INSERT_PARALLELISM->"2" )) // 如果數據存在會覆蓋 .mode(Overwrite) .save(path)
1.8.2 使用WriteClient方式配置
WriteClient是使用基於Java的RDD級別API進行編程的的一種方式,需要先構建HoodieWriteConfig對象,然后再作為參數傳遞給HoodieWriteClient構造函數。
// 創建Hudi的WriteConfig HoodieWriteConfig hudiCfg = HoodieWriteConfig.newBuilder() .forTable("tableName") .withSchema("avroSchema") .withPath("basePath") .withProps(new HashMap()) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType( HoodieIndex.IndexType.BLOOM // HoodieIndex.IndexType.GLOBAL_BLOOM // HoodieIndex.IndexType.INMEMORY // HoodieIndex.IndexType.HBASE // HoodieIndex.IndexType.SIMPLE // HoodieIndex.IndexType.GLOBAL_SIMPLE ).build()) .build();
1.8.3 聲明索引的關鍵參數
在Hudi中,使用索引的關鍵參數主要有2個,即hoodie.index.type和hoodie.index.class兩個。這兩個參數只需要配置其中一個即可,原因如下:
索引的配置參數 | 對應代碼中的KEY | 解釋 |
---|---|---|
hoodie.index.type | HoodieIndexConfig.INDEX_TYPE_PROP | 當配置此參數時,會從HBASE、INMEMORY、BLOOM、GLOBAL_BLOOM、SIMPLE、GLOBAL_SIMPLE這幾個index中選擇對應的Index實現類。 |
hoodie.index.class | HoodieIndexConfig.INDEX_CLASS_PROP | 當配置此參數時,如果是HBASE、INMEMORY、BLOOM、GLOBAL_BLOOM、SIMPLE、GLOBAL_SIMPLE這幾個index實現類的類全名就會直接通過反射實例化,否則會按照自定義Index類(必須是繼承自HoodieIndex)進行加載。 |
1.8.4 索引參數在源碼中的實現
可以在Hudi索引超類HoodieIndex的源碼中看到createIndex方法的定義和實現:
public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Serializable { protected final HoodieWriteConfig config; protected HoodieIndex(HoodieWriteConfig config) { this.config = config; } public static <T extends HoodieRecordPayload> HoodieIndex<T> createIndex(HoodieWriteConfig config) throws HoodieIndexException { if (!StringUtils.isNullOrEmpty(config.getIndexClass())) { Object instance = ReflectionUtils.loadClass(config.getIndexClass(), new Object[]{config}); if (!(instance instanceof HoodieIndex)) { throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex"); } else { return (HoodieIndex)instance; } } else { switch(config.getIndexType()) { case HBASE: return new HBaseIndex(config); case INMEMORY: return new InMemoryHashIndex(config); case BLOOM: return new HoodieBloomIndex(config); case GLOBAL_BLOOM: return new HoodieGlobalBloomIndex(config); case SIMPLE: return new HoodieSimpleIndex(config); case GLOBAL_SIMPLE: return new HoodieGlobalSimpleIndex(config); default: throw new HoodieIndexException("Index type unspecified, set " + config.getIndexType()); } } } @PublicAPIMethod( maturity = ApiMaturityLevel.STABLE ) public abstract JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> var1, JavaSparkContext var2, HoodieTable<T> var3); @PublicAPIMethod( maturity = ApiMaturityLevel.STABLE ) public abstract JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> var1, JavaSparkContext var2, HoodieTable<T> var3) throws HoodieIndexException; @PublicAPIMethod( maturity = ApiMaturityLevel.STABLE ) public abstract JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> var1, JavaSparkContext var2, HoodieTable<T> var3) throws HoodieIndexException; @PublicAPIMethod( maturity = ApiMaturityLevel.STABLE ) public abstract boolean rollbackCommit(String var1); @PublicAPIMethod( maturity = ApiMaturityLevel.STABLE ) public abstract boolean isGlobal(); @PublicAPIMethod( maturity = ApiMaturityLevel.EVOLVING ) public abstract boolean canIndexLogFiles(); @PublicAPIMethod( maturity = ApiMaturityLevel.STABLE ) public abstract boolean isImplicitWithStorage(); public void close() { } public static enum IndexType { HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE; private IndexType() { } } }
1.9 生產環境下的推薦配置
spark.driver.extraClassPath=/etc/hive/conf
spark.driver.extraJavaOptions=-XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
spark.driver.maxResultSize=2g
spark.driver.memory=4g
spark.executor.cores=1
spark.executor.extraJavaOptions=-XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
spark.executor.id=driver
spark.executor.instances=300
spark.executor.memory=6g
spark.rdd.compress=true
spark.kryoserializer.buffer.max=512m
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled=true
spark.sql.hive.convertMetastoreParquet=false
spark.submit.deployMode=cluster
spark.task.cpus=1
spark.task.maxFailures=4
spark.yarn.driver.memoryOverhead=1024
spark.yarn.executor.memoryOverhead=3072
spark.yarn.max.executor.failures=100
1.10 上面代碼依賴的其他類

package com.mengyao.hudi import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import java.sql.Date import java.sql.Timestamp object DemoUtils { def schemaExtractor(caz: Class[_]): StructType = { val fields = caz.getDeclaredFields.map(field => { StructField(field.getName, field.getType.getSimpleName.toLowerCase match { case "byte" => DataTypes.ByteType case "short" => DataTypes.ShortType case "int" => DataTypes.IntegerType case "double" => DataTypes.DoubleType case "float" => DataTypes.FloatType case "long" => DataTypes.LongType case "boolean" => DataTypes.BooleanType // java.sql.Date case "date" => DataTypes.DateType // java.sql.Timestamp case "timestamp" => DataTypes.TimestampType case _ => DataTypes.StringType }, true) }) StructType(fields) } /** * 測試輸出 * */ def query(df: DataFrame, numRows: Int = Int.MaxValue): Unit = { df.show(numRows, false) } case class A(a:Byte,b:Short,c:Int,d:Double,e:Float,f:Long,g:Boolean,h:Date,i:Timestamp,j:String) { def getA = a def getB = b def getC = c def getD = d def getE = e def getF = f def getG = g def getH = h def getI = i def getJ = j } def main(args: Array[String]): Unit = { val data = A(1.byteValue, 2.shortValue, 3.intValue, 4.doubleValue, 5.floatValue, 6.longValue, true, new Date(System.currentTimeMillis), new Timestamp(System.currentTimeMillis), "hello") println(schemaExtractor(classOf[A]).mkString("\n")) val spark = SparkSession.builder().appName("A").master("local[*]").getOrCreate() import scala.collection.JavaConverters._ spark.createDataFrame(Array[A](data).toBuffer.asJava, classOf[A]).show(false) spark.close } }

package com.mengyao; import java.util.*; import java.util.stream.Collectors; /** * * @ClassName Configured * @Description * @Created by: MengYao * @Date: 2021-01-11 10:10:53 * @Version V1.0 */ public class Configured { private static final ResourceBundle bundle = ResourceBundle.getBundle("config", Locale.CHINESE); private static Map<String, String> config = new HashMap<>(); public static final String USER_NAME = "user.name"; public static final String JAVA_HOME = "JAVA_HOME"; public static final String HADOOP_HOME = "HADOOP_HOME"; public static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR"; public static final String HADOOP_HOME_DIR = "hadoop.home.dir"; public static final String SPARK_HOME = "SPARK_HOME"; public static final String DEFAULT_CHARSET = "default.charset"; public static Map<String, String> get() { return bundle.keySet().stream() .collect(Collectors.toMap(key->key, key->bundle.getString(key), (e1,e2)->e2, HashMap::new)); } public static Map<String, String> env() { return bundle.keySet().stream() .filter(k -> k.matches("(JAVA_HOME|HADOOP_CONF_DIR|SPARK_HOME|SPARK_PRINT_LAUNCH_COMMAND)")) .collect(Collectors.toMap(key->key, key->bundle.getString(key), (e1,e2)->e2, HashMap::new)); } public static Map<String, String> sparkConf() { return bundle.keySet().stream() .filter(key->Objects.nonNull(key)&&key.startsWith("spark")) .collect(Collectors.toMap(key->key, key->bundle.getString(key), (e1,e2)->e2, HashMap::new)); } public static Map<String, String> appConf() { return bundle.keySet().stream() .filter(key->Objects.nonNull(key)&&key.startsWith("app")) .collect(Collectors.toMap(key->key, key->bundle.getString(key), (e1,e2)->e2, HashMap::new)); } public static String getDefaultCharset() { return bundle.getString(DEFAULT_CHARSET); } public static String getUserName() { return bundle.getString(USER_NAME); } public static String getJavaHome() { return bundle.getString(JAVA_HOME); } public static String getHadoopHome() { return bundle.getString(HADOOP_HOME); } public static String getHadoopConfDir() { return bundle.getString(HADOOP_CONF_DIR); } public static String getSparkHome() { return bundle.getString(SPARK_HOME); } }