背景:(簡述)
Pinpoint 是一套APM (Application Performance Management)工具,主要用於幫助分析系統的總體結構和組件如何相互調用,也可用於追蹤線上性能問題,方便定位出現問題的點。
Pinpoint主要有如下幾個組成部分:
- Pinpoint Agent :通過字節碼增強技術,附加到 用戶的java 應用來做采樣,程序啟動時指定javaagent以及agentId,pplicationName。
- HBase :用於存儲agent采樣的數據。
- Pinpoint Collector :信息的收集者,部署在tomcat中,由於收集agent采樣的數據並存入hbase。
- Pinpoint Web :提供WEB_UI界面,部署在tomcat中,提供可視化頁面,並且提供監控報警功能。(需要自行實現)
博文的主要內容與主要目的:
背景知識:(詳情可以百度一下,后面也會介紹相關內容)
Spring Task:Spring內置的定時任務。
Spring Batch: 一個大數據量的並行處理框架。
概述:
通過Spring Task的定時任務,每分鍾做一次通過SpringBatch的批處理檢查。
該模塊github源碼地址:
https://github.com/naver/pinpoint/tree/master/web/src/main/java/com/navercorp/pinpoint/web/alarm
源碼解析:(本文重點)
1.定時任務入口: src/main/resources/batch/applicationContext-batch-schedule.xml
<task:scheduled-tasks scheduler="scheduler"> <task:scheduled ref="batchJobLauncher" method="alarmJob" cron="0 0/1 * * * *" /> <!--省略--> </task:scheduled-tasks> <task:scheduler id="scheduler" pool-size="5"/>
此段代碼標簽使用的是SpringTask的標簽,大概意思為,定義了一個線程池大小為5的調度器scheduler。
scheduler執行的任務有三個,batchJobLauncher的alarmJob方法,每一分鍾執行一次,alarmJob,顧名思義,報警的任務。
2.批處理任務入口: src/main/resources/batch/applicationContext-alarmJob.xml
前置說明-SpringBatch批處理框架中與此相關的解釋:
如果是批處理的話,自然有批處理任務(對應下面的Job標簽),每個任務自然有一個或者多個步驟(對應下面的Step標簽)。
每個步驟有三個操作,讀取數據(對應reader),處理數據(對應processor),回寫數據(對應writer)。這三者中的參數是按照順序傳遞的。
大家可能會想,報警機制和讀取數據,處理數據,回寫數據有什么關系嗎?下面我說明一下pinpoint相關的對應的業務關系:
reader:讀取數據 => 通過用戶配置的規則提供Checker,即異常校驗器。
processor:處理數據 => 用Checker進行校驗,標記異常狀態。
writer:回寫數據 => 判斷Checker是否有異常情況,有則報警。
下面的配置的源碼:
<!--定義了一個alramJob的批處理任務-->
<batch:job id="alarmJob">
<batch:step id="alarmPartitionStep">
<!--此alarmJob只有一個Step-->
<batch:partition step="alarmStep" partitioner="alarmPartitioner">
<!--設置執行的線程池-->
<batch:handler task-executor="alarmPoolTaskExecutorForPartition" />
</batch:partition>
</batch:step>
<batch:listeners>
<batch:listener ref="jobFailListener"/>
</batch:listeners>
</batch:job>
<batch:step id="alarmStep">
<!--代表step的一種處理策略-->
<batch:tasklet>
<!--批處理流程-->
<!-- 順序執行
reader:讀取數據 => 提供Checker,即異常校驗器,見下面的bean
processor:處理數據 => 用Checker進行校驗,見下面的bean
writer:回寫數據 => 判斷Checker是否有異常情況,有則報警,見下面的bean
-->
<batch:chunk reader="reader" processor="processor" writer="writer" commit-interval="1"/>
</batch:tasklet>
</batch:step>
<bean id="alarmPartitioner" class="com.navercorp.pinpoint.web.alarm.AlarmPartitioner"/>
<bean id="reader" class="com.navercorp.pinpoint.web.alarm.AlarmReader" scope="step"/>
<bean id="processor" class="com.navercorp.pinpoint.web.alarm.AlarmProcessor" scope="step"/>
<bean id="writer" class="com.navercorp.pinpoint.web.alarm.AlarmWriter" scope="step"/>
3.通過對AlarmReader、AlarmProcessor、AlarmWriter的解析,梳理報警原理
(1) com.navercorp.pinpoint.web.alarm.AlarmReader:實現了ItemReader接口
// StepExecutionListener監聽器,可以定義Step開始前后的操作
public class AlarmReader implements ItemReader<AlarmChecker>, StepExecutionListener {
...
// 報警所用的Checker在內存里
private final Queue<AlarmChecker> checkers = new ConcurrentLinkedDeque<>();
...
// Checker出隊,供processor使用,
@Override
public AlarmChecker read() {
return checkers.poll();
}
// 批處理之前,將應用的報警規則加入Checker之中
@Override
public void beforeStep(StepExecution stepExecution) {
// 查詢所有的應用
List<Application> applicationList = applicationIndexDao.selectAllApplicationNames();
// 根據應用用戶配置的規則,添加Checker到隊列當中
for (Application application : applicationList) {
addChecker(application);
}
}
private void addChecker(Application application) {
// 根據應用名稱獲取所有的規則,應用名稱就是配置agent的時候,指定的applicationName
List<Rule> rules = alarmService.selectRuleByApplicationId(application.getName());
long timeSlotEndTime = System.currentTimeMillis();
Map<DataCollectorCategory, DataCollector> collectorMap = new HashMap<>();
// 遍歷規則
for (Rule rule : rules) {
// CheckerCategory是一個枚舉類,預置了所有的報警規則模版,比如失敗請求次數、慢請求次數等
CheckerCategory checkerCategory = CheckerCategory.getValue(rule.getCheckerName());
// 數據收集器是為檢驗規則准備的,例如Rule是失敗請求次數,但是次數從哪里來,就是從這個收集器來的
DataCollector collector = collectorMap.get(checkerCategory.getDataCollectorCategory());
// 這里是一個基於Map的緩存
if (collector == null) {
collector = dataCollectorFactory.createDataCollector(checkerCategory, application, timeSlotEndTime);
collectorMap.put(collector.getDataCollectorCategory(), collector);
}
// 創建Checker,有興趣的讀者可以看看CheckerCategroy的源碼,設計的還是很不錯的。
// AlaramChecker是一個抽象類,具體的功能由子類實現
AlarmChecker checker = checkerCategory.createChecker(collector, rule);
// 加入隊列
checkers.add(checker);
}
}
...
}
(2) com.navercorp.pinpoint.web.alarm.AlarmProcessor:實現了ItemProcessor接口
public class AlarmProcessor implements ItemProcessor<AlarmChecker, AlarmChecker> { // 此處的AlarmChecker是上面的read()方法傳遞過來的
@Override public AlarmChecker process(AlarmChecker checker) { // check,顧名思義,檢驗,標記是否有異常情況,check()方法見下 checker.check(); return checker; } }
com.navercorp.pinpoint.web.alarm.checker.AlarmProcessor
protected abstract boolean decideResult(T value); public void check() { // 收集數據 dataCollector.collect(); // 標記是否有異常情況,意為是否滿足報警的閥值,decideResult是一個抽象方法 // detected字段在后續的Writter中會被檢查 detected = decideResult(getDetectedValue()); }
(3)com.navercorp.pinpoint.web.alarm.AlarmWriter:實現了ItemWriter接口
public class AlarmWriter implements ItemWriter<AlarmChecker> { // 需要用戶自定義配置在Spring中的AlarmMessageSender,如果不配置,則是一個空實現 @Autowired(required = false) private AlarmMessageSender alarmMessageSender = new EmptyMessageSender(); @Autowired private AlarmService alarmService;
// 實現的接口的方法,主要內容 @Override public void write(List<? extends AlarmChecker> checkers) throws Exception { Map<String, CheckerResult> beforeCheckerResults = alarmService.selectBeforeCheckerResults(checkers.get(0).getRule().getApplicationId()); // 遍歷上面傳遞的Checker for (AlarmChecker checker : checkers) { CheckerResult beforeCheckerResult = beforeCheckerResults.get(checker.getRule().getCheckerName()); if (beforeCheckerResult == null) { beforeCheckerResult = new CheckerResult(checker.getRule().getApplicationId(), checker.getRule().getCheckerName(), false, 0, 1); } // 對上面的Processor標記的detected值進行檢查 if (checker.isDetected()) { sendAlarmMessage(beforeCheckerResult, checker); } // 記錄報警歷史 alarmService.updateBeforeCheckerResult(beforeCheckerResult, checker); } } private void sendAlarmMessage(CheckerResult beforeCheckerResult, AlarmChecker checker) { if (isTurnToSendAlarm(beforeCheckerResult)) { // 是否配置了發送報警短信 if (checker.isSMSSend()) { alarmMessageSender.sendSms(checker, beforeCheckerResult.getSequenceCount() + 1); } // 是否配置了發送報警郵件 if (checker.isEmailSend()) { alarmMessageSender.sendEmail(checker, beforeCheckerResult.getSequenceCount() + 1); } } } // ... }
總結:
通過SpringTask定時任務,alarmJob任務每分鍾執行一次,alarmJob分為三個步驟來執行,1.首先AlarmReader根據已有的應用以及配置的報警名稱,通過實現read()方法,傳遞給AlaramProcessor;2.AlaramProcessor再調用check()方法,通過收集器去收集數據,查看是否滿足報警規則,並通過detected字段進行標示,傳遞給AlarmWriter;3.AlarmWriter再根據用戶配置的報警信息,進行報警並記錄報警記錄。