sparkSql將不同數據庫數據寫入hive


 

展示從Oracle與sqlServer數據寫入到Hive中

在idea的resources文件夾放入配置文件:core-site.xml、hdfs-site.xml、hive-site.xml

hive-site.xml內容 
<configuration>
    <property>
        <!--根據ambari中General內容的屬性值進行修改 -->
        <name>hive.metastore.uris</name>
        <value>thrift://dnode5:9083</value>
    </property>
 
    <property>
        <name>hive.execution.engine</name>
        <value>mr</value>
    </property>
</configuration>

代碼

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object WriteToHive {

  /**
    * Entry point: reads one table from Oracle and one from SQL Server over
    * JDBC, joins them with Spark SQL, and writes the result into the Hive
    * table `sjkm.view_record` (overwriting it if it already exists).
    *
    * Requires core-site.xml / hdfs-site.xml / hive-site.xml on the classpath
    * so that `enableHiveSupport()` can reach the Hive metastore.
    */
  def main(args: Array[String]): Unit = {

    // Spark configuration; local[*] uses all local cores — change the
    // master when submitting to a real cluster.
    val sparkConf = new SparkConf().setAppName("Oracle_SqlSever_Hive").setMaster("local[*]")

    // SparkSession with Hive support so saveAsTable targets the metastore
    // configured in hive-site.xml. Side-effecting zero-arg builder calls
    // keep their parentheses per Scala convention.
    val spark = SparkSession.builder.config(sparkConf)
      .enableHiveSupport() // enable Hive support
      .getOrCreate()

    // Fetch the source tables over JDBC as DataFrames.
    val o_table = ReadOracle(spark)
    val sqs_table = ReadSqlserver(spark)

    // Register temporary views so the two sources can be joined in SQL.
    o_table.createOrReplaceTempView("v_account")
    sqs_table.createOrReplaceTempView("v_record_copy")

    // Join card records (SQL Server) with accounts (Oracle) and derive
    // year/month/day columns from the event timestamp.
    // FIX: date_format must reference vr.OccTime, not the select alias
    // `occtime` — lateral column-alias references within the same SELECT
    // are not supported before Spark 3.4 and fail with AnalysisException.
    val sql =
      """select
        |  vr.CardData carddata,
        |  va.SNO pcode,
        |  vr.PName pname,
        |  vr.OccTime occtime,
        |  vr.CodeIndex codeindex,
        |  vr.PortNum portnum,
        |  vr.EquptID equptid,
        |  vr.EquptName equptname,
        |  vr.LctnName lctnname,
        |  date_format(vr.OccTime, 'yyyy') occyear,
        |  date_format(vr.OccTime, 'MM') occmonth,
        |  date_format(vr.OccTime, 'dd') occday
        |from v_record_copy vr
        |left join v_account va on vr.CardData = va.CARDID
        |order by vr.OccTime desc""".stripMargin

    // Run the query and persist the result to Hive. Writing the query
    // result directly avoids the original redundant round-trip through a
    // second temp view ("v_table") and an extra `select * from v_table`.
    spark.sql("use sjkm")
    spark.sql(sql).write.mode("overwrite").saveAsTable("view_record")
    spark.stop()
  }

  /**
    * Reads the Oracle source table over JDBC into a DataFrame.
    * The URL, schema/table and credentials below are placeholders —
    * replace them before running.
    */
  def ReadOracle(sparkSession: SparkSession): DataFrame = {
    sparkSession.read.format("jdbc")
      .option("url", "jdbc:oracle:thin:@ip:5521/服務名")
      .option("dbtable", "庫名.表名")
      .option("user", "xxx")
      .option("password", "xxxx")
      .option("driver", "oracle.jdbc.OracleDriver")
      .load()
  }

  /**
    * Reads the SQL Server source table over JDBC into a DataFrame.
    * The URL, schema/table and credentials below are placeholders —
    * replace them before running.
    */
  def ReadSqlserver(sparkSession: SparkSession): DataFrame = {
    sparkSession.read.format("jdbc")
      .option("url", "jdbc:sqlserver://ip:5521;databaseName=數據庫名")
      .option("dbtable", "模式名.表名")
      .option("user", "xxxx")
      .option("password", "xxxx")
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
      .load()
  }

}
  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM