MapReduce計數器


1、MapReduce計數器是什么?

  計數器是用來記錄job的執行進度和狀態的。它的作用可以理解為日志。我們可以在程序的某個位置插入計數器,記錄數據或者進度的變化情況。

2、MapReduce計數器能做什么?

  MapReduce 計數器(Counter)為我們提供一個窗口,用於觀察 MapReduce Job 運行期的各種細節數據。對MapReduce性能調優很有幫助,MapReduce性能優化的評估大部分都是基於這些 Counter 的數值表現出來的。

3、MapReduce 都有哪些內置計數器?

  MapReduce 自帶了許多默認Counter,現在我們來分析這些默認 Counter 的含義,方便大家觀察 Job 結果,如輸入的字節數、輸出的字節數、Map端輸入/輸出的字節數和條數、Reduce端的輸入/輸出的字節數和條數等。下面我們只需了解這些內置計數器,知道計數器組名稱(groupName)和計數器名稱(counterName),以后使用計數器會查找groupName和counterName即可。

  1、任務計數器

    在任務執行過程中,任務計數器采集任務的相關信息,每個作業的所有任務的結果會被聚集起來。例如,MAP_INPUT_RECORDS 計數器統計每個map任務輸入記錄的總數,並在一個作業的所有map任務上進行聚集,使得最終數字是整個作業的所有輸入記錄的總數。任務計數器由其關聯任務維護,並定期發送給TaskTracker,再由TaskTracker發送給 JobTracker。因此,計數器能夠被全局地聚集。下面我們分別了解各種任務計數器。

    1、MapReduce 任務計數器

      MapReduce 任務計數器的 groupName為org.apache.hadoop.mapreduce.TaskCounter,它包含的計數器如下表所示

計數器名稱

說明

map 輸入的記錄數(MAP_INPUT_RECORDS)

作業中所有 map 已處理的輸入記錄數。每次 RecorderReader 讀到一條記錄並將其傳給 map 的 map() 函數時,該計數器的值增加。

map 跳過的記錄數(MAP_SKIPPED_RECORDS)

作業中所有 map 跳過的輸入記錄數。

map 輸入的字節數(MAP_INPUT_BYTES)

作業中所有 map 已處理的未經壓縮的輸入數據的字節數。每次 RecorderReader 讀到一條記錄並 將其傳給 map 的 map() 函數時,該計數器的值增加

分片split的原始字節數(SPLIT_RAW_BYTES)

由 map 讀取的輸入-分片對象的字節數。這些對象描述分片元數據(文件的位移和長度),而不是分片的數據自身,因此總規模是小的

map 輸出的記錄數(MAP_OUTPUT_RECORDS)

作業中所有 map 產生的 map 輸出記錄數。每次某一個 map 的Context 調用 write() 方法時,該計數器的值增加

map 輸出的字節數(MAP_OUTPUT_BYTES)

作業中所有 map 產生的 未經壓縮的輸出數據的字節數。每次某一個 map 的 Context 調用 write() 方法時,該計數器的值增加。

map 輸出的物化字節數(MAP_OUTPUT_MATERIALIZED_BYTES)

map 輸出后確實寫到磁盤上的字節數;若 map 輸出壓縮功能被啟用,則會在計數器值上反映出來

combine 輸入的記錄數(COMBINE_INPUT_RECORDS)

作業中所有 Combiner(如果有)已處理的輸入記錄數。Combiner 的迭代器每次讀一個值,該計數器的值增加。

combine 輸出的記錄數(COMBINE_OUTPUT_RECORDS)

作業中所有 Combiner(如果有)已產生的輸出記錄數。每當一個 Combiner 的 Context 調用 write() 方法時,該計數器的值增加。

reduce 輸入的組(REDUCE_INPUT_GROUPS)

作業中所有 reducer 已經處理的不同的碼分組的個數。每當某一個 reducer 的 reduce() 被調用時,該計數器的值增加。

reduce 輸入的記錄數(REDUCE_INPUT_RECORDS)

作業中所有 reducer 已經處理的輸入記錄的個數。每當某個 reducer 的迭代器讀一個值時,該計數器的值增加。如果所有 reducer 已經處理完所有輸入, 則該計數器的值與計數器 “map 輸出的記錄” 的值相同

reduce 輸出的記錄數(REDUCE_OUTPUT_RECORDS)

作業中所有 map 已經產生的 reduce 輸出記錄數。每當某一個 reducer 的 Context 調用 write() 方法時,該計數器的值增加。

reduce 跳過的組數(REDUCE_SKIPPED_GROUPS)

作業中所有 reducer 已經跳過的不同的碼分組的個數。

reduce 跳過的記錄數(REDUCE_SKIPPED_RECORDS)

作業中所有 reducer 已經跳過輸入記錄數。

reduce 經過 shuffle 的字節數(REDUCE_SHUFFLE_BYTES)

shuffle 將 map 的輸出數據復制到 reducer 中的字節數。

溢出的記錄數(SPILLED_RECORDS)

作業中所有 map和reduce 任務溢出到磁盤的記錄數

CPU 毫秒(CPU_MILLISECONDS)

總計的 CPU 時間,以毫秒為單位,由/proc/cpuinfo獲取

物理內存字節數(PHYSICAL_MEMORY_BYTES)

一個任務所用物理內存的字節數,由/proc/cpuinfo獲取

虛擬內存字節數(VIRTUAL_MEMORY_BYTES)

一個任務所用虛擬內存的字節數,由/proc/cpuinfo獲取

有效的堆字節數(COMMITTED_HEAP_BYTES)

在 JVM 中的總有效內存量(以字節為單位),可由Runtime().getRuntime().totaoMemory()獲取。

GC 運行時間毫秒數(GC_TIME_MILLIS)

在任務執行過程中,垃圾收集器(garbage collection)花費的時間(以毫秒為單位), 可由 GarbageCollector MXBean.getCollectionTime()獲取;該計數器並未出現在1.x版本中。

由 shuffle 傳輸的 map 輸出數(SHUFFLED_MAPS)

有 shuffle 傳輸到 reducer 的 map 輸出文件數。

失敗的 shuffle 數(SHUFFLE_MAPS)

在 shuffle 過程中,發生拷貝錯誤的 map 輸出文件數,該計數器並沒有包含在 1.x 版本中。

被合並的 map 輸出數

在 shuffle 過程中,在 reduce 端被合並的 map 輸出文件數,該計數器沒有包含在 1.x 版本中。

    2、文件系統計數器

      文件系統計數器的 groupName為org.apache.hadoop.mapreduce.FileSystemCounter,它包含的計數器如下表所示

計數器名稱

說明

文件系統的讀字節數(BYTES_READ)

由 map 和 reduce 等任務在各個文件系統中讀取的字節數,各個文件系統分別對應一個計數器,可以是 Local、HDFS、S3和KFS等。

文件系統的寫字節數(BYTES_WRITTEN)

由 map 和 reduce 等任務在各個文件系統中寫的字節數。

    3、FileInputFormat 計數器

      FileInputFormat 計數器的 groupName為org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter,它包含的計數器如下表所示,計數器名稱列的括號()內容即為counterName

計數器名稱

說明

讀取的字節數(BYTES_READ)

由 map 任務通過 FileInputFormat 讀取的字節數。

    4、FileOutputFormat 計數器

      FileOutputFormat 計數器的 groupName為org.apache.hadoop.mapreduce.lib.input.FileOutputFormatCounter,它包含的計數器如下表所示

計數器名稱

說明

寫的字節數(BYTES_WRITTEN)

由 map 任務(針對僅含 map 的作業)或者 reduce 任務通過 FileOutputFormat 寫的字節數。

  2、作業計數器

    作業計數器由 JobTracker(或者 YARN)維護,因此無需在網絡間傳輸數據,這一點與包括 “用戶定義的計數器” 在內的其它計數器不同。這些計數器都是作業級別的統計量,其值不會隨着任務運行而改變。 作業計數器計數器的 groupName為org.apache.hadoop.mapreduce.JobCounter,它包含的計數器如下表所示

計數器名稱

說明

啟用的map任務數(TOTAL_LAUNCHED_MAPS)

啟動的map任務數,包括以“推測執行” 方式啟動的任務。

啟用的 reduce 任務數(TOTAL_LAUNCHED_REDUCES)

啟動的reduce任務數,包括以“推測執行” 方式啟動的任務。

失敗的map任務數(NUM_FAILED_MAPS)

失敗的map任務數。

失敗的 reduce 任務數(NUM_FAILED_REDUCES)

失敗的reduce任務數。

數據本地化的 map 任務數(DATA_LOCAL_MAPS)

與輸入數據在同一節點的 map 任務數。

機架本地化的 map 任務數(RACK_LOCAL_MAPS)

與輸入數據在同一機架范圍內、但不在同一節點上的 map 任務數。

其它本地化的 map 任務數(OTHER_LOCAL_MAPS)

與輸入數據不在同一機架范圍內的 map 任務數。由於機架之間的寬帶資源相對較少,Hadoop 會盡量讓 map 任務靠近輸入數據執行,因此該計數器值一般比較小。

map 任務的總運行時間(SLOTS_MILLIS_MAPS)

map 任務的總運行時間,單位毫秒。該計數器包括以推測執行方式啟動的任務。

reduce 任務的總運行時間(SLOTS_MILLIS_REDUCES)

reduce任務的總運行時間,單位毫秒。該值包括以推測執行方式啟動的任務。

在保留槽之后,map任務等待的總時間(FALLOW_SLOTS_MILLIS_MAPS)

在為 map 任務保留槽之后所花費的總等待時間,單位是毫秒。

在保留槽之后,reduce 任務等待的總時間(FALLOW_SLOTS_MILLIS_REDUCES)

在為 reduce 任務保留槽之后,花在等待上的總時間,單位為毫秒。

4、計數器的該如何使用?

  下面我們來介紹如何使用計數器。

  1、定義計數器

    1)枚舉聲明計數器

// 自定義枚舉變量Enum 
Counter counter = context.getCounter(Enum enum)

    2)自定義計數器

// 自己命名groupName和counterName 
Counter counter = context.getCounter(String groupName,String counterName)

  2、為計數器賦值

    1)初始化計數器

counter.setValue(long value);// 設置初始值

    2)計數器自增

counter.increment(long incr);// 增加計數

  3、獲取計數器的值

    1) 獲取枚舉計數器的值

Configuration conf = new Configuration(); 
Job job = new Job(conf, "MyCounter"); 
job.waitForCompletion(true); 
Counters counters=job.getCounters(); 
Counter counter=counters.findCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_LONG);// 查找枚舉計數器,假如Enum的變量為BAD_RECORDS_LONG 
long value=counter.getValue();//獲取計數值

    2) 獲取自定義計數器的值

Configuration conf = new Configuration(); 
Job job = new Job(conf, "MyCounter"); 
job.waitForCompletion(true); 
Counters counters = job.getCounters(); 
Counter counter=counters.findCounter("ErrorCounter","toolong");// 假如groupName為ErrorCounter,counterName為toolong 
long value = counter.getValue();// 獲取計數值

    3) 獲取內置計數器的值

Configuration conf = new Configuration(); 
Job job = new Job(conf, "MyCounter"); 
job.waitForCompletion(true); 
Counters counters=job.getCounters(); 
// 查找作業運行啟動的reduce個數的計數器,groupName和counterName可以從內置計數器表格查詢(前面已經列舉有) 
Counter counter=counters.findCounter("org.apache.hadoop.mapreduce.JobCounter","TOTAL_LAUNCHED_REDUCES");// 假如groupName為org.apache.hadoop.mapreduce.JobCounter,counterName為TOTAL_LAUNCHED_REDUCES 
long value=counter.getValue();// 獲取計數值

    4) 獲取所有計數器的值

Configuration conf = new Configuration(); 
Job job = new Job(conf, "MyCounter"); 
Counters counters = job.getCounters(); 
for (CounterGroup group : counters) { 
  for (Counter counter : group) { 
    System.out.println(counter.getDisplayName() + ": " + counter.getName() + ": "+ counter.getValue()); 
  } 
}

5、自定義計數器

  自定義計數器用的比較廣泛,特別是統計無效數據條數的時候,我們就會用到計數器來記錄錯誤日志的條數。下面我們自定義計數器,統計輸入的無效數據。

  1、數據集

  假如一個文件,規范的格式是3個字段,“\t”作為分隔符,其中有2條異常數據,一條數據是只有2個字段,一條數據是有4個字段。其內容如下所示

      clip_image001

  2、實現

package com.buaa;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/** 
* @ProjectName CustomCounterDemo
* @PackageName com.buaa
* @ClassName MyCounter
* @Description 假如一個文件,規范的格式是3個字段,“\t”作為分隔符,其中有2條異常數據,一條數據是只有2個字段,一條數據是有4個字段
* @Author 劉吉超
* @Date 2016-05-23 20:10:14
*/
public class MyCounter {
    // \t鍵
    private static String TAB_SEPARATOR = "\t";

    public static class MyCounterMap extends
            Mapper<LongWritable, Text, Text, Text> {
        // 定義枚舉對象
        public static enum LOG_PROCESSOR_COUNTER {
            BAD_RECORDS_LONG, BAD_RECORDS_SHORT
        };
        
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String arr_value[] = value.toString().split(TAB_SEPARATOR);
            if (arr_value.length > 3) {
                /* 自定義計數器 */
                context.getCounter("ErrorCounter", "toolong").increment(1);
                /* 枚舉計數器 */
                context.getCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_LONG).increment(1);
            } else if (arr_value.length < 3) {
                // 自定義計數器
                context.getCounter("ErrorCounter", "tooshort").increment(1);
                // 枚舉計數器
                context.getCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_SHORT).increment(1);
            }
        }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        String[] args0 = { 
                "hdfs://hadoop2:9000/buaa/counter/counter.txt",
                "hdfs://hadoop2:9000/buaa/counter/out/" 
            };
        // 讀取配置文件
        Configuration conf = new Configuration();
        
        // 如果輸出目錄存在,則刪除
        Path mypath = new Path(args0[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }

        // 新建一個任務
        Job job = new Job(conf, "MyCounter");
        // 主類
        job.setJarByClass(MyCounter.class);
        // Mapper
        job.setMapperClass(MyCounterMap.class);

        // 輸入目錄
        FileInputFormat.addInputPath(job, new Path(args0[0]));
        // 輸出目錄
        FileOutputFormat.setOutputPath(job, new Path(args0[1]));
        
        // 提交任務,並退出
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

  3、運行結果

  在輸出日志中,查看計數器的值

      clip_image003

  從日志中可以看出,通過枚舉聲明和自定義計數器兩種方式,統計出的不規范數據是一樣的

如果,您認為閱讀這篇博客讓您有些收獲,不妨點擊一下右下角的【推薦】。
如果,您希望更容易地發現我的新博客,不妨點擊一下左下角的【關注我】。
如果,您對我的博客所講述的內容有興趣,請繼續關注我的后續博客,我是【劉超★ljc】。

本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。

自定義計數器:下載


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM