Spark MLlib: the GMM Algorithm


  The Gaussian Mixture Model (GMM) is a popular clustering algorithm. It is similar to K-Means, but where K-Means assigns each data point to exactly one cluster, GMM computes the probability of each point belonging to each cluster. Compared with GMM, K-Means has some drawbacks; for example, its clustering result is easily skewed by extreme points in the sample. Moreover, because GMM returns probabilities, its output carries more information than a single hard label: for events occurring with probability 49% versus 51%, simply splitting them into two classes with a bare 50% threshold is dangerous.
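This soft assignment can be illustrated with a minimal sketch (plain Scala, made-up one-dimensional parameters, not part of the original program): by Bayes' theorem, the responsibility of each component for a point is its weighted density normalized over all components.

```scala
object SoftAssignment {
  // density of the 1-D Gaussian N(mu, sigma^2) at x
  def gaussianPdf(x: Double, mu: Double, sigma: Double): Double =
    math.exp(-0.5 * math.pow((x - mu) / sigma, 2)) / (sigma * math.sqrt(2 * math.Pi))

  // posterior responsibilities of two equally weighted unit-variance components for x
  def responsibilities(x: Double, mu1: Double, mu2: Double): (Double, Double) = {
    val p1 = 0.5 * gaussianPdf(x, mu1, 1.0)
    val p2 = 0.5 * gaussianPdf(x, mu2, 1.0)
    (p1 / (p1 + p2), p2 / (p1 + p2))
  }

  def main(args: Array[String]): Unit = {
    // a point exactly halfway between the two component means
    val (r1, r2) = responsibilities(1.0, 0.0, 2.0)
    println(f"r1 = $r1%.2f, r2 = $r2%.2f") // prints r1 = 0.50, r2 = 0.50
  }
}
```

For a point exactly halfway between two equal components the responsibilities are 0.50/0.50, which is precisely the case where a hard 50% threshold is arbitrary.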
  The Gaussian Mixture Model, as its name suggests, assumes that the data follow a Gaussian mixture distribution, i.e. that they are generated from several Gaussian distributions. A GMM consists of K Gaussian distributions; each is called a "Component", and the linear combination of these components forms the GMM's probability density function:

  p(x) = Σ_{k=1}^{K} π_k · N(x | μ_k, Σ_k),  where π_k ≥ 0 and Σ_{k=1}^{K} π_k = 1

  To cluster with a GMM, we first sample the data with a tool such as R and plot the distribution of the points to check whether it looks like a Gaussian mixture, or we simply assume that the data follow one. We then infer the GMM's probability distribution from the data; each Gaussian component corresponds to one cluster. Because the form of the density is known (assumed) and we need to solve for its parameters, this is a parameter-estimation problem: we must estimate the parameters of each mixture component (mean vector mu, covariance matrix sigma, and weight). The GMM is trained with maximum likelihood estimation, maximizing the following log-likelihood:

  LL(D) = Σ_{j=1}^{m} ln ( Σ_{k=1}^{K} π_k · N(x_j | μ_k, Σ_k) )

  This expression has no closed-form analytical solution, so it is solved with the Expectation-Maximization (EM) method, as follows:
  1. Given K, initialize K multivariate Gaussian distributions and their weights;
  2. Using Bayes' theorem, estimate the posterior probability that each sample was generated by each component (the E step of EM);
  3. Using the definitions of the mean and covariance together with the posteriors from step 2, update the mean vectors, covariance matrices, and weights (the M step of EM). Repeat steps 2 and 3 until the increase in the likelihood falls below the convergence threshold or the maximum number of iterations is reached.
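The steps above can be sketched in plain Scala for a 1-D, two-component mixture (a simplified illustration on synthetic data; the MLlib implementation used below handles the multivariate case):

```scala
import scala.util.Random

object Em1dGmm {
  // density of the 1-D Gaussian with mean mu and variance v at x
  def pdf(x: Double, mu: Double, v: Double): Double =
    math.exp(-0.5 * (x - mu) * (x - mu) / v) / math.sqrt(2 * math.Pi * v)

  // EM for a 1-D, two-component mixture; returns (weights, means, variances)
  def fit(data: Array[Double], iters: Int): (Array[Double], Array[Double], Array[Double]) = {
    // step 1: initialize weights, means and variances
    var w = Array(0.5, 0.5)
    var mu = Array(data.min, data.max)
    var v = Array(1.0, 1.0)
    for (_ <- 1 to iters) {
      // E step: posterior responsibility of each component for each point
      val resp = data.map { x =>
        val p = Array(w(0) * pdf(x, mu(0), v(0)), w(1) * pdf(x, mu(1), v(1)))
        Array(p(0) / p.sum, p(1) / p.sum)
      }
      // M step: update weights, means and variances from the responsibilities
      val n = Array(resp.map(_(0)).sum, resp.map(_(1)).sum)
      w = n.map(_ / data.length)
      mu = Array(0, 1).map(k => data.indices.map(i => resp(i)(k) * data(i)).sum / n(k))
      v = Array(0, 1).map(k => data.indices.map(i => resp(i)(k) * (data(i) - mu(k)) * (data(i) - mu(k))).sum / n(k))
    }
    (w, mu, v)
  }

  def main(args: Array[String]): Unit = {
    val rng = new Random(42)
    // synthetic sample: half from N(0, 1), half from N(5, 1)
    val data = Array.fill(500)(rng.nextGaussian()) ++ Array.fill(500)(rng.nextGaussian() + 5.0)
    val (w, mu, _) = fit(data, 50)
    println(f"weights = ${w(0)}%.2f / ${w(1)}%.2f, means = ${mu(0)}%.2f / ${mu(1)}%.2f")
  }
}
```

On this well-separated sample the fitted weights converge near 0.5/0.5 and the means near the true 0 and 5; a fixed iteration count stands in for the convergence-threshold test described above.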


  Next we train and analyze the model. We use the GMM implementation packaged in the mllib module; the code is as follows:

package com.xj.da.gmm

import breeze.linalg.DenseVector
import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * author : kongcong
  * number : 27
  * date : 2017/7/19
  */
object GMMWithMultivariate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      //.setMaster("local")
      .setAppName("GMMWithMultivariate")
    val sc = new SparkContext(conf)

    val rawData: RDD[String] = sc.textFile("hdfs://master:8020/home/kongc/data/query_result.csv")
    //val rawData: RDD[String] = sc.textFile("data/query_result.csv")
    println("count:  " + rawData.count())
    //println(rawData.count())
    // columns: col1, col2, ..., status (status is at index 4)
    val data: RDD[linalg.Vector] = rawData.map { line =>
      val raw: Array[String] = line.split(",")
      Vectors.dense(raw(0).toDouble, raw(1).toDouble, raw(4).toDouble)
    }
    // data.collect().take(10).foreach(println(_))
    // training features: col1, col2
    val trainData: RDD[linalg.Vector] = rawData.map { line =>
      val raw: Array[String] = line.split(",")
      Vectors.dense(raw(0).toDouble, raw(1).toDouble)
    }
    // trainData.collect().take(10).foreach(println(_))

    // Specify the initial model
    // samples with status == 0
    val filter0: RDD[linalg.Vector] = data.filter(_.toArray(2) == 0)
    println(filter0.count())  // 23195
    // samples with status == 1
    val filter1: RDD[linalg.Vector] = data.filter(_.toArray(2) == 1)
    println(filter1.count())  // 14602

    // initial component weights; they must sum to 1, so normalize by the
    // number of labeled samples rather than the full row count
    val labeled: Double = (filter0.count() + filter1.count()).toDouble
    val w1: Double = filter0.count() / labeled
    val w2: Double = filter1.count() / labeled
    println(s"w1 = $w1")

    // per-component means
    val m0x: Double = filter0.map(_.toArray(0)).mean()
    val m0y: Double = filter0.map(_.toArray(1)).mean()
    val m1x: Double = filter1.map(_.toArray(0)).mean()
    val m1y: Double = filter1.map(_.toArray(1)).mean()
    // per-component variances
    val vx0: Double = filter0.map(_.toArray(0)).variance()
    val vy0: Double = filter0.map(_.toArray(1)).variance()
    val vx1: Double = filter1.map(_.toArray(0)).variance()
    val vy1: Double = filter1.map(_.toArray(1)).variance()

    // mean vectors
    val mu1: linalg.Vector = Vectors.dense(Array(m0x, m0y))
    val mu2: linalg.Vector = Vectors.dense(Array(m1x, m1y))
    println(s"mu1 : $mu1")
    println(s"mu2 : $mu2")

    val array: RDD[Array[Double]] = rawData.map { line =>
      val raw: Array[String] = line.split(",")
      Array(raw(0).toDouble, raw(1).toDouble, raw(4).toDouble)
    }

    val f0: RDD[Array[Double]] = array.filter(_(2) == 0)
    val f1: RDD[Array[Double]] = array.filter(_(2) == 1)
    println("f0.count:"+f0.count())
    println("f1.count:"+f1.count())

    // x and y series for status 0, used for the covariance entries
    val x0: RDD[Double] = f0.map(_(0))
    val y0: RDD[Double] = f0.map(_(1))
    // x and y series for status 1
    val x1: RDD[Double] = f1.map(_(0))
    val y1: RDD[Double] = f1.map(_(1))

    // off-diagonal covariance: cov(x, y) = E[xy] - E[x]E[y]; the dot product
    // is a sum over samples, so divide by the sample count before subtracting
    val c0yx: Double = (DenseVector(x0.collect()) dot DenseVector(y0.collect())) / f0.count() - m0x * m0y
    val c1yx: Double = (DenseVector(x1.collect()) dot DenseVector(y1.collect())) / f1.count() - m1x * m1y

    // 2x2 covariance matrices (column-major order; symmetric here)
    val sigma1 = Matrices.dense(2, 2, Array(vx0, c0yx, c0yx, vy0))
    val sigma2 = Matrices.dense(2, 2, Array(vx1, c1yx, c1yx, vy1))
    val gmm1 = new MultivariateGaussian(mu1, sigma1)
    val gmm2 = new MultivariateGaussian(mu2, sigma2)

    val gaussians = Array(gmm1, gmm2)

    // a GaussianMixtureModel takes two arguments: the weight array and the component Gaussian distributions
    val initModel = new GaussianMixtureModel(Array(w1, w2), gaussians)

    for (i <- 0 until initModel.k) {
      println("weight=%f\nmu=%s\nsigma=\n%s\n" format
        (initModel.weights(i), initModel.gaussians(i).mu, initModel.gaussians(i).sigma))
    }

    val gaussianMixture = new GaussianMixture()
    val mixtureModel = gaussianMixture
      .setInitialModel(initModel)
      .setK(2)
      .setConvergenceTol(0.0001)
      .run(trainData)

    val predict: RDD[Int] = mixtureModel.predict(trainData)
    rawData.zip(predict).saveAsTextFile("hdfs://master:8020/home/kongc/data/out/gmm/predict2")

    for (i <- 0 until mixtureModel.k) {
      println("weight=%f\nmu=%s\nsigma=\n%s\n" format
        (mixtureModel.weights(i), mixtureModel.gaussians(i).mu, mixtureModel.gaussians(i).sigma))
    }

  }
}
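The off-diagonal entries c0yx and c1yx above rely on the identity cov(x, y) = E[xy] − E[x]·E[y] (the dot product is a sum, so it must be divided by the sample count). A standalone check of that identity in plain Scala, with small made-up data:

```scala
object CovCheck {
  // population covariance via the identity E[xy] - E[x]E[y]
  def covIdentity(x: Array[Double], y: Array[Double]): Double = {
    val n = x.length
    x.zip(y).map { case (a, b) => a * b }.sum / n - (x.sum / n) * (y.sum / n)
  }

  // population covariance via the definition E[(x - E[x])(y - E[y])]
  def covDefinition(x: Array[Double], y: Array[Double]): Double = {
    val n = x.length
    val (mx, my) = (x.sum / n, y.sum / n)
    x.zip(y).map { case (a, b) => (a - mx) * (b - my) }.sum / n
  }

  def main(args: Array[String]): Unit = {
    val x = Array(1.0, 2.0, 3.0, 4.0)
    val y = Array(2.0, 4.0, 6.0, 8.0)
    println(s"identity = ${covIdentity(x, y)}, definition = ${covDefinition(x, y)}") // both 2.5
  }
}
```

Both routes give the same population covariance, matching the construction of sigma1 and sigma2 in the listing.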

  References: http://blog.pluskid.org/?p=39

    http://dblab.xmu.edu.cn/blog/1456/

 

