netty 的流量整形深度探險


重點摘要

netty通過AbstractTrafficShapingHandler 即TSH 來實現限流的基本框架,它的流量整形的具體作用是? 分析已經有同行做了,而且分析的非常好,這里做些摘要( https://www.jianshu.com/p/bea1b4ea8402):
 
AbstractTrafficShapingHandler 的3大實現:
分別是,GlobalTrafficShapingHandler、ChannelTrafficShapingHandler、GlobalChannelTrafficShapingHandler,其中ChannelTrafficShapingHandler最好理解,它是對單個通道進行限制;而 GlobalTrafficShapingHandler 是全局流量整形,也就是說它限制了全局的帶寬,無論開啟了幾個channel。但 Global 並不是對本地所有的channel 的總流量進行限流,而是僅僅maxGlobalWriteSize GlobalTrafficShapingHandler 對全局的channel 采取一致的 速度控制, 並不是 總共的大小,進行控制。 GlobalChannelTrafficShapingHandler 最難理解, 它其是對GlobalTrafficShapingHandler做了優化,使得等待時間更加平衡,這里暫時不展開。
 
重點難點:
channelRead 是關鍵, 其作用就是在每次讀到數據的時候計算其大小,然后判斷是否超出限制,超出則等待一會(但不能超過最大值),否則就直接讀

ReopenReadTimerTask 是重啟讀操作的定時任務。它在讀的時候,如果需要等待則設置為channel 的屬性,同時提交 該定時任務 到ctx.executor 

write 方法作用類似channelRead ,其作用就是在每次准備寫數據的時候計算其大小,然后判斷是否超出限制,超出則等待一會(但不能超過最大值),否則就直接寫
 

TrafficCounter 難點分析

TrafficCounter 是流量計數器,也是很關鍵、很重要的東西,源碼不難看懂,不過,也不是很容易。那個博客基本上已經很清楚了,但是 還是好幾個關鍵點沒有說明白!
 
Counts the number of read and written bytes for rate-limiting traffic. It computes the statistics for both inbound and outbound traffic periodically at the given checkInterval, and calls the AbstractTrafficShapingHandler.doAccounting(TrafficCounter) method back. If the checkInterval is 0, no accounting will be done and statistics will only be computed at each receive or write operation.
Counts the number of read and written bytes for rate-limiting traffic.
It computes the statistics for both inbound and outbound traffic periodically at the given checkInterval, and calls the AbstractTrafficShapingHandler.doAccounting(TrafficCounter) method back. If the checkInterval is 0, no accounting will be done and statistics will only be computed at each receive or write operation.
 
意思是 計算被限制流量的read和written 的字節,它周期性的統計進、出的流量,周期是checkInterval,然后回調AbstractTrafficShapingHandler.doAccounting(TrafficCounter) 方法。 如果checkInterval是0, 那么不進行accounting ,而是只統計每次 receive 或者 write 。單單是看這個,肯定是理解不了的, 也記不住。需要仔細研究一下源碼。
 

TrafficCounter 的變量

TrafficCounter 的方法有很多變量。一個變量可以理解,但是為什么這么多啊! 初看還是比較難懂的。其中checkInterval 一般是很少改動的,在構造函數中確定, 是executor.scheduleAtFixedRate 的參數。
 
其實很有規律,其中很多last開頭的變量,只在 resetAccounting方法被寫,也就是被改變:
 
last 開頭的幾個變量,都在這個方法中被重置
首先, lastTime 也被重置為當前時間, 基本上就是一個窗口過去,就修改它,所以lastTime 可以理解為就是上個窗口的終點,或當前窗口的起點!它跟讀寫無關!
 
然后
lastReadBytes lastWrittenBytes 是上個時間窗口的讀、寫到的數據總和;—— 指用戶發出的動作,但實際可能會被封裝為一個延遲任務!
lastReadThroughput lastWriteThroughput 是使用秒作為單位,所以乘以1000,表示 讀或寫吞吐量 xx 字節/秒
realWriteThroughput 是上個時間窗口實際發生的寫的數據總和
 
lastWritingTime 是上一次寫的時間; lastWritingTime 是lastWritingTime writingTime中取較大者, 也就是較新的一個時間 ,lastReadingTime 類似。從Math.max(lastWritingTime, writingTime)來看, 不好理解,因為是自己和 上次寫writingTime 取max,所以 lastWritingTime肯定是越來越大
一般情況下,應該都是 這行代碼執行完后, lastWritingTime = writingTime;
任意時刻下,因為writingTime 任何情況下,不可能變小,應該應該有: lastWritingTime <= writingTime;
 
lastReadingTime 也是同理。
 
lastTime 是重置為 入參,resetAccounting方法是 lastTime 唯一修改的地方。
除了last 開頭的幾個變量,重置的部分還有 current 開頭的變量,包括currentReadBytes currentWrittenBytes 被置為0, 還有 realWrittenBytes
 
修改的部分 lastReadBytes lastWrittenBytes 平時都是不變的,當前方法會進行賦值,值來源正是currentReadBytes currentWrittenBytes ,即是 當前上個窗口時間內已經完成讀寫的部分,lastWriteThroughput lastWriteThroughput 也是平時都是不變,當前方法會進行計算
 
累加的部分是 realWriteThroughput ,通過 realWrittenBytes 計算而得
 
readingTime在每次 readTimeToWait 方法 准確說是 讀操作 AbstractTrafficShapingHandler.channelRead 的時候更新,writingTime 類似
currentReadBytes 也是
 
writingTime readingTime 分別在每次進行寫或者讀的時候進行更新,但是 需要特別注意的是, 它表示的是 寫或者讀動作完成的時間, 而不是開始的時間!(除非超過了最大等待時間即 maxTime),所以它可能是未來的時間! ( lastWritingTime 也可能是未來的時間!)
 
其他時候只會被讀。
 

TrafficCounter 的重要方法

resetAccounting 方法

resetAccounting 是Reset the accounting on Read and Write. 有一個參數是newLastTime 是說我們應該考慮進行更新的時間。其實是重置,就是把所有last開頭的變量 置為原始狀態,或者進行了累加。io.netty.handler.traffic.TrafficCounter#start 的時候,啟動一個定時任務:TrafficMonitoringTask ,每隔固定時間執行:scheduleAtFixedRate,—— 這個非常重要,它就相當於實現了一個時間窗口。一定要理解。
/**
 * Reset the accounting on Read and Write.
 *
 * @param newLastTime the milliseconds unix timestamp that we should be considered up-to-date for.
 */
synchronized void resetAccounting(long newLastTime) {
    //interval為什么每次要重新計算?看起來它是固定的,其實不一定,因為我們是固定時間間隔,所以可能會因為具體定時任務稍微單位幾個毫秒
    long interval = newLastTime - lastTime.getAndSet(newLastTime);
    if (interval == 0) {
        // nothing to do
        return;
    }
    if (logger.isDebugEnabled() && interval > checkInterval() << 1) {
        logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
    }
    lastReadBytes = currentReadBytes.getAndSet(0);
    lastWrittenBytes = currentWrittenBytes.getAndSet(0);
    lastReadThroughput = lastReadBytes * 1000 / interval;// lastReadThroughput 是使用秒作為單位
    // nb byte / checkInterval in ms * 1000 (1s)
    lastWriteThroughput = lastWrittenBytes * 1000 / interval;
    // nb byte / checkInterval in ms * 1000 (1s)
    realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
    lastWritingTime = Math.max(lastWritingTime, writingTime);
    lastReadingTime = Math.max(lastReadingTime, readingTime);
}

 

writeTimeToWait方法

以寫為例,我們看io.netty.handler.traffic.TrafficCounter#writeTimeToWait(long, long, long, long),我們看到 里面很多個if else, 只要size == 0 || limitTraffic == 0,那么writingTime 必然會被更新一次。 其中的代碼 其實還是比較難理解的。
 
記上一次寫的時間戳(特指 寫操作實際完成的時間!),即writingTime,縮寫為w,分下面幾個情況 分析:
 
1、小於當前窗口的起點:
即w < c2,比如w=w2,那么lastWritingTime<=writingTime<=w2,因為w2-c2<0, 所以postDelay=0, lastWrittenBytes是整個window2窗口寫的數據之和; 進一步,如果w小於上一個窗口的起點,即如果w < c1,比如w=w1, 則lastWrittenBytes必然是0
 
2、大於當前窗口的起點,但小於當前時間:
即c2<w<now ,分兩組情況:
a、 lastWritingTime<=c2 。比如w=w3,那么writingTime是w3,但 lastWritingTime還是w2; 因為w2-c2<0, 所以postDelay=0, 同樣lastWrittenBytes是整個window2窗口寫的數據之和
b、 lastWritingTime>c2 。比如w=lastWritingTime=w3,那么 w2-c2>0, 所以postDelay>0.
 
3、大於當前窗口的起點,且大於當前時間:
即c2<now<w ,比如w=w4,那么lastWritingTime是w4;postDelay=w4-c2,同樣lastWrittenBytes是整個window2窗口寫的數據之和
a、 lastWritingTime<=c2 。比如w=w2, postDelay=0
b、 lastWritingTime>c2 。比如w=w3 或者w=w4, postDelay>0
 

 

 

 
c 0/1/2 表示 定時任務的各個checkpoint 時間點
localWritingTime = writingTime,即上一次寫即writingTime,縮寫為w,
lastTimeCheck 即上次定時任務的時間點,即圖中的c2
interval = now - lastTimeCheck 即當前時間距離上個c 的時間差,也就是當前窗口已經走過的時間!
 
long pastDelay = Math.max(lastWritingTime- lastTimeCheck, 0); 其實不好理解,因為lastWritingTime 是上一個窗口的最后寫時間,和現在的寫操作可能隔着好幾個寫操作呢... pastDelay 的最小值是0, ;如果分拆為兩種情況,那么會稍微好理解一點,
 
lastWritingTime - lastTimeCheck <= 0 是上面分析的 1 , 此時lastWritingTime 位於C2 的左邊,pastDelay 為0,
lastWritingTime - lastTimeCheck >0 是上面分析的 2、3, 此時lastWritingTime 位於C2 的右邊,pastDelay > 0, 表示上次檢查就已經存在的 超出其本身窗口的延遲, 那為什么當前窗口的每一次寫 都要去計量它呢? 是lastTimeCheck 可能會發生變化嗎? lastTimeCheck 發生變化,只可能是configure() 方法, 這顯然不會經常發生。 所以這個可能是 誤寫的bug嗎? 是不是把 lastWritingTime 改成 writingTime 更好?
 
如果pastDelay 是上次檢查就已經存在的 超出其本身窗口的延遲,那么它在整個當前窗口都是固定的!
 
---- 仔細考慮, 其實沒有bug! 因為我們當前計算 寫延遲, 是從lastTimeCheck 開始計算的:
long time = sum * 1000 / limitTraffic - interval + pastDelay;
 
這行代碼,sum 是計算當前窗口的所有寫數據大小,乘以 1000 是因為limitTraffic 的單位是秒,需要轉為毫秒;
limitTraffic 就是限制的速度
interval 是當前窗口已經走過的時間!
pastDelay 是上個窗口(准確說是 之前所有的窗口的)的留下的延遲, 是不能不考慮的
 
總體就是
當前窗口所有寫操作的需要的總的延遲 = 計算當前窗口的已經執行的寫的總流量 / 當前窗口已經走過的時間 + 之前所有寫的留下的延遲
 
換句話說, time 是當前窗口所有寫操作的需要的總的延遲, 它並不是 當次寫的需要的延遲了!! 這一點和 Guava 的Ratelimiter 差別非常大!
 
如果 time > 最小延遲10ms, 那么當前寫操作自然需要排隊到后面, 從而就應該 返回一個等待時間! 可以想象的是, 如果當前+之前的寫操作導致延遲很大,本次寫也不得不排在它的后面!
 
等等, 這里為什么判斷 是否大於最小延遲10ms呢? 我猜測是因為,10ms太短,沒有使用延遲任務的必要性,直接去阻塞的方式寫吧!而且從io.netty.handler.traffic.AbstractTrafficShapingHandler#write 方法看, wait 時間也是需要大於最小延遲10ms! 就是說ctx.write 方法本來是異步式的, 然后寫到管道中去,讓它去管道中稍微排隊一會吧!但是 如果搞成異步任務的話,還需要修改通道的可寫興,即setUserDefinedWritability,然后啟動異步檢測任務 10ms之內把它改回來,也是挺消耗性能的!大概是因為 不能太頻繁,xx也需要一定時間。 實際的真正的寫操作其實很快,關鍵是需要wait時間。
 
writeTimeToWait 方法的復雜之處在於其中好幾個if , interval > 最小延遲10ms 的if 已經說完,else呢? else情況是怎么處理的?仔細看, else情況其實是通過計算當前窗口和上一個窗口的 總流量 / 總時間差 來求得需要的 延遲。我想是因為 如果interval < 10ms 太小,可能計算不准確,干脆拿上個窗口的寫流量來一起計算!
 
 

TrafficCounter的超級難點

===================== 超級難點 =====================
 
if (time > maxTime && now + time - localReadingTime > maxTime) {
    time = maxTime;
}
time > maxTime && now + time - localWritingTime > maxTime 怎么理解?time > maxTime 意思比較直白就是 延遲不能超過了最大延遲, now + time - localWritingTime > maxTime( 記為X),比較難理解。如果不考慮maxTime ,其實X 就是當前寫操作應該時間執行的時間點!
 
now + time - localWritingTime > maxTime ==> time - maxTime > localWritingTime - now
因為time - maxTime > 0, 所以,
如果 localWritingTime - now < 0 , 也就是 上一個寫 在當前時間之前 則自動成立;那么如果本次寫的總延遲 大於 最大延遲 則重置 延遲為 maxTime
如果 localWritingTime - now > 0, 也就是 上一個寫 在當前時間之后,那么需要把 這個時間差計算在內! 也就是如果 延遲 需要大於 最大延遲 + 上一個寫在當前時間之后的時間差,則重置 延遲為 maxTime! 比如上一個寫 在 1點30分,當前時間是 1點25分,maxTime 是15分鍾,那么如果本次寫的總延遲為21 > 15 + 5 = 20, 但是因為最大延遲是 15, 所以需要重置 延遲時間為 15,而不能是20! 這個判斷,實際上是避免了 過長的等待! 如果本次寫的總延遲為17, 那么不會重置, 還是 17分鍾!可見, 還是可能出現 延遲大於15分鍾的!
 
從另外角度分析
X = now + time - localWritingTime
= now + ( sum * 1000 / limitTraffic - interval + pastDelay )- localWritingTime
= now + ( sum * 1000 / limitTraffic - now + lastTimeCheck + pastDelay )- localWritingTime
= sum * 1000 / limitTraffic + pastDelay + lastTimeCheck - localWritingTime
 
可見 X 其實跟now 無關,其實就是 當前窗口的所有寫延遲 + 之前所有的寫延遲 + 上次寫實際執行的時間 。注意到lastTime writingTime lastWritingTime 等時間值,包括now,並不是時間戳,而是 System.nanoTime() ,也就是cpu 指紋時間!它們 只能用來計算相對值。(目的是為了 精度?)
 
lastTimeCheck - localWritingTime 意味着什么?分兩種情況來看比較好:
1、如果 lastTimeCheck - localWritingTime > 0, 也就是上一個寫的理論上應該實際執行的時間 在當前窗口起點之前, 那么pastDelay = 0,相當於上一個寫還在上一個窗口,本窗口沒有發生過寫,那么它 相當於是pastDelay 的補償,也就是之前的空閑!
2、如果 lastTimeCheck - localWritingTime <= 0, 也就是上一個寫的理論上應該實際執行的時間 在當前窗口起點之后, 那么它就是 上一個寫的總的寫延遲。
 
還是不直觀,這樣考慮:
第一次寫的時候肯定是 localWritingTime = init time, 所以 lastTimeCheck - localWritingTime 就是 啟動到當前窗口起點的時間差,則 X 就是以當次寫的實際執行的時間差; 第二次寫的時候 lastTimeCheck - localWritingTime是 第一次寫 從當前窗口起點 超出的部分的負數( 可能是正數、也可能負數)也就是上一次的 延遲, 所以X是補償了上一次寫的延遲的 時間差,也就是本次的寫的單獨計算的 時間延遲!
 
注意 writeTimeToWait 返回的是從當前時間 開始的延遲!localWritingTime 是上一個寫的理論上應該實際執行的時間,時間變量很多, 需要搞清楚哪個是 時間片段或稱時間差,哪個是時間點 或稱CPU的指紋時間點。
 
但是注意到因為 pastDelay 只會計算之前的延遲,不會計算之前的 空閑,所以X 意味着考慮之前空閑之后的真正的 延遲!
 
綜上 X = 當前窗口的所有寫延遲 + 之前所有的寫延遲 + 之前所有的寫延遲的誤差,X 到底是什么? 其實就是本地寫 單獨核算需要的延遲!
 
所以, time > maxTime && X > maxTime 就是意味着,當前總延遲 大於 最大時間,且本次寫單獨核算的延遲 大於 最大時間!
 
就是說 如果 總延遲 大於 最大時間, 但是 本次寫單獨核算的延遲 沒有超出,那么 可能需要等待更久,如果本次超出,那么就延遲重置為 最大時間!
 
writeTimeToWait 方法很復雜,readTimeToWait 稍微簡單一點,下文分析!
 
真是煞費苦心!!!
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM