The MapReduce programs we have written so far all take plain text files as input, but in real work we inevitably run into other formats. Below we walk through an example that processes Excel data.
1. Project requirements
We have a set of call records between 劉超 and his family members, stored in an Excel file, as shown in the sample data below. Based on this data, we need to count, for each month, how many calls each family member made to him, and write the results for each month to a separate output file.
A portion of the data is shown below. The record format is: ID, contact, phone number, date.
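As an illustration of that layout (this single row is taken from the comment in the Mapper code further down, not from the full data set), a parsed record looks like:

1.0    老爸    13999123786    2014-12-20

that is, a row number, the contact's nickname, the phone number, and the call date.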
2. Analysis
Counting how many calls each family member made per month is straightforward; we have written several programs like that already. The awkward part of this requirement is that the input is an Excel file and the output has to be split into a separate file per month. That means we need to implement a custom InputFormat and a custom OutputFormat.
3. Implementation
First, since the input file is in Excel format, we can use Apache POI to parse it. Let's start with an Excel parsing class, ExcelParser:
package com.buaa;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;

/**
 * @ProjectName HandleExcelPhone
 * @PackageName com.buaa
 * @ClassName ExcelParser
 * @Description Parses an Excel workbook into tab-separated rows
 * @Author 劉吉超
 * @Date 2016-04-24 16:59:28
 */
public class ExcelParser {
    private static final Log logger = LogFactory.getLog(ExcelParser.class);

    /**
     * Parse the given input stream as an Excel workbook.
     *
     * @param is the data source
     * @return one String per row, cells separated by tabs
     */
    public static String[] parseExcelData(InputStream is) {
        // Result list
        List<String> resultList = new ArrayList<String>();

        try {
            // Open the workbook
            HSSFWorkbook workbook = new HSSFWorkbook(is);
            // Get the first sheet
            HSSFSheet sheet = workbook.getSheetAt(0);

            Iterator<Row> rowIterator = sheet.iterator();
            while (rowIterator.hasNext()) {
                // Current row
                Row row = rowIterator.next();
                // Row rendered as a tab-separated string
                StringBuilder rowString = new StringBuilder();

                Iterator<Cell> colIterator = row.cellIterator();
                while (colIterator.hasNext()) {
                    Cell cell = colIterator.next();

                    switch (cell.getCellType()) {
                        case Cell.CELL_TYPE_BOOLEAN:
                            rowString.append(cell.getBooleanCellValue() + "\t");
                            break;
                        case Cell.CELL_TYPE_NUMERIC:
                            rowString.append(cell.getNumericCellValue() + "\t");
                            break;
                        case Cell.CELL_TYPE_STRING:
                            rowString.append(cell.getStringCellValue() + "\t");
                            break;
                    }
                }

                resultList.add(rowString.toString());
            }
        } catch (IOException e) {
            logger.error("IO Exception : File not found " + e);
        }

        return resultList.toArray(new String[0]);
    }
}
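Before wiring ExcelParser into MapReduce, it can be handy to check it in isolation. Below is a minimal local test sketch (the demo class and the file name phone.xls are placeholders of mine, not part of the original project; it only assumes POI is on the classpath):

package com.buaa;

import java.io.FileInputStream;
import java.io.InputStream;

public class ExcelParserDemo {
    public static void main(String[] args) throws Exception {
        // Open a local .xls file and print each parsed row
        try (InputStream is = new FileInputStream("phone.xls")) {
            for (String row : ExcelParser.parseExcelData(is)) {
                System.out.println(row);   // each row comes back as tab-separated cell values
            }
        }
    }
}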
Next, we define an InputFormat that reads data from Excel, named ExcelInputFormat. The implementation is as follows:
package com.buaa;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * @ProjectName HandleExcelPhone
 * @PackageName com.buaa
 * @ClassName ExcelInputFormat
 * @Description Custom InputFormat that reads records from an Excel file
 * @Author 劉吉超
 * @Date 2016-04-28 17:31:54
 */
public class ExcelInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new ExcelRecordReader();
    }

    public class ExcelRecordReader extends RecordReader<LongWritable, Text> {
        private LongWritable key = new LongWritable(-1);
        private Text value = new Text();
        private InputStream inputStream;
        private String[] strArrayofLines;

        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // Input split
            FileSplit split = (FileSplit) genericSplit;
            // Job configuration
            Configuration job = context.getConfiguration();
            // Path of the split
            Path filePath = split.getPath();
            FileSystem fileSystem = filePath.getFileSystem(job);
            inputStream = fileSystem.open(split.getPath());
            // Parse the whole Excel file up front
            this.strArrayofLines = ExcelParser.parseExcelData(inputStream);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            int pos = (int) key.get() + 1;

            if (pos < strArrayofLines.length) {
                if (strArrayofLines[pos] != null) {
                    key.set(pos);
                    value.set(strArrayofLines[pos]);
                    return true;
                }
            }

            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }

        @Override
        public void close() throws IOException {
            if (inputStream != null) {
                inputStream.close();
            }
        }
    }
}
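One caveat worth noting (my addition, not part of the original listing): a binary .xls workbook cannot be parsed from an arbitrary byte offset, so if an input file were ever larger than a single split, each mapper would receive only a fragment and POI would fail on it. A common safeguard is to override isSplitable inside ExcelInputFormat so that each workbook stays in one split, roughly like this:

    // Add to ExcelInputFormat: never split an Excel file, POI needs the whole workbook
    @Override
    protected boolean isSplitable(org.apache.hadoop.mapreduce.JobContext context,
            org.apache.hadoop.fs.Path filename) {
        return false;
    }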
Next, we define an ExcelOutputFormat class that writes the results for each month to a different file:
package com.buaa;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * @ProjectName HandleExcelPhone
 * @PackageName com.buaa
 * @ClassName ExcelOutputFormat
 * @Description Writes each month's records to its own output file
 * @Author 劉吉超
 * @Date 2016-04-28 17:24:23
 */
public class ExcelOutputFormat extends FileOutputFormat<Text, Text> {
    // Cached MultiRecordWriter instance
    private MultiRecordWriter writer = null;

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);

        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Output directory is not set");
            }
            workPath = outputPath;
        }

        return workPath;
    }

    /**
     * Derive the output file name (including extension) from key, value and conf.
     *
     * @param key
     * @param value
     * @param conf
     * @return String
     */
    protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf) {
        // The key has the form contact + "\t" + month; use the month as the file name
        String[] records = key.toString().split("\t");
        return records[1] + ".txt";
    }

    /**
     * A RecordWriter that fans records out to one underlying writer per file name.
     */
    public class MultiRecordWriter extends RecordWriter<Text, Text> {
        // Cache of RecordWriters, keyed by output file name
        private HashMap<String, RecordWriter<Text, Text>> recordWriters = null;
        // TaskAttemptContext
        private TaskAttemptContext job = null;
        // Output directory
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            this.recordWriters = new HashMap<String, RecordWriter<Text, Text>>();
        }

        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
            // Determine the output file name
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());

            RecordWriter<Text, Text> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }

            rw.write(key, value);
        }

        private RecordWriter<Text, Text> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            // Separator between key and value
            String keyValueSeparator = "\t";
            RecordWriter<Text, Text> recordWriter = null;

            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new MailRecordWriter<Text, Text>(
                        new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new MailRecordWriter<Text, Text>(fileOut, keyValueSeparator);
            }

            return recordWriter;
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<Text, Text>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }
    }
}
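To make the routing concrete: the Mapper further down emits keys of the form contact + "\t" + month, so a key such as 老爸\t12 makes generateFileNameForKeyValue return 12.txt, and every record for that month, whichever contact it belongs to, is appended to that one file through the cached writer.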
The ExcelOutputFormat above delegates the actual writing of key/value pairs to MailRecordWriter, which is defined as follows:

package com.buaa;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * @ProjectName HandleExcelPhone
 * @PackageName com.buaa
 * @ClassName MailRecordWriter
 * @Description Writes key/value pairs as text lines
 * @Author 劉吉超
 * @Date 2016-04-24 16:59:23
 */
public class MailRecordWriter<K, V> extends RecordWriter<K, V> {
    // Output encoding
    private static final String utf8 = "UTF-8";
    // Line terminator ("\n", not "/n")
    private static final byte[] newline;
    static {
        try {
            newline = "\n".getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    // Output stream
    protected DataOutputStream out;
    // Separator between key and value
    private final byte[] keyValueSeparator;

    public MailRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public MailRecordWriter(DataOutputStream out) {
        this(out, "\t");
    }

    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;

        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(newline);
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
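MailRecordWriter mirrors the pattern of the LineRecordWriter used internally by Hadoop's TextOutputFormat: write the key, the separator, then the value and a newline, skipping keys or values that are null or NullWritable.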
Finally, we write the Mapper class with its map() function, the Reducer class with its reduce() function, and the driver code:
package com.buaa;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @ProjectName HandleExcelPhone
 * @PackageName com.buaa
 * @ClassName ExcelContactCount
 * @Description Counts calls per contact per month
 * @Author 劉吉超
 * @Date 2016-04-24 16:34:24
 */
public class ExcelContactCount extends Configured implements Tool {

    public static class PhoneMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws InterruptedException, IOException {
            Text pkey = new Text();
            Text pvalue = new Text();

            // e.g. 1.0, 老爸, 13999123786, 2014-12-20
            String line = value.toString();
            String[] records = line.split("\\s+");
            // Extract the month from the date
            String[] months = records[3].split("-");
            // Key: contact nickname + month
            pkey.set(records[1] + "\t" + months[1]);
            // Value: phone number
            pvalue.set(records[2]);

            context.write(pkey, pvalue);
        }
    }

    public static class PhoneReducer extends Reducer<Text, Text, Text, Text> {
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String phone = null;
            int phoneTotal = 0;

            // The framework reuses the Text instance, so copy the phone number out
            // of the first value and count all values in a single pass
            for (Text value : values) {
                if (phone == null) {
                    phone = value.toString();
                }
                phoneTotal++;
            }

            context.write(key, new Text(phone + "\t" + phoneTotal));
        }
    }

    @Override
    @SuppressWarnings("deprecation")
    public int run(String[] args) throws Exception {
        // Read the configuration
        Configuration conf = new Configuration();

        // If the output path already exists, delete it
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }

        // Create the job
        Job job = new Job(conf, "Call Log");
        job.setJarByClass(ExcelContactCount.class);

        // Input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Mapper
        job.setMapperClass(PhoneMapper.class);
        // Reducer
        job.setReducerClass(PhoneReducer.class);

        // Output key type
        job.setOutputKeyClass(Text.class);
        // Output value type
        job.setOutputValueClass(Text.class);

        // Custom input format
        job.setInputFormatClass(ExcelInputFormat.class);
        // Custom output format
        job.setOutputFormatClass(ExcelOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = {
                "hdfs://ljc:9000/buaa/phone/phone.xls",
                "hdfs://ljc:9000/buaa/phone/out/"
        };

        int ec = ToolRunner.run(new Configuration(), new ExcelContactCount(), args0);
        System.exit(ec);
    }
}
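With the driver above, the per-month output files (named after the month, e.g. 12.txt) land under hdfs://ljc:9000/buaa/phone/out/, and each line carries the contact, the month, the phone number and the call count, tab-separated. Since main() hard-codes the input and output paths, the job can be launched simply with hadoop jar (for example hadoop jar HandleExcelPhone.jar com.buaa.ExcelContactCount, where the jar name depends on how the project is packaged).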
4. Results
From this output it is easy to see that in January, 劉超 exchanged the most calls with his elder sister: 19 calls.