規范化,有關之前都是用 python寫的, 偶然要用scala 進行寫, 看到這位大神寫的, 那個網頁也不錯,那個連接圖做的還蠻不錯的,那天也將自己的博客弄一下那個插件。
本文來源 原文地址:http://www.neilron.xyz/spark-ml-feature-scaler/
下面是大神寫的:
org.apache.spark.ml.feature包中包含了4種不同的歸一化方法:
- Normalizer
- StandardScaler
- MinMaxScaler
- MaxAbsScaler
有時感覺會容易混淆,借助官方文檔和實際數據的變換,在這里做一次總結。
0 數據准備
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.5, -1.0)),
(1, Vectors.dense(2.0, 1.0, 1.0)),
(2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")
dataFrame.show
// 原始數據
+---+--------------+
| id| features|
+---+--------------+
| 0|[1.0,0.5,-1.0]|
| 1| [2.0,1.0,1.0]|
| 2|[4.0,10.0,2.0]|
+---+--------------+
|
1 Normalizer
Normalizer的作用范圍是每一行,使每一個行向量的范數變換為一個單位范數,下面的示例代碼都來自spark官方文檔加上少量改寫和注釋。
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
import org.apache.spark.ml.feature.Normalizer
// 正則化每個向量到1階范數
val normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0)
val l1NormData = normalizer.transform(dataFrame)
println("Normalized using L^1 norm")
l1NormData.show()
// 將每一行的規整為1階范數為1的向量,1階范數即所有值絕對值之和。
+---+--------------+------------------+
| id| features| normFeatures|
+---+--------------+------------------+
| 0|[1.0,0.5,-1.0]| [0.4,0.2,-0.4]|
| 1| [2.0,1.0,1.0]| [0.5,0.25,0.25]|
| 2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+
// 正則化每個向量到無窮階范數
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()
// 向量的無窮階范數即向量中所有值中的最大值
+---+--------------+--------------+
| id| features| normFeatures|
+---+--------------+--------------+
| 0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
| 1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
| 2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+
|
2 StandardScaler
StandardScaler處理的對象是每一列,也就是每一維特征,將特征標准化為單位標准差或是0均值,或是0均值單位標准差。
主要有兩個參數可以設置:
- withStd: 默認為真。將數據標准化到單位標准差。
- withMean: 默認為假。是否變換為0均值。 (此種方法將產出一個稠密輸出,所以不適用於稀疏輸入。)
StandardScaler需要fit數據,獲取每一維的均值和標准差,來縮放每一維特征。
StandardScaler是一個Estimator,它可以fit數據集產生一個StandardScalerModel,用來計算匯總統計。
然后產生的模可以用來轉換向量至統一的標准差以及(或者)零均值特征。
注意如果特征的標准差為零,則該特征在向量中返回的默認值為0.0。
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
import org.apache.spark.ml.feature.StandardScaler
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false)
// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)
// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show
// 將每一列的標准差縮放到1。
+---+--------------+------------------------------------------------------------+
|id |features |scaledFeatures |
+---+--------------+------------------------------------------------------------+
|0 |[1.0,0.5,-1.0]|[0.6546536707079772,0.09352195295828244,-0.6546536707079771]|
|1 |[2.0,1.0,1.0] |[1.3093073414159544,0.1870439059165649,0.6546536707079771] |
|2 |[4.0,10.0,2.0]|[2.618614682831909,1.870439059165649,1.3093073414159542] |
+---+--------------+------------------------------------------------------------+
|
3 MinMaxScaler
MinMaxScaler作用同樣是每一列,即每一維特征。將每一維特征線性地映射到指定的區間,通常是[0, 1]。
MinMaxScaler計算數據集的匯總統計量,並產生一個MinMaxScalerModel。
注意因為零值轉換后可能變為非零值,所以即便為稀疏輸入,輸出也可能為稠密向量。
該模型可以將獨立的特征的值轉換到指定的范圍內。
它也有兩個參數可以設置:
- min: 默認為0。指定區間的下限。
- max: 默認為1。指定區間的上限。
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import org.apache.spark.ml.feature.MinMaxScaler
val scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show
// 每維特征線性地映射,最小值映射到0,最大值映射到1。
+--------------+-----------------------------------------------------------+
|features |scaledFeatures |
+--------------+-----------------------------------------------------------+
|[1.0,0.5,-1.0]|[0.0,0.0,0.0] |
|[2.0,1.0,1.0] |[0.3333333333333333,0.05263157894736842,0.6666666666666666]|
|[4.0,10.0,2.0]|[1.0,1.0,1.0] |
+--------------+-----------------------------------------------------------+
|
4 MaxAbsScaler
MaxAbsScaler將每一維的特征變換到[-1, 1]閉區間上,通過除以每一維特征上的最大的絕對值,它不會平移整個分布,也不會破壞原來每一個特征向量的稀疏性。
因為它不會轉移/集中數據,所以不會破壞數據的稀疏性。
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import org.apache.spark.ml.feature.MaxAbsScaler
val scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
// 每一維的絕對值的最大值為[4, 10, 2]
+--------------+----------------+
| features| scaledFeatures|
+--------------+----------------+
|[1.0,0.5,-1.0]|[0.25,0.05,-0.5]|
| [2.0,1.0,1.0]| [0.5,0.1,0.5]|
|[4.0,10.0,2.0]| [1.0,1.0,1.0]|
+--------------+----------------+
|
總結
所有4種歸一化方法都是線性的變換,當某一維特征上具有非線性的分布時,還需要配合其它的特征預處理方法。
補充:
其他特征轉換
VectorIndexer
算法介紹:
VectorIndexer解決數據集中的類別特征Vector。它可以自動識別哪些特征是類別型的,並且將原始值轉換為類別指標。它的處理流程如下:
1.獲得一個向量類型的輸入以及maxCategories參數。
2.基於原始數值識別哪些特征需要被類別化,其中最多maxCategories需要被類別化。
3.對於每一個類別特征計算0-based類別指標。
4.對類別特征進行索引然后將原始值轉換為指標。
索引后的類別特征可以幫助決策樹等算法處理類別型特征,並得到較好結果。
在下面的例子中,我們讀入一個數據集,然后使用VectorIndexer來決定哪些特征需要被作為非數值類型處理,將非數值型特征轉換為他們的索引。
調用示例:
Scala:
- import org.apache.spark.ml.feature.VectorIndexer
- val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
- val indexer = new VectorIndexer()
- .setInputCol("features")
- .setOutputCol("indexed")
- .setMaxCategories(10)
- val indexerModel = indexer.fit(data)
- val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
- println(s"Chose ${categoricalFeatures.size} categorical features: " +
- categoricalFeatures.mkString(", "))
- // Create new column "indexed" with categorical values transformed to indices
- val indexedData = indexerModel.transform(data)
- indexedData.show()
Python:
- from pyspark.ml.feature import VectorIndexer
- data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
- indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
- indexerModel = indexer.fit(data)
- # Create new column "indexed" with categorical values transformed to indices
- indexedData = indexerModel.transform(data)
- indexedData.show()
ElementwiseProduct
算法介紹:
ElementwiseProduct按提供的“weight”向量,返回與輸入向量元素級別的乘積。即是說,按提供的權重分別對輸入數據進行縮放,得到輸入向量v以及權重向量w的Hadamard積。

下面例子展示如何通過轉換向量的值來調整向量。
調用示例:
Scala:
- import org.apache.spark.ml.feature.ElementwiseProduct
- import org.apache.spark.ml.linalg.Vectors
- // Create some vector data; also works for sparse vectors
- val dataFrame = spark.createDataFrame(Seq(
- ("a", Vectors.dense(1.0, 2.0, 3.0)),
- ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")
- val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
- val transformer = new ElementwiseProduct()
- .setScalingVec(transformingVector)
- .setInputCol("vector")
- .setOutputCol("transformedVector")
- // Batch transform the vectors to create new column:
- transformer.transform(dataFrame).show()
Python:
- from pyspark.ml.feature import ElementwiseProduct
- from pyspark.ml.linalg import Vectors
- # Create some vector data; also works for sparse vectors
- data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
- df = spark.createDataFrame(data, ["vector"])
- transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
- inputCol="vector", outputCol="transformedVector")
- # Batch transform the vectors to create new column:
- transformer.transform(df).show()
SQLTransformer
算法介紹:
SQLTransformer工具用來轉換由SQL定義的陳述。目前僅支持SQL語法如"SELECT ...FROM __THIS__ ...",其中"__THIS__"代表輸入數據的基礎表。選擇語句指定輸出中展示的字段、元素和表達式,支持Spark SQL中的所有選擇語句。用戶可以基於選擇結果使用Spark SQL建立方程或者用戶自定義函數。SQLTransformer支持語法示例如下:
1. SELECTa, a + b AS a_b FROM __THIS__
2. SELECTa, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
3. SELECTa, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b
示例:
假設我們有如下DataFrame包含id,v1,v2列:
id | v1 | v2
----|-----|-----
0 | 1.0 | 3.0
2 | 2.0 | 5.0
使用SQLTransformer語句"SELECT *,(v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"轉換后得到輸出如下:
id | v1 | v2 | v3 | v4
----|-----|-----|-----|-----
0 | 1.0| 3.0 | 4.0 | 3.0
2 | 2.0| 5.0 | 7.0 |10.0
調用示例:
Scala:
- import org.apache.spark.ml.feature.SQLTransformer
- val df = spark.createDataFrame(
- Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
- val sqlTrans = new SQLTransformer().setStatement(
- "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
- sqlTrans.transform(df).show()
Java:
- import java.util.Arrays;
- import java.util.List;
- import org.apache.spark.ml.feature.SQLTransformer;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.RowFactory;
- import org.apache.spark.sql.SparkSession;
- import org.apache.spark.sql.types.*;
- List<Row> data = Arrays.asList(
- RowFactory.create(0, 1.0, 3.0),
- RowFactory.create(2, 2.0, 5.0)
- );
- StructType schema = new StructType(new StructField [] {
- new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
- new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
- new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
- });
- Dataset<Row> df = spark.createDataFrame(data, schema);
- SQLTransformer sqlTrans = new SQLTransformer().setStatement(
- "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");
- sqlTrans.transform(df).show();
Python:
- from pyspark.ml.feature import SQLTransformer
- df = spark.createDataFrame([
- (0, 1.0, 3.0),
- (2, 2.0, 5.0)
- ], ["id", "v1", "v2"])
- sqlTrans = SQLTransformer(
- statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
- sqlTrans.transform(df).show()
VectorAssembler
算法介紹:
VectorAssembler是一個轉換器,它將給定的若干列合並為一列向量。它可以將原始特征和一系列通過其他轉換器得到的特征合並為單一的特征向量,來訓練如邏輯回歸和決策樹等機器學習算法。VectorAssembler可接受的輸入列類型:數值型、布爾型、向量型。輸入列的值將按指定順序依次添加到一個新向量中。
示例:
假設我們有如下DataFrame包含id,hour,mobile, userFeatures以及clicked列:
id | hour | mobile| userFeatures | clicked
----|------|--------|------------------|---------
0 |18 | 1.0 | [0.0, 10.0, 0.5] | 1.0
userFeatures列中含有3個用戶特征。我們想將hour,mobile以及userFeatures合並為一個新列。將VectorAssembler的輸入指定為hour,mobile以及userFeatures,輸出指定為features,通過轉換我們將得到以下結果:
id | hour | mobile| userFeatures | clicked | features
----|------|--------|------------------|---------|-----------------------------
0 |18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5]
調用示例:
Scala:
- import org.apache.spark.ml.feature.VectorAssembler
- import org.apache.spark.ml.linalg.Vectors
- val dataset = spark.createDataFrame(
- Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
- ).toDF("id", "hour", "mobile", "userFeatures", "clicked")
- val assembler = new VectorAssembler()
- .setInputCols(Array("hour", "mobile", "userFeatures"))
- .setOutputCol("features")
- val output = assembler.transform(dataset)
- println(output.select("features", "clicked").first())
Python:
- from pyspark.ml.linalg import Vectors
- from pyspark.ml.feature import VectorAssembler
- dataset = spark.createDataFrame(
- [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
- ["id", "hour", "mobile", "userFeatures", "clicked"])
- assembler = VectorAssembler(
- inputCols=["hour", "mobile", "userFeatures"],
- outputCol="features")
- output = assembler.transform(dataset)
- print(output.select("features", "clicked").first())
QuantileDiscretizer
算法介紹:
QuantileDiscretizer講連續型特征轉換為分級類別特征。分級的數量由numBuckets參數決定。分級的范圍有漸進算法決定。漸進的精度由relativeError參數決定。當relativeError設置為0時,將會計算精確的分位點(計算代價較高)。分級的上下邊界為負無窮到正無窮,覆蓋所有的實數值。
示例:
假設我們有如下DataFrame包含id,hour:
id | hour
----|------
0 |18.0
----|------
1 |19.0
----|------
2 | 8.0
----|------
3 | 5.0
----|------
4 | 2.2
hour是一個Double類型的連續特征,將參數numBuckets設置為3,我們可以將hour轉換為如下分級特征。
id | hour | result
----|------|------
0 |18.0 | 2.0
----|------|------
1 |19.0 | 2.0
----|------|------
2 |8.0 | 1.0
----|------|------
3 |5.0 | 1.0
----|------|------
4 |2.2 | 0.0
調用示例:
Scala:
- import org.apache.spark.ml.feature.QuantileDiscretizer
- val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
- var df = spark.createDataFrame(data).toDF("id", "hour")
- val discretizer = new QuantileDiscretizer()
- .setInputCol("hour")
- .setOutputCol("result")
- .setNumBuckets(3)
- val result = discretizer.fit(df).transform(df)
- result.show()
Python:
- from pyspark.ml.feature import QuantileDiscretizer
- data = [(0, 18.0,), (1, 19.0,), (2, 8.0,), (3, 5.0,), (4, 2.2,)]
- df = spark.createDataFrame(data, ["id", "hour"])
- discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
- result = discretizer.fit(df).transform(df)
- result.show()
