MapReduce計算線性回歸的系數估計值


1. 先修知識

設多元線性回歸方程的模型為

\[Y=\beta_0+\beta_1X_1+\beta_2X_2+\cdots+\beta_pX_p \]

可令\(X_0=1\),則模型可寫做:

\[Y=\beta_0X_0+\beta_1X_1+\beta_2X_2+\cdots+\beta_pX_p \]

表示成矩陣形式為:

\[Y=\beta X \]

其中,

\[\beta = \left[\begin{matrix} \beta_0 \\ \beta_1 \\ \beta_2 \\ \vdots \\ \beta_p \end{matrix} \right] X= \left[\begin{matrix} &x_{10} &x_{11} &\cdots &x_{1p} \\ &x_{20} &x_{21} &\cdots &x_{2p} \\ &\vdots &\vdots &\ddots &\vdots \\ &x_{n0} &x_{n1} &\cdots &x_{np} \end{matrix}\right] \]

則線性回歸方程的最小二乘系數估計值為:

\[\hat{\beta} = (X'X)^{-1}X'Y \]

2. 編程思路

利用MapReduce計算系數是,由於輸入數據是一行一行進行讀取的,因此在計算的時候,不可能直接利用矩陣乘法進行計算。這里,我們假設輸入的數據格式為:

\[x_0\ x_1\ x_2\ \cdots x_p\ y \]

\[1\ x_1\ x_2\ \cdots x_p\ y \]

首先,考慮最簡單的,計算\(X'Y\),即計算

\[\left[\begin{matrix} &x_{10} &x_{20} &x_{30} &\cdots &x_{n0} \\ &x_{11} &x_{21} &x_{31} &\cdots &x_{n1} \\ &x_{12} &x_{22} &x_{32} &\cdots &x_{n2} \\ &\vdots &\vdots &\vdots &\ddots &\vdots \\ &x_{1p} &x_{2p} &x_{3p} &\cdots &x_{np} \end{matrix}\right] \times \left[\begin{matrix} y_1 \\ y_2 \\ y_3 \\ \vdots \\ y_n \end{matrix}\right]=\left[\begin{matrix} &x_{10}y_1+&x_{20}y_2+&x_{30}y_3+&\cdots+&x_{n0}y_n \\ &x_{11}y_1+&x_{21}y_2+&x_{31}y_3+&\cdots+&x_{n1}y_n \\ &x_{12}y_1+&x_{22}y_2+&x_{32}y_3+&\cdots+&x_{n2}y_n \\ &\vdots &\vdots &\vdots &\ddots &\vdots \\ &x_{1p}y_1+&x_{2p}y_2+&x_{3p}y_3+&\cdots+&x_{np}y_n \end{matrix}\right] \]

觀察右側矩陣可以發現,\(y_1\)總是與第一列相乘,\(y_2\)總是與第二列相乘,……,以此類推。而第一列實際上就是我們讀取數據的第一行,第二列是讀取數據的第二行……。根據這種規律,我們每讀取一行,就讓當前的\(y\)與所有的自變量相乘,然后把所有的結果累加求和,即為我們想要的結果。

上面已經解決了矩陣自變量矩陣的轉置與因變量向量的乘積,即:

\[part1 = X'Y \]

那么,矩陣轉置與矩陣的乘積\(X'X\)仍然可以仿照上述的方法進行。

\[X'X = \left[\begin{matrix} &x_{10} &x_{20} &x_{30} &\cdots &x_{n0} \\ &x_{11} &x_{21} &x_{31} &\cdots &x_{n1} \\ &x_{12} &x_{22} &x_{32} &\cdots &x_{n2} \\ &\vdots &\vdots &\vdots &\ddots &\vdots \\ &x_{1p} &x_{2p} &x_{3p} &\cdots &x_{np} \end{matrix}\right] \times\left[\begin{matrix} &x_{10} &x_{11} &\cdots &x_{1p} \\ &x_{20} &x_{21} &\cdots &x_{2p} \\ &x_{30} &x_{31} &\cdots &x_{2p} \\ &\vdots &\vdots &\ddots &\vdots \\ &x_{n0} &x_{n1} &\cdots &x_{np} \end{matrix} \right] \]


\[=\left[\begin{matrix} &x_{10} &x_{20} &x_{30} &\cdots &x_{n0} \\ &x_{11} &x_{21} &x_{31} &\cdots &x_{n1} \\ &x_{12} &x_{22} &x_{32} &\cdots &x_{n2} \\ &\vdots &\vdots &\vdots &\ddots &\vdots \\ &x_{1p} &x_{2p} &x_{3p} &\cdots &x_{np} \end{matrix}\right]\times\left[\begin{matrix} x_0, x_1, x_2, \cdots, x_p \end{matrix}\right] \]

可以把上述既然課程看作是進行了\(p+1\)次的矩陣轉置與向量乘積過程。當讀取第一行時,我們可以得到一個矩陣:

\[step1 = \left[\begin{matrix} &x_{10}x_{10} &x_{11}x_{10} &\cdots &x_{1p}x_{10} \\ &x_{10}x_{11} &x_{11}x_{11} &\cdots &x_{1p}x_{11} \\ &x_{10}x_{12} &x_{11}x_{12} &\cdots &x_{1p}x_{12} \\ &\vdots &\vdots &\ddots &\vdots \\ &x_{10}x_{1p} &x_{11}x_{1p} &\cdots &x_{1p}x_{1p} \end{matrix}\right]\]

同理,當讀取第二行的時候,同樣可以得到一個矩陣:

\[step2 = \left[ \begin{matrix} &x_{20}x_{20} &x_{21}x_{20} &\cdots &x_{2p}x_{20} \\ &x_{20}x_{21} &x_{21}x_{21} &\cdots &x_{2p}x_{21} \\ &x_{20}x_{22} &x_{21}x_{12} &\cdots &x_{2p}x_{12} \\ &\vdots &\vdots &\ddots &\vdots \\ &x_{20}x_{2p} &x_{21}x_{2p} &\cdots &x_{2p}x_{2p} \end{matrix}\right]\]

此時,將\(step1\ step2\)對應元素相加,即得到我們的更新矩陣。迭代下去,最終讀取完所在的數據,即為\(X'X\)的結果。

當數據量很大的時候,用MapReduce方法進行計算,hadoop會將數據按照block的大小切分成若干塊,每一塊都執行mapper函數,在reducer里面把所有的mapper結果加起來,最后計算\((X'X)^{-1}X'Y\).

3. 編程實現

由於矩陣的輸出不容易處理,因此在得到矩陣的時候,可以將其拉長為向量,這樣就可以使用標准化輸出函數print將計算結果輸出,以便於reducer使用。

mapper函數如下:

#! /usr/bin/anaconda2/bin/python
# -*- coding:UTF-8 -*-

import sys
import numpy as np

def read_input(file):
    for line in file:
        yield line.strip()

def matmulti():
    
    input = read_input(sys.stdin)

    innerLength = 0
    length = 0
    for line in input:
        fields = line.split(",")
        if innerLength == 0:
            innerLength = len(fields) - 1
            data1 = np.array([0.0 for _ in range(innerLength)])
        temp = np.array(fields,float)[:innerLength]*float(fields[innerLength])
        data1 = data1 + temp
        
        if length == 0:
            length = len(fields) - 1
            data2 = np.diag(np.zeros(length))
        for index in range(length):
            data2[index] += np.array(fields[:length],dtype = float)*float(fields[index])
            
    return data1,data2

data1,data2 = matmulti()
data1 = list(data1)
length = len(data2)
data2 = data2.reshape(1,length**2)
data2 = list(data2[0])
data = data1 + data2
print("\t".join(str(i) for i in data))

reducer函數如下:

#! /usr/bin/anaconda2/bin/python
# -*- coding:UTF-8 -*-

import sys
import numpy as np
import math

def read_input(file):
    for line in file:
        yield line.strip()

input = read_input(sys.stdin)

length = 0

for line in input:
    fields = line.split("\t")
    if length == 0:
        length = len(fields)
        data = np.array([0.0 for _ in range(length)])
    fields = np.array(fields,dtype = float)
    data += fields

lenght = len(data)
varnums = int((-1+math.sqrt(1+4.0*length))/2.0)

part1 = np.mat(data[:varnums])
part1 = part1.T

part2 = data[varnums:]
part2 = part2.reshape(varnums,varnums)
part2 = np.mat(part2)
part2 = part2.I

result = part2*part1

print result

用R隨機生成一份樣本量為100萬的數據,R回歸結果為:

Coefficients:
              Estimate Std. Error t value Pr(>|t|)    
(Intercept) 19.9995945  0.0602316 332.045   <2e-16 ***
V2          -0.0012168  0.0009998  -1.217   0.2236    
V3          -0.0018043  0.0009990  -1.806   0.0709 .  
V4           0.0017635  0.0009990   1.765   0.0775 .  
V5          -0.0021161  0.0009987  -2.119   0.0341 *  
V6          -0.0002982  0.0009997  -0.298   0.7655    
V7           0.0008608  0.0009998   0.861   0.3892    
V8           0.0007678  0.0009995   0.768   0.4423    
V9           0.0009089  0.0009991   0.910   0.3630    
V10          0.0009761  0.0009991   0.977   0.3286 

MapReduce的輸出結果為:

[[  1.99995945e+01]	
 [ -1.21684122e-03]	
 [ -1.80428969e-03]	
 [  1.76352620e-03]	
 [ -2.11610460e-03]	
 [ -2.98208998e-04]	
 [  8.60846240e-04]	
 [  7.67848910e-04]	
 [  9.08866936e-04]	
 [  9.76052909e-04]]

二者的計算結果是一樣的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM