The previous post covered connecting Eclipse to Hadoop, using HDFS operations as the example; this one covers connecting IDEA to Hadoop, using MapReduce as the example. The material is collected from several sources and briefly summarized.
I. Project setup
1. Create the project: New Project -> Java -> Next … Finish.
2. Configure it under File -> Project Structure:
Under Project, set the JDK / project SDK.
Under Modules -> Dependencies -> + -> JARs or Directories, add the dependency jars from share/hadoop/ in the Hadoop installation directory: the common, common/lib, hdfs, mapreduce, and yarn directories.
Under Artifacts, set the output directory, i.e. the path where the .class files are generated.
That connects the project to Hadoop. Unlike Eclipse, IDEA does not show the remote HDFS directory tree, so open http://localhost:50070 in a browser to view it, or run a quick connectivity check like the sketch below.
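A minimal sketch of such a check (the class name ConnectionCheck is made up here; the NameNode URI matches the one used later in this post and should be adjusted to your cluster):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ConnectionCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.72.10:9000"); // adjust to your NameNode
        try (FileSystem fs = FileSystem.get(conf)) {
            // List the HDFS root directory; if entries print, the connection works.
            for (FileStatus s : fs.listStatus(new Path("/"))) {
                System.out.println(s.getPath());
            }
        }
    }
}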
II. Running the code
1. HDFS: the FileSystem class provides a fairly complete set of file-operation methods:
HDFSfile.java

/************************************************************
 Copyright (C), 1988-1999, Huawei Tech. Co., Ltd.
 FileName: HDFSfile.java
 Author: Light    Version: 1.0    Date: 2018/7/16
 Description: file operations on HDFS through the Hadoop FileSystem API;
              implements most common HDFS file operations.
 Function List:
   1. create a directory        mkdir("/idea/");
   2. create a file             create("/idea/haha.txt");
   3. print a file's content    read("/idea/text.txt");
   4. rename a file             moveFile("/idea/haha.txt", "/idea/hello.txt");
   5. upload a file             putFile("G://text.txt", "/idea/");
   6. download a file           getFile("/idea/abc.txt", "G://");
   7. list all files in a dir   listStatus("/idea/");
   8. delete a file             deleteFile("/idea/hello.txt");
 History:
   Light    18/7/16    1.0    built this module
 ***********************************************************/

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.*;

public class HDFSfile {

    Configuration conf;
    FileSystem filesystem;
    String DEFNAME = "fs.defaultFS";
    String HDFSURL = "hdfs://192.168.72.10:9000";

    @Before
    public void before() throws IOException {
        conf = new Configuration();
        conf.set(DEFNAME, HDFSURL);
        filesystem = FileSystem.get(conf);
    }

    /**
     * JUnit test entry point: uncomment the call you want to try.
     */
    @Test
    public void Text() throws IOException {
        //mkdir("/idea/");                                // create a directory
        //create("/idea/haha.txt");                       // create a file
        //read("/idea/text.txt");                         // print a file's content
        //moveFile("/idea/haha.txt", "/idea/hello.txt");  // rename a file
        //putFile("G://text.txt", "/idea/");              // upload a local file
        //getFile("/idea/abc.txt", "G://");               // download to local disk
        //listStatus("/idea/");                           // list a directory
        //deleteFile("/idea/hello.txt");                  // delete a file
    }

    /**
     * Create a directory.
     * @param path directory to create (e.g. /hadoop/)
     */
    public void mkdir(String path) throws IOException {
        if (filesystem.exists(new Path(path))) {
            System.out.println("Directory already exists");
        } else {
            boolean result = filesystem.mkdirs(new Path(path));
            System.out.println(result);
        }
    }

    /**
     * Create a file.
     * @param path HDFS file path (e.g. /hadoop/abc.txt)
     */
    public void create(String path) throws IOException {
        if (filesystem.exists(new Path(path))) {
            System.out.println("File already exists");
        } else {
            FSDataOutputStream outputStream = filesystem.create(new Path(path));
            outputStream.close(); // close the stream so the empty file is finalized
            System.out.println("File created");
        }
    }

    /**
     * Print a file's content.
     * @param dst HDFS file path (e.g. /hadoop/abc.txt)
     */
    public void read(String dst) throws IOException {
        if (filesystem.exists(new Path(dst))) {
            FSDataInputStream inputstream = filesystem.open(new Path(dst));
            InputStreamReader isr = new InputStreamReader(inputstream);
            BufferedReader br = new BufferedReader(isr);
            String str = br.readLine();
            while (str != null) {
                System.out.println(str);
                str = br.readLine();
            }
            br.close();
            isr.close();
            inputstream.close();
        } else {
            System.out.println("File does not exist");
        }
    }

    /**
     * Rename oldpath to newpath; can also be used to move a file.
     * @param oldpath old name
     * @param newpath new name
     */
    public void moveFile(String oldpath, String newpath) {
        Path path1 = new Path(oldpath);
        Path path2 = new Path(newpath);
        try {
            if (!filesystem.exists(path1)) {
                System.out.println(oldpath + " does not exist!");
                return;
            }
            if (filesystem.exists(path2)) {
                System.out.println(newpath + " already exists!");
                return;
            }
            // rename doubles as a move operation
            filesystem.rename(path1, path2);
            System.out.println("File renamed!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Upload a local file to HDFS.
     */
    public void putFile(String local, String dst) {
        try {
            // copy from the local disk to HDFS, overwriting the target if it already exists
            filesystem.copyFromLocalFile(new Path(local), new Path(dst));
            System.out.println("Upload succeeded!");
        } catch (IOException e) {
            System.out.println("Upload failed!");
            e.printStackTrace();
        }
    }

    /**
     * Download an HDFS file to the local disk.
     */
    public void getFile(String dst, String local) {
        try {
            if (!filesystem.exists(new Path(dst))) {
                System.out.println("File does not exist!");
            } else {
                filesystem.copyToLocalFile(new Path(dst), new Path(local));
                System.out.println("Download succeeded!");
            }
        } catch (IOException e) {
            System.out.println("Download failed!");
            e.printStackTrace();
        }
    }

    /**
     * List all files under a directory.
     */
    public void listStatus(String dst) {
        try {
            if (!filesystem.exists(new Path(dst))) {
                System.out.println("Directory does not exist!");
                return;
            }
            FileStatus[] status = filesystem.listStatus(new Path(dst));
            for (FileStatus s : status) {
                System.out.println(s.getPath().getName());
            }
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Delete a file from HDFS.
     */
    public void deleteFile(String dst) {
        try {
            if (!filesystem.exists(new Path(dst))) {
                System.out.println("File does not exist!");
            } else {
                filesystem.delete(new Path(dst), true);
                System.out.println("Deleted!");
            }
        } catch (IOException e) {
            System.out.println("Delete failed!");
            e.printStackTrace();
        }
    }

    /**
     * Close the FileSystem.
     */
    @After
    public void destory() {
        try {
            filesystem.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
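Since the class is wired up as a JUnit test, the simplest way to use it is to uncomment a call in Text() and run the test. Outside of JUnit, a minimal sketch would be to drive the lifecycle methods yourself (HDFSfileDemo is a made-up class name; the paths are just examples):

public class HDFSfileDemo {
    public static void main(String[] args) throws Exception {
        HDFSfile hdfs = new HDFSfile();
        hdfs.before();          // builds the Configuration and FileSystem
        hdfs.mkdir("/idea/");   // example call: create a directory
        hdfs.listStatus("/");   // example call: list the root directory
        hdfs.destory();         // closes the FileSystem
    }
}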
A related utility, adapted from the article linked in its header comment, compresses and decompresses local files with Hadoop's codec classes:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

// https://www.cnblogs.com/frankdeng/p/9255935.html
public class TestCompress {

    public static void main(String[] args) throws Exception, IOException {
        // Compress the existing file inDir/unzip1; decompress the existing inDir/unzip1.bz2
        // into inDir/unzip1.bz2.decoded. Relative paths are resolved under the project directory.
        // compress("inDir/unzip1", "org.apache.hadoop.io.compress.BZip2Codec");
        decompres("inDir/unzip1.bz2");
    }

    /*
     * Compress.
     * filername: path of the file to compress
     * method:    compression codec class to use (e.g. org.apache.hadoop.io.compress.BZip2Codec)
     */
    public static void compress(String filername, String method) throws ClassNotFoundException, IOException {
        // 1. open an input stream on the file to compress
        File fileIn = new File(filername);
        InputStream in = new FileInputStream(fileIn);
        // 2. load the codec class by name
        Class codecClass = Class.forName(method);
        // 3. instantiate the matching encoder/decoder
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        // 4. the output file gets the codec's default extension
        File fileOut = new File(filername + codec.getDefaultExtension());
        OutputStream out = new FileOutputStream(fileOut);
        CompressionOutputStream cout = codec.createOutputStream(out);
        // 5. pipe the streams (5 MB buffer)
        IOUtils.copyBytes(in, cout, 1024 * 1024 * 5, false);
        // 6. release resources
        in.close();
        cout.close();
        out.close();
    }

    /*
     * Decompress.
     * filename: path of the compressed file
     */
    public static void decompres(String filename) throws FileNotFoundException, IOException {
        Configuration conf = new Configuration();
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // 1. infer the codec from the file extension: conf -> factory(filename) -> codec
        CompressionCodec codec = factory.getCodec(new Path(filename));
        // 2. bail out if no codec matches
        if (null == codec) {
            System.out.println("Cannot find codec for file " + filename);
            return;
        }
        // 3. open a decompressing input stream on the compressed file
        InputStream cin = codec.createInputStream(new FileInputStream(filename));
        // 4. open an output stream for the decompressed copy
        File fout = new File(filename + ".decoded");
        OutputStream out = new FileOutputStream(fout);
        // 5. pipe the streams
        IOUtils.copyBytes(cin, out, 1024 * 1024 * 5, false);
        // 6. release resources
        cin.close();
        out.close();
    }
}
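As a usage sketch (the file name inDir/sample.txt is an assumed example), replacing the body of main with these two calls does a gzip round trip:

compress("inDir/sample.txt", "org.apache.hadoop.io.compress.GzipCodec"); // writes inDir/sample.txt.gz
decompres("inDir/sample.txt.gz");                                        // writes inDir/sample.txt.gz.decoded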
2. MapReduce. Here are a few files to get a feel for it; change a couple of URIs and create the input files, and they will run!
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountApp {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, WordCountApp.class.getSimpleName());
        job.setJarByClass(WordCountApp.class);

        job.setMapperClass(MyMapper.class);    // TODO: specify a mapper
        job.setReducerClass(MyReducer.class);  // TODO: specify a reducer

        // TODO: specify output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // TODO: specify input and output DIRECTORIES (not files)
        FileInputFormat.setInputPaths(job, new Path("inDir/word1")); // or e.g. "hdfs://localhost:8010/usr/outDir/word1"
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.12.128:9000/usr/outDir/word1")); // or e.g. "outDir/MRApp"

        if (job.waitForCompletion(true)) System.out.println("OK");
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Text k2 = new Text();
        LongWritable v2 = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            System.out.println(fileSplit.getPath().getName()); // name of the file this split comes from
            String[] split = value.toString().split(" ");
            for (String word : split) {
                k2.set(word);
                v2.set(1);
                context.write(k2, v2);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        LongWritable lo = new LongWritable();

        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable one : v2s) sum += one.get();
            lo.set(sum);
            context.write(k2, lo);
        }
    }
}
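One thing to watch: FileOutputFormat requires that the output directory not exist yet, otherwise the job fails at submission. A small sketch of a cleanup you could drop into main before setting the output path (the path is the one from the driver above; the extra import org.apache.hadoop.fs.FileSystem is needed):

// Sketch: delete a stale output directory before submitting the job.
Path outPath = new Path("hdfs://192.168.12.128:9000/usr/outDir/word1");
FileSystem fs = outPath.getFileSystem(conf);   // conf is the Configuration created in main
if (fs.exists(outPath)) {
    fs.delete(outPath, true);                  // recursive delete
}
FileOutputFormat.setOutputPath(job, outPath);  // replaces the original setOutputPath call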
How to read this: if the input file has n lines, those are n records <k1,v1>. By default k1 is the byte offset of the line (a LongWritable) and v1 is the content of the line (a Text).
The Mapper's map function processes each <k1,v1> in turn and emits results with context.write(k2, v2); the types of k2 and v2 are whatever you choose, e.g. Text, LongWritable, IntWritable. For reference, this is Hadoop's own IntWritable; note write/readFields for serialization, compareTo for the sort order, and the raw Comparator it registers:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/** A WritableComparable for ints. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class IntWritable implements WritableComparable<IntWritable> {
    private int value;

    public IntWritable() {}

    public IntWritable(int value) { set(value); }

    /** Set the value of this IntWritable. */
    public void set(int value) { this.value = value; }

    /** Return the value of this IntWritable. */
    public int get() { return value; }

    @Override
    public void readFields(DataInput in) throws IOException {
        value = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(value);
    }

    /** Returns true iff <code>o</code> is a IntWritable with the same value. */
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof IntWritable))
            return false;
        IntWritable other = (IntWritable) o;
        return this.value == other.value;
    }

    @Override
    public int hashCode() { return value; }

    /** Compares two IntWritables. */
    @Override
    public int compareTo(IntWritable o) {
        int thisValue = this.value;
        int thatValue = o.value;
        return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
    }

    @Override
    public String toString() { return Integer.toString(value); }

    /** A Comparator optimized for IntWritable. */
    public static class Comparator extends WritableComparator {
        public Comparator() { super(IntWritable.class); }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int thisValue = readInt(b1, s1);
            int thatValue = readInt(b2, s2);
            return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
        }
    }

    static { // register this comparator
        WritableComparator.define(IntWritable.class, new Comparator());
    }
}
The Reducer's reduce function receives each k2 together with all of its v2 values and emits context.write(k3, v3); the output file then holds m records <k3,v3>. It processes one k2 group at a time:

protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) {
    long sum = 0;
    for (LongWritable one : v2s) sum += one.get();
    v3.set(sum);
    context.write(k2, v3);  // k2 is passed straight through as k3
}
Running it will make this much clearer! The sketch below also walks through the same data flow on a single input line.
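A minimal sketch (plain Java collections, no Hadoop involved, purely illustrative) of what the framework does with one input record: map emits (word, 1) pairs, the shuffle groups them by key, and reduce sums each group.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountFlowDemo {
    public static void main(String[] args) {
        // One input record: k1 = byte offset 0, v1 = the line content.
        String v1 = "hello hadoop hello";

        // map: emit (k2, v2) = (word, 1) for every word in the line.
        List<String[]> mapOutput = new ArrayList<>();
        for (String word : v1.split(" ")) {
            mapOutput.add(new String[]{word, "1"});
        }
        // mapOutput: (hello,1) (hadoop,1) (hello,1)

        // shuffle: group the values by k2 (the framework also sorts the keys).
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (String[] kv : mapOutput) {
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(Long.parseLong(kv[1]));
        }
        // grouped: hadoop -> [1], hello -> [1, 1]

        // reduce: sum each group and emit (k3, v3).
        for (Map.Entry<String, List<Long>> e : grouped.entrySet()) {
            long sum = 0;
            for (long one : e.getValue()) sum += one;
            System.out.println(e.getKey() + "\t" + sum);  // hadoop 1, hello 2
        }
    }
}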
3. Partitioned output: writing the m <k3,v3> records into several different output files.
First generate some local test data:

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class WriteFile {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        int numOfFiles = 20, numOfRecorders = 10000;
        String uri = "outDir/data"; // local directory; change to a suitable location
        FileOutputStream fout = null;
        Random ra = new Random();
        try {
            for (int i = 1; i <= numOfFiles; i++) {
                System.out.println("writing file#" + i);
                fout = new FileOutputStream(new File(uri + "/file" + i));
                PrintStream pStream = new PrintStream(new BufferedOutputStream(fout));
                List<String> list = new ArrayList<String>();
                for (int j = 0; j < numOfRecorders; j++)
                    list.add(ra.nextInt(numOfRecorders) + "\t" + ra.nextInt(numOfFiles)); // key in 0-9999
                for (String str : list) {
                    pStream.println(str); // write the whole batch out
                }
                pStream.close();
                fout.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
        long end = System.currentTimeMillis();
        System.out.println("write " + numOfFiles + " files successfully in " + (end - start) + "ms");
    }
}
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortPart {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, SortPart.class.getSimpleName());
        job.setJarByClass(SortPart.class);

        job.setMapperClass(MyMapper.class);           // TODO: specify a mapper
        job.setReducerClass(Reducer.class);           // identity reducer
        job.setPartitionerClass(MyPartitioner.class); // custom partitioning rule
        job.setNumReduceTasks(6);                     // 6 reducers, hence 6 output files

        // TODO: specify output types
        job.setOutputKeyClass(MyIntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        FileSystem local = FileSystem.getLocal(conf);
        FileStatus[] inputFiles = local.listStatus(new Path("outDir/data"));
        for (int i = 1; i < inputFiles.length; ++i)
            FileInputFormat.addInputPath(job, inputFiles[i].getPath());
        FileOutputFormat.setOutputPath(job, new Path("outDir/MRSortPart"));

        if (job.waitForCompletion(true)) System.out.println("OK");
    }

    public static class MyMapper extends Mapper<LongWritable, Text, MyIntWritable, IntWritable> {
        MyIntWritable k2 = new MyIntWritable();
        IntWritable v2 = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            k2.set(Integer.parseInt(split[0]));
            v2.set(Integer.parseInt(split[1]));
            context.write(k2, v2);
        }
    }

    // IntWritable subclass overriding compareTo to control the sort order of k2
    public static class MyIntWritable extends IntWritable {
        @Override
        public int compareTo(IntWritable o) {
            if (o.get() < this.get()) {
                return 1;
            } else if (o.get() == this.get()) {
                return 0;
            } else {
                return -1;
            }
        }
    }

    public static class MyPartitioner extends Partitioner<MyIntWritable, IntWritable> { // <k2, v2>
        @Override
        public int getPartition(MyIntWritable k, IntWritable v, int numPartitions) {
            if (k.get() >= 10000) return 5;
            return k.get() / 2000; // the value of k2 decides which partition the record goes to
        }
    }
}
The part to focus on is the partitioner:

public static class MyPartitioner extends Partitioner<MyIntWritable, IntWritable> {
    @Override  // this sits between the mapper and the reducers
    public int getPartition(MyIntWritable k, IntWritable v, int numPartitions) { // receives <k2, v2>
        if (k.get() >= 10000) return 5;
        return k.get() / 2000;  // the value of k2 determines which partition the record goes to
    }
}
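Each partition index corresponds to one reduce task and therefore one output file (part-r-00000 through part-r-00005 here). A tiny sketch, reproducing the getPartition rule above purely for illustration, of where a few sample keys end up:

public class PartitionDemo {
    // Same rule as MyPartitioner.getPartition, copied here for illustration.
    static int partition(int k) {
        if (k >= 10000) return 5;
        return k / 2000;
    }

    public static void main(String[] args) {
        int[] sampleKeys = {1500, 4200, 9999, 10000, 25000};
        for (int k : sampleKeys) {
            // e.g. 1500 -> partition 0 -> part-r-00000, 9999 -> partition 4 -> part-r-00004
            System.out.printf("key %-5d -> partition %d -> part-r-0000%d%n", k, partition(k), partition(k));
        }
    }
}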
4. Outputting k3 and v3 in dictionary (lexicographic) order
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondSortMapReduce {

    /**
     * The mapper, defined as an inner class.
     * @author Administrator 2018-05-31
     */
    static class MyMapper extends Mapper<LongWritable, Text, CombinationKey, IntWritable> {
        String[] split = null;
        CombinationKey kv = new CombinationKey();
        IntWritable v = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            split = value.toString().split(" ");
            kv.setFirstKey(split[0]);
            int vv = Integer.parseInt(split[1]);
            v.set(vv);
            kv.setSecondKey(vv);
            context.write(kv, v);
        }
    }

    /**
     * The reducer, defined as an inner class.
     * @author Administrator 2018-05-31
     */
    static class MyReducer extends Reducer<CombinationKey, IntWritable, Text, Text> {
        Text k = new Text();
        Text v = new Text();

        @Override
        protected void reduce(CombinationKey first_second, Iterable<IntWritable> seconds, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (IntWritable second : seconds) {
                sb.append(second.get() + ",");
            }
            k.set(first_second.getFirstKey());
            v.set(sb.toString().substring(0, sb.toString().length() - 1)); // drop the trailing comma
            context.write(k, v);
        }
    }

    /**
     * Driver.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondSortMapReduce.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // partitioning rule and number of reducers
        job.setPartitionerClass(DefinedPartition.class);
        job.setNumReduceTasks(1);

        // custom grouping strategy (DefinedGroupSort is not shown here; when no grouping
        // comparator is set, grouping falls back to the sort comparator below)
        //job.setGroupingComparatorClass(DefinedGroupSort.class);
        // custom sort strategy
        job.setSortComparatorClass(DefineCompparator.class);

        job.setMapOutputKeyClass(CombinationKey.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // input and output paths
        FileInputFormat.setInputPaths(job, new Path("inDir/Second"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:8010/usr/outDir/Second")); // or e.g. "outDir/Second"

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }

    /**
     * Custom composite key, used in the sort sub-phase on the map side.
     */
    public static class CombinationKey implements WritableComparable<CombinationKey> {
        private String firstKey;
        private Integer secondKey;

        public String getFirstKey() { return firstKey; }
        public void setFirstKey(String firstKey) { this.firstKey = firstKey; }
        public Integer getSecondKey() { return secondKey; }
        public void setSecondKey(Integer secondKey) { this.secondKey = secondKey; }

        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.firstKey);
            out.writeInt(this.secondKey);
        }

        public void readFields(DataInput in) throws IOException {
            this.firstKey = in.readUTF();
            this.secondKey = in.readInt();
        }

        public int compareTo(CombinationKey o) {
            return this.firstKey.compareTo(o.getFirstKey());
        }
    }

    /**
     * Custom sort comparator: order by the first key, then by the second key when the first keys are equal.
     */
    public static class DefineCompparator extends WritableComparator {
        protected DefineCompparator() {
            super(CombinationKey.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            CombinationKey ck1 = (CombinationKey) a;
            CombinationKey ck2 = (CombinationKey) b;
            int cp1 = ck1.getFirstKey().compareTo(ck2.getFirstKey());
            if (cp1 != 0) {
                // the first keys already decide the order
                return cp1;
            } else {
                return ck1.getSecondKey() - ck2.getSecondKey();
            }
        }
    }

    /**
     * Custom partitioner.
     */
    public static class DefinedPartition extends Partitioner<CombinationKey, IntWritable> {
        /**
         * @param key           map output key; records are partitioned by the first part of the composite key
         * @param value         map output value
         * @param numPartitions total number of partitions, i.e. the number of reducers
         */
        @Override
        public int getPartition(CombinationKey key, IntWritable value, int numPartitions) {
            return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
}
Sample content for the input file under inDir/Second:

hadoop 781
hadoop 45
hello 830
hadoop 598
hello 82
what 243
hello 256
name 450
what 691
hadoop 233
what 102
name 103
hello 301
name 36
That's all I can manage to write for now.
