Running Hadoop from IDEA


The previous post covered connecting Eclipse to Hadoop, using HDFS as the example; this one covers connecting IDEA to Hadoop, using MapReduce as the example. The material is collected from several sources, with a bit of summarizing of my own.

I. Project setup

  1. Create: New Project -> Java -> Next ... Finish.

  2. Configure: File -> Project Structure:

  Project (left panel): set the JDK/SDK.

  Modules (left panel) -> Dependencies -> + -> JARs or Directories -> add the dependency jars from share/hadoop/ in the Hadoop installation directory:

         common, common/lib, hdfs, mapreduce, and yarn.

  Artifacts (left panel) -> Output directory -> choose where the .class files should be generated.

  That connects the project to Hadoop. Unlike Eclipse, IDEA cannot browse the remote HDFS file tree here, so open http://localhost:50070 in a browser to inspect it. A quick programmatic check is shown below.
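Since the remote file tree is not visible from inside the IDE, one quick way to confirm that the dependency jars and the NameNode address are right is to list the HDFS root directory from a tiny main class. This is only a minimal sketch of mine; the URI below (the same one used later in this post) has to be replaced with your own NameNode address:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListRoot {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.72.10:9000");  // replace with your NameNode URI
        FileSystem fs = FileSystem.get(conf);
        for (FileStatus s : fs.listStatus(new Path("/"))) {     // print every entry under /
            System.out.println(s.getPath());
        }
        fs.close();
    }
}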

II. Running the code

  1. HDFS: the FileSystem class provides a fairly complete set of methods for file operations.

HDFSfile.java
/************************************************************
 FileName: HDFSfile.java
 Author: Light
 Version: 1.0
 Date: 2018/7/16
 Description: file operations on HDFS through the Hadoop FileSystem API;
     implements most of the common operations on HDFS files.
 Function List:
  1. create a directory        mkdir("/idea/");
  2. create a file             create("/idea/haha.txt");
  3. print a file's content    read("/idea/text.txt");
  4. rename a file             moveFile("/idea/haha.txt","/idea/hello.txt");
  5. upload a file             putFile("G://text.txt","/idea/");
  6. download a file           getFile("/idea/abc.txt","G://");
  7. list a directory          listStatus("/idea/");
  8. delete a file             deleteFile("/idea/hello.txt");
 History:
  Light    2018/7/16    1.0    built this module
 ***********************************************************/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.*;

public class HDFSfile {
    Configuration conf;
    FileSystem filesystem;
    String DEFNAME="fs.defaultFS";
    String HDFSURL="hdfs://192.168.72.10:9000";
    @Before
    public void before() throws IOException {
        conf=new Configuration();
        conf.set(DEFNAME, HDFSURL);
        filesystem=FileSystem.get(conf);

    }

    /**
     * JUnit test entry point
     * @throws IOException
     */
    @Test
    public void Text() throws IOException {
        // create a directory
        //mkdir("/idea/");

        // create a file
        //create("/idea/haha.txt");

        // print the content of an HDFS file
        //read("/idea/text.txt");

        // rename a file
        //moveFile("/idea/haha.txt","/idea/hello.txt");

        // upload a file
        //putFile("G://text.txt","/idea/");

        // download a file
        //getFile("/idea/abc.txt","G://");

        // list all files in a directory
        //listStatus("/idea/");

        // delete a file
        //deleteFile("/idea/hello.txt");
    }

    /**
     * Create a directory
     * @param path directory to create (e.g. /hadoop/)
     * @throws IOException
     */
    public void mkdir(String path) throws IOException {
        // create the HDFS directory if it does not exist yet
        if(filesystem.exists(new Path(path)))
        {
            System.out.println("Directory already exists");
        }
        else
        {
            boolean result=filesystem.mkdirs(new Path(path));
            System.out.println(result);
        }

    }

    /**
     * Create a file
     * @param path HDFS file path (e.g. /hadoop/abc.txt)
     * @throws IOException
     */
    public  void create(String path) throws IOException{
        // create the file if it does not exist yet
        if(filesystem.exists(new Path(path)))
        {
            System.out.println("File already exists");
        }
        else
        {
            FSDataOutputStream outputStream = filesystem.create(new Path(path));
            outputStream.close();   // we only wanted an empty file, so close the stream right away
            System.out.println("File created");
        }
    }

    /**
     * Print the content of a file
     * @param dst HDFS file path (e.g. /hadoop/abc.txt)
     * @throws IOException
     */
    public void read(String dst) throws IOException {
        if(filesystem.exists(new Path(dst)))
        {
            FSDataInputStream inputstream=filesystem.open(new Path(dst));
            InputStreamReader isr=new InputStreamReader(inputstream);
            BufferedReader br=new BufferedReader(isr);
            String str=br.readLine();
            while(str!=null){
                System.out.println(str);
                str=br.readLine();
            }
            br.close();
            isr.close();
            inputstream.close();
        }
       else
        {
            System.out.println("File does not exist");
        }
    }

    /**
     * Rename oldpath to newpath; this can also be used to move a file
     * @param oldpath old path
     * @param newpath new path
     */
    public void moveFile(String oldpath, String newpath) {
        Path path1 = new Path(oldpath);
        Path path2 = new Path(newpath);
        try {
            if (!filesystem.exists(path1)) {
                System.out.println(oldpath + " does not exist!");
                return;
            }
            if (filesystem.exists(path2)) {
                System.out.println(newpath + " already exists!");
                return;
            }
            // renaming a file is also how you move it
            filesystem.rename(path1, path2);
            System.out.println("File renamed!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Upload a file to HDFS
     * @param local
     * @param dst
     */
    public void putFile(String local, String dst) {
        try {
            // copy the local file to HDFS, overwriting the target if it already exists
            filesystem.copyFromLocalFile(new Path(local), new Path(dst));
            System.out.println("Upload succeeded!");
        } catch (IOException e) {
            System.out.println("Upload failed!");
            e.printStackTrace();
        }
    }

    /**
     * Download a file to the local file system
     * @param dst
     * @param local
     */
    public void getFile(String dst, String local) {
        try {
            if (!filesystem.exists(new Path(dst))) {
                System.out.println("File does not exist!");
            } else {
                filesystem.copyToLocalFile(new Path(dst), new Path(local));
                System.out.println("Download succeeded!");
            }
        } catch (IOException e) {
            System.out.println("Download failed!");
            e.printStackTrace();
        }
    }


    /**
     * List all files under a directory
     * @param dst
     */
    public void listStatus(String dst) {
        try {
            if (!filesystem.exists(new Path(dst))) {
                System.out.println("Directory does not exist!");
                return;
            }
            // get the status of every entry in the directory
            FileStatus[] status = filesystem.listStatus(new Path(dst));
            for (FileStatus s : status) {
                System.out.println(s.getPath().getName());
            }

        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Delete a file from HDFS
     * @param dst
     */
    public void deleteFile(String dst) {
        try {
            if (!filesystem.exists(new Path(dst))) {
                System.out.println("File does not exist!");
            } else {
                filesystem.delete(new Path(dst), true);
                System.out.println("Deleted!");
            }
        } catch (IOException e) {
            System.out.println("Delete failed!");
            e.printStackTrace();
        }
    }


    /**
     * Close the FileSystem
     */
    @After
    public void destroy()
    {
        try {
            filesystem.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
HDFSfile.java  Give this test class a try; just remember to change the URI!
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;
//https://www.cnblogs.com/frankdeng/p/9255935.html
public class TestCompress {

    public static void main(String[] args) throws Exception {
        // compress an existing file, or decompress an existing .bz2 file into <name>.decoded
        //    compress("inDir/unzip1","org.apache.hadoop.io.compress.BZip2Codec");
        decompress("inDir/unzip1.bz2");        // relative paths resolve against the project directory
    }

    /*
     * Compress a file
     * filename: path of the file to compress
     * method:   compression codec class to use (e.g. org.apache.hadoop.io.compress.BZip2Codec)
     */
    public static void compress(String filename, String method) throws ClassNotFoundException, IOException {

        // 1. open an input stream on the file to compress
        File fileIn = new File(filename);
        InputStream in = new FileInputStream(fileIn);

        Class codecClass = Class.forName(method);     // 2. load the codec class by name
        Configuration conf = new Configuration();     // 3. instantiate the codec through ReflectionUtils
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);

        // 4. the output file gets the codec's default extension
        File fileOut = new File(filename + codec.getDefaultExtension());
        OutputStream out = new FileOutputStream(fileOut);

        CompressionOutputStream cout = codec.createOutputStream(out);   // 5. wrap the raw stream with the codec and copy
        IOUtils.copyBytes(in, cout, 1024 * 1024 * 5, false); // 5 MB buffer

        // 6. release resources
        in.close();
        cout.close();
        out.close();
    }

    /*
     * Decompress a file
     * filename: path of the file to decompress
     */
    public static void decompress(String filename) throws FileNotFoundException, IOException {

        Configuration conf = new Configuration();
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);

        // 1. infer the codec from the file name (conf -> factory -> codec, chosen by the file extension)
        CompressionCodec codec = factory.getCodec(new Path(filename));

        if (null == codec) {    // 2. bail out if no codec matches
            System.out.println("Cannot find codec for file " + filename);
            return;
        }

        // 3. open a decompressing input stream on the compressed file
        InputStream cin = codec.createInputStream(new FileInputStream(filename));

        // 4. open an output stream for the decompressed copy
        File fout = new File(filename + ".decoded");
        OutputStream out = new FileOutputStream(fout);

        // 5. copy the bytes across
        IOUtils.copyBytes(cin, out, 1024 * 1024 * 5, false);

        // 6. release resources
        cin.close();  out.close();
    }
}
TestCompress.java  File compression from earlier study, included as a bonus (off-topic; I forget where I copied it from).

   2. MapReduce: here are a few files to get a feel for it. Change a couple of URIs, create a few input files, and they will run!

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountApp {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, WordCountApp.class.getSimpleName());
        job.setJarByClass(WordCountApp.class);

        job.setMapperClass(MyMapper.class);     // TODO: specify a mapper
        job.setReducerClass(MyReducer.class);   // TODO: specify a reducer

        // TODO: specify output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // TODO: specify input and output DIRECTORIES (not files)
        FileInputFormat.setInputPaths(job, new Path("inDir/word1"));  //   "hdfs://localhost:8010/usr/outDir/word1 "
        FileOutputFormat.setOutputPath(job, new Path( "hdfs://192.168.12.128:9000/usr/outDir/word1" )); //"outDir/MRApp"
        if(job.waitForCompletion(true))System.out.println("OK");
    }
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
        Text k2 = new Text();
        LongWritable v2 = new LongWritable();
        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            System.out.println(fileSplit.getPath().getName());  // name of the file this split comes from
            String[] split = value.toString().split(" ");
            for (String word : split) {
                k2.set(word);
                v2.set(1);
                context.write(k2, v2);
            }
        }
    }
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
        LongWritable lo = new LongWritable();
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable one : v2s) {
                sum += one.get();
            }
            lo.set(sum);
            context.write(k2, lo);
        }
    }
}
The classic WordCountApp.java — before running it, create the inDir/word1 file and write a little text into it (a sample follows).
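If you need something to put in it, a couple of space-separated lines like the following are enough (purely illustrative content):

hello hadoop
hello world what is hadoop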

   How to read it: an input file with n lines yields n records <k1, v1>. By default k1 is the byte offset of the line (a LongWritable value) and v1 is the content of the line (a Text value).

    The Mapper's map function handles each <k1, v1> record in turn and emits context.write(k2, v2); the types of k2 and v2 are chosen by you, e.g. Text, LongWritable, IntWritable.
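    To make that concrete, here is a tiny standalone sketch of mine (not part of the job; the values are illustrative only) of what map emits for a single input record:

public class MapSketch {
    public static void main(String[] args) {
        long k1 = 0L;                                 // byte offset of the line within the file
        String v1 = "hello hadoop hello";             // the content of that line
        for (String word : v1.split(" ")) {
            System.out.println("<" + word + ", 1>");  // what context.write(k2, v2) produces
        }
        // prints <hello, 1>, <hadoop, 1>, <hello, 1>
    }
}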

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/** A WritableComparable for ints. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class IntWritable implements WritableComparable<IntWritable> {
    private int value;

    public IntWritable() {
    }

    public IntWritable(int value) {
        set(value);
    }

    /** Set the value of this IntWritable. */
    public void set(int value) {
        this.value = value;
    }

    /** Return the value of this IntWritable. */
    public int get() {
        return value;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        value = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(value);
    }

    /** Returns true iff <code>o</code> is a IntWritable with the same value. */
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof IntWritable))
            return false;
        IntWritable other = (IntWritable) o;
        return this.value == other.value;
    }

    @Override
    public int hashCode() {
        return value;
    }

    /** Compares two IntWritables. */
    @Override
    public int compareTo(IntWritable o) {
        int thisValue = this.value;
        int thatValue = o.value;
        return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
    }

    @Override
    public String toString() {
        return Integer.toString(value);
    }

    /** A Comparator optimized for IntWritable. */
    public static class Comparator extends WritableComparator {
        public Comparator() {
            super(IntWritable.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int thisValue = readInt(b1, s1);
            int thatValue = readInt(b2, s2);
            return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
        }
    }

    static { // register this comparator
        WritableComparator.define(IntWritable.class, new Comparator());
    }
}
Let's take a look at what the IntWritable data type looks like.
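A Writable only has to implement write and readFields; a minimal round-trip sketch of mine, using the stock org.apache.hadoop.io.IntWritable, shows how those two methods move the value through a byte stream:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.IntWritable;

public class IntWritableRoundTrip {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        IntWritable original = new IntWritable(42);
        original.write(new DataOutputStream(buffer));              // serialize: 4 bytes

        IntWritable copy = new IntWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));  // deserialize
        System.out.println(copy.get() + " " + copy.compareTo(original));  // prints: 42 0
    }
}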

    The Reducer's reduce function receives each k2 together with all of its v2 values and emits context.write(k3, v3); the output file will then contain m lines of <k3, v3>. Here it processes one k2 at a time:

 protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
     long sum = 0;
     for (LongWritable one : v2s) {
         sum += one.get();
     }
     v3.set(sum);
     context.write(k2, v3);  // k2 is passed straight through as k3
 }

  Running it will make this much clearer!
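If you would rather see the arithmetic of a single reduce call on its own first, here is a standalone sketch (my own, with illustrative values not tied to the job):

import java.util.Arrays;
import java.util.List;

public class ReduceSketch {
    public static void main(String[] args) {
        String k2 = "hello";
        List<Long> v2s = Arrays.asList(1L, 1L, 1L);  // every 1 that map emitted for this word
        long sum = 0;
        for (long one : v2s) {
            sum += one;
        }
        System.out.println(k2 + "\t" + sum);         // the <k3, v3> line in the output: hello 3
    }
}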

  3. Partitioned output: splitting the m output records across several files by partitioning on k2.

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class WriteFile {
    public static void main(String[] args) {
        long start=System.currentTimeMillis();
        int numOfFiles = 20, numOfRecorders = 10000;
        String uri = "outDir/data";  //本地文件位置,修改合適的位置

        FileOutputStream fout = null;
        Random ra = new Random();
        try {
            for (int i = 1; i <= numOfFiles; i++) {
                System.out.println("writing file#"+i);
                fout = new FileOutputStream(new File(uri + "/file" + i));
                PrintStream pStream = new PrintStream(new BufferedOutputStream(fout));

                List<String> list = new ArrayList<String>();
                for (int j = 0; j < numOfRecorders; j++)
                    list.add(ra.nextInt(numOfRecorders) + "\t" + ra.nextInt(numOfFiles));    // key 0-9999, value 0-19
                for (String str : list) {
                    pStream.println(str);   // write them all out in one go
                }
                pStream.close();
                fout.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        finally { }
        long end=System.currentTimeMillis();
        System.out.println("write "+numOfFiles+" files successfully in "+ (end-start)+"ms");
    }
}
WriteFile.java  Generates some data files that will be used next.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class SortPart {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, SortPart.class.getSimpleName());
        job.setJarByClass(SortPart.class);

        job.setMapperClass(MyMapper.class);              // TODO: specify a mapper
        job.setReducerClass(Reducer.class);              // TODO: specify a reducer
        job.setPartitionerClass(MyPartitioner.class);    // specify the partitioning rule
        job.setNumReduceTasks(6);

        // TODO: specify output types
        job.setOutputKeyClass(MyIntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        FileSystem local = FileSystem.getLocal(conf);
        FileStatus[] inputFiles = local.listStatus(new Path("outDir/data"));
        for (int i = 1; i < inputFiles.length; ++i)
            FileInputFormat.addInputPath(job, inputFiles[i].getPath());
        FileOutputFormat.setOutputPath(job, new Path("outDir/MRSortPart"));
        if(job.waitForCompletion(true))System.out.println("OK");
    }
    public static class MyMapper extends Mapper<LongWritable, Text, MyIntWritable, IntWritable>{
        MyIntWritable k2 = new MyIntWritable();
        IntWritable v2 = new IntWritable();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");

            k2.set(Integer.parseInt(split[0]));
            v2.set(Integer.parseInt(split[1]));
            context.write(k2, v2);

        }
    }
    public static class MyIntWritable extends IntWritable {
        @Override
        public int compareTo(IntWritable o) {
            if(o.get() < this.get()){
                return 1;
            }else if (o.get() == this.get()){
                return 0;
            }else{
                return -1;
            }
        }
    }
    public static class MyPartitioner extends Partitioner<MyIntWritable, IntWritable> { // receives k2, v2
        @Override
        public int getPartition(MyIntWritable k, IntWritable v, int numPartitions) {
            if(k.get()>=10000)return 5;
            return k.get()/2000;
        }
    }
}
SortPart.java  Its input directory is the output directory of WriteFile.
public static class MyPartitioner extends Partitioner<MyIntWritable, IntWritable> {   // sits between the mapper and the reducers
    @Override
    public int getPartition(MyIntWritable k, IntWritable v, int numPartitions) {      // receives k2, v2
        if (k.get() >= 10000) return 5;
        return k.get() / 2000;        // the value of k2 decides which partition it lands in
    }
}
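With job.setNumReduceTasks(6), the rule above spreads the keys over six output files roughly as follows (my reading of the formula, not measured output):

keys    0 - 1999  -> partition 0  (part-r-00000)
keys 2000 - 3999  -> partition 1
keys 4000 - 5999  -> partition 2
keys 6000 - 7999  -> partition 3
keys 8000 - 9999  -> partition 4
keys >= 10000     -> partition 5  (should stay empty here, since WriteFile only generates keys 0-9999)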

 4. Outputting k3 and v3 in lexicographic order (a secondary sort)

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondSortMapReduce {
    /**
     * The mapper, defined as a nested class
     * @author Administrator
     * 2018-05-31 11:06
     */
    static class MyMapper extends Mapper<LongWritable, Text, CombinationKey, IntWritable>{
        String[] split=null;
        CombinationKey kv=new CombinationKey();
        IntWritable v=new IntWritable();
        @Override
        protected void map(LongWritable key, Text value,
                           Context context)
                throws IOException, InterruptedException {
            split = value.toString().split(" ");
            kv.setFirstKey(split[0]);
            int vv = Integer.parseInt(split[1]);
            v.set(vv);
            kv.setSecondKey(vv);
            context.write(kv, v);
        }
    }
    /**
     * The reducer, defined as a nested class
     * @author Administrator
     * 2018-05-31 11:06
     */
    static class MyReducer extends Reducer<CombinationKey, IntWritable, Text, Text>{
        Text k=new Text();
        Text v=new Text();
        @Override
        protected void reduce(CombinationKey first_second, Iterable<IntWritable> seconds,
                              Context context)
                throws IOException, InterruptedException {
            StringBuilder sb=new StringBuilder();
            for(IntWritable second:seconds) {
                sb.append(second.get()+",");
            }
            k.set(first_second.getFirstKey());
            v.set(sb.toString().substring(0, sb.toString().length()-1));
            context.write(k, v);
        }
    }
    /**
     * Main entry point
     * @param args
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf=new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SecondSortMapReduce.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // set the partitioner and the number of reducers
        job.setPartitionerClass(DefinedPartition.class);
        job.setNumReduceTasks(1);
        // set a custom grouping comparator (optional)
        //job.setGroupingComparatorClass(DefinedGroupSort.class);
        // set a custom sort comparator
        job.setSortComparatorClass(DefineCompparator.class);

        job.setMapOutputKeyClass(CombinationKey.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // input and output paths
        FileInputFormat.setInputPaths(job, new Path("inDir/Second"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:8010/usr/outDir/Second"));  //“outDir/second”

        boolean res = job.waitForCompletion(true);

        System.exit(res?0:1);

    }




    /**
     * The composite key, used for the sort that happens on the map side
     * @author Administrator
     * 2018-05-31 08:16
     */
    public static class CombinationKey implements WritableComparable<CombinationKey>{

        private String firstKey;
        private Integer secondKey;
        public String getFirstKey() {
            return firstKey;
        }

        public void setFirstKey(String firstKey) {
            this.firstKey = firstKey;
        }

        public Integer getSecondKey() {
            return secondKey;
        }

        public void setSecondKey(Integer secondKey) {
            this.secondKey = secondKey;
        }

        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.firstKey);
            out.writeInt(this.secondKey);
        }

        public void readFields(DataInput in) throws IOException {
            this.firstKey=in.readUTF();
            this.secondKey=in.readInt();
        }

        public int compareTo(CombinationKey o) {
            return this.firstKey.compareTo(o.getFirstKey());
        }


    }

    /**
     * The custom sort comparator
     * @author Administrator
     * 2018-05-31 08:40
     */
    public static class DefineCompparator extends WritableComparator{

        protected DefineCompparator() {
            super(CombinationKey.class,true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {

            CombinationKey ck1=(CombinationKey) a;
            CombinationKey ck2=(CombinationKey) b;
            int cp1 = ck1.getFirstKey().compareTo(ck2.getFirstKey());
            if(cp1!=0) {
                // the first keys differ, so they decide the order
                return cp1;
            }else {
                return  ck1.getSecondKey()-ck2.getSecondKey();
            }
        }
    }
    /**
     * The custom partitioner
     * @author Administrator
     * 2018-05-31 08:20
     */
    public static class DefinedPartition extends Partitioner<CombinationKey, IntWritable>{
        /**
         * @param key the map output key; partitioning is based on the first field of the composite key
         * @param value the map output value
         * @param numPartitions the total number of partitions, i.e. the number of reducers
         */
        @Override
        public int getPartition(CombinationKey key, IntWritable value, int numPartitions) {
            return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
        }

    }
}
SecondSortMapReduce.java  Copied from somewhere I no longer remember.
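One note on the listing: the commented-out job.setGroupingComparatorClass(DefinedGroupSort.class) refers to a class that is not included above. A minimal sketch of what such a grouping comparator could look like (my own guess, not the original author's code: it groups reduce input only on the first key, so that all the second values of one word arrive in a single reduce call) would be one more nested class:

    public static class DefinedGroupSort extends WritableComparator {
        protected DefinedGroupSort() {
            super(CombinationKey.class, true);   // true: instantiate keys so compare() receives real objects
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // group only by the first key; the second key is ignored for grouping purposes
            return ((CombinationKey) a).getFirstKey().compareTo(((CombinationKey) b).getFirstKey());
        }
    }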
hadoop 781
hadoop 45
hello 830
hadoop 598
hello 82
what 243
hello 256
name 450
what 691
hadoop 233
what 102
name 103
hello 301
name 36
Test data

That's all I can manage for now.

 

