Alink Rambling (9): Feature Hashing and Standard Scaling in Feature Engineering
0x00 Abstract
Alink is a new-generation machine learning algorithm platform that Alibaba built on Flink, the real-time computing engine; it is the first machine learning platform in the industry to support both batch and streaming algorithms. This article dissects the code behind the "feature engineering" part of Alink.
0x01 Related Concepts
1.1 Feature Engineering
Feature engineering in machine learning is the process of turning raw input data into features that better represent the underlying problem and thus improve the accuracy of predictive models.
Finding good features is difficult and time-consuming work that requires expert knowledge; applied machine learning can largely be understood as feature engineering. Features have an enormous influence on how well a machine learning model can perform, hence the saying that "data and features determine the upper bound of a machine learning model's performance".
The input features of machine learning come in several kinds:
- Numerical features: integers, floats, etc., which may carry an ordering or be unordered.
- Categorical features: e.g. IDs, gender.
- Temporal features: time series such as month, year, quarter, date, hour.
- Spatial features: latitude/longitude, which can be converted to zip codes, cities, etc.
- Text features: documents, natural language, sentences.
Common feature engineering techniques include:
- Binning
- One-Hot Encoding
- Hashing Trick
- Embedding
- Log Transformation
- Scaling
- Normalization
- Feature Interaction
This article walks through the implementation of feature scaling (Scaling) and feature hashing (Hashing Trick).
1.2 Feature Scaling
Feature scaling is a method to standardize the range of independent variables or features of data. In data processing it is also known as data normalization and is usually performed during the preprocessing step. Feature scaling confines values with very different ranges to a specified range. Because the ranges of raw values vary widely, the objective functions of some machine learning algorithms will not work properly without normalization. For example, most classifiers compute the distance between two points as a Euclidean distance; if one feature has a broad range of values, the distance will be dominated by that feature. The ranges of all features should therefore be normalized so that each feature contributes approximately proportionately to the final distance.
Another reason to apply feature scaling is that gradient descent converges much faster with it than without it. There are two main kinds of feature scaling (a minimal sketch of both formulas follows this list):
- Min-max Scaling
- Standard (Z) Scaling
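A minimal sketch of the two formulas in plain Java (not Alink code); the z-score example reuses the C16 statistics (mean 56.6583, stdDev 36.3690, min 20, max 768) that appear in the TableSummary dump later in this article:
public class ScalingDemo {
    // min-max scaling: maps x into [0, 1]
    static double minMax(double x, double min, double max) {
        return (x - min) / (max - min);
    }
    // standard (z-score) scaling: zero mean, unit standard deviation
    static double zScore(double x, double mean, double stdDev) {
        return (x - mean) / stdDev;
    }
    public static void main(String[] args) {
        System.out.println(zScore(50, 56.6583, 36.3690)); // ≈ -0.1831, matching the transformed C16 value later
        System.out.println(minMax(50, 20, 768));          // ≈ 0.0401
    }
}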
1.3 Feature Hashing (Hashing Trick)
Most machine learning algorithms expect a matrix of real numbers as input; turning raw data into such a matrix is what feature engineering does, and feature hashing (also called the hashing trick) is one such feature engineering technique.
The goal of feature hashing is to turn a data point into a vector, or to compress a high-dimensional raw feature vector into a lower-dimensional one, while losing as little of the original expressive power as possible.
Feature hashing uses a hash function to map raw data to hash values within a specified range. Compared with the one-hot model it has many advantages, e.g. it supports online learning and reduces dimensionality considerably. A minimal sketch follows.
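A minimal sketch of the hashing trick in plain Java (not Alink code; Alink uses MurmurHash3 rather than String.hashCode, as shown in section 5.3):
import java.util.TreeMap;

public class HashingTrickDemo {
    // map a feature string to a slot of a fixed dim-dimensional sparse vector;
    // collisions simply accumulate into the same slot
    static void update(String s, double value, TreeMap<Integer, Double> vec, int dim) {
        int index = Math.floorMod(s.hashCode(), dim);
        vec.merge(index, value, Double::sum);
    }
    public static void main(String[] args) {
        TreeMap<Integer, Double> vec = new TreeMap<>();
        update("site_id=1fbe01fe", 1.0, vec, 30000); // categorical: hash "colName=value", value 1.0
        update("C14", 0.857, vec, 30000);            // numerical: hash "colName", value is the feature value
        System.out.println(vec);
    }
}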
As an example, let's feature-hash the heroes of Liangshan (from Water Margin), taking Guan Sheng:
Name: Guan Sheng
Rank: holds the 5th chair
Birthplace: Yuncheng (present-day Yuncheng City, Shanxi Province)
Nickname: Big Blade
Weapon: Green Dragon Crescent Blade
Star: Heavenly Valor Star
Appearance: a stately frame of eight and a half chi, a fine three-strand beard, eyebrows reaching his temples, upturned phoenix eyes, a face the color of dark jujube, lips as if painted vermilion.
Prototype: In the early Southern Song, Liu Yu was prefect of Jinan when the Jin army attacked the city. Lured over by the Jin, Liu Yu killed the defending general Guan Sheng and surrendered. Chen Chen of the Qing dynasty embellished this story in Water Margin: The Sequel; this Guan Sheng may be the prototype of the character in the novel.
First appearance: Chapter 063
Descendant: Guan Ling, who appears in The Complete Biography of Yue Fei as Yue Yun's sworn younger brother.
All of the above is raw input data: numerical features, categorical features, text features and so on. A computer cannot consume it directly; feature hashing must convert it into data a computer can recognize.
After conversion it looks like this (fabricated, just for illustration):
// Suppose the result is a sparse vector of size 30000; the format below is "index":"value"
"725":"0.8223484445229384" // Name
"1000":"0.8444219609970856" // Rank
"4995":"-0.18307661612028242" // Birthplace
"8049":"0.060151616110215377" // Nickname
"8517":"-0.7340742756048447" // Weapon
"26798":"-0.734299689415312" // Star
"24390":"0.545435" // Appearance
"25083":"0.4543543" // Prototype
"25435":"-0.243432" // First appearance
"25721":"-0.7340742756048447" // Descendant
In this way Guan Sheng becomes a vector that a program can process.
0x02 The Dataset
Our dataset and sample code both come from FTRLExample.
First, the dataset.
String schemaStr
= "id string, click string, dt string, C1 string, banner_pos int, site_id string, site_domain string, "
+ "site_category string, app_id string, app_domain string, app_category string, device_id string, "
+ "device_ip string, device_model string, device_type string, device_conn_type string, C14 int, C15 int, "
+ "C16 int, C17 int, C18 int, C19 int, C20 int, C21 int";
// print the first few rows to take a look
trainBatchData.firstN(5).print();
id|click|dt|C1|banner_pos|site_id|site_domain|site_category|app_id|app_domain|app_category|device_id|device_ip|device_model|device_type|device_conn_type|C14|C15|C16|C17|C18|C19|C20|C21
--|-----|--|--|----------|-------|-----------|-------------|------|----------|------------|---------|---------|------------|-----------|----------------|---|---|---|---|---|---|---|---
3199889859719711212|0|14102101|1005|0|1fbe01fe|f3845767|28905ebd|ecad2386|7801e8d9|07d7df22|a99f214a|cfa82746|c6263d8a|1|0|15708|320|50|1722|0|35|-1|79
3200127078337687811|0|14102101|1005|1|e5c60a05|7256c623|f028772b|ecad2386|7801e8d9|07d7df22|a99f214a|ffb0e59a|83ca6fdb|1|0|19771|320|50|2227|0|687|100075|48
3200382705425230287|1|14102101|1005|0|85f751fd|c4e18dd6|50e219e0|98fed791|d9b5648e|0f2161f8|a99f214a|f69683cc|f51246a7|1|0|20984|320|50|2371|0|551|-1|46
320073658191290816|0|14102101|1005|0|1fbe01fe|f3845767|28905ebd|ecad2386|7801e8d9|07d7df22|a99f214a|8e5b1a31|711ee120|1|0|15706|320|50|1722|0|35|100083|79
3200823995473818776|0|14102101|1005|0|f282ab5a|61eb5bc4|f028772b|ecad2386|7801e8d9|07d7df22|a99f214a|9cf693b4|8a4875bd|1|0|18993|320|50|2161|0|35|-1|157
0x03 Sample Code
As the sample code shows, feature scaling is done first, followed by feature hashing.
String[] selectedColNames = new String[]{
"C1", "banner_pos", "site_category", "app_domain",
"app_category", "device_type", "device_conn_type",
"C14", "C15", "C16", "C17", "C18", "C19", "C20", "C21",
"site_id", "site_domain", "device_id", "device_model"};
String[] categoryColNames = new String[]{
"C1", "banner_pos", "site_category", "app_domain",
"app_category", "device_type", "device_conn_type",
"site_id", "site_domain", "device_id", "device_model"};
String[] numericalColNames = new String[]{
"C14", "C15", "C16", "C17", "C18", "C19", "C20", "C21"};
// setup feature engineering pipeline
Pipeline featurePipeline = new Pipeline()
.add( // feature scaling
new StandardScaler()
.setSelectedCols(numericalColNames) // transform the Double-typed columns
)
.add( // feature hashing
new FeatureHasher()
.setSelectedCols(selectedColNames)
.setCategoricalCols(categoryColNames)
.setOutputCol(vecColName)
.setNumFeatures(numHashFeatures)
);
// fit feature pipeline model
PipelineModel featurePipelineModel = featurePipeline.fit(trainBatchData);
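Once fitted, the model can be used to transform data; a usage sketch (in FTRLExample the fitted model is applied to both the batch training data and the streaming data):
// transform the training data with the fitted feature pipeline model
BatchOperator transformedTrainData = featurePipelineModel.transform(trainBatchData);
transformedTrainData.firstN(5).print();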
0x04 Standard Scaling: StandardScaler
StandardScaler transforms a dataset, normalizing each feature to have unit standard deviation and/or zero mean.
On the benefits of feature scaling, an article found online puts it well:
When x is all positive or all negative, each returned gradient can only change in one direction, so the updates zig-zag, overshooting up one step and down the next, which makes weight convergence very inefficient.
But when the positive and negative values of x are roughly balanced, the direction of the gradient updates gets "corrected", which speeds up the convergence of the weights.
Let's think about what standard scaling actually requires (a minimal single-machine sketch follows this list):
- Compute statistics such as means and stdDevs for the columns to be processed, which requires traversing the whole table. This is the training phase.
- Then, using the trained means and stdDevs, traverse every record of the table and apply those statistics value by value. This is the mapper phase.
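A minimal single-machine sketch of these two phases in plain Java (not Alink code; the distributed implementation below does the same thing with Flink DataSets):
public class StandardScaleSketch {
    public static void main(String[] args) {
        double[][] rows = {{15708, 320}, {19771, 320}, {20984, 320}};
        int n = rows.length, cols = rows[0].length;
        // phase 1 (training): one full pass to accumulate sum / squareSum per column
        double[] sum = new double[cols], squareSum = new double[cols];
        for (double[] r : rows) {
            for (int j = 0; j < cols; j++) { sum[j] += r[j]; squareSum[j] += r[j] * r[j]; }
        }
        double[] mean = new double[cols], std = new double[cols];
        for (int j = 0; j < cols; j++) {
            mean[j] = sum[j] / n;
            std[j] = Math.sqrt((squareSum[j] - sum[j] * sum[j] / n) / (n - 1));
        }
        // phase 2 (mapper): standardize every value with the trained statistics
        for (double[] r : rows) {
            for (int j = 0; j < cols; j++) {
                r[j] = std[j] > 0 ? (r[j] - mean[j]) / std[j] : 0.0; // constant column -> 0.0
            }
        }
        System.out.println(java.util.Arrays.deepToString(rows));
    }
}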
4.1 StandardScalerTrainBatchOp
The StandardScalerTrainBatchOp class does the standard-scaling work; only numeric columns are transformed here.
/* StandardScaler transforms a dataset, normalizing each feature to have unit standard deviation and/or zero mean. */
public class StandardScalerTrainBatchOp extends BatchOperator<StandardScalerTrainBatchOp>
implements StandardTrainParams<StandardScalerTrainBatchOp> {
@Override
public StandardScalerTrainBatchOp linkFrom(BatchOperator<?>... inputs) {
BatchOperator<?> in = checkAndGetFirst(inputs);
String[] selectedColNames = getSelectedCols();
StandardScalerModelDataConverter converter = new StandardScalerModelDataConverter();
converter.selectedColNames = selectedColNames;
converter.selectedColTypes = new TypeInformation[selectedColNames.length];
// get the columns that need to be transformed
for (int i = 0; i < selectedColNames.length; i++) {
converter.selectedColTypes[i] = Types.DOUBLE;
}
// the resulting variables are:
converter = {StandardScalerModelDataConverter@9229}
selectedColNames = {String[8]@9228}
0 = "C14"
1 = "C15"
2 = "C16"
3 = "C17"
4 = "C18"
5 = "C19"
6 = "C20"
7 = "C21"
selectedColTypes = {TypeInformation[8]@9231}
0 = {FractionalTypeInfo@9269} "Double"
1 = {FractionalTypeInfo@9269} "Double"
2 = {FractionalTypeInfo@9269} "Double"
3 = {FractionalTypeInfo@9269} "Double"
4 = {FractionalTypeInfo@9269} "Double"
5 = {FractionalTypeInfo@9269} "Double"
6 = {FractionalTypeInfo@9269} "Double"
7 = {FractionalTypeInfo@9269} "Double"
// summarize the selected columns with StatisticsHelper.summary, then process the result with BuildStandardScalerModel
DataSet<Row> rows = StatisticsHelper.summary(in, selectedColNames)
.flatMap(new BuildStandardScalerModel(converter.selectedColNames,
converter.selectedColTypes,
getWithMean(),
getWithStd()));
this.setOutput(rows, converter.getModelSchema());
return this;
}
The calls here nest one inside another, so let's first print the call stack captured while the execution plan was being built.
summarizer:277, StatisticsHelper (com.alibaba.alink.operator.common.statistics)
summarizer:240, StatisticsHelper (com.alibaba.alink.operator.common.statistics)
summary:71, StatisticsHelper (com.alibaba.alink.operator.common.statistics)
linkFrom:49, StandardScalerTrainBatchOp (com.alibaba.alink.operator.batch.dataproc)
train:22, StandardScaler (com.alibaba.alink.pipeline.dataproc)
fit:34, Trainer (com.alibaba.alink.pipeline)
fit:117, Pipeline (com.alibaba.alink.pipeline)
main:59, FTRLExample (com.alibaba.alink)
Logically, the execution plan built by StandardScalerTrainBatchOp.linkFrom is:
- 1) Get the information of the columns to be transformed.
- 2) Summarize those columns with StatisticsHelper.summary (StatisticsHelper is a utility class for batch statistical calculation).
  - 2.1) Use summarizer to obtain the table statistics.
    - 2.1.1) Use in = in.select(selectedColNames); to get the data of the columns that need to be adjusted.
    - 2.1.2) Call the overloaded summarizer function to compute statistics on in.
      - 2.1.2.1) Use TableSummarizerPartition to compute statistics for each partition.
        - 2.1.2.1.1) Call TableSummarizer.visit on every Row passed into the partition (i.e. every record of in above) to accumulate statistics such as squareSum, min, max, normL1.
      - 2.1.2.2) Back in summarizer, call reduce to aggregate the statistics of all partitions.
  - 2.2) Map the result of summarizer with summarizer.toSummary() to get a TableSummary, which is just a set of simple statistics.
- 3) Run the result of StatisticsHelper.summary through flatMap(BuildStandardScalerModel) to build and store the model.
  - 3.1) BuildStandardScalerModel.flatMap calls StandardScalerModelDataConverter.save, which:
    - 3.1.1) stores the means via data.add(JsonConverter.toJson(means));
    - 3.1.2) stores the stdDevs via data.add(JsonConverter.toJson(stdDevs));
The details, together with the code, are as follows.
4.2 StatisticsHelper.summary
StatisticsHelper.summary first calls summarizer to summarize the raw input table, corresponding to step 2).
/* table summary, selectedColNames must be set. */
public static DataSet<TableSummary> summary(BatchOperator in, String[] selectedColNames) {
return summarizer(in, selectedColNames, false) // will invoke step 2.1)
.map(new MapFunction<TableSummarizer, TableSummary>() {
@Override
public TableSummary map(TableSummarizer summarizer) throws Exception {
return summarizer.toSummary(); // corresponds to step 2.2)
}
}).name("toSummary");
}
summarizer(in, selectedColNames, false)
This fetches the selected columns from the raw input, then calls the overloaded summarizer of the same name.
/**
* table stat
*/
private static DataSet<TableSummarizer> summarizer(BatchOperator in, String[] selectedColNames, boolean calculateOuterProduct) { // 對應代碼 2.1)
in = in.select(selectedColNames); // 對應代碼2.1.1)
return summarizer(in.getDataSet(), calculateOuterProduct, getNumericalColIndices(in.getColTypes()), selectedColNames); //對應代碼2.1.2)
}
The overloaded summarizer calls TableSummarizerPartition to process each partition; of course, at this point we are only assembling the execution plan, not actually executing it. Once every partition has been processed, the reduce function here performs the merge.
/* given data, return summary. numberIndices is the indices of cols which are number type in selected cols. */
private static DataSet<TableSummarizer> summarizer(DataSet<Row> data, boolean bCov, int[] numberIndices, String[] selectedColNames) {
return data // mapPartition corresponds to step 2.1.2.1)
.mapPartition(new TableSummarizerPartition(bCov, numberIndices, selectedColNames))
.reduce(new ReduceFunction<TableSummarizer>() { // reduce corresponds to step 2.1.2.2)
@Override
public TableSummarizer reduce(TableSummarizer left, TableSummarizer right) {
return TableSummarizer.merge(left, right); // finally merges the results of all partitions
}
});
}
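TableSummarizer.merge can combine two partial summaries because all the accumulated statistics are distributive. A minimal sketch of the assumed merge semantics (not Alink's actual implementation):
class PartialSummary {
    long count;
    double[] sum, squareSum, min, max;

    // counts, sums and squareSums add element-wise; min/max take element-wise extremes
    static PartialSummary merge(PartialSummary a, PartialSummary b) {
        PartialSummary out = new PartialSummary();
        int n = a.sum.length;
        out.count = a.count + b.count;
        out.sum = new double[n];
        out.squareSum = new double[n];
        out.min = new double[n];
        out.max = new double[n];
        for (int i = 0; i < n; i++) {
            out.sum[i] = a.sum[i] + b.sum[i];
            out.squareSum[i] = a.squareSum[i] + b.squareSum[i];
            out.min[i] = Math.min(a.min[i], b.min[i]);
            out.max[i] = Math.max(a.max[i], b.max[i]);
        }
        return out;
    }
}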
TableSummarizerPartition works per partition: each worker uses TableSummarizer.visit to build its table summary, and the results are merged later. This corresponds to step 2.1.2.1.1).
/* It is table summary partition of one worker, will merge result later. */
public static class TableSummarizerPartition implements MapPartitionFunction<Row, TableSummarizer> {
@Override
public void mapPartition(Iterable<Row> iterable, Collector<TableSummarizer> collector) {
TableSummarizer srt = new TableSummarizer(selectedColNames, numericalIndices, outerProduct);
srt.colNames = selectedColNames;
for (Row sv : iterable) {
srt = (TableSummarizer) srt.visit(sv);
}
collector.collect(srt);
}
}
// the variables are:
srt = {TableSummarizer@10742} "count: 0\n"
sv = {Row@10764} "15708,320,50,1722,0,35,-1,79"
srt.colNames = {String[8]@10733}
0 = "C14"
1 = "C15"
2 = "C16"
3 = "C17"
4 = "C18"
5 = "C19"
6 = "C20"
7 = "C21"
As the code above shows, TableSummarizer.visit is called in a loop over iterable. That is, visit incrementally processes each input item (an item is a Row assembling the columns listed in srt.colNames) and accumulates statistics such as squareSum, min, max, normL1, which show up in the variables below.
this = {TableSummarizer@10742} "count: 1\nsum: 15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0\nsquareSum: 2.46741264E8 102400.0 2500.0 2965284.0 0.0 1225.0 1.0 6241.0\nmin: 15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0\nmax: 15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0"
colNames = {String[8]@10733}
xSum = null
xSquareSum = null
xyCount = null
numericalColIndices = {int[8]@10734}
numMissingValue = {DenseVector@10791} "0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0"
sum = {DenseVector@10792} "15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0"
squareSum = {DenseVector@10793} "2.46741264E8 102400.0 2500.0 2965284.0 0.0 1225.0 1.0 6241.0"
min = {DenseVector@10794} "15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0"
max = {DenseVector@10795} "15708.0 320.0 50.0 1722.0 0.0 35.0 -1.0 79.0"
normL1 = {DenseVector@10796} "15708.0 320.0 50.0 1722.0 0.0 35.0 1.0 79.0"
vals = {Double[8]@10797}
outerProduct = null
count = 1
calculateOuterProduct = false
4.3 BuildStandardScalerModel
Its function is to build and store the model.
/* table summary build model. */
public static class BuildStandardScalerModel implements FlatMapFunction<TableSummary, Row> {
private String[] selectedColNames;
private TypeInformation[] selectedColTypes;
private boolean withMean;
private boolean withStdDevs;
@Override
public void flatMap(TableSummary srt, Collector<Row> collector) throws Exception {
if (null != srt) {
StandardScalerModelDataConverter converter = new StandardScalerModelDataConverter();
converter.selectedColNames = selectedColNames;
converter.selectedColTypes = selectedColTypes;
// the actual work: save the summary as model data
converter.save(new Tuple3<>(this.withMean, this.withStdDevs, srt), collector);
}
}
}
The save function called here is StandardScalerModelDataConverter.save, whose logic is fairly clear:
- compute the means
- compute the stdDevs
- build the metadata Params
- serialize
- emit the serialization result
/*
* Serialize the model data to "Tuple3<Params, List<String>, List<Row>>".
*
* @param modelData The model data to serialize.
* @return The serialization result.
*/
@Override
public Tuple3<Params, Iterable<String>, Iterable<Row>> serializeModel(Tuple3<Boolean, Boolean, TableSummary> modelData) {
Boolean withMean = modelData.f0;
Boolean withStandarDeviation = modelData.f1;
TableSummary summary = modelData.f2;
String[] colNames = summary.getColNames();
double[] means = new double[colNames.length];
double[] stdDevs = new double[colNames.length];
for (int i = 0; i < colNames.length; i++) {
means[i] = summary.mean(colNames[i]); // 1. compute the means
stdDevs[i] = summary.standardDeviation(colNames[i]); // 2. compute the stdDevs
}
for (int i = 0; i < colNames.length; i++) {
if (!withMean) {
means[i] = 0;
}
if (!withStandarDeviation) {
stdDevs[i] = 1;
}
}
// 3. build the metadata Params
Params meta = new Params()
.set(StandardTrainParams.WITH_MEAN, withMean)
.set(StandardTrainParams.WITH_STD, withStandarDeviation);
// 4. serialize
List<String> data = new ArrayList<>();
data.add(JsonConverter.toJson(means));
data.add(JsonConverter.toJson(stdDevs));
return new Tuple3<>(meta, data, new ArrayList<>());
}
The call stack and variables below show how the model is built.
save:68, RichModelDataConverter (com.alibaba.alink.common.model)
flatMap:84, StandardScalerTrainBatchOp$BuildStandardScalerModel (com.alibaba.alink.operator.batch.dataproc)
flatMap:63, StandardScalerTrainBatchOp$BuildStandardScalerModel (com.alibaba.alink.operator.batch.dataproc)
collect:80, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
collect:79, ChainedMapDriver (org.apache.flink.runtime.operators.chaining)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
run:152, AllReduceDriver (org.apache.flink.runtime.operators)
run:504, BatchTask (org.apache.flink.runtime.operators)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
// the input:
modelData = {Tuple3@10723}
f0 = {Boolean@10726} true
f1 = {Boolean@10726} true
f2 = {TableSummary@10707} "colName|count|numMissingValue|numValidValue|sum|mean|variance|standardDeviation|min|max|normL1|normL2\r\n-------|-----|---------------|-------------|---|----|--------|-----------------|---|---|------|------\nC14|399999|0.0000|399999.0000|7257042877.0000|18142.6525|10993280.1107|3315.6116|375.0000|21705.0000|7257042877.0000|11664445.8724\nC15|399999|0.0000|399999.0000|127629988.0000|319.0758|411.3345|20.2814|120.0000|1024.0000|127629988.0000|202208.2328\nC16|399999|0.0000|399999.0000|22663266.0000|56.6583|1322.7015|36.3690|20.0000|768.0000|22663266.0000|42580.9842\nC17|399999|0.0000|399999.0000|809923879.0000|2024.8148|170166.5008|412.5124|112.0000|2497.0000|809923879.0000|1306909.3634\nC18|399999|0.0000|399999.0000|414396.0000|1.0360|1.5871|1.2598|0.0000|3.0000|414396.0000|1031.5736\nC19|399999|0.0000|399999.0000|77641159.0000|194.1034|73786.4929|271.6367|33.0000|1835.0000|77641159.0000|211151.2756\nC20|399999|0.0000|399999.0000|16665597769.0000|41664.0986|2434589745.2799|49341.5620|-1.0000|100"
colNames = {String[8]@10728}
numMissingValue = {DenseVector@10729} "0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0"
sum = {DenseVector@10730} "7.257042877E9 1.27629988E8 2.2663266E7 8.09923879E8 414396.0 7.7641159E7 1.6665597769E10 3.0589982E7"
squareSum = {DenseVector@10731} "1.36059297509295E14 4.0888169392E10 1.813140212E9 1.708012084269E12 1064144.0 4.4584861175E10 1.668188137320503E15 3.044124336E9"
min = {DenseVector@10732} "375.0 120.0 20.0 112.0 0.0 33.0 -1.0 13.0"
max = {DenseVector@10733} "21705.0 1024.0 768.0 2497.0 3.0 1835.0 100248.0 195.0"
normL1 = {DenseVector@10734} "7.257042877E9 1.27629988E8 2.2663266E7 8.09923879E8 414396.0 7.7641159E7 1.6666064771E10 3.0589982E7"
numericalColIndices = {int[8]@10735}
count = 399999
// the output:
model = {Tuple3@10816} "(Params {withMean=true, withStd=true},[[18142.652549131373,319.07576768941925,56.658306645766615,2024.814759536899,1.035992589981475,194.1033827584569,41664.098582746454,76.47514618786548], [3315.6115741652725,20.281383913437733,36.36896282478844,412.51242496870356,1.259797591740416,271.6366927754722,49341.56204742555,41.974829196745965]],[])"
f0 = {Params@10817} "Params {withMean=true, withStd=true}"
f1 = {ArrayList@10820} size = 2
0 = "[18142.652549131373,319.07576768941925,56.658306645766615,2024.814759536899,1.035992589981475,194.1033827584569,41664.098582746454,76.47514618786548]"
1 = "[3315.6115741652725,20.281383913437733,36.36896282478844,412.51242496870356,1.259797591740416,271.6366927754722,49341.56204742555,41.974829196745965]"
f2 = {ArrayList@10818} size = 0
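We can check the serialized means/stdDevs against the raw accumulators in the TableSummary above. For the C14 column (count = 399999, sum = 7.257042877E9, squareSum = 1.36059297509295E14), the usual sample-statistics formulas reproduce the model values (a verification sketch; Alink's TableSummary presumably computes the equivalent):
public class DeriveStats {
    public static void main(String[] args) {
        long count = 399999;
        double sum = 7.257042877E9;
        double squareSum = 1.36059297509295E14;
        double mean = sum / count;                                       // 18142.6525..., matches the model
        double variance = (squareSum - sum * sum / count) / (count - 1); // 10993280.1..., matches the dump
        double stdDev = Math.sqrt(variance);                             // 3315.6115..., matches the model
        System.out.println(mean + " " + variance + " " + stdDev);
    }
}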
4.4 The transform mapper
After training, the transform stage maps each input row, using the previously computed means/stdDevs to carry out the actual standardization.
@Override
public Row map(Row row) throws Exception {
Row r = new Row(this.selectedColIndices.length);
for (int i = 0; i < this.selectedColIndices.length; i++) {
Object obj = row.getField(this.selectedColIndices[i]);
if (null != obj) {
if (this.stddevs[i] > 0) {
double d = (((Number) obj).doubleValue() - this.means[i]) / this.stddevs[i];
r.setField(i, d);
} else {
r.setField(i, 0.0);
}
}
}
return this.predResultColsHelper.getResultRow(row, r);
}
// means and stddevs are the aggregate statistics computed earlier for those columns; the transformation is driven by them.
this = {StandardScalerModelMapper@10909}
selectedColNames = {String[8]@10873}
selectedColTypes = {TypeInformation[8]@10874}
selectedColIndices = {int[8]@10912}
means = {double[8]@10913}
0 = 18142.652549131373
...
7 = 76.47514618786548
stddevs = {double[8]@10914}
0 = 3315.6115741652725
...
7 = 41.974829196745965
In the variables below, row is the input record, and r is the new data generated by transforming the columns that needed conversion.
After standardization, OutputColsHelper.getResultRow merges row and r.
row = {Row@10865} "3200382705425230287,1,14102101,1005,0,85f751fd,c4e18dd6,50e219e0,98fed791,d9b5648e,0f2161f8,a99f214a,f69683cc,f51246a7,1,0,20984,320,50,2371,0,551,-1,46"
其中 "20984,320,50,2371,0,551,-1,46" 是需要轉換的數據。
r = {Row@10866} "0.8569602884149525,0.04557047559108551,-0.18307661612028242,0.8392116685682023,-0.8223484445229384,1.313874843618953,-0.8444219609970856,-0.7260338343491822"
This is the result of standardizing the data above; we can verify the first value: (20984 - 18142.6525) / 3315.6116 ≈ 0.85696, which matches.
The call stack is as follows:
getResultRow:177, OutputColsHelper (com.alibaba.alink.common.utils)
map:88, StandardScalerModelMapper (com.alibaba.alink.operator.common.dataproc)
map:43, ModelMapperAdapter (com.alibaba.alink.common.mapper)
map:18, ModelMapperAdapter (com.alibaba.alink.common.mapper)
run:103, MapDriver (org.apache.flink.runtime.operators)
run:504, BatchTask (org.apache.flink.runtime.operators)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
0x05 Feature Hashing: FeatureHasher
FeatureHasher implements the feature hashing; there is no training here, it is just a mapper. In detail, it:
- projects categorical or numerical features onto a feature vector of a given dimension;
- uses the MurmurHash3 algorithm;
- for categorical features, hashes "colName=value", where colName is the feature column name and value is the feature value; the associated value is 1.0;
- for numerical features, hashes "colName"; the associated value is the feature value itself;
- detects categorical vs. numerical features automatically.
Let's look at the corresponding code.
5.1 Sparse Vector
In the end a feature vector of size 30000, with the column name "vec", is generated; it is stored as a sparse vector.
String vecColName = "vec";
int numHashFeatures = 30000;
// setup feature engineering pipeline
Pipeline featurePipeline = new Pipeline()
.add(
new StandardScaler()
.setSelectedCols(numericalColNames)
)
.add(
new FeatureHasher()
.setSelectedCols(selectedColNames)
.setCategoricalCols(categoryColNames)
.setOutputCol(vecColName)
.setNumFeatures(numHashFeatures)
);
5.2 FeatureHasherMapper
When map is invoked, the incoming Row is the raw data after standardization.
It iterates over the numerical feature columns and hashes them, then iterates over the categorical feature columns and hashes them.
public class FeatureHasherMapper extends Mapper {
/**
* Projects a number of categorical or numerical features into a feature vector of a specified dimension.
*
* @param row the input Row type data
* @return the output row.
*/
@Override
public Row map(Row row) {
TreeMap<Integer, Double> feature = new TreeMap<>();
// iterate over the numerical feature columns and hash them
for (int key : numericColIndexes) {
if (null != row.getField(key)) {
double value = ((Number)row.getField(key)).doubleValue();
String colName = colNames[key];
updateMap(colName, value, feature, numFeature);
}
}
// iterate over the categorical feature columns and hash them
for (int key : categoricalColIndexes) {
if (null != row.getField(key)) {
String colName = colNames[key];
updateMap(colName + "=" + row.getField(key).toString(), 1.0, feature, numFeature);
}
}
return outputColsHelper.getResultRow(row, Row.of(new SparseVector(numFeature, feature)));
}
}
// runtime variables:
selectedCols = {String[19]@9817}
0 = "C1"
1 = "banner_pos"
2 = "site_category"
3 = "app_domain"
4 = "app_category"
5 = "device_type"
6 = "device_conn_type"
7 = "C14"
8 = "C15"
9 = "C16"
10 = "C17"
11 = "C18"
12 = "C19"
13 = "C20"
14 = "C21"
15 = "site_id"
16 = "site_domain"
17 = "device_id"
18 = "device_model"
numericColIndexes = {int[8]@10789}
0 = 16
1 = 17
2 = 18
3 = 19
4 = 20
5 = 21
6 = 22
7 = 23
categoricalColIndexes = {int[11]@10791}
0 = 3
1 = 4
2 = 7
3 = 9
4 = 10
5 = 14
6 = 15
7 = 5
8 = 6
9 = 11
10 = 13
5.3 The hashing operation: updateMap
updateMap performs the actual hashing: the hash function generates the index into the sparse vector, and the value is then put at that index.
The hash function used comes from org.apache.flink.shaded.guava18.com.google.common.hash.
/* Update the treeMap which saves the key-value pairs of the final vector; use the hash value of the string as the key
* and accumulate the corresponding value.
*
* @param s the string to hash
* @param value the accumulated value */
private static void updateMap(String s, double value, TreeMap<Integer, Double> feature, int numFeature) {
// HASH = {Murmur3_32HashFunction@10755} "Hashing.murmur3_32(0)"
int hashValue = Math.abs(HASH.hashUnencodedChars(s).asInt());
int index = Math.floorMod(hashValue, numFeature);
if (feature.containsKey(index)) {
feature.put(index, feature.get(index) + value);
} else {
feature.put(index, value);
}
}
For example, with the input below the index is 26798 (23306798 mod 30000 = 26798), so the value is set at position 26798 of vec:
s = "C14"
value = 0.33428145187593655
feature = {TreeMap@10836} size = 1
{Integer@10895} 26798 -> {Double@10896} 0.33428145187593655
numFeature = 30000
hashValue = 23306798
index = 26798
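A standalone sketch that reproduces this index computation with plain Guava (Alink uses Flink's shaded Guava copy; the expected values come from the debugger dump above):
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

public class HashIndexDemo {
    public static void main(String[] args) {
        HashFunction hash = Hashing.murmur3_32(0); // same function and seed as Alink's HASH
        int hashValue = Math.abs(hash.hashUnencodedChars("C14").asInt());
        int index = Math.floorMod(hashValue, 30000);
        System.out.println(hashValue + " -> " + index); // expected: 23306798 -> 26798
    }
}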
After feature hashing, the resulting vec is appended to the original Row as the 25th field (the Row originally had 24 fields; one more is appended at the end), i.e. 24 = {SparseVector@10932} below.
row = {Row@10901}
fields = {Object[25]@10907}
0 = "3199889859719711212"
1 = "0"
2 = "14102101"
3 = "1005"
4 = {Integer@10912} 0
5 = "1fbe01fe" // "device_type" 是這個數值,這個是原始輸入,大家如果遺忘可以回頭看看示例代碼輸出。
6 = "f3845767"
7 = "28905ebd"
8 = "ecad2386"
9 = "7801e8d9"
10 = "07d7df22"
11 = "a99f214a"
12 = "cfa82746"
13 = "c6263d8a"
14 = "1"
15 = "0"
16 = {Double@10924} -0.734299689415312
17 = {Double@10925} 0.04557047559108551
18 = {Double@10926} -0.18307661612028242
19 = {Double@10927} -0.7340742756048447
20 = {Double@10928} -0.8223484445229384
21 = {Double@10929} -0.5857212482334542
22 = {Double@10930} -0.8444219609970856
23 = {Double@10931} 0.060151616110215377
24 = {SparseVector@10932} "$30000$725:-0.8223484445229384 1000:1.0 3044:-0.8444219609970856 4995:-0.18307661612028242 8049:0.060151616110215377 8517:1.0 10962:1.0 17954:1.0 18556:1.0 21430:1.0 23250:1.0 24010:1.0 24390:1.0 25083:0.04557047559108551 25435:-0.5857212482334542 25721:-0.7340742756048447 26169:1.0 26798:-0.734299689415312 29671:1.0"
// 30000 means this is a sparse vector of size 30000 in total
// 725:-0.8223484445229384 means the entry at index 725 has value -0.8223484445229384, and so on.