數據包接收


設備驅動層

當一些網絡包到來觸發了中斷,內核處理完這些網絡包之后,我們可以先進入主動輪詢 poll 網卡的方式,主動去接收到來的網絡包。如果一直有,就一直處理,等處理告一段落,就返回干其他的事情。當再有下一批網絡包到來的時候,再中斷,再輪詢 poll。這樣就會大大減少中斷的數量,提升網絡處理的效率,這種處理方式我們稱為 NAPI。

static struct pci_driver ixgb_driver = {
  .name     = ixgb_driver_name,
  .id_table = ixgb_pci_tbl,
  .probe    = ixgb_probe,
  .remove   = ixgb_remove,
  .err_handler = &ixgb_err_handler
};

MODULE_AUTHOR("Intel Corporation, <linux.nics@intel.com>");
MODULE_DESCRIPTION("Intel(R) PRO/10GbE Network Driver");
MODULE_LICENSE("GPL");
MODULE_VERSION(DRV_VERSION);

/**
 * ixgb_init_module - Driver Registration Routine
 *
 * ixgb_init_module is the first routine called when the driver is
 * loaded. All it does is register with the PCI subsystem.
 **/

static int __init
ixgb_init_module(void)
{
  pr_info("%s - version %s\n", ixgb_driver_string, ixgb_driver_version);
  pr_info("%s\n", ixgb_copyright);

  return pci_register_driver(&ixgb_driver);
}

module_init(ixgb_init_module);

在網卡驅動程序初始化的時候,我們會調用 ixgb_init_module,注冊一個驅動 ixgb_driver,並且調用它的 probe 函數 ixgb_probe

static int
ixgb_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
{
  struct net_device *netdev = NULL;
  struct ixgb_adapter *adapter;
......
  netdev = alloc_etherdev(sizeof(struct ixgb_adapter));
  SET_NETDEV_DEV(netdev, &pdev->dev);

  pci_set_drvdata(pdev, netdev);
  adapter = netdev_priv(netdev);
  adapter->netdev = netdev;
  adapter->pdev = pdev;
  adapter->hw.back = adapter;
  adapter->msg_enable = netif_msg_init(debug, DEFAULT_MSG_ENABLE);

  adapter->hw.hw_addr = pci_ioremap_bar(pdev, BAR_0);
......
  netdev->netdev_ops = &ixgb_netdev_ops;
  ixgb_set_ethtool_ops(netdev);
  netdev->watchdog_timeo = 5 * HZ;
  netif_napi_add(netdev, &adapter->napi, ixgb_clean, 64);

  strncpy(netdev->name, pci_name(pdev), sizeof(netdev->name) - 1);

  adapter->bd_number = cards_found;
  adapter->link_speed = 0;
  adapter->link_duplex = 0;
......
}

在 ixgb_probe 中,我們會創建一個 struct net_device 表示這個網絡設備,並且 netif_napi_add 函數為這個網絡設備注冊一個輪詢 poll 函數 ixgb_clean,將來一旦出現網絡包的時候,就是要通過他來輪詢了

當一個網卡被激活的時候,我們會調用函數 ixgb_open->ixgb_up,在這里面注冊一個硬件的中斷處理函數

int
ixgb_up(struct ixgb_adapter *adapter)
{
  struct net_device *netdev = adapter->netdev;
......
    err = request_irq(adapter->pdev->irq, ixgb_intr, irq_flags,
                    netdev->name, netdev);
......
}

/**
 * ixgb_intr - Interrupt Handler
 * @irq: interrupt number
 * @data: pointer to a network interface device structure
 **/

static irqreturn_t
ixgb_intr(int irq, void *data)
{
  struct net_device *netdev = data;
  struct ixgb_adapter *adapter = netdev_priv(netdev);
  struct ixgb_hw *hw = &adapter->hw;
......
  if (napi_schedule_prep(&adapter->napi)) {
    IXGB_WRITE_REG(&adapter->hw, IMC, ~0);
    __napi_schedule(&adapter->napi);
  }
  return IRQ_HANDLED;
}

如果一個網絡包到來,觸發了硬件中斷,就會調用 ixgb_intr,這里面會調用 __napi_schedule

/**
 * __napi_schedule - schedule for receive
 * @n: entry to schedule
 *
 * The entry's receive function will be scheduled to run.
 * Consider using __napi_schedule_irqoff() if hard irqs are masked.
 */
void __napi_schedule(struct napi_struct *n)
{
  unsigned long flags;

  local_irq_save(flags);
  ____napi_schedule(this_cpu_ptr(&softnet_data), n);
  local_irq_restore(flags);
}

static inline void ____napi_schedule(struct softnet_data *sd,
             struct napi_struct *napi)
{
  list_add_tail(&napi->poll_list, &sd->poll_list);
  __raise_softirq_irqoff(NET_RX_SOFTIRQ);
}

__napi_schedule 是處於中斷處理的關鍵部分,在他被調用的時候,中斷是暫時關閉的,但是處理網絡包是個復雜的過程,需要到延遲處理部分,所以 ____napi_schedule 將當前設備放到 struct softnet_data 結構的 poll_list 里面,說明在延遲處理部分可以接着處理這個 poll_list 里面的網絡設備

然后 ____napi_schedule 觸發一個軟中斷 NET_RX_SOFTIRQ,通過軟中斷觸發中斷處理的延遲處理部分,也是常用的手段

軟中斷 NET_RX_SOFTIRQ 對應的中斷處理函數是 net_rx_action

static __latent_entropy void net_rx_action(struct softirq_action *h)
{
  struct softnet_data *sd = this_cpu_ptr(&softnet_data);
    LIST_HEAD(list);
    list_splice_init(&sd->poll_list, &list);
......
  for (;;) {
    struct napi_struct *n;
......
    n = list_first_entry(&list, struct napi_struct, poll_list);
    budget -= napi_poll(n, &repoll);
  }
......
}

在 net_rx_action 中,會得到 struct softnet_data 結構,這個結構在發送的時候我們也遇到過。當時它的 output_queue 用於網絡包的發送,這里的 poll_list 用於網絡包的接收

struct softnet_data {
  struct list_head  poll_list;
......
  struct Qdisc    *output_queue;
  struct Qdisc    **output_queue_tailp;
......
}

在 net_rx_action 中,接下來是一個循環,在 poll_list 里面取出網絡包到達的設備,然后調用 napi_poll 來輪詢這些設備,napi_poll 會調用最初設備初始化的時候,注冊的 poll 函數,對於 ixgb_driver,對應的函數是 ixgb_clean

ixgb_clean 會調用 ixgb_clean_rx_irq

static bool
ixgb_clean_rx_irq(struct ixgb_adapter *adapter, int *work_done, int work_to_do)
{
  struct ixgb_desc_ring *rx_ring = &adapter->rx_ring;
  struct net_device *netdev = adapter->netdev;
  struct pci_dev *pdev = adapter->pdev;
  struct ixgb_rx_desc *rx_desc, *next_rxd;
  struct ixgb_buffer *buffer_info, *next_buffer, *next2_buffer;
  u32 length;
  unsigned int i, j;
  int cleaned_count = 0;
  bool cleaned = false;

  i = rx_ring->next_to_clean;
  rx_desc = IXGB_RX_DESC(*rx_ring, i);
  buffer_info = &rx_ring->buffer_info[i];

  while (rx_desc->status & IXGB_RX_DESC_STATUS_DD) {
    struct sk_buff *skb;
    u8 status;

    status = rx_desc->status;
    skb = buffer_info->skb;
    buffer_info->skb = NULL;

    prefetch(skb->data - NET_IP_ALIGN);

    if (++i == rx_ring->count)
      i = 0;
    next_rxd = IXGB_RX_DESC(*rx_ring, i);
    prefetch(next_rxd);

    j = i + 1;
    if (j == rx_ring->count)
      j = 0;
    next2_buffer = &rx_ring->buffer_info[j];
    prefetch(next2_buffer);

    next_buffer = &rx_ring->buffer_info[i];
......
    length = le16_to_cpu(rx_desc->length);
    rx_desc->length = 0;
......
    ixgb_check_copybreak(&adapter->napi, buffer_info, length, &skb);

    /* Good Receive */
    skb_put(skb, length);

    /* Receive Checksum Offload */
    ixgb_rx_checksum(adapter, rx_desc, skb);

    skb->protocol = eth_type_trans(skb, netdev);

    netif_receive_skb(skb);
......
    /* use prefetched values */
    rx_desc = next_rxd;
    buffer_info = next_buffer;
  }

  rx_ring->next_to_clean = i;
......
}

在網絡設備的驅動層,有一個用於接收網絡包的 rx_ring。它是一個環,從網卡硬件接收的包會放在這個環里面。這個環里面的 buffer_info[]是一個數組,存放的是網絡包的內容。i 和 j 是這個數組的下標,在 ixgb_clean_rx_irq 里面的 while 循環中,依次處理環里面的數據。在這里面,我們看到了 i 和 j 加一之后,如果超過了數組的大小,就跳回下標 0,就說明這是一個環。

ixgb_check_copybreak 函數將 buffer_info 里面的內容,拷貝到 struct sk_buff *skb,從而可以作為一個網絡包進行后續的處理,然后調用 netif_receive_skb

 

網絡協議棧的二層邏輯

從 netif_receive_skb 函數開始,我們就進入了內核的網絡協議棧

netif_receive_skb->netif_receive_skb_internal->__netif_receive_skb->__netif_receive_skb_core

在 __netif_receive_skb_core 中,我們先是處理了二層的一些邏輯。

例如,對於 VLAN 的處理,接下來要想辦法交給第三層

static int __netif_receive_skb_core(struct sk_buff *skb, bool pfmemalloc)
{
  struct packet_type *ptype, *pt_prev;
......
  type = skb->protocol;
......
  deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
             &orig_dev->ptype_specific);
  if (pt_prev) {
    ret = pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
  }
......
}

static inline void deliver_ptype_list_skb(struct sk_buff *skb,
            struct packet_type **pt,
            struct net_device *orig_dev,
            __be16 type,
            struct list_head *ptype_list)
{
  struct packet_type *ptype, *pt_prev = *pt;

  list_for_each_entry_rcu(ptype, ptype_list, list) {
    if (ptype->type != type)
      continue;
    if (pt_prev)
      deliver_skb(skb, pt_prev, orig_dev);
    pt_prev = ptype;
  }
  *pt = pt_prev;
}

在網絡包 struct sk_buff 里面,二層的頭里面有一個 protocol,表示里面一層,也即三層是什么協議。deliver_ptype_list_skb 在一個協議列表中逐個匹配。如果能夠匹配到,就返回。

這些協議的注冊在網絡協議棧初始化的時候, inet_init 函數調用 dev_add_pack(&ip_packet_type),添加 IP 協議。協議被放在一個鏈表里面。

void dev_add_pack(struct packet_type *pt)
{
    struct list_head *head = ptype_head(pt);
    list_add_rcu(&pt->list, head);
}

static inline struct list_head *ptype_head(const struct packet_type *pt)
{
    if (pt->type == htons(ETH_P_ALL))
        return pt->dev ? &pt->dev->ptype_all : &ptype_all;
    else
        return pt->dev ? &pt->dev->ptype_specific : &ptype_base[ntohs(pt->type) & PTYPE_HASH_MASK];
}

假設這個時候的網絡包是一個 IP 包,則在這個鏈表里面一定能夠找到 ip_packet_type,在 __netif_receive_skb_core 中會調用 ip_packet_type 的 func 函數

static struct packet_type ip_packet_type __read_mostly = {
  .type = cpu_to_be16(ETH_P_IP),
  .func = ip_rcv,
};

網絡協議棧的 IP 層

int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt, struct net_device *orig_dev)
{
  const struct iphdr *iph;
  struct net *net;
  u32 len;
......
  net = dev_net(dev);
......
  iph = ip_hdr(skb);
  len = ntohs(iph->tot_len);
  skb->transport_header = skb->network_header + iph->ihl*4;
......
  return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING,
           net, NULL, skb, dev, NULL,
           ip_rcv_finish);
......
}

在 ip_rcv 中,得到 IP 頭,然后又遇到了我們見過多次的 NF_HOOK,這次因為是接收網絡包,第一個 hook 點是 NF_INET_PRE_ROUTING,也就是 iptables 的 PREROUTING 鏈。如果里面有規則,則執行規則,然后調用 ip_rcv_finish

static int ip_rcv_finish(struct net *net, struct sock *sk, struct sk_buff *skb)
{
  const struct iphdr *iph = ip_hdr(skb);
  struct net_device *dev = skb->dev;
  struct rtable *rt;
  int err;
......
  rt = skb_rtable(skb);
.....
  return dst_input(skb);
}

static inline int dst_input(struct sk_buff *skb)
{
  return skb_dst(skb)->input(skb);

ip_rcv_finish 得到網絡包對應的路由表,然后調用 dst_input,在 dst_input 中,調用的是 struct rtable 的成員的 dst 的 input 函數。在 rt_dst_alloc 中,我們可以看到,input 函數指向的是 ip_local_deliver。

int ip_local_deliver(struct sk_buff *skb)
{
  /*
   *  Reassemble IP fragments.
   */
  struct net *net = dev_net(skb->dev);

  if (ip_is_fragment(ip_hdr(skb))) {
    if (ip_defrag(net, skb, IP_DEFRAG_LOCAL_DELIVER))
      return 0;
  }

  return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN,
           net, NULL, skb, skb->dev, NULL,
           ip_local_deliver_finish);
}

在 ip_local_deliver 函數中,如果 IP 層進行了分段,則進行重新的組合。接下來就是我們熟悉的 NF_HOOK。hook 點在 NF_INET_LOCAL_IN,對應 iptables 里面的 INPUT 鏈。在經過 iptables 規則處理完畢后,我們調用 ip_local_deliver_finish

static int ip_local_deliver_finish(struct net *net, struct sock *sk, struct sk_buff *skb)
{
  __skb_pull(skb, skb_network_header_len(skb));

  int protocol = ip_hdr(skb)->protocol;
  const struct net_protocol *ipprot;

  ipprot = rcu_dereference(inet_protos[protocol]);
  if (ipprot) {
    int ret;
    ret = ipprot->handler(skb);
......
  }
......
}

在 IP 頭中,有一個字段 protocol 用於指定里面一層的協議,在這里應該是 TCP 協議。

於是,從 inet_protos 數組中,找出 TCP 協議對應的處理函數。這個數組的定義如下,里面的內容是 struct net_protocol

struct net_protocol __rcu *inet_protos[MAX_INET_PROTOS] __read_mostly;

int inet_add_protocol(const struct net_protocol *prot, unsigned char protocol)
{
......
  return !cmpxchg((const struct net_protocol **)&inet_protos[protocol],
      NULL, prot) ? 0 : -1;
}

static int __init inet_init(void)
{
......
  if (inet_add_protocol(&udp_protocol, IPPROTO_UDP) < 0)
    pr_crit("%s: Cannot add UDP protocol\n", __func__);
  if (inet_add_protocol(&tcp_protocol, IPPROTO_TCP) < 0)
    pr_crit("%s: Cannot add TCP protocol\n", __func__);
......
}

static struct net_protocol tcp_protocol = {
  .early_demux  =  tcp_v4_early_demux,
  .early_demux_handler =  tcp_v4_early_demux,
  .handler  =  tcp_v4_rcv,
  .err_handler  =  tcp_v4_err,
  .no_policy  =  1,
  .netns_ok  =  1,
  .icmp_strict_tag_validation = 1,
};

static struct net_protocol udp_protocol = {
  .early_demux =  udp_v4_early_demux,
  .early_demux_handler =  udp_v4_early_demux,
  .handler =  udp_rcv,
  .err_handler =  udp_err,
  .no_policy =  1,
  .netns_ok =  1,
};

在系統初始化的時候,網絡協議棧的初始化調用的是 inet_init,它會調用 inet_add_protocol,將 TCP 協議對應的處理函數 tcp_protocol、UDP 協議對應的處理函數 udp_protocol,放到 inet_protos 數組中

在上面的網絡包的接收過程中,會取出 TCP 協議對應的處理函數 tcp_protocol,然后調用 handler 函數,也即 tcp_v4_rcv 函數

 

網絡協議棧的 TCP 層

從 tcp_v4_rcv 函數開始,我們的處理邏輯就從 IP 層到了 TCP 層

int tcp_v4_rcv(struct sk_buff *skb)
{
  struct net *net = dev_net(skb->dev);
  const struct iphdr *iph;
  const struct tcphdr *th;
  bool refcounted;
  struct sock *sk;
  int ret;
......
  th = (const struct tcphdr *)skb->data;
  iph = ip_hdr(skb);
......
  TCP_SKB_CB(skb)->seq = ntohl(th->seq);
  TCP_SKB_CB(skb)->end_seq = (TCP_SKB_CB(skb)->seq + th->syn + th->fin + skb->len - th->doff * 4);
  TCP_SKB_CB(skb)->ack_seq = ntohl(th->ack_seq);
  TCP_SKB_CB(skb)->tcp_flags = tcp_flag_byte(th);
  TCP_SKB_CB(skb)->tcp_tw_isn = 0;
  TCP_SKB_CB(skb)->ip_dsfield = ipv4_get_dsfield(iph);
  TCP_SKB_CB(skb)->sacked   = 0;

lookup:
  sk = __inet_lookup_skb(&tcp_hashinfo, skb, __tcp_hdrlen(th), th->source, th->dest, &refcounted);

process:
  if (sk->sk_state == TCP_TIME_WAIT)
    goto do_time_wait;

  if (sk->sk_state == TCP_NEW_SYN_RECV) {
......
  }
......
  th = (const struct tcphdr *)skb->data;
  iph = ip_hdr(skb);

  skb->dev = NULL;

  if (sk->sk_state == TCP_LISTEN) {
    ret = tcp_v4_do_rcv(sk, skb);
    goto put_and_return;
  }
......
  if (!sock_owned_by_user(sk)) {
    if (!tcp_prequeue(sk, skb))
      ret = tcp_v4_do_rcv(sk, skb);
  } else if (tcp_add_backlog(sk, skb)) {
    goto discard_and_relse;
  }
......
}

在 tcp_v4_rcv 中,得到 TCP 的頭之后,我們可以開始處理 TCP 層的事情。因為 TCP 層是分狀態的,狀態被維護在數據結構 struct sock 里面,因而我們要根據 IP 地址以及 TCP 頭里面的內容,在 tcp_hashinfo 中找到這個包對應的 struct sock,從而得到這個包對應的連接的狀態。

接下來,我們就根據不同的狀態做不同的處理,例如,上面代碼中的 TCP_LISTEN、TCP_NEW_SYN_RECV 狀態屬於連接建立過程中。這個我們在講三次握手的時候講過了。再如,TCP_TIME_WAIT 狀態是連接結束的時候的狀態,這個我們暫時可以不用看

接下來,我們來分析最主流的網絡包的接收過程,這里面涉及三個隊列:
    backlog 隊列
    prequeue 隊列
    sk_receive_queue 隊列
    為什么接收網絡包的過程,需要在這三個隊列里面倒騰過來、倒騰過去呢?這是因為,同樣一個網絡包要在三個主體之間交接。
第一個主體是軟中斷的處理過程。如果你沒忘記的話,我們在執行 tcp_v4_rcv 函數的時候,依然處於軟中斷的處理邏輯里,所以必然會占用這個軟中斷。
第二個主體就是用戶態進程。如果用戶態觸發系統調用 read 讀取網絡包,也要從隊列里面找。第三個主體就是內核協議棧。哪怕用戶進程沒有調用 read,讀取網絡包,當網絡包來的時候,也得有一個地方收着呀。

這時候,我們就能夠了解上面代碼中 sock_owned_by_user 的意思了,其實就是說,當前這個 sock 是不是正有一個用戶態進程等着讀數據呢,如果沒有,內核協議棧也調用 tcp_add_backlog,暫存在 backlog 隊列中,並且抓緊離開軟中斷的處理過程

如果有一個用戶態進程等待讀取數據呢?我們先調用 tcp_prequeue,也即趕緊放入 prequeue 隊列,並且離開軟中斷的處理過程。在這個函數里面,我們會看到對於 sysctl_tcp_low_latency 的判斷,也即是不是要低時延地處理網絡包

如果把 sysctl_tcp_low_latency 設置為 0,那就要放在 prequeue 隊列中暫存,這樣不用等待網絡包處理完畢,就可以離開軟中斷的處理過程,但是會造成比較長的時延。如果把 sysctl_tcp_low_latency 設置為 1,我們還是調用 tcp_v4_do_rcv

int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb)
{
  struct sock *rsk;

  if (sk->sk_state == TCP_ESTABLISHED) { /* Fast path */
    struct dst_entry *dst = sk->sk_rx_dst;
......
    tcp_rcv_established(sk, skb, tcp_hdr(skb), skb->len);
    return 0;
  }
......
  if (tcp_rcv_state_process(sk, skb)) {
......
  }
  return 0;
......
}

在 tcp_v4_do_rcv 中,分兩種情況,一種情況是連接已經建立,處於 TCP_ESTABLISHED 狀態,調用 tcp_rcv_established。另一種情況,就是其他的狀態,調用 tcp_rcv_state_process

int tcp_rcv_state_process(struct sock *sk, struct sk_buff *skb)
{
  struct tcp_sock *tp = tcp_sk(sk);
  struct inet_connection_sock *icsk = inet_csk(sk);
  const struct tcphdr *th = tcp_hdr(skb);
  struct request_sock *req;
  int queued = 0;
  bool acceptable;

  switch (sk->sk_state) {
  case TCP_CLOSE:
......
  case TCP_LISTEN:
......
  case TCP_SYN_SENT:
......
  }
......
  switch (sk->sk_state) {
  case TCP_SYN_RECV:
......
  case TCP_FIN_WAIT1: 
......
  case TCP_CLOSING:
......
  case TCP_LAST_ACK:
......
    }

  /* step 7: process the segment text */
  switch (sk->sk_state) {
  case TCP_CLOSE_WAIT:
  case TCP_CLOSING:
  case TCP_LAST_ACK:
......
  case TCP_FIN_WAIT1:
  case TCP_FIN_WAIT2:
......
  case TCP_ESTABLISHED:
......
  }
}

在 tcp_rcv_state_process 中,如果我們對着 TCP 的狀態圖進行比對,能看到,對於 TCP 所有狀態的處理,其中和連接建立相關的狀態,咱們已經分析過,所以我們重點關注連接狀態下的工作模式。

 

 

在連接狀態下,我們會調用 tcp_rcv_established。在這個函數里面,我們會調用 tcp_data_queue,將其放入 sk_receive_queue 隊列進行處理。

static void tcp_data_queue(struct sock *sk, struct sk_buff *skb)
{
  struct tcp_sock *tp = tcp_sk(sk);
  bool fragstolen = false;
......
  if (TCP_SKB_CB(skb)->seq == tp->rcv_nxt) {
    if (tcp_receive_window(tp) == 0)
      goto out_of_window;

    /* Ok. In sequence. In window. */
    if (tp->ucopy.task == current &&
        tp->copied_seq == tp->rcv_nxt && tp->ucopy.len &&
        sock_owned_by_user(sk) && !tp->urg_data) {
      int chunk = min_t(unsigned int, skb->len,
            tp->ucopy.len);

      __set_current_state(TASK_RUNNING);

      if (!skb_copy_datagram_msg(skb, 0, tp->ucopy.msg, chunk)) {
        tp->ucopy.len -= chunk;
        tp->copied_seq += chunk;
        eaten = (chunk == skb->len);
        tcp_rcv_space_adjust(sk);
      }
    }

    if (eaten <= 0) {
queue_and_out:
......
      eaten = tcp_queue_rcv(sk, skb, 0, &fragstolen);
    }
    tcp_rcv_nxt_update(tp, TCP_SKB_CB(skb)->end_seq);
......
    if (!RB_EMPTY_ROOT(&tp->out_of_order_queue)) {
      tcp_ofo_queue(sk);
......
    }
......
    return;
  }

  if (!after(TCP_SKB_CB(skb)->end_seq, tp->rcv_nxt)) {
    /* A retransmit, 2nd most common case.  Force an immediate ack. */
    tcp_dsack_set(sk, TCP_SKB_CB(skb)->seq, TCP_SKB_CB(skb)->end_seq);

out_of_window:
    tcp_enter_quickack_mode(sk);
    inet_csk_schedule_ack(sk);
drop:
    tcp_drop(sk, skb);
    return;
  }

  /* Out of window. F.e. zero window probe. */
  if (!before(TCP_SKB_CB(skb)->seq, tp->rcv_nxt + tcp_receive_window(tp)))
    goto out_of_window;

  tcp_enter_quickack_mode(sk);

  if (before(TCP_SKB_CB(skb)->seq, tp->rcv_nxt)) {
    /* Partial packet, seq < rcv_next < end_seq */
    tcp_dsack_set(sk, TCP_SKB_CB(skb)->seq, tp->rcv_nxt);
    /* If window is closed, drop tail of packet. But after
     * remembering D-SACK for its head made in previous line.
     */
    if (!tcp_receive_window(tp))
      goto out_of_window;
    goto queue_and_out;
  }

  tcp_data_queue_ofo(sk, skb);
}

在 tcp_data_queue 中,對於收到的網絡包,我們要分情況進行處理:

第一種情況,seq == tp->rcv_nxt,說明來的網絡包正是我服務端期望的下一個網絡包。這個時候我們判斷 sock_owned_by_user,也即用戶進程也是正在等待讀取,這種情況下,就直接 skb_copy_datagram_msg,將網絡包拷貝給用戶進程就可以了

如果用戶進程沒有正在等待讀取,或者因為內存原因沒有能夠拷貝成功,tcp_queue_rcv 里面還是將網絡包放入 sk_receive_queue 隊列。接下來,tcp_rcv_nxt_update 將 tp->rcv_nxt 設置為 end_seq,也即當前的網絡包接收成功后,更新下一個期待的網絡包。這個時候,我們還會判斷一下另一個隊列,out_of_order_queue,也看看亂序隊列的情況,看看亂序隊列里面的包,會不會因為這個新的網絡包的到來,也能放入到 sk_receive_queue 隊列中。

例如,客戶端發送的網絡包序號為 56789。在 5 還沒有到達的時候,服務端的 rcv_nxt 應該是 5,也即期望下一個網絡包是 5。但是由於中間網絡通路的問題,56 還沒到達服務端,78 已經到達了服務端了,這就出現了亂序

亂序的包不能進入 sk_receive_queue 隊列。因為一旦進入到這個隊列,意味着可以發送給用戶進程。然而,按照 TCP 的定義,用戶進程應該是按順序收到包的,沒有排好序,就不能給用戶進程。所以,78 不能進入 sk_receive_queue 隊列,只能暫時放在 out_of_order_queue 亂序隊列中。

當 56 到達的時候,56 先進入 sk_receive_queue 隊列。這個時候我們再來看 out_of_order_queue 亂序隊列中的 78,發現能夠接上。於是,78 也能進入 sk_receive_queue 隊列了。tcp_ofo_queue 函數就是做這個事情的。


第二種情況,end_seq 不大於 rcv_nxt,也即服務端期望網絡包 5。但是,來了一個網絡包 3,怎樣才會出現這種情況呢?肯定是服務端早就收到了網絡包 3,但是 ACK 沒有到達客戶端,中途丟了,那客戶端就認為網絡包 3 沒有發送成功,於是又發送了一遍,這種情況下,要趕緊給客戶端再發送一次 ACK,表示早就收到了。

第三種情況,seq 不小於 rcv_nxt + tcp_receive_window。這說明客戶端發送得太猛了。本來 seq 肯定應該在接收窗口里面的,這樣服務端才來得及處理,結果現在超出了接收窗口,說明客戶端一下子把服務端給塞滿了。

這種情況下,服務端不能再接收數據包了,只能發送 ACK 了,在 ACK 中會將接收窗口為 0 的情況告知客戶端,客戶端就知道不能再發送了。這個時候雙方只能交互窗口探測數據包,直到服務端因為用戶進程把數據讀走了,空出接收窗口,才能在 ACK 里面再次告訴客戶端,又有窗口了,又能發送數據包了

第四種情況,seq 小於 rcv_nxt,但是 end_seq 大於 rcv_nxt,這說明從 seq 到 rcv_nxt 這部分網絡包原來的 ACK 客戶端沒有收到,所以重新發送了一次,從 rcv_nxt 到 end_seq 時新發送的,可以放入 sk_receive_queue 隊列。

當前四種情況都排除掉,說明網絡包一定是一個亂序包了。這里有點兒難理解,我們還是用上面那個亂序的例子仔細分析一下 rcv_nxt=5。我們假設 tcp_receive_window 也是 5,也即超過 10 服務端就接收不了了。當前來的這個網絡包既不在 rcv_nxt 之前(不是 3 這種),也不在 rcv_nxt + tcp_receive_window 之后(不是 11 這種),說明這正在我們期望的接收窗口里面,但是又不是 rcv_nxt(不是我們馬上期望的網絡包 5),這正是上面的例子中網絡包 78 的情況。對於網絡包 78,我們只好調用 tcp_data_queue_ofo 進入 out_of_order_queue 亂序隊列,但是沒有關系,當網絡包 56 到來的時候,我們會走第一種情況,把 78 拿出來放到 sk_receive_queue 隊列中。

 

Socket 層

當接收的網絡包進入各種隊列之后,接下來我們就要等待用戶進程去讀取它們了

讀取一個 socket,就像讀取一個文件一樣,讀取 socket 的文件描述符,通過 read 系統調用。read 系統調用對於一個文件描述符的操作,大致過程都是類似的,在文件系統那一節,我們已經詳細解析過。最終它會調用到用來表示一個打開文件的結構 stuct file 指向的 file_operations 操作

static const struct file_operations socket_file_ops = {
  .owner =  THIS_MODULE,
  .llseek =  no_llseek,
  .read_iter =  sock_read_iter,
  .write_iter =  sock_write_iter,
  .poll =    sock_poll,
  .unlocked_ioctl = sock_ioctl,
  .mmap =    sock_mmap,
  .release =  sock_close,
  .fasync =  sock_fasync,
  .sendpage =  sock_sendpage,
  .splice_write = generic_splice_sendpage,
  .splice_read =  sock_splice_read,
};

按照文件系統的讀取流程,調用的是 sock_read_iter

static ssize_t sock_read_iter(struct kiocb *iocb, struct iov_iter *to)
{
  struct file *file = iocb->ki_filp;
  struct socket *sock = file->private_data;
  struct msghdr msg = {.msg_iter = *to,
           .msg_iocb = iocb};
  ssize_t res;

  if (file->f_flags & O_NONBLOCK)
    msg.msg_flags = MSG_DONTWAIT;
......
  res = sock_recvmsg(sock, &msg, msg.msg_flags);
  *to = msg.msg_iter;
  return res;
}

在 sock_read_iter 中,通過 VFS 中的 struct file,將創建好的 socket 結構拿出來,然后調用 sock_recvmsg,sock_recvmsg 會調用 sock_recvmsg_nosec

static inline int sock_recvmsg_nosec(struct socket *sock, struct msghdr *msg, int flags)
{
  return sock->ops->recvmsg(sock, msg, msg_data_left(msg), flags);
}

這里調用了 socket 的 ops 的 recvmsg,這個我們遇到好幾次了。根據 inet_stream_ops 的定義,這里調用的是 inet_recvmsg

int inet_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
     int flags)
{
  struct sock *sk = sock->sk;
  int addr_len = 0;
  int err;
......
  err = sk->sk_prot->recvmsg(sk, msg, size, flags & MSG_DONTWAIT,
           flags & ~MSG_DONTWAIT, &addr_len);
......
}

這里面,從 socket 結構,我們可以得到更底層的 sock 結構,然后調用 sk_prot 的 recvmsg 方法。這個同樣遇到好幾次了,根據 tcp_prot 的定義,調用的是 tcp_recvmsg

int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int nonblock,
    int flags, int *addr_len)
{
  struct tcp_sock *tp = tcp_sk(sk);
  int copied = 0;
  u32 peek_seq;
  u32 *seq;
  unsigned long used;
  int err;
  int target;    /* Read at least this many bytes */
  long timeo;
  struct task_struct *user_recv = NULL;
  struct sk_buff *skb, *last;
.....
  do {
    u32 offset;
......
    /* Next get a buffer. */
    last = skb_peek_tail(&sk->sk_receive_queue);
    skb_queue_walk(&sk->sk_receive_queue, skb) {
      last = skb;
      offset = *seq - TCP_SKB_CB(skb)->seq;
      if (offset < skb->len)
        goto found_ok_skb;
......
    }
......
    if (!sysctl_tcp_low_latency && tp->ucopy.task == user_recv) {
      /* Install new reader */
      if (!user_recv && !(flags & (MSG_TRUNC | MSG_PEEK))) {
        user_recv = current;
        tp->ucopy.task = user_recv;
        tp->ucopy.msg = msg;
      }

      tp->ucopy.len = len;
      /* Look: we have the following (pseudo)queues:
       *
       * 1. packets in flight
       * 2. backlog
       * 3. prequeue
       * 4. receive_queue
       *
       * Each queue can be processed only if the next ones
       * are empty. 
       */
      if (!skb_queue_empty(&tp->ucopy.prequeue))
        goto do_prequeue;
    }

    if (copied >= target) {
      /* Do not sleep, just process backlog. */
      release_sock(sk);
      lock_sock(sk);
    } else {
      sk_wait_data(sk, &timeo, last);
    }

    if (user_recv) {
      int chunk;
      chunk = len - tp->ucopy.len;
      if (chunk != 0) {
        len -= chunk;
        copied += chunk;
      }

      if (tp->rcv_nxt == tp->copied_seq &&
          !skb_queue_empty(&tp->ucopy.prequeue)) {
do_prequeue:
        tcp_prequeue_process(sk);

        chunk = len - tp->ucopy.len;
        if (chunk != 0) {
          len -= chunk;
          copied += chunk;
        }
      }
    }
    continue;
  found_ok_skb:
    /* Ok so how much can we use? */
    used = skb->len - offset;
    if (len < used)
      used = len;

    if (!(flags & MSG_TRUNC)) {
      err = skb_copy_datagram_msg(skb, offset, msg, used);
......
    }

    *seq += used;
    copied += used;
    len -= used;

    tcp_rcv_space_adjust(sk);
......
  } while (len > 0);
......
}

cp_recvmsg 這個函數比較長,里面邏輯也很復雜,好在里面有一段注釋概擴了這里面的邏輯。注釋里面提到了三個隊列,receive_queue 隊列、prequeue 隊列和 backlog 隊列。這里面,我們需要把前一個隊列處理完畢,才處理后一個隊列。

tcp_recvmsg 的整個邏輯也是這樣執行的:這里面有一個 while 循環,不斷地讀取網絡包。

這里,我們會先處理 sk_receive_queue 隊列。如果找到了網絡包,就跳到 found_ok_skb 這里。這里會調用 skb_copy_datagram_msg,將網絡包拷貝到用戶進程中,然后直接進入下一層循環。

直到 sk_receive_queue 隊列處理完畢,我們才到了 sysctl_tcp_low_latency 判斷。如果不需要低時延,則會有 prequeue 隊列。

於是,我們能就跳到 do_prequeue 這里,調用 tcp_prequeue_process 進行處理。如果 sysctl_tcp_low_latency 設置為 1,也即沒有 prequeue 隊列,或者 prequeue 隊列為空,則需要處理 backlog 隊列,在 release_sock 函數中處理。

release_sock 會調用 __release_sock,這里面會依次處理隊列中的網絡包

void release_sock(struct sock *sk)
{
......
  if (sk->sk_backlog.tail)
    __release_sock(sk);
......
}

static void __release_sock(struct sock *sk)
  __releases(&sk->sk_lock.slock)
  __acquires(&sk->sk_lock.slock)
{
  struct sk_buff *skb, *next;

  while ((skb = sk->sk_backlog.head) != NULL) {
    sk->sk_backlog.head = sk->sk_backlog.tail = NULL;
    do {
      next = skb->next;
      prefetch(next);
      skb->next = NULL;
      sk_backlog_rcv(sk, skb);
      cond_resched();
      skb = next;
    } while (skb != NULL);
  }
......
}

最后,哪里都沒有網絡包,我們只好調用 sk_wait_data,繼續等待在哪里,等待網絡包的到來

 

總結

 

  •  硬件網卡接收到網絡包之后,通過 DMA 技術,將網絡包放入 Ring Buffer;
  • 硬件網卡通過中斷通知 CPU 新的網絡包的到來;網卡驅動程序會注冊中斷處理函數 ixgb_intr;
  • 中斷處理函數處理完需要暫時屏蔽中斷的核心流程之后,通過軟中斷 NET_RX_SOFTIRQ 觸發接下來的處理過程;
  • NET_RX_SOFTIRQ 軟中斷處理函數 net_rx_action,net_rx_action 會調用 napi_poll,進而調用 ixgb_clean_rx_irq,從 Ring Buffer 中讀取數據到內核 struct sk_buff;
  • 調用 netif_receive_skb 進入內核網絡協議棧,進行一些關於 VLAN 的二層邏輯處理后,調用 ip_rcv 進入三層 IP 層;在 IP 層,會處理 iptables 規則,然后調用 ip_local_deliver 交給更上層 TCP 層;在 TCP 層調用 tcp_v4_rcv,這里面有三個隊列需要處理,如果當前的 Socket 不是正在被讀;
  • 取,則放入 backlog 隊列,如果正在被讀取,不需要很實時的話,則放入 prequeue 隊列,其他情況調用 tcp_v4_do_rcv;
  • 在 tcp_v4_do_rcv 中,如果是處於 TCP_ESTABLISHED 狀態,調用 tcp_rcv_established,其他的狀態,調用 tcp_rcv_state_process;
  • 在 tcp_rcv_established 中,調用 tcp_data_queue,如果序列號能夠接的上,則放入 sk_receive_queue 隊列;
  • 如果序列號接不上,則暫時放入 out_of_order_queue 隊列,等序列號能夠接上的時候,再放入 sk_receive_queue 隊列。

至此內核接收網絡包的過程到此結束,接下來就是用戶態讀取網絡包的過程,這個過程分成幾個層次

  • VFS 層:read 系統調用找到 struct file,根據里面的 file_operations 的定義,調用 sock_read_iter 函數。sock_read_iter 函數調用 sock_recvmsg 函數。
  • Socket 層:從 struct file 里面的 private_data 得到 struct socket,根據里面 ops 的定義,調用 inet_recvmsg 函數。
  • Sock 層:從 struct socket 里面的 sk 得到 struct sock,根據里面 sk_prot 的定義,調用 tcp_recvmsg 函數。
  • TCP 層:tcp_recvmsg 函數會依次讀取 receive_queue 隊列、prequeue 隊列和 backlog 隊列

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM