1. 問題描述
記錄關聯問題(Record Linkage):有大量從一個或多個源系統來的記錄,其中有些記錄可能代表了相同的基礎實體。
每個實體有若干個屬性,比如姓名、地址、生日。我們需要根據這些屬性找到那些代表相同實體的記錄。
不幸的是,有些屬性值有問題:格式不一致,或有筆誤,或信息缺失。如果簡單的對這些屬性做相等性測試,就會漏掉很多重復記錄。
可以看出,下面兩條記錄看起來是兩個不同的咖啡店,其實是同一個咖啡店:
而下面兩條記錄看起來是兩條相同的記錄,卻是兩個不同的業務部門:
2. 樣例數據:
樣例數據來自加州大學歐文分校機器學習資料庫(UC Irvine Machine Learning Repository)。
這里要分析的數據集來源於一項紀錄關聯研究,是德國一家醫院在2010年完成的。這個數據及包含數百萬對病人記錄,每隊記錄都根據不同標准來匹配。比如病人姓名、地址、生日。
每個匹配字段都被賦予一個數值評分,范圍為0.0 到 1.0,分值根據字符串相似度得出。然后這些數據交給人工處理,標記出哪些代表同一個人哪些代表不同的人。
為了保護病人隱私,創建的數據集的每個字段原始值被刪除。病人的ID、字段匹配分數、匹配對標識(包括匹配的和不匹配的)等信息是公開的,可用於紀錄關聯研究。
3. 獲取數據:
$ mkdir linkage
$ cd linkage/
$ wget https://archive.ics.uci.edu/ml/machine-learning-databases/00210/donation.zip
$ unzip donation.zip
$ unzip 'block_*.zip'
放入HDFS:
$ hadoop fs -mkdir linkage
$ hadoop fs -put block_*.csv linkage
4. Spark 步驟:
一般來說,Spark 程序通常包括一系列相關步驟:
1. 在輸入數據集上定義一組轉換
2. 調用action,用以將轉換后的數據保存到持久存儲上,或者把結果返回到驅動程序的本地內存
3. 運行本地計算,本地計算處理分布式計算的結果。本地計算有助於你確定下一步的轉換和action
5. Spark 基本操作:
在集群上啟動Spark Shell:
spark-shell --master yarn
基本操作:
:help
:history
:paste => 進入paste模式,拷貝到里面,然后執行
Spark context available as 'sc' (master = yarn, app id = application_1529488616304_14393).
sc表示對SparkContext的引用,它負責協調集群上Spark作業的執行
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@58a7a58d
這個表示sc是一個SparkContext對象,這個對象為:對象名@對象內存地址
既然為對象,即會有方法。
6. RDD
SparkContext 里使用的最多的方法為創建RDD(Resilient Distributed Dataset),彈性分布式數據集。
RDD是Spark所提供的最基本的抽象,代表分布在集群中多台機器上的對象集合。
Spark 有兩種方法可以創建RDD:
1. 用SparkContext 基於外部數據源創建 RDD, 外部數據源包括HDFS上的文件、通過jdbc 訪問的數據庫表或 Spark shell 中創建的本地對象集合;
2. 在一個或多個已有 RDD 上執行轉換操作來創建 RDD,這些轉換操作包括記錄過濾、對具有相同鍵值的記錄做匯總、把多個RDD 關聯在一起等;
對 RDD 可以很方便地描述對數據要進行的一串小而獨立的計算步驟。
RDD 特點:
1. RDD 以分區(partition)的形式分布在集群中多個機器上
2. 每個分區代表了數據集的一個子集
3. 分區定義了Spark 中數據的並行單位
4. Spark 框架並行處理多個分區,一個分區內的數據對象則是順序處理
創建RDD 最簡單的方法:
scala> var rdd = sc.parallelize(Array(1, 2, 2, 4), 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
第一個代表待並行化的對象集合,第二個參數代表分區的個數。當要對一個分區內的對象進行計算時,Spark 從驅動程序進程里獲取對象集合的一個子集
7. Spark 作業提交
在最高層,它有兩個獨立的實體:driver 和 executor
the driver, which hosts the application (SparkContext) and schedules tasks for a job;
and the executors, which are exclusive to the application, run for the duration of the application, and execute the application’s tasks.
當對RDD 執行一個動作(比如 count)時,會自動提交一個Spark job,從內部來看:
1. 對SparkContext 調用runJob()
2. 將調用傳遞給調度程序 DAGScheduler
3. DAG調度把作業分解為多個階段(stages),並由這些構成
一個DAG
4. 任務調度程序則負責把每個階段中的任務提交
給集群
這里 DAG和任務調度程序便構成了driver實體,而執行任務的為executor實體
8. 創建數據RDD
1. 要在分布式文件系統(比如HDFS)上的文件或目錄上創建RDD,可以給textFile 方法傳入文件或目錄的名稱:
scala> var textFile = sc.textFile("hdfs:///user/hadoop/hi2", 4)
textFile: org.apache.spark.rdd.RDD[String] = hdfs:///user/hadoop/hi2 MapPartitionsRDD[11] at textFile at <console>:24
2. 如果輸入是目錄而不是單個文件,Spark會把該目錄下所有文件作為RDD輸入
3. 實際上Spark 並未將數據讀取到集群內存中。當需要對分區內的對象進行計算時,Spark 才會讀入輸入文件的某個部分(也稱切片),然后應用其他RDD 定義的后續轉換操作(過濾和匯總等)
9. 讀取數據
$ val rawblocks = sc.textFile("hdfs:///user/hadoop/linkage")
rawblocks: org.apache.spark.rdd.RDD[String] = hdfs:///user/hadoop/linkage MapPartitionsRDD[19] at textFile at <console>:24
聲明了一個rawblocks 的變量,它的類型為 RDD[String]
雖然我們沒有在變量聲明時指定它的類型,但是 Scala 會使用“類型推斷”來判斷變量的類型。
在上面的例子中,Scala 會查找 SparkContext 對象 textFile 函數的返回值類型,發現該函數返回 RDD[String] 類型,於是就將 RDD[String] 類型賦值給rawblocks 變量
10. 把數據從集群獲取到客戶端
現在數據在集群上,如何在saprk-shell 里查看這些數據?
1. first 方法:返回RDD 第一個元素(對數據集做常規檢查)
2. collect:返回一個包含所有 RDD 內容的數組(一般不這么做,因為不知道數據量大小)
3. take:向客戶端返回一個包含指定數量記錄的數組
scala> rawblocks.first
res3: String = "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2"…
scala> val head = rawblocks.take(10)
head: Array[String] = Array("id_1","id_2" …)
scala> head.length
res6: Int = 10
11. 動作(Action)
1. 創建RDD 的操作(action)並不會導致集群執行分布式計算。相反,RDD 只是定義了作為計算過程中間步驟的邏輯數據集。只有調用RDD 上的 action 時分布式計算才會執行。如 count,collect等。
2. 動作不一定會向本地進程返回結果。saveAsTextFile 動作將RDD 的內容保存到持久化存儲(如HDFS)。
該動作創建一個目錄並為每個分區輸出一個文件。
12. foreach
之前可以看到 head 是一個數組,打印head 的數據后,並不整齊。在此我們可以用foreach 方法:
scala> head.foreach(println)
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE
36950,42116,1,?,1,1,1,1,1,1,1,TRUE
42413,48491,1,?,1,?,1,1,1,1,1,TRUE
25965,64753,1,?,1,?,1,1,1,1,1,TRUE
49451,90407,1,?,1,?,1,1,1,1,0,TRUE
39932,40902,1,?,1,?,1,1,1,1,1,TRUE
13. def
查看head 打印的數據,可以發現對於這次的數據分析任務來說,第一行我們是不需要的,所以我們需要將它清洗掉。
這里我們寫一個函數判斷哪些行是我們不需要的,如:
scala> def isHeader(line:String) = line.contains("id_1")
isHeader: (line: String)Boolean
def定義一個方法,line為參數,String為參數類型。SparkContext根據 line.contains() 方法的返回類型,指定了方法 isHeader() 的返回類型。
如果方法比較復雜且包含多個return,那么建議在定義方法時顯示指定返回類型以獲取更好的可讀性,如:
scala> def isHeader(line:String) : Boolean = {
| line.contains("id_1")
| }
isHeader: (line: String)Boolean
14. filter
使用Scala 的 Array 類的filter/filterNot 方法可以過濾 Array 里的數據:
head.filterNot(isHeader).foreach(println)
scala> head.filterNot(isHeader).foreach(println)
37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE
36950,42116,1,?,1,1,1,1,1,1,1,TRUE
42413,48491,1,?,1,?,1,1,1,1,1,TRUE
25965,64753,1,?,1,?,1,1,1,1,1,TRUE
49451,90407,1,?,1,?,1,1,1,1,0,TRUE
39932,40902,1,?,1,?,1,1,1,1,1,TRUE
如果是用匿名函數:
scala> head.filter(x => !isHeader(x)).length
res10: Int = 9
這里 x 為一個變量,遍歷head數組,每次執行一個isHeader()
當然,匿名函數可以簡寫為:
scala> head.filter(!isHeader(_)).length
res11: Int = 9
15. 把代碼從客戶端發送到集群
現在我們對head的處理仍在客戶端,如果要應用到整個集群的RDD上:
scala> val noheader = rawblocks.filter(!isHeader(_))
noheader: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:27
然后使用新的變量.first 來查驗結果:
scala> noheader.first
res12: String = 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
這意味着,我們可以先從集群采樣得到小數據及,在小數據集上開發和調試數據處理代碼,等一切就緒后再把代碼發送到集群上處理完整的數據集。
16. 數據結構化
我們看看head 數組的內容:
scala> head
res7: Array[String] = Array("id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match", 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE, 39086,47614,1,?,1,?,1,1,1,1,1,TRUE, 70031,70237,1,?,1,?,1,1,1,1,1,TRUE, 84795,97439,1,?,1,?,1,1,1,1,1,TRUE, 36950,42116,1,?,1,1,1,1,1,1,1,TRUE, 42413,48491,1,?,1,?,1,1,1,1,1,TRUE, 25965,64753,1,?,1,?,1,1,1,1,1,TRUE, 49451,90407,1,?,1,?,1,1,1,1,0,TRUE, 39932,40902,1,?,1,?,1,1,1,1,1,TRUE)
數據的結構如下:
- 前兩個字段是整型ID,代表記錄中匹配的兩個病人
- 后面9個值是雙精度浮點數,代表病人記錄中不同的字段(姓名、生日、地址)的匹配分值(可能包含數據丟失的情況)
- 最后一個字段是布爾型,代表該行病人記錄對是否匹配
現在的每行數據都是一條String,為了更容易分析這些數據,我們需要把字符串解析成結構化的格式,把不同字段轉化成正確的數據類型,比如整數或雙精度浮點數。
取一行進行分割:
scala> var line = head(5)
scala> val pieces =line.split(",")
scala> val id1 = pieces(0).toInt
scala> val id2 = pieces(1).toInt
scala> val matched = pieces(11).toBoolean
matched: Boolean = true
這里我們可以發現Scala 訪問數組的方式是:
val id2 = pieces(1)
這里訪問數組用的是函數調用,不是特殊操作符。Scala 允許在類里定義一個特殊函數 apply,當把對象當作函數處理的時候,這個apply 函數會被調用,所以 pieces(1) 等同於 pieces.apply(5)
與之前的 contains 方法和 split 方法不同的是:toInt 和 toBoolean 方法並不是由 Java 的 String 類定義的
這里用到了 Scala 的特性:隱式類型轉換
工作原理如下:
當調用 Scala 對象的方法時,如果在定義該對象的類中找不到方法的定義,Scala 編譯器就將該對象轉換成有相應方法定義的類的實例。
在這個例子中,編譯器發現Java 的 String 類沒有定義 toInt 方法而 StringOps有,既然StringOps 定義了這個方法,那么就可以將String 類的實例轉換成 StringOps 類的實例。
這時編譯器就悄悄把String 對象轉換成了 StringOps 對象,然后在新對象上調用 toInt 方法。
在轉換了前三個字段后,我們仍需要轉換雙精度浮點數類型。
要一次完成轉換,可以先用Scala Array 類的 slice 方法提取一部分數組元素,然后調用高階函數 map 把 slice 中每個元素的類型從 String 轉換為 Double:
scala> val rawscores = pieces.slice(2, 11)
rawscores: Array[String] = Array(1, ?, 1, 1, 1, 1, 1, 1, 1)
scala> rawscores.map(_.toDouble)
java.lang.NumberFormatException: For input string: "?"
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)
從上面的報錯,我們可以看到:由於數組里有未知元素’?’,所以StringOps 的 toDouble 方法不知道如何把 ? 轉換成 double。
這里我們可以再寫一個toDouble函數:
scala> def toDouble(line:String) = {
| if ("?".equals(line)) Double.NaN else line.toDouble
| }
toDouble: (line: String)Double
如果碰到”?”,則設置為 NaN 的值。
scala> rawscores.map(toDouble)
// 這里調用的是定義的toDouble函數而_.toDouble調用的是StringOps的toDouble
res22: Array[Double] = Array(1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
最后把解析代碼合並到一個函數,在一個元組中返回所有解析好的值:
def parse(line: String) = {
val pieces = line.split(",")
val id1 = pieces(0).toInt
val id2 = pieces(1).toInt
val scores = pieces.slice(2,11).map(toDouble)
val matched = pieces(11).toBoolean
(id1, id2, scores, matched)
}
scala> parse(line)
res6: (Int, Int, Array[Double], Boolean) = (36950,42116,Array(1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0), true)
17. 元組
從元組中獲取單個字段的值,可以用下標函數,從 _1 開始,或者用 productElement 方法,它是從 0 開始計數。也可以用 productArity 方法得到元組大小:
scala> var y = parse(line)
y: (Int, Int, Array[Double], Boolean) = (36950,42116,Array(1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0),true)
scala> y.productArity
res14: Int = 4
18. case class
在定義好元組數據后,我們需要使用下標來訪問數據。如果我們可以使用更有意義的名稱來訪問數據,會使代碼更容易理解。
這里我們使用case class (其實有點像Java 里定義為某個需求單獨定義的一個類):
scala> case class MatchData(id1:Int, id2:Int, scores:Array[Double], matched:Boolean)
defined class MatchData
將 case class 加入到方法parse:
scala> def parse(line:String) = {
| val pieces = line.split(',')
| val id1 = pieces(0).toInt
| val id2 = pieces(1).toInt
| val scores = pieces.slice(2, 11).map(toDouble)
| val matched = pieces(11).toBoolean
| MatchData(id1, id2, scores, matched)
| }
parse: (line: String)MatchData
19. 將方法應用到集群數據
1. 在小數據集上進行測試:
scala> head.filter(!isHeader(_)).map(parse(_))
2. 然后將方法用於集群上的數據:
scala> val parsed = noheader.map(parse(_))
parsed: org.apache.spark.rdd.RDD[MatchData] = MapPartitionsRDD[4] at map at <console>:27
由於沒有對RDD 執行某個需要輸出的調用,所以實際上這個方法還沒有應用到原RDD 數據集
20. 緩存
現在數據已經解析好,我們想以解析了的格式把數據存到集群上,這樣就不需要每次遇到新問題時都重新解析。
在實例上調用cache 方法,可以指示在內存里緩存某個RDD:
parsed.cache()
parsed.cache() => 下次計算RDD 后,要把RDD 存儲起來
parsed.count() => 計算RDD,由於設置了cache,RDD結果會保存在內存
parsed.take(10) => take RDD里的10 個條目,會直接從內存中取
Spark 為持久化RDD 定義了幾種不同的機制,用不同的 StorageLevel 值表示:
1. rdd.cache() 是 rdd.persist(StorageLevel.MEMORY) 的簡寫,它將 RDD 存儲為未序列化的Java
對象。當Spark 估計內存不夠存放一個分區時,它干脆就不在內存中存放該分區,這樣在下次需要時就必須重新計算。
在對象需要頻繁訪問或低延訪問時適合使用此級別,因為它可以避免序列化開銷。但是相比其他選項,此方法要占用更大的內存空間。
另外,大量小對象會對Java的垃圾回收造成壓力,會導致程序停頓和常見的速度緩慢問題。
2. MEMORY_SER 的存儲級別:用於在內存中分配大字節緩沖區以存儲 RDD 序列化內容。如果使用的當,序列化數據占用的空間比未經序列化的數據占用的空間往往要少兩到五倍。
Spark 也可以用磁盤來緩存 RDD。存儲級別為 MEMORY_AND_DISK 和 MEMORY_AND_DISK_SER 分別類似於 MEMORY 和 MEMORY_SER。
對於 MEMORY 和 MEMORY_SER,如果一個分區在內存里放不下,整個分區都不會放在內存。
對於MEMORY_AND_DISK 和 MEMORY_AND_DISK_SER,如果分區在內存里放不下,Spark 會將其溢寫到磁盤上。
一般情況下,如果多個動作需要用到某個RDD,而它的計算代價又很高,那么就應該把這個RDD 緩存起來。
21. 聚合
因為大規模的數據集分布在多台機器上,對數據進行聚合時,我們更擔心的是數據傳輸的效率。
我們接下來分別在本地客戶端和集群上分別對MatchData 做簡單的聚合操作,目的是計算匹配和不匹配的記錄數量。
對於mds 數組的本地MatchData 記錄,我們用groupBy 方法來創建一個 Scala Map[Boolean, Array[MatchData]],它的鍵是基於MatchData的matched 字段:
scala> val grouped = mds.groupBy(_.matched)
grouped: scala.collection.immutable.Map[Boolean,Array[MatchData]] = Map(true -> Array(MatchData(37291,53113,[D@39f29540,true], MatchData(39086,47614,[D@2bb5637a,true], MatchData(70031,70237,[D@1decf3e3,true], MatchData(84795,97439,[D@722b4f64,true], MatchData(36950,42116,[D@5fde8cf4,true], MatchData(42413,48491,[D@5f708ac6,true], MatchData(25965,64753,[D@25f90920,true], MatchData(49451,90407,[D@d813294,true], MatchData(39932,40902,[D@2613a6fc,true]))
得到grouped 的變量后,就可以通過在 grouped 上調用 mapValues() 方法得到計數。mapValues 方法和 map 方法類似,但作用在Map 對象中的值:
scala> grouped.mapValues(_.size).foreach(println)
(true,9)
以上只是對本地客戶端數據進行聚合,但是對集群進行聚合時,一定要記住:
- 要分析的數據是放在多個機器上的
- 聚合需要通過網絡移動數據
- 跨網絡移動數據需要許多計算資源,包括確定每條記錄要傳到哪些服務器、數據序列化、數據壓縮、通過網絡發送數據、解壓縮,接着序列化結果,最后在聚合后的數據上執行計算
- 為了提高速度,我們需要盡可能少地移動數據。在聚合前能過濾掉的數據越多,就能越快得到結果
RDD 類定義了一個名為 countByValue 的動作,該動作對於計數類運算效率非常高,它向客戶端返回 Map[T, Long] 類型的結果。
scala> val matchCounts = parsed.map(_.matched).countByValue()
matchCounts: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 5728201)
22. Seq
Scala 的Map 類沒有提供根據內容的鍵或值排序的方法,但是我們可以將 Map 轉換成Scala 的Seq 類型,而Seq 支持排序。
Scala 的 Seq 類和 Java 的 List 類接口類似,都是可迭代集合,即具有確定的長度並且可以根據下標來查找值。
scala> val matchCountsSeq = matchCounts.toSeq
matchCountsSeq: Seq[(Boolean, Long)] = ArrayBuffer((true,20931), (false,5728201))
23. Scala集合
Scala 集合類庫很龐大,包括 list、set、map 和 array。利用 toList、toSet 和 toArray 方法,各種集合類型可以方便地相互轉換。
24. sortBy
我們可以看到 matchCountSeq 的類型是:
Seq[(Boolean, Long)] = ArrayBuffer((true,20931), (false,5728201))
我們可以用 sortBy 方法控制用哪個指標排序:
scala> matchCountsSeq.sortBy(_._1).foreach(println) // 用第一列的指標
(false,5728201)
(true,20931)
scala> matchCountsSeq.sortBy(_._2).foreach(println) // 用第二列的指標
(true,20931)
(false,5728201)
默認sortBy 函數對數值按升序排列,如果需要用降序,可以用 reverse 方法,在打印前改變排序方式:
scala> matchCountsSeq.sortBy(_._2).reverse.foreach(println)
(false,5728201)
(true,20931)
如果對於離散型變量,可以使用countByValue 動作計算各個變量的數量。但是如果是連續型變量?
對於連續型變量,我們一般需要獲取其分布的基本統計信息,如均值、標准差和極值(最大值與最小值)
RDD[Double] 有個隱式動作叫 stats,它可以提供RDD 值概要統計信息:
scala> parsed.map(_.scores(0)).stats
res23: org.apache.spark.util.StatCounter = (count: 5749132, mean: NaN, stdev: NaN, max: NaN, min: NaN)
但是由於數據不干凈,有缺失值,所以影響了統計信息。
25. java.lang.Double.isNaN
對於缺失值,我們可以引入 Java Double 類的 isNaN 函數手動過濾:
scala> import java.lang.Double.isNaN
scala> parsed.map(_.scores(0)).filter(!isNaN(_)).stats()
res24: org.apache.spark.util.StatCounter = (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000)
使用Scala 的 Range 結構創建一個循環,遍歷下標並計算該列的統計信息:
val stats = (0 until 9).map( i => {
parsed.map(_.scores(i)).filter(!isNaN(_)).stats()
})
scala> stats(1)
res25: org.apache.spark.util.StatCounter = (count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000)
25. NAStatCounter
如果使用上面的循環方法,我們需要一遍遍的遍歷數據,並不是一個好的方式。所以我們使用一個類來跟蹤之前的score 值:
這個類里有兩個變量:stats 和 missing
stats 用於記錄統計信息
missing 用於記錄缺失值的數量
scala> val ns1 = NAStatCounter(10.0)
ns1: NAStatCounter = stat(count: 1, mean: 10.000000, stdev: 0.000000, max: 10.000000, min: 10.000000)NAN: 0
scala> ns1.add(2.1)
res1: NAStatCounter = stat(count: 2, mean: 6.050000, stdev: 3.950000, max: 10.000000, min: 2.100000)NAN: 0
scala> ns1.add(java.lang.Double.NaN)
res2: NAStatCounter = stat(count: 2, mean: 6.050000, stdev: 3.950000, max: 10.000000, min: 2.100000)NAN: 1
將NAStateCounter 應用到數據集:
scala> val nasRDD = parsed.map(_.scores.map(NAStatCounter(_)))
nasRDD: org.apache.spark.rdd.RDD[Array[NAStatCounter]] = MapPartitionsRDD[35] at map at <console>:30
也就是把scores 里的每一個Double 轉換為NAStatCounter 的形式:stats 與 missing
scala> nasRDD.first
res46: Array[org.apache.spark.util.StatCounter] = Array((count: 1, mean: 0.833333, stdev: 0.000000, max: 0.833333, min: 0.833333), (count: 1, mean: NaN, stdev: NaN, max: NaN, min: NaN), (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000), (count: 1, mean: NaN, stdev: NaN, max: NaN, min: NaN), (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000), (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000), (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000), (count: 1, mean: 1.000000, stdev: 0.000000, max: 1.000000, min: 1.000000), (count: 1, mean: 0.000000, stdev: 0.000000, max: 0.000000, min: 0.000000))
26. 數據小結
現在我們對我們的數據做一下小結:
1. parsed 里的數據為:
scala> parsed.first
res2: MatchData = MatchData(37291,53113,[D@3789bd95,true]
對原數據處理后,有四個字段,分別為id1,id2,scores,matched。對於matched 字段我們已進行過簡單的分析。
2. parsed.first.scores:
res5: Array[Double] = Array(0.833333333333333, NaN, 1.0, NaN, 1.0, 1.0, 1.0, 1.0, 0.0)
scores 是一個Double 數組,長度為9,里面存放了各個字段的相關系數,有缺失值。
3. nasRDD.first:
res4: Array[NAStatCounter] = Array(stat(count: 1, mean: 0.833333, stdev: 0.000000, max: 0.833333, min: 0.833333)NAN: 0, stat(count: 0, mean: 0.000000, stdev: NaN, max: -Infinity, min: Infinity)NAN: 1 ……)
length = 9
nasRDD 里存放的是NAStatCounter 的數組。9個元素與scores里的9個元素一一對應。
之前提到對於連續型變量,我們希望得到它們的統計值。所以接下來,我們統計的應該是9個列里每個列的stat()。如果要這么做,那么應該是單獨計算:
nasRDD.first(0)、nasRDD.second(0) … nasRDD.nth(0)
nasRDD.first(1)、nasRDD.second(1) … nasRDD.nth(1)
…
27. zip
zip 函數可以將兩個Array 合並:
a: Array[Int] = Array(1, 2, 3)
b: Array[Int] = Array(4, 5, 6)
scala> a.zip(b)
res18: Array[(Int, Int)] = Array((1,4), (2,5), (3,6))
我們可以使用zip 將每列的數據合並,然后用merge函數將每列的數據整合,如:
var azip = a.zip(b)
scala> naszip.map(a => a._1.merge(a._2))
res20: Array[NAStatCounter] = Array(stat(count: 2, mean: 0.916667, stdev: 0.083333, max: 1.000000, min: 0.833333)NAN: 0, …)
length = 9
在對整個數據集的時候,我們可以使用reduce函數。
reduce函數的輸入是一個關聯函數,該函數把兩個 T 類型的參數映射為一個 T 類型的返回值。
之前寫的合並邏輯是關聯性的,所以我們可以把它作為reduce的輸入,並應用在Array[NAStatCounter] 類型的集合上。
scala> val merged = nasRDD.reduce( (nas1, nas2) => {
| nas1.zip(nas2).map(a => a._1.merge(a._2)) })
merged: Array[NAStatCounter] = Array(stat(count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000)NAN: 1007, stat(count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000)NAN: 5645434, …
length = 9
在reduce后得到的9個指標的統計值:
scala> merged.foreach(println)
stat(count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000)NAN: 1007
stat(count: 103698, mean: 0.900018, stdev: 0.271316, max: 1.000000, min: 0.000000)NAN: 5645434
stat(count: 5749132, mean: 0.315628, stdev: 0.334234, max: 1.000000, min: 0.000000)NAN: 0
stat(count: 2464, mean: 0.318413, stdev: 0.368492, max: 1.000000, min: 0.000000)NAN: 5746668
stat(count: 5749132, mean: 0.955001, stdev: 0.207301, max: 1.000000, min: 0.000000)NAN: 0
stat(count: 5748337, mean: 0.224465, stdev: 0.417230, max: 1.000000, min: 0.000000)NAN: 795
stat(count: 5748337, mean: 0.488855, stdev: 0.499876, max: 1.000000, min: 0.000000)NAN: 795
stat(count: 5748337, mean: 0.222749, stdev: 0.416091, max: 1.000000, min: 0.000000)NAN: 795
stat(count: 5736289, mean: 0.005529, stdev: 0.074149, max: 1.000000, min: 0.000000)NAN: 12843
我們把分析缺失值分析代碼打包為一個函數,放入之前的 StatsWtihMissing.scala 里:
def StatsWithMissing(rdd: RDD[Array[Double]]): Array[NAStatCounter]={
val nastats = rdd.mapPartitions((iter: Iterator[Array[Double]]) => {
val nas: Array[NAStatCounter] = iter.next().map( d => NAStatCounter(d)) iter.foreach(arr => {
nas.zip(arr).foreach({ case (n, d) => n.add(d)}) })
Iterator(nas)
})
nastats.reduce((n1, n2) => { n1.zip(n2).map({case (a, b) => a.merge(b)}) })}
28. 變量選擇與評分
有了StatsWithMissing 函數,我們就可以分析 parsed RDD 中匹配和不匹配記錄的匹配分值數組的分布差異了:
scala> val statsm = StatsWithMissing(parsed.filter(_.matched).map(_.scores))
scala> val statsn = StatsWithMissing(parsed.filter(!_.matched).map(_.scores))
statsm 與 statsn 對應兩個不同的數據子集,分別是匹配的與不匹配的分值數組的概要統計信息。
一個好的特征有兩個屬性:
1. 對匹配和不匹配記錄,它的值往往差別很大(因此均值的差異也很大)
2. 在數據中出現的頻率高,這樣我們才能指望它在任何一對記錄里都有值
我們將匹配與不匹配的數據做一個簡單的差異分析:
scala> statsm.zip(statsn).map{ case(m, n) =>
| (m.missing + n.missing, m.stats.mean - n.stats.mean)
| }.foreach(println)
(1007,0.2854529057466858)
(5645434,0.09104268062279874)
(0,0.6838772482597569)
(5746668,0.8064147192926269)
(0,0.03240818525033462)
(795,0.7754423117834042)
(795,0.5109496938298719)
(795,0.7762059675300521)
(12843,0.9563812499852178)
通過觀察評分結果,我們可以得到以下信息:
- 特征1 的作用不大,它缺失的情況很多,並且對匹配記錄和非匹配記錄的均值差也小
- 特征4 也不是特別有幫助:盡管它沒有缺失值情況,但對匹配記錄和非匹配記錄它的均值差只有0.03
- 特征 5 和 特征 7 就比較好:他們基本上對每條記錄都有值,並且對匹配記錄和非匹配記錄的均值差別較大(均超過0.77)
- 特征2、特征6、特征8看起來也有用:它們在數據集中通常都有值,匹配記錄和非匹配記錄的均值差別也不小
- 特征0 和特征3 有點處於中間地帶:特征0 的區分度不太好,但是它在記錄中通常都有值;特征3 匹配記錄和非匹配記錄的均值差別大但卻幾乎總是缺失。根據這個數據很難清晰界定什么情況下我們該把這兩個特征加入到我們的模型中
現在我們用一個簡單的評分模型,該模型把記錄對的相似度排序。相似度的計算為特征2、5、6、7 和 8 的值相加,這些特征明顯是好特征。少數記錄中這幾個特征有缺失的情況,對於這些記錄的相加結果我們以 0 代替NaN。
scala> def naz(d: Double) = if (Double.NaN.equals(d)) 0.0 else d
scala> case class Scored(md: MatchData, score: Double)
scala> val ct = parsed.map(md => {
| val score = Array(2, 5, 6, 7, 8).map( i => naz(md.scores(i))).sum
| Scored(md, score)
| })
scala> ct.filter(s => s.score >= 4.0).map(s => s.md.matched).countByValue()
res1: scala.collection.Map[Boolean,Long] = Map(true -> 20871, false -> 637)
scala> ct.filter(s => s.score >= 2.0).map(s => s.md.matched).countByValue()
res2: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 596414)
scala> ct.filter(s => s.score >= 4.0).map(s => s.md.matched).countByValue()
res1: scala.collection.Map[Boolean,Long] = Map(true -> 20871, false -> 637)
過濾閥值為4.0,意味着5個特征的平均值是0.8。我們過濾掉了幾乎所有不匹配的記錄,同時保留了超過90% 的匹配記錄。
scala> ct.filter(s => s.score >= 2.0).map(s => s.md.matched).countByValue()
res2: scala.collection.Map[Boolean,Long] = Map(true –> 20931, false -> 596414)
較低的閾值 2.0,我們可以捕捉所有已知的匹配記錄,但代價是誤報率高
源數據:
(true,20931)
(false,5728201)