一.本地向量
有如下幾個類: Vector(基類),DenseVector,SparseVector,Vectors(工廠方法,推薦用)
工廠模式是:定義一個用於創建對象的接口,讓子類決定實例化哪一個類,使一個類的實例化延遲到子類
import org.apache.spark.mllib.linalg.{Vectors,Vector} # linalg is short for linear algebra
val v1=Vectors.dense(1.0,2.0,3.0) #定義1
val v2 =Vectors.sparse(3,(1,2),(10,100)) #長度為3,第1,2個位置的值為10和100
val v3=Vectors.sparse(3,Seq((1,10),(2,100))) #結果同上
二.帶有標簽的向量
主要應用在有監督學習中,二分類(0,1),多分類(0,1,2,3,....)
import org.apache.spark.mllib.Regression.LabeledPoint;
val vl1=LabeledPoint(1,Vectors.dense(1,2,3,4))
val vl2=LabeledPoint(0,Vectors.sparse(3,(1,2),(10,100)))
三.讀取LIBSVM格式的數據
<label> <index1>:<value1> <index2>:<value2> ...
其中<label> 是訓練數據集的目標值,對於分類,它是標識某類的整數(支持多個類);對於回歸,是任意實數。<index> 是以1開始的整數,可以是不連續的;<value>;為實數,也就是我們常說的自變量。檢驗數據文件中的label只用於計算准確度或誤差,如果它是未知的,只需用一個數填寫這一欄,也可以空着不填.
例如:
0 1:10 3:19
1 1:18 3:20 4:178
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
val svmfile=MLUtils.loadLibSVMFile(sc,"svmdata2")
四.創建本地矩陣
本地矩陣是行列號索引,值為double類型的數據,存儲在單獨的機器上.支持稠密矩陣和稀疏矩陣。
與Vector和Vectors的關系類似,Matrix有對應的Matrices
對於稀疏矩陣的壓縮方法,具體可以參考http://www.tuicool.com/articles/A3emmqi,spark默認的為CSC格式的壓縮
import org.apache.spark.mllib.linalg.{Matrix,Matrices}
val m1=Matrices.dense(3,2,Array(1,2,3,4,5,6))
val m2=Matrices.sparse(3,2,Array(0,1,3),Array(0,2,1),Array(9,6,8))
參考csc壓縮方法,m2 手工算的結果,應該是
(0,0)9
(2,0)6
(1,1)8
與spark計算的有出入。
五.分布式矩陣
選擇一個正確的形式去存儲大的分布式矩陣非常重要, 將分布式矩陣轉化為不同的格式需要全局的shuffle,代價很大。目前有三種類型的分布式矩陣,RowMatrix,IndexedRowMatrix,CoordinateMatrix.
什么是shuffle呢?參考http://dongxicheng.org/framework-on-yarn/apache-spark-shuffle-details/
通常shuffle分為兩部分,map階段的數據准備以及Reduce階段的數據拷貝,Map階段需要根據Reduce階段的Task數量決定每個Map Task輸出的數據分片數目
RowMatrix是沒有行索引,例如一些特征向量,沒一行是一個本地向量。
IndexedRowMatrix,有行索引,可以用於識別行和執行鏈接操作
CoordinateMatrix存成COO形式
構造RowMatrix
import org.apache.spark.mllib.linalg.{Vector,Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data=sc.parallelize(1 to 9,3) #RDD形式
val rows=data.map(x=>Vectors.dense(x))
val m1=new RowMatrix(rows,3,3)
m1.numRows
m1.numCols
構造IndexedRowMatrix
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
val data1=sc.parallelize(1 to 12,2)
val rows1=data1.map(x=>IndexedRow(2,Vectors.dense(x)))
val mat=new IndexedRowMatrix(rows1,3,4)
mat.numRows()
mat.numCols()
構造COO #對於稀疏矩陣比較有用,指定非空元素的行列以及value即可
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
val data2=sc.parallelize(1 to 20 ,4)
val rows2=data1.map(x=>MatrixEntry(1,1,3))
val m2=new CoordinateMatrix(rows2,4,5)
#
val data3=sc.textFile("coo").map(_.split(' ')).map(_.map(_.toDouble)).map(m=>(m(0).toLong,m(1).toLong,m(2))).map(x=>new MatrixEntry(x._1,x._2,x._3))
val m3=new CoordinateMatrix(data3,3,4)
#構造BlockMatrix
val m4=m3.toBlockMatrix()