MapReduce Data Analysis Example


Data package

Baidu Netdisk

Link: https://pan.baidu.com/s/1v9M3jNdT4vwsqup9N0mGOA
Extraction code: hs9c

1. Data description:

(1) The first column is the time;

(2) The second column is the seller;

(3) The third column is the buyer;

(4) The fourth column is the number of tickets;

(5) The fifth column is the amount.

Sellers and buyers cover three roles in total: airports (codes starting with "C"), agents (codes starting with "O"), and ordinary customers ("PAX").
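
For illustration only, a record under this layout might look like the line below; the values and the time format are hypothetical, since the exact format is defined by the files in the data package:

2018-05-01 08:23,C001,PAX,2,1360

Read this as: airport C001 sells 2 tickets to an ordinary customer for a total amount of 1360.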

2. Data cleaning requirements:

(1) Count the Top 10 busiest airports (including both buying and selling);

(2) Find the most popular routes (origin and destination identical, or reversed); a hedged mapper sketch for this follows the list below;

(3) Count the Top 10 largest agents;

(4) Count the Top 10 airports by tickets sold on a given day.
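
For requirement (2), here is a minimal mapper sketch; it is an assumption, not the original author's code, and it treats the seller/buyer pair of each record as the two endpoints of a route, with comma-separated fields as described above. The two endpoints are put into a canonical order so that "A to B" and "B to A" collapse into one key; the same pair counts can also feed the relationship graph in requirement 3(2). The SumReducer from the busiest-airport job further below can be reused unchanged to add up the counts per route.

package mapreduce;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: emits (canonical route key, ticket count) per record.
public class RouteMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text outKey = new Text();
    private final IntWritable outValue = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] arr = value.toString().split(",");
        if (arr.length < 4) {
            return;                                   // skip malformed records
        }
        String seller = arr[1];
        String buyer = arr[2];
        // Canonical key: smaller code first, so A-B and B-A become the same route.
        String route = seller.compareTo(buyer) <= 0
                ? seller + "-" + buyer
                : buyer + "-" + seller;
        outKey.set(route);
        outValue.set(Integer.parseInt(arr[3]));       // fourth column: ticket count
        context.write(outKey, outValue);
    }
}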

3. Data visualization requirements:

(1) The four statistics above can be displayed with pie charts, bar charts, and so on;

(2) A relationship graph can be used to show how closely the airports are connected (using ticket counts as the data source).

 

Key code of the experiment (the code for counting the busiest airports is listed; the other statistics are largely similar):

Initial data cleaning, which mainly filters out the total ticket count for each airport:

package mapreduce;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class ChainMapReduce {

    private static final String INPUTPATH = "hdfs://localhost:9000/mapreducetest/region.txt";
    private static final String OUTPUTPATH = "hdfs://localhost:9000/mapreducetest/out1";

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // Delete the output directory if it already exists, otherwise the job fails.
            FileSystem fileSystem = FileSystem.get(new URI(OUTPUTPATH), conf);
            if (fileSystem.exists(new Path(OUTPUTPATH))) {
                fileSystem.delete(new Path(OUTPUTPATH), true);
            }
            Job job = new Job(conf, ChainMapReduce.class.getSimpleName());
            FileInputFormat.addInputPath(job, new Path(INPUTPATH));
            job.setInputFormatClass(TextInputFormat.class);
            // Chain a single filtering mapper with a summing reducer.
            ChainMapper.addMapper(job, FilterMapper1.class, LongWritable.class, Text.class, Text.class, IntWritable.class, conf);
            ChainReducer.setReducer(job, SumReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, conf);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setPartitionerClass(HashPartitioner.class);
            job.setNumReduceTasks(1);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileOutputFormat.setOutputPath(job, new Path(OUTPUTPATH));
            job.setOutputFormatClass(TextOutputFormat.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Emits (airport code, ticket count) for every record in which an airport
    // appears on the selling or the buying side (airport codes start with "C").
    public static class FilterMapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text outKey = new Text();
        private IntWritable outValue = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if (line.length() > 0) {
                String[] arr = line.split(",");
                int visit = Integer.parseInt(arr[3]);   // fourth column: number of tickets
                outValue.set(visit);
                if (arr[1].startsWith("C")) {           // seller is an airport
                    outKey.set(arr[1]);
                    context.write(outKey, outValue);
                }
                if (arr[2].startsWith("C")) {           // buyer is an airport
                    outKey.set(arr[2]);
                    context.write(outKey, outValue);
                }
            }
        }
    }

    // Adds up all ticket counts for each airport.
    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable outValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            outValue.set(sum);
            context.write(key, outValue);
        }
    }
}
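
One optional tweak, not present in the original driver: because SumReducer only adds its input values, it is associative and commutative, so it could also be registered as a combiner to shrink the data shuffled to the single reducer. A one-line sketch of that assumption:

job.setCombinerClass(SumReducer.class);   // optional; reduces shuffle volume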

 

Second-pass processing of the data: sorting the airport totals.

package mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OneSort {

    // Swaps each "airport \t count" line into (count, airport) so that the
    // shuffle phase sorts the records by ticket count (ascending by default).
    public static class Map extends Mapper<Object, Text, IntWritable, Text> {
        private static Text goods = new Text();
        private static IntWritable num = new IntWritable();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] arr = line.split("\t");
            num.set(Integer.parseInt(arr[1]));   // ticket total produced by the first job
            goods.set(arr[0]);                   // airport code
            context.write(num, goods);
        }
    }

    // Writes the records back out in sorted order; ties share the same key.
    public static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text> {
        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "OneSort");
        job.setJarByClass(OneSort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path in = new Path("hdfs://localhost:9000/mapreducetest/out1/part-r-00000");
        Path out = new Path("hdfs://localhost:9000/mapreducetest/out2");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
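
Because the shuffle sorts IntWritable keys in ascending order, the Top 10 rows end up at the tail of part-r-00000, and the web page further below compensates by reading the result list from the end. As an alternative sketch (an assumption, not part of the original project), a reversed sort comparator could be plugged into OneSort so the largest totals come out first:

package mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical comparator: inverts the natural ascending order of IntWritable keys.
public class DescendingIntComparator extends WritableComparator {
    public DescendingIntComparator() {
        super(IntWritable.class, true);   // true = instantiate keys for compare()
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b);      // flip the sign to sort descending
    }
}

It would be registered in OneSort's driver with job.setSortComparatorClass(DescendingIntComparator.class); if used, the front-end would read the output from the top instead of the end.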

 

Reading the result file from HDFS

package mapreduce;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadFile {

    // Reads a tab-separated result file from HDFS and returns its fields as a
    // flat list: [count, airport, count, airport, ...] in file order.
    public static List<String> ReadFromHDFS(String file) throws IOException {
        List<String> list = new ArrayList<>();
        Configuration conf = new Configuration();
        FSDataInputStream fsr = null;
        BufferedReader bufferedReader = null;
        String lineTxt = null;

        try {
            FileSystem fs = FileSystem.get(URI.create(file), conf);
            fsr = fs.open(new Path(file));
            bufferedReader = new BufferedReader(new InputStreamReader(fsr));
            while ((lineTxt = bufferedReader.readLine()) != null) {
                String[] arg = lineTxt.split("\t");
                list.add(arg[0]);   // ticket count (the sort key)
                list.add(arg[1]);   // airport code
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return list;
    }

    public static void main(String[] args) throws IOException {
        List<String> ll = ReadFile.ReadFromHDFS("hdfs://localhost:9000/mapreducetest/out2/part-r-00000");
        for (int i = 0; i < ll.size(); i++) {
            System.out.println(ll.get(i));
        }
    }
}
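
A small helper sketch (hypothetical, not in the original project) that turns the flat [count, airport, count, airport, ...] list returned by ReadFromHDFS into the top N entries in descending order; this is the same end-of-list walk that the JSP below performs with its parity checks:

package mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class TopN {
    // Returns up to n "airport \t count" strings, highest count first.
    public static List<String> topPairs(List<String> flat, int n) {
        List<String> result = new ArrayList<>();
        // Entries were appended in ascending count order, so walk backwards in pairs.
        for (int i = flat.size() - 2; i >= 0 && result.size() < n; i -= 2) {
            result.add(flat.get(i + 1) + "\t" + flat.get(i));   // airport, count
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<String> flat = ReadFile.ReadFromHDFS(
                "hdfs://localhost:9000/mapreducetest/out2/part-r-00000");
        for (String row : topPairs(flat, 10)) {
            System.out.println(row);
        }
    }
}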

Front-end page code (JSP)

<%@page import="mapreduce.ReadFile"%>
<%@page import="java.util.List"%>
<%@page import="java.util.ArrayList"%>
<%@page import="org.apache.hadoop.fs.FSDataInputStream" %>
<%@ page language="java" contentType="text/html; charset=UTF-8"
    pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
<%-- The sorted result list alternates [count, airport, ...] in ascending order,
     so the Top 10 entries sit at the end of the list. --%>
<% List<String> ll = ReadFile.ReadFromHDFS("hdfs://localhost:9000/mapreducetest/out2/part-r-00000"); %>
<script src="../js/echarts.js"></script>
</head>
<body>
<div id="main" style="width: 900px;height:400px;"></div>
<script type="text/javascript">
    // Initialize the ECharts instance on the prepared DOM element.
    var myChart = echarts.init(document.getElementById('main'));

    // Chart options and data.
    var option = {
        title: {
            text: 'Top 10 busiest airports'
        },
        tooltip: {},
        legend: {
            data: ['Tickets']
        },
        xAxis: {
            // Airport codes sit at the odd indices; walk backwards to collect the Top 10.
            data: ["<%=ll.get(ll.size()-1)%>"<% for (int i = ll.size()-3; i >= ll.size()-19; i--) {
                    if (i % 2 == 1) { %>,"<%=ll.get(i)%>"<%
                    }
                } %>]
        },
        yAxis: {},
        series: [{
            name: 'Tickets',
            type: 'bar',
            // Ticket counts sit at the even indices, one step below each airport code.
            data: [<%=ll.get(ll.size()-2)%><% for (int i = ll.size()-4; i >= ll.size()-20; i--) {
                    if (i % 2 == 0) { %>,<%=ll.get(i)%><%
                    }
                } %>]
        }]
    };

    // Render the chart with the options just defined.
    myChart.setOption(option);
</script>
<h2 style="color:red"><a href="NewFile.jsp">Back</a></h2>
</body>
</html>

 

Result screenshot:

 

