1. 先修知識
設多元線性回歸方程的模型為
可令\(X_0=1\),則模型可寫做:
表示成矩陣形式為:
其中,
則線性回歸方程的最小二乘系數估計值為:
2. 編程思路
利用MapReduce計算系數是,由於輸入數據是一行一行進行讀取的,因此在計算的時候,不可能直接利用矩陣乘法進行計算。這里,我們假設輸入的數據格式為:
即
首先,考慮最簡單的,計算\(X'Y\),即計算
觀察右側矩陣可以發現,\(y_1\)總是與第一列相乘,\(y_2\)總是與第二列相乘,……,以此類推。而第一列實際上就是我們讀取數據的第一行,第二列是讀取數據的第二行……。根據這種規律,我們每讀取一行,就讓當前的\(y\)與所有的自變量相乘,然后把所有的結果累加求和,即為我們想要的結果。
上面已經解決了矩陣自變量矩陣的轉置與因變量向量的乘積,即:
那么,矩陣轉置與矩陣的乘積\(X'X\)仍然可以仿照上述的方法進行。
可以把上述既然課程看作是進行了\(p+1\)次的矩陣轉置與向量乘積過程。當讀取第一行時,我們可以得到一個矩陣:
同理,當讀取第二行的時候,同樣可以得到一個矩陣:
此時,將\(step1\ step2\)對應元素相加,即得到我們的更新矩陣。迭代下去,最終讀取完所在的數據,即為\(X'X\)的結果。
當數據量很大的時候,用MapReduce方法進行計算,hadoop會將數據按照block的大小切分成若干塊,每一塊都執行mapper函數,在reducer里面把所有的mapper結果加起來,最后計算\((X'X)^{-1}X'Y\).
3. 編程實現
由於矩陣的輸出不容易處理,因此在得到矩陣的時候,可以將其拉長為向量,這樣就可以使用標准化輸出函數print將計算結果輸出,以便於reducer使用。
mapper函數如下:
#! /usr/bin/anaconda2/bin/python
# -*- coding:UTF-8 -*-
import sys
import numpy as np
def read_input(file):
for line in file:
yield line.strip()
def matmulti():
input = read_input(sys.stdin)
innerLength = 0
length = 0
for line in input:
fields = line.split(",")
if innerLength == 0:
innerLength = len(fields) - 1
data1 = np.array([0.0 for _ in range(innerLength)])
temp = np.array(fields,float)[:innerLength]*float(fields[innerLength])
data1 = data1 + temp
if length == 0:
length = len(fields) - 1
data2 = np.diag(np.zeros(length))
for index in range(length):
data2[index] += np.array(fields[:length],dtype = float)*float(fields[index])
return data1,data2
data1,data2 = matmulti()
data1 = list(data1)
length = len(data2)
data2 = data2.reshape(1,length**2)
data2 = list(data2[0])
data = data1 + data2
print("\t".join(str(i) for i in data))
reducer函數如下:
#! /usr/bin/anaconda2/bin/python
# -*- coding:UTF-8 -*-
import sys
import numpy as np
import math
def read_input(file):
for line in file:
yield line.strip()
input = read_input(sys.stdin)
length = 0
for line in input:
fields = line.split("\t")
if length == 0:
length = len(fields)
data = np.array([0.0 for _ in range(length)])
fields = np.array(fields,dtype = float)
data += fields
lenght = len(data)
varnums = int((-1+math.sqrt(1+4.0*length))/2.0)
part1 = np.mat(data[:varnums])
part1 = part1.T
part2 = data[varnums:]
part2 = part2.reshape(varnums,varnums)
part2 = np.mat(part2)
part2 = part2.I
result = part2*part1
print result
用R隨機生成一份樣本量為100萬的數據,R回歸結果為:
Coefficients:
Estimate Std. Error t value Pr(>|t|)
(Intercept) 19.9995945 0.0602316 332.045 <2e-16 ***
V2 -0.0012168 0.0009998 -1.217 0.2236
V3 -0.0018043 0.0009990 -1.806 0.0709 .
V4 0.0017635 0.0009990 1.765 0.0775 .
V5 -0.0021161 0.0009987 -2.119 0.0341 *
V6 -0.0002982 0.0009997 -0.298 0.7655
V7 0.0008608 0.0009998 0.861 0.3892
V8 0.0007678 0.0009995 0.768 0.4423
V9 0.0009089 0.0009991 0.910 0.3630
V10 0.0009761 0.0009991 0.977 0.3286
MapReduce的輸出結果為:
[[ 1.99995945e+01]
[ -1.21684122e-03]
[ -1.80428969e-03]
[ 1.76352620e-03]
[ -2.11610460e-03]
[ -2.98208998e-04]
[ 8.60846240e-04]
[ 7.67848910e-04]
[ 9.08866936e-04]
[ 9.76052909e-04]]
二者的計算結果是一樣的。