Alink漫談(十一) :線性回歸 之 L-BFGS優化
0x00 摘要
Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平台,是業界首個同時支持批式算法、流式算法的機器學習平台。本文介紹了線性回歸的L-BFGS優化在Alink是如何實現的,希望可以作為大家看線性回歸代碼的Roadmap。
因為Alink的公開資料太少,所以以下均為自行揣測,肯定會有疏漏錯誤,希望大家指出,我會隨時更新。
本系列目前已有十一篇,歡迎大家指點
Alink漫談(一) : 從KMeans算法實現不同看Alink設計思想
Alink漫談(二) : 從源碼看機器學習平台Alink設計和架構
Alink漫談(八) : 二分類評估 AUC、K-S、PRC、Precision、Recall、LiftChart 如何實現
0x01 回顧
到目前為止,已經處理完畢輸入,接下來就是優化。優化的主要目標是找到一個方向,參數朝這個方向移動之后使得損失函數的值能夠減小,這個方向往往由一階偏導或者二階偏導各種組合求得。 所以我們再次復習下基本思路。
1.1 優化基本思路
對於線性回歸模型 f(x) = w'x+e,我們構造一個Cost函數(損失函數)J(θ),並且通過找到 J(θ) 函數的最小值,就可以確定線性模型的系數 w 了。
最終的優化函數是:min(L(Y, f(x)) + J(x)) ,即最優化經驗風險和結構風險,而這個函數就被稱為目標函數。
我們要做的是依據我們的訓練集,選取最優的θ,在我們的訓練集中讓f(x)盡可能接近真實的值。我們定義了一個函數來描述 “f(x)和真實的值y之間的差距“,這個函數稱為目標函數,表達式如下:
這里的目標函數就是著名的最小二乘函數。
我們要選擇最優的θ,使得f(x)最近進真實值。這個問題就轉化為求解最優的θ,使目標函數 J(θ) 取最小值。
1.2 各類優化方法
尋找合適的W令目標函數f(W) 最小,是一個無約束最優化問題,解決這個問題的通用做法是隨機給定一個初始的W0,通過迭代,在每次迭代中計算目標函數的下降方向並更新W,直到目標函數穩定在最小的點。
不同的優化算法的區別就在於目標函數下降方向Dt的計算。下降方向是通過對目標函數在當前的W下求一階倒數(梯度,Gradient)和求二階導數(海森矩陣,Hessian Matrix)得到。常見的算法有梯度下降法、牛頓法、擬牛頓法。
- 梯度下降法直接采用目標函數在當前W的梯度的反方向作為下降方向。
- 牛頓法是在當前W下,利用二次泰勒展開近似目標函數,然后利用該近似函數來求解目標函數的下降方向。其中Bt為目標函數f(W)在Wt處的海森矩陣。這個搜索方向也稱作牛頓方向。
- 擬牛頓法只要求每一步迭代中計算目標函數的梯度,通過擬合的方式找到一個近似的海森矩陣用於計算牛頓方向。
- L-BFGS(Limited-memory BFGS)則是解決了BFGS中每次迭代后都需要保存N*N階海森逆矩陣的問題,只需要保存每次迭代的兩組向量和一組標量即可。
Alink中,UnaryLossObjFunc是目標函數,SquareLossFunc 是損失函數,使用L-BFGS算法對模型進行優化。
即 優化方向由擬牛頓法L-BFGS搞定(具體如何弄就是看f(x)的泰勒二階展開),損失函數最小值是平方損失函數來計算。
0x02 基本概念
因為L-BFGS算法是擬牛頓法的一種,所以我們先從牛頓法的本質泰勒展開開始介紹。
2.1 泰勒展開
泰勒展開是希望基於某區間一點x_0展開,用一組簡單的冪函數來近似一個復雜的函數f(x)在該區間的局部。泰勒展開的應用場景例如:我們很難直接求得sin(1)的值,但我們可以通過sin的泰勒級數求得sin(1)的近似值,且展開項越多,精度越高。計算機一般都是把 sin(x)進行泰勒展開進行計算的。
泰勒當年為什么要發明這條公式?
因為當時數學界對簡單函數的研究和應用已經趨於成熟,而復雜函數,比如:f(x) = sin(x)ln(1+x) 這種一看就頭疼的函數,還有那種根本就找不到表達式的曲線。除了代入一個x可以得到它的y,就啥事都很難干了。所以泰勒同學就迎難而上!決定讓這些式子統統現出原形,統統變簡單。
要讓一個復雜函數變簡單,能不能把它轉換成別的表達式?泰勒公式一句話描述:就是用多項式函數去逼近光滑函數。即,根據“以直代曲、化整為零”的數學思想,產生了泰勒公式。
泰勒公式通過把【任意函數表達式】轉換(重寫)為【多項式】形式,是一種極其強大的函數近似工具。為什么說它強大呢?
- 多項式非常【友好】,三易,易計算,易求導,易積分
- 幾何感覺和計算感覺都很直觀,如拋物線和幾次方就是底數自己乘自己乘幾次
如何通俗推理?
泰勒公式干的事情就是:使用多項式表達式估計(近似) 在 附近的值。
當我們想要仿造一個東西的時候,無形之中都會按照如下思路,即先保證大體上相似,再保證局部相似,再保證細節相似,再保證更細微的地方相似……不斷地細化下去,無窮次細化以后,仿造的東西將無限接近真品。真假難辨。
物理學家得出結論:把生活中關於“仿造”的經驗運用到運動學問題中,如果想仿造一段曲線,那么首先應該保證曲線的起始點一樣,其次保證起始點處位移隨時間的變化率一樣(速度相同),再次應該保證前兩者相等的同時關於時間的二階變化率一樣(加速度相同)……如果隨時間每一階變化率(每一階導數)都一樣,那這倆曲線肯定是完全等價的。
所以如果泰勒想一個辦法讓自己避免接觸 sin(x)這類函數,即把這類函數替換掉。 就可以根據這類函數的圖像,仿造一個圖像,與原來的圖像相類似,這種行為在數學上叫近似。不扯這個名詞。講講如何仿造圖像。
仿造的第一步,就是讓仿造的曲線也過這個點。
完成了仿造的第一步,很粗糙,甚至完全看不出來這倆有什么相似的地方,那就繼續細節化。開始考慮曲線的變化趨勢,即導數,保證在此處的導數相等。
經歷了第二步,現在起始點相同了,整體變化趨勢相近了,可能看起來有那么點意思了。想進一步精確化,應該考慮凹凸性。高中學過:表征圖像的凹凸性的參數為“導數的導數”。所以,下一步就讓二者的導數的導數相等。
起始點相同,增減性相同,凹凸性相同后,仿造的函數更像了。如果再繼續細化下去,應該會無限接近。所以泰勒認為“仿造一段曲線,要先保證起點相同,再保證在此處導數相同,繼續保證在此處的導數的導數相同……”
泰勒展開式就是把一個三角函數或者指數函數或者其他比較難纏的函數用多項式替換掉。
也就是說,有一個原函數f(x) ,我再造一個圖像與原函數圖像相似的多項式函數 g(x) ,為了保證相似,我只需要保證這倆函數在某一點的初始值相等,1階導數相等,2階導數相等,……n階導數相等。
2.2 牛頓法
牛頓法的基本思路是,在現有極小點估計值的附近對 f(x) 做二階泰勒展開,進而找到極小點的下一個估計值。其核心思想是利用迭代點 x_k 處的一階導數(梯度)和二階導數(Hessen矩陣)對目標函數進行二次函數近似,然后把二次模型的極小點作為新的迭代點,並不斷重復這一過程,直至求得滿足精度的近似極小值。
梯度下降算法是將函數在 xn 位置進行一次函數近似,也就是一條直線。計算梯度,從而決定下一步優化的方向是梯度的反方向。而牛頓法是將函數在 xn 位置進行二階函數近似,也就是二次曲線。計算梯度和二階導數,從而決定下一步的優化方向。
我們要優化的都是多元函數,x往往不是一個實數,而是一個向量。所以f(x) 的一階導數也是一個向量,再求導的二階導數是一個矩陣,就是Hessian矩陣。
2.2.1 泰勒一階展開
牛頓法求根可以按照泰勒一階展開。例如對於方程 f(x) = 0,我們在任意一點 x0 處,進行一階泰勒展開:
令 f(x) = 0,帶入上式,即可得到:
注意,這里使用了近似,得到的 x 並不是方程的根,只是近似解。但是,x 比 x0 更接近於方程的根。
然后,利用迭代方法求解,以 x 為 x0,求解下一個距離方程的根更近的位置。迭代公式可以寫成:
2.2.2 泰勒二階展開
機器學習、深度學習中,損失函數的優化問題一般是基於一階導數梯度下降的。現在,從另一個角度來看,想要讓損失函數最小化,這其實是一個最值問題,對應函數的一階導數 f'(x) = 0。也就是說,如果我們找到了能讓 f'(x) = 0 的點 x,損失函數取得最小值,也就實現了模型優化目標。
現在的目標是計算 f’(x) = 0 對應的 x,即 f’(x) = 0 的根。轉化為求根問題,就可以利用上一節的牛頓法了。
與上一節有所不同,首先,對 f(x) 在 x0 處進行二階泰勒展開:
上式成立的條件是 f(x) 近似等於 f(x0)。令 f(x) = f(x0),並對 (x - x0) 求導,可得:
同樣,雖然 x 並不是最優解點,但是 x 比 x0 更接近 f'(x) = 0 的根。這樣,就能得到最優化的迭代公式:
通過迭代公式,就能不斷地找到 f'(x) = 0 的近似根,從而也就實現了損失函數最小化的優化目標。
2.2.3 高維空間
在機器學習的優化問題中,我們要優化的都是多元函數,所以x往往不是一個實數,而是一個向量。所以將牛頓求根法利用到機器學習中時,x是一個向量,f(x) 的一階導數也是一個向量,再求導的二階導數是一個矩陣,就是Hessian矩陣。
在高維空間,我們用梯度替代一階導數,用Hessian矩陣替代二階導數,牛頓法的迭代公式不變:
其中 J 定義為 雅克比矩陣,對應一階偏導數。
推導如下 :
我們假設f(x)是二階可微實函數,把f(x)在xk處Taylor展開並取二階近似為
我們的目標是求f(x)的最小值,而導數為0的點極有可能為極值點,故在此對f(x)求導,並令其導數為0,即∇f(x)=0,可得
設∇2f(x)可逆,由(2)可以得到牛頓法的迭代公式
當原函數存在一階二階連續可導時,可以采用牛頓法進行一維搜索,收斂速度快,具有局部二階收斂速度。
2.2.4 牛頓法基本流程
總結(模仿)一下使用牛頓法求根的步驟:
a.已知函數的情況下,隨機產生點.
b.由已知的點按照的公式進行k次迭代.
c.如果與上一次的迭代結果相同或者相差結果小於一個閾值時,本次結果就是函數的根.
偽代碼如下
def newton(feature, label, iterMax, sigma, delta):
'''牛頓法
input: feature(mat):特征
label(mat):標簽
iterMax(int):最大迭代次數
sigma(float), delta(float):牛頓法中的參數
output: w(mat):回歸系數
'''
n = np.shape(feature)[1]
w = np.mat(np.zeros((n, 1)))
it = 0
while it <= iterMax:
g = first_derivativ(feature, label, w) # 一階導數
G = second_derivative(feature) # 二階導數
d = -G.I * g
m = get_min_m(feature, label, sigma, delta, d, w, g) # 得到步長中最小的值m
w = w + pow(sigma, m) * d
it += 1
return w
2.2.5 問題點及解決
牛頓法不僅需要計算 Hessian 矩陣,而且需要計算 Hessian 矩陣的逆。當數據量比較少的時候,運算速度不會受到大的影響。但是,當數據量很大,特別在深度神經網絡中,計算 Hessian 矩陣和它的逆矩陣是非常耗時的。從整體效果來看,牛頓法優化速度沒有梯度下降算法那么快。所以,目前神經網絡損失函數的優化策略大多都是基於梯度下降。
另一個問題是,如果某個點的Hessian矩陣接近奇異(條件數過大),逆矩陣會導致數值不穩定,甚至迭代可能不會收斂。
當x的維度特別多的時候,我們想求得f(x) 的二階導數很困難,而牛頓法求駐點又是一個迭代算法,所以這個困難我們還要面臨無限多次,導致了牛頓法求駐點在機器學習中無法使用。有沒有什么解決辦法呢?
實際應用中,我們通常不去求解逆矩陣,而是考慮求解Hessian矩陣的近似矩陣,最好是只利用一階導數近似地得到二階導數的信息,從而在較少的計算量下得到接近二階的收斂速率。這就是擬牛頓法。
擬牛頓法的想法其實很簡單,就像是函數值的兩點之差可以逼近導數一樣,一階導數的兩點之差也可以逼近二階導數。幾何意義是求一階導數的“割線”,取極限時,割線會逼近切線,矩陣B就會逼近Hessian矩陣。
2.3 擬牛頓法
擬牛頓法就是模擬出Hessen矩陣的構造過程,通過迭代來逼近。主要包括DFP擬牛頓法,BFGS擬牛頓法。擬牛頓法只要求每一步迭代時知道目標函數的梯度。
各種擬牛頓法使用迭代法分別近似海森矩陣的逆和它自身;
在各種擬牛頓法中,一般的構造Hk+1的策略是,H1通常選擇任意的一個n階對稱正定矩陣(一般為I),然后通過不斷的修正Hk給出Hk+1,即
比如:BFGS法每次更新矩陣H(Hessian矩陣的逆矩陣)需要的是第k步的迭代點差s和梯度差y,第k+1步的H相當於需要從開始到第k步的所用s和y的值。
我們要通過牛頓求駐點法和BFGS算法來求得一個函數的根,兩個算法都需要迭代,所以我們干脆讓他倆一起迭代就好了。兩個算法都是慢慢逼近函數根,所以經過k次迭代以后,所得到的解就是機器學習中目標函數導函數的根。這種兩個算法共同迭代的計算方式,我們稱之為On The Fly.
在BFGS算法迭代的第一步,單位矩陣與梯度 g 相乘,就等於梯度 g,形式上同梯度下降的表達式是相同的。所以BFGS算法可以理解為從梯度下降逐步轉換為牛頓法求函數解的一個算法。
雖然我們使用了BFGS算法來利用單位矩陣逐步逼近H矩陣,但是每次計算的時候都要存儲D矩陣,D矩陣有多大。呢。假設我們的數據集有十萬個維度(不算特別大),那么每次迭代所要存儲D矩陣的結果是74.5GB。我們無法保存如此巨大的矩陣內容,如何解決呢?使用L-BFGS算法.
2.4 L-BFGS算法
L-BFGS算法的基本思想是:算法只保存並利用最近m次迭代的曲率信息來構造海森矩陣的近似矩陣。
我們要通過牛頓求駐點法和BFGS算法來求得一個函數的根,兩個算法都需要迭代,所以我們干脆讓他倆一起迭代就好了。兩個算法都是慢慢逼近函數根,所以經過k次迭代以后,所得到的解就是機器學習中目標函數導函數的根。這種兩個算法共同迭代的計算方式,我們稱之為On The Fly.
在BFGS算法迭代的第一步,單位矩陣與梯度g相乘,就等於梯度g,形式上同梯度下降的表達式是相同的。所以BFGS算法可以理解為從梯度下降逐步轉換為牛頓法求函數解的一個算法。
雖然我們使用了BFGS算法來利用單位矩陣逐步逼近H矩陣,但是每次計算的時候都要存儲D矩陣,D矩陣有多大。呢。假設我們的數據集有十萬個維度(不算特別大),那么每次迭代所要存儲D矩陣的結果是74.5GB。我們無法保存如此巨大的矩陣內容,如何解決呢?使用L-BFGS算法.
我們每一次對D矩陣的迭代,都是通過迭代計算sk和yk得到的。我們的內存存不下時候只能丟掉一些存不下的數據。假設我們設置的存儲向量數為100,當s和y迭代超過100時,就會扔掉第一個s和y。每多一次迭代就對應的扔掉最前邊的s和y。這樣雖然損失了精度,但確可以保證使用有限的內存將函數的解通過BFGS算法求得到。 所以L-BFGS算法可以理解為對BFGS算法的又一次近似或者逼近。
這里不介紹數學論證,因為網上優秀文章有很多,這里只是介紹工程實現。總結L-BFGS算法的大致步驟如下:
Step1: 選初始點x_0,存儲最近迭代次數m;
Step2: k=0, H_0=I, r=f(x_0);
Step3: 根據更新的參數計算梯度和損失值,如果達到閾值,則返回最優解x_{k+1},否則轉Step4;
Step4: 計算本次迭代的可行梯度下降方向 p_k=-r_k;
Step5: 計算步長alpha_k,進行一維搜索;
Step6: 更新權重x;
Step7: 只保留最近m次的向量對;
Step8: 計算並保存 s_k, y_k
Step9: 用two-loop recursion算法求r_k;
k=k+1,轉Step3。
0x03 優化模型 -- L-BFGS算法
回到代碼,BaseLinearModelTrainBatchOp.optimize
函數調用的是
return new Lbfgs(objFunc, trainData, coefficientDim, params).optimize();
優化后將返回線性模型的系數。
/**
* optimizer api.
*
* @return the coefficient of linear problem.
*/
@Override
public DataSet <Tuple2 <DenseVector, double[]>> optimize() {
/**
* solving problem using iteration.
* trainData is the distributed samples.
* initCoef is the initial model coefficient, which will be broadcast to every worker.
* objFuncSet is the object function in dataSet format
* .add(new PreallocateCoefficient(OptimName.currentCoef)) allocate memory for current coefficient
* .add(new PreallocateCoefficient(OptimName.minCoef)) allocate memory for min loss coefficient
* .add(new PreallocateLossCurve(OptimVariable.lossCurve)) allocate memory for loss values
* .add(new PreallocateVector(OptimName.dir ...)) allocate memory for dir
* .add(new PreallocateVector(OptimName.grad)) allocate memory for grad
* .add(new PreallocateSkyk()) allocate memory for sK yK
* .add(new CalcGradient(objFunc)) calculate local sub gradient
* .add(new AllReduce(OptimName.gradAllReduce)) sum all sub gradient with allReduce
* .add(new CalDirection()) get summed gradient and use it to calc descend dir
* .add(new CalcLosses(objFunc, OptimMethod.GD)) calculate local losses for line search
* .add(new AllReduce(OptimName.lossAllReduce)) sum all losses with allReduce
* .add(new UpdateModel(maxIter, epsilon ...)) update coefficient
* .setCompareCriterionOfNode0(new IterTermination()) judge stop of iteration
*/
DataSet <Row> model = new IterativeComQueue()
.initWithPartitionedData(OptimVariable.trainData, trainData)
.initWithBroadcastData(OptimVariable.model, coefVec)
.initWithBroadcastData(OptimVariable.objFunc, objFuncSet)
.add(new PreallocateCoefficient(OptimVariable.currentCoef))
.add(new PreallocateCoefficient(OptimVariable.minCoef))
.add(new PreallocateLossCurve(OptimVariable.lossCurve, maxIter))
.add(new PreallocateVector(OptimVariable.dir, new double[] {0.0, OptimVariable.learningRate}))
.add(new PreallocateVector(OptimVariable.grad))
.add(new PreallocateSkyk(OptimVariable.numCorrections))
.add(new CalcGradient())
.add(new AllReduce(OptimVariable.gradAllReduce))
.add(new CalDirection(OptimVariable.numCorrections))
.add(new CalcLosses(OptimMethod.LBFGS, numSearchStep))
.add(new AllReduce(OptimVariable.lossAllReduce))
.add(new UpdateModel(params, OptimVariable.grad, OptimMethod.LBFGS, numSearchStep))
.setCompareCriterionOfNode0(new IterTermination())
.closeWith(new OutputModel())
.setMaxIter(maxIter)
.exec();
return model.mapPartition(new ParseRowModel());
}
所以我們接下來的就是看Lbfgs,其重點就是參數更新的下降方向和搜索步長。
3.1 如何分布式實施
如果由於所有輸入的數據都是相同維度的;算法過程中不會對輸入修改,就可以將這些輸入數據進行切分。這樣的話,應該可以通過一次map-reduce來計算。
我們理一下L-BFGS中可以分布式並行計算的步驟:
- 計算梯度 可以並行化 ,比如在機器1計算梯度1,在機器2計算梯度2....,然后通過一個Reduce把這些合成一個完整的梯度向量。
- 計算方向 可以並行化,同樣可以通過把數據分區,然后Map算各自分區上的值,Reduce合起來得到方向。
- 計算損失 可以並行化,同樣可以通過把數據分區,然后Map算各自分區上的值,Reduce合起來得到損失。
Alink中,使用AllReduce功能而非Flink原生Map / Reduce來完成了以上三點的並行計算和通信。
3.2 CalcGradient
線搜索只有兩步,確定方向、確定步長。確定方向和模擬Hessian矩陣都需要計算梯度。目標函數的梯度向量計算中只需要進行向量間的點乘和相加,可以很容易將每個迭代過程拆分成相互獨立的計算步驟,由不同的節點進行獨立計算,然后歸並計算結果。
Alink將樣本特征向量分布到不同的計算節點,由各計算節點完成自己所負責樣本的點乘與求和計算,然后將計算結果進行歸並,則實現了按行並行的LR。
實際情況中也會存在針對高維特征向量進行邏輯回歸的場景,僅僅按行進行並行處理,無法滿足這類場景的需求,因此還需要按列將高維的特征向量拆分成若干小的向量進行求解。這個也許是Alink以后需要優化的一個點吧 。
CalcGradient是Alink迭代算子的派生類,函數總結如下:
- 獲取經過處理的輸入數據labledVectors,
- 從靜態內存中獲取 "迭代參數coef","優化函數objFunc" 和 "梯度";
- 計算局部梯度 objFunc.calcGradient(labledVectors, coef, grad.f0); 這里調用到了目標函數的梯度相關API;
objFunc.calcGradient
根據采樣點計算梯度。Alink這里就是把x, y代入,求損失函數的梯度就是對 Coef求偏導數。具體我們在前文已經提到。- 對計算樣本中的每一個樣本,分別計算不同特征的計算梯度。
- 將新計算出來的梯度乘以權重之后,存入靜態內存 gradAllReduce 中。
- 后續會通過聚合函數,對所有計算樣本的特征的梯度進行累加,得到每一個特征的累積梯度以及損失。
public class CalcGradient extends ComputeFunction {
/**
* object function class, it supply the functions to calc local gradient (or loss).
*/
private OptimObjFunc objFunc;
@Override
public void calc(ComContext context) {
Iterable<Tuple3<Double, Double, Vector>> labledVectors = context.getObj(OptimVariable.trainData);
// 經過處理的輸入數據
labledVectors = {ArrayList@9877} size = 4
0 = {Tuple3@9895} "(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)"
1 = {Tuple3@9896} "(1.0,6.7,1.0 1.0 -0.338240712601273 -0.7385489458759964)"
2 = {Tuple3@9897} "(1.0,6.9,1.0 1.0 -0.7892283294029703 -0.3692744729379982)"
3 = {Tuple3@9898} "(1.0,8.0,1.0 1.0 -0.338240712601273 -0.3692744729379982)"
// get iterative coefficient from static memory.
Tuple2<DenseVector, Double> state = context.getObj(OptimVariable.currentCoef);
int size = state.f0.size();
// 是Coef,1.7976931348623157E308是默認最大值
state = {Tuple2@9878} "(0.001 0.0 0.0 0.0,1.7976931348623157E308)"
f0 = {DenseVector@9879} "0.001 0.0 0.0 0.0"
f1 = {Double@9889} 1.7976931348623157E308
DenseVector coef = state.f0;
if (objFunc == null) {
objFunc = ((List<OptimObjFunc>)context.getObj(OptimVariable.objFunc)).get(0);
}
// 變量如下
objFunc = {UnaryLossObjFunc@9882}
unaryLossFunc = {SquareLossFunc@9891}
l1 = 0.0
l2 = 0.0
params = {Params@9892} "Params {featureCols=["f0","f1","f2"], labelCol="label", predictionCol="linpred"}"
Tuple2<DenseVector, double[]> grad = context.getObj(OptimVariable.dir);
// 變量如下
grad = {Tuple2@9952} "(0.0 0.0 0.0 0.0,[0.0, 0.1])"
f0 = {DenseVector@9953} "0.0 0.0 0.0 0.0"
f1 = {double[2]@9969}
coef = {DenseVector@9951} "0.001 0.0 0.0 0.0"
data = {double[4]@9982}
// calculate local gradient,使用目標函數
Double weightSum = objFunc.calcGradient(labledVectors, coef, grad.f0);
// prepare buffer vec for allReduce. the last element of vec is the weight Sum.
double[] buffer = context.getObj(OptimVariable.gradAllReduce);
if (buffer == null) {
buffer = new double[size + 1];
context.putObj(OptimVariable.gradAllReduce, buffer);
}
for (int i = 0; i < size; ++i) {
buffer[i] = grad.f0.get(i) * weightSum;
}
/* the last element is the weight value */
buffer[size] = weightSum;
}
}
// 最后結果是
buffer = {double[5]@9910}
0 = -38.396
1 = -38.396
2 = -14.206109929253465
3 = -14.364776997288134
4 = 0.0
3.3 AllReduce
這里是前面提到的 "通過聚合函數,對所有計算樣本的特征的梯度進行累加,得到每一個特征的累積梯度以及損失"。
具體關於AllReduce如何運作,可以參見文章 [Alink漫談之三] AllReduce通信模型
.add(new AllReduce(OptimVariable.gradAllReduce))
3.4 CalDirection
此時得到的梯度,已經是聚合之后的,所以可以開始計算方向。
3.4.1 預先分配
OptimVariable.grad 是預先分配的。
public class PreallocateSkyk extends ComputeFunction {
private int numCorrections;
/**
* prepare hessian matrix of lbfgs method. we allocate memory fo sK, yK at first iteration step.
*
* @param context context of iteration.
*/
@Override
public void calc(ComContext context) {
if (context.getStepNo() == 1) {
Tuple2<DenseVector, double[]> grad = context.getObj(OptimVariable.grad);
int size = grad.f0.size();
DenseVector[] sK = new DenseVector[numCorrections];
DenseVector[] yK = new DenseVector[numCorrections];
for (int i = 0; i < numCorrections; ++i) {
sK[i] = new DenseVector(size);
yK[i] = new DenseVector(size);
}
context.putObj(OptimVariable.sKyK, Tuple2.of(sK, yK));
}
}
}
3.4.2 計算方向
在計算的過程中,需要不斷的計算和存儲歷史的Hessian矩陣,在L-BFGS算法,希望只保留最近的m次迭代信息,便能夠擬合Hessian矩陣。在L-BFGS算法中,不再保存完整的Hk,而是存儲向量序列{sk}和{yk},需要矩陣時Hk,使用向量序列{sk}和{yk}計算就可以得到,而向量序列{sk}和{yk}也不是所有都要保存,只要保存最新的m步向量即可。
具體原理和公式這里不再贅述,網上很多文章講解非常好。
重點說明,dir的各個數據用途是:
dir = {Tuple2@9931} "(-9.599 -9.599 -3.5515274823133662 -3.5911942493220335,[4.0, 0.1])"
f0 = {DenseVector@9954} "-9.599 -9.599 -3.5515274823133662 -3.5911942493220335" //梯度
f1 = {double[2]@9938}
0 = 4.0 //權重
1 = 0.1 //學習率 learning rate,0.1是初始化數值,后續UpdateModel時候會更新
代碼摘要如下:
@Override
public void calc(ComContext context) {
Tuple2 <DenseVector, double[]> grad = context.getObj(OptimVariable.grad);
Tuple2 <DenseVector, double[]> dir = context.getObj(OptimVariable.dir);
Tuple2 <DenseVector[], DenseVector[]> hessian = context.getObj(OptimVariable.sKyK);
int size = grad.f0.size();
// gradarr是上一階段CalcGradient的結果
double[] gradarr = context.getObj(OptimVariable.gradAllReduce);
// 變量為
gradarr = {double[5]@9962}
0 = -38.396
1 = -38.396
2 = -14.206109929253465
3 = -14.364776997288134
4 = 4.0
if (this.oldGradient == null) {
oldGradient = new DenseVector(size);
}
// hessian用來當作隊列,存儲sK,yK,只保留最近m個
DenseVector[] sK = hessian.f0;
DenseVector[] yK = hessian.f1;
for (int i = 0; i < size; ++i) {
//gradarr[size]是權重
grad.f0.set(i, gradarr[i] / gradarr[size]); //size = 4
}
// 賦值梯度,這里都除以權重
grad = {Tuple2@9930} "(-9.599 -9.599 -3.5515274823133662 -3.5911942493220335,[0.0])"
f0 = {DenseVector@9937} "-9.599 -9.599 -3.5515274823133662 -3.5911942493220335"
data = {double[4]@9963}
0 = -9.599
1 = -9.599
2 = -3.5515274823133662
3 = -3.5911942493220335
f1 = {double[1]@9961}
0 = 0.0
dir.f1[0] = gradarr[size]; //權重
int k = context.getStepNo() - 1;
if (k == 0) { //首次迭代
dir.f0.setEqual(grad.f0); // 梯度賦予在這里
oldGradient.setEqual(grad.f0);
} else {
yK[(k - 1) % m].setEqual(grad.f0);
yK[(k - 1) % m].minusEqual(oldGradient);
oldGradient.setEqual(grad.f0);
}
// copy g_k and store in qL
dir.f0.setEqual(grad.f0); //拷貝梯度到這里
//
dir = {Tuple2@9931} "(-9.599 -9.599 -3.5515274823133662 -3.5911942493220335,[4.0, 0.1])"
f0 = {DenseVector@9954} "-9.599 -9.599 -3.5515274823133662 -3.5911942493220335" //梯度
f1 = {double[2]@9938}
0 = 4.0 //權重
1 = 0.1 //學習率 learning rate,0.1是初始化數值
// compute H^-1 * g_k
int delta = k > m ? k - m : 0;
int l = k <= m ? k : m; // m = 10
if (alpha == null) {
alpha = new double[m];
}
// two-loop的過程,通過擬牛頓法計算Hessian矩陣
for (int i = l - 1; i >= 0; i--) {
int j = (i + delta) % m;
double dot = sK[j].dot(yK[j]);
if (Math.abs(dot) > 0.0) {
double rhoJ = 1.0 / dot;
alpha[i] = rhoJ * (sK[j].dot(dir.f0)); // 計算alpha
dir.f0.plusScaleEqual(yK[j], -alpha[i]); // 重新修正d
}
}
for (int i = 0; i < l; i++) {
int j = (i + delta) % m;
double dot = sK[j].dot(yK[j]);
if (Math.abs(dot) > 0.0) {
double rhoJ = 1.0 / dot;
double betaI = rhoJ * (yK[j].dot(dir.f0)); // 乘以rho
dir.f0.plusScaleEqual(sK[j], (alpha[i] - betaI));// 重新修正d
}
}
}
//最后是存儲在 OptimVariable.dir
3.5 CalcLosses
根據更新的 dir 和 當前系數 計算損失函數誤差值,這個損失是為后續的線性搜索准備的。目的是如果損失函數誤差值達到允許的范圍,那么停止迭代,否則重復迭代。
CalcLosses基本邏輯如下:
- 1)得到本次步長 Double beta = dir.f1[1] / numSearchStep;
- 后續UpdateModel 中會對下一次計算的步長(learning rate)進行更新,比如 dir.f1[1] *= 1.0 / (numSearchStep * numSearchStep); 或者 dir.f1[1] = Math.min(dir.f1[1], numSearchStep);
- 2)調用目標函數的 calcSearchValues 來計算當前系數對應的損失;
- 3)calcSearchValues 遍歷輸入labelVectors,對於每個 labelVector 按照線性搜索的步驟進行計算損失。vec[i] += weight * this.unaryLossFunc.loss(etaCoef - i * etaDelta, labelVector.f1); 循環內部如下:
- 3.1)用x-vec和coef計算出來的 Y ,etaCoef = getEta(labelVector, coefVector);
- 3.2)以x-vec和dirVec計算出來的 deltaY,etaDelta = getEta(labelVector, dirVec) * beta;
- 3.3)按照線性搜索的步驟進行計算損失。vec[i] += weight * this.unaryLossFunc.loss(etaCoef - i * etaDelta, labelVector.f1); 聯系損失函數可知,etaCoef - i * etaDelta, labelVector.f1 是 訓練數據預測值 與 實際類別 的偏差;
- 4)為后續聚合 lossAllReduce 准備數據;
代碼如下:
public class CalcLosses extends ComputeFunction {
@Override
public void calc(ComContext context) {
Iterable<Tuple3<Double, Double, Vector>> labledVectors = context.getObj(OptimVariable.trainData);
Tuple2<DenseVector, double[]> dir = context.getObj(OptimVariable.dir);
Tuple2<DenseVector, Double> coef = context.getObj(OptimVariable.currentCoef);
if (objFunc == null) {
objFunc = ((List<OptimObjFunc>)context.getObj(OptimVariable.objFunc)).get(0);
}
/**
* calculate losses of current coefficient.
* if optimizer is owlqn, constraint search will used, else without constraint.
*/
Double beta = dir.f1[1] / numSearchStep;
double[] vec = method.equals(OptimMethod.OWLQN) ?
objFunc.constraintCalcSearchValues(labledVectors, coef.f0, dir.f0, beta, numSearchStep)
: objFunc.calcSearchValues(labledVectors, coef.f0, dir.f0, beta, numSearchStep);
// 變量為
dir = {Tuple2@9988} "(-9.599 -9.599 -3.5515274823133662 -3.5911942493220335,[4.0, 0.1])"
coef = {Tuple2@9989} "(0.001 0.0 0.0 0.0,1.7976931348623157E308)"
beta = {Double@10014} 0.025
vec = {double[5]@10015}
0 = 0.0
1 = 0.0
2 = 0.0
3 = 0.0
4 = 0.0
// prepare buffer vec for allReduce.
double[] buffer = context.getObj(OptimVariable.lossAllReduce);
if (buffer == null) {
buffer = vec.clone();
context.putObj(OptimVariable.lossAllReduce, buffer);
} else {
System.arraycopy(vec, 0, buffer, 0, vec.length);
}
}
}
其中搜索是一元目標函數提供的,其又調用了損失函數。
public class UnaryLossObjFunc extends OptimObjFunc {
/**
* Calculate loss values for line search in optimization.
*
* @param labelVectors train data.
* @param coefVector coefficient of current time.
* @param dirVec descend direction of optimization problem.
* @param beta step length of line search.
* @param numStep num of line search step.
* @return double[] losses.
*/
@Override
public double[] calcSearchValues(Iterable<Tuple3<Double, Double, Vector>> labelVectors,
DenseVector coefVector,
DenseVector dirVec,
double beta,
int numStep) {
double[] vec = new double[numStep + 1];
// labelVector是三元組Tuple3<weight, label, feature vector>
labelVectors = {ArrayList@10007} size = 4
0 = {Tuple3@10027} "(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)"
1 = {Tuple3@10034} "(1.0,6.7,1.0 1.0 -0.338240712601273 -0.7385489458759964)"
2 = {Tuple3@10035} "(1.0,6.9,1.0 1.0 -0.7892283294029703 -0.3692744729379982)"
3 = {Tuple3@10036} "(1.0,8.0,1.0 1.0 -0.338240712601273 -0.3692744729379982)"
coefVector = {DenseVector@10008} "0.001 0.0 0.0 0.0"
dirVec = {DenseVector@10009} "-9.599 -9.599 -3.5515274823133662 -3.5911942493220335"
beta = 0.025
numStep = 4
vec = {double[5]@10026}
0 = 0.0
1 = 0.0
2 = 0.0
3 = 0.0
4 = 0.0
for (Tuple3<Double, Double, Vector> labelVector : labelVectors) {
double weight = labelVector.f0;
//用x-vec和coef計算出來的 Y
double etaCoef = getEta(labelVector, coefVector);
//以x-vec和dirVec計算出來的 deltaY
double etaDelta = getEta(labelVector, dirVec) * beta;
weight = 1.0
etaCoef = 0.001
etaDelta = -0.7427013482280431
for (int i = 0; i < numStep + 1; ++i) {
//labelVector.f1就是label y
//聯系下面損失函數可知,etaCoef - i * etaDelta, labelVector.f1 是 訓練數據預測值 與 實際類別 的偏差
vec[i] += weight * this.unaryLossFunc.loss(etaCoef - i * etaDelta, labelVector.f1);
}
}
return vec;
}
private double getEta(Tuple3<Double, Double, Vector> labelVector, DenseVector coefVector) {
//labelVector.f2 = {DenseVector@9972} "1.0 1.0 1.4657097546055162 1.4770978917519928"
return MatVecOp.dot(labelVector.f2, coefVector);
}
}
vec = {double[5]@10026}
0 = 219.33160199999998
1 = 198.85962729259512
2 = 179.40202828917856
3 = 160.95880498975038
4 = 143.52995739431051
回顧損失函數如下
/**
* Squared loss function.
* https://en.wikipedia.org/wiki/Loss_functions_for_classification#Square_loss
*/
public class SquareLossFunc implements UnaryLossFunc {
public SquareLossFunc() { }
@Override
public double loss(double eta, double y) {
return 0.5 * (eta - y) * (eta - y);
}
@Override
public double derivative(double eta, double y) {
return eta - y;
}
@Override
public double secondDerivative(double eta, double y) {
return 1;
}
}
3.6 UpdateModel
本模塊做兩件事
- 基於dir和step length來更新coefficient,即依據方向和步長計算。
- 如果簡化理解,參數更新公式為 :下一刻參數 = 上一時刻參數 - 學習率 * (損失函數對這個參數的導數)。
- 判斷循環的收斂。
因為變量太多,所以有時候就忘記誰是誰了,所以再次標示。
- OptimVariable.dir是CalcGradient計算出來的梯度做修正之后的結果
- OptimVariable.lossAllReduce 這個會變化,此時是上一階段計算的損失
代碼邏輯大致如下:
- 1)得出"最新損失"的最小值位置pos
- 2)得出學習率 beta = dir.f1[1] / numSearchStep;
- 3)根據"最新損失pos"和 上一個最小值 last loss value判斷來進行分別處理:
- 3.1)如果所有損失都比 last loss value 大,則
- 3.1.1)縮減學習率 multiply 1.0 / (numSearchStep*numSearchStep)
- 3.1.2)把 eta 設置為 0;這個就是步長了
- 3.1.3)把當前 loss 設置為 last loss value
- 3.2)如果losses[numSearchStep] 比 last loss value 小,則
- 3.2.1)增大學習率 multiply numSearchStep
- 3.2.2)設置eta是smallest value pos,eta = beta * pos; 這個eta就是步長了
- 3.2.3)把當前 loss 設置為當前loss最小值 losses[numSearchStep]
- 3.3)否則
- 3.3.1)學習率不更改
- 3.3.2)設置eta是smallest value pos,eta = beta * pos; 這個eta就是步長了
- 3.3.3)把當前 loss 設置為當前loss最小值 losses[pos]
- 3.1)如果所有損失都比 last loss value 大,則
- 4)修正Hessian矩陣
- 5)用方向向量和步長來來更新系數向量 curCoef.f0.plusScaleEqual(dir.f0, -eta);
- 6)如果當前loss比 min loss 小,則 用 current loss 更新 min loss
- 6.1)minCoef.f1 = curCoef.f1; 更新最小loss
- 6.2)minCoef.f0.setEqual(curCoef.f0); 更新最小loss所對應的Coef,即線性模型最后需要的系數
在這里求步長,我沒有發現 Wolf-Powell 准則的使用,Alink做了某種優化。如果有朋友能看出來Wolf-Powell如何使用,還請留言指點 ,謝謝。
代碼如下:
public class UpdateModel extends ComputeFunction {
@Override
public void calc(ComContext context) {
double[] losses = context.getObj(OptimVariable.lossAllReduce);
Tuple2<DenseVector, double[]> dir = context.getObj(OptimVariable.dir);
Tuple2<DenseVector, double[]> pseGrad = context.getObj(OptimVariable.pseGrad);
Tuple2<DenseVector, Double> curCoef = context.getObj(OptimVariable.currentCoef);
Tuple2<DenseVector, Double> minCoef = context.getObj(OptimVariable.minCoef);
double lossChangeRatio = 1.0;
double[] lossCurve = context.getObj(OptimVariable.lossCurve);
for (int j = 0; j < losses.length; ++j) {
losses[j] /= dir.f1[0]; //dir.f1[0]是權重
}
int pos = -1;
//get the min value of losses, and remember the position.
for (int j = 0; j < losses.length; ++j) {
if (losses[j] < losses[0]) {
losses[0] = losses[j];
pos = j;
}
}
// adaptive learningRate strategy
double beta = dir.f1[1] / numSearchStep;
double eta;
// 變量如下
losses = {double[5]@10001}
0 = 35.88248934857763
1 = 49.71490682314878
2 = 44.85050707229464
3 = 40.239701247437594
4 = 35.88248934857763
dir = {Tuple2@10002} "(-9.599 -9.599 -3.5515274823133662 -3.5911942493220335,[4.0, 0.1])"
pseGrad = null
curCoef = {Tuple2@10003} "(0.001 0.0 0.0 0.0,1.7976931348623157E308)"
minCoef = {Tuple2@10004} "(0.001 0.0 0.0 0.0,1.7976931348623157E308)"
lossChangeRatio = 1.0
lossCurve = {double[100]@10005}
pos = 4
beta = 0.025
curCoef.f1 = {Double@10006} 1.7976931348623157E308
if (pos == -1) {
/**
* if all losses larger than last loss value. we'll do the below things:
* 1. reduce learning rate by multiply 1.0 / (numSearchStep*numSearchStep).
* 2. set eta with zero.
* 3. set current loss equals last loss value.
*/
eta = 0;
dir.f1[1] *= 1.0 / (numSearchStep * numSearchStep); // 學習率
curCoef.f1 = losses[0]; // 最小loss
} else if (pos == numSearchStep) {
/**
* if losses[numSearchStep] smaller than last loss value. we'll do the below things:
* 1. enlarge learning rate by multiply numSearchStep.
* 2. set eta with the smallest value pos.
* 3. set current loss equals smallest loss value.
*/
eta = beta * pos;
dir.f1[1] *= numSearchStep;
dir.f1[1] = Math.min(dir.f1[1], numSearchStep); // 學習率
lossChangeRatio = Math.abs((curCoef.f1 - losses[pos]) / curCoef.f1);
curCoef.f1 = losses[numSearchStep]; // 最小loss
// 當前數值
numSearchStep = 4 是NUM_SEARCH_STEP缺省值,是線性搜索的參數,Line search parameter, which define the search value num of one step.
lossChangeRatio = 1.0
pos = 4
eta = 0.1
curCoef.f1 = {Double@10049} 35.88248934857763
dir.f1[1] = 0.4
} else {
/**
* else :
* 1. learning rate not changed.
* 2. set eta with the smallest value pos.
* 3. set current loss equals smallest loss value.
*/
eta = beta * pos;
lossChangeRatio = Math.abs((curCoef.f1 - losses[pos]) / curCoef.f1);
curCoef.f1 = losses[pos]; // 最小loss
}
/* update loss value in loss curve at this step */
lossCurve[context.getStepNo() - 1] = curCoef.f1;
lossCurve = {double[100]@9998}
0 = 35.88248934857763
1 = Infinity
if (method.equals(OptimMethod.OWLQN)) {
.....
} else if (method.equals(OptimMethod.LBFGS)) {
Tuple2<DenseVector[], DenseVector[]> sKyK = context.getObj(OptimVariable.sKyK);
int size = dir.f0.size();
int k = context.getStepNo() - 1;
DenseVector[] sK = sKyK.f0;
for (int s = 0; s < size; ++s) {
// 修正矩陣
sK[k % OptimVariable.numCorrections].set(s, dir.f0.get(s) * (-eta));
}
curCoef.f0.plusScaleEqual(dir.f0, -eta); // 這里是用方向向量和步長來更新系數向量
sKyK = {Tuple2@10043} "([0.9599000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0],[0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0, 0.0 0.0 0.0 0.0])"
f0 = {DenseVector[10]@10044}
0 = {DenseVector@10074} "0.9599000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034"
}
/**
* if current loss is smaller than min loss, then update the min loss and min coefficient by current.
*/
if (curCoef.f1 < minCoef.f1) {
minCoef.f1 = curCoef.f1; // 最小loss
minCoef.f0.setEqual(curCoef.f0); // 最小loss所對應的Coef,即線性模型最后需要的系數
}
curCoef = {Tuple2@9996} "(0.9609000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034,35.88248934857763)"
f0 = {DenseVector@10059} "0.9609000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034"
f1 = {Double@10048} 35.88248934857763
minCoef = {Tuple2@9997} "(0.9609000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034,35.88248934857763)"
f0 = {DenseVector@10059} "0.9609000000000001 0.9599000000000001 0.35515274823133663 0.3591194249322034"
f1 = {Double@10048} 35.88248934857763
// judge the convergence of iteration.
filter(dir, curCoef, minCoef, context, lossChangeRatio);
}
}
filter 判斷是否收斂,里面的打印log很清晰的說明了函數邏輯。
/**
* judge the convergence of iteration.
*/
public void filter(Tuple2<DenseVector, double[]> grad,
Tuple2<DenseVector, Double> c,
Tuple2<DenseVector, Double> m,
ComContext context,
double lossChangeRatio) {
double epsilon = params.get(HasEpsilonDv0000001.EPSILON);
int maxIter = params.get(HasMaxIterDefaultAs100.MAX_ITER);
double gradNorm = ((Tuple2<DenseVector, double[]>)context.getObj(gradName)).f0.normL2();
if (c.f1 < epsilon || gradNorm < epsilon) {
printLog(" method converged at step : ", c.f1, m.f1, grad.f1[1], gradNorm, context, lossChangeRatio);
grad.f1[0] = -1.0;
} else if (context.getStepNo() > maxIter - 1) {
printLog(" method stop at max step : ", c.f1, m.f1, grad.f1[1], gradNorm, context, lossChangeRatio);
grad.f1[0] = -1.0;
} else if (grad.f1[1] < EPS) {
printLog(" learning rate is too small, method stops at step : ", c.f1, m.f1, grad.f1[1], gradNorm,
context, lossChangeRatio);
grad.f1[0] = -1.0;
} else if (lossChangeRatio < epsilon && gradNorm < Math.sqrt(epsilon)) {
printLog(" loss change ratio is too small, method stops at step : ", c.f1, m.f1, grad.f1[1], gradNorm,
context, lossChangeRatio);
grad.f1[0] = -1.0;
} else {
printLog(" method continue at step : ", c.f1, m.f1, grad.f1[1], gradNorm, context, lossChangeRatio);
}
}
3.7 OutputModel
.closeWith(new OutputModel())
這部分是每次迭代結束,臨時輸出模型,把數據轉換成Flink通用的Row類型,Transfer the state to model rows。
public class OutputModel extends CompleteResultFunction {
@Override
public List <Row> calc(ComContext context) {
// get the coefficient of min loss.
Tuple2 <DenseVector, double[]> minCoef = context.getObj(OptimVariable.minCoef);
double[] lossCurve = context.getObj(OptimVariable.lossCurve);
int effectiveSize = lossCurve.length;
for (int i = 0; i < lossCurve.length; ++i) {
if (Double.isInfinite(lossCurve[i])) {
effectiveSize = i;
break;
}
}
double[] effectiveCurve = new double[effectiveSize];
System.arraycopy(lossCurve, 0, effectiveCurve, 0, effectiveSize);
Params params = new Params();
params.set(ModelParamName.COEF, minCoef.f0);// 重點在這里,minCoef是我們真正需要的
params.set(ModelParamName.LOSS_CURVE, effectiveCurve);
List <Row> model = new ArrayList <>(1);
model.add(Row.of(params.toJson()));
return model;
}
}
0x04 准備模型元數據
這里設置了並行度為1。
// Prepare the meta info of linear model.
DataSet<Params> meta = labelInfo.f0
.mapPartition(new CreateMeta(modelName, linearModelType, isRegProc, params))
.setParallelism(1);
具體代碼
public static class CreateMeta implements MapPartitionFunction<Object, Params> {
@Override
public void mapPartition(Iterable<Object> rows, Collector<Params> metas) throws Exception {
Object[] labels = null;
if (!this.isRegProc) {
labels = orderLabels(rows);
}
Params meta = new Params();
meta.set(ModelParamName.MODEL_NAME, this.modelName);
meta.set(ModelParamName.LINEAR_MODEL_TYPE, this.modelType);
meta.set(ModelParamName.LABEL_VALUES, labels);
meta.set(ModelParamName.HAS_INTERCEPT_ITEM, this.hasInterceptItem);
meta.set(ModelParamName.VECTOR_COL_NAME, vectorColName);
meta.set(LinearTrainParams.LABEL_COL, labelName);
metas.collect(meta);
}
}
// 變量為
meta = {Params@9667} "Params {hasInterceptItem=true, vectorColName=null, modelName="Linear Regression", labelValues=null, labelCol="label", linearModelType="LinearReg"}"
0x05 建立模型
當迭代循環結束之后,Alink就根據Coef數據來建立模型。
/**
* build the linear model rows, the format to be output.
*/
public static class BuildModelFromCoefs extends AbstractRichFunction implements
@Override
public void mapPartition(Iterable<Tuple2<DenseVector, double[]>> iterable,
Collector<Row> collector) throws Exception {
for (Tuple2<DenseVector, double[]> coefVector : iterable) {
LinearModelData modelData = buildLinearModelData(meta,
featureNames,
labelType,
meanVar,
hasIntercept,
standardization,
coefVector);
new LinearModelDataConverter(this.labelType).save(modelData, collector);
}
}
}
得到模型數據為,里面coef就是 f(x)=w^Tx+b 中的 w, b。是最終用來計算的。
modelData = {LinearModelData@10584}
featureNames = {String[3]@9787}
featureTypes = null
vectorColName = null
coefVector = {DenseVector@10485} "-3.938937407856857 4.799499941426075 0.8929571907809862 1.078169576770847"
coefVectors = null
vectorSize = 3
modelName = "Linear Regression"
labelName = null
labelValues = null
linearModelType = {LinearModelType@4674} "LinearReg"
hasInterceptItem = true
lossCurve = {double[12]@10593}
0 = 35.88248934857763
1 = 12.807366842002144
2 = 0.5228366663917704
3 = 0.031112070740366038
4 = 0.01098914933042993
5 = 0.009765757443537283
6 = 0.008750523231785415
7 = 0.004210085397869248
8 = 0.0039042232755530704
9 = 0.0038821509860327537
10 = 0.003882042680010676
11 = 0.0038820422536391033
labelType = {FractionalTypeInfo@9790} "Double"
0x06 使用模型預測
預測時候,使用的是LinearModelMapper,其內部部分變量打印出來如下,能夠看出來模型數據。
this = {LinearModelMapper@10704}
vectorColIndex = -1
model = {LinearModelData@10597}
featureNames = {String[3]@10632}
featureTypes = null
vectorColName = null
coefVector = {DenseVector@10612} "-3.938937407856857 4.799499941426075 0.8929571907809862 1.078169576770847"
coefVectors = null
vectorSize = 0
modelName = "Linear Regression"
labelName = null
labelValues = {Object[0]@10613}
linearModelType = {LinearModelType@10611} "LinearReg"
hasInterceptItem = true
lossCurve = null
labelType = null
具體預測是在 LinearModelMapper.predict
中完成,具體如下:
- 對應原始輸入
Row.of("$3$0:1.0 1:7.0 2:9.0", "1.0 7.0 9.0", 1.0, 7.0, 9.0, 16.8),
, - 通過
FeatureLabelUtil.getFeatureVector
處理之后, - 得到的四元組是
"1.0 1.0 7.0 9.0"
,其中第一個 1.0 是通過aVector.set(0, 1.0);
專門設定的固定值。比如模型是 f(x) = ax + b,這個固定值 1.0 就是 b 的初始化值,隨着優化過程會得到 b。所以這里還是需要有一個 1.0 來進行預測。 - 模型系數是:
"-3.938937407856857 4.799499941426075 0.8929571907809862 1.078169576770847"
。 - 四元組 和 模型系數 點積的結果就是
dotValue = 16.814789059973744
。
這樣就能看出來模型系數如何使用的了。
public Object predict(Vector vector) throws Exception {
double dotValue = MatVecOp.dot(vector, model.coefVector);
switch (model.linearModelType) {
case LR:
case SVM:
case Perceptron:
return dotValue >= 0 ? model.labelValues[0] : model.labelValues[1];
case LinearReg:
case SVR:
return dotValue;
}
}
vector = {DenseVector@10610} "1.0 1.0 7.0 9.0"
data = {double[4]@10619}
0 = 1.0
1 = 1.0
2 = 7.0
3 = 9.0
model.coefVector = {DenseVector@10612} "-3.938937407856857 4.799499941426075 0.8929571907809862 1.078169576770847"
data = {double[4]@10620}
0 = -3.938937407856857
1 = 4.799499941426075
2 = 0.8929571907809862
3 = 1.078169576770847
0x07 本系列其他文章
Alink漫談(一) : 從KMeans算法實現不同看Alink設計思想
Alink漫談(二) : 從源碼看機器學習平台Alink設計和架構
Alink漫談(八) : 二分類評估 AUC、K-S、PRC、Precision、Recall、LiftChart 如何實現
0xFF 參考
http://www.mamicode.com/info-detail-2508527.html)
LogisticRegressionWithLBFGS--邏輯回歸
[原創]用“人話”解釋不精確線搜索中的Armijo-Goldstein准則及Wolfe-Powell准則
https://www.zhihu.com/question/36425542
https://zhuanlan.zhihu.com/p/32821110