由於我Spark采用的是Cloudera公司的CDH,並且安裝的時候是在線自動安裝和部署的集群。最近在學習SparkSQL,看到SparkSQL on HIVE。下面主要是介紹一下如何通過SparkSQL在讀取HIVE的數據。
(說明:如果不是采用CDH在線自動安裝和部署的話,可能需要對源碼進行編譯,使它能夠兼容HIVE。
編譯的方式也很簡單,只需要在Spark_SRC_home(源碼的home目錄下)執行如下命令:
./
make
-distribution.sh --tgz -Phadoop-2.2 -Pyarn -DskipTests -Dhadoop.version=2.6.0-cdh5.4.4 -Phive
編譯好了之后,會在lib目錄下多幾個jar包。)
下面我主要介紹一下我使用的情況:
1、為了讓Spark能夠連接到Hive的原有數據倉庫,我們需要將Hive中的hive-site.xml文件拷貝到Spark的conf目錄下,這樣就可以通過這個配置文件找到Hive的元數據以及數據存放。
在這里由於我的Spark是自動安裝和部署的,因此需要知道CDH將hive-site.xml放在哪里。經過摸索。該文件默認所在的路徑是:/etc/hive/conf 下。
同理,spark的conf也是在/etc/spark/conf。
此時,如上所述,將對應的hive-site.xml拷貝到spark/conf目錄下即可
如果Hive的元數據存放在Mysql中,我們還需要准備好Mysql相關驅動,比如:mysql-connector-java-5.1.22-bin.jar。
2、編寫測試代碼
val conf=new SparkConf().setAppName("Spark-Hive").setMaster("local") val sc=new SparkContext(conf) //create hivecontext val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ") //這里需要注意數據的間隔符 sqlContext.sql("LOAD DATA INPATH '/user/liujiyu/spark/kv1.txt' INTO TABLE src "); sqlContext.sql(" SELECT * FROM jn1").collect().foreach(println) sc.stop()
3、下面列舉一下出現的問題:
(1)如果沒有將hive-site.xml拷貝到spark/conf目錄下,會出現:
分析:從錯誤提示上面就知道,spark無法知道hive的元數據的位置,所以就無法實例化對應的client。
解決的辦法就是必須將hive-site.xml拷貝到spark/conf目錄下
(2)測試代碼中沒有加sc.stop會出現如下錯誤:
ERROR scheduler.LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
在代碼最后一行添加sc.stop()解決了該問題。