MapReduce Counters


 

Part 1. Overview of Hadoop counters

Hadoop counters:

      Counters let developers review a job's execution and its key metrics from a global perspective, so problems can be diagnosed and handled promptly. Hadoop ships with built-in counters (MapReduce framework counters, file system counters, and job counters).

They can also be viewed at http://master:50030/jobdetails.jsp
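
Counter values can also be read programmatically from the driver once a job has finished, instead of only through the web UI. The sketch below assumes a completed org.apache.hadoop.mapreduce.Job instance (like the one built in Part 2 later in this post); the class name CounterPrinter is made up for illustration:

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

public class CounterPrinter {
    // Call from the driver after job.waitForCompletion(true) has returned.
    public static void printFrameworkCounters(Job job) throws Exception {
        Counters counters = job.getCounters();
        // Built-in framework counters are exposed through the TaskCounter enum.
        long mapIn  = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
        long mapOut = counters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
        long redOut = counters.findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS).getValue();
        System.out.println("Map input records: " + mapIn);
        System.out.println("Map output records: " + mapOut);
        System.out.println("Reduce output records: " + redOut);
    }
}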

MapReduce output:

    

    Running the jar and the full console output:

[root@neusoft-master filecontent]# hadoop jar Traffic.jar /data/HTTP_20130313143750.dat /out2
17/02/01 19:58:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/01 19:58:18 INFO client.RMProxy: Connecting to ResourceManager at neusoft-master/192.168.191.130:8080
17/02/01 19:58:18 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/02/01 19:58:19 INFO input.FileInputFormat: Total input paths to process : 1
17/02/01 19:58:19 INFO mapreduce.JobSubmitter: number of splits:1
17/02/01 19:58:19 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1485556908836_0009
17/02/01 19:58:19 INFO impl.YarnClientImpl: Submitted application application_1485556908836_0009
17/02/01 19:58:19 INFO mapreduce.Job: The url to track the job: http://neusoft-master:8088/proxy/application_1485556908836_0009/
17/02/01 19:58:19 INFO mapreduce.Job: Running job: job_1485556908836_0009
17/02/01 19:58:26 INFO mapreduce.Job: Job job_1485556908836_0009 running in uber mode : false
17/02/01 19:58:26 INFO mapreduce.Job: map 0% reduce 0%
17/02/01 19:58:32 INFO mapreduce.Job: map 100% reduce 0%
17/02/01 19:58:38 INFO mapreduce.Job: map 100% reduce 100%
17/02/01 19:58:38 INFO mapreduce.Job: Job job_1485556908836_0009 completed successfully
17/02/01 19:58:38 INFO mapreduce.Job: Counters: 49
File System Counters  1. File system counters. These come in two groups: the FILE counters track interaction with the local Linux file system (local disk), and the HDFS counters track interaction with HDFS (both ultimately deal with data on disk)
FILE: Number of bytes read=1015
FILE: Number of bytes written=220657
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2334
HDFS: Number of bytes written=556
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters   2. Job counters
Launched map tasks=1   number of map tasks launched
Launched reduce tasks=1  number of reduce tasks launched
Data-local map tasks=1   map tasks whose input data was local to the node (data locality)
Total time spent by all maps in occupied slots (ms)=3115   total time all map tasks spent in their occupied slots. In YARN, the program is packaged as a jar and submitted to the ResourceManager, resources are requested from the ResourceManager, and the tasks run on the NodeManagers. The unit in which resources (CPU, I/O, network, disk) are carved up is called a container; each node's resources are finite, so tasks are placed into separate containers, a job requests a number of containers while it runs, and it is then determined which of them execute map tasks and which execute reduce tasks. A slot expresses the same idea of one unit of task capacity: when a task arrives, it occupies one slot.

Total time spent by all reduces in occupied slots (ms)=3095  total time all reduce tasks spent in their occupied slots
Total time spent by all map tasks (ms)=3115  total running time of all map tasks
Total time spent by all reduce tasks (ms)=3095   total running time of all reduce tasks
Total vcore-seconds taken by all map tasks=3115  
Total vcore-seconds taken by all reduce tasks=3095
Total megabyte-seconds taken by all map tasks=3189760   (see the note after this counter listing)
Total megabyte-seconds taken by all reduce tasks=3169280
Map-Reduce Framework   3. Counters from the MapReduce framework itself
Map input records=22   number of input lines (key/value pairs) read by the map
Map output records=22   number of key/value pairs emitted by the map
Map output bytes=965
Map output materialized bytes=1015
Input split bytes=120
Combine input records=0   combiner input (the combine step, step 5); 0 here because no combiner is configured
Combine output records=0
Reduce input groups=21   the reduce received 21 distinct key groups
Reduce shuffle bytes=1015
Reduce input records=22   number of records (key/value pairs) input to the reduce
Reduce output records=21
Spilled Records=44
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=73
CPU time spent (ms)=1800
Physical memory (bytes) snapshot=457379840
Virtual memory (bytes) snapshot=3120148480
Total committed heap usage (bytes)=322437120
Shuffle Errors       4. Shuffle error counters
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters    5. Input counters
Bytes Read=2214
File Output Format Counters   6. Output counters
Bytes Written=556
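
A note on the vcore-seconds and megabyte-seconds lines in the Job Counters group above: despite the "seconds" in the name, the values appear to be the task time in milliseconds multiplied by the vcores and container memory allocated to each task. Assuming the default allocation of 1 vcore and 1024 MB per container, the numbers line up with the task times:

    map tasks:    3115 ms x 1024 MB = 3,189,760   (Total megabyte-seconds taken by all map tasks)
    reduce tasks: 3095 ms x 1024 MB = 3,169,280   (Total megabyte-seconds taken by all reduce tasks)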

Screenshot of the run results:

               

 

Detailed job information is also available at http://neusoft-master:8088/

 

   The page above is the ResourceManager's cluster view and lists all applications: what the user sees as a job, the ResourceManager sees as an application.

Part 2. Custom counters

Core code:

 

// Counter usage - goal: count how many input lines contain "hello"
Counter counterHello = context.getCounter("Sensitive words", "hello");
// treat "hello" as a sensitive word
if (line != null && line.contains("hello")) {
    counterHello.increment(1L);
}
// end of counter code
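
The two-string form above creates the counter group and name on the fly. The Context also provides an enum-based overload, getCounter(Enum<?>), which gives compile-time checking of the counter name. A small sketch of that alternative (the enum WordCounter is made up for illustration):

// Hypothetical enum, declared alongside the Mapper class.
enum WordCounter { HELLO_LINES }

// Inside map(): the enum constant identifies the counter.
if (line != null && line.contains("hello")) {
    context.getCounter(WordCounter.HELLO_LINES).increment(1L);
}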

 

    Example code:

package Mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Counter usage and test.
 */
public class MyCounterTest {
    public static void main(String[] args) throws Exception {
        // The custom mapper and reducer classes must be supplied; the input/output paths
        // and the output <k3, v3> types must be specified.
        // 2. Wire the custom MyMapper and MyReducer together
        Configuration conf = new Configuration();
        String jobName = MyCounterTest.class.getSimpleName();
        // 1. Create the job first; all it needs is the conf and the job name
        Job job = Job.getInstance(conf, jobName);

        // *13. Finally, this line is required if the program is packaged as a jar and run on the cluster
        job.setJarByClass(MyCounterTest.class);

        // 3. Read the input from HDFS; FileInputFormat lives in the mapreduce.lib package
        FileInputFormat.setInputPaths(job, new Path("hdfs://neusoft-master:9000/data/hellodemo"));
        // 4. Specify the class that parses the input into <k1, v1> pairs
        // * This can be omitted, because TextInputFormat.class is the default
        job.setInputFormatClass(TextInputFormat.class);
        // 5. Specify the custom mapper class
        job.setMapperClass(MyMapper.class);
        // 6. Specify the map output key and value types <k2, v2>
        // * The next two lines can be omitted when <k3, v3> and <k2, v2> have the same types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 7. Partition (1 partition by default), sort, group, combine: all left at the defaults

        // Now the reduce side
        // 8. Specify the custom reducer class
        job.setReducerClass(MyReducer.class);
        // 9. Specify the output <k3, v3> types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 10. Specify the class that writes <k3, v3>
        // * This line can be omitted (TextOutputFormat is the default)
        job.setOutputFormatClass(TextOutputFormat.class);
        // 11. Specify the output path
        FileOutputFormat.setOutputPath(job, new Path("hdfs://neusoft-master:9000/out3"));

        // 12. Submit the MapReduce program to the ResourceManager and wait for completion
        job.waitForCompletion(true);
    }

    private static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Text k2 = new Text();
        LongWritable v2 = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Counter usage - count how many input lines contain "hello".
            // Note: this counts lines, so a line with several "hello"s still adds only 1.
            Counter counterHello = context.getCounter("Sensitive words", "hello"); // treat "hello" as a sensitive word
            if (line != null && line.contains("hello")) {
                counterHello.increment(1L);
            }
            // end of counter code
            String[] splited = line.split("\t"); // split() is a String method, so convert the Text value to a String first
            for (String word : splited) {
                // word is one token of the current line; set k2 and v2 and emit them
                k2.set(word);
                v2.set(1L);
                context.write(k2, v2);
            }
        }
    }

    private static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        LongWritable v3 = new LongWritable();

        @Override // k2 is the word, v2s holds that word's per-occurrence counts; iterate over v2s
        protected void reduce(Text k2, Iterable<LongWritable> v2s,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable v2 : v2s) {
                // LongWritable is a Hadoop type and sum is a plain Java long;
                // extract the primitive value with get()
                sum += v2.get();
            }
            v3.set(sum);
            // write out k2 and v3
            context.write(k2, v3);
        }
    }
}

    Run it:

   

   

  In the screenshot above, the Sensitive words counter group shows the count for hello.
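
If the value is also needed in the driver program rather than only in the console summary, it can be fetched from the Job by group and counter name once the job has completed. A sketch, assuming the MyCounterTest driver above:

// In main(), after job.waitForCompletion(true):
long helloLines = job.getCounters()
        .findCounter("Sensitive words", "hello")
        .getValue();
System.out.println("Lines containing hello: " + helloLines);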

 

    Part 3. Summary

Q: What is the purpose of partitioning?
A: It lets multiple reduce tasks compute in parallel, which shortens the running time and improves job efficiency.
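
A minimal sketch of what such a custom partition can look like (the class FirstLetterPartitioner and its routing rule are illustrative, not part of the example above): it sends keys to reducers based on their first character, so several reduce tasks can work on disjoint key ranges in parallel.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstLetterPartitioner extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        if (key.getLength() == 0) {
            return 0;
        }
        // Route by the first character of the key, kept non-negative.
        return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
    }
}

In the driver it would be wired in with job.setPartitionerClass(FirstLetterPartitioner.class) together with job.setNumReduceTasks(n) for some n greater than 1.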

Q: When should you use a custom sort?
A: .....
Q: How do you implement a custom sort?
A: Define a custom k2 type and override its compareTo(...) method, as in the sketch below.
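
A minimal sketch of such a custom k2 type (the class FlowKey and its single field are illustrative): the framework calls compareTo(...) when sorting map output by key, so putting the business ordering there changes the sort.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class FlowKey implements WritableComparable<FlowKey> {
    private long totalFlow;

    public FlowKey() {}
    public FlowKey(long totalFlow) { this.totalFlow = totalFlow; }

    @Override
    public void write(DataOutput out) throws IOException { out.writeLong(totalFlow); }

    @Override
    public void readFields(DataInput in) throws IOException { totalFlow = in.readLong(); }

    @Override
    public int compareTo(FlowKey other) {
        // Sort by totalFlow in descending order.
        return Long.compare(other.totalFlow, this.totalFlow);
    }
}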

Q: When should you use custom grouping?
A: When k2's compareTo method does not match the grouping your business logic needs.
Q: How do you use custom grouping?
A: job.setGroupingComparatorClass(...); a sketch follows.
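
A sketch of such a grouping comparator (the class CaseInsensitiveGrouping and its rule are illustrative): it makes keys that differ only in case fall into the same reduce() call, even though compareTo() still treats them as different for sorting.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CaseInsensitiveGrouping extends WritableComparator {
    protected CaseInsensitiveGrouping() {
        super(Text.class, true); // true: create key instances for comparison
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String left = ((Text) a).toString().toLowerCase();
        String right = ((Text) b).toString().toLowerCase();
        return left.compareTo(right);
    }
}

It would be registered in the driver with job.setGroupingComparatorClass(CaseInsensitiveGrouping.class).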

Q: What is the benefit of using a combiner?
A: It performs the reduce operation on the map side, which shrinks the final map output, reduces the amount of data transferred to the reducers, and eases network pressure.
Q: Why isn't a combiner enabled by default?
A: Because some algorithms are not suited to a combiner. Which ones? Those whose reduce operation is not commutative and associative, computing an average being the classic example.
Q: If the reduce operation already ran on the map side, why does it still have to run on the reduce side?
A: Because the map side only performs a local (partial) reduce; the reduce side performs the global reduce across all map outputs.
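
For a sum-style job like the word count in Part 2, the reduce operation is commutative and associative, so the reducer class itself can double as the combiner with one extra line in the driver (a sketch):

// In the driver, before job.waitForCompletion(true):
// MyReducer now also runs on each map task's local output, so the
// "Combine input records" / "Combine output records" counters become non-zero.
job.setCombinerClass(MyReducer.class);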

 

 

 

 

