配置
1、將Hive-site.xml復制到Spark/conf目錄下
如果hive-site中配置了查詢引擎,需要將其注掉
<!-- <property> <name>hive.execution.engine</name> <value>tez</value> </property> -->
2、將把 Mysql 的驅動 mysql-connector-java-5.1.27-bin.jar copy 到 Spark/jars/目錄下
3、保險起見,可將core-site.xml和hdfs-site.xml 拷貝到Spark/conf/目錄下
4、如果hive中表是采用Lzo或snappy等壓縮格式,需要配置spark-defaults.conf,詳情參考https://www.cnblogs.com/yangxusun9/p/12827957.html#fneQWfJQ,或者直接將lzo包拷貝到jars目錄下
花式連接
利用spark-sql 來代替 hive
最普遍的應用就是在腳本中, 用 ''spark-sql --master yarn '' 來代替 " hive",來提高運行速度
開啟thriftserver服務,利用beeline連接
開啟thriftserver服務
sbin/start-thriftserver.sh \ --master yarn \ --hiveconf hive.server2.thrift.bind.host=hadoop102 \ ##默認 -–hiveconf hive.server2.thrift.port=10000 \ ##默認
使用beeline
bin/beeline # 然后輸入 !connect jdbc:hive2://hadoop102:10000 # 然后按照提示輸入用戶名和密碼
利用第三方工具(如IDEA)連接
添加依賴
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.1.1</version> </dependency>
配置文件
保險起見,把core-site.xml,hdfs-site.xml,hive-site.xml都拷貝到Rsource目錄下
代碼
注意,此時又可能會報權限不足的錯 org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied:
需要在代碼首行添加用戶名
object JDBCDemo { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "guigu")//!!!解決權限報錯 val conf = new SparkConf().setAppName("JDBCDemo$").setMaster("local[*]") val spark = SparkSession.builder().config(conf) .enableHiveSupport()//!!!!默認不支持外部hive,這里需調用方法支持外部hive .getOrCreate() import spark.implicits._ spark.sql("use gmall") spark.sql("show tables").show() } }
對hive中的表進行讀寫操作
讀按照上面代碼就可以進行正常操作,但是寫的時候需要注意,創建數據庫時數據庫位置需要提前聲明,不然就是在本地創建,創建表則不用聲明
如果之前導入了core-site,需要將其中的lzo配置注掉,並且在指定文件位置時會默認集群位置,如果要用本地,就用 file://
object ConnectDemo { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "guigu") val conf = new SparkConf().setAppName("ConnectDemo$").setMaster("local[*]") val spark = SparkSession .builder().config(conf) .enableHiveSupport() //需指明數據庫地址,不然就是在本地創建 .config("spark.sql.warehouse.dir", "hdfs://hadoop102:9000/user/hive/warehouse") .getOrCreate() import spark.implicits._ spark.sql("use spark1") // val df = spark.read.json("User") //第一列為 name:string 第二列為salary:long // spark.sql("drop table user") // df.printSchema() // df.write.mode(SaveMode.Append).saveAsTable("user") //saveAsTable寫入,要求字段名字和數據類型與原表保持一致,字段順序可隨意 // val df1 = List((1000l, "ace"), (1011l, "jaek")).toDF("salary", "name") //這里數據類型必須跟原表保持一致 // df1.write.mode(SaveMode.Append).saveAsTable("user") //insertInto //要求字段數據類型和順序必須保持一致,名字可隨意,否則 // +------+------+ // | name|salary| // +------+------+ // | Andy| 4500| // |Justin| 3500| // | Berta| 4000| // | jaek| 1011| // | ace| 1000| // | 1000| null| // | 1011| null| // +------+------+ // val df1 = List((1000l, "ace"), (1011l, "jaek")).toDF("s", "n") // df1.write.insertInto("user") // 使用 hive的insert 語句 spark.sql("insert into table user values ('andy',5000)") spark.sql("select * from user").show() } }