Spark DataFrame vector 類型存儲到Hive表


1. 軟件版本

軟件 版本
Spark 1.6.0
Hive 1.2.1

2. 場景描述

在使用Spark時,有時需要存儲DataFrame數據到Hive表中,一般的存儲方式如下:

 // 注冊臨時表
 myDf.registerTempTable("t1") // 使用SQLContext從臨時表創建Hive表
 sqlContext.sql("create table h1 as select * from t1")

在DataFrame中存儲一般的數據類型,比如Double、Float、String等到Hive表是沒有問題的,但是在DataFrame中還有一個數據類型:vector , 如果存儲這種類型到Hive表那么會報錯,類似:

org.apache.spark.sql.AnalysisException: cannot resolve 'cast(norF as struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)' 
due to data type mismatch: cannot cast org.apache.spark.mllib.linalg.VectorUDT@f71b0bce to StructType(StructField(type,ByteType,true), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,true),true), StructField(values,ArrayType(DoubleType,true),true));

這個錯誤如果搜索的話,可以找到類似這種結果: Failed to insert VectorUDT to hive table with DataFrameWriter.insertInto(tableName: String)

也即是說暫時使用Spark是不能夠直接存儲vector類型的DataFrame到Hive表的,那么有沒有一種方法可以存儲呢? 
想到這里,那么在Spark中是有一個工具類VectorAssembler 可以達到相反的目的,即把多個列(也需要要求這些列的類型是一致的)合並成一個vector列。但是並沒有相反的工具類,也就是我們的需求。

3. 問題的迂回解決方法

這里提出一個解決方法如下: 
假設: 
1. DataFrame中數據類型是vector的列中的數據類型都是已知的,比如Double,數值類型; 
2. vector列中的具體子列個數也是已知的; 
有了上面兩個假設就可以通過構造RDD[Row]以及schema的方式來生成新的DataFrame,並且這個新的DataFrame的類型是基本類型,如Double。這樣就可以保存到Hive中了。

4. 示例

本例流程如下:

代碼如下:

// 1.讀取數據
val data = sqlContext.sql("select * from normalize")

讀取數據如下: 

// 2.構造vector數據
import org.apache.spark.ml.feature.VectorAssembler val cols = data.schema.fieldNames val newFeature = "fea" val asb = new VectorAssembler().setInputCols(cols).setOutputCol(newFeature) val newDf = asb.transform(data) newDf.show(1)

// 3.做歸一化
import org.apache.spark.ml.feature.Normalizer val norFeature ="norF" val normalizer = new Normalizer().setInputCol(newFeature).setOutputCol(norFeature).setP(1.0) val l1NormData = normalizer.transform(newDf) l1NormData.show(1) // 存儲DataFrame vector類型報錯 // l1NormData.select(norFeature).registerTempTable("t1") // sqlContext.sql("create table h2 as select * from t1")

// 4.扁平轉換vector到row
import org.apache.spark.sql.Row val finalRdd= l1NormData.select(norFeature).rdd.map(row => Row.fromSeq(row.getAs[org.apache.spark.mllib.linalg.DenseVector](0).toArray)) val finalDf = sqlContext.createDataFrame(finalRdd,data.schema) finalDf.show(1)

// 5. 存儲到Hive中
finalDf.registerTempTable("t1") sqlContext.sql("create table h1 as select * from t1")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM