FLink全鏈路時延—測量方式


一、背景

FLink Job端到端延遲是一個重要的指標,用來衡量FLink任務的整體性能和響應延遲(大部分流式應用,要求低延遲特性)。

通過流處理引擎競品對比,我們發現大部分流計算引擎產品,都在告警監控頁面,集成了全鏈路時延指標展示(直方圖)

一些低延時的處理場景,例如用於登陸、用戶下單規則檢測,實時預測場景,需要一個可度量的Metric指標,來實時觀測、監控集群全鏈路時延情況。

二、源碼分析來源

1、本文的源碼分析基於FLink社區issue FLINK-3660,以及issue對應的pr源碼pull-2386,另外,個人也新增了實現源碼的說明。

2、其pr源碼中只涉及到了部分全鏈路時延實現代碼,因此,我在文章中總結了:

  • Source到Sink處理Latency Marker源碼
  • LatencyMarksEmitter 提交時延標記類
  • LatencyStats(時延直方圖Metric實現)源碼
  • 時延測量–整體架構圖

三、騰訊Oceanus監控指標參考

如下圖,紅色框線對應的數據延時,即我們描述的指標

FLink全鏈路時延---測量方式_第1張圖片

FLink全鏈路時延---測量方式_第2張圖片

四、Flink LatencyMarker實現思路

1、實現方案變遷

在webinterface中,加入流式job的端到端延遲是一個重要特性。因此,FLink社區最初的想法是在每個記錄的source上附加一個攝取時間( ingestion -time)時間戳。

然而,這為不使用monitor feature(監控功能)的用戶,帶來了額外開銷(每個元素+每個元素上的System.currentTimeMilis()需要8個字節)。

因此,FLink社區最后決定,通過定期發送特殊事件來實現此功能,類似於通過拓撲發送水印watermark。

2、實現原理

這些特殊事件(LatencyMarker)在source上以可配置發送間隔,並由任務Task轉發。Sink最后接收到LatencyMarks后,將比較LatencyMarker的時間戳與當前系統時間,以確定延遲。

LatencyMarker不會增加作業的延遲,但是LatencyMarker與常規記錄類似,可以被delay阻塞(例如反壓情況),因此LatencyMarker的延遲與Record延遲近似。

3、節點間時鍾偏移及准確性

當前方案期望所有任務管理器TaskManager上的時鍾是同步的。否則,測量的延遲也包括TaskManager時鍾之間的偏移。

后續,我們可以嘗試通過使用JobManager作為計時服務中心(central timing service)來緩解這個問題。taskmanager將定期查詢JM的當前時間,以確定其時鍾的偏移量。

這個偏移量仍然包括TM和JM之間的網絡延遲,但是仍然比較好的測量時延。

五、Flink LatencyMarker實現源碼

本章節對應到pr源碼pull-2386的實現,這里簡要說明。

FLink全鏈路時延---測量方式_第3張圖片

1、實現基礎類及下發標記

Flink源碼中,引入了一個新的StreamElement,稱為LatencyMarker。

與水印類似,LatencyMarker按配置的間隔從源發出。這個時間間隔的默認值是0毫秒,即不觸發 (配置項在ExecutionConfig#latencyTrackingInterval,名稱metrics.latency.interval),例如可以配置成2000毫秒觸發一次LatencyMarker發送。

LatencyMarker不能“多於”常規元素。這確保了測量的延遲接近於常規流元素的端到端延遲。

常規操作符Operator(不包括那些參與迭代的Operator)如果不是sink,就會轉發延遲標記LatencyMarker。

2、多輸出通道—隨機下發標記

具有多個輸出channel的Operator,隨機選擇一個channel通道,將LatencyMarker發送給它。這可以確保每個LatencyMarker標記在系統中只存在一次,並且重新分區步驟不會導致傳輸的LatencyMarker數量激增。

public class RecordWriterOutput{
	@Override
	public void emitLatencyMarker(LatencyMarker latencyMarker) {
		serializationDelegate.setInstance(latencyMarker);

		try {
			// 內部實現了隨機選擇通道
			recordWriter.randomEmit(serializationDelegate);
		}
		catch (Exception e) {
			throw new RuntimeException(e.getMessage(), e);
		}
	}
}

上述RecordWriterOutput#emitLatencyMarker()會被StreamSource、AbstractStreamOperator調用,分別實現source和中間operator的延遲標記下發

如果操作符Operator是Sink,它將維護每個已知source實例的最后128個LatencyMarker信息。

3、Metric展示

每個已知source的最小/最大/平均值/p50/p95/p99時延,在sink的LatencyStats對象中,進行匯總(如果沒有任何輸出的Operator,就是是sink)。

本pr只涉及全鏈路延遲統計的實現,Flink已有一整套Metric顯示體系,全鏈路時延Metric展示交給FLink框架本身)。

此外,目前還沒有確保系統時鍾同步的機制,因此如果硬件時鍾不正確,則延遲測量將不准確。

六、時延粒度Granularity說明

1、時延粒度–概念說明

任意一個中間Operator或Sink,可以通過配置metrics.latency.granularity項,調整與Source間統計的粒度(Singe、Operator、Subtask):

A、統計的時候,可以選擇source源id、source源subtask index進行組合,調整統計粒度。

B、統計的時候,當前Operator及當前Operator subtask index總是參與粒度名稱的生成,固定的。

2、三種時延跟蹤策略及其源碼定義

Single - 跟蹤延遲,無需區分:源+源子任務

(例如雙流Join的兩個source,這里都默認為一個數據源了)

		SINGLE {
			String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
				// 只有自己的operatorId和operatorSubtaskIndex參與Metric名稱生成
				// LatencyMarker帶有的id(源)不參與Metric名稱生成
				return String.valueOf(operatorId) + operatorSubtaskIndex;
			}
		}

Operator - 跟蹤延遲,區分源,但不區分源的子任務;

		OPERATOR {
			String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
			// LatencyMarker帶有的id(源)中id參與計算
				return String.valueOf(marker.getOperatorId()) + operatorId + operatorSubtaskIndex;
			}
		}

Subtask - 跟蹤延遲,區分源+源子任務

		SUBTASK {
			String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
				return String.valueOf(marker.getOperatorId()) + marker.getSubtaskIndex() + operatorId + operatorSubtaskIndex;
			}
		}

根據上述不同的名稱key,將直方圖對象放入Map中,Map定義:

Map<string, descriptivestatisticshistogram=""> latencyStats = new HashMap<>()
偽代碼(創建直方圖):
latencyHistogram = new DescriptiveStatisticsHistogram(this.historySize);
this.latencyStats.put(uniqueName, latencyHistogram);

偽代碼(更新直方圖):
long now = System.currentTimeMillis();
latencyHistogram.update(now - marker.getMarkedTime())

3、Single、Operator 、Subtask 時延策略在Web Metric中的體現

上述Single、Operator 、Subtask不同測試,生成的Metric名稱和group就會產生變化,Web Metric中名稱相應改變

一個Subtask時延粒度的Metric路徑:

Job_<source_id><source_subtask_index><operator_id>_<operator_subtask_index> .latency

七、總結說明

1、LatencyMarker不參與window、MiniBatch的緩存計時,直接被中間Operator下發。

2、Metric路徑:TaskManagerJobMetricGroup/operator_id/operator_subtask_index/latency(根據時延配置粒度Granularity,路徑會有變化,參考本文第六章節)

3、每個中間Operator、以及Sink都會統計自己與Source節點的鏈路延遲,我們在監控頁面,一般展示Source至Sink鏈路延遲。

4、延遲粒度細分到Task,可以用來排查哪台機器的Task時延偏高,進行對比和運維排查。

5、從實現原理來看,發送時延標記間隔配置大一些(例如20秒一次),一般不會影響系統處理業務數據的性能(所有的StreamSource Task都按間隔發送時延標記,中間節點有多個輸出通道的,隨機選擇一個通道下發,不會復制多份數據出來)。

https://www.it610.com/article/1278165442950610944.htm


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM