Spark 2.0 Feature Extraction, Transformation and Selection, Part 1: Data Normalization, String-Index Conversion, and Discrete-Continuous Feature Conversion


Data Normalization (Standardization)

In data preprocessing these two terms are used interchangeably (ignoring that "standardization" has a specific meaning in statistics).
The scaler operations below (StandardScaler, MinMaxScaler, MaxAbsScaler) each work on one feature at a time (one column of the DataFrame's feature vectors); Normalizer is the exception and works on each row's feature vector.
Let's start with an example:

//MaxAbsScaler example (see the MaxAbsScaler section below)
//Example: features 0, 1 and 2 are each scaled into [-1, 1]
//E.g. feature 0's column is [1000, 100, -10]; absMax = 1000, so it becomes [1.0, 0.1, -0.01]

+-----+--------------------------------+----------------------------+
|label|features                        |maxAbsScalerFeatures        |
+-----+--------------------------------+----------------------------+
|1.0  |(3,[0,1,2],[1000.0,0.1,-25.0])  |(3,[0,1,2],[1.0,0.001,-1.0])|
|2.0  |(3,[0,1,2],[100.0,-100.0,-25.0])|(3,[0,1,2],[0.1,-1.0,-1.0]) |
|3.0  |(3,[0,1,2],[-10.0,35.0,12.5])   |(3,[0,1,2],[-0.01,0.35,0.5])|
+-----+--------------------------------+----------------------------+

Normalizer

Normalizer computes the p-norm of each row's feature vector (the vector made of all the features of one sample) and divides every element by it, so each row ends up with unit norm. Normalizing the raw features this way can make many machine learning algorithms behave better.

The p-norm of a vector x = (x_1, x_2, ..., x_n) is defined as:

||x||_p = (|x_1|^p + |x_2|^p + ... + |x_n|^p)^(1/p)

For p = 1, 2 and ∞ this reduces to the simplest familiar cases:
1-norm (L1): ||x||_1 = |x_1| + |x_2| + ... + |x_n|
2-norm (L2): ||x||_2 = (x_1^2 + x_2^2 + ... + x_n^2)^(1/2)
∞-norm (L∞): ||x||_∞ = max(|x_1|, |x_2|, ..., |x_n|)
The 2-norm is distance in the usual sense.
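To make the three cases concrete, here is a minimal sketch (using a made-up 3-element vector, not data from this article) that evaluates the same norms with Spark's Vectors.norm helper; Normalizer simply divides every element of a row by the chosen norm.

import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;

public class NormExample {
    public static void main(String[] args) {
        //A made-up example vector, purely to illustrate the formulas above
        Vector x = Vectors.dense(1.0, -2.0, 2.0);
        double l1   = Vectors.norm(x, 1.0);                       // |1| + |-2| + |2| = 5.0
        double l2   = Vectors.norm(x, 2.0);                       // sqrt(1 + 4 + 4)  = 3.0
        double linf = Vectors.norm(x, Double.POSITIVE_INFINITY);  // max(1, 2, 2)     = 2.0
        System.out.println("L1=" + l1 + " L2=" + l2 + " Linf=" + linf);
        //Normalizer with p=2 would rescale this row to x/3.0 = [0.333..., -0.666..., 0.666...]
    }
}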

Normalizer is a Transformer which transforms a dataset of Vector rows, normalizing each Vector to have unit norm. It takes parameter p, which specifies the p-norm used for normalization. (p=2 by default.) This normalization can help standardize your input data and improve the behavior of learning algorithms.

//Complete Java code
package my.spark.ml.practice.classification;
import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class myNorm {
    public static void main(String[] args) {
        SparkSession spark=SparkSession
                .builder()
                .appName("CoFilter")
                .master("local[4]")
                .config("spark.sql.warehouse.dir",
                        "file///:G:/Projects/Java/Spark/spark-warehouse" )
                .getOrCreate();         
        String path="/spark/data/mllib/sample_multiclass_classification_data.txt";          

        Dataset<Row> dataFrame =
          spark.read().format("libsvm").load(path);

        //Normalize each row (the vector formed by one sample's features) with a p-norm
        //Normalize with the 1-norm
        Normalizer normalizerL1=new Normalizer()
                .setInputCol("features")
                .setOutputCol("normfeaturesL1")
                .setP(1.0);
        normalizerL1.transform(dataFrame).show(2,false);

        //Normalize with the 2-norm
        Normalizer normalizerL2=new Normalizer()
                .setInputCol("features")
                .setOutputCol("normfeaturesL2")
                .setP(2.0);
        normalizerL2.transform(dataFrame).show(2,false);

        //Normalize with the ∞-norm
        Normalizer normalizerLinf=new Normalizer()
                .setInputCol("features")
                .setOutputCol("normfeaturesLinf")
                .setP(Double.POSITIVE_INFINITY);
        normalizerLinf.transform(dataFrame).show(2,false);
    }
}

StandardScaler

z-score normalization, also called zero-mean normalization.
Standardizes each feature column (the vector formed by one feature across all samples) so that it has zero mean and unit standard deviation. In Spark you can choose independently whether to remove the mean and whether to scale the variance.

Note: when outliers dominate a MinMaxScaler rescaling, StandardScaler is usually the better choice.
Spark exposes two parameters:
1. withStd=true: scale to unit standard deviation;
2. withMean: shift the mean to 0. Note that this cannot be used on sparse input. Default is false.

StandardScaler transforms a dataset of Vector rows, normalizing each feature to have unit standard deviation and/or zero mean. It takes parameters:

    1. withStd: True by default. Scales the data to unit standard deviation.
    2. withMean: False by default. Centers the data with mean before scaling. It will build a dense output, so this does not work on sparse input and will raise an exception.
//Key code; for the rest see the Normalizer example above
StandardScaler scaler=new StandardScaler()
                .setInputCol("features")
                .setOutputCol("scFeatures")
                .setWithMean(false)//the data is a sparse matrix, so this must be false
                .setWithStd(true);
StandardScalerModel model=scaler.fit(dataFrame);
model.transform(dataFrame).show(10,false);
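After fit(), the statistics the scaler learned can be read back from the model. A small sketch reusing the model variable above (mean() and std() each return one value per feature):

//Inspect the per-feature statistics computed by fit()
System.out.println("per-feature std : " + model.std());
System.out.println("per-feature mean: " + model.mean());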

MinMaxScaler

Min-max normalization:
Linearly rescales every feature into a user-specified [min, max] range. Note that the computation is still done one feature (one column) at a time: for a feature column E with observed minimum E_min and maximum E_max, Rescaled(e_i) = (e_i - E_min) / (E_max - E_min) * (max - min) + min. Typically max and min are set to 1 and 0, which normalizes to [0, 1]; both can be changed in Spark, and [0, 1] is the default.

Note: (1) the observed minimum and maximum are easily dominated by outliers; (2) zero values may be mapped to non-zero values, so a sparse matrix becomes a dense one.
MinMaxScaler transforms a dataset of Vector rows, rescaling each feature to a specific range (often [0, 1]). It takes parameters:

Parameter 1: min: 0.0 by default. Lower bound after transformation, shared by all features.
Parameter 2: max: 1.0 by default. Upper bound after transformation, shared by all features.

Note that since zero values will probably be transformed to non-zero values, output of the transformer will be DenseVector even for sparse input. 

max and min define the user-configurable target range, [0, 1] by default, shared by all features (every feature column uses the same setting).

MinMaxScaler minMaxScaler=new MinMaxScaler()
                          .setInputCol("features")
                          .setOutputCol("minmaxFeatures")
                          .setMax(100.0)//linearly rescale the data into [-100, 100]
                          .setMin(-100.0);
MinMaxScalerModel minMaxScalerModel=minMaxScaler.fit(dataFrame);
minMaxScalerModel.transform(dataFrame).show(3,false);
//Sample output
/*
+-----+--------------------------------+----------------------------------+
|label|features                        |minmaxFeatures                    |
+-----+--------------------------------+----------------------------------+
|1.0  |(3,[0,1,2],[1000.0,0.1,-25.0])  |[100.0,48.296296296296276,-100.0] |
|2.0  |(3,[0,1,2],[100.0,-100.0,-25.0])|[-78.21782178217822,-100.0,-100.0]|
|3.0  |(3,[0,1,2],[-10.0,35.0,12.5])   |[-100.0,100.0,100.0]              |
+-----+--------------------------------+----------------------------------+
*/
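As a quick sanity check of the formula above, look at feature 1 in the first row: that column's observed values are 0.1, -100.0 and 35.0, so E_min = -100 and E_max = 35. With min = -100 and max = 100 the rescaled value is (0.1 - (-100)) / (35 - (-100)) * (100 - (-100)) + (-100) = 100.1 / 135 * 200 - 100 ≈ 48.296, which matches the table.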

MaxAbsScaler

       This one also works per feature: every value in a feature column is divided by that column's maximum absolute value, so the result lies within [-1, 1]. The data is not re-centered, so a sparse matrix stays sparse. For example, a feature called "length" observed on three samples as [-1000, 100, 10] has a maximum absolute value of 1000, so it is transformed to [-1000/1000, 100/1000, 10/1000] = [-1, 0.1, 0.01].
       Consequently, if that maximum absolute value comes from an outlier, this kind of scaling is clearly unreasonable.

MaxAbsScaler transforms a dataset of Vector rows, rescaling each feature to range [-1, 1] by dividing through the maximum absolute value in each feature. It does not shift/center the data, and thus does not destroy any sparsity.

MaxAbsScaler computes summary statistics on a data set and produces a MaxAbsScalerModel. The model can then transform each feature individually to range [-1, 1].

//Key code; no parameters need to be set
 MaxAbsScalerModel maxAbsScalerModel=new MaxAbsScaler()
                                .setInputCol("features")
                                .setOutputCol("maxAbsScalerFeatures")
                                .fit(dataFrame);
maxAbsScalerModel.transform(dataFrame).show(10,false);
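The per-feature maxima that the model divided by can also be read back; a hedged sketch (as far as I can tell, MaxAbsScalerModel exposes them via maxAbs(), returning a Vector):

//Inspect the per-feature maximum absolute values learned by fit()
System.out.println("per-feature |max|: " + maxAbsScalerModel.maxAbs());
//For the sample data at the top of this article this should print [1000.0,100.0,25.0]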

String <-> Index Conversion

VectorIndexer

Main purpose: improving the classification performance of ML methods such as decision trees and random forests.
VectorIndexer indexes the categorical (discrete-valued) features inside a dataset's feature vectors.
It automatically decides which features are categorical and re-indexes them. Concretely, you set maxCategories: a feature whose number of distinct values is at most maxCategories is re-indexed to 0 ~ K-1 (where K is its number of distinct values, K <= maxCategories); a feature with more than maxCategories distinct values is treated as continuous and left unchanged. It is easier to follow with the example below.

    VectorIndexer helps index categorical features in datasets of Vectors. It can both automatically decide which features are categorical and convert original values to category indices. Specifically, it does the following:

    1. Take an input column of type Vector and a parameter maxCategories.
    2. Decide which features should be categorical based on the number of distinct values, where features with at most maxCategories are declared categorical.
    3. Compute 0-based category indices for each categorical feature.
    4. Index categorical features and transform original feature values to indices.

    Indexing categorical features allows algorithms such as Decision Trees and Tree Ensembles to treat categorical features appropriately, improving performance.

    This transformed data could then be passed to algorithms such as DecisionTreeRegressor that handle categorical features.

Here is an example on a simple dataset:

//Define the input/output columns and set the maximum number of categories to 5:
//a feature (i.e. a column) with more than 5 distinct values is treated as continuous
VectorIndexerModel featureIndexerModel=new VectorIndexer()
                 .setInputCol("features")
                 .setMaxCategories(5)
                 .setOutputCol("indexedFeatures")
                 .fit(rawData);
//Add it to the Pipeline
Pipeline pipeline=new Pipeline()
                 .setStages(new PipelineStage[]
                         {labelIndexerModel,
                         featureIndexerModel,
                         dtClassifier,
                         converter});
pipeline.fit(rawData).transform(rawData).select("features","indexedFeatures").show(20,false);
//The result looks like this:
+-------------------------+-------------------------+
|features                 |indexedFeatures          |
+-------------------------+-------------------------+
|(3,[0,1,2],[2.0,5.0,7.0])|(3,[0,1,2],[2.0,1.0,1.0])|
|(3,[0,1,2],[3.0,5.0,9.0])|(3,[0,1,2],[3.0,1.0,2.0])|
|(3,[0,1,2],[4.0,7.0,9.0])|(3,[0,1,2],[4.0,3.0,2.0])|
|(3,[0,1,2],[2.0,4.0,9.0])|(3,[0,1,2],[2.0,0.0,2.0])|
|(3,[0,1,2],[9.0,5.0,7.0])|(3,[0,1,2],[9.0,1.0,1.0])|
|(3,[0,1,2],[2.0,5.0,9.0])|(3,[0,1,2],[2.0,1.0,2.0])|
|(3,[0,1,2],[3.0,4.0,9.0])|(3,[0,1,2],[3.0,0.0,2.0])|
|(3,[0,1,2],[8.0,4.0,9.0])|(3,[0,1,2],[8.0,0.0,2.0])|
|(3,[0,1,2],[3.0,6.0,2.0])|(3,[0,1,2],[3.0,2.0,0.0])|
|(3,[0,1,2],[5.0,9.0,2.0])|(3,[0,1,2],[5.0,4.0,0.0])|
+-------------------------+-------------------------+
Result analysis: each feature vector has 3 features: feature 0, feature 1 and feature 2. In row 1, for example, the values 2.0, 5.0, 7.0 are transformed into 2.0, 1.0, 1.0.
Notice that only features 1 and 2 were transformed; feature 0 was not. That is because feature 0 has 6 distinct values (2, 3, 4, 5, 8, 9), more than the setMaxCategories(5) above, so it is treated as continuous and left untouched.
Feature 1: (4, 5, 6, 7, 9) --> (0, 1, 2, 3, 4)
Feature 2: (2, 7, 9) --> (0, 1, 2)

How to read a row of the output DataFrame (row 1):
3 features   feature indices 0,1,2   values before indexing
|(3,         [0,1,2],                [2.0,5.0,7.0])
3 features   feature indices 0,1,2   values after indexing
|(3,         [0,1,2],                [2.0,1.0,1.0])|
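You can also ask the fitted model directly which features it treated as categorical and how it mapped their values. A small sketch reusing featureIndexerModel from above (javaCategoryMaps() is the Java-friendly accessor; it needs import java.util.Map;):

//Which features were declared categorical, and each feature's value -> index map
Map<Integer, Map<Double, Integer>> categoryMaps =
        featureIndexerModel.javaCategoryMaps();
System.out.println("Categorical feature indices: " + categoryMaps.keySet());
//For the data above this should contain features 1 and 2, but not feature 0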

StringIndexer

Once you have understood VectorIndexer, StringIndexer, which re-indexes the dataset's labels, is easy to understand: it follows the same idea. The example below should be enough.

//Define a StringIndexerModel that maps label to indexedLabel
StringIndexerModel labelIndexerModel=new StringIndexer()
                .setInputCol("label")
                .setOutputCol("indexedLabel")
                .fit(rawData);
//Add labelIndexerModel to the Pipeline
Pipeline pipeline=new Pipeline()
                 .setStages(new PipelineStage[]
                         {labelIndexerModel,
                         featureIndexerModel,
                         dtClassifier,
                         converter});
//Inspect the result
pipeline.fit(rawData).transform(rawData).select("label","indexedLabel").show(20,false);

Labels are re-indexed by frequency into 0 ~ numOfLabels-1 (the number of classes); the most frequent label becomes 0, and so on:
label=3.0 appears most often (4 times), so it is indexed as 0;
next is label=2.0 (3 times), indexed as 1, and so on.
+-----+------------+
|label|indexedLabel|
+-----+------------+
|3.0  |0.0         |
|4.0  |3.0         |
|1.0  |2.0         |
|3.0  |0.0         |
|2.0  |1.0         |
|3.0  |0.0         |
|2.0  |1.0         |
|3.0  |0.0         |
|2.0  |1.0         |
|1.0  |2.0         |
+-----+------------+
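The fitted StringIndexerModel keeps the label ordering it learned (most frequent first); this is exactly what IndexToString needs later. A small sketch, with the expected output inferred from the frequency counts above:

//The index -> original label mapping, ordered by descending frequency
String[] labels = labelIndexerModel.labels();
System.out.println(java.util.Arrays.toString(labels));
//Expected for the data above: [3.0, 2.0, 1.0, 4.0]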

Two more things to watch out for when using StringIndexer elsewhere:
(1) StringIndexer is essentially String -> index (number). If the input is numeric, the values are first cast to string and then indexed ("cast numeric to string and then index the string values"); in other words both strings and numbers can be re-indexed.
(2) When the fitted model is used to transform a new dataset, you may run into unseen labels; see the example below.

StringIndexer indexes strings by their frequency
 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | a        | 0.0
 4  | a        | 0.0
 5  | c        | 1.0
 If the mapping (a, b, c) -> (0.0, 2.0, 1.0) was fit on the data above, then using that model to transform data whose category column contains values other than (a, b, c), say d and e, runs into trouble:
 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | d        | ?
 3  | e        | ?
 4  | a        | 0.0
 5  | c        | 1.0
 Spark provides two ways to handle this:
 StringIndexerModel labelIndexerModel=new StringIndexer()
                .setInputCol("label")
                .setOutputCol("indexedLabel")
                //.setHandleInvalid("error")
                .setHandleInvalid("skip")
                .fit(rawData);
 (1) The default setting, .setHandleInvalid("error"), throws an exception:
 org.apache.spark.SparkException: Unseen label: d,e
 (2) .setHandleInvalid("skip") drops the rows containing those labels and runs normally, producing the following output:
 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 4  | a        | 0.0
 5  | c        | 1.0

IndexToString

Correspondingly, where there is a StringIndexer there should also be an IndexToString. After StringIndexer re-indexes the labels, you train on those indices and then predict on other data; the predictions are expressed as indices too, so they need to be mapped back. See the example below: only the converted convetedPrediction column matches the original labels.

Symmetrically to StringIndexer, IndexToString maps a column of label indices back to a column containing the original labels as strings. 
A common use case is to produce indices from labels with StringIndexer, train a model with those indices and retrieve the original labels from the column of predicted indices with IndexToString.
IndexToString converter=new IndexToString()
                .setInputCol("prediction")//Spark默認預測label行
                .setOutputCol("convetedPrediction")//轉換回來的預測label
                .setLabels(labelIndexerModel.labels());//需要指定前面建好相互相互模型
Pipeline pipeline=new Pipeline()
                 .setStages(new PipelineStage[]
                         {labelIndexerModel,
                         featureIndexerModel,
                         dtClassifier,
                         converter});
pipeline.fit(rawData).transform(rawData)
        .select("label","prediction","convetedPrediction").show(20,false);  
+-----+----------+------------------+
|label|prediction|convetedPrediction|
+-----+----------+------------------+
|3.0  |0.0       |3.0               |
|4.0  |1.0       |2.0               |
|1.0  |2.0       |1.0               |
|3.0  |0.0       |3.0               |
|2.0  |1.0       |2.0               |
|3.0  |0.0       |3.0               |
|2.0  |1.0       |2.0               |
|3.0  |0.0       |3.0               |
|2.0  |1.0       |2.0               |
|1.0  |2.0       |1.0               |
+-----+----------+------------------+

Discrete <-> Continuous Feature or Label Conversion

OneHotEncoder

One-hot encoding maps a categorical feature (discrete, already converted to a numeric index) to a one-hot vector. This way, classifiers that require continuous numeric inputs, such as logistic regression, can also use categorical (discrete) features.

One-hot encoding uses an N-bit state register to encode N states: each state gets its own register bit, and at any time exactly one bit is set.
For example, the plain state codes 000, 001, 010, 011, 100, 101
become the one-hot codes 000001, 000010, 000100, 001000, 010000, 100000.
Put differently, a feature with m possible values turns into m binary features after one-hot encoding; these features are mutually exclusive and only one is active at a time, so the data becomes sparse.
The main benefits are that classifiers which cannot handle categorical attributes directly can now use them, and that to some extent it also expands the feature set.

One-hot encoding maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.

//Before OneHotEncoder the string column must first be indexed (string -> numeric)
        Dataset<Row> indexedDf=new StringIndexer()
                        .setInputCol("category")
                        .setOutputCol("indexCategory")
                        .fit(df)
                        .transform(df);
        //OneHotEncode the indexed categories; the result can then be used as continuous numeric input
        Dataset<Row> coderDf=new OneHotEncoder()
                        .setInputCol("indexCategory")
                        .setOutputCol("ontHotCategory")//不需要fit                     
                        .transform(indexedDf);
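Note that by default Spark's OneHotEncoder drops the last category (dropLast=true), so a column with 3 distinct index values becomes a 2-element sparse vector and the dropped category is represented by the all-zero vector. A hedged illustration of what the output would look like for index values 0.0, 1.0 and 2.0:

//indexCategory -> oneHotCategory, with the default dropLast=true and 3 categories
//0.0 -> (2,[0],[1.0])
//1.0 -> (2,[1],[1.0])
//2.0 -> (2,[],[])   the dropped last category is encoded as all zeros
//Use .setDropLast(false) if you want a full 3-element one-hot vector instead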

Bucketizer

Binning (segmentation): converts continuous values into discrete categories.
        For example, age is a continuous value; to turn it into discrete categories (minor, young, middle-aged, senior) you use Bucketizer.
        The boundaries are user-defined via the splits parameter, e.g.:
        double[] splits = {0, 18, 35, 55, Double.POSITIVE_INFINITY}
        splits the ages into four buckets: [0, 18), [18, 35), [35, 55) and 55+. Each bucket includes its lower bound and excludes its upper bound, except the last one.
     If you are not sure about the outer boundaries, just use Double.NEGATIVE_INFINITY and Double.POSITIVE_INFINITY; that is always safe.

Bucketizer transforms a column of continuous features to a column of 
feature buckets, where the buckets are specified by users.

double[] splits={0,18,35,55,Double.POSITIVE_INFINITY};
Dataset<Row> bucketDf=new Bucketizer()
             .setInputCol("ages")
             .setOutputCol("bucketCategory")
             .setSplits(splits)//set the bucket boundaries
             .transform(df);
//Output
/*
+---+----+--------------+
|id |ages|bucketCategory|
+---+----+--------------+
|0.0|2.0 |0.0           |
|1.0|67.0|3.0           |
|2.0|36.0|2.0           |
|3.0|14.0|0.0           |
|4.0|5.0 |0.0           |
|5.0|98.0|3.0           |
|6.0|65.0|3.0           |
|7.0|23.0|1.0           |
|8.0|37.0|2.0           |
|9.0|76.0|3.0           |
+---+----+--------------+

*/

QuantileDiscretizer

        Quantile discretization. Like Bucketizer (binning), it turns a continuous numeric feature into a discrete categorical one; the difference is that fitting a QuantileDiscretizer produces a Bucketizer model for you.

  • Parameter 1: unlike Bucketizer, you do not define the splits yourself; you only specify how many buckets (bins) you want (numBuckets). QuantileDiscretizer computes the approximate quantiles and performs the discretization for you.
  • Parameter 2: relativeError controls the precision of the approximation. When set to 0, exact quantiles are computed, which is an expensive operation.
  • The lower and upper bucket boundaries are set to negative and positive infinity, covering the whole real range.

QuantileDiscretizer takes a column with continuous features and outputs a column with binned categorical features. The number of bins is set by the numBuckets parameter. The bin ranges are chosen using an approximate algorithm (see the documentation for approxQuantile for a detailed description). The precision of the approximation can be controlled with the relativeError parameter. When set to zero, exact quantiles are calculated (Note: Computing exact quantiles is an expensive operation). The lower and upper bin bounds will be -Infinity and +Infinity covering all real values.

new QuantileDiscretizer()
             .setInputCol("ages")
             .setOutputCol("qdCategory")
             .setNumBuckets(4)//number of buckets
             .setRelativeError(0.1)//precision: relative error of the approximate quantiles
             .fit(df)
             .transform(df)
             .show(10,false);    
//Example output:
+---+----+----------+
|id |ages|qdCategory|
+---+----+----------+
|0.0|2.0 |0.0       |
|1.0|67.0|3.0       |
|2.0|36.0|2.0       |
|3.0|14.0|1.0       |
|4.0|5.0 |0.0       |
|5.0|98.0|3.0       |
|6.0|65.0|2.0       |
|7.0|23.0|1.0       |
|8.0|37.0|2.0       |
|9.0|76.0|3.0       |
+---+----+----------+
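Since fitting a QuantileDiscretizer produces a Bucketizer, the bucket boundaries chosen by the approximate-quantile algorithm can be read back; a hedged sketch of the same pipeline split into two steps:

//fit() returns a Bucketizer whose learned boundaries can be inspected
Bucketizer qdModel=new QuantileDiscretizer()
             .setInputCol("ages")
             .setOutputCol("qdCategory")
             .setNumBuckets(4)
             .setRelativeError(0.1)
             .fit(df);
System.out.println(java.util.Arrays.toString(qdModel.getSplits()));
qdModel.transform(df).show(10,false);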
