Spark讀取Hive數據的方式


Spark讀取Hive數據的方式主要有兩種:

1、通過訪問hive metastore的方式,這種方式通過訪問hive的metastore元數據的方式獲取表結構信息和該表數據所存放的HDFS路徑,這種方式的特點是效率高、數據吞吐量大、使用spark操作起來更加友好。

2、通過spark jdbc的方式訪問,就是通過鏈接hiveserver2的方式獲取數據,這種方式底層上跟spark鏈接其他rdbms上一樣,可以采用sql的方式先在其數據庫中查詢出來結果再獲取其結果數據,這樣大部分數據計算的壓力就放在了數據庫上。

兩種方式的具體實現示例

首先創建Spark Session對象:

    val spark = SparkSession.builder()
      .appName("test")
      .enableHiveSupport()
      .getOrCreate()

方式一(推薦)直接采用Spark on Hive的方式讀取數據,這樣SparkSession在使用sql的時候會去找集群hive中的庫表,加載其hdfs數據與其元數據組成DataFrame

val df = spark.sql("select * from test.user_info")

 

方式二采用spark jdbc的方式,如果有特別的使用場景的話也可以通過這種方法來實現。

 
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
 
 
object test{
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("test")
      .getOrCreate()
 
    register() //如果不手動注冊,只能獲取到數據庫中的表結構,而不能獲取到數據
    val df = spark.read
      .format("jdbc")
      .option("driver","org.apache.hive.jdbc.HiveDriver")
      .option("url","jdbc:hive2://xxx:10000/")
      .option("user","hive")
      .option("password",xxx)
      .option("fetchsize", "2000")
      .option("dbtable","test.user_info")
      .load()
    df.show(10)
  }
 
  def register(): Unit = {
    JdbcDialects.registerDialect(HiveSqlDialect)
  }
 
 
  case object HiveSqlDialect extends JdbcDialect {
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")
 
    override def quoteIdentifier(colName: String): String = {
      colName.split('.').map(part => s"`$part`").mkString(".")
    }
  }
 
}

 

 

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM