使用sparksql訪問幾個hive表join的情況時結果為空,且這個sql在hive里執行是成功的。
val sparkSession = SparkSession .builder() .config("jars","lib/*") .appName("Spark Hive Example") .enableHiveSupport() .getOrCreate() sparkSession.sql("select t1.c2,count(*) from t1 join t2 on (t1.c1=t2.c1) group by t1.c2").collect().map(r => mergeToOracle(r))
查看了t1,t2表的結構
- t1是json格式,MR任務生成
- t2是parquet格式,sqoop導出
單獨查詢兩個表的結果
sparkSession.sql("select * from t1 limit 10").collect().map(r => println(r)) //正常顯示
sparkSession.sql("select * from t2 limit 10").collect().map(r => println(r)) //有結果,全部為null
因此可以判斷是讀parquet的結果出錯,因此導致兩個表join也沒有結果。如果直接按文件讀取parquet文件,使用臨時表查詢呢,結果正常顯示,且與其他表join也是正常。
sparkSession.read.parquet("/path of the hive table/").createOrReplaceTempView("temp_a") val rs = sparkSession.sql("select * from temp_a") rs.printSchema() rs.show()
線上環境剛好還有另外一套sparksql運行的beta環境,將sql拿去執行是沒有問題的,因此比較了當前執行環境和該beta環境的配制,發現其中有一個區別是spark.sql.hive.convertMetastoreParquet配置為false。
sparkSession.sqlContext.setConf("spark.sql.hive.convertMetastoreParquet","false")
加上這句之后數據正常了,這個配制有什么用,看字面也能理解了。
spark.sql.hive.convertMetastoreParquet default is true.When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built in support.
當向Hive metastore中讀寫Parquet表時,Spark SQL將使用Spark SQL自帶的Parquet SerDe(SerDe:Serialize/Deserilize的簡稱,目的是用於序列化和反序列化),而不是用Hive的SerDe,Spark SQL自帶的SerDe擁有更好的性能。這個優化的配置參數為spark.sql.hive.convertMetastoreParquet,默認值為開啟。