項目需求
自定義輸入格式,將明星微博數據排序后按粉絲數 關注數 微博數 分別輸出到不同文件中。
數據集
下面是部分數據,猛戳此鏈接下載完整數據集
數據格式: 明星 明星微博名稱 粉絲數 關注數 微博數
黃曉明 黃曉明 22616497 506 2011
張靚穎 張靚穎 27878708 238 3846
羅志祥 羅志祥 30763518 277 3843
劉嘉玲 劉嘉玲 12631697 350 2057
李娜 李娜 23309493 81 631
成龍 成龍 22485765 5 758
...
思路分析
自定義的InputFormat讀取明星微博數據,通過getSortedHashtableByValue分別對明星follower、friend、statuses數據進行排序,然后利用MultipleOutputs輸出不同項到不同的文件中。
程序
Weibo.java
package com.hadoop.WeiboCount;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/*
數據格式: 明星 明星微博名稱 粉絲數 關注數 微博數
黃曉明 黃曉明 22616497 506 2011
張靚穎 張靚穎 27878708 238 3846
張成龍2012 張成龍2012 9813621 199 744
羅志祥 羅志祥 30763518 277 3843
劉嘉玲 劉嘉玲 12631697 350 2057
吳君如大美女 吳君如大美女 18490338 190 412
柯震東Kai 柯震東Kai 31337479 219 795
李娜 李娜 23309493 81 631
徐小平 徐小平 11659926 1929 13795
唐嫣 唐嫣 24301532 200 2391
有斐君 有斐君 8779383 577 4251
孫燕姿 孫燕姿 21213839 68 342
成龍 成龍 22485765 5 758
*/
public class WeiBo implements WritableComparable<Object> {
// 其實這里,跟TVPlayData和ScoreWritable一樣的
// 注意: Hadoop通過Writable接口實現的序列化機制,不過沒有提供比較功能,所以和java中的Comparable接口合並,提供一個接口WritableComparable。(自定義比較)
// Writable接口提供兩個方法(write和readFields)。
// 直接利用java的基本數據類型int,定義成員變量fan、followers、microblogs
// 粉絲
private int fan;
// 關注
private int followers;
// 微博數
private int microblogs;
// 問:這里我們自己編程時,是一定要創建一個帶有參的構造方法,為什么還要顯式的寫出來一個帶無參的構造方法呢?
// 答:構造器其實就是構造對象實例的方法,無參數的構造方法是默認的,但是如果你創造了一個帶有參數的構造方法,那么無參的構造方法必須顯式的寫出來,否則會編譯失敗。
public WeiBo(){}; //java里的無參構造函數,是用來在創建對象時初始化對象
//在hadoop的每個自定義類型代碼里,好比,現在的WeiBo,都必須要寫無參構造函數。
//問:為什么我們在編程的時候,需要創建一個帶有參的構造方法?
//答:就是能讓賦值更靈活。構造一般就是初始化數值,你不想別人用你這個類的時候每次實例化都能用另一個構造動態初始化一些信息么(當然沒有需要額外賦值就用默認的)。
public WeiBo(int fan,int followers,int microblogs){
this.fan = fan;
this.followers = followers;
this.microblogs = microblogs;
}
//問:其實set和get方法,這兩個方法只是類中的setxxx和getxxx方法的總稱,
//那么,為什么在編程時,有set和set***兩個,只有get***一個呢?
public void set(int fan,int followers,int microblogs){
this.fan = fan;
this.followers = followers;
this.microblogs = microblogs;
}
// public float get(int fan,int followers,int microblogs){因為這是錯誤的,所以對於set可以分開,get只能是get***
// return fan;
// return followers;
// return microblogs;
//}
// 實現WritableComparable的readFields()方法,以便該數據能被序列化后完成網絡傳輸或文件輸入
// 對象不能傳輸的,需要轉化成字節流!
// 將對象轉換為字節流並寫入到輸出流out中是序列化,write 的過程(最好記!!!)
// 從輸入流in中讀取字節流反序列化為對象 是反序列化,readFields的過程(最好記!!!)
@Override
public void readFields(DataInput in) throws IOException {
//拿代碼來說的話,對象就是比如fan、followers。。。。
fan = in.readInt(); //因為,我們這里的對象是Int類型,所以是readInt()
followers = in.readInt();
microblogs = in.readInt(); //注意:反序列化里,需要生成對象對吧,所以,是用到的是get對象
// in.readByte()
// in.readChar()
// in.readDouble()
// in.readLine()
// in.readFloat()
// in.readLong()
// in.readShort()
}
// 實現WritableComparable的write()方法,以便該數據能被序列化后完成網絡傳輸或文件輸出
// 將對象轉換為字節流並寫入到輸出流out中是序列化,write 的過程(最好記!!!)
// 從輸入流in中讀取字節流反序列化為對象 是反序列化,readFields的過程(最好記!!!)
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(fan);
out.writeInt(followers); //因為,我們這里的對象是Int類型,所以是writeInt()
out.writeInt(microblogs); //注意:序列化里,需要對象對吧,所以,用到的是set那邊的對象
// out.writeByte()
// out.writeChar()
// out.writeDouble()
// out.writeFloat()
// out.writeLong()
// out.writeShort()
// out.writeUTF()
}
@Override
public int compareTo(Object o) { //java里的比較,Java String.compareTo()
// TODO Auto-generated method stub
return 0;
}
// Hadoop中定義了兩個序列化相關的接口:Writable 接口和 Comparable 接口,這兩個接口可以合並成一個接口 WritableComparable。
// Writable 接口中定義了兩個方法,分別為write(DataOutput out)和readFields(DataInput in)
// 所有實現了Comparable接口的對象都可以和自身相同類型的對象比較大小
// 源碼是
// package java.lang;
// import java.util.*;
// public interface Comparable {
// /**
// * 將this對象和對象o進行比較,約定:返回負數為小於,零為大於,整數為大於
// */
// public int compareTo(T o);
// }
public int getFan() {
return fan;
}
public void setFan(int fan) {
this.fan = fan;
}
public int getFollowers() {
return followers;
}
public void setFollowers(int followers) {
this.followers = followers;
}
public int getMicroblogs() {
return microblogs;
}
public void setMicroblogs(int microblogs) {
this.microblogs = microblogs;
}
}
WeiboCount.java
package com.hadoop.WeiboCount;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WeiboCount extends Configured implements Tool {
// tab分隔符
private static String TAB_SEPARATOR = "\t";
// 粉絲
private static String FAN = "fan";
// 關注
private static String FOLLOWERS = "followers";
// 微博數
private static String MICROBLOGS = "microblogs";
public static class WeiBoMapper extends Mapper<Text, WeiBo, Text, Text> {
@Override
protected void map(Text key, WeiBo value, Context context) throws IOException, InterruptedException {
// 粉絲
context.write(new Text(FAN), new Text(key.toString() + TAB_SEPARATOR + value.getFan()));
// 關注
context.write(new Text(FOLLOWERS), new Text(key.toString() + TAB_SEPARATOR + value.getFollowers()));
// 微博數
context.write(new Text(MICROBLOGS), new Text(key.toString() + TAB_SEPARATOR + value.getMicroblogs()));
}
}
public static class WeiBoReducer extends Reducer<Text, Text, Text, IntWritable> {
private MultipleOutputs<Text, IntWritable> mos;
protected void setup(Context context) throws IOException, InterruptedException {
mos = new MultipleOutputs<Text, IntWritable>(context);
}
protected void reduce(Text Key, Iterable<Text> Values,Context context) throws IOException, InterruptedException {
Map<String,Integer> map = new HashMap< String,Integer>();
for(Text value : Values){ //增強型for循環,意思是把Values的值傳給Text value
// value = 名稱 + (粉絲數 或 關注數 或 微博數)
String[] records = value.toString().split(TAB_SEPARATOR);
map.put(records[0], Integer.parseInt(records[1].toString()));
}
// 對Map內的數據進行排序
Map.Entry<String, Integer>[] entries = getSortedHashtableByValue(map);
for(int i = 0; i < entries.length;i++){
mos.write(Key.toString(),entries[i].getKey(), entries[i].getValue());
}
}
protected void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}
@SuppressWarnings("deprecation")
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration(); // 配置文件對象
// 判斷路徑是否存在,如果存在,則刪除
Path mypath = new Path(args[1]);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}
Job job = new Job(conf, "weibo"); // 構造任務
job.setJarByClass(WeiboCount.class); // 主類
// Mapper
job.setMapperClass(WeiBoMapper.class);
// Mapper key輸出類型
job.setMapOutputKeyClass(Text.class);
// Mapper value輸出類型
job.setMapOutputValueClass(Text.class);
// Reducer
job.setReducerClass(WeiBoReducer.class);
// Reducer key輸出類型
job.setOutputKeyClass(Text.class);
// Reducer value輸出類型
job.setOutputValueClass(IntWritable.class);
// 輸入路徑
FileInputFormat.addInputPath(job, new Path(args[0]));
// 輸出路徑
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 自定義輸入格式
job.setInputFormatClass(WeiboInputFormat.class) ;
//自定義文件輸出類別
MultipleOutputs.addNamedOutput(job, FAN, TextOutputFormat.class, Text.class, IntWritable.class);
MultipleOutputs.addNamedOutput(job, FOLLOWERS, TextOutputFormat.class, Text.class, IntWritable.class);
MultipleOutputs.addNamedOutput(job, MICROBLOGS, TextOutputFormat.class, Text.class, IntWritable.class);
// 去掉job設置outputFormatClass,改為通過LazyOutputFormat設置
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
//提交任務
return job.waitForCompletion(true)?0:1;
}
// 對Map內的數據進行排序(只適合小數據量)
@SuppressWarnings("unchecked")
public static Entry<String, Integer>[] getSortedHashtableByValue(Map<String, Integer> h) {
Entry<String, Integer>[] entries = (Entry<String, Integer>[]) h.entrySet().toArray(new Entry[0]);
// 排序
Arrays.sort(entries, new Comparator<Entry<String, Integer>>() {
public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) {
return entry2.getValue().compareTo(entry1.getValue());
}
});
return entries;
}
public static void main(String[] args) throws Exception {
String[] args0 = { "hdfs://centpy:9000/weibo/weibo.txt", "hdfs://centpy:9000/weibo/out/" };
// String[] args0 = { "./data/weibo/weibo.txt", "./out/weibo/" };
int ec = ToolRunner.run(new Configuration(), new WeiboCount(), args0);
System.exit(ec);
}
}
WeiboInputFormat.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
//其實這個程序,就是在實現InputFormat接口,TVPlayInputFormat是InputFormat接口的實現類
//比如 WeiboInputFormat extends FileInputFormat implements InputFormat。
//問:自定義輸入格式 WeiboInputFormat 類,首先繼承 FileInputFormat,然后分別重寫 isSplitable() 方法和 createRecordReader() 方法。
//線路是: boolean isSplitable() -> RecordReader<Text,WeiBo> createRecordReader() -> WeiboRecordReader extends RecordReader<Text, WeiBo >
public class WeiboInputFormat extends FileInputFormat<Text,WeiBo>{
@Override
protected boolean isSplitable(JobContext context, Path filename) {//這是InputFormat的isSplitable方法
//isSplitable方法就是是否要切分文件,這個方法顯示如果是壓縮文件就不切分,非壓縮文件就切分。
// 如果不允許分割,則isSplitable==false,則將第一個block、文件目錄、開始位置為0,長度為整個文件的長度封裝到一個InputSplit,加入splits中
// 如果文件長度不為0且支持分割,則isSplitable==true,獲取block大小,默認是64MB
return false; //整個文件封裝到一個InputSplit
//要么就是return true; //切分64MB大小的一塊一塊,再封裝到InputSplit
}
@Override
public RecordReader<Text, WeiBo> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
// RecordReader<k1, v1>是返回類型,返回的RecordReader對象的封裝
// createRecordReader是方法,在這里是,WeiboInputFormat.createRecordReader。WeiboInputFormat是InputFormat類的實例
// InputSplit input和TaskAttemptContext context是傳入參數
// isSplitable(),如果是壓縮文件就不切分,整個文件封裝到一個InputSplit
// isSplitable(),如果是非壓縮文件就切,切分64MB大小的一塊一塊,再封裝到InputSplit
//這里默認是系統實現的的RecordReader,按行讀取,下面我們自定義這個類WeiboRecordReader。
//類似與Excel、WeiBo、TVPlayData代碼寫法
// 自定義WeiboRecordReader類,按行讀取
return new WeiboRecordReader(); //新建一個ScoreRecordReader實例,所有才有了上面RecordReader<Text,ScoreWritable>,所以才如下ScoreRecordReader,寫我們自己的
}
public class WeiboRecordReader extends RecordReader<Text, WeiBo>{
//LineReader in是1,行號。
//Text line; 俞灝明 俞灝明 10591367 206 558,每行的相關記錄
public LineReader in; //行讀取器
public Text line;//每行數據類型
public Text lineKey;//自定義key類型,即k1
public WeiBo lineValue;//自定義value類型,即v1
@Override
public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException { //初始化,都是模板
FileSplit split = (FileSplit)input; // 獲取split
Configuration job = context.getConfiguration(); // 獲取配置
Path file = split.getPath(); // 分片路徑
FileSystem fs = file.getFileSystem(job);
FSDataInputStream filein = fs.open(file); // 打開文件
in=new LineReader(filein,job); //輸入流in
line=new Text();//每行數據類型
lineKey=new Text();//自定義key類型,即k1。//新建一個Text實例作為自定義格式輸入的key
lineValue = new WeiBo();//自定義value類型,即v1。//新建一個TVPlayData實例作為自定義格式輸入的value
}
//此方法讀取每行數據,完成自定義的key和value
@Override
public boolean nextKeyValue() throws IOException, InterruptedException { //這里面,才是篡改的重點
// 一行數據
Text line = new Text();
int linesize = in.readLine(line); //line是每行數據,我們這里用到的是in.readLine(str)這個構造函數,默認讀完讀到文件末尾。其實這里有三種。
// 是SplitLineReader.readLine -> SplitLineReader extends LineReader -> org.apache.hadoop.util.LineReader
// in.readLine(str)//這個構造方法執行時,會首先將value原來的值清空。默認讀完讀到文件末尾
// in.readLine(str, maxLineLength)//只讀到maxLineLength行
// in.readLine(str, maxLineLength, maxBytesToConsume)//這個構造方法來實現不清空,前面讀取的行的值
if(linesize == 0)
return false;
// 通過分隔符'\t',將每行的數據解析成數組 pieces
String[] pieces = line.toString().split("\t");//因為,我們這里是。默認讀完讀到文件末尾。line是Text類型。pieces是String[],即String數組。
if(pieces.length != 5){
throw new IOException("Invalid record received");
}
int a,b,c;
try{
a = Integer.parseInt(pieces[2].trim()); //粉絲數,//將String類型,如pieces[2]轉換成,float類型,給a
b = Integer.parseInt(pieces[3].trim()); //關注數
c = Integer.parseInt(pieces[4].trim()); //微博數
}catch(NumberFormatException nfe){
throw new IOException("Error parsing floating poing value in record");
}
//自定義key和value值
lineKey.set(pieces[0]); //完成自定義key數據
lineValue.set(a, b, c); //完成自定義value數據
// 或者寫
// lineValue.set(Integer.parseInt(pieces[2].trim()),Integer.parseInt(pieces[3].trim()),Integer.parseInt(pieces[4].trim()));
return true;
}
@Override
public void close() throws IOException { //關閉輸入流
if(in != null){
in.close();
}
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException { //獲取當前的key,即CurrentKey
return lineKey; //返回類型是Text,即Text lineKey
}
@Override
public WeiBo getCurrentValue() throws IOException, InterruptedException { //獲取當前的Value,即CurrentValue
return lineValue; //返回類型是WeiBo,即WeiBo lineValue
}
@Override
public float getProgress() throws IOException, InterruptedException { //獲取進程,即Progress
return 0; //返回類型是float,即float 0
}
}
}
實驗結果
以上就是博主為大家介紹的這一板塊的主要內容,這都是博主自己的學習過程,希望能給大家帶來一定的指導作用,有用的還望大家點個支持,如果對你沒用也望包涵,有錯誤煩請指出。如有期待可關注博主以第一時間獲取更新哦,謝謝!
版權聲明:本文為博主原創文章,未經博主允許不得轉載。