展示如何將Oracle與SQL Server中的數據寫入到Hive中
在IDEA項目的resources文件夾中放入配置文件：core-site.xml、hdfs-site.xml、hive-site.xml
hive-site.xml內容 <configuration> <property> <!--根據ambari中General內容的屬性值進行修改 --> <name>hive.metastore.uris</name> <value>thrift://dnode5:9083</value> </property> <property> <name>hive.execution.engine</name> <value>mr</value> </property> </configuration>
代碼
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * Reads one table from Oracle and one from SQL Server over JDBC, left-joins the
 * SQL Server records with the Oracle accounts on the card id, and overwrites the
 * Hive table `view_record` in database `sjkm` with the result.
 *
 * The session is created with Hive support enabled, so the Hive/HDFS client
 * configuration is expected to be discoverable at runtime.
 */
object WriteToHive {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Default to local mode only when no master was supplied externally
    // (e.g. via spark-submit --master); a hard-coded setMaster would override it.
    val sparkConf = new SparkConf()
      .setAppName("Oracle_SqlSever_Hive")
      .setIfMissing("spark.master", "local[*]")

    // Create the SparkSession with Hive support enabled.
    val spark = SparkSession.builder
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    // Always stop the session, even when a read, the join, or the write fails.
    try {
      // Load both source tables over JDBC as DataFrames.
      val o_table = ReadOracle(spark)
      val sqs_table = ReadSqlserver(spark)

      // Register the DataFrames as temporary views so they can be joined in SQL.
      o_table.createOrReplaceTempView("v_account")
      sqs_table.createOrReplaceTempView("v_record_copy")

      // The date_format calls reference vr.OccTime explicitly so the column
      // stays unambiguous even if v_account also exposes an OccTime column.
      val sql = "select " +
        "vr.CardData carddata," +
        "va.SNO pcode," +
        "vr.PName pname," +
        "vr.OccTime occtime," +
        "vr.CodeIndex codeindex," +
        "vr.PortNum portnum," +
        "vr.EquptID equptid," +
        "vr.EquptName equptname," +
        "vr.LctnName lctnname," +
        "date_format(vr.OccTime,'yyyy') occyear," +
        "date_format(vr.OccTime,'MM') occmonth," +
        "date_format(vr.OccTime,'dd') occday from v_record_copy vr " +
        "left join v_account va on vr.CardData = va.CARDID " +
        "order by vr.OccTime desc"

      // Run the join through Spark SQL, then overwrite the target Hive table.
      spark.sql(sql).createOrReplaceTempView("v_table")
      spark.sql("use sjkm")
      spark.sql("select * from v_table").write.mode("overwrite").saveAsTable("view_record")
    } finally {
      spark.stop()
    }
  }

  /**
   * Loads a table from Oracle through the JDBC data source.
   *
   * The URL, table name and credentials below are placeholders that must be
   * replaced with real connection settings before running.
   *
   * @param sparkSession session used to create the JDBC reader
   * @return the Oracle table as a DataFrame
   */
  def ReadOracle(sparkSession: SparkSession): DataFrame = {
    sparkSession.read.format("jdbc")
      .option("url", "jdbc:oracle:thin:@ip:5521/服務名")
      .option("dbtable", "庫名.表名")
      .option("user", "xxx")
      .option("password", "xxxx")
      .option("driver", "oracle.jdbc.OracleDriver")
      .load()
  }

  /**
   * Loads a table from SQL Server through the JDBC data source.
   *
   * The URL, table name and credentials below are placeholders that must be
   * replaced with real connection settings before running.
   *
   * @param sparkSession session used to create the JDBC reader
   * @return the SQL Server table as a DataFrame
   */
  def ReadSqlserver(sparkSession: SparkSession): DataFrame = {
    sparkSession.read.format("jdbc")
      .option("url", "jdbc:sqlserver://ip:5521;databaseName=數據庫名")
      .option("dbtable", "模式名.表名")
      .option("user", "xxxx")
      .option("password", "xxxx")
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
      .load()
  }
}
