Spark-Mllib(一)數據類型


一.本地向量

  有如下幾個類: Vector(基類),DenseVector,SparseVector,Vectors(工廠方法,推薦用)

工廠模式是:定義一個用於創建對象的接口,讓子類決定實例化哪一個類,使一個類的實例化延遲到子類

import org.apache.spark.mllib.linalg.{Vectors,Vector}    # linalg is short for linear algebra 

val v1=Vectors.dense(1.0,2.0,3.0)  #定義1

val v2 =Vectors.sparse(3,(1,2),(10,100)) #長度為3,第1,2個位置的值為10和100

val v3=Vectors.sparse(3,Seq((1,10),(2,100))) #結果同上

 

二.帶有標簽的向量

  主要應用在有監督學習中,二分類(0,1),多分類(0,1,2,3,....)

import org.apache.spark.mllib.Regression.LabeledPoint;

val vl1=LabeledPoint(1,Vectors.dense(1,2,3,4))

val vl2=LabeledPoint(0,Vectors.sparse(3,(1,2),(10,100)))

 

三.讀取LIBSVM格式的數據

<label> <index1>:<value1> <index2>:<value2> ...
其中<label> 是訓練數據集的目標值,對於分類,它是標識某類的整數(支持多個類);對於回歸,是任意實數。<index> 是以1開始的整數,可以是不連續的;<value>;為實數,也就是我們常說的自變量。檢驗數據文件中的label只用於計算准確度或誤差,如果它是未知的,只需用一個數填寫這一欄,也可以空着不填.
例如:

0 1:10 3:19
1 1:18 3:20 4:178

 
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
 
val svmfile=MLUtils.loadLibSVMFile(sc,"svmdata2")
 
四.創建本地矩陣
  本地矩陣是行列號索引,值為double類型的數據,存儲在單獨的機器上.支持稠密矩陣和稀疏矩陣。
與Vector和Vectors的關系類似,Matrix有對應的Matrices
  對於稀疏矩陣的壓縮方法,具體可以參考http://www.tuicool.com/articles/A3emmqi,spark默認的為CSC格式的壓縮
 
import org.apache.spark.mllib.linalg.{Matrix,Matrices}
val m1=Matrices.dense(3,2,Array(1,2,3,4,5,6))
val m2=Matrices.sparse(3,2,Array(0,1,3),Array(0,2,1),Array(9,6,8))  
參考csc壓縮方法,m2 手工算的結果,應該是 
(0,0)9
(2,0)6
(1,1)8
與spark計算的有出入。
 
五.分布式矩陣
  選擇一個正確的形式去存儲大的分布式矩陣非常重要, 將分布式矩陣轉化為不同的格式需要全局的shuffle,代價很大。目前有三種類型的分布式矩陣,RowMatrix,IndexedRowMatrix,CoordinateMatrix.
  什么是shuffle呢?參考http://dongxicheng.org/framework-on-yarn/apache-spark-shuffle-details/
通常shuffle分為兩部分,map階段的數據准備以及Reduce階段的數據拷貝,Map階段需要根據Reduce階段的Task數量決定每個Map Task輸出的數據分片數目
 
RowMatrix是沒有行索引,例如一些特征向量,沒一行是一個本地向量。
IndexedRowMatrix,有行索引,可以用於識別行和執行鏈接操作
CoordinateMatrix存成COO形式
 
構造RowMatrix
import org.apache.spark.mllib.linalg.{Vector,Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data=sc.parallelize(1 to 9,3) #RDD形式
val rows=data.map(x=>Vectors.dense(x))
val m1=new RowMatrix(rows,3,3)
m1.numRows
m1.numCols
 
 構造IndexedRowMatrix
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
val data1=sc.parallelize(1 to 12,2)
val rows1=data1.map(x=>IndexedRow(2,Vectors.dense(x)))
val mat=new IndexedRowMatrix(rows1,3,4) 
mat.numRows()
mat.numCols()
 
構造COO #對於稀疏矩陣比較有用,指定非空元素的行列以及value即可
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} 
val data2=sc.parallelize(1 to 20 ,4)
val rows2=data1.map(x=>MatrixEntry(1,1,3))  
val m2=new CoordinateMatrix(rows2,4,5)
#
val data3=sc.textFile("coo").map(_.split(' ')).map(_.map(_.toDouble)).map(m=>(m(0).toLong,m(1).toLong,m(2))).map(x=>new MatrixEntry(x._1,x._2,x._3))
val m3=new CoordinateMatrix(data3,3,4)
 
#構造BlockMatrix
val m4=m3.toBlockMatrix()
 
 

 

 
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM