I. Syncing Hudi data to Hive
1) Place the compiled hudi-hadoop-mr-bundle-0.10.0.jar into the target environment, under ../CDH/jars and ../CDH/lib/hive/lib. For the detailed steps, refer to the earlier post "Flink1.3.1+Hudi0.10初探".
cd /app/hudi-0.10.0/packaging/hudi-hadoop-mr-bundle/target
cp hudi-hadoop-mr-bundle-0.10.0.jar /opt/cloudera/parcels/CDH/lib/hive/lib
cp hudi-hadoop-mr-bundle-0.10.0.jar /opt/cloudera/parcels/CDH/jars
# The jar also needs to go into Hive's auxiliary jar (auxlib) directory
cp hudi-hadoop-mr-bundle-0.10.0.jar /usr/local/src/hook/hive
2) Test data
uuid,name,addr,phone,update_date,bir_date
1,逝去的青春,上海市寶山區,183****1111,20200805,20020101
2,葬愛,上海市虹口區,183****2222,20200805,20020101
3,罙罙の回憶,上海市虹口區,183****3333,20200805,20020101
4,忘了天空的顏色,上海市虹口區,183****4444,20200805,20020101
5,李彥龍,上海市松江區,183****5555,20200801,20010101
6,李浩鵬,上海市松江區,183****6666,20200801,20010101
7,李天一,上海市松江區,183****7777,20200801,20010101
8,李朵雯,上海市松江區,183****8888,20200801,20010101
9,李雨杭,上海市松江區,183****9999,20200801,20010101
10,王滿,杭州市西湖區,153****0000,20200802,20000101
11,王琳,杭州市西湖區,153****1111,20200802,20000101
12,王昕,杭州市西湖區,153****2222,20200802,20000101
13,賈一一,杭州市西湖區,153****3333,20200802,20000101
14,石浩,西安市蓮湖區,137****4444,20200803,19970101
15,石子彤,西安市蓮湖區,137****5555,20200803,19970101
16,許放炮的,西安市蓮湖區,137****6666,20200803,19970101
3) pom.xml: hudi-spark-bundle_2.11 has already been added to my project separately, so it is not declared in the Maven dependencies (it appears commented out below).
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.shydow</groupId>
    <artifactId>spark-hudi-tutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.0.0-cdh6.2.1</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
        <!-- Hudi dependency -->
        <!--
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark-bundle_2.11</artifactId>
            <scope>provided</scope>
            <version>0.10.0</version>
        </dependency>
        -->
    </dependencies>

    <build>
        <plugins>
            <!-- Plugin for compiling Java sources -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Plugin for compiling Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Pull the classes used from dependency jars into our own jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <!-- The execution below lets "mvn package" build the assembly; without it you have to run mvn assembly -->
                <executions>
                    <execution>
                        <id>make-assemble</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
II. Querying the synced Hive tables with spark2.4.0-cdh6.2.1 fails, because the source code was previously modified in order to write the data; it is recommended to upgrade Spark to 2.4.3 or later (the steps below were carried out on Spark 2.4.5)
Note: if you only write the upstream data into Hudi and sync it to Hive, and Spark does not later read the generated rt or ro tables, version 2.4.0 works fine. But if Spark needs to keep processing the generated Hive tables, upgrade to 2.4.3 or later and copy hive-site.xml into spark/conf.
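As a minimal sketch of that follow-up processing (assuming Spark 2.4.3+ with hive-site.xml already in spark/conf, and the MOR table "member" synced into the default database as member_ro / member_rt, matching the write code further below):

// Sketch: read the synced Hive tables from a Spark session with Hive support enabled
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("read-synced-tables")
  .enableHiveSupport()
  .getOrCreate()

// Read-optimized view: only the compacted base (parquet) files
spark.sql("select uuid, name, part from default.member_ro where part = '20200801'").show(false)

// Real-time view: base files merged with log files, i.e. the latest data
spark.read.table("default.member_rt").where("part = '20200801'").show(false)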
1) Writing data
package com.shydow.Hudi

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * @author Shydow
 * @date 2021/12/26 0:35
 * @desc Sync Hudi data to Hive
 */
object HudiSyncHiveTest {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("insert")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    /* Insert data and sync it to Hive at the same time */
    insert(spark, "file:///D:\\IdeaProjects\\spark-hudi-tutorial\\data\\test.csv")

    /* Read the table synced to Hive */
    spark.read.table("default.member_rt").where("part=20200801").show()

    /* Read the Hudi table */
    spark.read.format("hudi")
      .load("/workspace/hudi/hive")
      .show()

    spark.close()
  }

  def insert(spark: SparkSession, path: String) = {
    import org.apache.spark.sql.functions._
    val timestamp: String = System.currentTimeMillis().toString // generate the commit timestamp
    val frame: DataFrame = spark.read.option("header", "true")
      .csv(path)
    val insertDF: DataFrame = frame.withColumn("ts", lit(timestamp))
      .withColumn("part", col("update_date"))
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    insertDF.write.format("hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) // table type: MERGE_ON_READ or COPY_ON_WRITE
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid") // record key (primary key)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // pre-combine field (update timestamp of the data)
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "part") // Hudi partition column
      .option("hoodie.table.name", "member") // Hudi table name ("hoodie.table.name")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://hadoop001:10000") // HiveServer2 address
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default") // Hive database to sync into
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "member") // Hive table name to sync into
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "part") // partition column of the synced Hive table
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName) // partition extractor: splits the partition path on "/"
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") // register the dataset and sync it to Hive
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") // whether a record's partition directory changes when its partition value changes
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) // index type: HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM; GLOBAL_BLOOM is required so records can still be found after a partition change
      .option("hoodie.insert.shuffle.parallelism", "12")
      .option("hoodie.upsert.shuffle.parallelism", "12")
      .mode(SaveMode.Overwrite)
      .save("/workspace/hudi/hive")
  }
}
Submitting the job with spark-submit:
./bin/spark-submit --master yarn --driver-memory 1g --num-executors 2 --executor-memory 2g --executor-cores 2 --jars ./jars/hudi-spark-bundle_2.11-0.10.0.jar --class com.shydow.Launcher ./workspace/spark-hudi-tutorial-1.0-SNAPSHOT-jar-with-dependencies.jar
Structure of the ro and rt tables generated in Hive:
CREATE EXTERNAL TABLE `user_info_ro`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `uuid` string,
  `name` string,
  `addr` string,
  `phone` string,
  `update_date` string,
  `bir_date` string,
  `ts` string)
PARTITIONED BY (
  `part` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'hoodie.query.as.ro.table'='true',
  'path'='/workspace/launcher2')
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://hadoop001:8020/workspace/launcher2'
TBLPROPERTIES (
  'last_commit_time_sync'='20211227235552794',
  'spark.sql.sources.provider'='hudi',
  'spark.sql.sources.schema.numPartCols'='1',
  'spark.sql.sources.schema.numParts'='1',
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"uuid","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"addr","type":"string","nullable":true,"metadata":{}},{"name":"phone","type":"string","nullable":true,"metadata":{}},{"name":"update_date","type":"string","nullable":true,"metadata":{}},{"name":"bir_date","type":"string","nullable":true,"metadata":{}},{"name":"ts","type":"string","nullable":false,"metadata":{}},{"name":"part","type":"string","nullable":true,"metadata":{}}]}',
  'spark.sql.sources.schema.partCol.0'='part',
  'transient_lastDdlTime'='1640620564')
CREATE EXTERNAL TABLE `user_info_rt`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `uuid` string,
  `name` string,
  `addr` string,
  `phone` string,
  `update_date` string,
  `bir_date` string,
  `ts` string)
PARTITIONED BY (
  `part` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'hoodie.query.as.ro.table'='false',
  'path'='/workspace/launcher2')
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://hadoop001:8020/workspace/launcher2'
TBLPROPERTIES (
  'last_commit_time_sync'='20211227235552794',
  'spark.sql.sources.provider'='hudi',
  'spark.sql.sources.schema.numPartCols'='1',
  'spark.sql.sources.schema.numParts'='1',
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"uuid","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"addr","type":"string","nullable":true,"metadata":{}},{"name":"phone","type":"string","nullable":true,"metadata":{}},{"name":"update_date","type":"string","nullable":true,"metadata":{}},{"name":"bir_date","type":"string","nullable":true,"metadata":{}},{"name":"ts","type":"string","nullable":false,"metadata":{}},{"name":"part","type":"string","nullable":true,"metadata":{}}]}',
  'spark.sql.sources.schema.partCol.0'='part',
  'transient_lastDdlTime'='1640620564')
When the Hudi table type is DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, the Hive sync generates both a ro and a rt table; when DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL is chosen, only a single table is generated.
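For comparison, a rough sketch of the COPY_ON_WRITE variant, reusing insertDF and the imports from insert() above; only the table-type option changes, and the table name member_cow and path /workspace/hudi/cow_demo are hypothetical so the MOR table is not overwritten:

// Sketch: write the same DataFrame as a COPY_ON_WRITE table. With Hive sync enabled this
// registers a single Hive table instead of a _ro / _rt pair.
insertDF.write.format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) // COPY_ON_WRITE
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "part")
  .option("hoodie.table.name", "member_cow") // hypothetical table name
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://hadoop001:10000")
  .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "member_cow") // hypothetical table name
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "part")
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
  .mode(SaveMode.Overwrite)
  .save("/workspace/hudi/cow_demo") // hypothetical path, separate from the MOR table above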
2) Updating data
def update(spark: SparkSession, path: String) = {
  import org.apache.spark.sql.functions._
  val frame: DataFrame = spark.read.option("header", "true")
    .csv(path)

  // Modify part of the data
  val timestamp: String = System.currentTimeMillis().toString
  val result: DataFrame = frame.where("bir_date='19970101'")
    .withColumn("bir_date", from_unixtime(unix_timestamp(col("bir_date"), "yyyyMMdd"), "yyyy/MM/dd"))
    .withColumn("update_date", lit("20211227"))
    .withColumn("name", lit("baron"))
    .withColumn("ts", lit(timestamp))
    .withColumn("part", col("update_date"))

  result.write.format("hudi")
    .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) // table type: MERGE_ON_READ or COPY_ON_WRITE
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid") // record key (primary key)
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") // pre-combine field (update timestamp of the data)
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "part") // Hudi partition column
    .option("hoodie.table.name", "member") // Hudi table name ("hoodie.table.name")
    .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://hadoop001:10000") // HiveServer2 address
    .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default") // Hive database to sync into
    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "member") // Hive table name to sync into
    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "part") // partition column of the synced Hive table
    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName) // partition extractor: splits the partition path on "/"
    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") // register the dataset and sync it to Hive
    .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") // whether a record's partition directory changes when its partition value changes
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) // index type: HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM; GLOBAL_BLOOM is required so records can still be found after a partition change
    .option("hoodie.insert.shuffle.parallelism", "12")
    .option("hoodie.upsert.shuffle.parallelism", "12")
    .mode(SaveMode.Append)
    .save("/workspace/hudi/hive")
}
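A quick way to check the result (a sketch, against the same table path): since update() rewrites part to the new update_date 20211227 and the writer uses GLOBAL_BLOOM with BLOOM_INDEX_UPDATE_PARTITION_PATH, the three records born in 1997 should move to the new partition rather than be duplicated in the old one.

// Sketch: snapshot-read the Hudi table after update() and confirm the updated rows
// (bir_date reformatted to 1997/MM/dd) now carry part = 20211227.
spark.read.format("hudi")
  .load("/workspace/hudi/hive")
  .where("bir_date like '1997%'")
  .select("uuid", "name", "update_date", "part", "_hoodie_commit_time")
  .show(false)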
3) Querying the incremental view
def incrementalQuery(spark: SparkSession) = {
  spark.read.format("hudi")
    .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), "20211227000657989")
    .option(DataSourceReadOptions.END_INSTANTTIME.key(), "20211228003845629")
    .load("/workspace/hudi/hive")
    .show(false)
}
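The begin/end instants above are commit times from my own runs; a sketch for finding valid instants on your own table is to look at the commit-time metadata column of a snapshot query:

// Sketch: list the commit instants present in the table, to pick BEGIN/END values
// for the incremental query (same table path as above).
spark.read.format("hudi")
  .load("/workspace/hudi/hive")
  .select("_hoodie_commit_time")
  .distinct()
  .orderBy("_hoodie_commit_time")
  .show(false)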