Hudi + Hive Sync: Implementing a Unified Lakehouse


I. Syncing Hudi Data to Hive

1) The compiled hudi-hadoop-mr-bundle-0.10.0.jar needs to be placed into the target environment, under ../CDH/jars and ../CDH/lib/hive/lib. For the detailed steps, refer to the earlier Flink1.3.1+Hudi0.10 getting-started write-up.

cd /app/hudi-0.10.0/packaging/hudi-hadoop-mr-bundle/target
cp hudi-hadoop-mr-bundle-0.10.0.jar /opt/cloudera/parcels/CDH/lib/hive/lib
cp hudi-hadoop-mr-bundle-0.10.0.jar /opt/cloudera/parcels/CDH/jars

# The same jar also needs to go into Hive's auxiliary jar directory (auxlib)
cp hudi-hadoop-mr-bundle-0.10.0.jar /usr/local/src/hook/hive

 

2) Test data

uuid,name,addr,phone,update_date,bir_date
1,逝去的青春,上海市寶山區,183****1111,20200805,20020101
2,葬愛,上海市虹口區,183****2222,20200805,20020101
3,罙罙の回憶,上海市虹口區,183****3333,20200805,20020101
4,忘了天空的顏色,上海市虹口區,183****4444,20200805,20020101
5,李彥龍,上海市松江區,183****5555,20200801,20010101
6,李浩鵬,上海市松江區,183****6666,20200801,20010101
7,李天一,上海市松江區,183****7777,20200801,20010101
8,李朵雯,上海市松江區,183****8888,20200801,20010101
9,李雨杭,上海市松江區,183****9999,20200801,20010101
10,王滿,杭州市西湖區,153****0000,20200802,20000101
11,王琳,杭州市西湖區,153****1111,20200802,20000101
12,王昕,杭州市西湖區,153****2222,20200802,20000101
13,賈一一,杭州市西湖區,153****3333,20200802,20000101
14,石浩,西安市蓮湖區,137****4444,20200803,19970101
15,石子彤,西安市蓮湖區,137****5555,20200803,19970101
16,許放炮的,西安市蓮湖區,137****6666,20200803,19970101

 

3) pom.xml: in my project hudi-spark-bundle_2.11 has already been added to the classpath, so it is not declared in Maven (it appears commented out below).

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.shydow</groupId>
  <artifactId>spark-hudi-tutorial</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.12</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-avro_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.0.0-cdh6.2.1</version>
    </dependency>
    -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>2.1.1</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.25</version>
    </dependency>

    <!-- Hudi dependency -->
    <!--
    <dependency>
      <groupId>org.apache.hudi</groupId>
      <artifactId>hudi-spark-bundle_2.11</artifactId>
      <scope>provided</scope>
      <version>0.10.0</version>
    </dependency>
    -->

  </dependencies>

  <build>
    <plugins>
      <!-- Plugin for compiling Java -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>

      <!-- Plugin for compiling Scala -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <!-- Package dependency classes into our own jar (jar-with-dependencies) -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.6</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <!-- The execution below binds the assembly to mvn package; without it you would have to run the assembly goal (mvn assembly:single) directly -->
        <executions>
          <execution>
            <id>make-assemble</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

 

II. Querying the synced Hive tables with spark2.4.0-cdh6.2.1 produces errors, because the source code had previously been patched only to make writes work; upgrading Spark to 2.4.3 or later is recommended (the steps below were run on Spark 2.4.5)

Note: if you only write upstream data into Hudi and sync it to Hive, and Spark does not subsequently read the generated rt or ro tables, version 2.4.0 is fine. But if Spark needs to keep processing the generated Hive tables, upgrade to 2.4.3 or above and copy hive-site.xml into spark/conf (a quick verification sketch follows).
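
A minimal sanity check, as a sketch: assuming hive-site.xml is already in spark/conf and the write job from the next section has run against database "default" with table name "member", a Hive-enabled SparkSession should see the synced tables directly.

import org.apache.spark.sql.SparkSession

// Sketch: verify that Spark can read the Hive-synced tables via the metastore.
val spark = SparkSession.builder()
  .appName("hive-sync-check")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("show tables in default").show(false)             // member_ro / member_rt should be listed
spark.read.table("default.member_ro").limit(5).show(false)  // read-optimized view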

1) Writing data

package com.shydow.Hudi

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * @author Shydow
 * @date 2021/12/26 0:35
 * @desc Sync Hudi data to Hive
 */
object HudiSyncHiveTest {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("insert")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    /* Insert data and sync it to Hive at the same time */
    insert(spark, "file:///D:\\IdeaProjects\\spark-hudi-tutorial\\data\\test.csv")

    /* Read the synced table from Hive */
    spark.read.table("default.member_rt").where("part=20200801").show()

    /* Read the Hudi table */
    spark.read.format("hudi")
      .load("/workspace/hudi/hive")
      .show()

    spark.close()
  }

  def insert(spark: SparkSession, path: String) = {
    import org.apache.spark.sql.functions._
    val timestamp: String = System.currentTimeMillis().toString  // generate a commit timestamp
    val frame: DataFrame = spark.read.option("header", "true")
      .csv(path)
    val insertDF: DataFrame = frame.withColumn("ts", lit(timestamp))
      .withColumn("part", col("update_date"))
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    insertDF.write.format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) // table type: MERGE_ON_READ or COPY_ON_WRITE
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid") // record key (primary key)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // precombine field (update timestamp)
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "part") // Hudi partition column
      .option("hoodie.table.name", "member") // Hudi table name ("hoodie.table.name")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://hadoop001:10000") // HiveServer2 address
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default") // Hive database to sync into
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "member") // Hive table to sync into
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "part") // partition column of the synced Hive table
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName) // partition extractor: splits partition values on /
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") // register and sync the dataset to Hive
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") // whether a record's partition directory changes when its partition value changes
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) // index type (HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM); GLOBAL_BLOOM is required so records can still be found after a partition change
      .option("hoodie.insert.shuffle.parallelism", "12")
      .option("hoodie.upsert.shuffle.parallelism", "12")
      .mode(SaveMode.Overwrite)
      .save("/workspace/hudi/hive")
  }
}

 

Submitting the job with spark-submit:

./bin/spark-submit --master yarn --driver-memory 1g --num-executors 2 --executor-memory 2g --executor-cores 2 --jars ./jars/hudi-spark-bundle_2.11-0.10.0.jar --class com.shydow.Launcher ./workspace/spark-hudi-tutorial-1.0-SNAPSHOT-jar-with-dependencies.jar

 

Structure of the ro and rt tables generated in Hive (this DDL was captured from a different run of the job, hence the user_info table name and the /workspace/launcher2 path):

CREATE EXTERNAL TABLE `user_info_ro`(
  `_hoodie_commit_time` string, 
  `_hoodie_commit_seqno` string, 
  `_hoodie_record_key` string, 
  `_hoodie_partition_path` string, 
  `_hoodie_file_name` string, 
  `uuid` string, 
  `name` string, 
  `addr` string, 
  `phone` string, 
  `update_date` string, 
  `bir_date` string, 
  `ts` string)
PARTITIONED BY ( 
  `part` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
WITH SERDEPROPERTIES ( 
  'hoodie.query.as.ro.table'='true', 
  'path'='/workspace/launcher2') 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://hadoop001:8020/workspace/launcher2'
TBLPROPERTIES (
  'last_commit_time_sync'='20211227235552794', 
  'spark.sql.sources.provider'='hudi', 
  'spark.sql.sources.schema.numPartCols'='1', 
  'spark.sql.sources.schema.numParts'='1', 
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"uuid","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"addr","type":"string","nullable":true,"metadata":{}},{"name":"phone","type":"string","nullable":true,"metadata":{}},{"name":"update_date","type":"string","nullable":true,"metadata":{}},{"name":"bir_date","type":"string","nullable":true,"metadata":{}},{"name":"ts","type":"string","nullable":false,"metadata":{}},{"name":"part","type":"string","nullable":true,"metadata":{}}]}', 
  'spark.sql.sources.schema.partCol.0'='part', 
  'transient_lastDdlTime'='1640620564')
CREATE EXTERNAL TABLE `user_info_rt`(
  `_hoodie_commit_time` string, 
  `_hoodie_commit_seqno` string, 
  `_hoodie_record_key` string, 
  `_hoodie_partition_path` string, 
  `_hoodie_file_name` string, 
  `uuid` string, 
  `name` string, 
  `addr` string, 
  `phone` string, 
  `update_date` string, 
  `bir_date` string, 
  `ts` string)
PARTITIONED BY ( 
  `part` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
WITH SERDEPROPERTIES ( 
  'hoodie.query.as.ro.table'='false', 
  'path'='/workspace/launcher2') 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://hadoop001:8020/workspace/launcher2'
TBLPROPERTIES (
  'last_commit_time_sync'='20211227235552794', 
  'spark.sql.sources.provider'='hudi', 
  'spark.sql.sources.schema.numPartCols'='1', 
  'spark.sql.sources.schema.numParts'='1', 
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"uuid","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"addr","type":"string","nullable":true,"metadata":{}},{"name":"phone","type":"string","nullable":true,"metadata":{}},{"name":"update_date","type":"string","nullable":true,"metadata":{}},{"name":"bir_date","type":"string","nullable":true,"metadata":{}},{"name":"ts","type":"string","nullable":false,"metadata":{}},{"name":"part","type":"string","nullable":true,"metadata":{}}]}', 
  'spark.sql.sources.schema.partCol.0'='part', 
  'transient_lastDdlTime'='1640620564')

 

Choosing DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL as the Hudi table type makes Hive sync create both an ro and an rt table; choosing DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL creates only a single table. A query sketch follows.
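
As a rough illustration of the difference (a sketch, reusing the "member" table synced above): member_ro reads only the compacted base parquet files, so upserts still sitting in log files may not be visible yet, while member_rt merges base and log files at query time and reflects the latest writes.

// Sketch: compare the read-optimized and real-time views of the same partition.
spark.sql("select uuid, name, update_date, part from default.member_ro where part = '20200801'").show(false)
spark.sql("select uuid, name, update_date, part from default.member_rt where part = '20200801'").show(false)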

 

2) Updating data

def update(spark: SparkSession, path: String) = {
    import org.apache.spark.sql.functions._
    val frame: DataFrame = spark.read.option("header", "true")
      .csv(path)

    // modify a subset of the records
    val timestamp: String = System.currentTimeMillis().toString
    val result: DataFrame = frame.where("bir_date='19970101'")
      .withColumn("bir_date", from_unixtime(unix_timestamp(col("bir_date"), "yyyyMMdd"), "yyyy/MM/dd"))
      .withColumn("update_date", lit("20211227"))
      .withColumn("name", lit("baron"))
      .withColumn("ts", lit(timestamp))
      .withColumn("part", col("update_date"))

    result.write.format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) // table type: MERGE_ON_READ or COPY_ON_WRITE
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid") // record key (primary key)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // precombine field (update timestamp)
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "part") // Hudi partition column
      .option("hoodie.table.name", "member") // Hudi table name ("hoodie.table.name")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://hadoop001:10000") // HiveServer2 address
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default") // Hive database to sync into
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "member") // Hive table to sync into
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "part") // partition column of the synced Hive table
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName) // partition extractor: splits partition values on /
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") // register and sync the dataset to Hive
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") // whether a record's partition directory changes when its partition value changes
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) // index type (HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM); GLOBAL_BLOOM is required so records can still be found after a partition change
      .option("hoodie.insert.shuffle.parallelism", "12")
      .option("hoodie.upsert.shuffle.parallelism", "12")
      .mode(SaveMode.Append)
      .save("/workspace/hudi/hive")
  }
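
A quick check after the update, as a sketch: because the index is GLOBAL_BLOOM and partition-path updates are enabled, the records whose bir_date was 19970101 (uuid 14, 15, 16) should now live under part=20211227 rather than part=20200803.

// Sketch: confirm the updated records moved to the new partition.
spark.read.format("hudi")
  .load("/workspace/hudi/hive")
  .where("uuid in ('14', '15', '16')")
  .select("uuid", "name", "update_date", "part", "_hoodie_partition_path")
  .show(false)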

 

3) Querying the incremental view

def incrementalQuery(spark: SparkSession) = {
    spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), "20211227000657989") 
      .option(DataSourceReadOptions.END_INSTANTTIME.key(), "20211228003845629")
      .load("/workspace/hudi/hive")
      .show(false)
  }
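
The begin and end instant times used above are Hudi commit timestamps (the same values that appear in _hoodie_commit_time and in last_commit_time_sync in the table properties). One simple way to find usable instants, sketched here, is to list the distinct commit times from a snapshot read:

// Sketch: list the commit instants present in the table.
spark.read.format("hudi")
  .load("/workspace/hudi/hive")
  .select("_hoodie_commit_time")
  .distinct()
  .orderBy("_hoodie_commit_time")
  .show(false)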

 

