1.先將Hadoop環境加入到eclipse中

2.生成模擬數據
package com.blb.core;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Generates 300 simulated household shopping-list files, named 1.txt .. 300.txt.
 * For each household and each product category, a fair coin decides whether the
 * household buys from that category at all; if so, a random number of line items
 * is drawn from the category with a random quantity:
 *   washing items (washList)    quantity 1-5
 *   bedding       (bedList)     quantity 1
 *   appliances    (homeList)    quantity 1-3
 *   kitchenware   (kitchenList) quantity 1-2
 *   daily staples (useList)     quantity 1-6
 * Each output file is UTF-8, one "product&lt;TAB&gt;count" pair per line, suitable
 * as MapReduce input.
 *
 * Usage: BuildBill [outputDir]  — defaults to D:\tmp when no argument is given.
 */
public class BuildBill {

    private static final Random random = new Random();
    private static final List<String> washList = new ArrayList<>();
    private static final List<String> bedList = new ArrayList<>();
    private static final List<String> homeList = new ArrayList<>();
    private static final List<String> kitchenList = new ArrayList<>();
    private static final List<String> useList = new ArrayList<>();

    static {
        washList.add("臉盆");
        washList.add("杯子");
        washList.add("牙刷");
        washList.add("牙膏");
        washList.add("毛巾");
        washList.add("肥皂");
        washList.add("皂盒");
        washList.add("洗發水");
        washList.add("護發素");
        washList.add("沐浴液");

        bedList.add("枕頭");
        bedList.add("枕套");
        bedList.add("枕巾");
        bedList.add("被子");
        bedList.add("被套");
        bedList.add("棉被");
        bedList.add("毯子");
        bedList.add("床墊");
        bedList.add("涼席");

        homeList.add("電磁爐");
        homeList.add("電飯煲");
        homeList.add("吹風機");
        homeList.add("電水壺");
        homeList.add("豆漿機");
        // BUG FIX: "電磁爐" was listed twice, silently doubling its draw probability.
        homeList.add("台燈");

        kitchenList.add("鍋");
        kitchenList.add("碗");
        kitchenList.add("瓢");
        kitchenList.add("盆");
        kitchenList.add("灶"); // BUG FIX: had a trailing space, which corrupted the TSV key

        useList.add("柴"); // BUG FIX: class comment lists 柴米油鹽醬醋, but 柴 was missing
        useList.add("米");
        useList.add("油");
        useList.add("鹽");
        useList.add("醬");
        useList.add("醋");
    }

    /** Fair coin flip: does this household buy from a category at all? */
    private static boolean iswant() {
        return random.nextBoolean();
    }

    /**
     * Number of line items to emit for a category.
     *
     * @param sum exclusive upper bound (category size + 1)
     * @return a random count in [0, sum)
     */
    private static int wantNum(int sum) {
        return random.nextInt(sum);
    }

    /**
     * Writes a random number of "product&lt;TAB&gt;qty" lines drawn (with
     * replacement) from one category, or nothing if the coin flip says skip.
     *
     * @param writer   destination for this household's list
     * @param products the category's product names
     * @param maxQty   maximum quantity per line item (inclusive)
     */
    private static void writeCategory(BufferedWriter writer, List<String> products, int maxQty)
            throws IOException {
        if (!iswant()) {
            return;
        }
        // At most one line item per product in the category (original bound).
        int count = wantNum(products.size() + 1);
        for (int j = 0; j < count; j++) {
            String product = products.get(random.nextInt(products.size()));
            writer.write(product + "\t" + (random.nextInt(maxQty) + 1));
            writer.newLine();
        }
    }

    public static void main(String[] args) {
        // Output directory: first CLI argument if given, otherwise the original default.
        String outDir = args.length > 0 ? args[0] : "D:\\tmp";
        // BUG FIX: the original assumed D:\tmp existed and threw FileNotFoundException
        // otherwise; create the directory up front.
        new File(outDir).mkdirs();

        for (int i = 1; i <= 300; i++) {
            File target = new File(outDir, i + ".txt");
            // BUG FIX: try-with-resources guarantees the writer is flushed and closed
            // even when a write fails mid-file (the original leaked the stream then).
            // Output must be UTF-8 so the MapReduce job reads the product names correctly.
            try (BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(new FileOutputStream(target), "UTF-8"))) {
                writeCategory(writer, washList, 5);
                writeCategory(writer, bedList, 1);
                writeCategory(writer, homeList, 3);
                writeCategory(writer, kitchenList, 2);
                writeCategory(writer, useList, 6);
            } catch (IOException e) {
                // FileNotFoundException is an IOException; one handler covers both
                // original catch clauses.
                e.printStackTrace();
            }
        }
    }
}
3.創建MapReduce項目

4.生成Mapper類、Reduce類和Driver類

5.Mapper類代碼
1 package com.blb.core; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.LongWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapreduce.Mapper; 9 10 public class BillMapper extends Mapper<LongWritable, Text, Text, IntWritable> { 11 12 public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException { 13 String line = ivalue.toString(); 14 String[] words = line.split("\t"); 15 context.write(new Text(words[0]), new IntWritable(Integer.parseInt(words[1]))); 16 } 17 18 }
6.Reduce類代碼
1 package com.blb.core; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Reducer; 8 9 public class BillReduce extends Reducer<Text, IntWritable, Text, IntWritable> { 10 11 public void reduce(Text _key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 12 // process values 13 int sum=0; 14 for (IntWritable val : values) { 15 int i = val.get(); 16 sum+=i; 17 } 18 context.write(_key, new IntWritable(sum)); 19 } 20 }
7.Driver類代碼
1 package com.blb.core; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Job; 8 import org.apache.hadoop.mapreduce.Mapper; 9 import org.apache.hadoop.mapreduce.Reducer; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 13 public class BillDriver { 14 15 public static void main(String[] args) throws Exception { 16 Configuration conf = new Configuration(); 17 conf.set("fs.defaultFS", "hdfs://sugar01:9000"); 18 Job job = Job.getInstance(conf, "BillDriver"); 19 job.setJarByClass(BillDriver.class); 20 // TODO: specify a mapper 21 job.setMapperClass(BillMapper.class); 22 // TODO: specify a reducer 23 job.setReducerClass(BillReduce.class); 24 25 // TODO: specify output types 26 job.setOutputKeyClass(Text.class); 27 job.setOutputValueClass(IntWritable.class); 28 29 // TODO: specify input and output DIRECTORIES (not files) 30 FileInputFormat.setInputPaths(job, new Path("/uploads")); 31 FileOutputFormat.setOutputPath(job, new Path("/out2/")); 32 33 if (!job.waitForCompletion(true)) 34 return; 35 } 36 37 }
