《中間件性能挑戰賽--分布式統計和過濾的鏈路追蹤》java 選手分享


2020年6月份天池舉辦的《中間件性能挑戰賽》可謂是異常激烈,本人抽業余時間報名參與,感受比賽慘烈的同時,也有諸多感慨哈,總結一個多月的賽程,多少有一些心得與大家分享

本文原創地址:https://www.cnblogs.com/xijiu/p/14235551.html 轉載請注明

賽題

賽題地址: https://tianchi.aliyun.com/competition/entrance/231790/information

實現一個分布式統計和過濾的鏈路追蹤

  • 賽題背景

    本題目是另外一種采樣方式(tail-based Sampling),只要請求的鏈路追蹤數據中任何節點出現重要數據特征(錯慢請求),這個請求的所有鏈路數據都采集。目前開源的鏈路追蹤產品都沒有實現tail-based Sampling,主要的挑戰是:任何節點出現符合采樣條件的鏈路數據,那就需要把這個請求的所有鏈路數據采集。即使其他鏈路數據在這條鏈路節點數據之前還是之后產生,即使其他鏈路數據在分布式系統的成百上千台機器上產生。

  • 整體流程

    用戶需要實現兩個程序,一個是數量流(橙色標記)的處理程序,該機器可以獲取數據源的http地址,拉取數據后進行處理,一個是后端程序(藍色標記),和客戶端數據流處理程序通信,將最終數據結果在后端引擎機器上計算。具體描述可直接打開賽題地址 https://tianchi.aliyun.com/competition/entrance/231790/information。此處不再贅述

    image

解題

題目分析

可將整體流程粗略分為三大部分

  • 一、front 讀取 http stream 至 front 節點
    • 網絡io
  • 二、front 節點處理數據
    • cpu處理
  • 三、將 bad traces 同步至 backend,backend 經過匯總計算完成上報
    • 網絡傳輸 + cpu

遵循原則:各部分協調好,可抽象為生成、消費模型,切勿產生數據飢餓;理想效果是stream流完,計算也跟着馬上結束

方案一 (trace粒度)

最先想到的方案是以trace細粒度控制的方案

因題目中明確表明某個 trace 的數據出現的位置前后不超過2萬上,故每2萬行是非常重要的特征

  • 按行讀取字符流
    • BufferedReader.readLine()
  • 分析計算
    • 在某個 trace 首次出現時(p),記錄其結束位置 p + 2w,並將其放入待處理隊列(queue)中
    • 如果當前 trace 為 badTrace,將其放入 BadTraceSet 中
    • 每處理一行,均從 queue 隊列中拿出 firstElement,判斷是否與之相等,如果相等,且為 badTrace,那么進入第3步
  • 向 backend 發送數據 注:后續所有涉及網絡交互的部門均為 netty 異步交互
    • 將當前 trace 對應的所有數據按照 startTime 排序
    • 將排好序的數據與該 trace 最后結束位置 endPosition 一並發送至 backend 節點
  • backend 通知 front2 發送數據
    • backend 接收到從 front1 發送過來的 trace 數據,向 front2 發送通知,要求其發送該 trace 的全部數據
    • 注:此處交互有多種情況,在步驟5時,會具體說明
  • front2 將數據發送至 backend 此處列舉所有可能發生的情況
    • 場景 1:front1 主動上報 traceA 后,發現 front2 已經上報該 traceA 數據,結束
    • 場景 2:front1 主動上報 traceA 后,front2 未上報,front2 發現該trace在已就緒集合中,排序、上報,結束
    • 場景 3:front1 主動上報 traceA 后,front2 未上報,且 front2 的已就緒集合沒有該 trace,在錯誤集合中發現該 trace,結束 注:因該 trace 存在於 badTraceSet 中,故將來某個時刻 front2 一定會主動上報該 trace
    • 場景 4:front1 主動上報 traceA 后,front2 未上報,且 front2 的已就緒集合沒有該 trace,那么等待行數超限后,檢查該 trace 是否存在於 badTraceSet 中,如果已存在,結束
    • 場景 5:front1 主動上報 traceA 后,front2 未上報,且 front2 的已就緒集合沒有該 trace,那么等待行數超限后,檢查該 trace 是否存在於 badTraceSet 中,如果不存在,排序、上報,結束 注:即便是 front2 中不存在該trace的信息,也需要上報
  • 結果計算
    • 在收集到 front1 跟 front2 數據后,對2個有序集合進行 merge sort 后,計算其 MD5

方案分析

此方案的跑分大致在 25s 左右,成績不甚理想,總結原因大致可分為以下幾種

  • 交互場景較為復雜
  • 需要維護一塊緩存區域
    • 如果該緩存區域通過行數來失效過期數據的話,那么需要額外的分支計算來判斷過期數據,拖慢整體響應時間
    • 如果通過緩存大小來自動失效過期數據的話,那么大小設置很難平衡,如果太小,則可能會失效一些正常數據,導致最終結果不正確,如果太大,又會導致程序反應變慢,帶來一系列不可控的問題

基於上述原因,為了充分利用 2萬行的數據特征,引入方案二

方案二 (batch粒度)

說明:為了更優雅處理數據過期及充分利用2萬行特性,故衍生出此版本

因題目中明確表明某個trace的數據出現的位置前后不超過2萬上,故每2萬行數據可作為一個批次存儲,過期數據自動刪除

  • 按行讀取字符流

    • BufferedReader.readLine()
  • 每2萬行數據作為一個batch,並分配唯一的batchId(自增即可),此處涉及大量cpu計算,分為2部分

    • 在每行的 tag 部分尋找error=1http.status_code!=200的數據並將其暫存
    • 將 traceId 相同的 span 數據放入預先開辟好空間的數據結構List<Map<String, List<String>>>中,方便后續 backend 節點拿取數據
    • 注:此處下載數據與處理數據並行執行,交由2個線程處理,一切為了提速
  • 上報 badTraceId

    • 每切割 2萬行,統計所有的 badTraceId,與 batchId 一並上報至 backend
    • 因同一個 span 數據可能分布在2個 front 節點中,所以有可能 front1 存在錯誤數據,front2 卻全部正確,2個 front 又不能直接通信,所以此時需要同步至 backend,由 backend 統計全量的 badTraceIds
    • front 收到 backend 的通知后,進行當前批次錯誤 trace 數據的統計,因當前批次的數據有可能出現在上一個批次跟下一個批次,故一定要等到處理每行數據的線程已經處理完 currentBatchNum+1 個線程后,方能執行操作
  • 通知2個 front 節點發送指定 traceIds 的全量數據

    • backend 主動向2個 front 發送獲取指定 traceIds 的全量數據通知
    • front 將 span 數據排好序后上報至 backend
    • backend 執行二路歸並排序后,計算整體 span 的 md5 值,反復循環,直至數據流讀取完畢 注:因2個 front 節點為2核4g,backend 節點為單核2g,為減少 backend 壓力,將部分排序工作下放至 front 節點
  • 計算結果

    • 歸並排序,計算最終結果

方案總結

當前方案耗時在20s左右,統計發現字符流的讀取耗時15s,其他耗時5s,且監控發現各個緩沖區沒有發現飢餓、過剩的情況,所以當前方案的瓶頸還是卡在字符流的讀取、以及cpu判斷上,所以一套面向字節流處理的方案呼之欲出

  • 跟讀BufferedReader源碼,發現其將字節流轉換字符流做了大量的工作,也是耗時的源頭,故需要將當前方案改造為面向字節的操作

方案三 (面向字節)

字符處理是耗時的根源,只能將方案改造為面向字節的方式,倘若如此,java 的大部分數據結構不能再使用

大層面的設計思想與方案二一致,不過面向字節處理的話,從讀取流、截斷行、判斷是否為bad trace、數據組裝等均需為字節操作

  • 好處:預分配內存,面向字節,程序性能提高
  • 弊端:編碼復雜,需自定義數據協議

  • 讀取字節流

    • 程序在初始化時,預先分配10個字節數據 byte[],每個數組存放10M數據
    • io 與 cpu 分離,將讀取數據任務交個某個獨立線程,核心線程處理cpu
  • 數據處理

    • 用固定內存結構 int[20000] 替換之前動態分配內存的數據結構體 List<Map<String, List<String>>>,只記錄每行開始的 position
    • 同時將 bad trace id 存入預先分配好的數組中
  • 上報 badTraceId

    • 同方案二
  • 通知2個 front 節點發送指定 traceIds 的全量數據

    • backend 主動向2個 front 發送獲取指定 traceIds 的全量數據通知
    • 因在步驟二時,並沒有針對 trace 進行數據聚合,所以在搜集數據時,需要遍歷int[20000],將符合要求的 trace 數據放入自定義規范的byte[] 注:剛開始設計的(快排+歸並排序)的方案效果不明顯,且線上的評測環境的2個 front 節點壓力很大,再考慮到某個 trace 對應的 span 數據只有幾十條,故此處將所有的排序操作都下放給 backend 節點,從而減輕 front 壓力
    • 因 span 為變長數據,故自定義規范byte[]存儲數據的設計如下
      • 預先分配10Mbyte[],來存儲一個批次中的所有 bad trace 對應 span 數據
      • 用2個 byte 存放下一個 span 數據的長度
      • 存儲 span 數據
      • 最后返回byte[]及有效長度
  • 計算結果

    • 排序,計算最終結果

線程並發

  • A-Thread: slot 粒度,讀取 http stream 線程
  • B-Thread: block 粒度,處理 slot 中的 block,將 block 數據按行切割、抓取 bad trace id 等
  • C-Thread: block 粒度,響應 backend 拉取數據的請求
阻塞場景
  • A-Thread 讀取 http stream 中數據后,將其放入下一個緩存區,如果下一個緩沖區還沒有被線程C消費,那么A-Thread 將被阻塞
  • B-Thread 處理數據,如果B-Thread下一個要處理的byte[]數據A線程還未下載完畢,那么B-Thread將被阻塞(io阻塞)
  • C-Thread 為拼接 bad trace 的線程,需要 previous、current、next 3個 batch 都 ready后,才能組織數據,當B-Thread還未處理完next batch 數據時,C-Thread將被阻塞
解決思路
  • A-B 同步:10個 slot 分配10個Semaphore,為 A-Thread 與 B-Thread 同步服務,A-Thread 產生數據后,對應 slot 的信號量+1,B-Thread 消費數據之前,需要semaphore.acquire()
  • B-C 同步:通過volatile及納秒級睡眠Thread.sleep(0, 2)實現高效響應。實際測試,某些場景中,該組合性能超過Semaphore;C-Thread 發現 B-Thread 還未產出 next batch 的數據,那么進入等待狀態
  • A-C 同步:同樣利用volatile及納秒級睡眠Thread.sleep(0, 2)

JVM調參

打印gc輸出日志時發現,程序會發生3-5次 full gc,導致性能欠佳,分析內存使用場景發現,流式輸出的數據模型,在內存中只會存在很短的一段時間便會失效,真正流入老年代的內存是很小的,故應調大新生代占比

java -Dserver.port=$SERVER_PORT -Xms3900m -Xmx3900m -Xmn3500m -jar tailbaseSampling-1.0-SNAPSHOT.jar &

直接分配約 4g 的空間,其中新生代占 3.5g,通過觀測 full gc 消失;此舉可使評測快2-3s

方案總結

此方案最優成績跑到了5.7s,性能有了較大提升,總結快的原因如下:

  • 面向字節
  • 內存預分配;避免臨時開辟內存
  • 使用輕量級鎖
  • 避免程序阻塞或飢餓

奇技淫巧

奇技淫巧,俗稱偷雞,本不打算寫該模塊,不過很多上分的小技巧都源於此,真實的通用化場景中,可能本模塊作用不大,不過比賽就是這樣,無所不用其極。。。

快速讀取字節數組

因java語言設計緣故,凡事讀取比 int 小的數據類型,統一轉為 int 后操作,試想以下代碼

while ((byteNum = input.read(data)) != -1) {
	for (int i = 0; i < byteNum; i++) {
		if (data[i] == 10) {
			count++;
		}
	}
}

大量的字節對比操作,每次對比,均把一個 byte 轉換為 4個 byte,效率可想而知

一個典型的提高字節數組對比效率的例子,采用萬能的Unsafe,一次性獲取8個byte long val = unsafe.getLong(lineByteArr, pos + Unsafe.ARRAY_BYTE_BASE_OFFSET); 然后比較2個 long 值是否相等,提速是成倍增長的,那么怎么用到本次賽題上呢?

span數據是類似這樣格式的

193081e285d91b5a|1593760002553450|1e86d0a94dab70d|28b74c9f5e05b2af|508|PromotionCenter|DoGetCommercialStatus|192.168.102.13|http.status_code=200&component=java-web-servlet&span.kind=server&bizErr=4-failGetOrder&http.method=GET

用"|"切割后,倒數第二位是ip,且格式固定為192.168.***.***,如果采用Unsafe,每次讀取一個 int 時,勢必會落在192.168.中間,有4種可能192.92.12.16.168,故可利用此特性,直接進行 int 判斷

int val = unsafe.getInt(data, beginPos + Unsafe.ARRAY_BYTE_BASE_OFFSET);
if (val == 775043377 || val == 825111097 || val == 909192754 || val == 943075630) {
}

此“技巧”提速1-2秒

大循環遍歷

提供2種遍歷字節數組方式,哪種效率更高

  • 方式1

    byte[] data = new byte[1024 * 1024 * 2];
    int byteNum;
    while ((byteNum = input.read(data)) != -1) {
    	for (int i = 0; i < byteNum; i++) {
    		if (data[i] == 10) {
    			count++;
    		}
    	}
    }	
    
  • 方式2

    byte[] data = new byte[1024 * 1024 * 2];
    int byteNum;
    int beginIndex;
    int endIndex;
    int beginPos;
    while ((byteNum = input.read(data)) != -1) {
    	beginIndex = 0;
    	endIndex = byteNum;
    	beginPos = 0;
    	while (beginIndex < endIndex) {
    		int i;
    		for (i = beginPos; i < endIndex; i++) {
    			if (data[i] == 124) {
    				beginPos = i + 1;
    				times++;
    				break;
    			} else {
    				if (data[i] == 10) {
    					count++;
    					beginIndex = i + 1;
    					beginPos = i + 1;
    					break;
    				}
    			}
    		}
    		if (i >= byteNum) {
    			break;
    		}
    	}
    }	
    

兩種方式達到的效果一樣,都是尋找換行符。方式2不同的是,每次找到換行符都 break 掉當前循環,然后從之前位置繼續循環。其實這個小點卡了我1個星期,就是將字符流轉換為字節流時,性能幾乎沒有得到提高,換成方式2后,性能至少提高一倍。為什么會呈現這樣一種現象,我還沒找到相關資料,有知道的同學,還望不吝賜教哈

結束

這種cpu密集型的賽題,一向是 c/cpp 大展身手的舞台,前排幾乎被其霸占。作為一名多年 crud 的 javaer,經過無數個通宵達旦,最終拿到了集團第6的成績,雖不算優異,但自己也盡力了哈

最終比賽成績貼上哈


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM