1. 軟件版本
軟件 | 版本 |
---|---|
Spark | 1.6.0 |
Hive | 1.2.1 |
2. 場景描述
在使用Spark時,有時需要存儲DataFrame數據到Hive表中,一般的存儲方式如下:
// 注冊臨時表
myDf.registerTempTable("t1") // 使用SQLContext從臨時表創建Hive表
sqlContext.sql("create table h1 as select * from t1")
在DataFrame中存儲一般的數據類型,比如Double、Float、String等到Hive表是沒有問題的,但是在DataFrame中還有一個數據類型:vector , 如果存儲這種類型到Hive表那么會報錯,類似:
org.apache.spark.sql.AnalysisException: cannot resolve 'cast(norF as struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)'
due to data type mismatch: cannot cast org.apache.spark.mllib.linalg.VectorUDT@f71b0bce to StructType(StructField(type,ByteType,true), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,true),true), StructField(values,ArrayType(DoubleType,true),true));
這個錯誤如果搜索的話,可以找到類似這種結果: Failed to insert VectorUDT to hive table with DataFrameWriter.insertInto(tableName: String)
也即是說暫時使用Spark是不能夠直接存儲vector類型的DataFrame到Hive表的,那么有沒有一種方法可以存儲呢?
想到這里,那么在Spark中是有一個工具類VectorAssembler 可以達到相反的目的,即把多個列(也需要要求這些列的類型是一致的)合並成一個vector列。但是並沒有相反的工具類,也就是我們的需求。
3. 問題的迂回解決方法
這里提出一個解決方法如下:
假設:
1. DataFrame中數據類型是vector的列中的數據類型都是已知的,比如Double,數值類型;
2. vector列中的具體子列個數也是已知的;
有了上面兩個假設就可以通過構造RDD[Row]以及schema的方式來生成新的DataFrame,並且這個新的DataFrame的類型是基本類型,如Double。這樣就可以保存到Hive中了。
4. 示例
本例流程如下:
代碼如下:
// 1.讀取數據
val data = sqlContext.sql("select * from normalize")
讀取數據如下:
// 2.構造vector數據
import org.apache.spark.ml.feature.VectorAssembler val cols = data.schema.fieldNames val newFeature = "fea" val asb = new VectorAssembler().setInputCols(cols).setOutputCol(newFeature) val newDf = asb.transform(data) newDf.show(1)
// 3.做歸一化
import org.apache.spark.ml.feature.Normalizer val norFeature ="norF" val normalizer = new Normalizer().setInputCol(newFeature).setOutputCol(norFeature).setP(1.0) val l1NormData = normalizer.transform(newDf) l1NormData.show(1) // 存儲DataFrame vector類型報錯 // l1NormData.select(norFeature).registerTempTable("t1") // sqlContext.sql("create table h2 as select * from t1")
// 4.扁平轉換vector到row
import org.apache.spark.sql.Row val finalRdd= l1NormData.select(norFeature).rdd.map(row => Row.fromSeq(row.getAs[org.apache.spark.mllib.linalg.DenseVector](0).toArray)) val finalDf = sqlContext.createDataFrame(finalRdd,data.schema) finalDf.show(1)
// 5. 存儲到Hive中
finalDf.registerTempTable("t1") sqlContext.sql("create table h1 as select * from t1")