pinpoint web報警機制源碼解析


背景:(簡述)

Pinpoint 是一套APM (Application Performance Management)工具,主要用於幫助分析系統的總體結構和組件如何相互調用,也可用於追蹤線上性能問題,方便定位出現問題的點。

Pinpoint主要有如下幾個組成部分:

  • Pinpoint Agent :通過字節碼增強技術,附加到 用戶的java 應用來做采樣,程序啟動時指定javaagent以及agentId,pplicationName。
  • HBase :用於存儲agent采樣的數據。
  • Pinpoint Collector :信息的收集者,部署在tomcat中,由於收集agent采樣的數據並存入hbase。
  • Pinpoint Web :提供WEB_UI界面,部署在tomcat中,提供可視化頁面,並且提供監控報警功能。(需要自行實現)

博文的主要內容與主要目的:

  pinpoint的報警功能是需要自行拓展實現的,網上有很多的實現方法,但是沒有一個對於此報警機制的源碼分析,本文旨在填補此處空白,讓讀者有一些基本的了解,如此調試報警的時候才能夠得心應手。

背景知識:(詳情可以百度一下,后面也會介紹相關內容)

  Spring Task:Spring內置的定時任務。

  Spring Batch: 一個大數據量的並行處理框架。

概述:

   通過Spring Task的定時任務,每分鍾做一次通過SpringBatch的批處理檢查。

該模塊github源碼地址:

  https://github.com/naver/pinpoint/tree/master/web/src/main/java/com/navercorp/pinpoint/web/alarm

源碼解析:(本文重點)

  1.定時任務入口: src/main/resources/batch/applicationContext-batch-schedule.xml

<task:scheduled-tasks scheduler="scheduler">
       <task:scheduled ref="batchJobLauncher" method="alarmJob" cron="0 0/1 * * * *" />
       <!--省略-->
</task:scheduled-tasks>
<task:scheduler id="scheduler" pool-size="5"/>

此段代碼標簽使用的是SpringTask的標簽,大概意思為,定義了一個線程池大小為5的調度器scheduler。

scheduler執行的任務有三個,batchJobLauncher的alarmJob方法,每一分鍾執行一次,alarmJob,顧名思義,報警的任務。

  2.批處理任務入口: src/main/resources/batch/applicationContext-alarmJob.xml

前置說明-SpringBatch批處理框架中與此相關的解釋:

如果是批處理的話,自然有批處理任務(對應下面的Job標簽),每個任務自然有一個或者多個步驟(對應下面的Step標簽)。

每個步驟有三個操作,讀取數據(對應reader),處理數據(對應processor),回寫數據(對應writer)。這三者中的參數是按照順序傳遞的。

大家可能會想,報警機制和讀取數據,處理數據,回寫數據有什么關系嗎?下面我說明一下pinpoint相關的對應的業務關系:

reader:讀取數據 => 通過用戶配置的規則提供Checker,即異常校驗器。

processor:處理數據 => 用Checker進行校驗,標記異常狀態。

writer:回寫數據 => 判斷Checker是否有異常情況,有則報警。

下面的配置的源碼:
<!--定義了一個alramJob的批處理任務-->
  <batch:job id="alarmJob">    
    <batch:step id="alarmPartitionStep">        
      <!--此alarmJob只有一個Step-->
        <batch:partition step="alarmStep" partitioner="alarmPartitioner">
            <!--設置執行的線程池-->
            <batch:handler task-executor="alarmPoolTaskExecutorForPartition" />
        </batch:partition>
    </batch:step>
    <batch:listeners>
       <batch:listener ref="jobFailListener"/>
    </batch:listeners>
</batch:job>

<batch:step id="alarmStep">
    <!--代表step的一種處理策略-->
    <batch:tasklet>
        <!--批處理流程-->
        <!-- 順序執行
        reader:讀取數據 => 提供Checker,即異常校驗器,見下面的bean
        processor:處理數據 => 用Checker進行校驗,見下面的bean
        writer:回寫數據 => 判斷Checker是否有異常情況,有則報警,見下面的bean
        -->
        <batch:chunk reader="reader" processor="processor" writer="writer" commit-interval="1"/>
    </batch:tasklet>
</batch:step>

<bean id="alarmPartitioner" class="com.navercorp.pinpoint.web.alarm.AlarmPartitioner"/>

<bean id="reader" class="com.navercorp.pinpoint.web.alarm.AlarmReader" scope="step"/>
<bean id="processor" class="com.navercorp.pinpoint.web.alarm.AlarmProcessor" scope="step"/>
<bean id="writer" class="com.navercorp.pinpoint.web.alarm.AlarmWriter" scope="step"/>

  3.通過對AlarmReader、AlarmProcessor、AlarmWriter的解析,梳理報警原理

(1) com.navercorp.pinpoint.web.alarm.AlarmReader:實現了ItemReader接口 
// StepExecutionListener監聽器,可以定義Step開始前后的操作
public class AlarmReader implements ItemReader<AlarmChecker>, StepExecutionListener {
    ...
   // 報警所用的Checker在內存里
    private final Queue<AlarmChecker> checkers = new ConcurrentLinkedDeque<>();
    ...

    // Checker出隊,供processor使用,
   @Override
    public AlarmChecker read() {
        return checkers.poll();
    }

    // 批處理之前,將應用的報警規則加入Checker之中
    @Override
    public void beforeStep(StepExecution stepExecution) {
     // 查詢所有的應用
        List<Application> applicationList = applicationIndexDao.selectAllApplicationNames();
     // 根據應用用戶配置的規則,添加Checker到隊列當中
        for (Application application : applicationList) {
            addChecker(application);
        }
    }

    private void addChecker(Application application) {
     // 根據應用名稱獲取所有的規則,應用名稱就是配置agent的時候,指定的applicationName
        List<Rule> rules = alarmService.selectRuleByApplicationId(application.getName());
        long timeSlotEndTime = System.currentTimeMillis();
        Map<DataCollectorCategory, DataCollector> collectorMap = new HashMap<>();
        // 遍歷規則
        for (Rule rule : rules) {
        // CheckerCategory是一個枚舉類,預置了所有的報警規則模版,比如失敗請求次數、慢請求次數等
            CheckerCategory checkerCategory = CheckerCategory.getValue(rule.getCheckerName());
        // 數據收集器是為檢驗規則准備的,例如Rule是失敗請求次數,但是次數從哪里來,就是從這個收集器來的

            DataCollector collector = collectorMap.get(checkerCategory.getDataCollectorCategory());
       // 這里是一個基於Map的緩存
            if (collector == null) {
                collector = dataCollectorFactory.createDataCollector(checkerCategory, application, timeSlotEndTime);
                collectorMap.put(collector.getDataCollectorCategory(), collector);
            }
            // 創建Checker,有興趣的讀者可以看看CheckerCategroy的源碼,設計的還是很不錯的。
        // AlaramChecker是一個抽象類,具體的功能由子類實現
            AlarmChecker checker = checkerCategory.createChecker(collector, rule);
       // 加入隊列
            checkers.add(checker);
        }
        
    }
    ...
}
 (2) com.navercorp.pinpoint.web.alarm.AlarmProcessor:實現了ItemProcessor接口
public class AlarmProcessor implements ItemProcessor<AlarmChecker, AlarmChecker> {
   // 此處的AlarmChecker是上面的read()方法傳遞過來的
@Override public AlarmChecker process(AlarmChecker checker) { // check,顧名思義,檢驗,標記是否有異常情況,check()方法見下 checker.check(); return checker; } }
com.navercorp.pinpoint.web.alarm.checker.AlarmProcessor
protected abstract boolean decideResult(T value);

    public void check() {
        // 收集數據
        dataCollector.collect();
        // 標記是否有異常情況,意為是否滿足報警的閥值,decideResult是一個抽象方法
     // detected字段在后續的Writter中會被檢查
        detected = decideResult(getDetectedValue()); }
(3)com.navercorp.pinpoint.web.alarm.AlarmWriter:實現了ItemWriter接口
public class AlarmWriter implements ItemWriter<AlarmChecker> {

    // 需要用戶自定義配置在Spring中的AlarmMessageSender,如果不配置,則是一個空實現
    @Autowired(required = false)
    private AlarmMessageSender alarmMessageSender = new EmptyMessageSender();

    @Autowired
    private AlarmService alarmService;

   // 實現的接口的方法,主要內容 @Override public void write(List<? extends AlarmChecker> checkers) throws Exception { Map<String, CheckerResult> beforeCheckerResults = alarmService.selectBeforeCheckerResults(checkers.get(0).getRule().getApplicationId()); // 遍歷上面傳遞的Checker for (AlarmChecker checker : checkers) { CheckerResult beforeCheckerResult = beforeCheckerResults.get(checker.getRule().getCheckerName()); if (beforeCheckerResult == null) { beforeCheckerResult = new CheckerResult(checker.getRule().getApplicationId(), checker.getRule().getCheckerName(), false, 0, 1); } // 對上面的Processor標記的detected值進行檢查 if (checker.isDetected()) { sendAlarmMessage(beforeCheckerResult, checker); } // 記錄報警歷史 alarmService.updateBeforeCheckerResult(beforeCheckerResult, checker); } } private void sendAlarmMessage(CheckerResult beforeCheckerResult, AlarmChecker checker) { if (isTurnToSendAlarm(beforeCheckerResult)) { // 是否配置了發送報警短信 if (checker.isSMSSend()) { alarmMessageSender.sendSms(checker, beforeCheckerResult.getSequenceCount() + 1); } // 是否配置了發送報警郵件 if (checker.isEmailSend()) { alarmMessageSender.sendEmail(checker, beforeCheckerResult.getSequenceCount() + 1); } } }   // ... }

總結:

    通過SpringTask定時任務,alarmJob任務每分鍾執行一次,alarmJob分為三個步驟來執行,1.首先AlarmReader根據已有的應用以及配置的報警名稱,通過實現read()方法,傳遞給AlaramProcessor;2.AlaramProcessor再調用check()方法,通過收集器去收集數據,查看是否滿足報警規則,並通過detected字段進行標示,傳遞給AlarmWriter;3.AlarmWriter再根據用戶配置的報警信息,進行報警並記錄報警記錄。

Tips:

即使滿足報警規則,也不一定報警:因為是否報警還是有算法來處理,用於防止可能發生的每分鍾都有報警導致的郵件轟炸,我將在下一章節介紹。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM