hadoop處理Excel通話記錄


前面我們所寫mr程序的輸入都是文本文件,但真正工作中我們難免會碰到需要處理其它格式的情況,下面以處理excel數據為例

1、項目需求

    有劉超與家庭成員之間的通話記錄一份,存儲在Excel文件中,如下面的數據集所示。我們需要基於這份數據,統計每個月每個家庭成員給自己打電話的次數,並按月份輸出到不同文件

    下面是部分數據,數據格式:編號  聯繫人  電話  時間

    image

2、分析

    統計每個月每個家庭成員給自己打電話的次數這一點很簡單,我們之前已經寫過幾個這樣的程序。實現需求的麻煩點在於文件的輸入是Excel文件,輸出要按月份輸出到不同文件。這就要我們實現自定義的InputFormat和OutputFormat

3、實現

    首先,輸入文件是Excel格式,我們可以借助poi來解析Excel文件,我們先來實現一個Excel的解析類(ExcelParser)

package com.buaa;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;

/** 
* @ProjectName HandleExcelPhone
* @PackageName com.buaa
* @ClassName ExcelParser
* @Description 解析excel
* @Author 劉吉超
* @Date 2016-04-24 16:59:28
*/
public class ExcelParser {
    private static final Log logger = LogFactory.getLog(ExcelParser.class);

    /**
     * Parses the first sheet of an Excel 97-2003 (.xls) workbook into one
     * tab-separated string per row.
     *
     * <p>Each cell is rendered according to its type (boolean, numeric or
     * string) and followed by a tab, so every returned line carries a trailing
     * tab — downstream consumers split on whitespace, which tolerates this.
     * Formula, blank and error cells are skipped, matching the original
     * behavior.
     *
     * @param is stream positioned at the start of the workbook; the caller
     *           retains ownership and is responsible for closing it
     * @return one element per row of sheet 0; empty array if parsing fails
     */
    public static String[] parseExcelData(InputStream is) {
        List<String> resultList = new ArrayList<String>();

        try {
            HSSFWorkbook workbook = new HSSFWorkbook(is);
            // Only the first sheet is expected to contain the call log.
            HSSFSheet sheet = workbook.getSheetAt(0);

            for (Row row : sheet) {
                StringBuilder rowString = new StringBuilder();

                for (Cell cell : row) {
                    switch (cell.getCellType()) {
                        case Cell.CELL_TYPE_BOOLEAN:
                            rowString.append(cell.getBooleanCellValue()).append('\t');
                            break;
                        case Cell.CELL_TYPE_NUMERIC:
                            rowString.append(cell.getNumericCellValue()).append('\t');
                            break;
                        case Cell.CELL_TYPE_STRING:
                            rowString.append(cell.getStringCellValue()).append('\t');
                            break;
                        default:
                            // Formula/blank/error cells are intentionally ignored.
                            break;
                    }
                }

                resultList.add(rowString.toString());
            }
        } catch (IOException e) {
            // Any read/parse failure (not only a missing file) lands here;
            // the job proceeds with whatever rows were collected so far.
            logger.error("Failed to parse Excel input stream", e);
        }

        return resultList.toArray(new String[0]);
    }
}

    然後,我們需要定義一個從Excel讀取數據的InputFormat類,命名為ExcelInputFormat,實現代碼如下

package com.buaa;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/** 
* @ProjectName HandleExcelPhone
* @PackageName com.buaa
* @ClassName ExcelInputFormat
* @Description TODO
* @Author 劉吉超
* @Date 2016-04-28 17:31:54
*/
public class ExcelInputFormat extends FileInputFormat<LongWritable, Text> {

    /**
     * An .xls workbook is a binary whole; it cannot be parsed from an
     * arbitrary byte offset. Without this override the default splitting
     * would hand each split to a reader that re-parses the ENTIRE file,
     * duplicating every record once per split.
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new ExcelRecordReader();
    }

    /**
     * Reads the whole workbook up front via {@link ExcelParser} and then
     * replays it row by row as (row index, tab-separated row) pairs.
     * Static nested: it needs no reference to the enclosing format instance.
     */
    public static class ExcelRecordReader extends RecordReader<LongWritable, Text> {
        // Row index of the record last returned; -1 before the first record.
        private LongWritable key = new LongWritable(-1);
        private Text value = new Text();
        private InputStream inputStream;
        // All rows of sheet 0, parsed eagerly in initialize().
        private String[] strArrayofLines;

        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();

            Path filePath = split.getPath();
            FileSystem fileSystem = filePath.getFileSystem(job);

            inputStream = fileSystem.open(filePath);

            // Parse the entire workbook into memory; rows are served from
            // this array by nextKeyValue().
            this.strArrayofLines = ExcelParser.parseExcelData(inputStream);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            int pos = (int) key.get() + 1;

            if (pos < strArrayofLines.length && strArrayofLines[pos] != null) {
                key.set(pos);
                value.set(strArrayofLines[pos]);
                return true;
            }

            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            // Fraction of rows already emitted (previously hard-coded to 0).
            if (strArrayofLines == null || strArrayofLines.length == 0) {
                return 0.0f;
            }
            return Math.min(1.0f, (key.get() + 1) / (float) strArrayofLines.length);
        }

        @Override
        public void close() throws IOException {
            if (inputStream != null) {
                inputStream.close();
            }
        }
    }
}

    接下來,我們要定義一個ExcelOutputFormat類,用於實現按月份輸出到不同文件中

package com.buaa;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

/** 
* @ProjectName HandleExcelPhone
* @PackageName com.buaa
* @ClassName ExcelOutputFormat
* @Description TODO
* @Author 劉吉超
* @Date 2016-04-28 17:24:23
*/
public class ExcelOutputFormat extends FileOutputFormat<Text, Text> {
    // Single fan-out writer shared by this format instance; created lazily.
    private MultiRecordWriter writer = null;

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    /**
     * Resolves the directory output files are written to: the committer's
     * work path when a FileOutputCommitter is in play, else the job's
     * configured output path.
     */
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            return ((FileOutputCommitter) committer).getWorkPath();
        }
        Path outputPath = super.getOutputPath(conf);
        if (outputPath == null) {
            throw new IOException("沒有定義輸出目錄");
        }
        return outputPath;
    }

    /**
     * Derives the output file name (with extension) for a record.
     * The key is laid out as {@code name "\t" month}; records are bucketed
     * into one file per month.
     *
     * @param key   reduce output key
     * @param value reduce output value (unused here)
     * @param conf  job configuration (unused here)
     * @return file name such as {@code 01.txt}
     */
    protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf) {
        String[] fields = key.toString().split("\t");
        return fields[1] + ".txt";
    }

    /**
     * RecordWriter that lazily opens one underlying writer per distinct
     * file name and routes each record to the matching one.
     */
    public class MultiRecordWriter extends RecordWriter<Text, Text> {
        // One cached writer per output file name.
        private final HashMap<String, RecordWriter<Text, Text>> recordWriters;
        private final TaskAttemptContext job;
        // Directory that all per-month files are created under.
        private final Path workPath;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            this.recordWriters = new HashMap<String, RecordWriter<Text, Text>>();
        }

        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<Text, Text> target = this.recordWriters.get(baseName);
            if (target == null) {
                target = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, target);
            }
            target.write(key, value);
        }

        /**
         * Opens a new writer for {@code baseName}, honoring the job's output
         * compression settings (gzip by default when compression is on).
         */
        private RecordWriter<Text, Text> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            String keyValueSeparator = "\t";

            if (getCompressOutput(job)) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);

                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);

                return new MailRecordWriter<Text, Text>(
                        new DataOutputStream(codec.createOutputStream(fileOut)),
                        keyValueSeparator);
            }

            Path file = new Path(workPath, baseName);
            FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);

            return new MailRecordWriter<Text, Text>(fileOut, keyValueSeparator);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (RecordWriter<Text, Text> target : this.recordWriters.values()) {
                target.close(context);
            }
            this.recordWriters.clear();
        }
    }
}
package com.buaa;

import java.io.DataOutputStream;  
import java.io.IOException;  
import java.io.UnsupportedEncodingException;  

import org.apache.hadoop.io.NullWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.RecordWriter;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;    
  
/** 
* @ProjectName HandleExcelPhone
* @PackageName com.buaa
* @ClassName MailRecordWriter
* @Description TODO
* @Author 劉吉超
* @Date 2016-04-24 16:59:23
*/
public class MailRecordWriter< K, V > extends RecordWriter< K, V > {
    // Charset used for separators, newlines and non-Text values.
    private static final String utf8 = "UTF-8";
    // Line terminator written after every record.
    private static final byte[] newline;
    static {
        try {
            newline = "\n".getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }
    // Destination stream; closed by close().
    protected DataOutputStream out;
    // Bytes written between key and value.
    private final byte[] keyValueSeparator;

    /**
     * @param out               destination stream (takes ownership)
     * @param keyValueSeparator string written between key and value
     */
    public MailRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    /** Defaults the separator to a tab. */
    public MailRecordWriter(DataOutputStream out) {
        // BUG FIX: was "/t" (literal slash + t), which corrupted the output
        // format; the intended separator is the tab character "\t".
        this(out, "\t");
    }

    /**
     * Writes one object: Text is emitted as its raw UTF-8 bytes, anything
     * else via toString() encoded as UTF-8.
     */
    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    /**
     * Writes {@code key<sep>value\n}. A null or NullWritable key/value is
     * omitted (and the separator with it); if both are null nothing is
     * written at all.
     */
    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(newline);
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}

    最後我們來編寫Mapper類,實現 map() 函數;編寫Reduce類,實現reduce函數;還有一些運行代碼

package com.buaa;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/** 
* @ProjectName HandleExcelPhone
* @PackageName com.buaa
* @ClassName ExcelContactCount
* @Description TODO
* @Author 劉吉超
* @Date 2016-04-24 16:34:24
*/
public class ExcelContactCount extends Configured implements Tool {

    /**
     * Maps one parsed Excel row ("id  name  phone  yyyy-MM-dd") to
     * key = name "\t" month, value = phone number.
     */
    public static class PhoneMapper extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context)
                throws InterruptedException, IOException {
            // e.g. "1.0, 老爸, 13999123786, 2014-12-20" (tab-separated by the parser)
            String line = value.toString();

            String[] records = line.split("\\s+");
            // Guard against blank rows or rows the Excel parser truncated.
            if (records.length < 4) {
                return;
            }

            // Month is the middle component of yyyy-MM-dd.
            String[] months = records[3].split("-");

            Text pkey = new Text(records[1] + "\t" + months[1]);
            Text pvalue = new Text(records[2]);

            context.write(pkey, pvalue);
        }
    }

    /**
     * Counts calls per (name, month) key and carries the phone number along.
     */
    public static class PhoneReducer extends Reducer<Text, Text, Text, Text> {

        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // BUG FIX: Hadoop's reduce Iterable is single-pass and reuses the
            // same Text instance for every value. The original code consumed
            // one element via values.iterator().next() and then counted a
            // "fresh" iterator, undercounting by one and holding a reference
            // whose contents Hadoop overwrites. Iterate once and copy the
            // phone number out as a String.
            String phone = null;
            int phoneTotal = 0;

            for (Text value : values) {
                if (phone == null) {
                    phone = value.toString();
                }
                phoneTotal++;
            }

            context.write(key, new Text(phone + "\t" + phoneTotal));
        }
    }

    /**
     * Configures and runs the job: Excel input, per-month multi-file output.
     *
     * @param args args[0] = input .xls path, args[1] = output directory
     * @return 0 on success, 1 on failure
     */
    @Override
    public int run(String[] args) throws Exception {
        // Use the Configuration injected by ToolRunner (the original built a
        // fresh one, silently discarding -D options passed on the command line).
        Configuration conf = getConf();

        // Remove the output path if it already exists (file or directory),
        // otherwise the job would fail on startup.
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.exists(mypath)) {
            hdfs.delete(mypath, true);
        }

        // Job.getInstance replaces the deprecated Job constructor.
        Job job = Job.getInstance(conf, "Call Log");
        job.setJarByClass(ExcelContactCount.class);

        // Input / output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Mapper / Reducer
        job.setMapperClass(PhoneMapper.class);
        job.setReducerClass(PhoneReducer.class);

        // Output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Custom formats: Excel in, one file per month out
        job.setInputFormatClass(ExcelInputFormat.class);
        job.setOutputFormatClass(ExcelOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = {
                "hdfs://ljc:9000/buaa/phone/phone.xls",
                "hdfs://ljc:9000/buaa/phone/out/"
            };
        int ec = ToolRunner.run(new Configuration(), new ExcelContactCount(), args0);
        System.exit(ec);
    }
}

4、結果

    image

   通過這份數據很容易看出,劉超1月份與姐姐通話次數最多,19次

 

如果,您認為閱讀這篇博客讓您有些收獲,不妨點擊一下右下角的【推薦】。
如果,您希望更容易地發現我的新博客,不妨點擊一下左下角的【關注我】。
如果,您對我的博客所講述的內容有興趣,請繼續關注我的后續博客,我是【劉超★ljc】。

本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。

實現代碼及數據:下載


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM