一、背景
FLink Job端到端延遲是一個重要的指標,用來衡量FLink任務的整體性能和響應延遲(大部分流式應用,要求低延遲特性)。
通過流處理引擎競品對比,我們發現大部分流計算引擎產品,都在告警監控頁面,集成了全鏈路時延指標展示(直方圖)
一些低延時的處理場景,例如用於登陸、用戶下單規則檢測,實時預測場景,需要一個可度量的Metric指標,來實時觀測、監控集群全鏈路時延情況。
二、源碼分析來源
1、本文的源碼分析基於FLink社區issue FLINK-3660,以及issue對應的pr源碼pull-2386,另外,個人也新增了實現源碼的說明。
2、其pr源碼中只涉及到了部分全鏈路時延實現代碼,因此,我在文章中總結了:
- Source到Sink處理Latency Marker源碼
- LatencyMarksEmitter 提交時延標記類
- LatencyStats(時延直方圖Metric實現)源碼
- 時延測量–整體架構圖
三、騰訊Oceanus監控指標參考
如下圖,紅色框線對應的數據延時,即我們描述的指標
四、Flink LatencyMarker實現思路
1、實現方案變遷
在webinterface中,加入流式job的端到端延遲是一個重要特性。因此,FLink社區最初的想法是在每個記錄的source上附加一個攝取時間( ingestion -time)時間戳。
然而,這為不使用monitor feature(監控功能)的用戶,帶來了額外開銷(每個元素+每個元素上的System.currentTimeMilis()需要8個字節)。
因此,FLink社區最后決定,通過定期發送特殊事件來實現此功能,類似於通過拓撲發送水印watermark。
2、實現原理
這些特殊事件(LatencyMarker)在source上以可配置發送間隔,並由任務Task轉發。Sink最后接收到LatencyMarks后,將比較LatencyMarker的時間戳與當前系統時間,以確定延遲。
LatencyMarker不會增加作業的延遲,但是LatencyMarker與常規記錄類似,可以被delay阻塞(例如反壓情況),因此LatencyMarker的延遲與Record延遲近似。
3、節點間時鍾偏移及准確性
當前方案期望所有任務管理器TaskManager上的時鍾是同步的。否則,測量的延遲也包括TaskManager時鍾之間的偏移。
后續,我們可以嘗試通過使用JobManager作為計時服務中心(central timing service)來緩解這個問題。taskmanager將定期查詢JM的當前時間,以確定其時鍾的偏移量。
這個偏移量仍然包括TM和JM之間的網絡延遲,但是仍然比較好的測量時延。
五、Flink LatencyMarker實現源碼
本章節對應到pr源碼pull-2386的實現,這里簡要說明。
1、實現基礎類及下發標記
Flink源碼中,引入了一個新的StreamElement,稱為LatencyMarker。
與水印類似,LatencyMarker按配置的間隔從源發出。這個時間間隔的默認值是0毫秒,即不觸發 (配置項在ExecutionConfig#latencyTrackingInterval,名稱metrics.latency.interval),例如可以配置成2000毫秒觸發一次LatencyMarker發送。
LatencyMarker不能“多於”常規元素。這確保了測量的延遲接近於常規流元素的端到端延遲。
常規操作符Operator(不包括那些參與迭代的Operator)如果不是sink,就會轉發延遲標記LatencyMarker。
2、多輸出通道—隨機下發標記
具有多個輸出channel的Operator,隨機選擇一個channel通道,將LatencyMarker發送給它。這可以確保每個LatencyMarker標記在系統中只存在一次,並且重新分區步驟不會導致傳輸的LatencyMarker數量激增。
public class RecordWriterOutput{
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {
serializationDelegate.setInstance(latencyMarker);
try {
// 內部實現了隨機選擇通道
recordWriter.randomEmit(serializationDelegate);
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
上述RecordWriterOutput#emitLatencyMarker()會被StreamSource、AbstractStreamOperator調用,分別實現source和中間operator的延遲標記下發
如果操作符Operator是Sink,它將維護每個已知source實例的最后128個LatencyMarker信息。
3、Metric展示
每個已知source的最小/最大/平均值/p50/p95/p99時延,在sink的LatencyStats對象中,進行匯總(如果沒有任何輸出的Operator,就是是sink)。
本pr只涉及全鏈路延遲統計的實現,Flink已有一整套Metric顯示體系,全鏈路時延Metric展示交給FLink框架本身)。
此外,目前還沒有確保系統時鍾同步的機制,因此如果硬件時鍾不正確,則延遲測量將不准確。
六、時延粒度Granularity說明
1、時延粒度–概念說明
任意一個中間Operator或Sink,可以通過配置metrics.latency.granularity項,調整與Source間統計的粒度(Singe、Operator、Subtask):
A、統計的時候,可以選擇source源id、source源subtask index進行組合,調整統計粒度。
B、統計的時候,當前Operator及當前Operator subtask index總是參與粒度名稱的生成,固定的。
2、三種時延跟蹤策略及其源碼定義
Single - 跟蹤延遲,無需區分:源+源子任務
(例如雙流Join的兩個source,這里都默認為一個數據源了)
SINGLE {
String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
// 只有自己的operatorId和operatorSubtaskIndex參與Metric名稱生成
// LatencyMarker帶有的id(源)不參與Metric名稱生成
return String.valueOf(operatorId) + operatorSubtaskIndex;
}
}
Operator - 跟蹤延遲,區分源,但不區分源的子任務;
OPERATOR {
String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
// LatencyMarker帶有的id(源)中id參與計算
return String.valueOf(marker.getOperatorId()) + operatorId + operatorSubtaskIndex;
}
}
Subtask - 跟蹤延遲,區分源+源子任務
SUBTASK {
String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
return String.valueOf(marker.getOperatorId()) + marker.getSubtaskIndex() + operatorId + operatorSubtaskIndex;
}
}
根據上述不同的名稱key,將直方圖對象放入Map中,Map定義:
Map<string, descriptivestatisticshistogram=""> latencyStats = new HashMap<>()
偽代碼(創建直方圖):
latencyHistogram = new DescriptiveStatisticsHistogram(this.historySize);
this.latencyStats.put(uniqueName, latencyHistogram);
偽代碼(更新直方圖):
long now = System.currentTimeMillis();
latencyHistogram.update(now - marker.getMarkedTime())
3、Single、Operator 、Subtask 時延策略在Web Metric中的體現
上述Single、Operator 、Subtask不同測試,生成的Metric名稱和group就會產生變化,Web Metric中名稱相應改變
一個Subtask時延粒度的Metric路徑:
Job_<source_id><source_subtask_index><operator_id>_<operator_subtask_index> .latency
七、總結說明
1、LatencyMarker不參與window、MiniBatch的緩存計時,直接被中間Operator下發。
2、Metric路徑:TaskManagerJobMetricGroup/operator_id/operator_subtask_index/latency(根據時延配置粒度Granularity,路徑會有變化,參考本文第六章節)
3、每個中間Operator、以及Sink都會統計自己與Source節點的鏈路延遲,我們在監控頁面,一般展示Source至Sink鏈路延遲。
4、延遲粒度細分到Task,可以用來排查哪台機器的Task時延偏高,進行對比和運維排查。
5、從實現原理來看,發送時延標記間隔配置大一些(例如20秒一次),一般不會影響系統處理業務數據的性能(所有的StreamSource Task都按間隔發送時延標記,中間節點有多個輸出通道的,隨機選擇一個通道下發,不會復制多份數據出來)。
https://www.it610.com/article/1278165442950610944.htm