MapReduce編程模型詳解(基於Windows平台Eclipse)


本文基於Windows平台Eclipse,以使用MapReduce編程模型統計文本文件中相同單詞的個數來詳述了整個編程流程及需要注意的地方。不當之處還請留言指出。

前期准備

hadoop集群的搭建

編寫map階段的map函數

package com.cnblogs._52mm;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * 第一個參數:默認情況下是mapreduce框架所讀文件的起始偏移量,類型為Long,在mr框架中類型為LongWritable
 * 第二個參數:默認情況下是框架所讀到的內容,類型為String,在mr框架中為Text
 * 第三個參數:框架輸出數據的key,在該單詞統計的編程模型中輸出的是單詞,類型為String,在mr框架中為Text
 * 第四個參數:框架輸出數據的value,在此是每個所對應單詞的個數,類型為Integer,在mr框架中為IntWritable
 * @author Administrator
 *
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
//	map階段的邏輯
//	對每一行輸入數據調用一次我們自定義的map()方法
	@Override
	protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
//		將傳入的每一行數據轉為String
		String line = value.toString();
//		根據空格將單詞划分
		String[] words = line.split(" ");
		
		for(String word: words){
			//將word作為輸出的key,1作為輸出的value    <word,1>
			context.write(new Text(word), new IntWritable(1));
		}
//		mr框架不會在map處理完一行數據就發給reduce,會先將結果收集
	}
}

編寫reduce階段的reduce函數

package com.cnblogs._52mm;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * reduce的輸入是map的輸出
 * 第一個和第二個參數分別是map的輸出類型
 * 第三個參數是reduce程序處理完后的輸出值key的類型,單詞,為Text類型
 * 第四個參數是輸出的value的類型,每個單詞所對應的總數,為IntWritable類型
 * @author Administrator
 *
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	/**
	 * map輸出的內容相當於:
	 * 			<i,1><i,1><i,1><i,1><i,1><i,1>...
	 * 			<am,1><am,1><am,1><am,1><am,1><am,1>...
	 * 			<you,1><you,1><you,1><you,1><you,1><you,1>...	
	 */
	
	
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int count = 0;
		
//		Iterator<IntWritable> iterator = values.iterator();
//		while(iterator.hasNext()){
//			count += iterator.next().get();
//		}
		
		for(IntWritable value: values){
			count += value.get();
		}
		
		context.write(key, new IntWritable(count));
	}
}

編寫驅動類

package com.cnblogs._52mm;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



/**
 * 相當於yarn集群的客戶端,封裝mapreduce的相關運行參數,指定jar包,提交給yarn
 * @author Administrator
 *
 */
public class WordCountDriver {
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		Configuration conf = new Configuration();
//		將默認配置文件傳給job
		Job job = Job.getInstance(conf);
		
//		告訴yarn  jar包在哪
		job.setJarByClass(WordCountDriver.class);
		
		//指定job要使用的map和reduce
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		
//		指定map的輸出類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
//		指定最終輸出的類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
//		job的輸入數據所在的目錄
//		第一個參數:給哪個job設置
//		第二個參數:輸入數據的目錄,多個目錄用逗號分隔
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		
//		job的數據輸出在哪個目錄
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		//將jar包和配置文件提交給yarn
//		submit方法提交作業就退出該程序
//		job.submit();
		
//		waitForCompletion方法提交作業並等待作業執行
//		true表示將作業信息打印出來,該方法會返回一個boolean值,表示是否成功運行
		boolean result = job.waitForCompletion(true);
//		mr運行成功返回true,輸出0表示運行成功,1表示失敗
		System.exit(result?0:1);
	}
	
}

運行MapReduce程序

1、打jar包(鼠標右鍵工程-->Export)

2、上傳到hadoop集群上(集群中的任何一台都行),運行

#wordcounrt.jar是剛剛從eclipse打包上傳到linux的jar包
#com.cnblogs._52mm.WordCountDriver是驅動類的全名
#hdfs的/wordcount/input目錄下是需要統計單詞的文本
#程序輸出結果保存在hdfs的/wordcount/output目錄下(該目錄必須不存在,由hadoop程序自己創建)
hadoop jar wordcount.jar com.cnblogs._52mm.WordCountDriver /wordcount/input /wordcount/output

3、也可用yarn的web界面查看作業信息

ps:在這里可以看到作業的詳細信息,失敗還是成功一目了然

4、查看輸出結果

hadoop fs -cat /wordcount/output/part-r-00000

也可查看hdfs的web界面

報錯解決

Error: java.io.IOException: Unable to initialize any output collector
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:412)
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:695)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:767)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

該錯誤是由於編寫代碼時impor了錯誤的包導致的(我錯在Text包導錯了),仔細檢查一下,改正后重新打jar包上傳。

 Output directory hdfs://mini1:9000/wordcount/output already exists

顯然,該錯誤是由於reduce的輸出目錄必須是不存在才行,不能自己在hdfs上手動創建輸出目錄。

總結

  • map函數和reduce函數的輸入輸出類型要用hadoop提供的基本類型(可優化網絡序列化傳輸)
  • LongWritable類型相當於java的Long類型,IntWritable類型相當於java的Integer類型,Text類型相當於java的String類型
  • reduce函數的輸入類型等於map函數的輸出類型
  • Job對象控制整個作業的執行。
  • job對象的setJarByClass()方法傳遞一個類,hadoop利用這個類來找到相應的jar文件
  • 運行作業前,輸出目錄不應該存在,否則hadoop會報錯(為了防止覆蓋了之前該目錄下已有的數據)
  • setOutputKeyClass()和setOutputValueClass()控制map和reduce函數的輸出類型,這兩個函數的輸出類型一般相同,如果不同,則通過setMapOutputKeyClass()和setMapOutputValueClass()來設置map函數的輸出類型。
  • 輸入數據的類型默認是TextInputFormat(文本),可通過InputFormat類來改變。
  • Job中的waitForCompletion()方法提交作業並等待執行完成,傳入true作為參數則會將作業的詳細信息打印出來。作業執行成功返回true,執行失敗返回false。

產品-訂單統計(map端join)

原始數據

#訂單信息(id,訂單日期,商品id,商品數量)
1001,20150710,p0001,2
1002,20150710,p0001,3
1003,20150710,p0002,4
1004,20150710,p0002,5
#產品信息(商品id,商品名,商品類型id,單價)
p0001,小米5,1000,2000
p0002,錘子,1000,3000

結果數據

order_id=1002, dateString=20150710, p_id=p0001, amount=3, pname=小米5, category_id=1000, price=2000.0
order_id=1001, dateString=20150710, p_id=p0001, amount=2, pname=小米5, category_id=1000, price=2000.0
order_id=1004, dateString=20150710, p_id=p0002, amount=5, pname=錘子, category_id=1000, price=3000.0
order_id=1003, dateString=20150710, p_id=p0002, amount=4, pname=錘子, category_id=1000, price=3000.0

自定義bean

package com.xiaojie.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

//不將其當做key時只要實現writable即可,否則要用另一個還要實現比較大小,相同和hash三個方法
public class InfoBean implements Writable{
private int order_id;
private String dateString;
private String p_id;
private int amount;
private String pname;
private int category_id;
private float price;
//flag為0表示封裝的是訂單表的信息
//flag為1表示封裝的為商品信息
private String flag;
public InfoBean() {
	
}
public void set(int order_id, String dateString, String p_id, int amount, String pname, int category_id, float price, String flag) {
	this.order_id = order_id;
	this.dateString = dateString;
	this.p_id = p_id;
	this.amount = amount;
	this.pname = pname;
	this.category_id = category_id;
	this.price = price;
	this.flag = flag;
}
public String getFlag() {
	return flag;
}
public void setFlag(String flag) {
	this.flag = flag;
}
public int getOrder_id() {
	return order_id;
}
public void setOrder_id(int order_id) {
	this.order_id = order_id;
}
public String getDateString() {
	return dateString;
}
public void setDateString(String dateString) {
	this.dateString = dateString;
}
public String getP_id() {
	return p_id;
}
public void setP_id(String p_id) {
	this.p_id = p_id;
}
public int getAmount() {
	return amount;
}
public void setAmount(int amount) {
	this.amount = amount;
}
public String getPname() {
	return pname;
}
public void setPname(String pname) {
	this.pname = pname;
}
public int getCategory_id() {
	return category_id;
}
public void setCategory_id(int category_id) {
	this.category_id = category_id;
}
public float getPrice() {
	return price;
}
public void setPrice(float price) {
	this.price = price;
}

@Override
public void write(DataOutput out) throws IOException {
	out.writeInt(order_id);
	out.writeUTF(dateString);
	out.writeUTF(p_id);
	out.writeInt(amount);
	out.writeUTF(pname);
	out.writeInt(category_id);
	out.writeFloat(price);
	out.writeUTF(flag);
}

@Override
public void readFields(DataInput in) throws IOException {
	this.order_id = in.readInt();
	this.dateString = in.readUTF();
	this.p_id = in.readUTF();
	this.amount = in.readInt();
	this.pname = in.readUTF();
	this.category_id = in.readInt();
	this.price = in.readFloat();
	this.flag = in.readUTF();
}

@Override
public String toString() {
	return "order_id=" + order_id + ", dateString=" + dateString + ", p_id=" + p_id + ", amount=" + amount
			+ ", pname=" + pname + ", category_id=" + category_id + ", price=" + price + ", flag=" + flag ;
}

}

mapreduce代碼

package com.xiaojie.join;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Join {
    static class JoinMapper extends Mapper<LongWritable, Text, Text, InfoBean>{
        InfoBean bean = new InfoBean();
        Text k = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String pid = "";
            FileSplit inputSplit =(FileSplit) context.getInputSplit();
            //獲取文件名
            String name = inputSplit.getPath().getName();
            if(name.startsWith("order")) {
                 String[] split = line.split(",");
                 //id date pid amount pname category_id price
                 pid = split[2];
                 bean.set(Integer.parseInt(split[0]), split[1], split[2], Integer.parseInt(split[3]), "", 0, (float) 0.0, "0");
            }else {
                 String[] split = line.split(",");
                 //pid  pname category price
                 pid = split[0];
                 bean.set(0, "", pid, 0, split[1], Integer.parseInt(split[2]), Float.parseFloat(split[3]), "1");
            }
            k.set(pid);
            context.write(k, bean);
        }
    }

    //reduce不用傳value出去,用NullWritable
    static class JoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable>{
//		key是商品的id,一個商品id對應一個商品信息,多個訂單信息。
        @Override
        protected void reduce(Text key, Iterable<InfoBean> beans,Context context) throws IOException, InterruptedException {
                InfoBean bean = new InfoBean();
                ArrayList<InfoBean> orderBeans = new ArrayList<InfoBean>();
                for(InfoBean obj:beans) {
                    //falg為“1”表示商品信息,為“0”表示訂單信息
                    if("1".equals(obj.getFlag())) {
                        try {
                            //將obj對象里的數據拷貝到另一個對象
                            BeanUtils.copyProperties(bean, obj);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }else {
                        InfoBean orderbean = new InfoBean();
                        try {
                            BeanUtils.copyProperties(orderbean, obj);
                        } catch (IllegalAccessException | InvocationTargetException e) {
                            e.printStackTrace();
                        }
                        orderBeans.add(orderbean);
                    }
                }
                //每個訂單信息里都封裝上商品名,商品分類id,商品價格
                for(InfoBean orderbean: orderBeans) {
                    orderbean.setPname(bean.getPname());
                    orderbean.setCategory_id(bean.getCategory_id());
                    orderbean.setPrice(bean.getPrice());
                    //輸出key,沒有value
                    context.write(orderbean, NullWritable.get());
                }
        }
    }

    //驅動類
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
//		將默認配置文件傳給job
        Job job = Job.getInstance(conf);
        //指定自定義的map數據分區器
//		job.setPartitionerClass(ProvincePartitioner.class);
        //根據partitioner里的分區數量,設置reduce的數量
//		job.setNumReduceTasks(5);
//		告訴yarn  jar包在哪
        job.setJarByClass(Join.class);
        //指定job要使用的map和reduce
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
//		指定map的輸出類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);
//		指定最終輸出的類型
        job.setOutputKeyClass(InfoBean.class);
        job.setOutputValueClass(NullWritable.class);
//		job的輸入數據所在的目錄
//		第一個參數:給哪個job設置
//		第二個參數:輸入數據的目錄,多個目錄用逗號分隔
        FileInputFormat.setInputPaths(job, new Path("/home/miao/test/input/product/");
//		job的數據輸出在哪個目錄
        FileOutputFormat.setOutputPath(job, new Path("/home/miao/test/output/product/");
        //將jar包和配置文件提交給yarn
//		submit方法提交作業就退出該程序
//		job.submit();
//		waitForCompletion方法提交作業並等待作業執行
//		true表示將作業信息打印出來,該方法會返回一個boolean值,表示是否成功運行
        boolean result = job.waitForCompletion(true);
//		mr運行成功返回true,輸出0表示運行成功,1表示失敗
//		System.exit(result?0:1);
        System.out.println(result?0:1);
    }
}

map端join

當兩張表的數據差距很大,有一張小表的情況下,可以將小表分發到所有的map節點,這樣map節點就可以在本地對自己所讀到的大表數據進行join並輸出最終結果,可以大大提高join操作的並發度,加快處理速度。這個案例里訂單表是大表,而產品表是小表。

package com.xiaojie.join;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.stat.descriptive.summary.Product;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoin {
    //這種方式可以避免reduce端數據量差距大
    static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
//		存放產品信息
        Map<String, String> productInfo_map = new HashMap<String, String>();
        Text k = new Text();
        //在調用map方法前會先調用setup方法一次  用來做一些初始化工作
        //在進行map方法前先獲取到商品信息
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("product.txt")));
            String line;
            //讀入一行數據看是否為空
            while(StringUtils.isNotEmpty(line = br.readLine())) {
                String[] fields = line.split(",");
//				商品id和商品名
                productInfo_map.put(fields[0], fields[1]);
            }
            br.close();
        }
        
        //由於已經有了完整的產品信息表,所以在map方法中就能完成join邏輯 傳入map的數據是訂單信息
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String order_info = value.toString();
            String[] fields = order_info.split(",");
            //根據商品id獲取商品的名稱  將名稱加入到訂單信息,即可完成任務
            String pname = productInfo_map.get(fields[1]);

            k.set(order_info + "," + pname);
            context.write(k, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance();
        job.setJarByClass(MapSideJoin.class);
        job.setMapperClass(MapSideJoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("/home/miao/test/input/mapsideJoin/order2"));
        FileOutputFormat.setOutputPath(job, new Path("/home/miao/test/output/product2"));

//		job.addArchiveToClassPath(archive); 緩存jar包到task運行節點的classpath中
//		job.addFileToClassPath(file);//緩存普通文件到task運行節點的classpath中
//		job.addCacheArchive(uri);//緩存壓縮文件到task運行節點的工作目錄
//		job.addCacheFile(uri);//緩存普通文件到task運行節點的工作目錄

        //將產品信息表文件緩存到task工作節點的工作目錄中
        job.addCacheFile(new URI("file:///home/miao/test/input/mapsideJoin/product.txt"));

        //map端就能完成任務,不需要reduce,將reduce數量設為0
        job.setNumReduceTasks(0);

        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

GroupingComparator(bean作為key,實現bean成組)

原始數據(訂單號,價格)

Order_0000001	222.8
Order_0000001	25.8
Order_0000002	522.8
Order_0000002	122.4
Order_0000002	722.4
Order_0000003	222.8

要求:
每個相同的訂單中消費價格最高的

方法:
在reduce端利用groupingcomparator將訂單id相同的kv聚合成組,然后取第一個即是最大值。

結果數據:

Order_0000001	222.8
Order_0000002	722.4
Order_0000003	222.8

自定義bean

package com.xiaojie.bean_key;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
/**
 * 訂單信息bean,實現hadoop的序列化機制
 */
public class OrderBean implements WritableComparable<OrderBean>{
    private Text itemid;
    private DoubleWritable amount;

    public OrderBean() {
    }
    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);
    }

    public void set(Text itemid, DoubleWritable amount) {
        this.itemid = itemid;
        this.amount = amount;
    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    @Override
    public int compareTo(OrderBean o) {
        int cmp = this.itemid.compareTo(o.getItemid());
        if (cmp == 0) {
        	//根據價格倒敘排序,為了得到價錢最大的哪個key
            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        String readUTF = in.readUTF();
        double readDouble = in.readDouble();
        this.itemid = new Text(readUTF);
        this.amount= new DoubleWritable(readDouble);
    }
    
    @Override
    public String toString() {
        return itemid.toString() + "\t" + amount.get();
    }
}

自定義groupingcomparator

package com.xiaojie.bean_key;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ItemidGroupingComparator extends WritableComparator{
	//傳入作為key的bean的class
    protected ItemidGroupingComparator() {
        super(OrderBean.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;
        //將item_id相同的bean都視為相同,從而聚合為一組
        //OrderBean對象的id是Text的類型,所以這里使用的是Text的compareTo方法
        //返回0表示兩個bean對象相同 
        return abean.getItemid().compareTo(bbean.getItemid());
    }
}

自定義分區邏輯

package com.xiaojie.bean_key;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class ItemIdPartitioner extends Partitioner<OrderBean,NullWritable> {
//	partitionr的輸入是map的輸出
    @Override
    public int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) {
//		根據id使用hash分區,id相同的被分到同一個區
        return (bean.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

mapreduce代碼

package com.xiaojie.bean_key;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Bean_key {
    static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
        OrderBean bean = new OrderBean();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));
            context.write(bean, NullWritable.get());
        }
    }

    static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
        //在設置了groupingcomparator以后,這里收到的kv數據 就是:  <1001 87.6>,null  <1001 76.5>,null  .... 
        //此時,reduce方法中的參數key就是上述kv組中的第一個kv的key:<1001 87.6>
        //要輸出同一個item的所有訂單中最大金額的那一個,就只要輸出這個key
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Bean_key.class);
        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/home/miao/test/input/bean_key"));
        FileOutputFormat.setOutputPath(job, new Path("/home/miao/test/output/bean_key"));
        //指定shuffle所使用的GroupingComparator類
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        //指定shuffle所使用的partitioner類
//		job.setPartitionerClass(ItemIdPartitioner.class);
//		job.setNumReduceTasks(3);
        job.waitForCompletion(true);
    }
}

自定義OutputFormat

工作描述:

數據是一批日志數據,里面有瀏覽網頁的信息,還有一個數據是前期通過爬蟲等技術爬取到的網頁數據(保存在數據庫里),要對日志數據做的處理是,從每條日志記錄里獲取到url信息,根據這條url信息對比數據庫里已經保存的信息,如果數據庫里有這個url,則將數據庫中這個url所對應的信息添加到這條日志數據的后面,保存到一個新的日志文件里;如果數據庫里沒有這個url,就將這個url保存到另一個文件里作為等待爬取的url。

自定義從數據庫讀數據的類

package com.xiaojie.logenhance;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

public class DBLoader {
    public static void dbLoader(Map<String, String> rule_map) {
        //從數據庫中加載規則數據到Map里
        Connection conn = null;
        Statement st = null;
        ResultSet res = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/hadoop", "test", "123456");
            st = conn.createStatement();
            res = st.executeQuery("select url,content from url_rule");
            while (res.next()) {
                rule_map.put(res.getString(1), res.getString(2));
            }
        } catch (Exception e) {
            e.printStackTrace();

        } finally {
            try{
                if(res!=null){
                    res.close();
                }
                if(st!=null){
                    st.close();
                }
                if(conn!=null){
                    conn.close();
                }
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }

//	public static void main(String[] args) {
//		DBLoader db = new DBLoader();
//		HashMap<String, String> map = new HashMap<String,String>();
//		db.dbLoader(map);
//		System.out.println(map.size());
//	}
}

mapreduce程序

package com.xiaojie.logenhance;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogEnhance {
    static class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
        Map<String, String> tag = new HashMap<String,String>();
        Text k = new Text();
        NullWritable v = NullWritable.get();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
//			從數據庫中讀取出數據加載到內存里
//			maptask在初始化時會先調用setup方法一次 利用這個機制,將外部的知識庫加載到maptask執行的機器內存中
            DBLoader.dbLoader(tag);
        }
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //計數器  第一個參數組號  第二個參數計數器名  全局的,所有map都能在這一個計數器上累加
            Counter counter = context.getCounter("err","err_line");
            String line = value.toString();
            String[] fields = line.split("\t");
            try {
//			獲取日志信息的url
            String url = fields[26];
            if (!url.startsWith("http")) {
                counter.increment(1);
            }
            String content = tag.get(url);
            //判斷內容標簽是否為空,如果是空則只輸出url和標記(標記信息表示數據庫里還沒有這個url所對應的信息,需要輸出到另外的文件夾另做處理)
            //否則輸出原來的那行日志數據+標簽內容
            if(content==null) {
                k.set(url+"\t"+"tocrawl"+"\n");
            }else {
                k.set(line+"\t"+content+"\n");
            }
            context.write(k, v);
            }catch (Exception e) {
//				計數器,遇到一條錯誤的數據(字段不完整)就加1
                counter.increment(1);
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(LogEnhance.class);
        job.setMapperClass(LogEnhanceMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 要將自定義的輸出格式組件設置到job中  默認是FileOutputFormat
        job.setOutputFormatClass(LogEnhanceOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("/home/miao/test/input/log_enhance/"));
        // 雖然我們自定義了outputformat(自定義的里面已經有了數據的輸出位置),但是因為我們的outputformat繼承自fileoutputformat
        // 而fileoutputformat要輸出一個_SUCCESS文件,所以,在這還得指定一個輸出目錄,這個輸出目錄下只會有一個_SUCCESS文件
        FileOutputFormat.setOutputPath(job, new Path("/home/miao/test/output/log_enhance2/"));
        job.waitForCompletion(true);
        System.exit(0);
    }
}

自定義OutputFormat

package com.xiaojie.logenhance;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogEnhanceOutputFormat extends FileOutputFormat<Text, NullWritable> {
    //maptask或reducetask在輸出數據時,先調用OutputFormat的getRecordWriter方法獲取RecordWriter
    //再使用RecordWriter對象調用write(k,v)方法將數據寫出
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
//		獲取hdfs客戶端
        FileSystem fs = FileSystem.get(context.getConfiguration());
//		Path logenhance = new Path("hdfs://mini1:9000/output/logenhance/log.data");
//		Path todosomething = new Path("hdfs://mini1:9000/output/todosomeing/log.data");
        Path logenhance = new Path("/home/miao/test/output/logenhance/log.data");
        Path todosomething = new Path("/home/miao/test/output/logenhance/url.data");
//		打開文件
        FSDataOutputStream enhanceString = fs.create(logenhance);
        FSDataOutputStream todosomething_String = fs.create(todosomething);
        //構造方法傳入兩個輸出流作為參數
        return new LogEnhanceRecordWriter(enhanceString,todosomething_String);
    }

    static class LogEnhanceRecordWriter extends RecordWriter<Text, NullWritable>{
        FSDataOutputStream enhanceString = null;
        FSDataOutputStream todosomething_String = null;
        public LogEnhanceRecordWriter(FSDataOutputStream enhanceOut, FSDataOutputStream toCrawlOut) {
            super();
            this.enhanceString = enhanceOut;
            this.todosomething_String = toCrawlOut;
        }
        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
                String result = key.toString();
                //如果要寫出的數據是帶爬取的位置url,則寫出到/home/miao/test/output/logenhance/url.data
                if(result.contains("tocrawl")) {
                    todosomething_String.write(result.getBytes());
                }else {
                    //沒有標記的數據 寫出到log日志里 /home/miao/test/output/logenhance/log.data
                    enhanceString.write(result.getBytes());
                }
        }
		//關閉輸出流
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if(todosomething_String != null) {
                todosomething_String.close();
            }
            if(enhanceString != null) {
                enhanceString.close();
            }
        }
    }
}

多job串聯

復雜的作業往往需要多個mapreduce串聯執行,可再驅動類中創建好多個job並為多個job配置好相應的信息,再利用mapreduce框架的JobControl實現。

		//這個驅動類的前面已經創建並配置好了三個job
 		ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
        ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
        ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());

        cJob1.setJob(job1);
        cJob2.setJob(job2);
        cJob3.setJob(job3);

        // 設置作業依賴關系
        cJob2.addDependingJob(cJob1);
        cJob3.addDependingJob(cJob2);

        JobControl jobControl = new JobControl("RecommendationJob");
        jobControl.addJob(cJob1);
        jobControl.addJob(cJob2);
        jobControl.addJob(cJob3);

        // 新建一個線程來運行已加入JobControl中的作業,開始進程並等待結束
        Thread jobControlThread = new Thread(jobControl);
        jobControlThread.start();
        while (!jobControl.allFinished()) {
            Thread.sleep(500);
        }
        jobControl.stop();
        return 0;

實際工作中不建議用joncontrol的方式將多個job串聯來執行,因為這種方式會使得多個job打在了一個jar包里,只需要執行其中一部分job時無法解耦出來。更加建議的是使用shell腳本等方式將多個mapreduce程序串聯起來執行,這樣多個mapreduce程序完全解耦,更加方便控制。



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM