做demo前需要先搭建Hadoop集群,並且有linux基礎,可參考 https://www.cnblogs.com/linyufeng/p/10831240.html
1.引出問題
給一串數據,找出每年的每個月溫度最高的2天。其中有可能包含着相同的數據。
1949-10-01 14:21:02 34c 1949-10-01 19:21:02 38c 1949-10-02 14:01:02 36c 1950-01-01 11:21:02 32c 1950-10-01 12:21:02 37c 1951-12-01 12:21:02 23c 1950-10-02 12:21:02 41c 1950-10-03 12:21:02 27c 1951-07-01 12:21:02 45c 1951-07-02 12:21:02 46c 1951-07-03 12:21:03 47c 1949-10-01 14:21:02 34c 1949-10-01 19:21:02 38c 1949-10-02 14:01:02 36c 1950-01-01 11:21:02 32c 1950-10-01 12:21:02 37c 1951-12-01 12:21:02 23c
2.分析
從肉眼去看,這么幾條數據,人工也能很快的得出結果,但如果有幾百萬條呢?所以采用hadoop的mapReduce框架,mapReduce是一個分布式的離線計算框架,流程分為4步驟split---map---shuffle---reduce。用mapReduce去處理這些數據。處理天氣數據所需要使用到的有年和月還有溫度,最后輸出則是要對應相對應的日。所以創建一個天氣類。並且實現writableComparable接口,實現其未實現方法。
package com.sjt.mr.tq.demo; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class MyTQ implements WritableComparable<MyTQ>{ private int year;//年 private int month;//月 private int day;//日 private int wd;//溫度 public int getYear() { return year; } public void setYear(int year) { this.year = year; } public int getMonth() { return month; } public void setMonth(int month) { this.month = month; } public int getDay() { return day; } public void setDay(int day) { this.day = day; } public int getWd() { return wd; } public void setWd(int wd) { this.wd = wd; } @Override public void write(DataOutput out) throws IOException { out.writeInt(year); out.writeInt(month); out.writeInt(day); out.writeInt(wd); } @Override public void readFields(DataInput in) throws IOException { year = in.readInt(); month = in.readInt(); day = in.readInt(); wd = in.readInt(); } @Override public int compareTo(MyTQ mytq) { //1980-05-02 34c //比較年是否相同 int c1 = Integer.compare(year, mytq.getYear()); if(c1==0){ //如果相同則比較月份 int c2 = Integer.compare(month, mytq.getMonth()); if(c2==0){ //如果月份相同則比較溫度,使得溫度降序排序 return -Integer.compare(wd, mytq.getWd()); } return c2; } return c1; } }
3.Map階段
在map的讀入階段,需要把從split中讀取的數據做切割處理,並且讓對象進行序列化存入磁盤中。
package com.sjt.mr.tq.demo; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MyMapper extends Mapper<Object, Text, MyTQ, IntWritable>{ private MyTQ tqKey=new MyTQ(); private IntWritable Tvalue=new IntWritable(); //key記錄的是偏移量,value為一行的數據 @Override protected void map(Object key, Text value, Mapper<Object, Text, MyTQ, IntWritable>.Context context) throws IOException, InterruptedException { try { //1980-01-02 18:15:45 34c //根據制表符分割 獲得 split[0]=1980-01-02 18:15:45 split[1]=34c String[] splits =value.toString().split("\t"); //設值key SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd"); Date date = sdf.parse(splits[0]); Calendar calendar = Calendar.getInstance(); calendar.setTime(date); //給tqkey設值 tqKey.setYear(calendar.get(calendar.YEAR)); tqKey.setMonth(calendar.get(calendar.MONTH)+1); tqKey.setDay(calendar.get(calendar.DAY_OF_MONTH)); int wd = Integer.parseInt(splits[1].substring(0, splits[1].length()-1)); tqKey.setWd(wd); //設值溫度 Tvalue.set(wd); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } context.write(tqKey, Tvalue); } }
4.分組比較器
調用天氣對象的分組比較器會造成一條數據一組的現象,因為條件多了一個天氣,所以需要自定義分組比較器。
package com.sjt.mr.tq.demo; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; //分組比較 public class MyGroupComparator extends WritableComparator{ //調用父類的構造方法 public MyGroupComparator(){ super(MyTQ.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { MyTQ tq1=(MyTQ)a; MyTQ tq2=(MyTQ)b; //先比較年 再比較月 int c1 = Integer.compare(tq1.getYear(), tq2.getYear()); if(c1==0){ return Integer.compare(tq1.getMonth(), tq2.getMonth()); } return c1; } }
5.Reduce階段
reduce階段需要對數據進行最后的處理,根據前面的排序只需要取出當前所得結果的前兩行就是該年某月中的溫度最高的兩天,當然也不排除當前數據中只有一條數據是該年某月的,而且還要判斷是否存在相同的天的溫度是否處於前兩行。
package com.sjt.mr.tq.demo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MyReduce extends Reducer<MyTQ, IntWritable,Text,IntWritable>{ private Text key=new Text(); private IntWritable value=new IntWritable(); @Override protected void reduce(MyTQ mytq, Iterable<IntWritable> values, Reducer<MyTQ, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int flag=0;//計數 int day=0;//儲存日期 for (IntWritable v : values) { //如果計算為0取第一位的年月日:溫度 if(flag==0){ //設key和value值 key.set(mytq.getYear()+"-"+mytq.getMonth()+"-"+mytq.getDay()+":"); value.set(mytq.getWd()); //給day設值 防止重復天數 day=mytq.getDay(); flag++;//計數+1 context.write(key, value); } //如果計數為1且日期不和第一次寫出的日期不一致 if(flag!=0&&mytq.getDay()!=day){ key.set(mytq.getYear()+"-"+mytq.getMonth()+"-"+mytq.getDay()+":"); value.set(mytq.getWd()); context.write(key, value); break; } } } }
6.打包運行
部分代碼省略,可再文章最后查看到項目源代碼及數據處理文件。
並且將打包后的jar包丟入集群,將需要處理的文本儲存到hdfs上,命令為:
執行下行命令運行:分析tq.txt文件 MyJob為入口,結果放入output目錄下
如果不報錯,並且再hdfs的output的目錄下出現了下圖中的情況說明你成功了。可以對part-r-00000進行下載。
下載的結果如果為下圖,則代表運行代碼正確。
項目源代碼及文件路徑 https://github.com/shijintao123/Hadoop
編譯報錯參考:https://blog.csdn.net/qq_42476731/article/details/90298983