Hadoop: Basic Practice Exercises


I. WordCount Exercise

  1. Requirement: use Hadoop to count the occurrences of each word in a file.

    1. The content of the file to be analyzed is shown in the figure; the words are separated by spaces.

      

     2. The expected result is shown in the figure.

      

 

  2. Implementation

    1. Dealing with data skew

      In the reduce phase, all records with the same key are handled by a single ReduceTask. In this data set the word "java" appears about 160,000 times while the other words appear only a few times, so the ReduceTasks that handle the rare words finish almost instantly while the one handling "java" keeps running for a long time; this is a severe case of data skew. To mitigate it, create several partitions and spread the records randomly across them: each partition then handles far fewer "java" records, which cuts the processing time, and a second job adds the per-partition counts together to produce the final result. See the code below for the concrete implementation.
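      As a rough illustration using the numbers above: if "java" appears 160,000 times and the records are spread evenly over 5 partitions, each reduce task only has to sum about 160,000 / 5 = 32,000 occurrences of "java", and the second job then adds the five partial counts (about 32,000 each) back up to the final total of 160,000.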

    2. The code

    

package com.wxw.superwc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Random;

/* To aggregate data in Hadoop you write one class that extends Mapper and one that extends Reducer.
       The Mapper splits the input and emits key/value pairs to the Reducer.
           When extending Mapper you declare four generic type parameters:
           Mapper<LongWritable, Text,Text, IntWritable>
           1st type: the input key; here it is always LongWritable, the byte offset of the line being read
           2nd type: the input value, i.e. the content of the current line of the file
           3rd type: the output key type
           4th type: the output value type
           After extending Mapper, override map(LongWritable key, Text value, Context context);
               its first two parameters match the Mapper's input types, and context is used to emit to the Reducer.
               map() is called once for every line read from the file.
       The Reducer does the actual computation.
           The class that extends Reducer also declares four generic type parameters:
           Reducer<Text,IntWritable,Text,IntWritable>
           meaning <key type received from the Mapper, value type received from the Mapper, key type written to the output file, value type written to the output file>
           Override reduce(Text key, Iterable<IntWritable> values, Context context);
           reduce() is called once per key with the collection of all values that share that key.
       The partition class extends Partitioner and overrides
            getPartition(Text text, IntWritable intWritable, int i)
            whose three parameters are the key and value received from the Mapper and the number of partitions configured in the driver.
       */

public class SuperWc {

    public static class SupWcMa extends Mapper<LongWritable, Text,Text, IntWritable>{//Mapper class that splits each line into words
        //override map()
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] s = value.toString().split(" ");//split the line into words on spaces
            for (String name : s) {//emit each word as the key with a count of 1
                context.write(new Text(name),new IntWritable(1));
            }
        }
    }
    public static class SpuWcRe extends Reducer<Text,IntWritable,Text,IntWritable>{//Reducer class that does the counting
        //override reduce()
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum=0;//counter
            for (IntWritable writable : values) {//iterate over all values of the same key and add them up
                sum+=writable.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }
    public static class fenqu extends Partitioner<Text,IntWritable> {//custom partitioner; runs after the Mapper and before the Reducer
        //override getPartition()
        @Override
        public int getPartition(Text text, IntWritable intWritable, int i) {
            Random random = new Random();
            return random.nextInt(i);//the driver sets 5 reduce tasks; records are spread randomly across the partitions to reduce data skew
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //set up the job
        Configuration configuration = new Configuration();//Hadoop configuration
        Job job = Job.getInstance(configuration);//create the job
        job.setMapperClass(SupWcMa.class);//Mapper class
        job.setReducerClass(SpuWcRe.class);//Reducer class
        job.setJarByClass(SuperWc.class);//class used to locate the jar when running on the cluster, usually the main class
        //partitioning
        job.setPartitionerClass(fenqu.class);//register the custom partitioner
        job.setNumReduceTasks(5);//number of reduce tasks; this value is the bound passed to the partitioner's random number
        //map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //reduce output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //input path, preferably an absolute path
        FileInputFormat.setInputPaths(job,new Path("E:\\wordcountdemo\\superWc\\input"));
        //get a FileSystem handle so the output directory can be checked and cleaned
        FileSystem fs = FileSystem.get(configuration);
        //output path, deleted first if it already exists
        Path outPath = new Path("E:\\wordcountdemo\\superWc\\output");
        if (fs.exists(outPath)){
            fs.delete(outPath,true);
        }
        FileOutputFormat.setOutputPath(job,outPath);
        //submit the job
        job.submit();
    }
}

Each partition produces its own output file, as shown below.
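With job.setNumReduceTasks(5), the output directory typically contains one file per reduce task plus an empty _SUCCESS marker:

part-r-00000
part-r-00001
part-r-00002
part-r-00003
part-r-00004
_SUCCESS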

A second job is then written to merge the per-partition files:

  

package com.wxw.superwc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SuperWC2 {
    public static class SupWcMa extends Mapper<LongWritable, Text,Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
           //split each line of the first job's output on the tab character: word \t partial count
            String[] s = value.toString().split("\t");
            //re-emit the word with its partial count
            context.write(new Text(s[0]),new IntWritable(Integer.valueOf(s[1])));
        }
    }
    public static class SpuWcRe extends Reducer<Text,IntWritable,Text,IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //add up the partial counts for the same word
            int sum=0;
            for (IntWritable writable : values) {
                sum+=writable.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //set up the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setMapperClass(SupWcMa.class);
        job.setReducerClass(SpuWcRe.class);
        job.setJarByClass(SuperWC2.class);

        //map and reduce output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //read the per-partition files produced by the first job
        FileInputFormat.setInputPaths(job,new Path("E:\\wordcountdemo\\superWc\\output\\part*"));
        FileSystem fs = FileSystem.get(configuration);
        //output path
        Path outPath = new Path("E:\\wordcountdemo\\superWc\\output2");
        if (fs.exists(outPath)){
            fs.delete(outPath,true);
        }
        FileOutputFormat.setOutputPath(job,outPath);
        job.submit();
    }
}

The final result:

 

II. Common Friends Problem

1. First pass: for every user, find all the users whose friend list contains that user (for example, everyone who has A as a friend). A small hypothetical walkthrough is sketched below, followed by the code.
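The walkthrough is illustrative only; the names and the input format user:friend1,friend2,... are assumptions inferred from how the mapper splits each line on ":" and then ",".

hypothetical input lines:
    B:A,C,E
    D:A,C
mapper output: (A,B) (C,B) (E,B) (A,D) (C,D)
reducer output (one tab-separated line per "friend", with the trailing comma the code appends):
    A	B,D,
    C	B,D,
    E	B,

So each output line reads "friend, then the list of all users who have that friend".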

package com.wxw;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class friend {
    //the Mapper
    public static class FDMapper extends Mapper<LongWritable, Text,Text,Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (key.get()!=0){//skip the line at byte offset 0
                String fridens[]=value.toString().split(":");//each line looks like user:friend1,friend2,...
                String user=fridens[0];
                String friendss[]=fridens[1].split(",");
                for (String f :friendss){
                    //emit (friend, user), meaning "user has f in their friend list"
                    context.write(new Text(f),new Text(user));
                }
            }

        }
    }
    //the Reducer
    public static class FDReduce extends Reducer<Text,Text,Text,Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            //concatenate every user who has this key in their friend list
            StringBuffer stringBuffer=new StringBuffer();
            for (Text item : values) {
                stringBuffer.append(item).append(",");
            }
            context.write(key,new Text(stringBuffer.toString()));
        }
    }
    //the driver
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(friend.class);
        job.setMapperClass(FDMapper.class);
        job.setReducerClass(FDReduce.class);

        //map and reduce output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job,new Path("E:\\wordcountdemo\\friend\\input"));

        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path("E:\\wordcountdemo\\friend\\output");
        if (fs.exists(outPath)){
            fs.delete(outPath,true);
        }

        FileOutputFormat.setOutputPath(job,outPath);
        job.submit();
    }
    }

The output looks like this:

2. Second pass: iterate over that result again and, for every pair of users, collect the friends they have in common. The hypothetical walkthrough continues below, followed by the code.
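Continuing the hypothetical example from the first pass: for a line such as "A	B,D," the mapper sorts the users and emits one record per pair, here (B-D, A); for "C	B,D," it emits (B-D, C). The reducer then concatenates the values of each pair, producing a line such as:

    B-D	A,C,

meaning that users B and D have A and C as common friends.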

package com.wxw;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Arrays;

public class friend2 {
    public static class Fd2Mapper extends Mapper<LongWritable, Text,Text,Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {


            //each pass-1 line looks like: friend \t user1,user2,...,
            String user=value.toString().split("\t")[0];
            String[] aa =value.toString().split("\t")[1].split(",");
            Arrays.sort(aa);//sort so that each pair is always emitted in the same order
            for (int i=0;i<aa.length-1;i++) {
                for (int j=i+1;j<aa.length;j++){
                    //emit (userA-userB, common friend)
                    context.write(new Text(aa[i]+"-"+aa[j]),new Text(user));
                }
            }
        }
    }
    public static class FD2Reduce extends Reducer<Text,Text,Text,Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            //concatenate every common friend of this pair
            StringBuffer stringBuffer=new StringBuffer();
            for (Text te : values) {
                stringBuffer.append(te).append(",");
            }
            context.write(key,new Text(stringBuffer.toString()));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(friend2.class);
        job.setMapperClass(Fd2Mapper.class);
        job.setReducerClass(FD2Reduce.class);

        //map and reduce output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job,new Path("E:\\wordcountdemo\\friend\\output\\"));

        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path("E:\\wordcountdemo\\friend\\output2");
        if (fs.exists(outPath)){
            fs.delete(outPath,true);
        }
        FileOutputFormat.setOutputPath(job,outPath);
        job.submit();
    }
    }

Result:

III. Reading a JSON File and Emitting Objects

  1. Convert the following JSON into objects and write them out.

    

  2. Analysis: this exercise only needs to read the file and write the parsed objects back out, so no Reduce computation is required. A hypothetical sample record is sketched below, followed by the code.
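      Judging from the entity class below, each input line is a JSON object with uid, phone and addr fields, so a record presumably looks something like this (the values are made up for illustration):

      {"uid":"10001","phone":"13800000000","addr":"guangdong"}

      The map-only job writes back the result of Phone.toString(), e.g. Phone{uid='10001', phone='13800000000', addr='guangdong'}.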

      Create the entity class

package com.wxw.phone;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
//to be used as a Mapper output type, the class must implement Writable and its two serialization methods
public class Phone implements Writable {
    private String uid;
    private String phone;
    private String addr;

    @Override
    public String toString() {
        return "Phone{" +
                "uid='" + uid + '\'' +
                ", phone='" + phone + '\'' +
                ", addr='" + addr + '\'' +
                '}';
    }

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public String getAddr() {
        return addr;
    }

    public void setAddr(String addr) {
        this.addr = addr;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.uid);
        dataOutput.writeUTF(this.phone);
        dataOutput.writeUTF(this.addr);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.uid=dataInput.readUTF();
        this.phone=dataInput.readUTF();
        this.addr=dataInput.readUTF();
    }
}

 

  創建操作類

package com.wxw.phone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;


import java.io.IOException;

public class PhoneMR {
    //this exercise only needs the map to emit objects, so the output key is the Phone object and the value is empty
    public static class PhoneMap extends Mapper<LongWritable, Text,Phone, NullWritable>{
        //override map()
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //re-decode the line as GBK so non-ASCII characters are not garbled in the output
            String line=new String(value.getBytes(),0,value.getLength(),"GBK");
            //Jackson's JSON parser
            ObjectMapper obj=new ObjectMapper();
            //parse the JSON line into a Phone object
            Phone phone=obj.readValue(line,Phone.class);
            //emit the object as the key with a null value
            context.write(phone, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(PhoneMR.class);
        job.setMapperClass(PhoneMap.class);
        //only the map output types need to be set
        job.setOutputKeyClass(Phone.class);
        job.setOutputValueClass(NullWritable.class);
        //there is no reduce step, so set the number of reduce tasks to 0 (map-only job)
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job,new Path("E:\\wordcountdemo\\phone\\input"));

        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path("E:\\wordcountdemo\\phone\\output");
        if (fs.exists(outPath)){
            fs.delete(outPath,true);
        }

        FileOutputFormat.setOutputPath(job,outPath);

        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

 

IV. Joining Two Tables

  1. To work with two tables, load the smaller file into memory first and then look it up while reading the larger file inside the Mapper; this joins the two tables on the map side and merges their data into one output, as shown in the figure.

   

  2. The code

package com.wxw.join;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

//map-side join: load the small table into memory and join it with the large table
public class joinmap {
    public static class joinMap extends Mapper<LongWritable, Text,Text, NullWritable>{
        //in-memory copy of the small table
        Map<String, String> map = new HashMap<>();
        //setup() runs only once per map task, so it is a good place to load the small file
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //read the smaller file with a buffered reader
            BufferedReader br = new BufferedReader(new FileReader("E:\\wordcountdemo\\join\\input\\gou.txt"));
            String line=" ";
            while(StringUtils.isNotBlank(line=br.readLine())){
                //store the small table in the map so map() can look it up
                String[] split = line.split("\t");
                map.put(split[0],split[1]);
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //re-decode the line as GBK
            String line=new String(value.getBytes(),0,value.getLength(),"GBK");
            String[] split1 =line.split("\t");
            //simple data cleaning: skip lines that do not have at least two fields
            if (split1.length>=2){
                //join the line from the large table with the matching value from the small table and emit it
                context.write(new Text(split1[0]+split1[1]+" "+map.get(split1[0])),NullWritable.get());
            }

        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //set up the job
        Configuration conf=new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(joinmap.class);
        job.setMapperClass(joinMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //map-only job, so no reduce tasks
        job.setNumReduceTasks(0);
        //input (the large table) and output paths
        FileInputFormat.setInputPaths(job,new Path("E:\\wordcountdemo\\join\\input\\guc.txt"));
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path("E:\\wordcountdemo\\join\\output");
        if (fs.exists(outPath)){
            fs.delete(outPath,true);
        }
        FileOutputFormat.setOutputPath(job,outPath);
        job.submit();

    }
}
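The listing above reads gou.txt from a local Windows path, which only works when the job runs in local mode. On a cluster, the small file is normally shipped to every map task through Hadoop's distributed cache instead. A minimal sketch of that variant (the HDFS path is a hypothetical example):

// In the driver, before submitting the job: register the small file with the distributed cache.
job.addCacheFile(new Path("/data/join/gou.txt").toUri());

// In the Mapper, setup() then reads the cached file once and fills the in-memory map.
// Additional imports needed: java.net.URI, java.io.InputStreamReader.
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    URI[] cacheFiles = context.getCacheFiles();
    FileSystem fs = FileSystem.get(context.getConfiguration());
    try (BufferedReader br = new BufferedReader(
            new InputStreamReader(fs.open(new Path(cacheFiles[0])), "UTF-8"))) {
        String line;
        while ((line = br.readLine()) != null) {
            String[] split = line.split("\t");
            if (split.length >= 2) {
                map.put(split[0], split[1]);
            }
        }
    }
}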

 

 V. Partitioning by a Condition

  1. Partition the records by subject and compute each student's average score, as shown in the figure. The assumed input format is sketched below, followed by the code.
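      Judging from the mapper (the line is split on ",", split[0] is the subject, split[1] is the student's name, and the remaining fields are that student's scores), an input line presumably looks something like this, with the values made up for illustration:

      computer,zhangsan,85,86,41,75,93

      The mapper averages the scores, and the partitioner routes each record to one of four reduce tasks according to its subject.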

 

 

 2. Implementation

   Create the student class

package com.wxw.score;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
//implements WritableComparable so the object can be serialized and sorted
public class student implements WritableComparable <student> {
    private String name;
    private String subject;
    private double avg;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSubject() {
        return subject;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public double getAvg() {
        return avg;
    }

    public void setAvg(double avg) {
        this.avg = avg;
    }


    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.name);
        dataOutput.writeUTF(this.subject);
        dataOutput.writeDouble(this.avg);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.name=dataInput.readUTF();
        this.subject=dataInput.readUTF();
        this.avg=dataInput.readDouble();
    }
    //custom ordering: descending by average score, then by name
    @Override
    public int compareTo(student o) {
        if (o.getAvg() == this.getAvg()){
            return   o.getName().compareTo(this.name);
        }else {
         return  o.getAvg() >this.getAvg()?1:-1;
        }
    }

    @Override
    public String toString() {
        return "student{" +
                "name='" + name + '\'' +
                ", subject='" + subject + '\'' +
                ", avg=" + avg +
                '}';
    }
}

 

 

 Write the driver

package com.wxw.score;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.HashMap;

public class scoreTest {
    //the partitioner
    public  static  class parnt extends Partitioner<student, NullWritable> {
        HashMap<String, Integer> map = new HashMap<>();
        @Override
        public int getPartition(student student,NullWritable s, int i) {
            //map each subject to a fixed partition number and route the record accordingly
            map.put("computer",0);
            map.put("english",1);
            map.put("algorithm",2);
            map.put("math",3);
            return  map.get(student.getSubject());
        }

    }
    public static class scoreMap extends Mapper<LongWritable, Text,student, NullWritable> {
        //the map: parse one student record per line
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
           //split the line on commas: split[0] = subject, split[1] = name, the rest are scores
            String[] split = value.toString().split(",");
            //total of the scores
            Double sum=0.0;
            //number of score fields
            int num=split.length-2;
            //average score
            Double avg=0.0;
            //build the student object
            student student=new student();
            student.setName(split[1]);
            student.setSubject(split[0]);
            for (int i=2;i<split.length;i++){
                sum += Double.parseDouble(split[i]);
            }
            avg = sum/num;
            student.setAvg(avg);
            context.write(student,NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //set up the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(scoreTest.class);
        job.setMapperClass(scoreMap.class);

        //partitioning: one reduce task per subject
        job.setPartitionerClass(parnt.class);
        job.setNumReduceTasks(4);

        //output key/value types
        job.setOutputKeyClass(student.class);
        job.setOutputValueClass(NullWritable.class);
        //input and output paths
        FileInputFormat.setInputPaths(job,new Path("E:\\wordcountdemo\\score\\input"));
        FileSystem fs = FileSystem.get(configuration);
        Path outPath = new Path("E:\\wordcountdemo\\score\\output");
        if (fs.exists(outPath)){
            fs.delete(outPath,true);
        }
        FileOutputFormat.setOutputPath(job,outPath);
        job.submit();
    }
}

 Result

 

 

