Apache Hudi(0.6.0)快速入門


 

1.1 Hudi是什么

  Apache Hudi(Hadoop Upserts Deletes and Incrementals,簡稱Hudi,發音為Hoodie)由UBer開源,它以極低的延遲將數據快速攝取到HDFS或雲存儲(S3)中,其最主要的特點是支持記錄(Record)級別的插入更新(Upsert)和刪除,同時還提供增量查詢的支持。

  本質上,Hudi並非是一種全新的文件格式,相反,它僅僅是充分利用了開源的列格式(Parquet)和行格式(Avro)的文件作為數據的存儲形式,並在數據寫入的同時生成特定的索引,進而可以提供更快的查詢性能。

  Hudi自身無法完成對數據的讀寫操作,它強依賴於外部的Spark、Flink、Presto和Impala等計算引擎才可以使用,目前尤其對Spark依賴嚴重(在0.7.0中新增了Flink支持)。

 

1.2 Hudi的應用場景

1.2.1 近實時攝取

  將外部源(點擊流日志、數據庫BinLog、API)的數據攝取到Hadoop數據湖是一種必要的數據遷移過程,但現有的大多數遷移方案都是通過組合多種攝取工具來解決的。而Hudi則是一種通用的增量數據處理框架,可以很容易的與多種現有計算引擎集成,有效縮短了過去冗長的數據攝取鏈路(各種組件相互配合使用),進而可以對多種數據源提供更加穩定且有效的數據攝取,如下:

數據源系統 Hudi的攝取方式 目標存儲系統
RDBMS 使用Upsert加載數據,通過讀取BinLog或Sqoop增量數據寫入到HDFS上的Hudi表,可以避免低效的全量加載。 HDFS
NoSQL 使用HBase表來維護Hudi數據的索引,但實際數據依舊是位於HDFS中。 HBase+HDFS
MQ Hudi在處理Kafka這種數據源時可以強制使用最小文件大小來改善NameNode負載,避免了頻繁創建小文件的問題。 HDFS

1.2.2 近實時分析

  SQL on Hadoop解決方案(如Presto和Spark SQL)具有出色的性能,一般可以在**幾秒鍾內完成查詢**。而Hudi是一個可以提供面向實時分析更有效的替代方案,並支持對存儲在HDFS中更大規模數據集的近實時查詢。此外,它是一個非常輕量級的庫,無需額外的組件依賴即可使用,並不會增加操作開銷

1.2.3 增量處理管道

  過去的增量處理往往通過划分成小時粒度的分區為單位,當屬於此分區內的數據寫入完成時就能對外提供相應的查詢,這使數據的新鮮程度得到顯著提高。但如果發生數據遲到現象,唯一的補救措施是通過對整個分區的重新計算來保證正確性,從而增加了整個系統的在計算和存儲方面的性能開銷。Hudi支持Record級別的方式從上游消費新數據,可以僅處理發生變更的數據到相應的表分區,同時還可以將分區的粒度縮短到分鍾級,而且還不會引入額外的系統資源開銷。

1.2.4 HDFS數據分發

  一個常見的用例是先在Hadoop體系中進行處理數據,然后再分發回面向在線服務的存儲系統,以供應用程序使用。在這種用例中一般都會引入諸如Kafka之類的隊列系統來防止目標存儲系統被壓垮。但如果不使用Kafka的情況下,僅將每次運行的Spark管道更新插入的輸出轉換為Hudi數據集,也可以有效地解決這個問題,然后以增量方式獲取數據(就像讀取Kafka topic一樣)寫入服務存儲層。

 

1.3 Hudi的核心概念

1.3.1 時間軸(Timeline)

  Hudi最核心的特性是在Hudi表中維護了一個時間軸(Timeline),每一次對表操作(比如新增、修改或刪除)都會在時間軸上創建一個即時(Instant)時間,從而可以實現僅查詢某個時間點之后成功提交的數據,或是僅查詢某個時間點之前成功提交的數據,有效避免了掃描更大時間范圍的數據。同時,還可以高效地只查詢更改前的文件(例如在某個Instant提交了更改操作后,此時查詢該Instant之前的數據則仍可以查詢到修改前的數據)

  • 時間軸(Timeline)的實現類(位於hudi-common-0.6.0.jar中)

    注意:由於hudi-spark-bundle.jarhudi-hadoop-mr-bundle.jar屬於Uber類型的jar包,已經將hudi-common-0.6.0.jar的所有class打包進去了。時間軸相關的實現類位於org.apache.hudi.common.table.timeline包下。

    最頂層的接口約定類為:HoodieTimeline

    默認使用的時間軸類:HoodieDefaultTimeline繼承自HoodieTimeline。

    活動時間軸類為:HoodieActiveTimeline(此類維護最近12小時內的時間,可配置)。

    存檔時間軸類為:HoodieArchivedTimeline(超出12小時的時間在此類中維護,可配置)。

  • 時間軸(Timeline)的核心組件

組件名稱 組件說明
Instant action

在時間軸上執行的操作

  COMMITS(一次提交表示將一組記錄原子寫入到數據集中)

  CLEANS(刪除數據集中不再需要的舊文件版本的后台活動)

  DELTA_COMMIT(增量提交,將一批記錄原子寫入到MOR表)

  COMPACTION(比如更新從基於行的日志文件變成列格式)

  ROLLBACK(指提交/增量提交不成功且已回滾並刪除在寫入時產生的文件)

  SAVEPOINT(在發生失敗時將數據還原到時間軸的某個即時時間)

實現類:org.apache.hudi.common.table.timeline.HoodieTimeline

Instant time

是一個時間戳(格式為20190117010349)且單調增加。

實現類:org.apache.hudi.common.table.timeline.HoodieInstant

State

即時的狀態包括:

  REQUESTED(已調度但尚未啟動)

  INFLIGHT(正在執行操作)

  COMPLETED(操作完成)

  INVALID(操作失敗)

實現類:org.apache.hudi.common.table.timeline.HoodieInstant.State

1.3.2 文件組織形式

  Hudi將DFS上的數據集組織到基本路徑(HoodieWriteConfig.BASE_PATH_PROP)下的目錄結構中。數據集分為多個分區(DataSourceOptions.PARTITIONPATH_FIELD_OPT_KEY),這些分區與Hive表非常相似,是包含該分區的數據文件的文件夾。

 

 

   在每個分區內,文件被組織為文件組,由文件id充當唯一標識。 每個文件組包含多個文件切片,其中每個切片包含在某個即時時間的提交/壓縮生成的基本列文件(.parquet)以及一組日志文件(.log),該文件包含自生成基本文件以來對基本文件的插入/更新。 Hudi采用MVCC設計,其中壓縮操作將日志和基本文件合並以產生新的文件切片,而清理操作則將未使用的/較舊的文件片刪除以回收DFS上的空間。

1.3.3 索引機制(4類6種)

  Hudi通過索引機制提供高效的Upsert操作,該機制會將一個RecordKey+PartitionPath組合的方式作為唯一標識映射到一個文件ID,而且,這個唯一標識和文件組/文件ID之間的映射自記錄被寫入文件組開始就不會再改變。Hudi內置了4類(6個)索引實現,均是繼承自頂層的抽象類HoodieIndex而來,如下:

索引類型 實現類 索引規則
Simple  HoodieSimpleIndex 簡單索引,基於RecordKey+PartitionPath組合的方式作為索引,僅在特定分區內查找數據。
HoodieGlobalSimpleIndex 簡單的全局索引,基於RecordKey+PartitionPath組合的方式作為索引,在全部分區中查找數據。
Memory InMemoryHashIndex 由內存中維護的HashMap來支持的Hoodie Index實現,基於RecordKey+PartitionPath組合的方式作為索引。
HBase HBaseIndex

在HBase中維護Hoodie Index。

存儲Index的HBase表名稱使用HoodieHBaseIndexConfig.HBASE_TABLENAME_PROP指定,

Row_Key是RecordKey,

列簇是HBaseIndex.SYSTEM_COLUMN_FAMILY(默認為_s),

還包括3個默認的列,即:

  HBaseIndex.COMMIT_TS_COLUMN(默認為commit_ts)、

  HBaseIndex.FILE_NAME_COLUMN(默認為file_name)、

  HBaseIndex.PARTITION_PATH_COLUMN(默認為partition_path)。

Bloom  HoodieBloomIndex 基於布隆過濾器實現的索引機制,僅在特定分區內查找。每個Parquet文件在其元數據中都包含其row_key的Bloom篩選器。
HoodieGlobalBloomIndex 基於布隆過濾器實現的全局索引機制,會在所有分區中查找。它會先獲取所有分區(只加載帶有.hoodie_partition_metadata文件的分區),然后再加載各分區內的最新文件。

注意

  • 全局索引:指在全表的所有分區范圍下強制要求鍵保持唯一,即確保對給定的鍵有且只有一個對應的記錄。全局索引提供了更強的保證,也使得更刪的消耗隨着表的大小增加而增加(O(表的大小)),更適用於是小表。

  • 非全局索引:僅在表的某一個分區內強制要求鍵保持唯一,它依靠寫入器為同一個記錄的更刪提供一致的分區路徑,但由此同時大幅提高了效率,因為索引查詢復雜度成了O(更刪的記錄數量)且可以很好地應對寫入量的擴展。

1.3.4 查詢視圖(3類)

  • 讀優化視圖 : 直接查詢基本文件(數據集的最新快照),其實就是列式文件(Parquet)。並保證與非Hudi列式數據集相比,具有相同的列式查詢性能。

  • 增量視圖 : 僅查詢新寫入數據集的文件,需要指定一個Commit/Compaction的即時時間(位於Timeline上的某個Instant)作為條件,來查詢此條件之后的新數據。

  • 實時快照視圖 : 查詢某個增量提交操作中數據集的最新快照,會先進行動態合並最新的基本文件(Parquet)和增量文件(Avro)來提供近實時數據集(通常會存在幾分鍾的延遲)。

 

權衡 讀優化視圖 實時快照視圖 增量視圖
數據延遲 更高 更低 更高
查詢延遲 更低(原始列式性能) 更低(查詢會先合並列式+行增量) 更低

 

1.4 Hudi支持的存儲類型

1.4.1 寫時復制(Copy on Write,COW)表

  COW表主要使用列式文件格式(Parquet)存儲數據,在寫入數據過程中,執行同步合並,更新數據版本並重寫數據文件,類似RDBMS中的B-Tree更新。

  1) 更新:在更新記錄時,Hudi會先找到包含更新數據的文件,然后再使用更新值(最新的數據)重寫該文件,包含其他記錄的文件保持不變。當突然有大量寫操作時會導致重寫大量文件,從而導致極大的I/O開銷

  2)讀取:在讀取數據集時,通過讀取最新的數據文件來獲取最新的更新,此存儲類型適用於少量寫入和大量讀取的場景。

1.4.2 讀時合並(Merge On Read,MOR)表

  MOR表是COW表的升級版,它使用列式(parquet)與行式(avro)文件混合的方式存儲數據。在更新記錄時,類似NoSQL中的LSM-Tree更新。

  1) 更新:在更新記錄時,僅更新到增量文件(Avro)中,然后進行異步(或同步)的compaction,最后創建列式文件(parquet)的新版本。此存儲類型適合頻繁寫的工作負載,因為新記錄是以追加的模式寫入增量文件中。

  2) 讀取:在讀取數據集時,需要先將增量文件與舊文件進行合並,然后生成列式文件成功后,再進行查詢。

1.4.3 COW和MOR的對比

權衡 寫時復制COW 讀時合並MOR
數據延遲 更高 更低
更新代價(I/O) 更高(重寫整個parquet文件) 更低(追加到增量日志)
Parquet文件大小 更小(高更新代價(I/O)) 更大(低更新代價)
寫放大 更高 更低(取決於壓縮策略)
適用場景 寫少讀多 寫多讀少

1.4.4 COW和MOR支持的視圖

存儲類型 支持的視圖 不支持的視圖
COW 讀優化 + 增量 + 實時視圖  無  
MOR 讀優化 + 近實時

增量(如果使用增量視圖查詢時會提示:Incremental view not implemented yet, for merge-on-read tables)

 

 

1.5 安裝Hudi

1.5.1 自行編譯(待驗證)

// 下載hudi的源碼發行版
wget -P /tmp/ http://archive.apache.org/dist/hudi/0.6.0/hudi-0.6.0.src.tgz
// 解壓hudi到/usr/hdp/current/下
tar -zxf /tmp/hudi-0.6.0.src.tgz -C /usr/hdp/current/
// 編譯hudi(確保服務器上有$SCALA_HOME)
mvn -DskipTests clean package

1.5.2 使用官方預編譯好的二進制發行版的jar包

// 下載Hudi on Spark的Jar包(放入$SPARK_HOME/jars/下)
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark-bundle_2.11/0.6.0/hudi-spark-bundle_2.11-0.6.0.jar
// 下載Hudi on MapReduce的Jar包(放入$HADOOP_HOME/share/hadoop/mapreduce/下)
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-hadoop-mr-bundle/0.6.0/hudi-hadoop-mr-bundle-0.6.0.jar
// 創建軟連接(在$HIVE_HOME/lib/下)
ln -s $SPARK_HOME/jars/ $HIVE_HOME/lib/
ln -s $HADOOP_HOME/share/hadoop/mapreduce/hudi-hadoop-mr-bundle-0.6.0.jar $HIVE_HOME/lib/

 

1.6 Hudi的SparkSQL使用

  Hudi支持對文件系統(HDFS、LocalFS)和Hive的讀寫操作,以下分別使用COW和MOR存儲類型來操作文件系統和Hive的案例。

1.6.1 文件系統操作

1.6.1.1 基於COW表的LocalFS/HDFS使用

package com.mengyao.hudi

import com.mengyao.Configured
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.EmptyHoodieRecordPayload
import org.apache.hudi.config.HoodieIndexConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.JavaConverters._

/**
 * Spark on Hudi(COW表) to HDFS/LocalFS
 * @ClassName Demo1
 * @Description
 * @Created by: MengYao
 * @Date: 2021-01-11 10:10:53
 * @Version V1.0
 */
object Demo1 {

  private val APP_NAME = Demo1.getClass.getSimpleName
  private val MASTER = "local[2]"
  val SOURCE = "hudi"
  val insertData = Array[TemperatureBean](
    new TemperatureBean("4301",1,28.6,"2019-12-07 12:35:33"),
    new TemperatureBean("4312",0,31.4,"2019-12-07 12:25:03"),
    new TemperatureBean("4302",1,30.1,"2019-12-07 12:32:17"),
    new TemperatureBean("4305",3,31.5,"2019-12-07 12:33:11"),
    new TemperatureBean("4310",2,29.9,"2019-12-07 12:34:42")
  )
  val updateData = Array[TemperatureBean](
    new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器發生修改,溫度值由29.9->30.4,時間由12:34:42->12:35:42
  )
  val deleteData = Array[TemperatureBean](
    new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器要被刪除,必須與最新的數據保持一致(如果字段值不同時無法刪除)
  )

  def main(args: Array[String]): Unit = {
    System.setProperty(Configured.HADOOP_HOME_DIR, Configured.getHadoopHome)

    // 創建SparkConf
    val conf = new SparkConf()
      .set("spark.master", MASTER)
      .set("spark.app.name", APP_NAME)
      .setAll(Configured.sparkConf().asScala)
    // 創建SparkSession
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    // 關閉日志
    spark.sparkContext.setLogLevel("OFF")
    // 導入隱式轉換
    import spark.implicits._
    import DemoUtils._

    // 類似Hive中的DB(basePath的schema決定了最終要操作的文件系統,如果是file:則為LocalFS,如果是hdfs:則為HDFS)
    val basePath = "file:/D:/tmp"
    // 類似Hive中的Table
    val tableName = "tbl_temperature_cow1"
    // 數據所在的路徑
    val path = s"$basePath/$tableName"

    // 插入數據
    insert(spark.createDataFrame(insertData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path)
    // 修改數據
//    update(spark.createDataFrame(updateData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path)
    // 刪除數據
//    delete(spark.createDataFrame(deleteData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path)
    // 【查詢方式1:默認為快照(基於行或列獲取最新視圖)查詢
//    query(spark.read.format(SOURCE).load(s"$path/*/*").orderBy($"deviceId".asc))
//    query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_SNAPSHOT_OPT_VAL)).load(s"$path/*/*").withColumn("queryType", lit("查詢方式為:快照(默認)")).orderBy($"deviceId".asc))
    // 【查詢方式2:讀時優化
//    query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)).load(s"$path/*/*").withColumn("queryType", lit("查詢方式為:讀時優化")).orderBy($"deviceId".asc))
    // 【查詢方式3:增量查詢
    // 先取出最近一次提交的時間
//    val commitTime: String = spark.read.format(SOURCE).load(s"$path/*/*").dropDuplicates("_hoodie_commit_time").select($"_hoodie_commit_time".as("commitTime")).orderBy($"commitTime".desc).first().getAs(0)
    // 再查詢最近提交時間之后的數據
//    query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_INCREMENTAL_OPT_VAL,Option((commitTime.toLong-2).toString))).load(s"$path/*/*").withColumn("queryType", lit("查詢方式為:增量查詢")).orderBy($"deviceId".asc).toDF())

    spark.close()
  }

  /**
   * 新增數據
   * @param df                  數據集
   * @param tableName           Hudi表
   * @param primaryKey          主鍵列名
   * @param partitionField      分區列名
   * @param changeDateField     變更時間列名
   * @param path                數據的存儲路徑
   */
  def insert(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {
    df.write.format(SOURCE)
      .options(Map(
        // 要操作的表
        TABLE_NAME->tableName,
        // 操作的表類型(默認COW)
        TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL,
        // 執行insert操作
        OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL,
        // 設置主鍵列
        RECORDKEY_FIELD_OPT_KEY->primaryKey,
        // 設置分區列,類似Hive的表分區概念
        PARTITIONPATH_FIELD_OPT_KEY->partitionField,
        // 設置數據更新時間列,該字段數值大的數據會覆蓋小的,必須是數值類型
        PRECOMBINE_FIELD_OPT_KEY->changeDateField,
        // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器
        INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name,
        // 執行insert操作的shuffle並行度
        INSERT_PARALLELISM->"2"
      ))
      // 如果數據存在會覆蓋
      .mode(Overwrite)
      .save(path)
  }

  /**
   * 修改數據
   * @param df                  數據集
   * @param tableName           Hudi表
   * @param primaryKey          主鍵列名
   * @param partitionField      分區列名
   * @param changeDateField     變更時間列名
   * @param path                數據的存儲路徑
   */
  def update(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {
    df.write.format(SOURCE)
      .options(Map(
        // 要操作的表
        TABLE_NAME->tableName,
        // 操作的表類型(默認COW)
        TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL,
        // 執行upsert操作
        OPERATION_OPT_KEY->UPSERT_OPERATION_OPT_VAL,
        // 設置主鍵列
        RECORDKEY_FIELD_OPT_KEY->primaryKey,
        // 設置分區列,類似Hive的表分區概念
        PARTITIONPATH_FIELD_OPT_KEY->partitionField,
        // 設置數據更新時間列,該字段數值大的數據會覆蓋小的
        PRECOMBINE_FIELD_OPT_KEY->changeDateField,
        // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器
        INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name,
        // 執行upsert操作的shuffle並行度
        UPSERT_PARALLELISM-> "2"
      ))
      // 如果數據存在會覆蓋
      .mode(Append)
      .save(path)
  }

  /**
   * 刪除數據
   * @param df                  數據集
   * @param tableName           Hudi表
   * @param primaryKey          主鍵列名
   * @param partitionField      分區列名
   * @param changeDateField     變更時間列名
   * @param path                數據的存儲路徑
   */
  def delete(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {
    df.write.format(SOURCE)
      .options(Map(
        // 要操作的表
        TABLE_NAME->tableName,
        // 操作的表類型(默認COW)
        TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL,
        // 執行delete操作
        OPERATION_OPT_KEY->DELETE_OPERATION_OPT_VAL,
        // 設置主鍵列
        RECORDKEY_FIELD_OPT_KEY->primaryKey,
        // 設置分區列,類似Hive的表分區概念
        PARTITIONPATH_FIELD_OPT_KEY->partitionField,
        // 設置數據更新時間列,該字段數值大的數據會覆蓋小的
        PRECOMBINE_FIELD_OPT_KEY->changeDateField,
        // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器
        INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name,
        // 執行delete操作的shuffle並行度
        DELETE_PARALLELISM->"2",
        // 刪除策略有軟刪除(保留主鍵且其余字段為null)和硬刪除(從數據集中徹底刪除)兩種,此處為硬刪除
        PAYLOAD_CLASS_OPT_KEY->classOf[EmptyHoodieRecordPayload].getName
      ))
      // 如果數據存在會覆蓋
      .mode(Append)
      .save(path)
  }

  /**
   * 查詢類型
   *    <br>Hoodie具有3種查詢模式:</br>
   *      <br>1、默認是快照模式(Snapshot mode,根據行和列數據獲取最新視圖)</br>
   *      <br>2、增量模式(incremental mode,查詢從某個commit時間片之后的數據)</br>
   *      <br>3、讀時優化模式(Read Optimized mode,根據列數據獲取最新視圖)</br>
   * @param queryType
   * @param queryTime
   * @return
   */
  def buildQuery(queryType: String, queryTime: Option[String]=Option.empty) = Map(
    queryType match {
      // 如果是讀時優化模式(read_optimized,根據列數據獲取最新視圖)
      case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => QUERY_TYPE_OPT_KEY->QUERY_TYPE_READ_OPTIMIZED_OPT_VAL
      // 如果是增量模式(incremental mode,查詢從某個時間片之后的新數據)
      case QUERY_TYPE_INCREMENTAL_OPT_VAL => QUERY_TYPE_OPT_KEY->QUERY_TYPE_INCREMENTAL_OPT_VAL
      // 默認使用快照模式查詢(snapshot mode,根據行和列數據獲取最新視圖)
      case _ => QUERY_TYPE_OPT_KEY->QUERY_TYPE_SNAPSHOT_OPT_VAL
    },
    if(queryTime.nonEmpty) BEGIN_INSTANTTIME_OPT_KEY->queryTime.get else BEGIN_INSTANTTIME_OPT_KEY->"0"
  )

}

1.6.1.2 基於MOR表的LocalFS/HDFS使用

package com.mengyao.hudi

import com.mengyao.Configured
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.EmptyHoodieRecordPayload
import org.apache.hudi.config.HoodieIndexConfig.INDEX_TYPE_PROP
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SaveMode._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.lit

import scala.collection.JavaConverters._

/**
 * Spark on Hudi(MOR表) to HDFS/LocalFS
 * @ClassName Demo1
 * @Description
 * @Created by: MengYao
 * @Date: 2021-01-11 10:10:53
 * @Version V1.0
 */
object Demo2 {

  private val APP_NAME: String = Demo1.getClass.getSimpleName
  private val MASTER: String = "local[2]"
  val SOURCE: String = "hudi"
  val insertData = Array[TemperatureBean](
    new TemperatureBean("4301",1,28.6,"2019-12-07 12:35:33"),
    new TemperatureBean("4312",0,31.4,"2019-12-07 12:25:03"),
    new TemperatureBean("4302",1,30.1,"2019-12-07 12:32:17"),
    new TemperatureBean("4305",3,31.5,"2019-12-07 12:33:11"),
    new TemperatureBean("4310",2,29.9,"2019-12-07 12:34:42")
  )
  val updateData = Array[TemperatureBean](
    new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器發生修改,溫度值由29.9->30.4,時間由12:34:42->12:35:42
  )
  val deleteData = Array[TemperatureBean](
    new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器要被刪除,必須與最新的數據保持一致(如果字段值不同時無法刪除)
  )

  def main(args: Array[String]): Unit = {
    System.setProperty(Configured.HADOOP_HOME_DIR, Configured.getHadoopHome)

    // 創建SparkConf
    val conf = new SparkConf()
      .set("spark.master", MASTER)
      .set("spark.app.name", APP_NAME)
      .setAll(Configured.sparkConf().asScala)
    // 創建SparkSession
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    // 關閉日志
    spark.sparkContext.setLogLevel("OFF")
    // 導入隱式轉換
    import spark.implicits._
    import DemoUtils._

    // 類似Hive中的DB(basePath的schema決定了最終要操作的文件系統,如果是file:則為LocalFS,如果是hdfs:則為HDFS)
    val basePath = "file:/D:/tmp"
    // 類似Hive中的Table
    val tableName = "tbl_temperature_mor"
    // 數據所在的路徑
    val path = s"$basePath/$tableName"

    // 插入數據
//    insert(spark.createDataFrame(insertData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path)
    // 修改數據
//    update(spark.createDataFrame(updateData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path)
    // 刪除數據
//    delete(spark.createDataFrame(deleteData.toBuffer.asJava, classOf[TemperatureBean]), tableName, "deviceId", "deviceType", "cdt", path)
    // 【查詢方式1:默認為快照(基於行或列獲取最新視圖)查詢
    query(spark.read.format(SOURCE).load(s"$path/*/*").orderBy($"deviceId".asc))
//    query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_SNAPSHOT_OPT_VAL)).load(s"$path/*/*").withColumn("queryType", lit("查詢方式為:快照(默認)")).orderBy($"deviceId".asc))
    // 【查詢方式2:讀時優化
    query(spark.read.format(SOURCE).options(buildQuery(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)).load(s"$path/*/*").withColumn("queryType", lit("查詢方式為:讀時優化")).orderBy($"deviceId".asc))
    // 【查詢方式3:增量查詢(不支持)

    spark.close()
  }

  /**
   * 新增數據
   * @param df                  數據集
   * @param tableName           Hudi表
   * @param primaryKey          主鍵列名
   * @param partitionField      分區列名
   * @param changeDateField     變更時間列名
   * @param path                數據的存儲路徑
   */
  def insert(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {
    df.write.format(SOURCE)
      .options(Map(
        // 要操作的表
        TABLE_NAME->tableName,
        // 操作的表類型(使用MOR)
        TABLE_TYPE_OPT_KEY -> MOR_TABLE_TYPE_OPT_VAL,
        // 執行insert操作
        OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL,
        // 設置主鍵列
        RECORDKEY_FIELD_OPT_KEY->primaryKey,
        // 設置分區列,類似Hive的表分區概念
        PARTITIONPATH_FIELD_OPT_KEY->partitionField,
        // 設置數據更新時間列,該字段數值大的數據會覆蓋小的,必須是數值類型
        PRECOMBINE_FIELD_OPT_KEY->changeDateField,
        // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器
        INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name,
        // 執行insert操作的shuffle並行度
        INSERT_PARALLELISM->"2"
      ))
      // 如果數據存在會覆蓋
      .mode(Overwrite)
      .save(path)
  }

  /**
   * 修改數據
   * @param df                  數據集
   * @param tableName           Hudi表
   * @param primaryKey          主鍵列名
   * @param partitionField      分區列名
   * @param changeDateField     變更時間列名
   * @param path                數據的存儲路徑
   */
  def update(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {
    df.write.format(SOURCE)
      .options(Map(
        // 要操作的表
        TABLE_NAME->tableName,
        // 操作的表類型(使用MOR)
        TABLE_TYPE_OPT_KEY -> MOR_TABLE_TYPE_OPT_VAL,
        // 執行upsert操作
        OPERATION_OPT_KEY->UPSERT_OPERATION_OPT_VAL,
        // 設置主鍵列
        RECORDKEY_FIELD_OPT_KEY->primaryKey,
        // 設置分區列,類似Hive的表分區概念
        PARTITIONPATH_FIELD_OPT_KEY->partitionField,
        // 設置數據更新時間列,該字段數值大的數據會覆蓋小的
        PRECOMBINE_FIELD_OPT_KEY->changeDateField,
        // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器
        INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name,
        // 執行upsert操作的shuffle並行度
        UPSERT_PARALLELISM-> "2"
      ))
      // 如果數據存在會覆蓋
      .mode(Append)
      .save(path)
  }

  /**
   * 刪除數據
   * @param df                  數據集
   * @param tableName           Hudi表
   * @param primaryKey          主鍵列名
   * @param partitionField      分區列名
   * @param changeDateField     變更時間列名
   * @param path                數據的存儲路徑
   */
  def delete(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {
    df.write.format(SOURCE)
      .options(Map(
        // 要操作的表
        TABLE_NAME->tableName,
        // 操作的表類型(使用MOR)
        TABLE_TYPE_OPT_KEY -> MOR_TABLE_TYPE_OPT_VAL,
        // 執行delete操作
        OPERATION_OPT_KEY->DELETE_OPERATION_OPT_VAL,
        // 設置主鍵列
        RECORDKEY_FIELD_OPT_KEY->primaryKey,
        // 設置分區列,類似Hive的表分區概念
        PARTITIONPATH_FIELD_OPT_KEY->partitionField,
        // 設置數據更新時間列,該字段數值大的數據會覆蓋小的
        PRECOMBINE_FIELD_OPT_KEY->changeDateField,
        // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器
        INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name,
        // 執行delete操作的shuffle並行度
        DELETE_PARALLELISM->"2",
        // 刪除策略有軟刪除(保留主鍵且其余字段為null)和硬刪除(從數據集中徹底刪除)兩種,此處為硬刪除
        PAYLOAD_CLASS_OPT_KEY->classOf[EmptyHoodieRecordPayload].getName
      ))
      // 如果數據存在會覆蓋
      .mode(Append)
      .save(path)
  }

  /**
   * 查詢類型
   *    <br>Hoodie具有3種查詢模式:</br>
   *      <br>1、默認是快照模式(Snapshot mode,根據行和列數據獲取最新視圖)</br>
   *      <br>2、增量模式(incremental mode,查詢從某個commit時間片之后的數據)</br>
   *      <br>3、讀時優化模式(Read Optimized mode,根據列數據獲取最新視圖)</br>
   * @param queryType
   * @param queryTime
   * @return
   */
  def buildQuery(queryType: String, queryTime: Option[String]=Option.empty): Map[String, String] = {
    Map(
      queryType match {
        // 如果是讀時優化模式(read_optimized,根據列數據獲取最新視圖)
        case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => QUERY_TYPE_OPT_KEY->QUERY_TYPE_READ_OPTIMIZED_OPT_VAL
        // 如果是增量模式(incremental mode,查詢從某個時間片之后的新數據)
        case QUERY_TYPE_INCREMENTAL_OPT_VAL => QUERY_TYPE_OPT_KEY->QUERY_TYPE_INCREMENTAL_OPT_VAL
        // 默認使用快照模式查詢(snapshot mode,根據行和列數據獲取最新視圖)
        case _ => QUERY_TYPE_OPT_KEY->QUERY_TYPE_SNAPSHOT_OPT_VAL
      },
      if(queryTime.nonEmpty) BEGIN_INSTANTTIME_OPT_KEY->queryTime.get else BEGIN_INSTANTTIME_OPT_KEY->"0"
    )
  }

}

    1.6.2 Hive操作

1.6.2.1 基於COW表的Hive使用

package com.mengyao.hudi

import com.mengyao.Configured
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.EmptyHoodieRecordPayload
import org.apache.hudi.config.HoodieIndexConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.JavaConverters._
import scala.collection.mutable._

/**
 * Spark on Hudi(COW表) to Hive
 * @ClassName Demo3
 * @Description
 * @Created by: MengYao
 * @Date: 2021-01-23 16:13:23
 * @Version V1.0
 */
class Demo3(db:String,
            table:String,
            partition:String,
            url:String,
            user:String="hive",
            password:String="hive") {

  private val hiveCfg = hiveConfig(db,table,partition, url, user, password)
  private val SOURCE = "hudi"

  /**
   * 新增數據
   * @param df                  數據集
   * @param tableName           Hudi表
   * @param primaryKey          主鍵列名
   * @param partitionField      分區列名
   * @param changeDateField     變更時間列名
   * @param path                數據的存儲路徑
   */
  def insert(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {
    df.write.format(SOURCE)
      .options(Map(
          // 要操作的表
          TABLE_NAME->tableName,
          // 操作的表類型(默認COW)
          TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL,
          // 執行insert操作
          OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL,
          // 設置主鍵列
          RECORDKEY_FIELD_OPT_KEY->primaryKey,
          // 設置分區列,類似Hive的表分區概念
          PARTITIONPATH_FIELD_OPT_KEY->partitionField,
          // 設置數據更新時間列,該字段數值大的數據會覆蓋小的,必須是數值類型
          PRECOMBINE_FIELD_OPT_KEY->changeDateField,
          // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器
          INDEX_TYPE_PROP-> HoodieIndex.IndexType.GLOBAL_BLOOM.name,
          // 執行insert操作的shuffle並行度
          INSERT_PARALLELISM->"2"
        ) ++= hiveCfg
      )
      // 如果數據存在會覆蓋
      .mode(Overwrite)
      .save(path)
  }

  /**
   * 修改數據
   * @param df                  數據集
   * @param tableName           Hudi表
   * @param primaryKey          主鍵列名
   * @param partitionField      分區列名
   * @param changeDateField     變更時間列名
   * @param path                數據的存儲路徑
   */
  def update(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {
    df.write.format(SOURCE)
      .options(Map(
          // 要操作的表
          TABLE_NAME->tableName,
          // 操作的表類型(默認COW)
          TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL,
          // 執行upsert操作
          OPERATION_OPT_KEY->UPSERT_OPERATION_OPT_VAL,
          // 設置主鍵列
          RECORDKEY_FIELD_OPT_KEY->primaryKey,
          // 設置分區列,類似Hive的表分區概念
          PARTITIONPATH_FIELD_OPT_KEY->partitionField,
          // 設置數據更新時間列,該字段數值大的數據會覆蓋小的
          PRECOMBINE_FIELD_OPT_KEY->changeDateField,
          // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器
          INDEX_TYPE_PROP-> HoodieIndex.IndexType.GLOBAL_BLOOM.name,
          // 執行upsert操作的shuffle並行度
          UPSERT_PARALLELISM->"2"
        ) ++= hiveCfg
      )
      // 如果數據存在會覆蓋
      .mode(Append)
      .save(path)
  }

  /**
   * 刪除數據
   * @param df                  數據集
   * @param tableName           Hudi表
   * @param primaryKey          主鍵列名
   * @param partitionField      分區列名
   * @param changeDateField     變更時間列名
   * @param path                數據的存儲路徑
   */
  def delete(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {
    df.write.format(SOURCE)
      .options(Map(
          // 要操作的表
          TABLE_NAME->tableName,
          // 操作的表類型(默認COW)
          TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL,
          // 執行delete操作
          OPERATION_OPT_KEY->DELETE_OPERATION_OPT_VAL,
          // 設置主鍵列
          RECORDKEY_FIELD_OPT_KEY->primaryKey,
          // 設置分區列,類似Hive的表分區概念
          PARTITIONPATH_FIELD_OPT_KEY->partitionField,
          // 設置數據更新時間列,該字段數值大的數據會覆蓋小的
          PRECOMBINE_FIELD_OPT_KEY->changeDateField,
          // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器
          INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name,
          // 執行delete操作的shuffle並行度
          DELETE_PARALLELISM->"2",
          // 刪除策略有軟刪除(保留主鍵且其余字段為null)和硬刪除(從數據集中徹底刪除)兩種,此處為硬刪除
          PAYLOAD_CLASS_OPT_KEY->classOf[EmptyHoodieRecordPayload].getName
        ) ++= hiveCfg
      )
      // 如果數據存在會覆蓋
      .mode(Append)
      .save(path)
  }

  private def hiveConfig(db:String, table:String, partition:String,url:String,user:String="hive", password:String="hive"): Map[String, String] = {
    scala.collection.mutable.Map(
      // 設置jdbc 連接同步
      HIVE_URL_OPT_KEY -> url,
      // 設置訪問Hive的用戶名
      HIVE_USER_OPT_KEY -> user,
      // 設置訪問Hive的密碼
      HIVE_PASS_OPT_KEY -> password,
      // 設置Hive數據庫名稱
      HIVE_DATABASE_OPT_KEY -> db,
      // 設置Hive表名稱
      HIVE_TABLE_OPT_KEY->table,
      // 設置要同步的分區列名
      HIVE_PARTITION_FIELDS_OPT_KEY->partition,
      // 設置數據集注冊並同步到hive
      HIVE_SYNC_ENABLED_OPT_KEY -> "true",
      // 設置當分區變更時,當前數據的分區目錄是否變更
      BLOOM_INDEX_UPDATE_PARTITION_PATH -> "true",
      //
      HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
    )
  }

}

object Demo3 {

  private val APP_NAME = classOf[Demo3].getClass.getSimpleName
  val insertData = Array[TemperatureBean](
    new TemperatureBean("4301",1,28.6,"2019-12-07 12:35:33"),
    new TemperatureBean("4312",0,31.4,"2019-12-07 12:25:03"),
    new TemperatureBean("4302",1,30.1,"2019-12-07 12:32:17"),
    new TemperatureBean("4305",3,31.5,"2019-12-07 12:33:11"),
    new TemperatureBean("4310",2,29.9,"2019-12-07 12:34:42")
  )
  val updateData = Array[TemperatureBean](
    new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器發生修改,溫度值由29.9->30.4,時間由12:34:42->12:35:42
  )
  val deleteData = Array[TemperatureBean](
    new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器要被刪除,必須與最新的數據保持一致(如果字段值不同時無法刪除)
  )

  def main(args: Array[String]): Unit = {
    // 類似Hive中的DB(basePath的schema決定了最終要操作的文件系統,如果是file:則為LocalFS,如果是hdfs:則為HDFS)
    val basePath = "hdfs://node01:9820/apps/demo/hudi/hudi-hive-cow/"
    // Hive中的Table
    val tableName = "tbl_hudi_temperature_cow"
    // 數據所在的路徑
    val path = s"$basePath/$tableName"

    // 創建SparkConf
    val conf = new SparkConf()
      .set("spark.app.name", APP_NAME)
      .setAll(Configured.sparkConf().asScala)
    // 創建SparkSession
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    // 關閉日志
    spark.sparkContext.setLogLevel("DEBUG")
    // 導入隱式轉換
    import spark.implicits._
    // 創建Demo3實例
    val demo = new Demo3("test_db", tableName,"deviceType","jdbc:hive2://node01:10000","root","123456")
    // 插入數據
//    demo.insert(spark.createDataFrame(insertData.toBuffer.asJava, classOf[TemperatureBean]),tableName, "deviceId", "deviceType", "cdt", path)
    // 修改數據
//    demo.update(spark.createDataFrame(updateData.toBuffer.asJava, classOf[TemperatureBean]),tableName, "deviceId", "deviceType", "cdt", path)
    // 刪除數據
//    demo.delete(spark.createDataFrame(deleteData.toBuffer.asJava, classOf[TemperatureBean]),tableName, "deviceId", "deviceType", "cdt", path)

    spark.stop()
  }

}

a) 測試新增數據

  首先,在Demo3類的main方法中,僅將demo.insert的代碼取消注釋確保demo.update和demo.delete的代碼是被注釋掉的。然后打包並上傳到服務器。

  然后,在服務器中執行如下腳本:

spark-submit --verbose --class com.mengyao.hudi.Demo3 --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --executor-cores 1 --queue default demo.jar

  在yarn中,看到作業執行成功時:

  再去查看hive中test_db庫下是否多了一張叫做tbl_hudi_temperature_cow的表

 

 

 

 

  查看詳細的建表信息

  最后,查詢表的數據,如下:

b) 測試修改數據

  首先,在Demo3類的main方法中,僅將demo.update的代碼取消注釋確保demo.insert和demo.delete的代碼是被注釋掉的。然后打包並上傳到服務器。

  然后,在服務器中執行如下腳本:

spark-submit --verbose --class com.mengyao.hudi.Demo3 --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --executor-cores 1 --queue default demo.jar

  在yarn中,看到作業執行成功時

  再去Hive中查詢數據(發現deviceid為4310的數據的溫度變成了30.4度)

  為了看出更新的效果,本次修改與上次插入的數據對比圖如下:

c) 測試刪除數據

  首先,在Demo3類的main方法中,僅將demo.delete的代碼取消注釋確保demo.insert和demo.update的代碼是被注釋掉的。然后打包並上傳到服務器。

  然后,在服務器中執行如下腳本:

spark-submit --verbose --class com.mengyao.hudi.Demo3 --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --executor-cores 1 --queue default demo.jar

  在yarn中,看到作業執行成功時

  再去Hive中查詢數據(查看deviceid為4310的數據是否已經被成功刪除)

  為了看出更新的效果,本次刪除與上次修改以及上上次插入的數據對比圖如下:

1.6.2.2 基於MOR表的Hive使用

package com.mengyao.hudi

import com.mengyao.Configured
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.EmptyHoodieRecordPayload
import org.apache.hudi.config.HoodieIndexConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SaveMode._

import scala.collection.JavaConverters._
import scala.collection.mutable._

/**
 * Spark on Hudi(MOR表) to Hive
 * @ClassName Demo4
 * @Description
 * @Created by: MengYao
 * @Date: 2021-01-23 16:13:23
 * @Version V1.0
 */
class Demo4(db:String,
            table:String,
            partition:String,
            url:String,
            user:String="hive",
            password:String="hive") {

  private val hiveCfg = hiveConfig(db,table,partition, url, user, password)
  private val SOURCE = "hudi"

  /**
   * 新增數據
   * @param df                  數據集
   * @param tableName           Hudi表
   * @param primaryKey          主鍵列名
   * @param partitionField      分區列名
   * @param changeDateField     變更時間列名
   * @param path                數據的存儲路徑
   */
  def insert(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {
    df.write.format(SOURCE)
      .options(Map(
          // 要操作的表
          TABLE_NAME->tableName,
          // 操作的表類型(默認COW)
          TABLE_TYPE_OPT_KEY -> MOR_TABLE_TYPE_OPT_VAL,
          // 執行insert操作
          OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL,
          // 設置主鍵列
          RECORDKEY_FIELD_OPT_KEY->primaryKey,
          // 設置分區列,類似Hive的表分區概念
          PARTITIONPATH_FIELD_OPT_KEY->partitionField,
          // 設置數據更新時間列,該字段數值大的數據會覆蓋小的,必須是數值類型
          PRECOMBINE_FIELD_OPT_KEY->changeDateField,
          // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器
          INDEX_TYPE_PROP-> HoodieIndex.IndexType.GLOBAL_BLOOM.name,
          // 執行insert操作的shuffle並行度
          INSERT_PARALLELISM->"2"
        ) ++= hiveCfg
      )
      // 如果數據存在會覆蓋
      .mode(Overwrite)
      .save(path)
  }

  /**
   * 修改數據
   * @param df                  數據集
   * @param tableName           Hudi表
   * @param primaryKey          主鍵列名
   * @param partitionField      分區列名
   * @param changeDateField     變更時間列名
   * @param path                數據的存儲路徑
   */
  def update(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {
    df.write.format(SOURCE)
      .options(Map(
          // 要操作的表
          TABLE_NAME->tableName,
          // 操作的表類型(默認COW)
          TABLE_TYPE_OPT_KEY -> MOR_TABLE_TYPE_OPT_VAL,
          // 執行upsert操作
          OPERATION_OPT_KEY->UPSERT_OPERATION_OPT_VAL,
          // 設置主鍵列
          RECORDKEY_FIELD_OPT_KEY->primaryKey,
          // 設置分區列,類似Hive的表分區概念
          PARTITIONPATH_FIELD_OPT_KEY->partitionField,
          // 設置數據更新時間列,該字段數值大的數據會覆蓋小的
          PRECOMBINE_FIELD_OPT_KEY->changeDateField,
          // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器
          INDEX_TYPE_PROP-> HoodieIndex.IndexType.GLOBAL_BLOOM.name,
          // 執行upsert操作的shuffle並行度
          UPSERT_PARALLELISM->"2"
        ) ++= hiveCfg
      )
      // 如果數據存在會覆蓋
      .mode(Append)
      .save(path)
  }

  /**
   * 刪除數據
   * @param df                  數據集
   * @param tableName           Hudi表
   * @param primaryKey          主鍵列名
   * @param partitionField      分區列名
   * @param changeDateField     變更時間列名
   * @param path                數據的存儲路徑
   */
  def delete(df: DataFrame, tableName: String, primaryKey: String, partitionField: String, changeDateField: String, path: String): Unit = {
    df.write.format(SOURCE)
      .options(Map(
          // 要操作的表
          TABLE_NAME->tableName,
          // 操作的表類型(默認COW)
          TABLE_TYPE_OPT_KEY->MOR_TABLE_TYPE_OPT_VAL,
          // 執行delete操作
          OPERATION_OPT_KEY->DELETE_OPERATION_OPT_VAL,
          // 設置主鍵列
          RECORDKEY_FIELD_OPT_KEY->primaryKey,
          // 設置分區列,類似Hive的表分區概念
          PARTITIONPATH_FIELD_OPT_KEY->partitionField,
          // 設置數據更新時間列,該字段數值大的數據會覆蓋小的
          PRECOMBINE_FIELD_OPT_KEY->changeDateField,
          // 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器
          INDEX_TYPE_PROP-> HoodieIndex.IndexType.BLOOM.name,
          // 執行delete操作的shuffle並行度
          DELETE_PARALLELISM->"2",
          // 刪除策略有軟刪除(保留主鍵且其余字段為null)和硬刪除(從數據集中徹底刪除)兩種,此處為硬刪除
          PAYLOAD_CLASS_OPT_KEY->classOf[EmptyHoodieRecordPayload].getName
        ) ++= hiveCfg
      )
      // 如果數據存在會覆蓋
      .mode(Append)
      .save(path)
  }

  private def hiveConfig(db:String, table:String, partition:String,url:String,user:String="hive", password:String="hive"): Map[String, String] = {
    scala.collection.mutable.Map(
      // 設置jdbc 連接同步
      HIVE_URL_OPT_KEY -> url,
      // 設置訪問Hive的用戶名
      HIVE_USER_OPT_KEY -> user,
      // 設置訪問Hive的密碼
      HIVE_PASS_OPT_KEY -> password,
      // 設置Hive數據庫名稱
      HIVE_DATABASE_OPT_KEY -> db,
      // 設置Hive表名稱
      HIVE_TABLE_OPT_KEY->table,
      // 設置要同步的分區列名
      HIVE_PARTITION_FIELDS_OPT_KEY->partition,
      // 設置數據集注冊並同步到hive
      HIVE_SYNC_ENABLED_OPT_KEY -> "true",
      // 設置當分區變更時,當前數據的分區目錄是否變更
      BLOOM_INDEX_UPDATE_PARTITION_PATH -> "true",
      //
      HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
    )
  }

}

object Demo4 {

  private val APP_NAME = classOf[Demo3].getClass.getSimpleName
  val insertData = Array[TemperatureBean](
    new TemperatureBean("4301",1,28.6,"2019-12-07 12:35:33"),
    new TemperatureBean("4312",0,31.4,"2019-12-07 12:25:03"),
    new TemperatureBean("4302",1,30.1,"2019-12-07 12:32:17"),
    new TemperatureBean("4305",3,31.5,"2019-12-07 12:33:11"),
    new TemperatureBean("4310",2,29.9,"2019-12-07 12:34:42")
  )
  val updateData = Array[TemperatureBean](
    new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器發生修改,溫度值由29.9->30.4,時間由12:34:42->12:35:42
  )
  val deleteData = Array[TemperatureBean](
    new TemperatureBean("4310",2,30.4,"2019-12-07 12:35:42")// 設備ID為4310的傳感器要被刪除,必須與最新的數據保持一致(如果字段值不同時無法刪除)
  )

  def main(args: Array[String]): Unit = {
    // 類似Hive中的DB(basePath的schema決定了最終要操作的文件系統,如果是file:則為LocalFS,如果是hdfs:則為HDFS)
    val basePath = "hdfs://node01:9820/apps/demo/hudi/hudi-hive-mor/"
    // Hive中的Table
    val tableName = "tbl_hudi_temperature_mor1"
    // 數據所在的路徑
    val path = s"$basePath/$tableName"

    // 創建SparkConf
    val conf = new SparkConf()
      .set("spark.app.name", APP_NAME)
      .setAll(Configured.sparkConf().asScala)
    // 創建SparkSession
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    // 關閉日志
    spark.sparkContext.setLogLevel("DEBUG")
    // 導入隱式轉換
    import spark.implicits._
    // 創建Demo4實例
    val demo = new Demo4("test_db", tableName,"deviceType","jdbc:hive2://node01:10000","root","123456")
    // 插入數據
//    demo.insert(spark.createDataFrame(insertData.toBuffer.asJava, classOf[TemperatureBean]),tableName, "deviceId", "deviceType", "cdt", path)
    // 修改數據
//    demo.update(spark.createDataFrame(updateData.toBuffer.asJava, classOf[TemperatureBean]),tableName, "deviceId", "deviceType", "cdt", path)
    // 刪除數據
    demo.delete(spark.createDataFrame(deleteData.toBuffer.asJava, classOf[TemperatureBean]),tableName, "deviceId", "deviceType", "cdt", path)

    spark.stop()
  }

}

a) 測試新增數據

  首先,在Demo4類的main方法中,僅將demo.insert的代碼取消注釋確保demo.update和demo.delete的代碼是被注釋掉的。然后打包並上傳到服務器。

  然后,在服務器中執行如下腳本:

spark-submit --verbose --class com.mengyao.hudi.Demo4 --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --executor-cores 1 --queue default demo.jar

  在yarn中,看到作業執行成功時

  再去查看hive中test_db庫下是否多了2張表,分別是:

表類型 核心實現類 說明
tbl_hudi_temperature_mor1_ro表 org.apache.hudi.hadoop.HoodieParquetInputFormat _ro表的意思是Read Optimized,此表的查詢性能最好。
tbl_hudi_temperature_mor1_rt表 org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat _rt表的意思是Near-Realtime,該表總是能看到最新的數據,確保數據的新鮮程度更高。

  注意:如果你總是想看到最新的數據,那么請查詢_rt表。相反,如果不在意數據的新鮮程度(比如說查詢昨天的數據、上一個小時的數據),那么ro表會有更好的查詢性能。

  查看tbl_hudi_temperature_mor1_ro表的詳細建表信息

   查看tbl_hudi_temperature_mor1_rt表的詳細建表信息

  最后,先查詢tbl_hudi_temperature_mor1_ro表的數據(請忽略Hive顯示的字段錯位),如下:

  最后,再查詢tbl_hudi_temperature_mor1_rt表的數據(請忽略Hive顯示的字段錯位),如下:

b) 測試修改數據

  首先,在Demo4類的main方法中,僅將demo.update的代碼取消注釋確保demo.insert和demo.delete的代碼是被注釋掉的。然后打包並上傳到服務器。

  然后,在服務器中執行如下腳本:

spark-submit --verbose --class com.mengyao.hudi.Demo4 --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --executor-cores 1 --queue default demo.jar

  在yarn中,看到作業執行成功時

  再去Hive中查詢數據(rt表的deviceid為4310的溫度數據被修改成了30.4度,但ro表不會變

c) 測試刪除數據

  首先,在Demo4類的main方法中,僅將demo.delete的代碼取消注釋確保demo.insert和demo.update的代碼是被注釋掉的。然后打包並上傳到服務器。

  然后,在服務器中執行如下腳本:

spark-submit --verbose --class com.mengyao.hudi.Demo4 --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --executor-cores 1 --queue default demo.jar

  在yarn中,看到作業執行成功時

再查看tbl_hudi_temperature_mor1_ro表和tbl_hudi_temperature_mor1_ro表(僅_rt表的deviceid為4310的溫度數據被刪除了,但_ro表不會變):

1.7 Hudi的WriteClient使用

package com.mengyao.hudi;

import com.mengyao.Configured;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.collection.JavaConverters;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;

/**
 * WriteClient模式是直接使用RDD級別api進行Hudi編程
 *      Application需要使用HoodieWriteConfig對象,並將其傳遞給HoodieWriteClient構造函數。 HoodieWriteConfig可以使用以下構建器模式構建。
 * @ClassName WriteClientMain
 * @Description
 * @Created by: MengYao
 * @Date: 2021-01-26 10:40:29
 * @Version V1.0
 */
public class WriteClientMain {

    private static final String APP_NAME = WriteClientMain.class.getSimpleName();
    private static final String MASTER = "local[2]";
    private static final String SOURCE = "hudi";

    public static void main(String[] args) {
        // 創建SparkConf
        SparkConf conf = new SparkConf()
                .setAll(JavaConverters.mapAsScalaMapConverter(Configured.sparkConf()).asScala());
        // 創建SparkContext
        JavaSparkContext jsc = new JavaSparkContext(MASTER, APP_NAME, conf);
        // 創建Hudi的WriteConfig
        HoodieWriteConfig hudiCfg = HoodieWriteConfig.newBuilder()
                .forTable("tableName")

                .withSchema("avroSchema")
                .withPath("basePath")
                .withProps(new HashMap())
 .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
                .build();
        // 創建Hudi的WriteClient
        HoodieWriteClient hudiWriteCli = new HoodieWriteClient<OverwriteWithLatestAvroPayload>(jsc, hudiCfg);

        // 1、執行新增操作
        JavaRDD<HoodieRecord> insertData = jsc.parallelize(Arrays.asList());
        String insertInstantTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        JavaRDD<WriteStatus> insertStatus = hudiWriteCli.insert(insertData, insertInstantTime);
        // 【注意:為了便於理解,以下所有判斷Hudi操作數據的狀態不進行額外的方法封裝】
        if (insertStatus.filter(ws->ws.hasErrors()).count()>0) {// 當提交后返回的狀態中包含error時
            hudiWriteCli.rollback(insertInstantTime);// 從時間線(insertInstantTime)中回滾,插入失敗
        } else {
            hudiWriteCli.commit(insertInstantTime, insertStatus);// 否則提交時間線(insertInstantTime)中的數據,到此,插入完成
        }

        // 2、也可以使用批量加載的方式新增數據
        String builkInsertInstantTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        JavaRDD<WriteStatus> bulkInsertStatus = hudiWriteCli.bulkInsert(insertData, builkInsertInstantTime);
        if (bulkInsertStatus.filter(ws->ws.hasErrors()).count()>0) {// 當提交后返回的狀態中包含error時
            hudiWriteCli.rollback(builkInsertInstantTime);// 從時間線(builkInsertInstantTime)中回滾,批量插入失敗
        } else {
            hudiWriteCli.commit(builkInsertInstantTime, bulkInsertStatus);// 否則提交時間線(builkInsertInstantTime)中的數據,到此,批量插入完成
        }

        // 3、執行修改or新增操作
        JavaRDD<HoodieRecord> updateData = jsc.parallelize(Arrays.asList());
        String updateInstantTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        JavaRDD<WriteStatus> updateStatus = hudiWriteCli.upsert(updateData, updateInstantTime);
        if (updateStatus.filter(ws->ws.hasErrors()).count()>0) {// 當提交后返回的狀態中包含error時
            hudiWriteCli.rollback(updateInstantTime);// 從時間線(updateInstantTime)中回滾,修改失敗
        } else {
            hudiWriteCli.commit(updateInstantTime, updateStatus);// 否則提交時間線(updateInstantTime)中的數據,到此,修改完成
        }

        // 4、執行刪除操作
        JavaRDD<HoodieRecord> deleteData = jsc.parallelize(Arrays.asList());
        String deleteInstantTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        JavaRDD<WriteStatus> deleteStatus = hudiWriteCli.delete(deleteData, deleteInstantTime);
        if (deleteStatus.filter(ws->ws.hasErrors()).count()>0) {// 當提交后返回的狀態中包含error時
            hudiWriteCli.rollback(deleteInstantTime);// 從時間線(deleteInstantTime)中回滾,刪除失敗
        } else {
            hudiWriteCli.commit(deleteInstantTime, deleteStatus);// 否則提交時間線(deleteInstantTime)中的數據,到此,刪除完成
        }

        // 退出WriteClient
        hudiWriteCli.close();
        // 退出SparkContext
        jsc.stop();
    }

}

 

1.8 如何使用索引

1.8.1 使用SparkSQL的數據源配置

  在SparkSQL的數據源配置中,面向讀寫的通用配置參數均通過options或option來指定,可用的功能包括: 定義鍵和分區、選擇寫操作、指定如何合並記錄或選擇要讀取的視圖類型。

df.write.format("hudi")
  .options(Map(
    // 要操作的表
    TABLE_NAME->tableName,
    // 操作的表類型(默認COW)
    TABLE_TYPE_OPT_KEY -> COW_TABLE_TYPE_OPT_VAL,
    /**
     * 執行insert/upsert/delete操作,默認是upsert
     * OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL,
     *                    BULK_INSERT_OPERATION_OPT_VAL,
     *                    UPSERT_OPERATION_OPT_VAL,
     *                    DELETE_OPERATION_OPT_VAL,
     */
    OPERATION_OPT_KEY->INSERT_OPERATION_OPT_VAL,
    // 設置主鍵列
    RECORDKEY_FIELD_OPT_KEY->primaryKey,
    // 設置分區列,類似Hive的表分區概念
    PARTITIONPATH_FIELD_OPT_KEY->partitionField,
    // 設置數據更新時間列,該字段數值大的數據會覆蓋小的,必須是數值類型
    PRECOMBINE_FIELD_OPT_KEY->changeDateField,
    /**
     * 要使用的索引類型,可用的選項是[SIMPLE|BLOOM|HBASE|INMEMORY],默認為布隆過濾器
     * INDEX_TYPE_PROP -> HoodieIndex.IndexType.SIMPLE.name,
     *                    HoodieIndex.IndexType.GLOBAL_SIMPLE.name,
     *                    HoodieIndex.IndexType.INMEMORY.name,
     *                    HoodieIndex.IndexType.HBASE.name,
     *                    HoodieIndex.IndexType.BLOOM.name,
     *                    HoodieIndex.IndexType.GLOBAL_SIMPLE.name,
     */ 
    INDEX_TYPE_PROP -> HoodieIndex.IndexType.BLOOM.name,
    // 執行insert操作的shuffle並行度
    INSERT_PARALLELISM->"2"
  ))
  // 如果數據存在會覆蓋
  .mode(Overwrite)
  .save(path)

1.8.2 使用WriteClient方式配置

WriteClient是使用基於Java的RDD級別API進行編程的的一種方式,需要先構建HoodieWriteConfig對象,然后再作為參數傳遞給HoodieWriteClient構造函數。

// 創建Hudi的WriteConfig
HoodieWriteConfig hudiCfg = HoodieWriteConfig.newBuilder()
    .forTable("tableName")
    .withSchema("avroSchema")
    .withPath("basePath")
    .withProps(new HashMap())
    .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(
        HoodieIndex.IndexType.BLOOM
        // HoodieIndex.IndexType.GLOBAL_BLOOM
        // HoodieIndex.IndexType.INMEMORY
        // HoodieIndex.IndexType.HBASE
        // HoodieIndex.IndexType.SIMPLE
        // HoodieIndex.IndexType.GLOBAL_SIMPLE
    ).build())
    .build();

1.8.3 聲明索引的關鍵參數

  在Hudi中,使用索引的關鍵參數主要有2個,即hoodie.index.type和hoodie.index.class兩個。這兩個參數只需要配置其中一個即可,原因如下:

索引的配置參數 對應代碼中的KEY 解釋
hoodie.index.type HoodieIndexConfig.INDEX_TYPE_PROP 當配置此參數時,會從HBASE、INMEMORY、BLOOM、GLOBAL_BLOOM、SIMPLE、GLOBAL_SIMPLE這幾個index中選擇對應的Index實現類。
hoodie.index.class HoodieIndexConfig.INDEX_CLASS_PROP 當配置此參數時,如果是HBASE、INMEMORY、BLOOM、GLOBAL_BLOOM、SIMPLE、GLOBAL_SIMPLE這幾個index實現類的類全名就會直接通過反射實例化,否則會按照自定義Index類(必須是繼承自HoodieIndex)進行加載

1.8.4 索引參數在源碼中的實現

  可以在Hudi索引超類HoodieIndex的源碼中看到createIndex方法的定義和實現:

public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Serializable {
    protected final HoodieWriteConfig config;
​
    protected HoodieIndex(HoodieWriteConfig config) {
        this.config = config;
    }
​
    public static <T extends HoodieRecordPayload> HoodieIndex<T> createIndex(HoodieWriteConfig config) throws HoodieIndexException {
        if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
            Object instance = ReflectionUtils.loadClass(config.getIndexClass(), new Object[]{config});
            if (!(instance instanceof HoodieIndex)) {
                throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex");
            } else {
                return (HoodieIndex)instance;
            }
        } else {
            switch(config.getIndexType()) {
            case HBASE:
                return new HBaseIndex(config);
            case INMEMORY:
                return new InMemoryHashIndex(config);
            case BLOOM:
                return new HoodieBloomIndex(config);
            case GLOBAL_BLOOM:
                return new HoodieGlobalBloomIndex(config);
            case SIMPLE:
                return new HoodieSimpleIndex(config);
            case GLOBAL_SIMPLE:
                return new HoodieGlobalSimpleIndex(config);
            default:
                throw new HoodieIndexException("Index type unspecified, set " + config.getIndexType());
            }
        }
    }
​
    @PublicAPIMethod(
        maturity = ApiMaturityLevel.STABLE
    )
    public abstract JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> var1, JavaSparkContext var2, HoodieTable<T> var3);
​
    @PublicAPIMethod(
        maturity = ApiMaturityLevel.STABLE
    )
    public abstract JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> var1, JavaSparkContext var2, HoodieTable<T> var3) throws HoodieIndexException;
​
    @PublicAPIMethod(
        maturity = ApiMaturityLevel.STABLE
    )
    public abstract JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> var1, JavaSparkContext var2, HoodieTable<T> var3) throws HoodieIndexException;
​
    @PublicAPIMethod(
        maturity = ApiMaturityLevel.STABLE
    )
    public abstract boolean rollbackCommit(String var1);
​
    @PublicAPIMethod(
        maturity = ApiMaturityLevel.STABLE
    )
    public abstract boolean isGlobal();
​
    @PublicAPIMethod(
        maturity = ApiMaturityLevel.EVOLVING
    )
    public abstract boolean canIndexLogFiles();
​
    @PublicAPIMethod(
        maturity = ApiMaturityLevel.STABLE
    )
    public abstract boolean isImplicitWithStorage();
​
    public void close() {
    }
​
    public static enum IndexType {
        HBASE,
        INMEMORY,
        BLOOM,
        GLOBAL_BLOOM,
        SIMPLE,
        GLOBAL_SIMPLE;
​
        private IndexType() {
        }
    }
}

 

1.9 生產環境下的推薦配置

spark.driver.extraClassPath=/etc/hive/conf
spark.driver.extraJavaOptions=-XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
spark.driver.maxResultSize=2g
spark.driver.memory=4g
spark.executor.cores=1
spark.executor.extraJavaOptions=-XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
spark.executor.id=driver
spark.executor.instances=300
spark.executor.memory=6g
spark.rdd.compress=true
 
spark.kryoserializer.buffer.max=512m
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled=true
spark.sql.hive.convertMetastoreParquet=false
spark.submit.deployMode=cluster
spark.task.cpus=1
spark.task.maxFailures=4
 
spark.yarn.driver.memoryOverhead=1024
spark.yarn.executor.memoryOverhead=3072
spark.yarn.max.executor.failures=100

 

1.10 上面代碼依賴的其他類

package com.mengyao.hudi

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

import java.sql.Date
import java.sql.Timestamp

object DemoUtils {

  def schemaExtractor(caz: Class[_]): StructType = {
    val fields = caz.getDeclaredFields.map(field => {
      StructField(field.getName, field.getType.getSimpleName.toLowerCase match {
        case "byte" => DataTypes.ByteType
        case "short" => DataTypes.ShortType
        case "int" => DataTypes.IntegerType
        case "double" => DataTypes.DoubleType
        case "float" => DataTypes.FloatType
        case "long" => DataTypes.LongType
        case "boolean" => DataTypes.BooleanType
        // java.sql.Date
        case "date" => DataTypes.DateType
        // java.sql.Timestamp
        case "timestamp" => DataTypes.TimestampType
        case _ => DataTypes.StringType
      }, true)
    })
    StructType(fields)
  }

  /**
   * 測試輸出
   *
   */
  def query(df: DataFrame, numRows: Int = Int.MaxValue): Unit = {
    df.show(numRows, false)
  }

  case class A(a:Byte,b:Short,c:Int,d:Double,e:Float,f:Long,g:Boolean,h:Date,i:Timestamp,j:String) {
    def getA = a
    def getB = b
    def getC = c
    def getD = d
    def getE = e
    def getF = f
    def getG = g
    def getH = h
    def getI = i
    def getJ = j
  }


  def main(args: Array[String]): Unit = {
    val data = A(1.byteValue, 2.shortValue, 3.intValue, 4.doubleValue, 5.floatValue, 6.longValue, true, new Date(System.currentTimeMillis), new Timestamp(System.currentTimeMillis), "hello")
    println(schemaExtractor(classOf[A]).mkString("\n"))
    val spark = SparkSession.builder().appName("A").master("local[*]").getOrCreate()
    import scala.collection.JavaConverters._
    spark.createDataFrame(Array[A](data).toBuffer.asJava, classOf[A]).show(false)
    spark.close
  }

}
DemoUtils.scala
package com.mengyao;

import java.util.*;
import java.util.stream.Collectors;

/**
 *
 * @ClassName Configured
 * @Description
 * @Created by: MengYao
 * @Date: 2021-01-11 10:10:53
 * @Version V1.0
 */
public class Configured {

    private static final ResourceBundle bundle = ResourceBundle.getBundle("config", Locale.CHINESE);
    private static Map<String, String> config = new HashMap<>();
    public static final String USER_NAME = "user.name";
    public static final String JAVA_HOME = "JAVA_HOME";
    public static final String HADOOP_HOME = "HADOOP_HOME";
    public static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
    public static final String HADOOP_HOME_DIR = "hadoop.home.dir";
    public static final String SPARK_HOME = "SPARK_HOME";
    public static final String DEFAULT_CHARSET = "default.charset";


    public static Map<String, String> get() {
        return bundle.keySet().stream()
                .collect(Collectors.toMap(key->key, key->bundle.getString(key), (e1,e2)->e2, HashMap::new));
    }

    public static Map<String, String> env() {
        return bundle.keySet().stream()
                .filter(k -> k.matches("(JAVA_HOME|HADOOP_CONF_DIR|SPARK_HOME|SPARK_PRINT_LAUNCH_COMMAND)"))
                .collect(Collectors.toMap(key->key, key->bundle.getString(key), (e1,e2)->e2, HashMap::new));
    }

    public static Map<String, String> sparkConf() {
        return bundle.keySet().stream()
                .filter(key->Objects.nonNull(key)&&key.startsWith("spark"))
                .collect(Collectors.toMap(key->key, key->bundle.getString(key), (e1,e2)->e2, HashMap::new));
    }

    public static Map<String, String> appConf() {
        return bundle.keySet().stream()
                .filter(key->Objects.nonNull(key)&&key.startsWith("app"))
                .collect(Collectors.toMap(key->key, key->bundle.getString(key), (e1,e2)->e2, HashMap::new));
    }

    public static String getDefaultCharset() {
        return bundle.getString(DEFAULT_CHARSET);
    }

    public static String getUserName() {
        return bundle.getString(USER_NAME);
    }
    public static String getJavaHome() {
        return bundle.getString(JAVA_HOME);
    }
    public static String getHadoopHome() {
        return bundle.getString(HADOOP_HOME);
    }
    public static String getHadoopConfDir() {
        return bundle.getString(HADOOP_CONF_DIR);
    }
    public static String getSparkHome() {
        return bundle.getString(SPARK_HOME);
    }

}
Configured.java

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM