When the data volume is large, classification tasks often ensemble [discrete features + LR] with [continuous features + xgboost]; feeding raw continuous features directly into LR or a decision tree easily leads to overfitting.
If you still want to use the continuous features, one option is to ensemble several algorithms, but that complicates the workflow and makes training very time-consuming. Without losing much feature information, you can instead convert the continuous features into discrete features and feed them into the LR model.
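To make that end goal concrete, here is a minimal sketch (assuming Spark 3.x; the column names hour/label, the toy data and the bucket count are made up) that discretizes a continuous column with QuantileDiscretizer, one-hot encodes the bucket index and feeds the result to LogisticRegression in a single Pipeline:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{OneHotEncoder, QuantileDiscretizer, VectorAssembler}

// Hypothetical data: one continuous column "hour" and a binary label.
val train = spark.createDataFrame(Seq(
  (18.0, 1.0), (19.0, 1.0), (8.0, 0.0), (5.0, 0.0), (2.2, 0.0)
)).toDF("hour", "label")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour").setOutputCol("hourBucket").setNumBuckets(3)
val encoder = new OneHotEncoder()
  .setInputCols(Array("hourBucket")).setOutputCols(Array("hourVec"))
val assembler = new VectorAssembler()
  .setInputCols(Array("hourVec")).setOutputCol("features")
val lr = new LogisticRegression().setLabelCol("label").setFeaturesCol("features")

// Discretize -> one-hot encode -> assemble -> LR, all in one Pipeline.
val model = new Pipeline()
  .setStages(Array(discretizer, encoder, assembler, lr))
  .fit(train)
model.transform(train).select("hour", "hourBucket", "probability", "prediction").show(false)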
Feature conversion falls into two cases:
- Case 1: the features have not yet been assembled into the vector format required for training; each feature is still a separate column, and these individual columns need to be discretized into buckets.
- Case 2: all features have already been assembled into the vector format required for training, but the discrete feature codes are irregular, e.g. [10, 15, 128, ...] instead of [0, 1, 2, ...]. Here everything has been merged into a single vector column that mixes discrete and continuous features, so the discrete features must first be identified and then normalized.
1. Case 1
1.1. Binarization
Binarization is the process of thresholding numerical features to binary (0/1) features.
Binarizer takes the common parameters inputCol and outputCol, as well as the threshold for binarization. Feature values greater than the threshold are binarized to 1.0; values equal to or less than the threshold are binarized to 0.0. Both Vector and Double types are supported for inputCol.
import org.apache.spark.ml.feature.Binarizer

val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")

val binarizer: Binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)

println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()
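With the threshold at 0.5, the example above maps 0.1 and 0.2 to 0.0 and 0.8 to 1.0. Since Vector input is also supported, here is a minimal sketch (the column names and toy vectors are made up) applying a Binarizer to a Vector column:

import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.ml.linalg.Vectors

// Hypothetical Vector-typed input column.
val vecDF = spark.createDataFrame(Seq(
  (0, Vectors.dense(0.1, 0.8, 0.2)),
  (1, Vectors.dense(0.6, 0.4, 0.9))
)).toDF("id", "featureVec")

val vecBinarizer = new Binarizer()
  .setInputCol("featureVec")
  .setOutputCol("binarizedVec")
  .setThreshold(0.5)

// Each element of the vector is thresholded independently.
vecBinarizer.transform(vecDF).show(false)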
1.2. Multi-bucket transformation (Bucketizer)
Bucketizer transforms a column of continuous features to a column of feature buckets, where the buckets are specified by users. It takes a parameter:
splits: Parameter for mapping continuous features into buckets. With n+1 splits, there are n buckets. A bucket defined by splits x,y holds values in the range [x,y) except the last bucket, which also includes y. Splits should be strictly increasing. Values at -inf, inf must be explicitly provided to cover all Double values; Otherwise, values outside the splits specified will be treated as errors. Two examples of splits are Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity) and Array(0.0, 1.0, 2.0).
Binarization needs a single threshold; for a multi-bucket conversion into n buckets you provide an array of n+1 split points. Every value then falls into the interval between two adjacent splits, i.e. into the bucket it belongs to, hence the name bucketing.
For two adjacent splits x and y, the bucket is the interval [x, y), closed on the left and open on the right; only the last bucket also includes its upper bound y.
The splits array must be strictly increasing. If a value is not covered by any interval during the transformation, an error is raised, so if you are unsure of the minimum and maximum of the feature, add Double.NegativeInfinity and Double.PositiveInfinity at the two ends of the array.
Note that if you have no idea of the upper and lower bounds of the targeted column, you should add Double.NegativeInfinity and Double.PositiveInfinity as the bounds of your splits to prevent a potential out of Bucketizer bounds exception.
Note also that the splits that you provided have to be in strictly increasing order, i.e. s0 < s1 < s2 < ... < sn.
import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)

println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets")
bucketedData.show()

val splitsArray = Array(
  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
  Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity))

val data2 = Array(
  (-999.9, -999.9),
  (-0.5, -0.2),
  (-0.3, -0.1),
  (0.0, 0.0),
  (0.2, 0.4),
  (999.9, 999.9))
val dataFrame2 = spark.createDataFrame(data2).toDF("features1", "features2")

val bucketizer2 = new Bucketizer()
  .setInputCols(Array("features1", "features2"))
  .setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2"))
  .setSplitsArray(splitsArray)

// Transform original data into its bucket index.
val bucketedData2 = bucketizer2.transform(dataFrame2)

println(s"Bucketizer output with [" +
  s"${bucketizer2.getSplitsArray(0).length-1}, " +
  s"${bucketizer2.getSplitsArray(1).length-1}] buckets for each input column")
bucketedData2.show()
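With these splits the buckets are (-inf, -0.5), [-0.5, 0.0), [0.0, 0.5) and [0.5, +inf), so in the single-column example the inputs -999.9, -0.5, -0.3, 0.0, 0.2 and 999.9 land in buckets 0.0, 1.0, 1.0, 2.0, 2.0 and 3.0 respectively.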
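If you want a reusable wrapper for this section's Bucketizer as well (mirroring the QuantileDiscretizer helper shown in 1.3 below), a sketch might look like this; the function name Bucketizer_multi_class and its parameters are made up for illustration:

import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.feature.Bucketizer

// Hypothetical helper: discretize a continuous column with user-supplied splits.
def Bucketizer_multi_class(df: DataFrame, InputCol: String, OutputCol: String, Splits: Array[Double]): DataFrame = {
  val bucketizer = new Bucketizer()
    .setHandleInvalid("skip")
    .setInputCol(InputCol)
    .setOutputCol(OutputCol)
    .setSplits(Splits)
  println("\n\n***** buckets: " + (Splits.length - 1) + " ***** input column: " + InputCol + " ***** output column: " + OutputCol + " *****\n\n")
  val result = bucketizer.transform(df)
  result.show(false)
  result
}

Calling Bucketizer_multi_class(dataFrame, "features", "bucketedFeatures", splits) would reproduce the single-column example above.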
1.3. QuantileDiscretizer (quantile-based discretization)
QuantileDiscretizer takes a column with continuous features and outputs a column with binned categorical features. The number of bins is set by the numBuckets parameter. It is possible that the number of buckets used will be smaller than this value, for example, if there are too few distinct values of the input to create enough distinct quantiles.
NaN values: NaN values will be removed from the column during QuantileDiscretizer fitting. This will produce a Bucketizer model for making predictions. During the transformation, Bucketizer will raise an error when it finds NaN values in the dataset, but the user can also choose to either keep or remove NaN values within the dataset by setting handleInvalid. If the user chooses to keep NaN values, they will be handled specially and placed into their own bucket, for example, if 4 buckets are used, then non-NaN data will be put into buckets[0-3], but NaNs will be counted in a special bucket[4].
Algorithm: The bin ranges are chosen using an approximate algorithm (see the documentation for approxQuantile for a detailed description). The precision of the approximation can be controlled with the relativeError parameter. When set to zero, exact quantiles are calculated (Note: Computing exact quantiles is an expensive operation). The lower and upper bin bounds will be -Infinity and +Infinity covering all real values.
QuantileDiscretizer works on a sample of the data and splits it into roughly equal-sized parts to determine the bin ranges; the lower and upper bounds are -Infinity and +Infinity.
The number of buckets is set with the numBuckets parameter, but the number actually used can be smaller: if the sample data only supports n distinct intervals, setting numBuckets to n+1 still yields only n buckets.
The bin ranges are determined by an approximate algorithm whose precision is controlled by the relativeError parameter. When relativeError is set to 0, exact quantiles are computed (computationally expensive; the default is usually fine). relativeError must be in [0, 1]; the default is 0.001.
By default the fitted bucketizer raises an error when it encounters a NaN value. The handleInvalid parameter lets you keep or drop NaN values; if you keep them, they are placed in their own extra bucket.
handleInvalid accepts 'skip' (filter out rows with invalid values), 'error' (throw an error) or 'keep' (put invalid values in a special extra bucket); the default is 'error'.
import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setHandleInvalid("skip")
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show(false)
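To illustrate the NaN handling described above, here is a minimal sketch (the toy data is made up) that keeps NaN values in their own extra bucket via handleInvalid("keep"):

import org.apache.spark.ml.feature.QuantileDiscretizer

val dataWithNaN = Seq((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, Double.NaN))
val dfNaN = spark.createDataFrame(dataWithNaN).toDF("id", "hour")

val keepDiscretizer = new QuantileDiscretizer()
  .setHandleInvalid("keep")   // NaN rows go into an extra bucket instead of raising an error
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

// With 3 regular buckets, NaN values end up in the special bucket with index 3.0.
keepDiscretizer.fit(dfNaN).transform(dfNaN).show(false)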
Wrapped as a function:
import org.apache.spark.sql.DataFrame

// Discretize a continuous feature into multiple buckets
def QuantileDiscretizer_multi_class(df: DataFrame, InputCol: String, OutputCol: String, NumBuckets: Int): DataFrame = {
  import org.apache.spark.ml.feature.QuantileDiscretizer
  val discretizer = new QuantileDiscretizer()
    .setHandleInvalid("skip")
    .setInputCol(InputCol)
    .setOutputCol(OutputCol)
    .setNumBuckets(NumBuckets)
  println("\n\n***** number of buckets: " + NumBuckets + " ***** input column: " + InputCol + " ***** output column: " + OutputCol + " *****\n\n")
  val result = discretizer.fit(df).transform(df)
  result.show(false)
  result
}
In practice it is not advisable to apply this directly to the full dataset: the full data is usually very large and the cluster often runs into problems with this function. It is better to fit only on the training set, or on a sample of the full data, then save the fitted model and use it to transform the full dataset.
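A minimal sketch of that workflow, reusing df and the "hour" column from the example above (the sampling fraction and save path are made up; QuantileDiscretizer.fit returns a Bucketizer model, which can be saved and reloaded):

import org.apache.spark.ml.feature.{Bucketizer, QuantileDiscretizer}

// Fit on a small sample of the full data instead of the whole dataset.
val sampleDF = df.sample(withReplacement = false, fraction = 0.1, seed = 42)

val bucketizerModel: Bucketizer = new QuantileDiscretizer()
  .setHandleInvalid("skip")
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)
  .fit(sampleDF)

// Persist the fitted model, then reuse it to transform the full dataset.
bucketizerModel.write.overwrite().save("/tmp/hour_bucketizer")
val reloaded = Bucketizer.load("/tmp/hour_bucketizer")
val fullResult = reloaded.transform(df)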
2. Case 2
2.1. Converting a vector to normalized discrete features: VectorIndexer
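VectorIndexer scans a Vector column, decides for each dimension whether it is categorical (at most maxCategories distinct values) or continuous, and re-encodes the categorical dimensions as indices 0, 1, 2, ... while leaving the continuous dimensions unchanged, which is exactly what the second case above requires.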
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorIndexer, VectorIndexerModel}

// rawData is a DataFrame with a Vector column "features".
val featureIndexerModel: VectorIndexerModel = new VectorIndexer()
  .setInputCol("features")          // the feature vector column
  .setMaxCategories(5)              // dimensions with more than 5 distinct values are treated as continuous and left unchanged
  .setOutputCol("indexedFeatures")
  .fit(rawData)

// Add it to a Pipeline (labelIndexerModel, dtClassifier and converter are stages defined elsewhere).
val pipeline = new Pipeline()
  .setStages(Array(labelIndexerModel, featureIndexerModel, dtClassifier, converter))
pipeline.fit(rawData).transform(rawData).select("features", "indexedFeatures").show(20, false)
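Since the snippet above depends on stages defined elsewhere, here is a self-contained sketch (the toy data is made up) showing just the VectorIndexer step on a vector whose second dimension carries the irregular codes 10/15/128:

import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.linalg.Vectors

// The first dimension has more than 5 distinct values (continuous); the second has only 3 (categorical).
val rawData = spark.createDataFrame(Seq(
  (0, Vectors.dense(2.5, 10.0)),
  (1, Vectors.dense(1.1, 15.0)),
  (2, Vectors.dense(3.7, 128.0)),
  (3, Vectors.dense(0.4, 10.0)),
  (4, Vectors.dense(5.9, 15.0)),
  (5, Vectors.dense(7.3, 128.0))
)).toDF("id", "features")

val indexerModel = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(5)      // dimensions with at most 5 distinct values are treated as categorical
  .fit(rawData)

// The categorical dimension with values 10/15/128 is re-encoded with indices 0, 1, 2; the continuous one is unchanged.
indexerModel.transform(rawData).select("features", "indexedFeatures").show(false)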
2.2. Converting strings to discrete features: StringIndexer
import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")

// Each label is replaced by an index; indices are ordered by label frequency, so the most frequent label gets index 0.
// handleInvalid controls labels not seen during fit: 'error' throws, 'skip' drops those rows, 'keep' assigns them an extra index.
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .setHandleInvalid("skip")

val indexed = indexer.fit(df).transform(df)
indexed.show()
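With the default frequencyDesc ordering, "a" (3 occurrences) gets index 0.0, "c" (2 occurrences) gets 1.0 and "b" (1 occurrence) gets 2.0 in the example above.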
