網絡擁塞的概念大家一定不陌生,肯定都有親生體會:比如節假日的高速路堵車。本來車流量已經很大了,如果再不限制高速口的車進入,整條路只會越來越堵,所以交管部門可能會臨時限流,只允許車輛下高速,不允許上高速!互聯網剛發明的那會還沒有擁塞的概念,各個節點死命地傳輸數據,導致網絡中各種路由設備的buff瞬間被填滿,新來的包只能丟棄(像不像針對網絡中專設備的DOS攻擊了?)!為了維持網絡的正常運轉,需要接入網絡的所有節點有節制地發送數據,避免網絡擁塞!但如果節點過於節制,發送的數據報文過少,又會降低整個網絡的吞吐量、利用率(這些網絡設備閑着也是閑着)等,怎么才能盡可能最大化吞吐量,又能避免網絡擁塞了?擁塞控制算法誕生了!擁塞控制的算法很多,我這個版本采用的是cubic算法(算法細節可以參考鏈接2和3),本文以這個為准分析算法實現的細節!
1、擁塞控制的控制引擎介紹:
(1)由於擁塞控制涉及到很多復雜的計算公式,每個公式都有很多計算因子factor,所以要先給這些factor初始化賦值,這些賦值在cubictcp_register函數中統一完成,如下:
/*初始化各種計算的factor*/ static int __init cubictcp_register(void) { BUILD_BUG_ON(sizeof(struct bictcp) > ICSK_CA_PRIV_SIZE); /* Precompute a bunch of the scaling factors that are used per-packet * based on SRTT of 100ms;計算SRTT=100ms時的縮放因子 */ beta_scale = 8*(BICTCP_BETA_SCALE+beta) / 3 / (BICTCP_BETA_SCALE - beta); cube_rtt_scale = (bic_scale * 10); /* 1024*c/rtt */ /* calculate the "K" for (wmax-cwnd) = c/rtt * K^3 * so K = cubic_root( (wmax-cwnd)*rtt/c ) * the unit of K is bictcp_HZ=2^10, not HZ * * c = bic_scale >> 10 * rtt = 100ms * * the following code has been designed and tested for * cwnd < 1 million packets * RTT < 100 seconds * HZ < 1,000,00 (corresponding to 10 nano-second) */ /* 1/c * 2^2*bictcp_HZ * srtt */ cube_factor = 1ull << (10+3*BICTCP_HZ); /* 2^40 */ /* divide by bic_scale and by constant Srtt (100ms) */ do_div(cube_factor, bic_scale * 10); return tcp_register_congestion_control(&cubictcp); }
(2)由於cubic只是擁塞控制的算法之一(還有其他好幾種算法),並且未來可能還會誕生新的擁塞控制算法,為了便於管理現有算法,同時也能給新增的算法預留空間,linux采用了鏈表來保存/注冊所有的擁塞控制算法,實現的方法就是初始化函數最后一行的tcp_register_congestion_control,如下:
/* * Attach new congestion control algorithm to the list * of available options. */ int tcp_register_congestion_control(struct tcp_congestion_ops *ca) { int ret = 0; /* all algorithms must implement ssthresh and cong_avoid ops */ if (!ca->ssthresh || !(ca->cong_avoid || ca->cong_control)) { pr_err("%s does not implement required ops\n", ca->name); return -EINVAL; } //根據名稱、長度等計算出一個hash值,加快比對速度(早期版本是對比字符串,效率低;key是32bit的整數,比對效率比字符串快多了) ca->key = jhash(ca->name, sizeof(ca->name), strlen(ca->name)); spin_lock(&tcp_cong_list_lock); if (ca->key == TCP_CA_UNSPEC || tcp_ca_find_key(ca->key)) {//檢查一下是不是已經在list里面了 pr_notice("%s already registered or non-unique key\n", ca->name); ret = -EEXIST; } else {//新的擁塞控制算法加入鏈表 list_add_tail_rcu(&ca->list, &tcp_cong_list); pr_debug("%s registered\n", ca->name); } spin_unlock(&tcp_cong_list_lock); return ret; }
(3)從注釋的地方可以看到加入鏈表的是tcp_congestion_ops結構體,這個結構體也是所有擁塞控制算法需要實現的結構體;每次有擁塞算法被發明后,就實現這個結構體中的部分甚至全部方法,然后調用上面的注冊方法把新算法加入鏈表,后續可以通過遍歷鏈表的方式選擇合適的擁塞控制算法!結構體如下:
struct tcp_congestion_ops { struct list_head list; u32 key;/* 算法名稱的哈希值 */ u32 flags; /* initialize private data (optional) */ void (*init)(struct sock *sk); /* cleanup private data (optional) */ void (*release)(struct sock *sk); /* return slow start threshold (required) */ u32 (*ssthresh)(struct sock *sk); /* do new cwnd calculation (required) */ void (*cong_avoid)(struct sock *sk, u32 ack, u32 acked); /* call before changing ca_state (optional) */ void (*set_state)(struct sock *sk, u8 new_state); /* call when cwnd event occurs (optional) */ void (*cwnd_event)(struct sock *sk, enum tcp_ca_event ev); /* call when ack arrives (optional) */ void (*in_ack_event)(struct sock *sk, u32 flags); /* new value of cwnd after loss (optional) */ u32 (*undo_cwnd)(struct sock *sk); /* hook for packet ack accounting (optional) */ void (*pkts_acked)(struct sock *sk, const struct ack_sample *sample); /* suggest number of segments for each skb to transmit (optional) */ u32 (*tso_segs_goal)(struct sock *sk); /* returns the multiplier used in tcp_sndbuf_expand (optional) */ u32 (*sndbuf_expand)(struct sock *sk); /* call when packets are delivered to update cwnd and pacing rate, * after all the ca_state processing. (optional) */ void (*cong_control)(struct sock *sk, const struct rate_sample *rs); /* get info for inet_diag (optional) */ size_t (*get_info)(struct sock *sk, u32 ext, int *attr, union tcp_cc_info *info); char name[TCP_CA_NAME_MAX];/* 擁塞控制算法的名稱 */ struct module *owner; };
有成員字段,也有函數指針;函數名是固定的,也就是說函數的功能都是固定的,每種擁塞控制算法各自實現這些函數功能就行了,這個思路和驅動類似:linux定義了驅動的接口函數名稱,每個驅動廠家自己實現接口函數的功能!事實上擁塞算法也是和驅動一樣,都是以模塊的形式加載到內核的!
(4)上面的結構體定義了每種擁塞控制算法的接口名稱和功能,那么具體到cubic算法,這些功能都是在哪些函數實現的了?在net\ipv4\tcp_cubic.c文件里面做的映射:
static struct tcp_congestion_ops cubictcp __read_mostly = { .init = bictcp_init, .ssthresh = bictcp_recalc_ssthresh, .cong_avoid = bictcp_cong_avoid, .set_state = bictcp_state, .undo_cwnd = bictcp_undo_cwnd, .cwnd_event = bictcp_cwnd_event, .pkts_acked = bictcp_acked, .owner = THIS_MODULE, .name = "cubic", };
這個結構體的名稱叫cubictcp,剛好就在文章開頭介紹的cubictcp_register函數中注冊的!
2、上述介紹圍繞着初始化、注冊等重要功能,這些功能抽象出的框架就是擁塞控制引擎!盡管擁塞算法實現的細節可以不同,但是使用的引擎框架都是一樣的,不得不佩服linux研發人員的模塊抽象和概括能力(好多大廠職級晉升答辯的重要考核之一就是這種模塊總結抽象、舉一反三的能力!);
(1)cubic算法的實現涉及到很變量,為了統一管理,這了變量都定義在bictcp結構體中了,如下:
/* BIC TCP Parameters */ struct bictcp { u32 cnt; /* increase cwnd by 1 after ACKs; 每次 cwnd 增長 1/cnt 的比例*/ u32 last_max_cwnd; /* last maximum snd_cwnd; snd_cwnd 之前的最大值*/ u32 loss_cwnd; /* congestion window at last loss;最近一次發生丟失的時候的擁塞窗口 */ u32 last_cwnd; /* the last snd_cwnd;最近的 snd_cwnd */ u32 last_time; /* time when updated last_cwnd;更新 last_cwnd 的時間 */ u32 epoch_start; /* beginning of an epoch;一輪的開始 */ #define ACK_RATIO_SHIFT 4 /*限制了delayed_ack的最大值為(32 << 4),也就是說Packets/ACKs的最大估計值限制為32; 為什么要增加這個限制呢?這是因為如果發送窗口很大,並且這個窗口的數據包的ACK大量丟失,那 么發送端就會得到一個累積確認了非常多數據包的ACK,這會造成delayed_ack值劇烈的增大。如果 一個ACK累積確認的數據包超過4096個,那么16位的delayed_ack就會溢出,最后的值可能為0。我 們知道delayed_ack在bictcp_update中是作為除數的,這時會產生除數為0的錯誤*/ u32 delayed_ack; /* estimate the ratio of Packets/ACKs << 4 */ };
(2)這么多變量,使用之前肯定是要初始化的,如下:
//將必要的參數都初始化為 0 static inline void bictcp_reset(struct bictcp *ca) { ca->cnt = 0; ca->last_max_cwnd = 0; ca->last_cwnd = 0; ca->last_time = 0; ca->epoch_start = 0; ca->delayed_ack = 2 << ACK_RATIO_SHIFT; } static void bictcp_init(struct sock *sk) { struct bictcp *ca = inet_csk_ca(sk); bictcp_reset(ca); ca->loss_cwnd = 0;//此時尚未發送任何丟失,所以初始化為 0 if (initial_ssthresh) tcp_sk(sk)->snd_ssthresh = initial_ssthresh; }
(3)算法開始執行后,需要計算ssthresh,在bictcp_recalc_ssthresh中實現:
/* * behave like Reno until low_window is reached, * then increase congestion window slowly */ static u32 bictcp_recalc_ssthresh(struct sock *sk) { const struct tcp_sock *tp = tcp_sk(sk); struct bictcp *ca = inet_csk_ca(sk); ca->epoch_start = 0; /* end of epoch */ /* Wmax and fast convergence */ if (tp->snd_cwnd < ca->last_max_cwnd && fast_convergence) ca->last_max_cwnd = (tp->snd_cwnd * (BICTCP_BETA_SCALE + beta)) / (2 * BICTCP_BETA_SCALE); else ca->last_max_cwnd = tp->snd_cwnd; ca->loss_cwnd = tp->snd_cwnd; if (tp->snd_cwnd <= low_window) return max(tp->snd_cwnd >> 1U, 2U); else return max((tp->snd_cwnd * beta) / BICTCP_BETA_SCALE, 2U); }
這里面最核心的就是Fast Convergence機制了!在網絡中,一旦有新的節點加入通信,肯定是要占用網絡帶寬的,勢必需要其他正在通信的節點讓渡出部分帶寬。為了讓舊節點盡快釋放帶寬,這里采用了Fast Convergence機制:每次發生丟包后,會對比此次丟包時擁塞窗口的大小和之前的擁塞窗口大小,如小於上次的擁塞窗口,說明有新節點加入通信,占用了部分帶寬。此時舊節點需要多留一些帶寬給新節點使用,以使得網絡盡快收斂到穩定狀態!
(3)當發生丟包時,記錄當前窗口為W-max, 減小窗口后的大小為W,BIC算法就是根據這個原理在(W, W-max]區間內做二分搜索;當接近於W-max時曲線應該更平滑,當離W-max較遠的時候,曲線可以更加陡峭;
/* * Compute congestion window to use. 二分法重新計算擁塞窗口 */ static inline void bictcp_update(struct bictcp *ca, u32 cwnd) { if (ca->last_cwnd == cwnd && (s32)(tcp_time_stamp - ca->last_time) <= HZ / 32) return; ca->last_cwnd = cwnd; ca->last_time = tcp_time_stamp; if (ca->epoch_start == 0) /* record the beginning of an epoch */ ca->epoch_start = tcp_time_stamp; /* start off normal */ if (cwnd <= low_window) { ca->cnt = cwnd; return; } /* binary increase :二分增加;相比reno的線性增加,這里的效率明顯高很多*/ if (cwnd < ca->last_max_cwnd) { __u32 dist = (ca->last_max_cwnd - cwnd) / BICTCP_B; if (dist > max_increment) /* linear increase */ ca->cnt = cwnd / max_increment; else if (dist <= 1U) /* binary search increase */ ca->cnt = (cwnd * smooth_part) / BICTCP_B; else /* binary search increase */ ca->cnt = cwnd / dist; } else { /* slow start AMD linear increase */ if (cwnd < ca->last_max_cwnd + BICTCP_B) /* slow start */ ca->cnt = (cwnd * smooth_part) / BICTCP_B; else if (cwnd < ca->last_max_cwnd + max_increment*(BICTCP_B-1)) /* slow start */ ca->cnt = (cwnd * (BICTCP_B-1)) / (cwnd - ca->last_max_cwnd); else /* linear increase */ ca->cnt = cwnd / max_increment; } /* if in slow start or link utilization is very low */ if (ca->last_max_cwnd == 0) { if (ca->cnt > 20) /* increase cwnd 5% per RTT */ ca->cnt = 20; } ca->cnt = (ca->cnt << ACK_RATIO_SHIFT) / ca->delayed_ack; if (ca->cnt == 0) /* cannot be zero */ ca->cnt = 1; } static void bictcp_cong_avoid(struct sock *sk, u32 ack, u32 acked) { struct tcp_sock *tp = tcp_sk(sk); struct bictcp *ca = inet_csk_ca(sk); if (!tcp_is_cwnd_limited(sk)) return; if (tcp_in_slow_start(tp)) tcp_slow_start(tp, acked); else { bictcp_update(ca, tp->snd_cwnd);//計算ca->cnt, 表示增加一個cwnd需要的ack數量 tcp_cong_avoid_ai(tp, ca->cnt, 1);// 根據ca->cnt計算snd_cwnd } }
計算snd_cwnd值:
/* In theory this is tp->snd_cwnd += 1 / tp->snd_cwnd (or alternative w), * for every packet that was ACKed. */ void tcp_cong_avoid_ai(struct tcp_sock *tp, u32 w, u32 acked) { /* If credits accumulated at a higher w, apply them gently now. */ /*如果 w 很大,那么, snd_cwnd_cnt 可能會積累為一個很大的值; 此后, w 由於種種原因突然被縮小了很多。那么下面計算處理的 delta 就會很大。 這可能導致流量的爆發。為了避免這種情況,這里提前增加了一個特判 */ if (tp->snd_cwnd_cnt >= w) { tp->snd_cwnd_cnt = 0; tp->snd_cwnd++; } /* 累計被確認的包的數目 */ tp->snd_cwnd_cnt += acked; if (tp->snd_cwnd_cnt >= w) { /* 窗口增大的大小應當為被確認的包的數目除以當前窗口大小。 * 以往都是直接加一,但直接加一並不是正確的加法增加 (AI) 的實現。 * 例如, w 為 10, acked 為 20 時,應當增加 20/10=2,而不是 1。 */ u32 delta = tp->snd_cwnd_cnt / w; tp->snd_cwnd_cnt -= delta * w; tp->snd_cwnd += delta; } tp->snd_cwnd = min(tp->snd_cwnd, tp->snd_cwnd_clamp); }
(4)重新計算epoch_start的值:
/* 如果目前沒有任何數據包在傳輸了,那么需要重新設定epoch_start。這個是為了 解決當應用程序在一段時間內不發送任何數據時, now-epoch_start 會變得很大,由此, 根據 Cubic 函數計算出來的目標擁塞窗口值也會變得很大。但顯然,這是一個錯誤。因此, 需要在應用程序重新開始發送數據時,重置epoch_start 的值。在這里CA_EVENT_TX_START事 件表明目前所有的包都已經被確認了(即沒有任何正在傳輸的包),而應用程序又開始 發送新的數據包了。所有的包都被確認說明應用程序有一段時間沒有發包。因而,在程 序又重新開始發包時,需要重新設定 epoch_start的值,以便在計算擁塞窗口的大小時, 仍能合理地遵循 cubic 函數的曲線*/ static void bictcp_cwnd_event(struct sock *sk, enum tcp_ca_event event) { if (event == CA_EVENT_TX_START) { struct bictcp *ca = inet_csk_ca(sk); u32 now = tcp_time_stamp; s32 delta; delta = now - tcp_sk(sk)->lsndtime; /* We were application limited (idle) for a while. * Shift epoch_start to keep cwnd growth to cubic curve. */ if (ca->epoch_start && delta > 0) { ca->epoch_start += delta; if (after(ca->epoch_start, now)) ca->epoch_start = now; } return; } }
(5)發送到收到ACK后,需要重新計算鏈路的延遲情況,以確認后續的各種窗口
/* Track delayed acknowledgment ratio using sliding window * ratio = (15*ratio + sample) / 16 */ static void bictcp_acked(struct sock *sk, const struct ack_sample *sample) { const struct inet_connection_sock *icsk = inet_csk(sk); if (icsk->icsk_ca_state == TCP_CA_Open) { struct bictcp *ca = inet_csk_ca(sk); ca->delayed_ack += sample->pkts_acked - (ca->delayed_ack >> ACK_RATIO_SHIFT); } }
總結:
1、擁塞控制本質:
(1)目的:控制發送方發數據包的速度,避免少數節點無節制發數據導致整個網絡被占滿,其他節點無法通信;但如果過度節制又會導致網絡大幅空閑,利用率降低,所以需要在擁塞和利用率之間找到平衡!
(2)節點新加入時,不知道網絡是否擁塞,只能不停地探測:依次同時發送1、2、4、8個.....(按照2^N增長)數據包!當然,這種指數級別的增長肯定是會到頭的,達到以下3個條件之一后就要考慮減少同時發送的數據包了:
- 達到了人為事先設置的ssthresh值
- 等待的ack超時,可能是發送的包丟了,也可能是ack的包丟了;當然也可能沒丟,還在路上了!
- 收到多個(一般是3個)冗余的ack,說明對方沒收到前序的某個包
(3)減少同時發送的數據包,具體減少到多少了?一般是從一半的量重新開始!比如同時發送16個數據包時觸發了上述3個條件之一,那么重新從每次發送8個數據包開始探測!
(4)重新開始探測后,每次同時發送的數據包需要增加多少個了?reno是線性增加,BIC是二分增加,cubic用的是3次方的公式計算增加值!
reno的線性增加:
bic的二分增加:
cubic的三次函數:
2、jhash是個比較好的hash算法,這個版本的實現如下:
/* jhash - hash an arbitrary key * @k: sequence of bytes as key * @length: the length of the key * @initval: the previous hash, or an arbitray value * * The generic version, hashes an arbitrary sequence of bytes. * No alignment or length assumptions are made about the input key. * * Returns the hash value of the key. The result depends on endianness. */ static inline u32 jhash(const void *key, u32 length, u32 initval) { u32 a, b, c; const u8 *k = key; /* Set up the internal state */ a = b = c = JHASH_INITVAL + length + initval; /* All but the last block: affect some 32 bits of (a,b,c) */ while (length > 12) { a += __get_unaligned_cpu32(k); b += __get_unaligned_cpu32(k + 4); c += __get_unaligned_cpu32(k + 8); __jhash_mix(a, b, c); length -= 12; k += 12; } /* Last block: affect all 32 bits of (c) */ /* All the case statements fall through */ switch (length) { case 12: c += (u32)k[11]<<24; case 11: c += (u32)k[10]<<16; case 10: c += (u32)k[9]<<8; case 9: c += k[8]; case 8: b += (u32)k[7]<<24; case 7: b += (u32)k[6]<<16; case 6: b += (u32)k[5]<<8; case 5: b += k[4]; case 4: a += (u32)k[3]<<24; case 3: a += (u32)k[2]<<16; case 2: a += (u32)k[1]<<8; case 1: a += k[0]; __jhash_final(a, b, c); case 0: /* Nothing left to add */ break; } return c; }
參考:
1、https://www.bilibili.com/video/BV1L4411a7RN?from=search&seid=9607714680684056910 擁塞控制
2、https://www.bilibili.com/video/BV1MU4y157SY/ cubic擁塞控制
3、https://www.cnblogs.com/huang-xiang/p/13226229.html cubic擁塞控制算法