Hadoop實戰:微博數據分析


項目需求

  自定義輸入格式,將明星微博數據排序后按粉絲數 關注數 微博數 分別輸出到不同文件中。

數據集

  下面是部分數據,猛戳此鏈接下載完整數據集

  數據格式: 明星   明星微博名稱    粉絲數       關注數   微博數

          黃曉明       黃曉明        22616497       506      2011

        張靚穎      張靚穎         27878708       238     3846

        羅志祥      羅志祥         30763518       277     3843

        劉嘉玲      劉嘉玲         12631697       350     2057

        李娜         李娜             23309493        81      631

        成龍         成龍             22485765        5        758

       ...

思路分析

  自定義的InputFormat讀取明星微博數據,通過getSortedHashtableByValue分別對明星follower、friend、statuses數據進行排序,然后利用MultipleOutputs輸出不同項到不同的文件中。

程序

Weibo.java

package com.hadoop.WeiboCount;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;


/*
數據格式:    明星     明星微博名稱                 粉絲數     關注數     微博數
            黃曉明    黃曉明                        22616497    506    2011
            張靚穎    張靚穎                        27878708    238    3846
            張成龍2012    張成龍2012                9813621        199    744
            羅志祥    羅志祥                        30763518    277    3843
            劉嘉玲    劉嘉玲                        12631697    350    2057
            吳君如大美女    吳君如大美女        18490338    190    412
            柯震東Kai    柯震東Kai                31337479    219    795
            李娜    李娜                        23309493    81    631
            徐小平    徐小平                        11659926    1929    13795
            唐嫣    唐嫣                        24301532    200    2391
            有斐君    有斐君                        8779383    577    4251
            孫燕姿    孫燕姿                        21213839    68    342
            成龍    成龍                        22485765    5    758
*/

public class WeiBo implements WritableComparable<Object> {
//  其實這里,跟TVPlayData和ScoreWritable一樣的
//  注意:    Hadoop通過Writable接口實現的序列化機制,不過沒有提供比較功能,所以和java中的Comparable接口合並,提供一個接口WritableComparable。(自定義比較)
//    Writable接口提供兩個方法(write和readFields)。
    
//  直接利用java的基本數據類型int,定義成員變量fan、followers、microblogs
    // 粉絲
    private int fan;
    // 關注
    private int followers;
    // 微博數
    private int microblogs;
     
//    問:這里我們自己編程時,是一定要創建一個帶有參的構造方法,為什么還要顯式的寫出來一個帶無參的構造方法呢?
//    答:構造器其實就是構造對象實例的方法,無參數的構造方法是默認的,但是如果你創造了一個帶有參數的構造方法,那么無參的構造方法必須顯式的寫出來,否則會編譯失敗。
    
    public WeiBo(){}; //java里的無參構造函數,是用來在創建對象時初始化對象  
  //在hadoop的每個自定義類型代碼里,好比,現在的WeiBo,都必須要寫無參構造函數。
    
    
    //問:為什么我們在編程的時候,需要創建一個帶有參的構造方法?
    //答:就是能讓賦值更靈活。構造一般就是初始化數值,你不想別人用你這個類的時候每次實例化都能用另一個構造動態初始化一些信息么(當然沒有需要額外賦值就用默認的)。
    public WeiBo(int fan,int followers,int microblogs){
        this.fan = fan;
        this.followers = followers;
        this.microblogs = microblogs;
    }
     
    
    //問:其實set和get方法,這兩個方法只是類中的setxxx和getxxx方法的總稱,
    //那么,為什么在編程時,有set和set***兩個,只有get***一個呢?
    public void set(int fan,int followers,int microblogs){
        this.fan = fan;
        this.followers = followers;
        this.microblogs = microblogs;
    }
     
    
//    public float get(int fan,int followers,int microblogs){因為這是錯誤的,所以對於set可以分開,get只能是get***
//    return fan;
//    return followers;
//    return microblogs;
//}
    
    
//  實現WritableComparable的readFields()方法,以便該數據能被序列化后完成網絡傳輸或文件輸入
//    對象不能傳輸的,需要轉化成字節流!
//    將對象轉換為字節流並寫入到輸出流out中是序列化,write 的過程(最好記!!!)
//    從輸入流in中讀取字節流反序列化為對象      是反序列化,readFields的過程(最好記!!!)
    
    @Override
    public void readFields(DataInput in) throws IOException {
        //拿代碼來說的話,對象就是比如fan、followers。。。。
        fan  = in.readInt(); //因為,我們這里的對象是Int類型,所以是readInt()
        followers = in.readInt();
        microblogs = in.readInt(); //注意:反序列化里,需要生成對象對吧,所以,是用到的是get對象
//        in.readByte()
//        in.readChar()
//        in.readDouble()
//        in.readLine()
//        in.readFloat()
//        in.readLong()
//        in.readShort()
    }
     
    
//  實現WritableComparable的write()方法,以便該數據能被序列化后完成網絡傳輸或文件輸出  
//    將對象轉換為字節流並寫入到輸出流out中是序列化,write 的過程(最好記!!!)
//    從輸入流in中讀取字節流反序列化為對象      是反序列化,readFields的過程(最好記!!!)
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(fan);
        out.writeInt(followers); //因為,我們這里的對象是Int類型,所以是writeInt()
        out.writeInt(microblogs); //注意:序列化里,需要對象對吧,所以,用到的是set那邊的對象
//        out.writeByte()
//        out.writeChar()
//        out.writeDouble()
//        out.writeFloat()
//        out.writeLong()
//        out.writeShort()
//        out.writeUTF()
    }
     
    @Override
    public int compareTo(Object o) { //java里的比較,Java String.compareTo()
        // TODO Auto-generated method stub
        return 0;
    }
    
//    Hadoop中定義了兩個序列化相關的接口:Writable 接口和 Comparable 接口,這兩個接口可以合並成一個接口 WritableComparable。
//    Writable 接口中定義了兩個方法,分別為write(DataOutput out)和readFields(DataInput in)
//    所有實現了Comparable接口的對象都可以和自身相同類型的對象比較大小
    
//  源碼是
//    package java.lang;  
//    import java.util.*;      
//    public interface Comparable {  
//        /**
//        * 將this對象和對象o進行比較,約定:返回負數為小於,零為大於,整數為大於
//        */  
//        public int compareTo(T o);  
//    }
 
    public int getFan() {  
        return fan;
    }
 
    public void setFan(int fan) {
        this.fan = fan;
    }
 
    public int getFollowers() {
        return followers;
    }
 
    public void setFollowers(int followers) {
        this.followers = followers;
    }
 
    public int getMicroblogs() {
        return microblogs;
    }
 
    public void setMicroblogs(int microblogs) {
        this.microblogs = microblogs;
    }
}

WeiboCount.java

package com.hadoop.WeiboCount;

import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WeiboCount extends Configured implements Tool {
    // tab分隔符
    private static String TAB_SEPARATOR = "\t";
    // 粉絲
    private static String FAN = "fan";
    // 關注
    private static String FOLLOWERS = "followers";
    // 微博數
    private static String MICROBLOGS = "microblogs";
     
    public static class WeiBoMapper extends Mapper<Text, WeiBo, Text, Text> {
        @Override
        protected void map(Text key, WeiBo value, Context context) throws IOException, InterruptedException {
            // 粉絲
            context.write(new Text(FAN), new Text(key.toString() + TAB_SEPARATOR + value.getFan()));
            // 關注
            context.write(new Text(FOLLOWERS), new Text(key.toString() + TAB_SEPARATOR + value.getFollowers()));
            // 微博數
            context.write(new Text(MICROBLOGS), new Text(key.toString() + TAB_SEPARATOR + value.getMicroblogs()));
        }
    }
     
    public static class WeiBoReducer extends Reducer<Text, Text, Text, IntWritable> {
        private MultipleOutputs<Text, IntWritable> mos;
 
        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, IntWritable>(context);
        }
 
        protected void reduce(Text Key, Iterable<Text> Values,Context context) throws IOException, InterruptedException {
            Map<String,Integer> map = new HashMap< String,Integer>();
             
            for(Text value : Values){ //增強型for循環,意思是把Values的值傳給Text value
                // value = 名稱 + (粉絲數 或 關注數 或 微博數)
                String[] records = value.toString().split(TAB_SEPARATOR);
                map.put(records[0], Integer.parseInt(records[1].toString()));
            }
             
            // 對Map內的數據進行排序
            Map.Entry<String, Integer>[] entries = getSortedHashtableByValue(map);
            for(int i = 0; i < entries.length;i++){
                mos.write(Key.toString(),entries[i].getKey(), entries[i].getValue());
            }                
        }
 
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }
     
    @SuppressWarnings("deprecation")
    @Override
    public int run(String[] args) throws Exception {
       
        Configuration conf = new Configuration();  // 配置文件對象
         
        // 判斷路徑是否存在,如果存在,則刪除
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }
         
       
        Job job = new Job(conf, "weibo");  // 構造任務
        
        job.setJarByClass(WeiboCount.class); // 主類
 
        // Mapper
        job.setMapperClass(WeiBoMapper.class);
        // Mapper key輸出類型
        job.setMapOutputKeyClass(Text.class);
        // Mapper value輸出類型
        job.setMapOutputValueClass(Text.class);
         
        // Reducer
        job.setReducerClass(WeiBoReducer.class);
        // Reducer key輸出類型
        job.setOutputKeyClass(Text.class);
        // Reducer value輸出類型
        job.setOutputValueClass(IntWritable.class);
         
        // 輸入路徑
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 輸出路徑
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
         
        // 自定義輸入格式
        job.setInputFormatClass(WeiboInputFormat.class) ;
        //自定義文件輸出類別
        MultipleOutputs.addNamedOutput(job, FAN, TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, FOLLOWERS, TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, MICROBLOGS, TextOutputFormat.class, Text.class, IntWritable.class);
         
        // 去掉job設置outputFormatClass,改為通過LazyOutputFormat設置   
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);  
         
         //提交任務   
        return job.waitForCompletion(true)?0:1;
    }
     
    // 對Map內的數據進行排序(只適合小數據量)
    @SuppressWarnings("unchecked")
    public static Entry<String, Integer>[] getSortedHashtableByValue(Map<String, Integer> h) {   
        Entry<String, Integer>[] entries = (Entry<String, Integer>[]) h.entrySet().toArray(new Entry[0]);   
        // 排序
        Arrays.sort(entries, new Comparator<Entry<String, Integer>>() {
            public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) {
                return entry2.getValue().compareTo(entry1.getValue());
            }  
        });
        return entries;   
    }
     
    public static void main(String[] args) throws Exception {
        String[] args0 = { "hdfs://centpy:9000/weibo/weibo.txt", "hdfs://centpy:9000/weibo/out/" };

//      String[] args0 = { "./data/weibo/weibo.txt", "./out/weibo/" };
        int ec = ToolRunner.run(new Configuration(), new WeiboCount(), args0);
        System.exit(ec);
    }
}      

WeiboInputFormat.java

package com.hadoop.WeiboCount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

//其實這個程序,就是在實現InputFormat接口,TVPlayInputFormat是InputFormat接口的實現類
//比如   WeiboInputFormat  extends FileInputFormat implements InputFormat。

//問:自定義輸入格式 WeiboInputFormat 類,首先繼承 FileInputFormat,然后分別重寫 isSplitable() 方法和 createRecordReader() 方法。


//線路是: boolean  isSplitable()   ->   RecordReader<Text,WeiBo> createRecordReader()   ->   WeiboRecordReader extends RecordReader<Text, WeiBo >
public class WeiboInputFormat extends FileInputFormat<Text,WeiBo>{

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {//這是InputFormat的isSplitable方法
            //isSplitable方法就是是否要切分文件,這個方法顯示如果是壓縮文件就不切分,非壓縮文件就切分。
//        如果不允許分割,則isSplitable==false,則將第一個block、文件目錄、開始位置為0,長度為整個文件的長度封裝到一個InputSplit,加入splits中
//        如果文件長度不為0且支持分割,則isSplitable==true,獲取block大小,默認是64MB
        return false;    //整個文件封裝到一個InputSplit
        //要么就是return true;        //切分64MB大小的一塊一塊,再封裝到InputSplit
    }
    
   @Override
   public RecordReader<Text, WeiBo> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
//        RecordReader<k1, v1>是返回類型,返回的RecordReader對象的封裝
//        createRecordReader是方法,在這里是,WeiboInputFormat.createRecordReader。WeiboInputFormat是InputFormat類的實例
//        InputSplit input和TaskAttemptContext context是傳入參數
        
//        isSplitable(),如果是壓縮文件就不切分,整個文件封裝到一個InputSplit
//        isSplitable(),如果是非壓縮文件就切,切分64MB大小的一塊一塊,再封裝到InputSplit
        
        //這里默認是系統實現的的RecordReader,按行讀取,下面我們自定義這個類WeiboRecordReader。
        //類似與Excel、WeiBo、TVPlayData代碼寫法
      
       // 自定義WeiboRecordReader類,按行讀取
        return new WeiboRecordReader(); //新建一個ScoreRecordReader實例,所有才有了上面RecordReader<Text,ScoreWritable>,所以才如下ScoreRecordReader,寫我們自己的
   }

   
   
   public class WeiboRecordReader extends RecordReader<Text, WeiBo>{
          //LineReader      in是1,行號。
            //Text line;      俞灝明    俞灝明    10591367    206    558,每行的相關記錄
           public LineReader in;  //行讀取器
           public Text line;//每行數據類型
          public Text lineKey;//自定義key類型,即k1
          public WeiBo lineValue;//自定義value類型,即v1
           
          @Override
          public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException { //初始化,都是模板
             
              FileSplit split = (FileSplit)input;  // 獲取split
              
              Configuration job = context.getConfiguration(); // 獲取配置  
             
              Path file = split.getPath();  // 分片路徑  
               
              FileSystem fs = file.getFileSystem(job);  
            
              FSDataInputStream filein = fs.open(file);   // 打開文件    
               
              in=new LineReader(filein,job); //輸入流in
              line=new Text();//每行數據類型
              lineKey=new Text();//自定義key類型,即k1。//新建一個Text實例作為自定義格式輸入的key
              lineValue = new WeiBo();//自定義value類型,即v1。//新建一個TVPlayData實例作為自定義格式輸入的value
          }

          
          //此方法讀取每行數據,完成自定義的key和value
          @Override
          public boolean nextKeyValue() throws IOException, InterruptedException { //這里面,才是篡改的重點
              // 一行數據
              Text line = new Text();
               
              int linesize = in.readLine(line);  //line是每行數據,我們這里用到的是in.readLine(str)這個構造函數,默認讀完讀到文件末尾。其實這里有三種。
               
//            是SplitLineReader.readLine  ->  SplitLineReader  extends   LineReader  ->  org.apache.hadoop.util.LineReader

//            in.readLine(str)//這個構造方法執行時,會首先將value原來的值清空。默認讀完讀到文件末尾
//            in.readLine(str, maxLineLength)//只讀到maxLineLength行
//            in.readLine(str, maxLineLength, maxBytesToConsume)//這個構造方法來實現不清空,前面讀取的行的值
              
              if(linesize == 0)  
                  return false;  
               
              // 通過分隔符'\t',將每行的數據解析成數組 pieces
              String[] pieces = line.toString().split("\t");//因為,我們這里是。默認讀完讀到文件末尾。line是Text類型。pieces是String[],即String數組。
               
              if(pieces.length != 5){   
                  throw new IOException("Invalid record received");   
              }  
               
              int a,b,c;
              try{   
                  
                  a = Integer.parseInt(pieces[2].trim()); //粉絲數,//將String類型,如pieces[2]轉換成,float類型,給a  
                 
                  b = Integer.parseInt(pieces[3].trim()); //關注數
                 
                  c = Integer.parseInt(pieces[4].trim()); //微博數
              }catch(NumberFormatException nfe){   
                  throw new IOException("Error parsing floating poing value in record");   
              }
               
              //自定義key和value值
              lineKey.set(pieces[0]);  //完成自定義key數據  
              lineValue.set(a, b, c); //完成自定義value數據
               
//            或者寫
//            lineValue.set(Integer.parseInt(pieces[2].trim()),Integer.parseInt(pieces[3].trim()),Integer.parseInt(pieces[4].trim()));

           
              return true;
          }
           
          @Override
          public void close() throws IOException { //關閉輸入流
              if(in != null){
                  in.close();
              }
          }

          @Override
          public Text getCurrentKey() throws IOException, InterruptedException { //獲取當前的key,即CurrentKey
              return lineKey; //返回類型是Text,即Text lineKey
          }

          @Override
          public WeiBo getCurrentValue() throws IOException, InterruptedException { //獲取當前的Value,即CurrentValue
              return lineValue; //返回類型是WeiBo,即WeiBo lineValue
          }

          @Override
          public float getProgress() throws IOException, InterruptedException { //獲取進程,即Progress
              return 0; //返回類型是float,即float 0
          }
           
      }
}

實驗結果

 

 

以上就是博主為大家介紹的這一板塊的主要內容,這都是博主自己的學習過程,希望能給大家帶來一定的指導作用,有用的還望大家點個支持,如果對你沒用也望包涵,有錯誤煩請指出。如有期待可關注博主以第一時間獲取更新哦,謝謝! 

 

 版權聲明:本文為博主原創文章,未經博主允許不得轉載。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM