什么是延時監控?
延時監控,簡單理解監控算子到算子的延遲時間。記錄算子間或者源流入到算子時間,監控系統健康以及調節。
流式計算中處理延遲是一個非常重要的監控metric
flink中通過開啟配置 metrics.latency.interval 來開啟latency后就可以在metric中看到askManagerJobMetricGroup/operator_id/operator_subtask_index/latency指標了
如果每一條數據都打上時間監控 輸出時間- 輸入時間,會大量的消耗性能
來看一下flink自帶的延遲監控是怎么做的
其實也可以想到原理很簡單,就是在source周期性的插入一條特殊的數據LatencyMarker
LatencyMarker初始化的時候會帶上它產生時的時間
每次當task接收到的數據是LatencyMarker的時候他就用 當前時間 - LatencyMarker時間 = lateTime 並發送到指標收集系統
接着繼續把這個LatencyMarker往下游emit
來看一下源碼是如何實現的
因為是從source加入LatencyMarker先看StreamSource.java
在StreamSource的run 方法中
初始化了一個LatencyMarksEmitter
其實就是在processTimeServera中周期性(我們設置的metrics.latency.interval 時長)去向下游emit 當前時間的LatencyMarker
接着來到task接收數據的地方
StreamInputProcessor的processInput方法中
可以看到就是用當前時間 - LatencyMarker,然后就往report發送了,然后emit
而sink算子的唯一區別就是
區別就是sink沒有emit LatencyMarker 因為是最后一個算子了嘛
這里就講完了
注意的點是:
其實可以看到flink中的LatencyMarker是沒有走用戶代碼邏輯的,也就是說統計出來的延遲時間並不是端到端的,而是除了用戶邏輯處理外的延遲,
因為LatencyMarker和數據的處理是同步處理的,雖然監控延遲中沒有過用戶邏輯代碼(正常數據接收以后用戶代碼處理然后emit,LatencyMarker接收后直接emit)
但是就像馬路一樣,整個馬路擁塞了延遲高了,那還是會使這個指標值越來越大,結論就是這個延遲大致等於端到端延遲
可能這樣的設計是考慮到LatencyMarker如果也走用戶處理邏輯的話會消耗過多的性能吧,特別是采集頻繁的時候
https://blog.csdn.net/hyy1568786/article/details/105904930