Spark讀取Hive數據的方式主要有兩種:
1、通過訪問hive metastore的方式,這種方式通過訪問hive的metastore元數據的方式獲取表結構信息和該表數據所存放的HDFS路徑,這種方式的特點是效率高、數據吞吐量大、使用spark操作起來更加友好。
2、通過spark jdbc的方式訪問,就是通過鏈接hiveserver2的方式獲取數據,這種方式底層上跟spark鏈接其他rdbms上一樣,可以采用sql的方式先在其數據庫中查詢出來結果再獲取其結果數據,這樣大部分數據計算的壓力就放在了數據庫上。
兩種方式的具體實現示例
首先創建Spark Session對象:
val spark = SparkSession.builder() .appName("test") .enableHiveSupport() .getOrCreate()
方式一(推薦)直接采用Spark on Hive的方式讀取數據,這樣SparkSession在使用sql的時候會去找集群hive中的庫表,加載其hdfs數據與其元數據組成DataFrame
val df = spark.sql("select * from test.user_info")
方式二采用spark jdbc的方式,如果有特別的使用場景的話也可以通過這種方法來實現。
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects} object test{ def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local[2]") .appName("test") .getOrCreate() register() //如果不手動注冊,只能獲取到數據庫中的表結構,而不能獲取到數據 val df = spark.read .format("jdbc") .option("driver","org.apache.hive.jdbc.HiveDriver") .option("url","jdbc:hive2://xxx:10000/") .option("user","hive") .option("password",xxx) .option("fetchsize", "2000") .option("dbtable","test.user_info") .load() df.show(10) } def register(): Unit = { JdbcDialects.registerDialect(HiveSqlDialect) } case object HiveSqlDialect extends JdbcDialect { override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2") override def quoteIdentifier(colName: String): String = { colName.split('.').map(part => s"`$part`").mkString(".") } } }