Spark读取Hive数据的方式主要有两种:
1、通过访问hive metastore的方式,这种方式通过访问hive的metastore元数据的方式获取表结构信息和该表数据所存放的HDFS路径,这种方式的特点是效率高、数据吞吐量大、使用spark操作起来更加友好。
2、通过spark jdbc的方式访问,就是通过链接hiveserver2的方式获取数据,这种方式底层上跟spark链接其他rdbms上一样,可以采用sql的方式先在其数据库中查询出来结果再获取其结果数据,这样大部分数据计算的压力就放在了数据库上。
两种方式的具体实现示例
首先创建Spark Session对象:
val spark = SparkSession.builder() .appName("test") .enableHiveSupport() .getOrCreate()
方式一(推荐)直接采用Spark on Hive的方式读取数据,这样SparkSession在使用sql的时候会去找集群hive中的库表,加载其hdfs数据与其元数据组成DataFrame
val df = spark.sql("select * from test.user_info")
方式二采用spark jdbc的方式,如果有特别的使用场景的话也可以通过这种方法来实现。
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects} object test{ def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local[2]") .appName("test") .getOrCreate() register() //如果不手动注册,只能获取到数据库中的表结构,而不能获取到数据 val df = spark.read .format("jdbc") .option("driver","org.apache.hive.jdbc.HiveDriver") .option("url","jdbc:hive2://xxx:10000/") .option("user","hive") .option("password",xxx) .option("fetchsize", "2000") .option("dbtable","test.user_info") .load() df.show(10) } def register(): Unit = { JdbcDialects.registerDialect(HiveSqlDialect) } case object HiveSqlDialect extends JdbcDialect { override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2") override def quoteIdentifier(colName: String): String = { colName.split('.').map(part => s"`$part`").mkString(".") } } }