For any matrices M and N, if the number of columns of M equals the number of rows of N, their product is written P = M * N. Let m_ik denote the element in row i and column k of M, and n_kj the element in row k and column j of N. The element in row i and column j of P is then given by formula (1-1):
p_ij = (M * N)_ij = Σ_k m_ik * n_kj = m_i1 * n_1j + m_i2 * n_2j + … + m_iK * n_Kj    (Formula 1-1)

where K is the number of columns of M (equivalently, the number of rows of N).
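For reference, the computation described by formula (1-1) is nothing more than the familiar triple loop. The following single-machine sketch (the class and method names are illustrative, not part of the example project) computes P = M * N directly:

// Illustrative only: single-machine matrix multiplication following formula (1-1).
public class NaiveMatrixMultiply {

    // Returns P = M * N, where M is rowM x columnM and N is columnM x columnN.
    public static int[][] multiply(int[][] m, int[][] n) {
        int rowM = m.length;
        int columnM = n.length;      // = number of columns of M = number of rows of N
        int columnN = n[0].length;
        int[][] p = new int[rowM][columnN];
        for (int i = 0; i < rowM; i++) {
            for (int j = 0; j < columnN; j++) {
                int sum = 0;
                for (int k = 0; k < columnM; k++) {
                    sum += m[i][k] * n[k][j];   // m_ik * n_kj, summed over k
                }
                p[i][j] = sum;
            }
        }
        return p;
    }
}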
Formula (1-1) shows that p_ij is determined entirely by the index pair (i, j), so (i, j) can be used as the input key of the Reducer. To compute p_ij we need every m_ik and n_kj. For m_ik, the required attributes are the matrix it belongs to (M), its row index i, its column index k, and its value; likewise, for n_kj the required attributes are the matrix N, its row index k, its column index j, and its value. These attributes are produced by the Mapper.
Map function: for each element m_ik of matrix M, emit a series of key-value pairs <(i, j), (M, k, m_ik)>, where j = 1, 2, … up to the total number of columns of N; for each element n_kj of matrix N, emit a series of key-value pairs <(i, j), (N, k, n_kj)>, where i = 1, 2, … up to the total number of rows of M.
Reduce function: for all values (M, k, m_ik) and (N, k, n_kj) associated with a key (i, j), place m_ik and n_kj into two separate arrays indexed by k, multiply the two entries that share the same k, and accumulate the products; the result is the value of p_ij.
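Before turning to the Hadoop code, the map/shuffle/reduce flow described above can be imitated in memory, with an ordinary HashMap standing in for the shuffle phase. The sketch below is only an illustration under that simplification (the class name and the 2 x 2 sample matrices are invented for this purpose); the real Mapper and Reducer follow in Code 1-4 and Code 1-5.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: emulate the map / shuffle / reduce steps in memory.
public class MapReduceSketch {
    public static void main(String[] args) {
        int[][] m = { { 1, 2 }, { 3, 4 } };          // 2 x 2 matrix M
        int[][] n = { { 5, 6 }, { 7, 8 } };          // 2 x 2 matrix N
        int rowM = 2, columnM = 2, columnN = 2;

        // "Map" phase: group values by the key (i,j), as the Mapper would emit them.
        Map<String, List<String>> shuffle = new HashMap<>();
        for (int i = 1; i <= rowM; i++)
            for (int k = 1; k <= columnM; k++)
                for (int j = 1; j <= columnN; j++)
                    shuffle.computeIfAbsent(i + "," + j, x -> new ArrayList<>())
                           .add("M," + k + "," + m[i - 1][k - 1]);
        for (int k = 1; k <= columnM; k++)
            for (int j = 1; j <= columnN; j++)
                for (int i = 1; i <= rowM; i++)
                    shuffle.computeIfAbsent(i + "," + j, x -> new ArrayList<>())
                           .add("N," + k + "," + n[k - 1][j - 1]);

        // "Reduce" phase: for each key (i,j), pair up values with the same k and sum the products.
        for (Map.Entry<String, List<String>> entry : shuffle.entrySet()) {
            int[] mRow = new int[columnM + 1];
            int[] nCol = new int[columnM + 1];
            for (String val : entry.getValue()) {
                String[] t = val.split(",");
                if ("M".equals(t[0])) mRow[Integer.parseInt(t[1])] = Integer.parseInt(t[2]);
                else nCol[Integer.parseInt(t[1])] = Integer.parseInt(t[2]);
            }
            int sum = 0;
            for (int k = 1; k <= columnM; k++) sum += mRow[k] * nCol[k];
            System.out.println(entry.getKey() + "\t" + sum);   // e.g. 1,1 -> 19
        }
    }
}

For M = [[1, 2], [3, 4]] and N = [[5, 6], [7, 8]] this prints 1,1 19, 1,2 22, 2,1 43 and 2,2 50 (in some order), which matches the ordinary matrix product.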
Two files, M and N, hold the two matrices. Each line of a file has the form "row,column\tvalue". In this example, the data is generated with a shell script.
Code 1-2
root@lejian:/data# cat matrix
#!/bin/bash
# Usage: ./matrix <rowM> <columnM> <columnN>
# Generates matrix M (rowM x columnM) in file M_<rowM>_<columnM> and
# matrix N (columnM x columnN) in file N_<columnM>_<columnN>,
# one "row,column<TAB>value" line per element, with random values in [0, 99].
for i in `seq 1 $1`
do
  for j in `seq 1 $2`
  do
    s=$(($RANDOM % 100))
    echo -e "$i,$j\t$s" >> M_$1_$2
  done
done
for i in `seq 1 $2`
do
  for j in `seq 1 $3`
  do
    s=$(($RANDOM % 100))
    echo -e "$i,$j\t$s" >> N_$2_$3
  done
done
Code 1-3 runs the matrix script to generate a 2-row, 3-column matrix and a 3-row, 3-column matrix, creates a data directory on HDFS, and uploads the two generated matrix files into it.
Code 1-3
root@lejian:/data# ./matrix 2 3 3
root@lejian:/data# cat M_2_3
1,1	6
1,2	84
1,3	40
2,1	51
2,2	37
2,3	97
root@lejian:/data# cat N_3_3
1,1	97
1,2	34
1,3	95
2,1	93
2,2	10
2,3	70
3,1	71
3,2	24
3,3	47
root@lejian:/data# hadoop fs -mkdir /data
root@lejian:/data# hadoop fs -put /data/M_2_3 /data/
root@lejian:/data# hadoop fs -put /data/N_3_3 /data/
root@lejian:/data# hadoop fs -ls -R /data
-rw-r--r--   1 root supergroup         41 2017-01-07 11:57 /data/M_2_3
-rw-r--r--   1 root supergroup         63 2017-01-07 11:57 /data/N_3_3
The matrix multiplication Mapper class is shown in Code 1-4.
Code 1-4
package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

    private int columnN = 0; // number of columns of N, read from the job configuration
    private int rowM = 0;    // number of rows of M, read from the job configuration
    private Text mapKey = new Text();
    private Text mapValue = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        columnN = Integer.parseInt(conf.get("columnN"));
        rowM = Integer.parseInt(conf.get("rowM"));
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The file name tells us whether the current line belongs to matrix M or matrix N.
        FileSplit file = (FileSplit) context.getInputSplit();
        String fileName = file.getPath().getName();
        // Each input line has the form "row,column\tvalue".
        String line = value.toString();
        String[] tuple = line.split(",");
        if (tuple.length != 2) {
            throw new RuntimeException("MatrixMapper tuple error");
        }
        int row = Integer.parseInt(tuple[0]);
        String[] tuples = tuple[1].split("\t");
        if (tuples.length != 2) {
            throw new RuntimeException("MatrixMapper tuples error");
        }
        if (fileName.contains("M")) {
            matrixM(row, Integer.parseInt(tuples[0]), Integer.parseInt(tuples[1]), context);
        } else {
            matrixN(row, Integer.parseInt(tuples[0]), Integer.parseInt(tuples[1]), context);
        }
    }

    // For element m(row, column) of M, emit <(row, i), (M, column, value)> for every column i of N.
    private void matrixM(int row, int column, int value, Context context) throws IOException, InterruptedException {
        for (int i = 1; i < columnN + 1; i++) {
            mapKey.set(row + "," + i);
            mapValue.set("M," + column + "," + value);
            context.write(mapKey, mapValue);
        }
    }

    // For element n(row, column) of N, emit <(i, column), (N, row, value)> for every row i of M.
    private void matrixN(int row, int column, int value, Context context) throws IOException, InterruptedException {
        for (int i = 1; i < rowM + 1; i++) {
            mapKey.set(i + "," + column);
            mapValue.set("N," + row + "," + value);
            context.write(mapKey, mapValue);
        }
    }
}
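Note the fan-out of this Mapper: matrixM emits every element of M once per column of N (columnN copies), and matrixN emits every element of N once per row of M (rowM copies), so the data sent through the shuffle is several times the size of the input matrices. That is acceptable for the small matrices used here, but it is the main cost of this straightforward formulation.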
The matrix multiplication Reducer class is shown in Code 1-5.
Code 1-5
package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MatrixReducer extends Reducer<Text, Text, Text, Text> {

    private int columnM = 0; // number of columns of M (= number of rows of N), read from the job configuration

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        columnM = Integer.parseInt(conf.get("columnM"));
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // m[k] holds m_ik and n[k] holds n_kj for the current key (i, j); index 0 is unused.
        int[] m = new int[columnM + 1];
        int[] n = new int[columnM + 1];
        for (Text val : values) {
            // Each value has the form "M,k,m_ik" or "N,k,n_kj".
            String[] tuple = val.toString().split(",");
            if (tuple.length != 3) {
                throw new RuntimeException("MatrixReducer tuple error");
            }
            if ("M".equals(tuple[0])) {
                m[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]);
            } else {
                n[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]);
            }
        }
        // Accumulate m_ik * n_kj over k, as in formula (1-1).
        for (int i = 1; i < columnM + 1; i++) {
            sum += m[i] * n[i];
        }
        context.write(key, new Text(sum + ""));
    }
}
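For a given key (i, j), the Reducer fills m[k] with m_ik and n[k] with n_kj (k = 1 … columnM, index 0 unused) and then accumulates m[k] * n[k], which is exactly the sum in formula (1-1); the result is written out as the text value of p_ij.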
The matrix multiplication driver (main) class is shown in Code 1-6.
Code 1-6
package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Matrix {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args == null || args.length != 5) {
            throw new RuntimeException(
                    "Usage: <input path> <output path> <rows of M> <columns of M> <columns of N>");
        }
        // Pass the matrix dimensions to the Mapper and Reducer through the job configuration.
        Configuration conf = new Configuration();
        conf.set("rowM", args[2]);
        conf.set("columnM", args[3]);
        conf.set("columnN", args[4]);

        Job job = Job.getInstance(conf);
        job.setJobName("Matrix");
        job.setJarByClass(Matrix.class);
        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
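Note that rowM, columnM, and columnN are set on the Configuration before Job.getInstance(conf) is called; the Job takes a copy of the configuration at that point, which is what allows the setup methods of the Mapper and Reducer to read these values back with conf.get().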
Run the job built from Code 1-6; the results are shown in Code 1-7 (note: Code 1-7 omits part of the MapReduce execution output).
Code 1-7
root@lejian:/data# hadoop jar matrix.jar com.hadoop.mapreduce.Matrix /data/ /output/ 2 3 3
…………
root@lejian:/data# hadoop fs -ls -R /output
-rw-r--r--   1 root supergroup          0 2017-01-07 12:04 /output/_SUCCESS
-rw-r--r--   1 root supergroup         57 2017-01-07 12:04 /output/part-r-00000
root@lejian:/data# hadoop fs -cat /output/part-r-00000
1,1	11234
1,2	2004
1,3	8330
2,1	15275
2,2	4432
2,3	11994
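As a sanity check, the job output can be recomputed locally from the two sample matrices. The snippet below is a throwaway verification (it reuses the illustrative NaiveMatrixMultiply sketch from earlier, which is not part of the example project) and prints the same six values as part-r-00000:

// Quick local cross-check of the sample run.
public class VerifySample {
    public static void main(String[] args) {
        int[][] m = { { 6, 84, 40 }, { 51, 37, 97 } };                       // contents of M_2_3
        int[][] n = { { 97, 34, 95 }, { 93, 10, 70 }, { 71, 24, 47 } };      // contents of N_3_3
        int[][] p = NaiveMatrixMultiply.multiply(m, n);
        for (int i = 0; i < p.length; i++)
            for (int j = 0; j < p[i].length; j++)
                System.out.println((i + 1) + "," + (j + 1) + "\t" + p[i][j]);
        // Expected to match part-r-00000: 1,1 -> 11234 through 2,3 -> 11994.
    }
}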
