linux源碼解讀(二十二):網絡通信簡介——網絡擁塞控制之cubic算法


  網絡擁塞的概念大家一定不陌生,肯定都有親生體會:比如節假日的高速路堵車。本來車流量已經很大了,如果再不限制高速口的車進入,整條路只會越來越堵,所以交管部門可能會臨時限流,只允許車輛下高速,不允許上高速!互聯網剛發明的那會還沒有擁塞的概念,各個節點死命地傳輸數據,導致網絡中各種路由設備的buff瞬間被填滿,新來的包只能丟棄(像不像針對網絡中專設備的DOS攻擊了?)!為了維持網絡的正常運轉,需要接入網絡的所有節點有節制地發送數據,避免網絡擁塞!但如果節點過於節制,發送的數據報文過少,又會降低整個網絡的吞吐量、利用率(這些網絡設備閑着也是閑着)等,怎么才能盡可能最大化吞吐量,又能避免網絡擁塞了?擁塞控制算法誕生了!擁塞控制的算法很多,我這個版本采用的是cubic算法(算法細節可以參考鏈接2和3),本文以這個為准分析算法實現的細節!

  1、擁塞控制的控制引擎介紹:

  (1)由於擁塞控制涉及到很多復雜的計算公式,每個公式都有很多計算因子factor,所以要先給這些factor初始化賦值,這些賦值在cubictcp_register函數中統一完成,如下:

/*初始化各種計算的factor*/
static int __init cubictcp_register(void)
{
    BUILD_BUG_ON(sizeof(struct bictcp) > ICSK_CA_PRIV_SIZE);

    /* Precompute a bunch of the scaling factors that are used per-packet
     * based on SRTT of 100ms;計算SRTT=100ms時的縮放因子
     */

    beta_scale = 8*(BICTCP_BETA_SCALE+beta) / 3
        / (BICTCP_BETA_SCALE - beta);

    cube_rtt_scale = (bic_scale * 10);    /* 1024*c/rtt */

    /* calculate the "K" for (wmax-cwnd) = c/rtt * K^3
     *  so K = cubic_root( (wmax-cwnd)*rtt/c )
     * the unit of K is bictcp_HZ=2^10, not HZ
     *
     *  c = bic_scale >> 10
     *  rtt = 100ms
     *
     * the following code has been designed and tested for
     * cwnd < 1 million packets
     * RTT < 100 seconds
     * HZ < 1,000,00  (corresponding to 10 nano-second)
     */

    /* 1/c * 2^2*bictcp_HZ * srtt */
    cube_factor = 1ull << (10+3*BICTCP_HZ); /* 2^40 */

    /* divide by bic_scale and by constant Srtt (100ms) */
    do_div(cube_factor, bic_scale * 10);

    return tcp_register_congestion_control(&cubictcp);
}

  (2)由於cubic只是擁塞控制的算法之一(還有其他好幾種算法),並且未來可能還會誕生新的擁塞控制算法,為了便於管理現有算法,同時也能給新增的算法預留空間,linux采用了鏈表來保存/注冊所有的擁塞控制算法,實現的方法就是初始化函數最后一行的tcp_register_congestion_control,如下:

/*
 * Attach new congestion control algorithm to the list
 * of available options.
 */
int tcp_register_congestion_control(struct tcp_congestion_ops *ca)
{
    int ret = 0;

    /* all algorithms must implement ssthresh and cong_avoid ops */
    if (!ca->ssthresh || !(ca->cong_avoid || ca->cong_control)) {
        pr_err("%s does not implement required ops\n", ca->name);
        return -EINVAL;
    }
    //根據名稱、長度等計算出一個hash值,加快比對速度(早期版本是對比字符串,效率低;key是32bit的整數,比對效率比字符串快多了)
    ca->key = jhash(ca->name, sizeof(ca->name), strlen(ca->name));

    spin_lock(&tcp_cong_list_lock);
    if (ca->key == TCP_CA_UNSPEC || tcp_ca_find_key(ca->key)) {//檢查一下是不是已經在list里面了
        pr_notice("%s already registered or non-unique key\n",
              ca->name);
        ret = -EEXIST;
    } else {//新的擁塞控制算法加入鏈表
        list_add_tail_rcu(&ca->list, &tcp_cong_list);
        pr_debug("%s registered\n", ca->name);
    }
    spin_unlock(&tcp_cong_list_lock);

    return ret;
}

  (3)從注釋的地方可以看到加入鏈表的是tcp_congestion_ops結構體,這個結構體也是所有擁塞控制算法需要實現的結構體;每次有擁塞算法被發明后,就實現這個結構體中的部分甚至全部方法,然后調用上面的注冊方法把新算法加入鏈表,后續可以通過遍歷鏈表的方式選擇合適的擁塞控制算法!結構體如下:

struct tcp_congestion_ops {
    struct list_head    list;
    u32 key;/* 算法名稱的哈希值 */
    u32 flags;

    /* initialize private data (optional) */
    void (*init)(struct sock *sk);
    /* cleanup private data  (optional) */
    void (*release)(struct sock *sk);

    /* return slow start threshold (required) */
    u32 (*ssthresh)(struct sock *sk);
    /* do new cwnd calculation (required) */
    void (*cong_avoid)(struct sock *sk, u32 ack, u32 acked);
    /* call before changing ca_state (optional) */
    void (*set_state)(struct sock *sk, u8 new_state);
    /* call when cwnd event occurs (optional) */
    void (*cwnd_event)(struct sock *sk, enum tcp_ca_event ev);
    /* call when ack arrives (optional) */
    void (*in_ack_event)(struct sock *sk, u32 flags);
    /* new value of cwnd after loss (optional) */
    u32  (*undo_cwnd)(struct sock *sk);
    /* hook for packet ack accounting (optional) */
    void (*pkts_acked)(struct sock *sk, const struct ack_sample *sample);
    /* suggest number of segments for each skb to transmit (optional) */
    u32 (*tso_segs_goal)(struct sock *sk);
    /* returns the multiplier used in tcp_sndbuf_expand (optional) */
    u32 (*sndbuf_expand)(struct sock *sk);
    /* call when packets are delivered to update cwnd and pacing rate,
     * after all the ca_state processing. (optional)
     */
    void (*cong_control)(struct sock *sk, const struct rate_sample *rs);
    /* get info for inet_diag (optional) */
    size_t (*get_info)(struct sock *sk, u32 ext, int *attr,
               union tcp_cc_info *info);

    char         name[TCP_CA_NAME_MAX];/* 擁塞控制算法的名稱 */
    struct module     *owner;
};

  有成員字段,也有函數指針;函數名是固定的,也就是說函數的功能都是固定的,每種擁塞控制算法各自實現這些函數功能就行了,這個思路和驅動類似:linux定義了驅動的接口函數名稱,每個驅動廠家自己實現接口函數的功能!事實上擁塞算法也是和驅動一樣,都是以模塊的形式加載到內核的!

   (4)上面的結構體定義了每種擁塞控制算法的接口名稱和功能,那么具體到cubic算法,這些功能都是在哪些函數實現的了?在net\ipv4\tcp_cubic.c文件里面做的映射:

static struct tcp_congestion_ops cubictcp __read_mostly = {
    .init        = bictcp_init,
    .ssthresh    = bictcp_recalc_ssthresh,
    .cong_avoid    = bictcp_cong_avoid,
    .set_state    = bictcp_state,
    .undo_cwnd    = bictcp_undo_cwnd,
    .cwnd_event    = bictcp_cwnd_event,
    .pkts_acked     = bictcp_acked,
    .owner        = THIS_MODULE,
    .name        = "cubic",
};

  這個結構體的名稱叫cubictcp,剛好就在文章開頭介紹的cubictcp_register函數中注冊的!

       2、上述介紹圍繞着初始化、注冊等重要功能,這些功能抽象出的框架就是擁塞控制引擎!盡管擁塞算法實現的細節可以不同,但是使用的引擎框架都是一樣的,不得不佩服linux研發人員的模塊抽象和概括能力(好多大廠職級晉升答辯的重要考核之一就是這種模塊總結抽象、舉一反三的能力!);

      (1)cubic算法的實現涉及到很變量,為了統一管理,這了變量都定義在bictcp結構體中了,如下:

/* BIC TCP Parameters */
struct bictcp {
    u32    cnt;        /* increase cwnd by 1 after ACKs; 每次 cwnd 增長 1/cnt 的比例*/
    u32    last_max_cwnd;    /* last maximum snd_cwnd; snd_cwnd 之前的最大值*/
    u32    loss_cwnd;    /* congestion window at last loss;最近一次發生丟失的時候的擁塞窗口 */
    u32    last_cwnd;    /* the last snd_cwnd;最近的 snd_cwnd */
    u32    last_time;    /* time when updated last_cwnd;更新 last_cwnd 的時間 */
    u32    epoch_start;    /* beginning of an epoch;一輪的開始 */
#define ACK_RATIO_SHIFT    4
    /*限制了delayed_ack的最大值為(32 << 4),也就是說Packets/ACKs的最大估計值限制為32;
    為什么要增加這個限制呢?這是因為如果發送窗口很大,並且這個窗口的數據包的ACK大量丟失,那
    么發送端就會得到一個累積確認了非常多數據包的ACK,這會造成delayed_ack值劇烈的增大。如果
    一個ACK累積確認的數據包超過4096個,那么16位的delayed_ack就會溢出,最后的值可能為0。我
    們知道delayed_ack在bictcp_update中是作為除數的,這時會產生除數為0的錯誤*/
    u32    delayed_ack;    /* estimate the ratio of Packets/ACKs << 4 */
};

  (2)這么多變量,使用之前肯定是要初始化的,如下:

//將必要的參數都初始化為 0 
static inline void bictcp_reset(struct bictcp *ca)
{
    ca->cnt = 0;
    ca->last_max_cwnd = 0;
    ca->last_cwnd = 0;
    ca->last_time = 0;
    ca->epoch_start = 0;
    ca->delayed_ack = 2 << ACK_RATIO_SHIFT;
}

static void bictcp_init(struct sock *sk)
{
    struct bictcp *ca = inet_csk_ca(sk);

    bictcp_reset(ca);
    ca->loss_cwnd = 0;//此時尚未發送任何丟失,所以初始化為 0

    if (initial_ssthresh)
        tcp_sk(sk)->snd_ssthresh = initial_ssthresh;
}

   (3)算法開始執行后,需要計算ssthresh,在bictcp_recalc_ssthresh中實現:

/*
 *    behave like Reno until low_window is reached,
 *    then increase congestion window slowly
 */
static u32 bictcp_recalc_ssthresh(struct sock *sk)
{
    const struct tcp_sock *tp = tcp_sk(sk);
    struct bictcp *ca = inet_csk_ca(sk);

    ca->epoch_start = 0;    /* end of epoch */

    /* Wmax and fast convergence */
    if (tp->snd_cwnd < ca->last_max_cwnd && fast_convergence)
        ca->last_max_cwnd = (tp->snd_cwnd * (BICTCP_BETA_SCALE + beta))
            / (2 * BICTCP_BETA_SCALE);
    else
        ca->last_max_cwnd = tp->snd_cwnd;

    ca->loss_cwnd = tp->snd_cwnd;

    if (tp->snd_cwnd <= low_window)
        return max(tp->snd_cwnd >> 1U, 2U);
    else
        return max((tp->snd_cwnd * beta) / BICTCP_BETA_SCALE, 2U);
}

  這里面最核心的就是Fast Convergence機制了!在網絡中,一旦有新的節點加入通信,肯定是要占用網絡帶寬的,勢必需要其他正在通信的節點讓渡出部分帶寬。為了讓舊節點盡快釋放帶寬,這里采用了Fast Convergence機制:每次發生丟包后,會對比此次丟包時擁塞窗口的大小和之前的擁塞窗口大小,如小於上次的擁塞窗口,說明有新節點加入通信,占用了部分帶寬。此時舊節點需要多留一些帶寬給新節點使用,以使得網絡盡快收斂到穩定狀態!

 (3)當發生丟包時,記錄當前窗口為W-max, 減小窗口后的大小為W,BIC算法就是根據這個原理在(W, W-max]區間內做二分搜索;當接近於W-max時曲線應該更平滑,當離W-max較遠的時候,曲線可以更加陡峭;

/*
 * Compute congestion window to use.
   二分法重新計算擁塞窗口
 */
static inline void bictcp_update(struct bictcp *ca, u32 cwnd)
{
    if (ca->last_cwnd == cwnd &&
        (s32)(tcp_time_stamp - ca->last_time) <= HZ / 32)
        return;

    ca->last_cwnd = cwnd;
    ca->last_time = tcp_time_stamp;

    if (ca->epoch_start == 0) /* record the beginning of an epoch */
        ca->epoch_start = tcp_time_stamp;

    /* start off normal */
    if (cwnd <= low_window) {
        ca->cnt = cwnd;
        return;
    }

    /* binary increase :二分增加;相比reno的線性增加,這里的效率明顯高很多*/
    if (cwnd < ca->last_max_cwnd) {
        __u32    dist = (ca->last_max_cwnd - cwnd)
            / BICTCP_B;

        if (dist > max_increment)
            /* linear increase */
            ca->cnt = cwnd / max_increment;
        else if (dist <= 1U)
            /* binary search increase */
            ca->cnt = (cwnd * smooth_part) / BICTCP_B;
        else
            /* binary search increase */
            ca->cnt = cwnd / dist;
    } else {
        /* slow start AMD linear increase */
        if (cwnd < ca->last_max_cwnd + BICTCP_B)
            /* slow start */
            ca->cnt = (cwnd * smooth_part) / BICTCP_B;
        else if (cwnd < ca->last_max_cwnd + max_increment*(BICTCP_B-1))
            /* slow start */
            ca->cnt = (cwnd * (BICTCP_B-1))
                / (cwnd - ca->last_max_cwnd);
        else
            /* linear increase */
            ca->cnt = cwnd / max_increment;
    }

    /* if in slow start or link utilization is very low */
    if (ca->last_max_cwnd == 0) {
        if (ca->cnt > 20) /* increase cwnd 5% per RTT */
            ca->cnt = 20;
    }

    ca->cnt = (ca->cnt << ACK_RATIO_SHIFT) / ca->delayed_ack;
    if (ca->cnt == 0)            /* cannot be zero */
        ca->cnt = 1;
}

static void bictcp_cong_avoid(struct sock *sk, u32 ack, u32 acked)
{
    struct tcp_sock *tp = tcp_sk(sk);
    struct bictcp *ca = inet_csk_ca(sk);

    if (!tcp_is_cwnd_limited(sk))
        return;

    if (tcp_in_slow_start(tp))
        tcp_slow_start(tp, acked);
    else {
        bictcp_update(ca, tp->snd_cwnd);//計算ca->cnt, 表示增加一個cwnd需要的ack數量
        tcp_cong_avoid_ai(tp, ca->cnt, 1);// 根據ca->cnt計算snd_cwnd
    }
}

  計算snd_cwnd值:

/* In theory this is tp->snd_cwnd += 1 / tp->snd_cwnd (or alternative w),
 * for every packet that was ACKed.
 */
void tcp_cong_avoid_ai(struct tcp_sock *tp, u32 w, u32 acked)
{
    /* If credits accumulated at a higher w, apply them gently now. */
    /*如果 w 很大,那么, snd_cwnd_cnt 可能會積累為一個很大的值;
      此后, w 由於種種原因突然被縮小了很多。那么下面計算處理的 delta 就會很大。
      這可能導致流量的爆發。為了避免這種情況,這里提前增加了一個特判
    */
    if (tp->snd_cwnd_cnt >= w) {
        tp->snd_cwnd_cnt = 0;
        tp->snd_cwnd++;
    }
    /* 累計被確認的包的數目 */
    tp->snd_cwnd_cnt += acked;
    if (tp->snd_cwnd_cnt >= w) {
        /* 窗口增大的大小應當為被確認的包的數目除以當前窗口大小。
        * 以往都是直接加一,但直接加一並不是正確的加法增加 (AI) 的實現。
        * 例如, w 為 10, acked 為 20 時,應當增加 20/10=2,而不是 1。
        */
        u32 delta = tp->snd_cwnd_cnt / w;

        tp->snd_cwnd_cnt -= delta * w;
        tp->snd_cwnd += delta;
    }
    tp->snd_cwnd = min(tp->snd_cwnd, tp->snd_cwnd_clamp);
}

  (4)重新計算epoch_start的值:

/*  如果目前沒有任何數據包在傳輸了,那么需要重新設定epoch_start。這個是為了
    解決當應用程序在一段時間內不發送任何數據時, now-epoch_start 會變得很大,由此,
    根據 Cubic 函數計算出來的目標擁塞窗口值也會變得很大。但顯然,這是一個錯誤。因此,
    需要在應用程序重新開始發送數據時,重置epoch_start 的值。在這里CA_EVENT_TX_START事
    件表明目前所有的包都已經被確認了(即沒有任何正在傳輸的包),而應用程序又開始
    發送新的數據包了。所有的包都被確認說明應用程序有一段時間沒有發包。因而,在程
    序又重新開始發包時,需要重新設定 epoch_start的值,以便在計算擁塞窗口的大小時,
    仍能合理地遵循 cubic 函數的曲線*/
static void bictcp_cwnd_event(struct sock *sk, enum tcp_ca_event event)
{
    if (event == CA_EVENT_TX_START) {
        struct bictcp *ca = inet_csk_ca(sk);
        u32 now = tcp_time_stamp;
        s32 delta;

        delta = now - tcp_sk(sk)->lsndtime;

        /* We were application limited (idle) for a while.
         * Shift epoch_start to keep cwnd growth to cubic curve.
         */
        if (ca->epoch_start && delta > 0) {
            ca->epoch_start += delta;
            if (after(ca->epoch_start, now))
                ca->epoch_start = now;
        }
        return;
    }
}

  (5)發送到收到ACK后,需要重新計算鏈路的延遲情況,以確認后續的各種窗口

/* Track delayed acknowledgment ratio using sliding window
 * ratio = (15*ratio + sample) / 16
 */
static void bictcp_acked(struct sock *sk, const struct ack_sample *sample)
{
    const struct inet_connection_sock *icsk = inet_csk(sk);

    if (icsk->icsk_ca_state == TCP_CA_Open) {
        struct bictcp *ca = inet_csk_ca(sk);

        ca->delayed_ack += sample->pkts_acked -
            (ca->delayed_ack >> ACK_RATIO_SHIFT);
    }
}

 

總結:

1、擁塞控制本質:

(1)目的:控制發送方發數據包的速度,避免少數節點無節制發數據導致整個網絡被占滿,其他節點無法通信;但如果過度節制又會導致網絡大幅空閑,利用率降低,所以需要在擁塞和利用率之間找到平衡

(2)節點新加入時,不知道網絡是否擁塞,只能不停地探測:依次同時發送1、2、4、8個.....(按照2^N增長)數據包!當然,這種指數級別的增長肯定是會到頭的,達到以下3個條件之一后就要考慮減少同時發送的數據包了:

  • 達到了人為事先設置的ssthresh值
  • 等待的ack超時,可能是發送的包丟了,也可能是ack的包丟了;當然也可能沒丟,還在路上了!
  • 收到多個(一般是3個)冗余的ack,說明對方沒收到前序的某個包

(3)減少同時發送的數據包,具體減少到多少了?一般是從一半的量重新開始!比如同時發送16個數據包時觸發了上述3個條件之一,那么重新從每次發送8個數據包開始探測!

(4)重新開始探測后,每次同時發送的數據包需要增加多少個了?reno是線性增加,BIC是二分增加,cubic用的是3次方的公式計算增加值!   

     reno的線性增加: 

      

 

     bic的二分增加:

 

 

    cubic的三次函數:

   

2、jhash是個比較好的hash算法,這個版本的實現如下:

/* jhash - hash an arbitrary key
 * @k: sequence of bytes as key
 * @length: the length of the key
 * @initval: the previous hash, or an arbitray value
 *
 * The generic version, hashes an arbitrary sequence of bytes.
 * No alignment or length assumptions are made about the input key.
 *
 * Returns the hash value of the key. The result depends on endianness.
 */
static inline u32 jhash(const void *key, u32 length, u32 initval)
{
    u32 a, b, c;
    const u8 *k = key;

    /* Set up the internal state */
    a = b = c = JHASH_INITVAL + length + initval;

    /* All but the last block: affect some 32 bits of (a,b,c) */
    while (length > 12) {
        a += __get_unaligned_cpu32(k);
        b += __get_unaligned_cpu32(k + 4);
        c += __get_unaligned_cpu32(k + 8);
        __jhash_mix(a, b, c);
        length -= 12;
        k += 12;
    }
    /* Last block: affect all 32 bits of (c) */
    /* All the case statements fall through */
    switch (length) {
    case 12: c += (u32)k[11]<<24;
    case 11: c += (u32)k[10]<<16;
    case 10: c += (u32)k[9]<<8;
    case 9:  c += k[8];
    case 8:  b += (u32)k[7]<<24;
    case 7:  b += (u32)k[6]<<16;
    case 6:  b += (u32)k[5]<<8;
    case 5:  b += k[4];
    case 4:  a += (u32)k[3]<<24;
    case 3:  a += (u32)k[2]<<16;
    case 2:  a += (u32)k[1]<<8;
    case 1:  a += k[0];
         __jhash_final(a, b, c);
    case 0: /* Nothing left to add */
        break;
    }

    return c;
}

 

 

參考:

1、https://www.bilibili.com/video/BV1L4411a7RN?from=search&seid=9607714680684056910  擁塞控制

2、https://www.bilibili.com/video/BV1MU4y157SY/ cubic擁塞控制

3、https://www.cnblogs.com/huang-xiang/p/13226229.html  cubic擁塞控制算法


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM