kcp源碼走讀


kcp協議與tcp協議類似,是一種ARQ協議。他的優點在於比tcp的延遲更小30%-40%,但相應的會犧牲一部分的帶寬,大該比tcp多浪費10%~20%。tcp的設計目標是增大網絡利用率,而kcp的設計目標是增大網絡傳輸速率。因此kcp與tcp對比,kcp有如下機制可以提高傳輸速度:

1.kcp的RTO每次是增加為1.5倍,相比tcp的2倍,具有更短的超時重傳時間
2.無延遲ACK,通過配置讓ack立即發送,而tcp為了增加網絡利用率會盡量讓ack與用戶數據一起發送
3.快速重傳門限可控制,可以適當縮小fastack門限,提高重傳響應速度
4.earlyRetrans機制,無用戶數據時立刻發送ack
5.擁塞控制可取消,取消擁塞控制后,用戶的發包速率不受網絡擁塞的影響

 

golang版本kcp源碼下載地址:https://github.com/skywind3000/kcp

 

一個發送數據接收數據的基本流程如下

//用戶有一段數據buf需要發送,於是調用send函數

kcp1.Send(buf.Bytes())//send函數將buffer分片成kcp的數據包格式,存在待發送隊列中

kcp1.flush()//將發送隊列中的數據通過下層協議(UDP)進行發送

//kcp2接收到下層協議(UDP)傳進來的數據底層數據buffer

kcp2.Input(buffer[:hr], true, false)//調用Input函數將UDP接收的數據buffer轉換成kcp的數據包格式

hr = int32(kcp2.Recv(buffer[:10]))//kcp2將接收的kcp數據包還原成kcp1發送的buffer數據

 在Send的時候用戶數據長度不作限制,但在Recv的時候不一定能夠一次性接收完Send的所有數據,用戶在Recv后應該做校驗。


 
func (kcp *KCP) Recv(buffer []byte) (n int)

    // merge fragment
    count := 0
    for k := range kcp.rcv_queue {
        seg := &kcp.rcv_queue[k]
        copy(buffer, seg.data)
        buffer = buffer[len(seg.data):]
        n += len(seg.data)
        count++
        kcp.delSegment(*seg)
        if seg.frg == 0 {
            break
        }
    }
    if count > 0 {
        kcp.rcv_queue = kcp.remove_front(kcp.rcv_queue, count)
    }

count = 0
    for k := range kcp.rcv_buf {
        seg := &kcp.rcv_buf[k]
        if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
            kcp.rcv_nxt++
            count++
        } else {
            break
        }
    }
 
         
    if count > 0 {
        kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
        kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
    }
// fast recover
    if len(kcp.rcv_queue) < int(kcp.rcv_wnd) && fast_recover {
        // ready to send back IKCP_CMD_WINS in ikcp_flush
        // tell remote my window size
        kcp.probe |= IKCP_ASK_TELL
    }
 
        

recv函數將接收消息隊列中的數據包還原成原來的消息格式,通過buffer返回給調用者

還會把rcv_buf中的與接收序號相匹配的數據拷貝到rcv_queue中。這里注意到在Input->parse_data函數中有同樣的處理,這里之所以需要重復處理是因為kcp.rcv_queue的大小可能會發生改變,len(kcp.rcv_queue) < int(kcp.rcv_wnd)條件有可能重新成立。

fast_recover標識的意思是快速告知對端我又有窗口大小空出來了,因為在Input函數中有可能窗口會滿了,此時發送給對端的是窗口滿消息,而在recv過后,因為取走了消息,可用接收窗口又變大了,此時需要快速告知對端可以繼續發消息了。

 

 

func (kcp *KCP) Send(buffer []byte) int {
    var count int
    if len(buffer) == 0 { return -1 } // append to previous segment in streaming mode (if possible) if kcp.stream != 0 { n := len(kcp.snd_queue) if n > 0 { seg := &kcp.snd_queue[n-1] if len(seg.data) < int(kcp.mss) { capacity := int(kcp.mss) - len(seg.data) extend := capacity if len(buffer) < capacity { extend = len(buffer) } // grow slice, the underlying cap is guaranteed to // be larger than kcp.mss oldlen := len(seg.data) seg.data = seg.data[:oldlen+extend] copy(seg.data[oldlen:], buffer) buffer = buffer[extend:] } } if len(buffer) == 0 { return 0 } } if len(buffer) <= int(kcp.mss) { count = 1 } else { count = (len(buffer) + int(kcp.mss) - 1) / int(kcp.mss) } if count > 255 { return -2 } if count == 0 { count = 1 } for i := 0; i < count; i++ { var size int if len(buffer) > int(kcp.mss) { size = int(kcp.mss) } else { size = len(buffer) } seg := kcp.newSegment(size) copy(seg.data, buffer[:size]) if kcp.stream == 0 { // message mode seg.frg = uint8(count - i - 1) } else { // stream mode seg.frg = 0 } kcp.snd_queue = append(kcp.snd_queue, seg) buffer = buffer[size:] } return 0 }

send函數主要功能是把用戶需要發送的字符數組轉化成kcp的數據包。如果用戶的數據超過一個MSS,還會對數據進行分片。這里有兩種分片的方式,消息方式和流方式。消息方式把用戶數據分片,為每個分片設置ID,將分片后的數據一個一個地存入發送隊列種,接收方通過id解析回原來的包,消息方式一個分片的數據量可能不能達到MSS(最大分片大小)。流方式則是會檢測每個發送隊列里的分片是否達到最大mss,如果沒有達到就會用新的數據填充分片。流方式的網絡速度優於消息方式,但是流方式接收方接收時是一個分片一個分片地接收,而消息方式kcp接收函數會自己把原本屬於一個數據的分片重組回來。

 

func (kcp *KCP) flush(ackOnly bool)

flush函數大致功能如下:

發送ack

發送探測窗口消息

將發送隊列中的消息存入緩存隊列(緩存隊列實際上就是發送窗口)

檢查緩存隊列中當前需要發送的數據(包括新傳數據與重傳數據)

根據重傳數據更新發送窗口大小

if change > 0 {
        inflight := kcp.snd_nxt - kcp.snd_una
        kcp.ssthresh = inflight / 2
        if kcp.ssthresh < IKCP_THRESH_MIN {
            kcp.ssthresh = IKCP_THRESH_MIN
        }
        kcp.cwnd = kcp.ssthresh + resent
        kcp.incr = kcp.cwnd * kcp.mss
    }

在發生快速重傳的時候,會將慢啟動閾值調整為當前發送窗口的一半,並把擁塞窗口大小調整為kcp.ssthresh + resent,resent是觸發快速重傳的丟包的次數,resent的值代表的意思在被弄丟的包后面收到了resent個數的包的ack。這樣調整后kcp就進入了擁塞控制狀態。

 

    if lost > 0 {
        kcp.ssthresh = cwnd / 2
        if kcp.ssthresh < IKCP_THRESH_MIN {
            kcp.ssthresh = IKCP_THRESH_MIN
        }
        kcp.cwnd = 1
        kcp.incr = kcp.mss
    }

如果發生的超時重傳,那么就重新進入慢啟動狀態。

 

func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int {
    una := kcp.snd_una
    if len(data) < IKCP_OVERHEAD {
        return -1
    }

    var maxack uint32
    var lastackts uint32
    var flag int
    var inSegs uint64

    for {
        var ts, sn, length, una, conv uint32
        var wnd uint16
        var cmd, frg uint8

        if len(data) < int(IKCP_OVERHEAD) {
            break
        }

        data = ikcp_decode32u(data, &conv)
        if conv != kcp.conv {
            return -1
        }

        data = ikcp_decode8u(data, &cmd)
        data = ikcp_decode8u(data, &frg)
        data = ikcp_decode16u(data, &wnd)
        data = ikcp_decode32u(data, &ts)
        data = ikcp_decode32u(data, &sn)
        data = ikcp_decode32u(data, &una)
        data = ikcp_decode32u(data, &length)
        if len(data) < int(length) {
            return -2
        }

        if cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
            cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS {
            return -3
        }

        // only trust window updates from regular packets. i.e: latest update
        if regular {

            kcp.rmt_wnd = uint32(wnd)

        }
        kcp.parse_una(una)
        kcp.shrink_buf()

        if cmd == IKCP_CMD_ACK {
            kcp.parse_ack(sn)
            kcp.shrink_buf()
            if flag == 0 {
                flag = 1
                maxack = sn
            } else if _itimediff(sn, maxack) > 0 {
                maxack = sn
            }
            lastackts = ts
        } else if cmd == IKCP_CMD_PUSH {
            if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) < 0 {
                kcp.ack_push(sn, ts)
                if _itimediff(sn, kcp.rcv_nxt) >= 0 {
                    seg := kcp.newSegment(int(length))
                    seg.conv = conv
                    seg.cmd = cmd
                    seg.frg = frg
                    seg.wnd = wnd
                    seg.ts = ts
                    seg.sn = sn
                    seg.una = una
                    copy(seg.data, data[:length])
                    kcp.parse_data(seg)
                } else {
                    atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
                }
            } else {
                atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
            }
        } else if cmd == IKCP_CMD_WASK {
            // ready to send back IKCP_CMD_WINS in Ikcp_flush
            // tell remote my window size
            kcp.probe |= IKCP_ASK_TELL
        } else if cmd == IKCP_CMD_WINS {
            // do nothing
        } else {
            return -3
        }

        inSegs++
        data = data[length:]
    }
    atomic.AddUint64(&DefaultSnmp.InSegs, inSegs)

    if flag != 0 && regular {
        kcp.parse_fastack(maxack)
        current := currentMs()
        if _itimediff(current, lastackts) >= 0 {
            kcp.update_ack(_itimediff(current, lastackts))
        }
    }

    if _itimediff(kcp.snd_una, una) > 0 {
        if kcp.cwnd < kcp.rmt_wnd {
            mss := kcp.mss
            if kcp.cwnd < kcp.ssthresh {
                kcp.cwnd++
                kcp.incr += mss
            } else {
                if kcp.incr < mss {
                    kcp.incr = mss
                }
                kcp.incr += (mss*mss)/kcp.incr + (mss / 16)
                if (kcp.cwnd+1)*mss <= kcp.incr {
                    kcp.cwnd++
                }
            }
            if kcp.cwnd > kcp.rmt_wnd {
                kcp.cwnd = kcp.rmt_wnd
                kcp.incr = kcp.rmt_wnd * mss
            }
        }
    }

    if ackNoDelay && len(kcp.acklist) > 0 { // ack immediately
        kcp.flush(true)
    } else if kcp.rmt_wnd == 0 && len(kcp.acklist) > 0 { // window zero
        kcp.flush(true)
    }
    return 0
}

input函數接收udp協議傳過來的報文,把udp報文解碼成kcp報文進行緩存。

kcp報文分為ack報文,數據報文,探測窗口報文,響應窗口報文四種。

kcp報文的una字段表示對端希望接收的下一個kcp包序號,也就是說明接收端已經收到了所有小於una序號的kcp包。解析una字段后需要把發送緩沖區里面包序號小於una的包全部丟棄掉。 ack報文則包含了對端收到的kcp包的序號,接到ack包后需要刪除發送緩沖區中與ack包中的發送包序號(sn)相同的kcp包。上述una和ack處理完后,需要更新kcp.snd_una(發送端第一個未被確認的包),如果snd_una增加了那么就說明對端正常收到且回應了發送方發送緩沖區第一個待確認的包,此時需要更新cwnd(擁塞窗口)

收到數據報文時,需要判斷數據報文是否在接收窗口內,如果是則保存ack,如果數據報文的sn正好是待接收的第一個報文rcv_nxt,那么就更新rcv_nxt(加1)。如果配置了ackNodelay模式(無延遲ack)或者遠端窗口為0(代表暫時不能發送用戶數據),那么這里會立刻fulsh()發送ack。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM