- 算法特征:
回歸曲線上的每一點均對應一個獨立的線性方程, 該線性方程由一組經過加權后的殘差決定. 殘差來源於待擬合數據點與擬合超平面在相空間的距離, 權重依賴於待擬合數據點與擬合數據點在參數空間的距離. - 算法推導:
待擬合方程:
\begin{equation}\label{eq_1}
h_{\theta}(x) = x^T\theta
\end{equation}
最小二乘法:
\begin{equation}\label{eq_2}
min\ \frac{1}{2}(X^T\theta-\bar{Y})^TW(X^T\theta-\bar{Y})
\end{equation}
其中, $X=[x^1, x^2, ..., x^n]$是由待擬合數據之輸入值所組成的矩陣, 每根分量均為一個列矢量, 將該列矢量的最后一個元素置1以滿足線性擬合之常數項需求. $\bar{Y}$由待擬合數據之標准輸出值組成, 是一個列矢量. $W$由待擬合數據之權重組成, 是一個對角矩陣, 此處取第$i$個對角元為$exp(-(x_i - x)^2 / (2\tau^2))$. $\theta$為待求解的優化變量, 是一個列矢量.
上訴優化問題(\ref{eq_2})為無約束凸優化問題, 存在如下近似解析解:
\begin{equation}\label{eq_3}
\theta = (XWX^T + \varepsilon I)^{-1}XW\bar{Y}
\end{equation} - 代碼實現:
1 # 局部加權線性回歸 2 # 交叉驗證計算泛化誤差最小點 3 4 5 import numpy 6 from matplotlib import pyplot as plt 7 8 9 # 待擬合不含噪聲之目標函數 10 def oriFunc(x): 11 y = numpy.exp(-x) * numpy.sin(10*x) 12 return y 13 # 待擬合包含噪聲之目標函數 14 def traFunc(x, sigma=0.03): 15 y = oriFunc(x) + numpy.random.normal(0, sigma, numpy.array(x).size) 16 return y 17 18 19 # 局部加權線性回歸之實現 20 class LW(object): 21 22 def __init__(self, xBound=(0, 3), number=500, tauBound=(0.001, 100), epsilon=1.e-3): 23 self.__xBound = xBound # 采樣邊界 24 self.__number = number # 采樣數目 25 self.__tauBound = tauBound # tau之搜索邊界 26 self.__epsilon = epsilon # tau之搜索精度 27 28 29 def get_data(self): 30 ''' 31 根據目標函數生成待擬合數據 32 ''' 33 X = numpy.linspace(*self.__xBound, self.__number) 34 oriY_ = oriFunc(X) # 不含誤差之響應 35 traY_ = traFunc(X) # 包含誤差之響應 36 37 self.X = numpy.vstack((X.reshape((1, -1)), numpy.ones((1, X.shape[0])))) 38 self.oriY_ = oriY_.reshape((-1, 1)) 39 self.traY_ = traY_.reshape((-1, 1)) 40 41 return self.X, self.oriY_, self.traY_ 42 43 44 def lw_fitting(self, tau=None): 45 if not hasattr(self, "X"): 46 self.get_data() 47 if tau is None: 48 if hasattr(self, "bestTau"): 49 tau = self.bestTau 50 else: 51 tau = self.get_tau() 52 53 xList, yList = list(), list() 54 for val in numpy.linspace(*self.__xBound, self.__number * 5): 55 x = numpy.array([[val], [1]]) 56 theta = self.__fitting(x, self.X, self.traY_, tau) 57 y = numpy.matmul(theta.T, x) 58 xList.append(x[0, 0]) 59 yList.append(y[0, 0]) 60 61 resiList = list() # 殘差計算 62 for idx in range(self.__number): 63 x = self.X[:, idx:idx+1] 64 theta = self.__fitting(x, self.X, self.traY_, tau) 65 y = numpy.matmul(theta.T, x) 66 resiList.append(self.traY_[idx, 0] - y[0, 0]) 67 68 return xList, yList, self.X[0, :].tolist(), resiList 69 70 71 def show(self): 72 ''' 73 繪圖展示整體擬合情況 74 ''' 75 xList, yList, sampleXList, sampleResiList = self.lw_fitting() 76 y2List = oriFunc(numpy.array(xList)) 77 fig = plt.figure(figsize=(8, 14)) 78 ax1 = plt.subplot(2, 1, 1) 79 ax2 = plt.subplot(2, 1, 2) 80 81 ax1.scatter(self.X[0, :], self.traY_[:, 0], c="green", alpha=0.7, label="samples with noise") 82 ax1.plot(xList, y2List, c="red", lw=4, alpha=0.7, label="standard curve") 83 ax1.plot(xList, yList, c="black", lw=2, linestyle="--", label="fitting curve") 84 ax1.set(xlabel="$x$", ylabel="$y$") 85 ax1.legend() 86 87 ax2.scatter(sampleXList, sampleResiList, c="blue", s=10) 88 ax2.set(xlabel="$x$", ylabel="$\epsilon$", title="residual distribution") 89 90 plt.show() 91 plt.close() 92 fig.tight_layout() 93 fig.savefig("lw.png", dpi=300) 94 95 96 def __fitting(self, x, X, Y_, tau, epsilon=1.e-9): 97 tmpX = X[0:1, :] 98 tmpW = (-(tmpX - x[0, 0]) ** 2 / tau ** 2 / 2).reshape(-1) 99 W = numpy.diag(numpy.exp(tmpW)) 100 101 item1 = numpy.matmul(numpy.matmul(X, W), X.T) 102 item2 = numpy.linalg.inv(item1 + epsilon * numpy.identity(item1.shape[0])) 103 item3 = numpy.matmul(numpy.matmul(X, W), Y_) 104 105 theta = numpy.matmul(item2, item3) 106 107 return theta 108 109 110 def get_tau(self): 111 ''' 112 交叉驗證返回最優tau 113 采用黃金分割法計算最優tau 114 ''' 115 if not hasattr(self, "X"): 116 self.get_data() 117 118 lowerBound, upperBound = self.__tauBound 119 lowerTau = self.__calc_lowerTau(lowerBound, upperBound) 120 upperTau = self.__calc_upperTau(lowerBound, upperBound) 121 lowerErr = self.__calc_generalErr(self.X, self.traY_, lowerTau) 122 upperErr = self.__calc_generalErr(self.X, self.traY_, upperTau) 123 124 while (upperTau - lowerTau) > self.__epsilon: 125 if lowerErr > upperErr: 126 lowerBound = lowerTau 127 lowerTau = upperTau 128 lowerErr = upperErr 129 upperTau = self.__calc_upperTau(lowerBound, upperBound) 130 upperErr = self.__calc_generalErr(self.X, self.traY_, upperTau) 131 else: 132 upperBound = upperTau 133 upperTau = lowerTau 134 upperErr = lowerErr 135 lowerTau = self.__calc_lowerTau(lowerBound, upperBound) 136 lowerErr = self.__calc_generalErr(self.X, self.traY_, lowerTau) 137 138 self.bestTau = (upperTau + lowerTau) / 2 139 return self.bestTau 140 141 142 def __calc_generalErr(self, X, Y_, tau): 143 generalErr = 0 144 145 for idx in range(X.shape[1]): 146 tmpx = X[:, idx:idx+1] 147 tmpy_ = Y_[idx:idx+1, :] 148 tmpX = numpy.hstack((X[:, 0:idx], X[:, idx+1:])) 149 tmpY_ = numpy.vstack((Y_[0:idx, :], Y_[idx+1:, :])) 150 151 theta = self.__fitting(tmpx, tmpX, tmpY_, tau) 152 tmpy = numpy.matmul(theta.T, tmpx) 153 generalErr += (tmpy_[0, 0] - tmpy[0, 0]) ** 2 154 155 return generalErr 156 157 158 def __calc_lowerTau(self, lowerBound, upperBound): 159 delta = upperBound - lowerBound 160 lowerTau = upperBound - delta * 0.618 161 return lowerTau 162 163 164 def __calc_upperTau(self, lowerBound, upperBound): 165 delta = upperBound - lowerBound 166 upperTau = lowerBound + delta * 0.618 167 return upperTau 168 169 170 171 172 if __name__ == '__main__': 173 obj = LW() 174 obj.show()
筆者所用示例函數為:
\begin{equation}\label{eq_4}
f(x) = e^{-x}sin(10x)
\end{equation} - 結果展示:
- 使用建議:
1. 局部加權線性回歸主要作為一種光滑化的手段存在, 其對非樣本覆蓋區間的預測能力較低.
2. 在不明確具體擬合公式的前提下, 可采用局部加權線性回歸獲取一條可以接受的擬合曲線.
3. 在選定合適的加權方式后, 可以計算指定點處相對可靠的0階及1階函數信息.