1. Concept
Discretization converts continuous numeric values into discrete categorical features. For example, splitting people into just "over 50" and "under 50" is too coarse; dividing them into under 20, 20-30, 30-40, 40-50, and over 50 gives much finer-grained categories, and that is exactly what numeric discretization does.
In other words, discretization splits a feature into appropriate intervals. Age is a continuous feature, but once it is broken into age ranges it becomes discrete, which makes it easier to analyze user behavior and make targeted recommendations.
Bucketizer makes it easy to split such data into different intervals (buckets).
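To connect the age example above to the API, here is a minimal sketch (the column name "age" and the sample values are made up for illustration; it assumes a local SparkSession) that buckets ages at 20, 30, 40, and 50:

import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.sql.SparkSession

object AgeBucketExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("age bucketizer").getOrCreate()

    // 6 split points -> 5 buckets: under 20, 20-30, 30-40, 40-50, 50 and above.
    val ageSplits = Array(Double.NegativeInfinity, 20.0, 30.0, 40.0, 50.0, Double.PositiveInfinity)
    val ages = Array(15.0, 22.0, 35.0, 47.0, 63.0)
    val df = spark.createDataFrame(ages.map(Tuple1.apply)).toDF("age")

    val ageBucketizer = new Bucketizer()
      .setInputCol("age")
      .setOutputCol("ageBucket")
      .setSplits(ageSplits)

    // Each age is mapped to a bucket index from 0.0 to 4.0.
    ageBucketizer.transform(df).show()

    spark.stop()
  }
}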
2. Code
package com.home.spark.ml

import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.sql.SparkSession

/**
 * Bucketizer transforms a column of continuous features into a column of feature buckets,
 * where the buckets are specified by the user. It takes one parameter:
 *
 * splits: the parameter for mapping continuous features into buckets. With n + 1 split points there are n buckets.
 * A bucket defined by the splits x, y holds values in the range [x, y), except the last bucket, which also includes y.
 * The splits must be strictly increasing. Values of -inf and inf must be provided explicitly to cover all Double
 * values; otherwise, values outside the specified splits are treated as errors.
 *
 * Two examples of splits are Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity) and Array(0.0, 1.0, 2.0).
 *
 * Note that if you do not know the upper and lower bounds of the target column, you should add
 * Double.NegativeInfinity and Double.PositiveInfinity as split boundaries to prevent exceptions
 * caused by values outside the Bucketizer's bounds.
 *
 * Also note that the provided splits must be strictly increasing, i.e. s0 < s1 < s2 < ... < sn.
 **/
object Ex_Bucketizer {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf(true).setMaster("local[2]").setAppName("spark ml")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
    val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 0.5, 999.9)
    val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

    val bucketizer = new Bucketizer()
      .setInputCol("features")
      .setOutputCol("bucketedFeatures")
      .setSplits(splits)

    // Transform original data into its bucket index.
    val bucketedData = bucketizer.transform(dataFrame)
    println(s"Bucketizer output with ${bucketizer.getSplits.length - 1} buckets")
    bucketedData.show()

    // Multi-column variant: one splits array per input column.
    val splitsArray = Array(
      Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
      Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity))
    val data2 = Array(
      (-999.9, -999.9),
      (-0.5, -0.2),
      (-0.3, -0.1),
      (0.0, 0.0),
      (0.2, 0.4),
      (999.9, 999.9))
    val dataFrame2 = spark.createDataFrame(data2).toDF("features1", "features2")

    val bucketizer2 = new Bucketizer()
      .setInputCols(Array("features1", "features2"))
      .setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2"))
      .setSplitsArray(splitsArray)

    // Transform original data into its bucket index.
    val bucketedData2 = bucketizer2.transform(dataFrame2)
    println(s"Bucketizer output with [" +
      s"${bucketizer2.getSplitsArray(0).length - 1}, " +
      s"${bucketizer2.getSplitsArray(1).length - 1}] buckets for each input column")
    bucketedData2.show()

    spark.stop()
  }
}
Bucketizer output with 4 buckets
+--------+----------------+
|features|bucketedFeatures|
+--------+----------------+
| -999.9| 0.0|
| -0.5| 1.0|
| -0.3| 1.0|
| 0.0| 2.0|
| 0.2| 2.0|
| 0.5| 3.0|
| 999.9| 3.0|
+--------+----------------+
Bucketizer output with [4, 4] buckets for each input column
+---------+---------+-----------------+-----------------+
|features1|features2|bucketedFeatures1|bucketedFeatures2|
+---------+---------+-----------------+-----------------+
| -999.9| -999.9| 0.0| 0.0|
| -0.5| -0.2| 1.0| 1.0|
| -0.3| -0.1| 1.0| 1.0|
| 0.0| 0.0| 2.0| 2.0|
| 0.2| 0.4| 2.0| 3.0|
| 999.9| 999.9| 3.0| 3.0|
+---------+---------+-----------------+-----------------+
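As the comments in the example note, values outside the provided splits raise an error, which is why the infinite boundaries are recommended. If the input column may also contain NaN values, Bucketizer's handleInvalid parameter controls how they are treated. The sketch below is only an assumption-laden illustration (it reuses the "features" column from the example and assumes a recent Spark release); "keep" places NaN rows in an extra bucket instead of failing:

// "keep" puts NaN values into a special additional bucket (index = number of regular buckets);
// "skip" drops such rows, and "error" (the default) throws an exception.
val nanTolerantBucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity))
  .setHandleInvalid("keep")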
