1、卡方檢驗理論
1.1、 簡介
總體的分布函數完全未知或只知形式、但不知其參數的情況,為了推斷總體的某些未知特性,提出某些關於總體的假設。我們要根據樣本對所提出的假設作出是接受,還是拒絕的決策。假設檢驗是作出這一決策的過程。卡方檢驗即是假設檢驗的一種。
1.2、卡方檢驗基本思想
首先假設H0成立,基於此前提計算出χ2值,它表示觀察值與理論值之間的偏離程度。根據χ2分布及自由度可以確定在H0假設成立的情況下獲得當前統計量及更極端情況的概率P。如果P值很小,說明觀察值與理論值偏離程度太大,應當拒絕無效假設,表示比較資料之間有顯著差異;否則就不能拒絕無效假設,尚不能認為樣本所代表的實際情況和理論假設有差別
1.3、數學公式
Pearson χ2 計算公式:

其中:k為分類數,i水平的觀察頻數,n為總頻數,i水平的期望概率
由卡方的計算公式可知,當觀察頻數與期望頻數完全一致時,χ2值為0;觀察頻數與期望頻數越接近,兩者之間的差異越小,χ2值越小;反之,觀察頻數與期望頻數差別越大,兩者之間的差異越大,χ2值越大。換言之,大的χ2值表明觀察頻數遠離期望頻數,即表明遠離假設。小的χ2值表明觀察頻數接近期望頻數,接近假設。因此,χ2是觀察頻數與期望頻數之間距離的一種度量指標,也是假設成立與否的度量指標。如果χ2值“小”,研究者就傾向於不拒絕H0;如果χ2值大,就傾向於拒絕H0。至於χ2在每個具體研究中究竟要大到什么程度才能拒絕H0,則要借助於卡方分布求出所對應的P值(p-value)來確定。
1.4、卡方檢驗作用
卡方檢驗主要有以下兩種作用:
1) 皮爾森獨立性檢驗(Pearson's independence test)
驗證從兩個變量抽出的配對觀察值組是否互相獨立。例如:例如:每次都從A國和B國各抽一個人,看他們的反應是否與國籍無關。
2)適度檢驗(Goodness of Fit test)
實際執行多項式試驗而得到的觀察次數,與虛無假設的期望次數相比較,稱為卡方適度檢驗,即在於檢驗二者接近的程度,利用樣本數據以檢驗總體分布是否為某一特定分布的統計方法。
2、MLlib與卡方檢驗
2.1、MLlib庫簡介
MLlib 是Spark對常用的機器學習算法的實現庫,同時包括相關的測試和數據生成器。
2.2、MLlib卡方檢驗
卡方檢驗是在spark mllib 1.1版本新增加的功能,其源碼文件在stat包中。
2.2.1、實現
MLlib中卡方檢驗依照皮爾森卡方(pearson)檢驗實現,可以用其進行獨立性檢驗及適度檢驗。其對外提供接口有如下三種:
(1) def chiSquaredMatrix(counts:Matrix,methodName:String=PEARSON.name)
:ChiSqTestResult
(2) def chiSquaredFeatures(data: RDD[LabeledPoint],methodName: String = PEARSON.name)
:Array[ChiSqTestResult]
(3) def chiSquared(observed: Vector, expected: Vector = Vectors.dense(Array[Double]()),
methodName: String = PEARSON.name):ChiSqTestResult
其中(1)、(2)用於獨立性檢驗,(3)用於適度檢驗。
方法(1) 中參數是統計后的對象的不同特征在特定取值區間出現的頻數。因此可直接對傳入進行卡方數學計算。由於統計后數據較小,程序采用串行方式處理。
方法(2) 中參數為RDD[LabeledPoint]類型的數據。LabeledPoint格式的數據多用於機器學習方面,如回歸分析等,其數據格式為label:features。此種類型的數據多為收集的原始數據或經過維歸約、聚集等手段處理過的數據,需要進行頻數統計才可進行卡方數學計算。通過分析源碼本方法先將RDD[LabeledPoint]類型的數據轉換成矩陣,然后調用方法(1)。
方法(3) 與方法(1)相似,直接利用參數提供數據,根據卡方定義進行數學計算。
2.2.2、具體流程
通過2.2.1分析可知,上述方法(2)過程最為復雜,其包含數據預處理階段(頻數統計並轉換成矩陣)及處理階段(卡方數學計算)兩個過程,下面以方法(2)程序邏輯為基礎就這兩個過程分別進行討論。
2.2.2.1、預處理階段
預處理階段要做的主要任務為:將RDD[LabeledPoint]類型的數據轉換為三元組,然后分別對轉換后的三元組進行頻數統計,最后根據統計信息分別將每個特征所對應三元組轉換成矩陣並進行卡方計算。
下面通過舉例,詳細說明預處理過程,假如采集到如下數據:
| 姓名 |
性別 |
年齡 |
文化程度 |
結婚次數 |
…… |
| 張三 |
男 |
22 |
大學 |
1 |
|
| 李四 |
女 |
20 |
未讀大學 |
1 |
|
| 王五 |
男 |
29 |
未讀大學 |
3 |
|
| 趙六 |
男 |
27 |
大學 |
2 |
|
| 孫七 |
女 |
21 |
未讀大學 |
2 |
|
| 錢八 |
男 |
23 |
大學 |
1 |
|
| 蔣九 |
女 |
37 |
未讀大學 |
3 |
|
| …… |
…… |
…… |
…… |
…… |
|
|
|
|
|
|
|
|
現在要用這批數據檢驗結婚次數是否與文化程度有關, 通過降維、聚集等手段進行轉換,結果如下圖所示:
| 文化程度 |
結婚次數 |
| 大學 |
1次 |
| 未讀大學 |
1次 |
| 未讀大學 |
超過1次 |
| 大學 |
超過1次 |
| 未讀大學 |
超過1次 |
| 大學 |
1次 |
| 未讀大學 |
超過1次 |
然后用上述處理過的數據創建RDD[LabeledPoint],其中label為“大學|未讀大學”,features為“{1次,超過1次}”,然后交由2.2.1中所述MLlib卡方提供的接口(2)處理。處理過程如下:
1> 將RDD[LabeledPoint] 映射成RDD[(col,feature,label),1],其中col為feature在features中所在的列的下標,feature為特征,label為某對象,1為初始頻數。對應上表來說數據樣例為:(0,一次,大學): 1 ;(0,一次以上,大學): 1
2> 將RDD[(col,feature,label),1] 按(col,feature,label)進行合並,生成Map[(col, feature,
label), n],其中n代表(col, feature,label)一共出現n次。對應上表數據樣例為:(0,一次,大學): 2 ;(0,一次以上,大學): 1
3> 將Map[(col,feature,label),n]以三元組(col,feature,label)中col進行分組。然后統計每個組中無重復feature值的個數及根據唯一label個數創建矩陣,矩陣單元格的值為(col,feature,label)的個數n。因樣例數據feature為1維所以2>中結果即為最終結果。
4> 對矩陣進行數學計算得出結論(此部分將在下文處理階段詳細說明)。
其中1>,2>步為並行處理部分,3>,4>步串行處理部分。上述程序流程圖為:

其中countByKey操作中RDD轉換過程如下圖:

三元組轉換為矩陣的具體流程為:
2.2.2.2、處理階段
利用預處理階段產生的矩陣,計算數學期望,自由度,p值,通過p值決定接受還是拒絕原假設。此階段為翻譯卡方檢驗數學公式不再詳述。
2.2.3、不足
1)本程序僅根據皮爾森卡方進行實現,當某特征出現理論(期望)頻數小於5時,會使“近似於卡方分配”的假設不可信,本程序的處理方法是直接拋出異常。可否通過修正(葉氏連續修正),使存在期望頻數小於5時仍得出可信的結果。
2)本程序預處理階段將RDD[LabeledPoint]轉換成Map[(col,feature,label),num]過程是並行的,從Map[(col,feature,label),num]轉換成矩陣過程及計算卡方的過程為串行處理。由源碼分析知RDD 做為參數的卡方檢驗接口是檢測每一個feature與label的獨立性。不同feature這間無影響,可以進行並行處理。
附源碼:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mllib.stat.test
import breeze.linalg.{DenseMatrix => BDM}
import org.apache.commons.math3.distribution.ChiSquaredDistribution
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import scala.collection.mutable
/**
* Conduct the chi-squared test for the input RDDs using the specified method.
* Goodness-of-fit test is conducted on two `Vectors`, whereas test of independence is conducted
* on an input of type `Matrix` in which independence between columns is assessed.
* We also provide a method for computing the chi-squared statistic between each feature and the
* label for an input `RDD[LabeledPoint]`, return an `Array[ChiSquaredTestResult]` of size =
* number of features in the input RDD.
*
* Supported methods for goodness of fit: `pearson` (default)
* Supported methods for independence: `pearson` (default)
*
* More information on Chi-squared test: http://en.wikipedia.org/wiki/Chi-squared_test
*/
/**
* 卡方檢驗
*/
private[stat] object ChiSqTest extends Logging {
/**
* @param name String name for the method.
* @param chiSqFunc Function for computing the statistic given the observed and expected counts.
*/
/**
* case 樣例類,進行模式匹配
* @param name method名稱
* @param chiSqFunc 用給定的觀察值及期望值 計算分析
*/
case class Method(name: String, chiSqFunc: (Double, Double) => Double)
// Pearson's chi-squared test: http://en.wikipedia.org/wiki/Pearson%27s_chi-squared_test
/**
* 樣例類Method 實例
* 皮爾森卡方計算公式
* 根據(observed-expected)^2 /expected計算 卡方值
*/
val PEARSON = new Method("pearson", (observed: Double, expected: Double) => {
val dev = observed - expected
dev * dev / expected
})
// Null hypothesis for the two different types of chi-squared tests to be included in the result.
//零假設:又稱原假設,指進行統計檢驗時預先建立的假設, 零假設成立時,有關統計量應服從已知的某種概率分布.
object NullHypothesis extends Enumeration {
type NullHypothesis = Value
val goodnessOfFit = Value("observed follows the same distribution as expected.")
val independence = Value("the occurrence of the outcomes is independent.")
}
// Method identification based on input methodName string
/**
* 根據 methodName 獲取卡方檢驗Method實例 , 本方法現在: 只提供皮爾森方法
* @param methodName 卡方檢驗方法名
*/
private def methodFromString(methodName: String): Method = {
methodName match { //…… 模式匹配
case PEARSON.name => PEARSON
case _ => throw new IllegalArgumentException("Unrecognized method for Chi squared test.")
}
}
/**
* Conduct Pearson's independence test for each feature against the label across the input RDD.
* The contingency table is constructed from the raw (feature, label) pairs and used to conduct
* the independence test.
* Returns an array containing the ChiSquaredTestResult for every feature against the label.
*/
/**
* 皮爾森獨立性檢驗:驗證從兩個變量抽出的配對觀察值組是否互相獨立
* (例如:每次都從A國和B國各抽一個人,看他們的反應是否與國籍無關)
* @param data 待檢測的數據 RDD[LabeledPoint] 類型
* @param methodName 使用的檢驗方法
* @result Array[ChiSqTestResult] 返回值
*/
def chiSquaredFeatures(data: RDD[LabeledPoint],
methodName: String = PEARSON.name): Array[ChiSqTestResult] = {
val maxCategories = 10000 //最大分類
val numCols = data.first().features.size //特征值中有多少種數據
val results = new Array[ChiSqTestResult](numCols) //存儲卡方檢驗結果:numCols個元素的卡方檢驗結果數組
var labels: Map[Double, Int] = null
// at most 1000 columns at a time
val batchSize = 1000 //每批數據量
var batch = 0 //批次
while (batch * batchSize < numCols) { //處理
// The following block of code can be cleaned up and made public as
// chiSquared(data: RDD[(V1, V2)])
val startCol = batch * batchSize //開始執行位置
val endCol = startCol + math.min(batchSize, numCols - startCol) //相對startCol的偏移量
val pairCounts = data.mapPartitions { iter => //對每個分區執行iter指向操作
val distinctLabels = mutable.HashSet.empty[Double] //不同的標簽(研究對象分類)
/*創建一個Map對象,並初始化:key為列的編號,value用空的HashSet*/
val allDistinctFeatures: Map[Int, mutable.HashSet[Double]] =
Map((startCol until endCol).map(col => (col, mutable.HashSet.empty[Double])): _*)
var i = 1 //計數器,避免頻繁進行判斷
/*對此分片進行flatMap操作, 將LabeledPoint(label, features) 轉換成三元組*/
iter.flatMap { case LabeledPoint(label, features) =>
if (i % 1000 == 0) {
/*若分類過多,導致理論頻數(即期望計數)小於5的太多,會導至皮爾森方法失效*/
if (distinctLabels.size > maxCategories) {
throw new SparkException(s"Chi-square test expect factors (categorical values) but "
+ s"found more than $maxCategories distinct label values.")
}
allDistinctFeatures.foreach { case (col, distinctFeatures) =>
if (distinctFeatures.size > maxCategories) {
throw new SparkException(s"Chi-square test expect factors (categorical values) but "
+ s"found more than $maxCategories distinct values in column $col.")
}
}
}
i += 1
distinctLabels += label
/*將features,加上索引,然后切片,再轉將其通過map 操作 賦值到allDistinctFeatures*/
features.toArray.view.zipWithIndex.slice(startCol, endCol).map { case (feature, col) =>
allDistinctFeatures(col) += feature
(col, feature, label)
}
}
}.countByValue()
if (labels == null) {
// Do this only once for the first column since labels are invariant across features.
/*
*取出col 是startCol(實質每行一個分類,此filter即相當於從原數據每一行中拿出第一個)的數據,
*將label取出加入labels
* 方法僅執行一次。
*/
labels =
pairCounts.keys.filter(_._1 == startCol).map(_._3).toArray.distinct.zipWithIndex.toMap
}
/*標簽,標識矩陣的列, (因為矩陣是列優先存儲的)*/
val numLabels = labels.size
pairCounts.keys.groupBy(_._1).map { case (col, keys) =>
val features = keys.map(_._2).toArray.distinct.zipWithIndex.toMap
val numRows = features.size //特征值個數, 矩陣的行
val contingency = new BDM(numRows, numLabels, new Array[Double](numRows * numLabels)) //創建(空)矩陣
/*賦值操作*/
keys.foreach { case (_, feature, label) =>
val i = features(feature)
val j = labels(label)
contingency(i, j) += pairCounts((col, feature, label))
}
/*轉換為Spark mllib庫中的矩陣,並對矩陣作卡方檢驗*/
results(col) = chiSquaredMatrix(Matrices.fromBreeze(contingency), methodName)
}
batch += 1
}
results
}
/*
* Pearson's goodness of fit test on the input observed and expected counts/relative frequencies.
* Uniform distribution is assumed when `expected` is not passed in.
*/
/**
* 適度檢驗(Goodness of Fit test):實際執行多項式試驗而得到的觀察次數,與虛無假設的期望次數相比較,
* 稱為卡方適度檢驗,即在於檢驗二者接近的程度,利用樣本數據以檢驗總體分布是否為某一特定分布的統計方法
* @param observed 觀察值
* @param expected 期望值(理論值), 若為均勻分布, 此參數可為空。
* @param methodName 檢驗方法名(默認皮爾森, 現階段只實現了皮爾森)
*/
def chiSquared(observed: Vector,
expected: Vector = Vectors.dense(Array[Double]()),
methodName: String = PEARSON.name): ChiSqTestResult = {
// Validate input arguments
val method = methodFromString(methodName) //獲取檢驗方法
/*期望值存在條件下,判斷觀察值與期望值的個數是否相等(也就是是否配對)*/
if (expected.size != 0 && observed.size != expected.size) {
throw new IllegalArgumentException("observed and expected must be of the same size.")
}
val size = observed.size
/*若分類過多,導致理論頻數(即期望計數)小於5的太多,會導至皮爾森方法失效*/
if (size > 1000) {
logWarning("Chi-squared approximation may not be accurate due to low expected frequencies "
+ s" as a result of a large number of categories: $size.")
}
val obsArr = observed.toArray
/*若傳入的期望值不存在元素 則Array初始化為 1.0/size,否則傳入參數值 */
val expArr = if (expected.size == 0) Array.tabulate(size)(_ => 1.0 / size) else expected.toArray
if (!obsArr.forall(_ >= 0.0)) { //判斷是否存在< 0.0元素
throw new IllegalArgumentException("Negative entries disallowed in the observed vector.")
}
if (expected.size != 0 && ! expArr.forall(_ >= 0.0)) {
throw new IllegalArgumentException("Negative entries disallowed in the expected vector.")
}
// Determine the scaling factor for expected
val obsSum = obsArr.sum //觀察值的頻次和
val expSum = if (expected.size == 0.0) 1.0 else expArr.sum //期望值之和
val scale = if (math.abs(obsSum - expSum) < 1e-7) 1.0 else obsSum / expSum //???????????
// compute chi-squared statistic
/*
* zip拉鏈操作, 組成(obs, exp)對,
* 然后對通過foldLeft(0.0) 求每一對的卡方值,並累加… 其中參數 0.0 為累加和的初始值
*/
val statistic = obsArr.zip(expArr).foldLeft(0.0) { case (stat, (obs, exp)) =>
if (exp == 0.0) {
if (obs == 0.0) {
throw new IllegalArgumentException("Chi-squared statistic undefined for input vectors due"
+ " to 0.0 values in both observed and expected.")
} else {
return new ChiSqTestResult(0.0, size - 1, Double.PositiveInfinity, PEARSON.name,
NullHypothesis.goodnessOfFit.toString)
}
}
if (scale == 1.0) {
stat + method.chiSqFunc(obs, exp)
} else {
stat + method.chiSqFunc(obs, exp * scale)
}
}
val df = size - 1
val pValue = 1.0 - new ChiSquaredDistribution(df).cumulativeProbability(statistic)
new ChiSqTestResult(pValue, df, statistic, PEARSON.name, NullHypothesis.goodnessOfFit.toString)
}
/*
* Pearson's independence test on the input contingency matrix.
* TODO: optimize for SparseMatrix when it becomes supported.
*/
/**
* 獨立性檢驗
* @param counts 要檢測的數據(矩陣)
* @param methodName 使用的檢測方法(默認:皮爾森)
*/
def chiSquaredMatrix(counts: Matrix, methodName:String = PEARSON.name): ChiSqTestResult = {
val method = methodFromString(methodName) //獲取Method (case類)
val numRows = counts.numRows //矩陣行數
val numCols = counts.numCols //矩陣列數
// get row and column sums
val colSums = new Array[Double](numCols) //存放每列數據和(用於計算概率)
val rowSums = new Array[Double](numRows) //存放每行數據和
val colMajorArr = counts.toArray //存在所有數據(用於計算 樣本總數)
var i = 0
while (i < colMajorArr.size) {
val elem = colMajorArr(i)
if (elem < 0.0) {
throw new IllegalArgumentException("Contingency table cannot contain negative entries.")
}
colSums(i / numRows) += elem //賦值
rowSums(i % numRows) += elem //賦值
i += 1
}
val total = colSums.sum //計算樣本總量
// second pass to collect statistic
var statistic = 0.0
var j = 0
while (j < colMajorArr.size) {
val col = j / numRows
val colSum = colSums(col)
if (colSum == 0.0) {
throw new IllegalArgumentException("Chi-squared statistic undefined for input matrix due to"
+ s"0 sum in column [$col].")
}
val row = j % numRows
val rowSum = rowSums(row)
if (rowSum == 0.0) {
throw new IllegalArgumentException("Chi-squared statistic undefined for input matrix due to"
+ s"0 sum in row [$row].")
}
val expected = colSum * rowSum / total //計算期望值
statistic += method.chiSqFunc(colMajorArr(j), expected) //計算卡方值、並疊加至statistic中
j += 1
}
val df = (numCols - 1) * (numRows - 1) //計算自由度
if (df == 0) {
// 1 column or 1 row. Constant distribution is independent of anything.
// pValue = 1.0 and statistic = 0.0 in this case.
new ChiSqTestResult(1.0, 0, 0.0, methodName, NullHypothesis.independence.toString)
} else {
/*
* cumulativeProbability() 依據下面理論進行實現
* <a href="http://mathworld.wolfram.com/Chi-SquaredDistribution.html">
* 自由度為n的卡方分布是 a=n/2 和 λ = 1/2 的伽馬分布
* 此方法計算的概率是 P{X <= x} ;
*
* 當x是某個常數,選擇x是為了得到想要的檢驗顯著性水平a, 也就是說x的選擇滿足Ho為真時, 應為:P(X > x) = a;
* 與上述方法計算值互補
* 又pValue = P{X > x} = a;
* pValue 為拒絕原假設的最小顯著性水平
*/
val pValue = 1.0 - new ChiSquaredDistribution(df).cumulativeProbability(statistic)
//生成結果(根據pValue值進行比較,得出獨立性強弱)
new ChiSqTestResult(pValue, df, statistic, methodName, NullHypothesis.independence.toString)
}
}
}
以上皆個人理解,請各位批評指正
http://www.cnblogs.com/barrenlake/p/4354579.html
