The previous post covered connecting Eclipse to Hadoop, with HDFS as the running example; this one covers connecting IntelliJ IDEA to Hadoop, with MapReduce as the example. The material is collected from other sources and briefly summarized.
I. Project setup
1. Create the project: New Project -> Java -> Next ... Finish.
2. Configure it: File -> Project Structure:
Under Project (left panel), set the JDK/SDK on the right.
Under Modules, open Dependencies -> + -> JARs or directories, and add the dependency JARs from share/hadoop/ in the Hadoop installation directory:
common, common/lib, hdfs, mapreduce, and yarn.
Under Artifacts, set the Output directory to where the compiled .class files should go.
That connects the project to Hadoop. Unlike Eclipse, IDEA cannot browse the remote HDFS file tree, so open http://localhost:50070 in a browser to view it instead.
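If you would rather verify the connection from inside IDEA than in the browser, a minimal sketch like the one below should do it. The NameNode address is an assumption taken from the code later in this post; replace it with your own fs.defaultFS.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ConnectionCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // assumed NameNode address; use your cluster's fs.defaultFS
        conf.set("fs.defaultFS", "hdfs://192.168.72.10:9000");
        try (FileSystem fs = FileSystem.get(conf)) {
            // listing the HDFS root confirms the dependencies and the connection are in place
            for (FileStatus s : fs.listStatus(new Path("/"))) {
                System.out.println(s.getPath());
            }
        }
    }
}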
II. Running the code
1. HDFS: the FileSystem class already provides a fairly complete set of methods:
HDFSfile.java

/************************************************************
 Copyright (C), 1988-1999, Huawei Tech. Co., Ltd.
 FileName: HDFSfile.java
 Author: Light        Version: 1.0        Date: 2018/7/16
 Description: file operations on HDFS through the Hadoop FileSystem API;
              implements most of the common HDFS file operations
 Function List:
   1. create a directory          mkdir("/idea/");
   2. create a file               create("/idea/haha.txt");
   3. read an HDFS file           read("/idea/text.txt");
   4. rename a file               moveFile("/idea/haha.txt", "/idea/hello.txt");
   5. upload a file               putFile("G://text.txt", "/idea/");
   6. download a file             getFile("/idea/abc.txt", "G://");
   7. list files in a directory   listStatus("/idea/");
   8. delete a file               deleteFile("/idea/hello.txt");
 History:
   Light    18/7/16    1.0    build this module
***********************************************************/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.*;

public class HDFSfile {
    Configuration conf;
    FileSystem filesystem;
    String DEFNAME = "fs.defaultFS";
    String HDFSURL = "hdfs://192.168.72.10:9000";

    @Before
    public void before() throws IOException {
        conf = new Configuration();
        conf.set(DEFNAME, HDFSURL);
        filesystem = FileSystem.get(conf);
    }

    /**
     * JUnit test entry point
     * @throws IOException
     */
    @Test
    public void Text() throws IOException {
        // create a directory
        //mkdir("/idea/");
        // create a file
        //create("/idea/haha.txt");
        // read an HDFS file
        //read("/idea/text.txt");
        // rename a file
        //moveFile("/idea/haha.txt","/idea/hello.txt");
        // upload a file
        //putFile("G://text.txt","/idea/");
        // download a file
        //getFile("/idea/abc.txt","G://");
        // list all files in a directory
        //listStatus("/idea/");
        // delete a file
        //deleteFile("/idea/hello.txt");
    }

    /**
     * Create a directory
     * @param path directory to create (e.g. /hadoop/)
     * @throws IOException
     */
    public void mkdir(String path) throws IOException {
        // create the HDFS directory
        if (filesystem.exists(new Path(path))) {
            System.out.println("Directory already exists");
        } else {
            boolean result = filesystem.mkdirs(new Path(path));
            System.out.println(result);
        }
    }

    /**
     * Create a file
     * @param path HDFS file path (e.g. /hadoop/abc.txt)
     * @throws IOException
     */
    public void create(String path) throws IOException {
        if (filesystem.exists(new Path(path))) {
            System.out.println("File already exists");
        } else {
            FSDataOutputStream outputStream = filesystem.create(new Path(path));
            System.out.println("File created");
        }
    }

    /**
     * Print the contents of a file
     * @param dst HDFS file path (e.g. /hadoop/abc.txt)
     * @throws IOException
     */
    public void read(String dst) throws IOException {
        if (filesystem.exists(new Path(dst))) {
            FSDataInputStream inputstream = filesystem.open(new Path(dst));
            InputStreamReader isr = new InputStreamReader(inputstream);
            BufferedReader br = new BufferedReader(isr);
            String str = br.readLine();
            while (str != null) {
                System.out.println(str);
                str = br.readLine();
            }
            br.close();
            isr.close();
            inputstream.close();
        } else {
            System.out.println("File does not exist");
        }
    }

    /**
     * Rename oldpath to newpath; can also be used to move a file
     * @param oldpath old name
     * @param newpath new name
     */
    public void moveFile(String oldpath, String newpath) {
        Path path1 = new Path(oldpath);
        Path path2 = new Path(newpath);
        try {
            if (!filesystem.exists(path1)) {
                System.out.println(oldpath + " does not exist!");
                return;
            }
            if (filesystem.exists(path2)) {
                System.out.println(newpath + " already exists!");
                return;
            }
            // rename the file, which also serves as a move
            filesystem.rename(path1, path2);
            System.out.println("File renamed!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Upload a local file to HDFS
     * @param local
     * @param dst
     */
    public void putFile(String local, String dst) {
        try {
            // copy from the local file system to HDFS, overwriting the target if it exists
            filesystem.copyFromLocalFile(new Path(local), new Path(dst));
            System.out.println("Upload succeeded!");
        } catch (IOException e) {
            System.out.println("Upload failed!");
            e.printStackTrace();
        }
    }

    /**
     * Download an HDFS file to the local file system
     * @param dst
     * @param local
     */
    public void getFile(String dst, String local) {
        try {
            if (!filesystem.exists(new Path(dst))) {
                System.out.println("File does not exist!");
            } else {
                filesystem.copyToLocalFile(new Path(dst), new Path(local));
                System.out.println("Download succeeded!");
            }
        } catch (IOException e) {
            System.out.println("Download failed!");
            e.printStackTrace();
        }
    }

    /**
     * List all files under a directory
     * @param dst
     */
    public void listStatus(String dst) {
        try {
            if (!filesystem.exists(new Path(dst))) {
                System.out.println("Directory does not exist!");
                return;
            }
            // get the status of each entry
            FileStatus[] status = filesystem.listStatus(new Path(dst));
            for (FileStatus s : status) {
                System.out.println(s.getPath().getName());
            }
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Delete a file from HDFS
     * @param dst
     */
    public void deleteFile(String dst) {
        try {
            if (!filesystem.exists(new Path(dst))) {
                System.out.println("File does not exist!");
            } else {
                filesystem.delete(new Path(dst), true);
                System.out.println("Delete succeeded!");
            }
        } catch (IOException e) {
            System.out.println("Delete failed!");
            e.printStackTrace();
        }
    }

    /**
     * Close the FileSystem
     */
    @After
    public void destory() {
        try {
            filesystem.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
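If you prefer not to run the class through JUnit, a small driver like the sketch below exercises the same helpers; the paths are made up, and it assumes HDFSfile sits in the same package.

public class HDFSfileDriver {
    public static void main(String[] args) throws Exception {
        HDFSfile hdfs = new HDFSfile();
        hdfs.before();                          // build the Configuration and FileSystem
        hdfs.mkdir("/idea/");                   // hypothetical paths, adjust to your cluster
        hdfs.putFile("G://text.txt", "/idea/");
        hdfs.listStatus("/idea/");
        hdfs.destory();                         // close the FileSystem
    }
}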
TestCompress.java compresses and decompresses a local file with Hadoop's CompressionCodec API (adapted from https://www.cnblogs.com/frankdeng/p/9255935.html):

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

// https://www.cnblogs.com/frankdeng/p/9255935.html
public class TestCompress {

    public static void main(String[] args) throws Exception, IOException {
        // compress an existing file; decompress an existing .bz2 file into <name>.bz2.decoded
        // compress("inDir/unzip1", "org.apache.hadoop.io.compress.BZip2Codec");
        decompres("inDir/unzip1.bz2"); // relative paths resolve under the project directory
    }

    /*
     * Compress
     * filername: path of the file to compress
     * method: compression codec to use (e.g. org.apache.hadoop.io.compress.BZip2Codec)
     */
    public static void compress(String filername, String method) throws ClassNotFoundException, IOException {
        // 1. open an input stream on the file to compress
        File fileIn = new File(filername);
        InputStream in = new FileInputStream(fileIn);
        // 2. load the codec class by name
        Class codecClass = Class.forName(method);
        // 3. instantiate the corresponding codec
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        // 4. append the codec's default file extension to the output name
        File fileOut = new File(filername + codec.getDefaultExtension());
        OutputStream out = new FileOutputStream(fileOut);
        CompressionOutputStream cout = codec.createOutputStream(out);
        // 5. pipe the streams (5 MB buffer)
        IOUtils.copyBytes(in, cout, 1024 * 1024 * 5, false);
        // 6. release resources
        in.close();
        cout.close();
        out.close();
    }

    /*
     * Decompress
     * filename: path of the file to decompress
     */
    public static void decompres(String filename) throws FileNotFoundException, IOException {
        Configuration conf = new Configuration();
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // 1. infer the codec from the file name
        CompressionCodec codec = factory.getCodec(new Path(filename)); // conf -> factory(filename①) -> codec
        // 2. bail out if no codec matches
        if (null == codec) {
            System.out.println("Cannot find codec for file " + filename);
            return;
        }
        // 3. open a decompressing input stream on the compressed file
        InputStream cin = codec.createInputStream(new FileInputStream(filename));
        // 4. open an output stream for the decompressed file
        File fout = new File(filename + ".decoded"); // filename②
        OutputStream out = new FileOutputStream(fout);
        // 5. pipe the streams
        IOUtils.copyBytes(cin, out, 1024 * 1024 * 5, false);
        // 6. release resources
        cin.close();
        out.close();
    }
}
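The codec is chosen purely by class name, so the same helpers should also work with the other codecs bundled with Hadoop. A hedged sketch using GzipCodec; the input path is just illustrative.

public class GzipCompressDemo {
    public static void main(String[] args) throws Exception {
        // same helper, different codec class: the output gets the codec's default
        // extension, so inDir/unzip1 becomes inDir/unzip1.gz here
        TestCompress.compress("inDir/unzip1", "org.apache.hadoop.io.compress.GzipCodec");
        TestCompress.decompres("inDir/unzip1.gz"); // writes inDir/unzip1.gz.decoded
    }
}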
2. MapReduce: here are a few files to get a feel for it; change a few URIs, create the input files, and they will run!
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountApp {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, WordCountApp.class.getSimpleName());
        job.setJarByClass(WordCountApp.class);
        job.setMapperClass(MyMapper.class);   // TODO: specify a mapper
        job.setReducerClass(MyReducer.class); // TODO: specify a reducer

        // TODO: specify output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // TODO: specify input and output DIRECTORIES (not files)
        FileInputFormat.setInputPaths(job, new Path("inDir/word1")); // "hdfs://localhost:8010/usr/outDir/word1"
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.12.128:9000/usr/outDir/word1")); // "outDir/MRApp"

        if (job.waitForCompletion(true)) System.out.println("OK");
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Text k2 = new Text();
        LongWritable v2 = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            System.out.println(fileSplit.getPath().getName()); // name of the file this split came from
            String[] split = value.toString().split(" ");
            for (String word : split) {
                k2.set(word);
                v2.set(1);
                context.write(k2, v2);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        LongWritable lo = new LongWritable();

        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable one : v2s) sum += one.get();
            lo.set(sum);
            context.write(k2, lo);
        }
    }
}
To understand it: an input file with n lines is n records <k1,v1>. By default k1 is the byte offset of the line within the file (a LongWritable) and v1 is the line's content (a Text).
The Mapper's map function handles one <k1,v1> at a time and emits context.write(k2, v2); the types of k2 and v2 are yours to choose, e.g. Text, LongWritable, or IntWritable.
For reference, here is Hadoop's own IntWritable, a typical WritableComparable implementation:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/** A WritableComparable for ints. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class IntWritable implements WritableComparable<IntWritable> {
    private int value;

    public IntWritable() { }

    public IntWritable(int value) { set(value); }

    /** Set the value of this IntWritable. */
    public void set(int value) { this.value = value; }

    /** Return the value of this IntWritable. */
    public int get() { return value; }

    @Override
    public void readFields(DataInput in) throws IOException {
        value = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(value);
    }

    /** Returns true iff <code>o</code> is a IntWritable with the same value. */
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof IntWritable)) return false;
        IntWritable other = (IntWritable) o;
        return this.value == other.value;
    }

    @Override
    public int hashCode() { return value; }

    /** Compares two IntWritables. */
    @Override
    public int compareTo(IntWritable o) {
        int thisValue = this.value;
        int thatValue = o.value;
        return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
    }

    @Override
    public String toString() { return Integer.toString(value); }

    /** A Comparator optimized for IntWritable. */
    public static class Comparator extends WritableComparator {
        public Comparator() { super(IntWritable.class); }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int thisValue = readInt(b1, s1);
            int thatValue = readInt(b2, s2);
            return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
        }
    }

    static { // register this comparator
        WritableComparator.define(IntWritable.class, new Comparator());
    }
}
The Reducer's reduce function receives each k2 together with its values v2s and emits context.write(k3, v3); the output file will contain m lines of <k3,v3>, one per distinct k2. Here it processes one k2 at a time:
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) {
    long sum = 0;
    for (LongWritable one : v2s) sum += one.get();
    v3.set(sum);
    context.write(k2, v3); // k2 is passed through unchanged as k3
}
Running it makes this much clearer!
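To see the data flow without a cluster, here is a plain-Java sketch that simulates the three stages on made-up input lines; the Hadoop types and the shuffle are replaced by ordinary collections, so treat it only as an illustration of how <k1,v1> becomes <k2,v2> and then <k3,v3>.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountTrace {
    public static void main(String[] args) {
        // hypothetical input file: two lines, i.e. two <k1,v1> records
        String[] lines = {"hello hadoop", "hello world"};

        // map phase: each line is split into words, emitting <word, 1> as <k2,v2>
        List<String[]> mapOutput = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                mapOutput.add(new String[]{word, "1"});
            }
        }

        // shuffle + reduce phase: values with the same k2 are grouped and summed
        Map<String, Long> reduceOutput = new TreeMap<>();
        for (String[] kv : mapOutput) {
            reduceOutput.merge(kv[0], Long.parseLong(kv[1]), Long::sum);
        }

        // <k3,v3>: hadoop 1, hello 2, world 1
        reduceOutput.forEach((k3, v3) -> System.out.println(k3 + "\t" + v3));
    }
}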
3. Partitioned output: writing the m <k2,v3> records to several different output files.
First, a small helper that generates local test data: 20 files, each with 10,000 tab-separated pairs of random numbers.

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class WriteFile {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        int numOfFiles = 20, numOfRecorders = 10000;
        String uri = "outDir/data"; // local directory; change to a suitable location
        FileOutputStream fout = null;
        Random ra = new Random();
        try {
            for (int i = 1; i <= numOfFiles; i++) {
                System.out.println("writing file#" + i);
                fout = new FileOutputStream(new File(uri + "/file" + i));
                PrintStream pStream = new PrintStream(new BufferedOutputStream(fout));
                List<String> list = new ArrayList<String>();
                for (int j = 0; j < numOfRecorders; j++)
                    list.add(ra.nextInt(numOfRecorders) + "\t" + ra.nextInt(numOfFiles)); // 0-9999 <TAB> 0-19
                for (String str : list) {
                    pStream.println(str); // write everything out in one pass
                }
                pStream.close();
                fout.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
        long end = System.currentTimeMillis();
        System.out.println("write " + numOfFiles + " files successfully in " + (end - start) + "ms");
    }
}
Then the MapReduce job that reads those files, with a custom key type, a custom partitioner, and six reducers:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortPart {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, SortPart.class.getSimpleName());
        job.setJarByClass(SortPart.class);
        job.setMapperClass(MyMapper.class);   // TODO: specify a mapper
        job.setReducerClass(Reducer.class);   // TODO: specify a reducer
        job.setPartitionerClass(MyPartitioner.class); // custom partitioning rule
        job.setNumReduceTasks(6);

        // TODO: specify output types
        job.setOutputKeyClass(MyIntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        FileSystem local = FileSystem.getLocal(conf);
        FileStatus[] inputFiles = local.listStatus(new Path("outDir/data"));
        for (int i = 1; i < inputFiles.length; ++i)
            FileInputFormat.addInputPath(job, inputFiles[i].getPath());
        FileOutputFormat.setOutputPath(job, new Path("outDir/MRSortPart"));

        if (job.waitForCompletion(true)) System.out.println("OK");
    }

    public static class MyMapper extends Mapper<LongWritable, Text, MyIntWritable, IntWritable> {
        MyIntWritable k2 = new MyIntWritable();
        IntWritable v2 = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            k2.set(Integer.parseInt(split[0]));
            v2.set(Integer.parseInt(split[1]));
            context.write(k2, v2);
        }
    }

    public static class MyIntWritable extends IntWritable {
        @Override
        public int compareTo(IntWritable o) {
            if (o.get() < this.get()) {
                return 1;
            } else if (o.get() == this.get()) {
                return 0;
            } else {
                return -1;
            }
        }
    }

    public static class MyPartitioner extends Partitioner<MyIntWritable, IntWritable> { // k2, v2
        @Override
        public int getPartition(MyIntWritable k, IntWritable v, int numPartitions) {
            if (k.get() >= 10000) return 5;
            return k.get() / 2000;
        }
    }
}
The key piece is the Partitioner, the layer that sits between map and reduce:

public static class MyPartitioner extends Partitioner<MyIntWritable, IntWritable> {
    @Override // this sits between the mapper and the reducer
    public int getPartition(MyIntWritable k, IntWritable v, int numPartitions) { // k2, v2
        if (k.get() >= 10000) return 5;
        return k.get() / 2000; // the value of k2 decides which partition it lands in
    }
}
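As a quick sanity check, a plain-Java sketch that just replicates the arithmetic above on made-up sample keys: keys 0-1999 land in partition 0, 2000-3999 in partition 1, and so on, with everything >= 10000 going to partition 5.

public class PartitionCheck {
    // same rule as MyPartitioner.getPartition, without the Hadoop types
    static int partitionFor(int key) {
        if (key >= 10000) return 5;
        return key / 2000;
    }

    public static void main(String[] args) {
        int[] sampleKeys = {0, 1999, 2000, 5555, 9999, 10000, 12345}; // hypothetical k2 values
        for (int k : sampleKeys) {
            System.out.println(k + " -> part-r-0000" + partitionFor(k));
        }
        // prints partitions 0, 0, 1, 2, 4, 5, 5
    }
}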
4. Outputting k3 and v3 in dictionary order (secondary sort)
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondSortMapReduce {

    /**
     * Mapper, defined as an inner class
     * @author Administrator
     * 2018-05-31 11:06:30
     */
    static class MyMapper extends Mapper<LongWritable, Text, CombinationKey, IntWritable> {
        String[] split = null;
        CombinationKey kv = new CombinationKey();
        IntWritable v = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            split = value.toString().split(" ");
            kv.setFirstKey(split[0]);
            int vv = Integer.parseInt(split[1]);
            v.set(vv);
            kv.setSecondKey(vv);
            context.write(kv, v);
        }
    }

    /**
     * Reducer, defined as an inner class
     * @author Administrator
     * 2018-05-31 11:06:51
     */
    static class MyReducer extends Reducer<CombinationKey, IntWritable, Text, Text> {
        Text k = new Text();
        Text v = new Text();

        @Override
        protected void reduce(CombinationKey first_second, Iterable<IntWritable> seconds, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (IntWritable second : seconds) {
                sb.append(second.get() + ",");
            }
            k.set(first_second.getFirstKey());
            v.set(sb.toString().substring(0, sb.toString().length() - 1));
            context.write(k, v);
        }
    }

    /**
     * Main entry point
     * @param args
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondSortMapReduce.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // set the partitioner and the number of reducers
        job.setPartitionerClass(DefinedPartition.class);
        job.setNumReduceTasks(1);

        // set a custom grouping strategy
        //job.setGroupingComparatorClass(DefinedGroupSort.class);
        // set a custom sort strategy
        job.setSortComparatorClass(DefineCompparator.class);

        job.setMapOutputKeyClass(CombinationKey.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // input and output paths
        FileInputFormat.setInputPaths(job, new Path("inDir/Second"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:8010/usr/outDir/Second")); // "outDir/second"

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }

    /**
     * Composite key, used in the sort step of the map phase
     * @author Administrator
     * 2018-05-31 08:16:38
     */
    public static class CombinationKey implements WritableComparable<CombinationKey> {
        private String firstKey;
        private Integer secondKey;

        public String getFirstKey() { return firstKey; }
        public void setFirstKey(String firstKey) { this.firstKey = firstKey; }
        public Integer getSecondKey() { return secondKey; }
        public void setSecondKey(Integer secondKey) { this.secondKey = secondKey; }

        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.firstKey);
            out.writeInt(this.secondKey);
        }

        public void readFields(DataInput in) throws IOException {
            this.firstKey = in.readUTF();
            this.secondKey = in.readInt();
        }

        public int compareTo(CombinationKey o) {
            return this.firstKey.compareTo(o.getFirstKey());
        }
    }

    /**
     * Custom sort comparator
     * @author Administrator
     * 2018-05-31 08:40:58
     */
    public static class DefineCompparator extends WritableComparator {
        protected DefineCompparator() {
            super(CombinationKey.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            CombinationKey ck1 = (CombinationKey) a;
            CombinationKey ck2 = (CombinationKey) b;
            int cp1 = ck1.getFirstKey().compareTo(ck2.getFirstKey());
            if (cp1 != 0) {
                // the first keys differ, so the comparison ends here
                return cp1;
            } else {
                return ck1.getSecondKey() - ck2.getSecondKey();
            }
        }
    }

    /**
     * Custom partitioner
     * @author Administrator
     * 2018-05-31 08:20:58
     */
    public static class DefinedPartition extends Partitioner<CombinationKey, IntWritable> {
        /**
         * @param key map output key; partitioning is based on the first part of the composite key
         * @param value map output value
         * @param numPartitions total number of partitions, i.e. the number of reducers
         */
        @Override
        public int getPartition(CombinationKey key, IntWritable value, int numPartitions) {
            return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
}
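The line job.setGroupingComparatorClass(DefinedGroupSort.class) is commented out above and the DefinedGroupSort class is not included in the listing. A minimal sketch of what such a grouping comparator could look like is below (nested inside SecondSortMapReduce); it compares only firstKey so that all values sharing a first key reach a single reduce() call. This is my reconstruction, not the original author's class.

    // Hypothetical reconstruction of the missing DefinedGroupSort: group reduce input
    // purely by firstKey, while DefineCompparator still sorts by (firstKey, secondKey).
    public static class DefinedGroupSort extends WritableComparator {
        protected DefinedGroupSort() {
            super(CombinationKey.class, true); // create key instances so compare() receives real keys
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            CombinationKey ck1 = (CombinationKey) a;
            CombinationKey ck2 = (CombinationKey) b;
            return ck1.getFirstKey().compareTo(ck2.getFirstKey());
        }
    }

With this grouping enabled, the reducer should emit one line per word with all of its numbers in ascending order. With it commented out, grouping falls back to the sort comparator, so each (word, number) pair gets its own reduce() call and each output line carries a single number.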
Sample input (one "word number" pair per line, e.g. the contents of inDir/Second):
hadoop 781
hadoop 45
hello 830
hadoop 598
hello 82
what 243
hello 256
name 450
what 691
hadoop 233
what 102
name 103
hello 301
name 36
That is as much as I can write for now.
