1、需求背景
通過Spark將關系型數據庫(以Oracle為例)的表同步的Hive,這里講的只是同步歷史數據,不包括同步增量數據。
2、Oracle和Hive的字段類型對應
利用Spark的字段類型自動匹配,本來以為Spark匹配的不是很好,只是簡單的判斷一下是否為數字、字符串,結果經驗證,Spark可以獲取到Oracle的小數點精度,Spark的字段類型對應和我自己整理的差不多,所以就索性用Spark自帶的字段類型匹配,而不是自己去Oracle相關表獲取每個字段類型,然后一一轉化為Hive對應的字段類型,下面是Oracle和Hive的字段類型對應,只是整理了大概:
Oracle | Hive |
---|---|
VARCHAR2 | String |
NVARCHAR2 | String |
NUMBER | DECIMAL/Int |
DATE | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
CHAR | String |
CLOB(一般用不到) | String |
BLOB(一般用不到) | BINARY |
RAW (一般用不到) | BINARY |
Other | String |
2.1 看一下Spark字段類型對應
首先建一張包含大部分字段類型的Oracle表
CREATE TABLE TEST ( COL1 VARCHAR2(25), COL2 NVARCHAR2(18), COL3 INTEGER, COL4 NUMBER(10,4), COL5 NUMBER(30,7), COL6 NUMBER, COL7 DATE, COL8 TIMESTAMP, COL9 CHAR(30), COL10 CLOB, COL11 BLOB, COL12 RAW(12) ) ; COMMENT ON COLUMN TEST.COL2 IS '注釋2' ; COMMENT ON COLUMN TEST.COL7 IS '注釋7' ; COMMENT ON COLUMN TEST.COL10 IS '注釋10' ;

然后用Spark打印一下獲取到的字段類型。

可以看到Spark成功的完成上述表格的字段類型轉化,小數的精度和是否為空都可以獲取到,但是不完美的一點是沒有將NUMBER標度為零的轉換為Int,而還是以DECIMAL(38,0)的形式表示,雖然都是表示的整數,但是在后面Spark讀取hive的時候,還需要將DECIMAL轉為Int。
2.2 按需修改字段類型對應
以上面講的將DECIMAL(38,0)轉為Int為例:
先嘗試通過修改schema實現
import org.apache.spark.sql.types._ val schema = df.schema.map(s => { if (s.dataType.equals(DecimalType(38, 0))) { new StructField(s.name, IntegerType, s.nullable) } else { s } }) //根據添加了注釋的schema,新建DataFrame val new_df = spark.createDataFrame(df.rdd, StructType(schema)).repartition(160) new_df.printSchema()

可以看到,已經成功的將COL3的字段轉為了Int。但是這樣構造的DataFrame是不能用的,如執行new_df.show會報如下錯誤:
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of int
原因是rdd的數據類型和schema的數據類型不匹配。
最后可以通過如下方式實現:
import scala.collection.mutable.ArrayBuffer //需要轉換的列名 val colName = ArrayBuffer[String]() val schema = df.schema.foreach(s => { if (s.dataType.equals(DecimalType(38, 0))) { colName += s.name } }) import org.apache.spark.sql.functions._ var df_int = df colName.foreach(name => { df_int = df_int.withColumn(name, col(name).cast(IntegerType)) }) df_int.printSchema() df_int.show

3、Oracle全部歷史數據同步Hive
3.1 再新建一張表
這里的目的是表示多個表,而不是一個表,上面已經建了一張表,再建一張表,以驗證代碼可以將所有的表都同步過去,這里用上一篇博客上的建表Sql即
CREATE TABLE ORA_TEST ( ID VARCHAR2(100), NAME VARCHAR2(100) ); COMMENT ON COLUMN ORA_TEST.ID IS 'ID'; COMMENT ON COLUMN ORA_TEST.NAME IS '名字'; COMMENT ON TABLE ORA_TEST IS '測試';
再在每張表里造點數據,這里就不截圖了。
3.2 代碼
上篇博客里用到的注釋,是在程序里手工添加的注釋,下面的代碼是從Oracle里取的,且同步的是一個用戶下所有的表。
package com.dkl.leanring.spark.sql.Oracle
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ import java.util.Properties import scala.collection.mutable.ArrayBuffer /** * * Spark自動建表(帶字段注釋、暫無表注釋) * 並將中間庫Oracle的歷史數據全部初始化到hive * * 實現方案: * 1、 利用Spark的自動字段類型匹配 * 2、 讀取Oracle表字段注釋,添加到DataFrame的元數據 * 3、按需修改Spark默認的字段類型轉換 * * 注:需要提前建好對應的hive數據庫 */ object Oracle2Hive { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("Oracle2Hive") .master("local") .config("spark.sql.parquet.writeLegacyFormat", true) .enableHiveSupport() .getOrCreate() //oracle的連接信息 val p = new Properties() p.put("driver", "oracle.jdbc.driver.OracleDriver") p.put("url", "jdbc:oracle:thin:@192.168.44.128:1521:orcl") p.put("user", "bigdata") p.put("password", "bigdata") import scala.collection.JavaConversions._ val database_conf: scala.collection.mutable.Map[String, String] = p //Oracle是分用戶的,這里以用戶BIGDATA為例 val owner = "BIGDATA" val sql_in_owner = s"('${owner}')" database_conf.put("dbtable", "TEST") spark.sql(s"use ${owner}") database_conf.put("dbtable", s"(select table_name from all_tables where owner in ${sql_in_owner})a") //所有的表名 val allTableNames = getDataFrame(spark, database_conf) database_conf.put("dbtable", s"(select * from all_col_comments where owner in ${sql_in_owner})a") //所有的表字段對應的注釋 val allColComments = getDataFrame(spark, database_conf).repartition(160).cache allTableNames.select("table_name").collect().foreach(row => { //表名 val table_name = row.getAs[String]("table_name") database_conf.put("dbtable", table_name) //根據表名從Oracle取數 val df = getDataFrame(spark, database_conf) //字段名 和注 對應的map val colName_comments_map = allColComments.where(s"TABLE_NAME='${table_name}'") .select("COLUMN_NAME", "COMMENTS") .na.fill("", Array("COMMENTS")) .rdd.map(row => (row.getAs[String]("COLUMN_NAME"), row.getAs[String]("COMMENTS"))) .collect() .toMap val colName = ArrayBuffer[String]() //為schema添加注釋信息 val schema = df.schema.map(s => { if (s.dataType.equals(DecimalType(38, 0))) { colName += s.name new StructField(s.name, IntegerType, s.nullable, s.metadata).withComment(colName_comments_map(s.name)) } else { s.withComment(colName_comments_map(s.name)) } }) import org.apache.spark.sql.functions._ var df_int = df colName.foreach(name => { df_int = df_int.withColumn(name, col(name).cast(IntegerType)) }) //根據添加了注釋的schema,新建DataFrame val new_df = spark.createDataFrame(df_int.rdd, StructType(schema)) new_df.write.mode("overwrite").saveAsTable(table_name) // new_df.schema.foreach(s => println(s.metadata)) // new_df.printSchema() }) spark.stop } /** * @param spark SparkSession * @param database_conf 數據庫配置項Map,包括driver,url,username,password,dbtable等內容,提交程序時需用--jars選項引用相關jar包 * @return 返回DataFrame對象 */ def getDataFrame(spark: SparkSession, database_conf: scala.collection.Map[String, String]) = { spark.read.format("jdbc").options(database_conf).load() } }
3.3 看一下Hive里的結果

這樣就成功的完成了Oracle歷史數據到Hive的同步!
4、關於增量數據的同步
4.1 實時同步
可以考慮這樣,先用ogg將Oracle的增量數據實時同步到kafka,再用Spark Streaming實現kafka到hive的實時同步。
- 下面兩篇文章提供參考:利用ogg實現oracle到kafka的增量數據實時同步、Spark Streamming+Kafka提交offset實現有且僅有一次,其中Spark Streaming的代碼並沒有實現寫入hive的功能,但是實時讀取kafka的功能已經實現,只要自己處理一下解析kafka里json格式的增量數據,轉成DataFrame保存到hive表里即可。
4.2 非實時
如果Oracle的每個表里都有時間字段,那么可以通過時間字段來過濾增量數據,用上面的Spark程序去定時的跑,如果沒有時間字段的話,可以用ogg的colmap函數增加時間字段,先實時同步到中間的Oracle庫,再根據時間字段來同步。