使用eclipse開發MapReduce


使用eclipse開發MapReduce項目更加方便(使用hadoop插件)

插件和window編譯程序下載地址:鏈接:https://pan.baidu.com/s/1iXp3MeiE8pXS3QevDJ24kw 提取碼:mzye

1.把插件jar包放到eclipse目錄的plugins下面


2.將Window編譯后的hadoop文件放到hadoop的bin目錄下


3.添加環境變量支持


4.修改hdfs-site.xml的配置

5.eclipse上配置



需要先打開虛擬機上的hadoop服務

然后才能連上去

6.准備要分析的數據並且上傳到hdfs 會在D盤的tmp文件下生成1-300.txt 里面就是要分析的數據

package com.blb.core;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
 * 300戶 每戶都會有一個清單文件
 * 商品是隨機  數量也是隨機
 * 洗漱用品 臉盆、杯子、牙刷和牙膏、毛巾、肥皂(洗衣服的)以及皂盒、洗發水和護發素、沐浴液   [1-5之間]
 * 床上用品 比如枕頭、枕套、枕巾、被子、被套、棉被、毯子、床墊、涼席   [0 1之間]
 * 家用電器 比如電磁爐、電飯煲、吹風機、電水壺、豆漿機、台燈等   [1-3之間]
 * 廚房用品 比如鍋、碗、瓢、盆、灶   [1-2 之間]
 * 柴、米、油、鹽、醬、醋 [1-6之間]  
 * 要生成300個文件 命名規則  1-300來表示 
 * @author Administrator
 *
 */
public class BuildBill {
	private static Random random=new Random(); //要還是不要
    private static List<String> washList=new ArrayList<>();
    private static List<String> bedList=new ArrayList<>();
    private static List<String> homeList=new ArrayList<>();
    private static List<String> kitchenList=new ArrayList<>();
	private static List<String> useList=new ArrayList<>();
	
	static{
		washList.add("臉盆");
		washList.add("杯子");
		washList.add("牙刷");
		washList.add("牙膏");
		washList.add("毛巾");
		washList.add("肥皂");
		washList.add("皂盒");
		washList.add("洗發水");
		washList.add("護發素");
		washList.add("沐浴液");
		///////////////////////////////
		bedList.add("枕頭");
		bedList.add("枕套");
		bedList.add("枕巾");
		bedList.add("被子");
		bedList.add("被套");
		bedList.add("棉被");
		bedList.add("毯子");
		bedList.add("床墊");
		bedList.add("涼席");
		//////////////////////////////
		homeList.add("電磁爐");
		homeList.add("電飯煲");
		homeList.add("吹風機");
		homeList.add("電水壺");
		homeList.add("豆漿機");
		homeList.add("電磁爐");
		homeList.add("台燈");
		//////////////////////////
		kitchenList.add("鍋");
		kitchenList.add("碗");
		kitchenList.add("瓢");
		kitchenList.add("盆");
		kitchenList.add("灶 ");
		////////////////////////
		useList.add("米");
		useList.add("油");
		useList.add("鹽");
		useList.add("醬");
		useList.add("醋");
	}
	//確定要還是不要 1/2 
	private static boolean iswant()
	{
		 int num=random.nextInt(1000);
	     if(num%2==0)
	     {
	    	 return true;
	     }
	     else
	     {
	    	 return false;
	     }
	}
	
	/**
	 * 表示我要幾個
	 * @param sum
	 * @return
	 */
	private static int wantNum(int sum)
	{
		return random.nextInt(sum);
	}
	
	
	
	//生成300個清單文件  格式如下
	//輸出的文件的格式 一定要是UTF-8
	//油     2
	public static void main(String[] args) {
		for(int i=1;i<=300;i++)
		{
			System.out.println(i);
			try {
				//字節流
			FileOutputStream out=new FileOutputStream(new File("D:\\tmp\\"+i+".txt"));
				
			//轉換流  可以將字節流轉換字符流  設定編碼格式 
			//字符流
			    BufferedWriter writer=new BufferedWriter(new OutputStreamWriter(out,"UTF-8"));
			    //隨機一下  我要不要  隨機一下 要幾個  再從我們的清單里面 隨機拿出幾個來 數量
			    boolean iswant1=iswant();
			    if(iswant1)
			    {
			    	//我要幾個 不能超過該類商品的總數目
			    	int wantNum = wantNum(washList.size()+1);
			    	//3
			    	for(int j=0;j<wantNum;j++)
			    	{
			    	String product=washList.get(random.nextInt(washList.size()));
			    	writer.write(product+"\t"+(random.nextInt(5)+1));
			    	writer.newLine();
			    	}
               }
			 
			    boolean iswant2=iswant();
			    if(iswant2)
			    {
			    	//我要幾個 不能超過該類商品的總數目
			    	int wantNum = wantNum(bedList.size()+1);
			    	//3
			    	for(int j=0;j<wantNum;j++)
			    	{
			    	String product=bedList.get(random.nextInt(bedList.size()));
			    	writer.write(product+"\t"+(random.nextInt(1)+1));
			    	writer.newLine();
			    	}
               }
			    
			    boolean iswant3=iswant();
			    if(iswant3)
			    {
			    	//我要幾個 不能超過該類商品的總數目
			    	int wantNum = wantNum(homeList.size()+1);
			    	//3
			    	for(int j=0;j<wantNum;j++)
			    	{
			    	String product=homeList.get(random.nextInt(homeList.size()));
			    	writer.write(product+"\t"+(random.nextInt(3)+1));
			    	writer.newLine();
			    	}
               }
			    boolean iswant4=iswant();
			    if(iswant4)
			    {
			    	//我要幾個 不能超過該類商品的總數目
			    	int wantNum = wantNum(kitchenList.size()+1);
			    	//3
			    	for(int j=0;j<wantNum;j++)
			    	{
			    	String product=kitchenList.get(random.nextInt(kitchenList.size()));
			    	writer.write(product+"\t"+(random.nextInt(2)+1));
			    	writer.newLine();
			    	}
               }
			    
			    boolean iswant5=iswant();
			    if(iswant5)
			    {
			    	//我要幾個 不能超過該類商品的總數目
			    	int wantNum = wantNum(useList.size()+1);
			    	//3
			    	for(int j=0;j<wantNum;j++)
			    	{
			    	String product=useList.get(random.nextInt(useList.size()));
			    	writer.write(product+"\t"+(random.nextInt(6)+1));
			    	writer.newLine();
			    	}
               }
			    writer.flush();
			    writer.close();
			} catch (FileNotFoundException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	
	
}

生成的文件上傳到hdfs

7.開始寫MapReduce程序

創建一個MapReduce項目


map階段

package com.blb.lyx;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GoodCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
        //讀取一行的文件
        String line = ivalue.toString();
        //進行字符串的切分
        String[] split = line.split("\t");
        //寫入
        context.write(new Text(split[0]), new IntWritable(Integer.parseInt(split[1])));
    }

}

reduce階段

package com.blb.lyx;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class GoodCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text _key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable val : values) {
            //將IntWritable轉換為Int類型
            int i = val.get();
            sum += i;
        }
        context.write(_key, new IntWritable(sum));
    }

}

job階段

package com.blb.lyx;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GoodCountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //配置服務器的端口和地址
        conf.set("fs.defaultFS", "hdfs://192.168.43.61:9000");
        
        Job job = Job.getInstance(conf, "CountDriver");
        job.setJarByClass(GoodCountDriver.class);
        
        // TODO: specify a mapper
        job.setMapperClass(GoodCountMapper.class);
        // TODO: specify a reducer
        job.setReducerClass(GoodCountReducer.class);

        //如果reducer的key類型和map的key類型一樣,可以不寫map的key類型
        //如果reduce的value類型和map的value類型一樣,可以不寫map的value類型
        // TODO: specify output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // TODO: specify input and output DIRECTORIES (not files)
        FileInputFormat.setInputPaths(job, new Path("/tmp/"));
        FileOutputFormat.setOutputPath(job, new Path("/out2/"));

        if (!job.waitForCompletion(true))
            return;
    }

}

8.運行項目 主要運行在hadoop上 Run on Hadoop

運行成功

查看結果


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM